diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/bridge/gateway/mod.rs | 114 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 283 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager_monitor.rs | 54 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_messenger.rs | 276 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_queuer.rs | 56 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 385 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner_message.rs | 43 | ||||
| -rw-r--r-- | src/client/bridge/mod.rs | 9 | ||||
| -rw-r--r-- | src/client/context.rs | 65 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 132 | ||||
| -rw-r--r-- | src/client/mod.rs | 142 | ||||
| -rw-r--r-- | src/gateway/mod.rs | 17 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 889 | ||||
| -rw-r--r-- | src/gateway/ws_client_ext.rs | 133 |
14 files changed, 1717 insertions, 881 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs index 0b873aa..752b015 100644 --- a/src/client/bridge/gateway/mod.rs +++ b/src/client/bridge/gateway/mod.rs @@ -1,28 +1,113 @@ +//! The client gateway bridge is support essential for the [`client`] module. +//! +//! This is made available for user use if one wishes to be lower-level or avoid +//! the higher functionality of the [`Client`]. +//! +//! Of interest are three pieces: +//! +//! ### [`ShardManager`] +//! +//! The shard manager is responsible for being a clean interface between the +//! user and the shard runners, providing essential functions such as +//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`] +//! to restart a shard. +//! +//! If you are using the `Client`, this is likely the only piece of interest to +//! you. Refer to [its documentation][`ShardManager`] for more information. +//! +//! ### [`ShardQueuer`] +//! +//! The shard queuer is a light wrapper around an mpsc receiver that receives +//! [`ShardManagerMessage`]s. It should be run in its own thread so it can +//! receive messages to start shards in a queue. +//! +//! Refer to [its documentation][`ShardQueuer`] for more information. +//! +//! ### [`ShardRunner`] +//! +//! The shard runner is responsible for actually running a shard and +//! communicating with its respective WebSocket client. +//! +//! It is performs all actions such as sending a presence update over the client +//! and, with the help of the [`Shard`], will be able to determine what to do. +//! This is, for example, whether to reconnect, resume, or identify with the +//! gateway. +//! +//! ### In Conclusion +//! +//! For almost every - if not every - use case, you only need to _possibly_ be +//! concerned about the [`ShardManager`] in this module. +//! +//! [`Client`]: ../../struct.Client.html +//! [`client`]: ../.. +//! [`Shard`]: ../../../gateway/struct.Shard.html +//! [`ShardManager`]: struct.ShardManager.html +//! [`ShardManager::restart`]: struct.ShardManager.html#method.restart +//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown +//! [`ShardQueuer`]: struct.ShardQueuer.html +//! [`ShardRunner`]: struct.ShardRunner.html + mod shard_manager; +mod shard_manager_monitor; +mod shard_messenger; mod shard_queuer; mod shard_runner; +mod shard_runner_message; pub use self::shard_manager::ShardManager; +pub use self::shard_manager_monitor::ShardManagerMonitor; +pub use self::shard_messenger::ShardMessenger; pub use self::shard_queuer::ShardQueuer; pub use self::shard_runner::ShardRunner; +pub use self::shard_runner_message::ShardRunnerMessage; -use gateway::Shard; -use parking_lot::Mutex; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::mpsc::Sender; -use std::sync::Arc; -type Parked<T> = Arc<Mutex<T>>; -type LockedShard = Parked<Shard>; +/// A message either for a [`ShardManager`] or a [`ShardRunner`]. +/// +/// [`ShardManager`]: struct.ShardManager.html +/// [`ShardRunner`]: struct.ShardRunner.html +pub enum ShardClientMessage { + /// A message intended to be worked with by a [`ShardManager`]. + /// + /// [`ShardManager`]: struct.ShardManager.html + Manager(ShardManagerMessage), + /// A message intended to be worked with by a [`ShardRunner`]. + /// + /// [`ShardRunner`]: struct.ShardRunner.html + Runner(ShardRunnerMessage), +} +/// A message for a [`ShardManager`] relating to an operation with a shard. +/// +/// [`ShardManager`]: struct.ShardManager.html #[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub enum ShardManagerMessage { + /// Indicator that a [`ShardManagerMonitor`] should restart a shard. + /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html Restart(ShardId), + /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard + /// without bringing it back up. + /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html Shutdown(ShardId), - #[allow(dead_code)] + /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards + /// and end its monitoring process for the [`ShardManager`]. + /// + /// [`ShardManager`]: struct.ShardManager.html + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html ShutdownAll, } +/// A message to be sent to the [`ShardQueuer`]. +/// +/// This should usually be wrapped in a [`ShardClientMessage`]. +/// +/// [`ShardClientMessage`]: enum.ShardClientMessage.html +/// [`ShardQueuer`]: enum.ShardQueuer.html +#[derive(Clone, Debug)] pub enum ShardQueuerMessage { /// Message to start a shard, where the 0-index element is the ID of the /// Shard to start and the 1-index element is the total shards in use. @@ -31,8 +116,8 @@ pub enum ShardQueuerMessage { Shutdown, } -// A light tuplestruct wrapper around a u64 to verify type correctness when -// working with the IDs of shards. +/// A light tuplestruct wrapper around a u64 to verify type correctness when +/// working with the IDs of shards. #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct ShardId(pub u64); @@ -42,7 +127,16 @@ impl Display for ShardId { } } +/// Information about a [`ShardRunner`]. +/// +/// The [`ShardId`] is not included because, as it stands, you probably already +/// know the Id if you obtained this. +/// +/// [`ShardId`]: struct.ShardId.html +/// [`ShardRunner`]: struct.ShardRunner.html +#[derive(Debug)] pub struct ShardRunnerInfo { - pub runner_tx: Sender<ShardManagerMessage>, - pub shard: Arc<Mutex<Shard>>, + /// The channel used to communicate with the shard runner, telling it + /// what to do with regards to its status. + pub runner_tx: Sender<ShardClientMessage>, } diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index 6e3b285..2cf45fd 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -1,13 +1,15 @@ use internal::prelude::*; use parking_lot::Mutex; use std::collections::HashMap; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Sender}; use std::sync::Arc; use std::thread; use super::super::super::EventHandler; use super::{ + ShardClientMessage, ShardId, ShardManagerMessage, + ShardManagerMonitor, ShardQueuer, ShardQueuerMessage, ShardRunnerInfo, @@ -18,8 +20,80 @@ use typemap::ShareMap; #[cfg(feature = "framework")] use framework::Framework; +/// A manager for handling the status of shards by starting them, restarting +/// them, and stopping them when required. +/// +/// **Note**: The [`Client`] internally uses a shard manager. If you are using a +/// Client, then you do not need to make one of these. +/// +/// # Examples +/// +/// Initialize a shard manager with a framework responsible for shards 0 through +/// 2, of 5 total shards: +/// +/// ```rust,no_run +/// extern crate parking_lot; +/// extern crate serenity; +/// extern crate threadpool; +/// extern crate typemap; +/// +/// # use std::error::Error; +/// # +/// # #[cfg(feature = "framework")] +/// # fn try_main() -> Result<(), Box<Error>> { +/// # +/// use parking_lot::Mutex; +/// use serenity::client::bridge::gateway::ShardManager; +/// use serenity::client::EventHandler; +/// use serenity::http; +/// use std::sync::Arc; +/// use std::env; +/// use threadpool::ThreadPool; +/// use typemap::ShareMap; +/// +/// struct Handler; +/// +/// impl EventHandler for Handler { } +/// +/// let token = env::var("DISCORD_TOKEN")?; +/// http::set_token(&token); +/// let token = Arc::new(Mutex::new(token)); +/// +/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url)); +/// let data = Arc::new(Mutex::new(ShareMap::custom())); +/// let event_handler = Arc::new(Handler); +/// let framework = Arc::new(Mutex::new(None)); +/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5); +/// +/// ShardManager::new( +/// 0, // the shard index to start initiating from +/// 3, // the number of shards to initiate (this initiates 0, 1, and 2) +/// 5, // the total number of shards in use +/// gateway_url, +/// token, +/// data, +/// event_handler, +/// framework, +/// threadpool, +/// ); +/// # Ok(()) +/// # } +/// # +/// # #[cfg(not(feature = "framework"))] +/// # fn try_main() -> Result<(), Box<Error>> { +/// # Ok(()) +/// # } +/// # +/// # fn main() { +/// # try_main().unwrap(); +/// # } +/// ``` +/// +/// [`Client`]: ../../struct.Client.html +#[derive(Debug)] pub struct ShardManager { - pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// The shard runners currently managed. + runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, /// The index of the first shard to initialize, 0-indexed. shard_index: u64, /// The number of shards to initialize. @@ -27,10 +101,11 @@ pub struct ShardManager { /// The total shards in use, 1-indexed. shard_total: u64, shard_queuer: Sender<ShardQueuerMessage>, - thread_rx: Receiver<ShardManagerMessage>, } impl ShardManager { + /// Creates a new shard manager, returning both the manager and a monitor + /// for usage in a separate thread. #[cfg(feature = "framework")] #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new<H>( @@ -43,7 +118,7 @@ impl ShardManager { event_handler: Arc<H>, framework: Arc<Mutex<Option<Box<Framework + Send>>>>, threadpool: ThreadPool, - ) -> Self where H: EventHandler + Send + Sync + 'static { + ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static { let (thread_tx, thread_rx) = mpsc::channel(); let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); @@ -66,16 +141,22 @@ impl ShardManager { shard_queuer.run(); }); - Self { + let manager = Arc::new(Mutex::new(Self { shard_queuer: shard_queue_tx, - thread_rx: thread_rx, runners, shard_index, shard_init, shard_total, - } + })); + + (Arc::clone(&manager), ShardManagerMonitor { + rx: thread_rx, + manager, + }) } + /// Creates a new shard manager, returning both the manager and a monitor + /// for usage in a separate thread. #[cfg(not(feature = "framework"))] pub fn new<H>( shard_index: u64, @@ -86,21 +167,21 @@ impl ShardManager { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, threadpool: ThreadPool, - ) -> Self where H: EventHandler + Send + Sync + 'static { + ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static { let (thread_tx, thread_rx) = mpsc::channel(); let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); let runners = Arc::new(Mutex::new(HashMap::new())); let mut shard_queuer = ShardQueuer { - data: data.clone(), - event_handler: event_handler.clone(), + data: Arc::clone(&data), + event_handler: Arc::clone(&event_handler), last_start: None, manager_tx: thread_tx.clone(), - runners: runners.clone(), + runners: Arc::clone(&runners), rx: shard_queue_rx, - token: token.clone(), - ws_url: ws_url.clone(), + token: Arc::clone(&token), + ws_url: Arc::clone(&ws_url), threadpool, }; @@ -108,21 +189,41 @@ impl ShardManager { shard_queuer.run(); }); - Self { + let manager = Arc::new(Mutex::new(Self { shard_queuer: shard_queue_tx, - thread_rx: thread_rx, runners, shard_index, shard_init, shard_total, - } + })); + + (Arc::clone(&manager), ShardManagerMonitor { + rx: thread_rx, + manager, + }) + } + + /// Returns whether the shard manager contains either an active instance of + /// a shard runner responsible for the given ID. + /// + /// If a shard has been queued but has not yet been initiated, then this + /// will return `false`. Consider double-checking [`is_responsible_for`] to + /// determine whether this shard manager is responsible for the given shard. + /// + /// [`is_responsible_for`]: #method.is_responsible_for + pub fn has(&self, shard_id: ShardId) -> bool { + self.runners.lock().contains_key(&shard_id) } + /// Initializes all shards that the manager is responsible for. + /// + /// This will communicate shard boots with the [`ShardQueuer`] so that they + /// are properly queued. + /// + /// [`ShardQueuer`]: struct.ShardQueuer.html pub fn initialize(&mut self) -> Result<()> { let shard_to = self.shard_index + self.shard_init; - debug!("{}, {}", self.shard_index, self.shard_init); - for shard_id in self.shard_index..shard_to { let shard_total = self.shard_total; @@ -132,39 +233,52 @@ impl ShardManager { Ok(()) } - pub fn run(&mut self) { - while let Ok(value) = self.thread_rx.recv() { - match value { - ShardManagerMessage::Restart(shard_id) => self.restart(shard_id), - ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id), - ShardManagerMessage::ShutdownAll => { - self.shutdown_all(); - - break; - }, - } - } - } - - pub fn shutdown_all(&mut self) { - info!("Shutting down all shards"); - let keys = { - self.runners.lock().keys().cloned().collect::<Vec<ShardId>>() - }; + /// Sets the new sharding information for the manager. + /// + /// This will shutdown all existing shards. + /// + /// This will _not_ instantiate the new shards. + pub fn set_shards(&mut self, index: u64, init: u64, total: u64) { + self.shutdown_all(); - for shard_id in keys { - self.shutdown(shard_id); - } + self.shard_index = index; + self.shard_init = init; + self.shard_total = total; } - fn boot(&mut self, shard_info: [ShardId; 2]) { - info!("Telling shard queuer to start shard {}", shard_info[0]); - - let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); - let _ = self.shard_queuer.send(msg); - } - - fn restart(&mut self, shard_id: ShardId) { + /// Restarts a shard runner. + /// + /// This sends a shutdown signal to a shard's associated [`ShardRunner`], + /// and then queues a initialization of a shard runner for the same shard + /// via the [`ShardQueuer`]. + /// + /// # Examples + /// + /// Creating a client and then restarting a shard by ID: + /// + /// _(note: in reality this precise code doesn't have an effect since the + /// shard would not yet have been initialized via [`initialize`], but the + /// concept is the same)_ + /// + /// ```rust,no_run + /// use serenity::client::bridge::gateway::ShardId; + /// use serenity::client::{Client, EventHandler}; + /// use std::env; + /// + /// struct Handler; + /// + /// impl EventHandler for Handler { } + /// + /// let token = env::var("DISCORD_TOKEN").unwrap(); + /// let mut client = Client::new(&token, Handler).unwrap(); + /// + /// // restart shard ID 7 + /// client.shard_manager.lock().restart(ShardId(7)); + /// ``` + /// + /// [`ShardQueuer`]: struct.ShardQueuer.html + /// [`ShardRunner`]: struct.ShardRunner.html + pub fn restart(&mut self, shard_id: ShardId) { info!("Restarting shard {}", shard_id); self.shutdown(shard_id); @@ -173,23 +287,88 @@ impl ShardManager { self.boot([shard_id, ShardId(shard_total)]); } - fn shutdown(&mut self, shard_id: ShardId) { + /// Returns the [`ShardId`]s of the shards that have been instantiated and + /// currently have a valid [`ShardRunner`]. + /// + /// [`ShardId`]: struct.ShardId.html + /// [`ShardRunner`]: struct.ShardRunner.html + pub fn shards_instantiated(&self) -> Vec<ShardId> { + self.runners.lock().keys().cloned().collect() + } + + /// Attempts to shut down the shard runner by Id. + /// + /// Returns a boolean indicating whether a shard runner was present. This is + /// _not_ necessary an indicator of whether the shard runner was + /// successfully shut down. + /// + /// **Note**: If the receiving end of an mpsc channel - theoretically owned + /// by the shard runner - no longer exists, then the shard runner will not + /// know it should shut down. This _should never happen_. It may already be + /// stopped. + pub fn shutdown(&mut self, shard_id: ShardId) -> bool { info!("Shutting down shard {}", shard_id); if let Some(runner) = self.runners.lock().get(&shard_id) { - let msg = ShardManagerMessage::Shutdown(shard_id); + let shutdown = ShardManagerMessage::Shutdown(shard_id); + let msg = ShardClientMessage::Manager(shutdown); if let Err(why) = runner.runner_tx.send(msg) { - warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why); + warn!( + "Failed to cleanly shutdown shard {}: {:?}", + shard_id, + why, + ); } } - self.runners.lock().remove(&shard_id); + self.runners.lock().remove(&shard_id).is_some() + } + + /// Sends a shutdown message for all shards that the manager is responsible + /// for that are still known to be running. + /// + /// If you only need to shutdown a select number of shards, prefer looping + /// over the [`shutdown`] method. + /// + /// [`shutdown`]: #method.shutdown + pub fn shutdown_all(&mut self) { + let keys = { + let runners = self.runners.lock(); + + if runners.is_empty() { + return; + } + + runners.keys().cloned().collect::<Vec<_>>() + }; + + info!("Shutting down all shards"); + + for shard_id in keys { + self.shutdown(shard_id); + } + } + + fn boot(&mut self, shard_info: [ShardId; 2]) { + info!("Telling shard queuer to start shard {}", shard_info[0]); + + let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); + let _ = self.shard_queuer.send(msg); } } impl Drop for ShardManager { + /// A custom drop implementation to clean up after the manager. + /// + /// This shuts down all active [`ShardRunner`]s and attempts to tell the + /// [`ShardQueuer`] to shutdown. + /// + /// [`ShardQueuer`]: struct.ShardQueuer.html + /// [`ShardRunner`]: struct.ShardRunner.html fn drop(&mut self) { + self.shutdown_all(); + if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) { warn!("Failed to send shutdown to shard queuer: {:?}", why); } diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs new file mode 100644 index 0000000..4a64473 --- /dev/null +++ b/src/client/bridge/gateway/shard_manager_monitor.rs @@ -0,0 +1,54 @@ +use parking_lot::Mutex; +use std::sync::mpsc::Receiver; +use std::sync::Arc; +use super::{ShardManager, ShardManagerMessage}; + +/// The shard manager monitor does what it says on the tin -- it monitors the +/// shard manager and performs actions on it as received. +/// +/// The monitor is essentially responsible for running in its own thread and +/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or +/// shutdown everything entirely. +/// +/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html +#[derive(Debug)] +pub struct ShardManagerMonitor { + /// An clone of the Arc to the manager itself. + pub manager: Arc<Mutex<ShardManager>>, + /// The mpsc Receiver channel to receive shard manager messages over. + pub rx: Receiver<ShardManagerMessage>, +} + +impl ShardManagerMonitor { + /// "Runs" the monitor, waiting for messages over the Receiver. + /// + /// This should be called in its own thread due to its blocking, looped + /// nature. + /// + /// This will continue running until either: + /// + /// - a [`ShardManagerMessage::ShutdownAll`] has been received + /// - an error is returned while receiving a message from the + /// channel (probably indicating that the shard manager should stop anyway) + /// + /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll + pub fn run(&mut self) { + debug!("Starting shard manager worker"); + + while let Ok(value) = self.rx.recv() { + match value { + ShardManagerMessage::Restart(shard_id) => { + self.manager.lock().restart(shard_id); + }, + ShardManagerMessage::Shutdown(shard_id) => { + self.manager.lock().shutdown(shard_id); + }, + ShardManagerMessage::ShutdownAll => { + self.manager.lock().shutdown_all(); + + break; + }, + } + } + } +} diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs new file mode 100644 index 0000000..069c838 --- /dev/null +++ b/src/client/bridge/gateway/shard_messenger.rs @@ -0,0 +1,276 @@ +use model::{Game, GuildId, OnlineStatus}; +use super::{ShardClientMessage, ShardRunnerMessage}; +use std::sync::mpsc::{SendError, Sender}; +use websocket::message::OwnedMessage; + +/// A lightweight wrapper around an mpsc sender. +/// +/// This is used to cleanly communicate with a shard's respective +/// [`ShardRunner`]. This can be used for actions such as setting the game via +/// [`set_game`] or shutting down via [`shutdown`]. +/// +/// [`ShardRunner`]: struct.ShardRunner.html +/// [`set_game`]: #method.set_game +/// [`shutdown`]: #method.shutdown +#[derive(Clone, Debug)] +pub struct ShardMessenger { + tx: Sender<ShardClientMessage>, +} + +impl ShardMessenger { + /// Creates a new shard messenger. + /// + /// If you are using the [`Client`], you do not need to do this. + /// + /// [`Client`]: ../../struct.Client.html + #[inline] + pub fn new(tx: Sender<ShardClientMessage>) -> Self { + Self { + tx, + } + } + + /// Requests that one or multiple [`Guild`]s be chunked. + /// + /// This will ask the gateway to start sending member chunks for large + /// guilds (250 members+). If a guild is over 250 members, then a full + /// member list will not be downloaded, and must instead be requested to be + /// sent in "chunks" containing members. + /// + /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each + /// chunk only contains a partial amount of the total members. + /// + /// If the `cache` feature is enabled, the cache will automatically be + /// updated with member chunks. + /// + /// # Examples + /// + /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not + /// specifying a query parameter: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; + /// # + /// use serenity::model::GuildId; + /// + /// let guild_ids = vec![GuildId(81384788765712384)]; + /// + /// shard.chunk_guilds(guild_ids, Some(2000), None); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// Chunk a single guild by Id, limiting to 20 members, and specifying a + /// query parameter of `"do"`: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; + /// # + /// use serenity::model::GuildId; + /// + /// let guild_ids = vec![GuildId(81384788765712384)]; + /// + /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// [`Event::GuildMembersChunk`]: + /// ../../model/event/enum.Event.html#variant.GuildMembersChunk + /// [`Guild`]: ../../model/struct.Guild.html + /// [`Member`]: ../../model/struct.Member.html + pub fn chunk_guilds<It>( + &self, + guild_ids: It, + limit: Option<u16>, + query: Option<String>, + ) where It: IntoIterator<Item=GuildId> { + let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>(); + + let _ = self.send(ShardRunnerMessage::ChunkGuilds { + guild_ids: guilds, + limit, + query, + }); + } + + /// Sets the user's current game, if any. + /// + /// Other presence settings are maintained. + /// + /// # Examples + /// + /// Setting the current game to playing `"Heroes of the Storm"`: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::Game; + /// + /// shard.set_game(Some(Game::playing("Heroes of the Storm"))); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + pub fn set_game(&self, game: Option<Game>) { + let _ = self.send(ShardRunnerMessage::SetGame(game)); + } + + /// Sets the user's full presence information. + /// + /// Consider using the individual setters if you only need to modify one of + /// these. + /// + /// # Examples + /// + /// Set the current user as playing `"Heroes of the Storm"` and being + /// online: + /// + /// ```rust,ignore + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::{Game, OnlineStatus}; + /// + /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + pub fn set_presence(&self, game: Option<Game>, mut status: OnlineStatus) { + if status == OnlineStatus::Offline { + status = OnlineStatus::Invisible; + } + + let _ = self.send(ShardRunnerMessage::SetPresence(status, game)); + } + + /// Sets the user's current online status. + /// + /// Note that [`Offline`] is not a valid online status, so it is + /// automatically converted to [`Invisible`]. + /// + /// Other presence settings are maintained. + /// + /// # Examples + /// + /// Setting the current online status for the shard to [`DoNotDisturb`]. + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::OnlineStatus; + /// + /// shard.set_status(OnlineStatus::DoNotDisturb); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb + /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible + /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline + pub fn set_status(&self, mut online_status: OnlineStatus) { + if online_status == OnlineStatus::Offline { + online_status = OnlineStatus::Invisible; + } + + let _ = self.send(ShardRunnerMessage::SetStatus(online_status)); + } + + /// Shuts down the websocket by attempting to cleanly close the + /// connection. + pub fn shutdown_clean(&self) { + let _ = self.send(ShardRunnerMessage::Close(1000, None)); + } + + /// Sends a raw message over the WebSocket. + /// + /// The given message is not mutated in any way, and is sent as-is. + /// + /// You should only use this if you know what you're doing. If you're + /// wanting to, for example, send a presence update, prefer the usage of + /// the [`set_presence`] method. + /// + /// [`set_presence`]: #method.set_presence + pub fn websocket_message(&self, message: OwnedMessage) { + let _ = self.send(ShardRunnerMessage::Message(message)); + } + + #[inline] + fn send(&self, msg: ShardRunnerMessage) + -> Result<(), SendError<ShardClientMessage>> { + self.tx.send(ShardClientMessage::Runner(msg)) + } +} diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs index cb3f749..bd02532 100644 --- a/src/client/bridge/gateway/shard_queuer.rs +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -27,20 +27,69 @@ use framework::Framework; /// blocking nature of the loop itself as well as a 5 second thread sleep /// between shard starts. pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> { + /// A copy of [`Client::data`] to be given to runners for contextual + /// dispatching. + /// + /// [`Client::data`]: ../../struct.Client.html#structfield.data pub data: Arc<Mutex<ShareMap>>, + /// A reference to an `EventHandler`, such as the one given to the + /// [`Client`]. + /// + /// [`Client`]: ../../struct.Client.html pub event_handler: Arc<H>, + /// A copy of the framework #[cfg(feature = "framework")] pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + /// The instant that a shard was last started. + /// + /// This is used to determine how long to wait between shard IDENTIFYs. pub last_start: Option<Instant>, + /// A copy of the sender channel to communicate with the + /// [`ShardManagerMonitor`]. + /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html pub manager_tx: Sender<ShardManagerMessage>, + /// A copy of the map of shard runners. pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// A receiver channel for the shard queuer to be told to start shards. pub rx: Receiver<ShardQueuerMessage>, + /// A copy of a threadpool to give shard runners. + /// + /// For example, when using the [`Client`], this will be a copy of + /// [`Client::threadpool`]. + /// + /// [`Client`]: ../../struct.Client.html + /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool pub threadpool: ThreadPool, + /// A copy of the token to connect with. pub token: Arc<Mutex<String>>, + /// A copy of the URI to use to connect to the gateway. pub ws_url: Arc<Mutex<String>>, } impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { + /// Begins the shard queuer loop. + /// + /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s, + /// blocking for messages on what to do. + /// + /// If a [`ShardQueuerMessage::Start`] is received, this will: + /// + /// 1. Check how much time has passed since the last shard was started + /// 2. If the amount of time is less than the ratelimit, it will sleep until + /// that time has passed + /// 3. Start the shard by ID + /// + /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and + /// the loop will be over. + /// + /// **Note**: This should be run in its own thread due to the blocking + /// nature of the loop. + /// + /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html + /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown + /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start + /// [`rx`]: #structfield.rx pub fn run(&mut self) { while let Ok(msg) = self.rx.recv() { match msg { @@ -80,16 +129,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> { let shard_info = [shard_id.0, shard_total.0]; + let shard = Shard::new( Arc::clone(&self.ws_url), Arc::clone(&self.token), shard_info, )?; - let locked = Arc::new(Mutex::new(shard)); let mut runner = feature_framework! {{ ShardRunner::new( - Arc::clone(&locked), + shard, self.manager_tx.clone(), Arc::clone(&self.framework), Arc::clone(&self.data), @@ -98,7 +147,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { ) } else { ShardRunner::new( - locked.clone(), + shard, self.manager_tx.clone(), self.data.clone(), self.event_handler.clone(), @@ -108,7 +157,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { let runner_info = ShardRunnerInfo { runner_tx: runner.runner_tx(), - shard: locked, }; thread::spawn(move || { diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index b14e48b..a5fde3d 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -1,35 +1,45 @@ +use gateway::{Shard, ShardAction}; use internal::prelude::*; use internal::ws_impl::ReceiverExt; use model::event::{Event, GatewayEvent}; use parking_lot::Mutex; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Receiver, Sender, TryRecvError}; use std::sync::Arc; use super::super::super::{EventHandler, dispatch}; -use super::{LockedShard, ShardId, ShardManagerMessage}; +use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage}; use threadpool::ThreadPool; use typemap::ShareMap; +use websocket::message::{CloseData, OwnedMessage}; use websocket::WebSocketError; #[cfg(feature = "framework")] use framework::Framework; +#[cfg(feature = "voice")] +use internal::ws_impl::SenderExt; +/// A runner for managing a [`Shard`] and its respective WebSocket client. +/// +/// [`Shard`]: ../../../gateway/struct.Shard.html pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, #[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>, manager_tx: Sender<ShardManagerMessage>, - runner_rx: Receiver<ShardManagerMessage>, - runner_tx: Sender<ShardManagerMessage>, - shard: LockedShard, - shard_info: [u64; 2], + // channel to receive messages from the shard manager and dispatches + runner_rx: Receiver<ShardClientMessage>, + // channel to send messages to the shard runner from the shard manager + runner_tx: Sender<ShardClientMessage>, + shard: Shard, threadpool: ThreadPool, } impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { + /// Creates a new runner for a Shard. + #[allow(too_many_arguments)] #[cfg(feature = "framework")] pub fn new( - shard: LockedShard, + shard: Shard, manager_tx: Sender<ShardManagerMessage>, framework: Arc<Mutex<Option<Box<Framework + Send>>>>, data: Arc<Mutex<ShareMap>>, @@ -37,7 +47,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { threadpool: ThreadPool, ) -> Self { let (tx, rx) = mpsc::channel(); - let shard_info = shard.lock().shard_info(); Self { runner_rx: rx, @@ -47,21 +56,20 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { framework, manager_tx, shard, - shard_info, threadpool, } } + /// Creates a new runner for a Shard. #[cfg(not(feature = "framework"))] pub fn new( - shard: LockedShard, + shard: Shard, manager_tx: Sender<ShardManagerMessage>, data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, threadpool: ThreadPool, ) -> Self { let (tx, rx) = mpsc::channel(); - let shard_info = shard.lock().shard_info(); Self { runner_rx: rx, @@ -70,90 +78,276 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { event_handler, manager_tx, shard, - shard_info, threadpool, } } + /// Starts the runner's loop to receive events. + /// + /// This runs a loop that performs the following in each iteration: + /// + /// 1. checks the receiver for [`ShardRunnerMessage`]s, possibly from the + /// [`ShardManager`], and if there is one, acts on it. + /// + /// 2. checks if a heartbeat should be sent to the discord Gateway, and if + /// so, sends one. + /// + /// 3. attempts to retrieve a message from the WebSocket, processing it into + /// a [`GatewayEvent`]. This will block for 100ms before assuming there is + /// no message available. + /// + /// 4. Checks with the [`Shard`] to determine if the gateway event is + /// specifying an action to take (e.g. resuming, reconnecting, heartbeating) + /// and then performs that action, if any. + /// + /// 5. Dispatches the event via the Client. + /// + /// 6. Go back to 1. + /// + /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html + /// [`Shard`]: ../../../gateway/struct.Shard.html + /// [`ShardManager`]: struct.ShardManager.html + /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html pub fn run(&mut self) -> Result<()> { - debug!("[ShardRunner {:?}] Running", self.shard_info); + debug!("[ShardRunner {:?}] Running", self.shard.shard_info()); loop { - { - let mut shard = self.shard.lock(); - let incoming = self.runner_rx.try_recv(); + if !self.recv()? { + return Ok(()); + } - // Check for an incoming message over the runner channel. - // - // If the message is to shutdown, first verify the ID so we know - // for certain this runner is to shutdown. - if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == self.shard_info[0] { - let _ = shard.shutdown_clean(); + // check heartbeat + if let Err(why) = self.shard.check_heartbeat() { + warn!( + "[ShardRunner {:?}] Error heartbeating: {:?}", + self.shard.shard_info(), + why, + ); + debug!( + "[ShardRunner {:?}] Requesting restart", + self.shard.shard_info(), + ); + + return self.request_restart(); + } - return Ok(()); + #[cfg(feature = "voice")] + { + for message in self.shard.cycle_voice_recv() { + if let Err(why) = self.shard.client.send_json(&message) { + println!("Err sending from voice over WS: {:?}", why); } } + } - if let Err(why) = shard.check_heartbeat() { - error!("Failed to heartbeat and reconnect: {:?}", why); + let (event, action, successful) = self.recv_event(); - return self.request_restart(); - } - - #[cfg(feature = "voice")] - { - shard.cycle_voice_recv(); - } + if let Some(action) = action { + let _ = self.action(action); } - let (event, successful) = self.recv_event(); - if let Some(event) = event { - let data = Arc::clone(&self.data); - let event_handler = Arc::clone(&self.event_handler); - let shard = Arc::clone(&self.shard); - - feature_framework! {{ - let framework = Arc::clone(&self.framework); - - self.threadpool.execute(|| { - dispatch( - event, - shard, - framework, - data, - event_handler, - ); - }); - } else { - self.threadpool.execute(|| { - dispatch( - event, - shard, - data, - event_handler, - ); - }); - }} + self.dispatch(event); } - if !successful && !self.shard.lock().stage().is_connecting() { + if !successful && !self.shard.stage().is_connecting() { return self.request_restart(); } } } - pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> { + /// Clones the internal copy of the Sender to the shard runner. + pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> { self.runner_tx.clone() } + /// Takes an action that a [`Shard`] has determined should happen and then + /// does it. + /// + /// For example, if the shard says that an Identify message needs to be + /// sent, this will do that. + /// + /// # Errors + /// + /// Returns + fn action(&mut self, action: ShardAction) -> Result<()> { + match action { + ShardAction::Autoreconnect => self.shard.autoreconnect(), + ShardAction::Heartbeat => self.shard.heartbeat(), + ShardAction::Identify => self.shard.identify(), + ShardAction::Reconnect => self.shard.reconnect(), + ShardAction::Resume => self.shard.resume(), + } + } + + // Checks if the ID received to shutdown is equivalent to the ID of the + // shard this runner is responsible. If so, it shuts down the WebSocket + // client. + // + // Returns whether the WebSocket client is still active. + // + // If true, the WebSocket client was _not_ shutdown. If false, it was. + fn checked_shutdown(&mut self, id: ShardId) -> bool { + // First verify the ID so we know for certain this runner is + // to shutdown. + if id.0 != self.shard.shard_info()[0] { + // Not meant for this runner for some reason, don't + // shutdown. + return true; + } + + let close_data = CloseData::new(1000, String::new()); + let msg = OwnedMessage::Close(Some(close_data)); + let _ = self.shard.client.send_message(&msg); + + false + } + + fn dispatch(&self, event: Event) { + let data = Arc::clone(&self.data); + let event_handler = Arc::clone(&self.event_handler); + let runner_tx = self.runner_tx.clone(); + let shard_id = self.shard.shard_info()[0]; + + feature_framework! {{ + let framework = Arc::clone(&self.framework); + + self.threadpool.execute(move || { + dispatch( + event, + framework, + data, + event_handler, + runner_tx, + shard_id, + ); + }); + } else { + self.threadpool.execute(move || { + dispatch( + event, + data, + event_handler, + runner_tx, + shard_id, + ); + }); + }} + } + + // Handles a received value over the shard runner rx channel. + // + // Returns a boolean on whether the shard runner can continue. + // + // This always returns true, except in the case that the shard manager asked + // the runner to shutdown. + fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool { + match value { + ShardClientMessage::Manager(x) => match x { + ShardManagerMessage::Restart(id) | + ShardManagerMessage::Shutdown(id) => { + self.checked_shutdown(id) + }, + ShardManagerMessage::ShutdownAll => { + // This variant should never be received. + warn!( + "[ShardRunner {:?}] Received a ShutdownAll?", + self.shard.shard_info(), + ); + + true + }, + } + ShardClientMessage::Runner(x) => match x { + ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { + self.shard.chunk_guilds( + guild_ids, + limit, + query.as_ref().map(String::as_str), + ).is_ok() + }, + ShardRunnerMessage::Close(code, reason) => { + let reason = reason.unwrap_or_else(String::new); + let data = CloseData::new(code, reason); + let msg = OwnedMessage::Close(Some(data)); + + self.shard.client.send_message(&msg).is_ok() + }, + ShardRunnerMessage::Message(msg) => { + self.shard.client.send_message(&msg).is_ok() + }, + ShardRunnerMessage::SetGame(game) => { + // To avoid a clone of `game`, we do a little bit of + // trickery here: + // + // First, we obtain a reference to the current presence of + // the shard, and create a new presence tuple of the new + // game we received over the channel as well as the online + // status that the shard already had. + // + // We then (attempt to) send the websocket message with the + // status update, expressively returning: + // + // - whether the message successfully sent + // - the original game we received over the channel + self.shard.set_game(game); + + self.shard.update_presence().is_ok() + }, + ShardRunnerMessage::SetPresence(status, game) => { + self.shard.set_presence(status, game); + + self.shard.update_presence().is_ok() + }, + ShardRunnerMessage::SetStatus(status) => { + self.shard.set_status(status); + + self.shard.update_presence().is_ok() + }, + }, + } + } + + // Receives values over the internal shard runner rx channel and handles + // them. + // + // This will loop over values until there is no longer one. + // + // Requests a restart if the sending half of the channel disconnects. This + // should _never_ happen, as the sending half is kept on the runner. + + // Returns whether the shard runner is in a state that can continue. + fn recv(&mut self) -> Result<bool> { + loop { + match self.runner_rx.try_recv() { + Ok(value) => { + if !self.handle_rx_value(value) { + return Ok(false); + } + }, + Err(TryRecvError::Disconnected) => { + warn!( + "[ShardRunner {:?}] Sending half DC; restarting", + self.shard.shard_info(), + ); + + let _ = self.request_restart(); + + return Ok(false); + }, + Err(TryRecvError::Empty) => break, + } + } + + // There are no longer any values available. + + Ok(true) + } + /// Returns a received event, as well as whether reading the potentially /// present event was successful. - fn recv_event(&mut self) -> (Option<Event>, bool) { - let mut shard = self.shard.lock(); - - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) { + let gw_event = match self.shard.client.recv_json(GatewayEvent::decode) { Err(Error::WebSocket(WebSocketError::IoError(_))) => { // Check that an amount of time at least double the // heartbeat_interval has passed. @@ -161,58 +355,69 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // If not, continue on trying to receive messages. // // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - return (None, true); + { + let last = self.shard.last_heartbeat_ack(); + let interval = self.shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; + + if seconds_passed <= interval_in_secs * 2 { + return (None, None, true); + } + } else { + return (None, None, true); } - } else { - return (None, true); } debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { + if let Err(why) = self.shard.autoreconnect() { error!("Failed to auto-reconnect: {:?}", why); } - return (None, true); + return (None, None, true); }, Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { // This is hit when the websocket client dies this will be // hit every iteration. - return (None, false); + return (None, None, false); }, other => other, }; let event = match gw_event { Ok(Some(event)) => Ok(event), - Ok(None) => return (None, true), + Ok(None) => return (None, None, true), Err(why) => Err(why), }; - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => return (None, true), + let action = match self.shard.handle_event(&event) { + Ok(Some(action)) => Some(action), + Ok(None) => None, Err(why) => { error!("Shard handler received err: {:?}", why); - return (None, true); + return (None, None, true); }, - }; + }; + + let event = match event { + Ok(GatewayEvent::Dispatch(_, event)) => Some(event), + _ => None, + }; - (Some(event), true) + (event, action, true) } fn request_restart(&self) -> Result<()> { - debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); - let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + debug!( + "[ShardRunner {:?}] Requesting restart", + self.shard.shard_info(), + ); + let shard_id = ShardId(self.shard.shard_info()[0]); + let msg = ShardManagerMessage::Restart(shard_id); let _ = self.manager_tx.send(msg); Ok(()) diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs new file mode 100644 index 0000000..e6458eb --- /dev/null +++ b/src/client/bridge/gateway/shard_runner_message.rs @@ -0,0 +1,43 @@ +use model::{Game, GuildId, OnlineStatus}; +use websocket::message::OwnedMessage; + +/// A message to send from a shard over a WebSocket. +pub enum ShardRunnerMessage { + /// Indicates that the client is to send a member chunk message. + ChunkGuilds { + /// The IDs of the [`Guild`]s to chunk. + /// + /// [`Guild`]: ../../../model/struct.Guild.html + guild_ids: Vec<GuildId>, + /// The maximum number of members to receive [`GuildMembersChunkEvent`]s + /// for. + /// + /// [`GuildMembersChunkEvent`]: ../../../model/event/GuildMembersChunkEvent.html + limit: Option<u16>, + /// Text to filter members by. + /// + /// For example, a query of `"s"` will cause only [`Member`]s whose + /// usernames start with `"s"` to be chunked. + /// + /// [`Member`]: ../../../model/struct.Member.html + query: Option<String>, + }, + /// Indicates that the client is to close with the given status code and + /// reason. + /// + /// You should rarely - if _ever_ - need this, but the option is available. + /// Prefer to use the [`ShardManager`] to shutdown WebSocket clients if you + /// are intending to send a 1000 close code. + /// + /// [`ShardManager`]: struct.ShardManager.html + Close(u16, Option<String>), + /// Indicates that the client is to send a custom WebSocket message. + Message(OwnedMessage), + /// Indicates that the client is to update the shard's presence's game. + SetGame(Option<Game>), + /// Indicates that the client is to update the shard's presence in its + /// entirity. + SetPresence(OnlineStatus, Option<Game>), + /// Indicates that the client is to update the shard's presence's status. + SetStatus(OnlineStatus), +} diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs index 4f27526..ea18d73 100644 --- a/src/client/bridge/mod.rs +++ b/src/client/bridge/mod.rs @@ -1 +1,10 @@ +//! A collection of bridged support between the [`client`] module and other +//! modules. +//! +//! **Warning**: You likely _do not_ need to mess with anything in here. Beware. +//! This is lower-level functionality abstracted by the [`Client`]. +//! +//! [`Client`]: ../struct.Client.html +//! [`client`]: ../ + pub mod gateway; diff --git a/src/client/context.rs b/src/client/context.rs index a60aaa0..9b89cde 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -1,7 +1,7 @@ -use Result; +use client::bridge::gateway::{ShardClientMessage, ShardMessenger}; +use std::sync::mpsc::Sender; use std::sync::Arc; use typemap::ShareMap; -use gateway::Shard; use model::*; use parking_lot::Mutex; @@ -12,7 +12,7 @@ use internal::prelude::*; #[cfg(feature = "builder")] use builder::EditProfile; #[cfg(feature = "builder")] -use {http, utils}; +use {Result, http, utils}; #[cfg(feature = "builder")] use std::collections::HashMap; @@ -38,21 +38,23 @@ pub struct Context { /// /// [`Client::data`]: struct.Client.html#structfield.data pub data: Arc<Mutex<ShareMap>>, - /// The associated shard which dispatched the event handler. - /// - /// Note that if you are sharding, in relevant terms, this is the shard - /// which received the event being dispatched. - pub shard: Arc<Mutex<Shard>>, + /// The messenger to communicate with the shard runner. + pub shard: ShardMessenger, + /// The ID of the shard this context is related to. + pub shard_id: u64, } impl Context { /// Create a new Context to be passed to an event handler. - pub(crate) fn new(shard: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>) - -> Context { + pub(crate) fn new( + data: Arc<Mutex<ShareMap>>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, + ) -> Context { Context { + shard: ShardMessenger::new(runner_tx), + shard_id, data, - shard, } } @@ -110,6 +112,7 @@ impl Context { http::edit_profile(&edited) } + /// Sets the current user as being [`Online`]. This maintains the current /// game. /// @@ -137,9 +140,9 @@ impl Context { /// ``` /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online + #[inline] pub fn online(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Online); + self.shard.set_status(OnlineStatus::Online); } /// Sets the current user as being [`Idle`]. This maintains the current @@ -168,9 +171,9 @@ impl Context { /// ``` /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle + #[inline] pub fn idle(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Idle); + self.shard.set_status(OnlineStatus::Idle); } /// Sets the current user as being [`DoNotDisturb`]. This maintains the @@ -199,9 +202,9 @@ impl Context { /// ``` /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb + #[inline] pub fn dnd(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::DoNotDisturb); + self.shard.set_status(OnlineStatus::DoNotDisturb); } /// Sets the current user as being [`Invisible`]. This maintains the current @@ -231,9 +234,9 @@ impl Context { /// /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible + #[inline] pub fn invisible(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Invisible); + self.shard.set_status(OnlineStatus::Invisible); } /// "Resets" the current user's presence, by setting the game to `None` and @@ -265,9 +268,9 @@ impl Context { /// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence + #[inline] pub fn reset_presence(&self) { - let mut shard = self.shard.lock(); - shard.set_presence(None, OnlineStatus::Online); + self.shard.set_presence(None, OnlineStatus::Online); } /// Sets the current game, defaulting to an online status of [`Online`]. @@ -303,9 +306,9 @@ impl Context { /// ``` /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online + #[inline] pub fn set_game(&self, game: Game) { - let mut shard = self.shard.lock(); - shard.set_presence(Some(game), OnlineStatus::Online); + self.shard.set_presence(Some(game), OnlineStatus::Online); } /// Sets the current game, passing in only its name. This will automatically @@ -351,8 +354,7 @@ impl Context { url: None, }; - let mut shard = self.shard.lock(); - shard.set_presence(Some(game), OnlineStatus::Online); + self.shard.set_presence(Some(game), OnlineStatus::Online); } /// Sets the current user's presence, providing all fields to be passed. @@ -406,9 +408,9 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle + #[inline] pub fn set_presence(&self, game: Option<Game>, status: OnlineStatus) { - let mut shard = self.shard.lock(); - shard.set_presence(game, status); + self.shard.set_presence(game, status); } /// Disconnects the shard from the websocket, essentially "quiting" it. @@ -417,9 +419,8 @@ impl Context { /// until [`Client::start`] and vice versa are called again. /// /// [`Client::start`]: ./struct.Client.html#method.start - pub fn quit(&self) -> Result<()> { - let mut shard = self.shard.lock(); - - shard.shutdown_clean() + #[inline] + pub fn quit(&self) { + self.shard.shutdown_clean(); } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 827875e..9545a6f 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,9 +1,10 @@ use std::sync::Arc; use parking_lot::Mutex; +use super::bridge::gateway::ShardClientMessage; use super::event_handler::EventHandler; use super::Context; +use std::sync::mpsc::Sender; use typemap::ShareMap; -use gateway::Shard; use model::event::Event; use model::{Channel, Message}; @@ -35,19 +36,26 @@ macro_rules! now { () => (Utc::now().time().second() * 1000) } -fn context(conn: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>) -> Context { - Context::new(conn, data) +fn context( + data: Arc<Mutex<ShareMap>>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) -> Context { + Context::new(data, runner_tx, shard_id) } #[cfg(feature = "framework")] -pub fn dispatch<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +pub fn dispatch<H: EventHandler + 'static>( + event: Event, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { match event { Event::MessageCreate(event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); dispatch_message( context.clone(), event.message.clone(), @@ -58,21 +66,24 @@ pub fn dispatch<H: EventHandler + 'static>(event: Event, framework.dispatch(context, event.message); } }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, data, event_handler, runner_tx, shard_id), } } #[cfg(not(feature = "framework"))] -pub fn dispatch<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +pub fn dispatch<H: EventHandler + 'static>( + event: Event, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { match event { Event::MessageCreate(event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); dispatch_message(context, event.message, event_handler); }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, data, event_handler, runner_tx, shard_id), } } @@ -91,10 +102,13 @@ fn dispatch_message<H>( } #[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +fn handle_event<H: EventHandler + 'static>( + event: Event, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { #[cfg(feature = "cache")] let mut last_guild_create_time = now!(); @@ -113,7 +127,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::ChannelCreate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); // This different channel_create dispatching is only due to the fact that // each time the bot receives a dm, this event is also fired. @@ -134,7 +148,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::ChannelDelete(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); match event.channel { Channel::Private(_) | Channel::Group(_) => {}, @@ -147,28 +161,28 @@ fn handle_event<H: EventHandler + 'static>(event: Event, } }, Event::ChannelPinsUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_pins_update(context, event); }, Event::ChannelRecipientAdd(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_recipient_addition(context, event.channel_id, event.user); }, Event::ChannelRecipientRemove(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_recipient_removal(context, event.channel_id, event.user); }, Event::ChannelUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ let before = CACHE.read().channel(event.channel.id()); @@ -178,12 +192,12 @@ fn handle_event<H: EventHandler + 'static>(event: Event, }} }, Event::GuildBanAdd(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_ban_addition(context, event.guild_id, event.user); }, Event::GuildBanRemove(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_ban_removal(context, event.guild_id, event.user); }, @@ -204,7 +218,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, let cache = CACHE.read(); if cache.unavailable_guilds.is_empty() { - let context = context(Arc::clone(&conn), Arc::clone(&data)); + let context = context(Arc::clone(&data), runner_tx.clone(), shard_id); let guild_amount = cache .guilds @@ -216,7 +230,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, } } - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.guild_create(context, event.guild, _is_new); @@ -226,7 +240,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, }, Event::GuildDelete(mut event) => { let _full = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.guild_delete(context, event.guild, _full); @@ -237,25 +251,25 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::GuildEmojisUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_emojis_update(context, event.guild_id, event.emojis); }, Event::GuildIntegrationsUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_integrations_update(context, event.guild_id); }, Event::GuildMemberAdd(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_member_addition(context, event.guild_id, event.member); }, Event::GuildMemberRemove(mut event) => { let _member = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.guild_member_removal(context, event.guild_id, event.user, _member); @@ -265,7 +279,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, }, Event::GuildMemberUpdate(mut event) => { let _before = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ // This is safe to unwrap, as the update would have created @@ -284,20 +298,20 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::GuildMembersChunk(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_members_chunk(context, event.guild_id, event.members); }, Event::GuildRoleCreate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_role_create(context, event.guild_id, event.role); }, Event::GuildRoleDelete(mut event) => { let _role = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); @@ -307,7 +321,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, }, Event::GuildRoleUpdate(mut event) => { let _before = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.guild_role_update(context, event.guild_id, _before, event.role); @@ -318,14 +332,14 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::GuildUnavailable(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.guild_unavailable(context, event.guild_id); }, Event::GuildUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ let before = CACHE.read() @@ -341,46 +355,46 @@ fn handle_event<H: EventHandler + 'static>(event: Event, // Already handled by the framework check macro Event::MessageCreate(_) => {}, Event::MessageDeleteBulk(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.message_delete_bulk(context, event.channel_id, event.ids); }, Event::MessageDelete(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.message_delete(context, event.channel_id, event.message_id); }, Event::MessageUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.message_update(context, event); }, Event::PresencesReplace(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.presence_replace(context, event.presences); }, Event::PresenceUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.presence_update(context, event); }, Event::ReactionAdd(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.reaction_add(context, event.reaction); }, Event::ReactionRemove(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.reaction_remove(context, event.reaction); }, Event::ReactionRemoveAll(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.reaction_remove_all(context, event.channel_id, event.message_id); }, @@ -393,35 +407,35 @@ fn handle_event<H: EventHandler + 'static>(event: Event, let _ = wait_for_guilds() .map(move |_| { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.ready(context, event.ready); }); } else { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.ready(context, event.ready); } } }, Event::Resumed(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.resume(context, event); }, Event::TypingStart(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.typing_start(context, event); }, Event::Unknown(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.unknown(context, event.kind, event.value); }, Event::UserUpdate(mut event) => { let _before = update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! {{ event_handler.user_update(context, _before.unwrap(), event.current_user); @@ -430,19 +444,19 @@ fn handle_event<H: EventHandler + 'static>(event: Event, }} }, Event::VoiceServerUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.voice_server_update(context, event); }, Event::VoiceStateUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.voice_state_update(context, event.guild_id, event.voice_state); }, Event::WebhookUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.webhook_update(context, event.guild_id, event.channel_id); }, diff --git a/src/client/mod.rs b/src/client/mod.rs index 00a305a..2a97c91 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -37,13 +37,11 @@ pub use http as rest; #[cfg(feature = "cache")] pub use CACHE; -use self::bridge::gateway::{ShardId, ShardManager, ShardRunnerInfo}; +use self::bridge::gateway::{ShardManager, ShardManagerMonitor}; use self::dispatch::dispatch; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT}; use parking_lot::Mutex; -use std::collections::HashMap; -use std::mem; use threadpool::ThreadPool; use typemap::ShareMap; use http; @@ -103,8 +101,7 @@ impl CloseHandle { /// [`on_message`]: #method.on_message /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate /// [sharding docs]: gateway/index.html#sharding -#[derive(Clone)] -pub struct Client<H: EventHandler + Send + Sync + 'static> { +pub struct Client { /// A ShareMap which requires types to be Send + Sync. This is a map that /// can be safely shared across contexts. /// @@ -191,7 +188,6 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`on_ready`]: #method.on_ready - event_handler: Arc<H>, #[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>, /// A HashMap of all shards instantiated by the Client. /// @@ -224,12 +220,12 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?; /// - /// let shard_runners = client.shard_runners.clone(); + /// let shard_manager = client.shard_manager.clone(); /// /// thread::spawn(move || { /// loop { /// println!("Shard count instantiated: {}", - /// shard_runners.lock().len()); + /// shard_manager.lock().shards_instantiated().len()); /// /// thread::sleep(Duration::from_millis(5000)); /// } @@ -244,16 +240,26 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// /// [`Client::start_shard`]: #method.start_shard /// [`Client::start_shards`]: #method.start_shards - pub shard_runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, + pub shard_manager: Arc<Mutex<ShardManager>>, + shard_manager_worker: ShardManagerMonitor, /// The threadpool shared by all shards. /// /// Defaults to 5 threads, which should suffice small bots. Consider /// increasing this number as your bot grows. pub threadpool: ThreadPool, - token: Arc<Mutex<String>>, + /// The token in use by the client. + pub token: Arc<Mutex<String>>, + /// URI that the client's shards will use to connect to the gateway. + /// + /// This is likely not important for production usage and is, at best, used + /// for debugging. + /// + /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated + /// value available. + pub ws_uri: Arc<Mutex<String>>, } -impl<H: EventHandler + Send + Sync + 'static> Client<H> { +impl Client { /// Creates a Client for a bot user. /// /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which @@ -283,7 +289,8 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// # try_main().unwrap(); /// # } /// ``` - pub fn new(token: &str, handler: H) -> Result<Self> { + pub fn new<H>(token: &str, handler: H) -> Result<Self> + where H: EventHandler + Send + Sync + 'static { let token = if token.starts_with("Bot ") { token.to_string() } else { @@ -295,23 +302,53 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { let name = "serenity client".to_owned(); let threadpool = ThreadPool::with_name(name, 5); + let url = Arc::new(Mutex::new(http::get_gateway()?.url)); + let data = Arc::new(Mutex::new(ShareMap::custom())); + let event_handler = Arc::new(handler); Ok(feature_framework! {{ + let framework = Arc::new(Mutex::new(None)); + + let (shard_manager, shard_manager_worker) = ShardManager::new( + 0, + 0, + 0, + Arc::clone(&url), + Arc::clone(&locked), + Arc::clone(&data), + Arc::clone(&event_handler), + Arc::clone(&framework), + threadpool.clone(), + ); + Client { - data: Arc::new(Mutex::new(ShareMap::custom())), - event_handler: Arc::new(handler), framework: Arc::new(Mutex::new(None)), - shard_runners: Arc::new(Mutex::new(HashMap::new())), - threadpool, token: locked, + ws_uri: url, + data, + shard_manager, + shard_manager_worker, + threadpool, } } else { + let (shard_manager, shard_manager_worker) = ShardManager::new( + 0, + 0, + 0, + Arc::clone(&url), + locked.clone(), + data.clone(), + Arc::clone(&event_handler), + threadpool.clone(), + ); + Client { - data: Arc::new(Mutex::new(ShareMap::custom())), - event_handler: Arc::new(handler), - shard_runners: Arc::new(Mutex::new(HashMap::new())), - threadpool, token: locked, + ws_uri: url, + data, + shard_manager, + shard_manager_worker, + threadpool, } }}) } @@ -462,7 +499,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// /// [gateway docs]: gateway/index.html#sharding pub fn start(&mut self) -> Result<()> { - self.start_connection([0, 0, 1], http::get_gateway()?.url) + self.start_connection([0, 0, 1]) } /// Establish the connection(s) and start listening for events. @@ -514,15 +551,13 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown /// [gateway docs]: gateway/index.html#sharding pub fn start_autosharded(&mut self) -> Result<()> { - let mut res = http::get_bot_gateway()?; - - let x = res.shards as u64 - 1; - let y = res.shards as u64; - let url = mem::replace(&mut res.url, String::default()); + let (x, y) = { + let res = http::get_bot_gateway()?; - drop(res); + (res.shards as u64 - 1, res.shards as u64) + }; - self.start_connection([0, x, y], url) + self.start_connection([0, x, y]) } /// Establish a sharded connection and start listening for events. @@ -603,7 +638,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// [`start_autosharded`]: #method.start_autosharded /// [gateway docs]: gateway/index.html#sharding pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> { - self.start_connection([shard, shard, shards], http::get_gateway()?.url) + self.start_connection([shard, shard, shards]) } /// Establish sharded connections and start listening for events. @@ -657,10 +692,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// [`start_shard_range`]: #method.start_shard_range /// [Gateway docs]: gateway/index.html#sharding pub fn start_shards(&mut self, total_shards: u64) -> Result<()> { - self.start_connection( - [0, total_shards - 1, total_shards], - http::get_gateway()?.url, - ) + self.start_connection([0, total_shards - 1, total_shards]) } /// Establish a range of sharded connections and start listening for events. @@ -730,7 +762,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// [`start_shards`]: #method.start_shards /// [Gateway docs]: gateway/index.html#sharding pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> { - self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url) + self.start_connection([range[0], range[1], total_shards]) } /// Returns a thread-safe handle for closing shards. @@ -749,7 +781,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { // an error. // // [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> { + fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> { HANDLE_STILL.store(true, Ordering::Relaxed); // Update the framework's current user if the feature is enabled. @@ -764,39 +796,37 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { } } - let gateway_url = Arc::new(Mutex::new(url)); + { + let mut manager = self.shard_manager.lock(); + + let init = shard_data[1] - shard_data[0] + 1; - let mut manager = ShardManager::new( - shard_data[0], - shard_data[1] - shard_data[0] + 1, - shard_data[2], - Arc::clone(&gateway_url), - Arc::clone(&self.token), - Arc::clone(&self.data), - Arc::clone(&self.event_handler), - #[cfg(feature = "framework")] - Arc::clone(&self.framework), - self.threadpool.clone(), - ); + manager.set_shards(shard_data[0], init, shard_data[2]); - self.shard_runners = Arc::clone(&manager.runners); + debug!( + "Initializing shard info: {} - {}/{}", + shard_data[0], + init, + shard_data[2], + ); - if let Err(why) = manager.initialize() { - error!("Failed to boot a shard: {:?}", why); - info!("Shutting down all shards"); + if let Err(why) = manager.initialize() { + error!("Failed to boot a shard: {:?}", why); + info!("Shutting down all shards"); - manager.shutdown_all(); + manager.shutdown_all(); - return Err(Error::Client(ClientError::ShardBootFailure)); + return Err(Error::Client(ClientError::ShardBootFailure)); + } } - manager.run(); + self.shard_manager_worker.run(); Err(Error::Client(ClientError::Shutdown)) } } -impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> { +impl Drop for Client { fn drop(&mut self) { self.close_handle().close(); } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b593f5c..147808e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -51,9 +51,18 @@ mod error; mod shard; +mod ws_client_ext; pub use self::error::Error as GatewayError; pub use self::shard::Shard; +pub use self::ws_client_ext::WebSocketGatewayClientExt; + +use model::{Game, OnlineStatus}; +use websocket::sync::client::Client; +use websocket::sync::stream::{TcpStream, TlsStream}; + +pub type CurrentPresence = (Option<Game>, OnlineStatus); +pub type WsClient = Client<TlsStream<TcpStream>>; /// Indicates the current connection stage of a [`Shard`]. /// @@ -136,3 +145,11 @@ impl ConnectionStage { } } } + +pub enum ShardAction { + Autoreconnect, + Heartbeat, + Identify, + Reconnect, + Resume, +} diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index c644eac..ef05cf4 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -1,39 +1,31 @@ -use chrono::Utc; use parking_lot::Mutex; -use serde_json::Value; -use std::env::consts; -use std::io::Write; -use std::net::Shutdown; use std::sync::Arc; use std::time::{Duration as StdDuration, Instant}; -use std::thread; -use super::{ConnectionStage, GatewayError}; +use super::{ + ConnectionStage, + CurrentPresence, + ShardAction, + GatewayError, + WsClient, + WebSocketGatewayClientExt, +}; use websocket::client::Url; -use websocket::message::{CloseData, OwnedMessage}; use websocket::stream::sync::AsTcpStream; -use websocket::sync::client::{Client, ClientBuilder}; -use websocket::sync::stream::{TcpStream, TlsStream}; +use websocket::sync::client::ClientBuilder; use websocket::WebSocketError; -use constants::{self, close_codes, OpCode}; +use constants::{self, close_codes}; use internal::prelude::*; -use internal::ws_impl::SenderExt; use model::event::{Event, GatewayEvent}; use model::{Game, GuildId, OnlineStatus}; #[cfg(feature = "voice")] +use serde_json::Value; +#[cfg(feature = "voice")] use std::sync::mpsc::{self, Receiver as MpscReceiver}; -#[cfg(feature = "cache")] -use client::CACHE; #[cfg(feature = "voice")] use voice::Manager as VoiceManager; #[cfg(feature = "voice")] use http; -#[cfg(feature = "cache")] -use utils; - -pub type WsClient = Client<TlsStream<TcpStream>>; - -type CurrentPresence = (Option<Game>, OnlineStatus); /// A Shard is a higher-level handler for a websocket connection to Discord's /// gateway. The shard allows for sending and receiving messages over the @@ -91,7 +83,7 @@ pub struct Shard { session_id: Option<String>, shard_info: [u64; 2], stage: ConnectionStage, - token: Arc<Mutex<String>>, + pub token: Arc<Mutex<String>>, ws_url: Arc<Mutex<String>>, /// The voice connections that this Shard is responsible for. The Shard will /// update the voice connections' states. @@ -138,11 +130,14 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn new(ws_url: Arc<Mutex<String>>, - token: Arc<Mutex<String>>, - shard_info: [u64; 2]) - -> Result<Shard> { - let client = connecting(&*ws_url.lock()); + pub fn new( + ws_url: Arc<Mutex<String>>, + token: Arc<Mutex<String>>, + shard_info: [u64; 2], + ) -> Result<Shard> { + let mut client = connect(&*ws_url.lock())?; + + let _ = set_client_timeout(&mut client); let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -159,6 +154,8 @@ impl Shard { let user = http::get_current_user()?; Shard { + manager: VoiceManager::new(tx, user.id), + manager_rx: rx, client, current_presence, heartbeat_instants, @@ -170,8 +167,6 @@ impl Shard { shard_info, session_id, ws_url, - manager: VoiceManager::new(tx, user.id), - manager_rx: rx, } } else { Shard { @@ -191,137 +186,122 @@ impl Shard { }) } - /// Retrieves a copy of the current shard information. - /// - /// The first element is the _current_ shard - 0-indexed - while the second - /// element is the _total number_ of shards -- 1-indexed. - /// - /// For example, if using 3 shards in total, and if this is shard 1, then it - /// can be read as "the second of three shards". - /// - /// # Examples - /// - /// Retrieving the shard info for the second shard, out of two shards total: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); - /// # - /// assert_eq!(shard.shard_info(), [1, 2]); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn shard_info(&self) -> [u64; 2] { self.shard_info } + /// Retrieves the current presence of the shard. + #[inline] + pub fn current_presence(&self) -> &CurrentPresence { + &self.current_presence + } - /// Sets the user's current game, if any. + /// Retrieves the heartbeat instants of the shard. /// - /// Other presence settings are maintained. + /// This is the time of when a heartbeat was sent and when an + /// acknowledgement was last received. + #[inline] + pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) { + &self.heartbeat_instants + } + + /// Retrieves the value of when the last heartbeat was sent. + #[inline] + pub fn last_heartbeat_sent(&self) -> Option<&Instant> { + self.heartbeat_instants.0.as_ref() + } + + /// Retrieves the value of when the last heartbeat ack was received. + #[inline] + pub fn last_heartbeat_ack(&self) -> Option<&Instant> { + self.heartbeat_instants.1.as_ref() + } + + /// Sends a heartbeat to the gateway with the current sequence. /// - /// # Examples + /// This sets the last heartbeat time to now, and + /// `last_heartbeat_acknowledged` to `false`. /// - /// Setting the current game to playing `"Heroes of the Storm"`: + /// # Errors /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::Game; + /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending + /// a heartbeat. /// - /// shard.set_game(Some(Game::playing("Heroes of the Storm"))); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` + /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed + pub fn heartbeat(&mut self) -> Result<()> { + match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) { + Ok(()) => { + self.heartbeat_instants.0 = Some(Instant::now()); + self.last_heartbeat_acknowledged = false; + + Ok(()) + }, + Err(why) => { + match why { + Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { + debug!("[Shard {:?}] Err heartbeating: {:?}", + self.shard_info, + err); + }, + other => { + warn!("[Shard {:?}] Other err w/ keepalive: {:?}", + self.shard_info, + other); + }, + } + + Err(Error::Gateway(GatewayError::HeartbeatFailed)) + } + } + } + + #[inline] + pub fn heartbeat_interval(&self) -> Option<&u64> { + self.heartbeat_interval.as_ref() + } + + #[inline] + pub fn last_heartbeat_acknowledged(&self) -> bool { + self.last_heartbeat_acknowledged + } + + #[inline] + pub fn seq(&self) -> u64 { + self.seq + } + + #[inline] + pub fn session_id(&self) -> Option<&String> { + self.session_id.as_ref() + } + + #[inline] pub fn set_game(&mut self, game: Option<Game>) { self.current_presence.0 = game; + } - self.update_presence(); + #[inline] + pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) { + self.set_game(game); + self.set_status(status); } - /// Sets the user's current online status. - /// - /// Note that [`Offline`] is not a valid online status, so it is - /// automatically converted to [`Invisible`]. - /// - /// Other presence settings are maintained. - /// - /// # Examples - /// - /// Setting the current online status for the shard to [`DoNotDisturb`]. - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::OnlineStatus; - /// - /// shard.set_status(OnlineStatus::DoNotDisturb); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb - /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible - /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline - pub fn set_status(&mut self, online_status: OnlineStatus) { - self.current_presence.1 = match online_status { - OnlineStatus::Offline => OnlineStatus::Invisible, - other => other, - }; + #[inline] + pub fn set_status(&mut self, mut status: OnlineStatus) { + if status == OnlineStatus::Offline { + status = OnlineStatus::Invisible; + } - self.update_presence(); + self.current_presence.1 = status; } - /// Sets the user's full presence information. + /// Retrieves a copy of the current shard information. /// - /// Consider using the individual setters if you only need to modify one of - /// these. + /// The first element is the _current_ shard - 0-indexed - while the second + /// element is the _total number_ of shards -- 1-indexed. + /// + /// For example, if using 3 shards in total, and if this is shard 1, then it + /// can be read as "the second of three shards". /// /// # Examples /// - /// Set the current user as playing `"Heroes of the Storm"` and being - /// online: + /// Retrieving the shard info for the second shard, out of two shards total: /// /// ```rust,no_run /// # extern crate parking_lot; @@ -335,11 +315,9 @@ impl Shard { /// # fn try_main() -> Result<(), Box<Error>> { /// # let mutex = Arc::new(Mutex::new("".to_string())); /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); /// # - /// use serenity::model::{Game, OnlineStatus}; - /// - /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online); + /// assert_eq!(shard.shard_info(), [1, 2]); /// # Ok(()) /// # } /// # @@ -347,15 +325,7 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) { - if status == OnlineStatus::Offline { - status = OnlineStatus::Invisible; - } - - self.current_presence = (game, status); - - self.update_presence(); - } + pub fn shard_info(&self) -> [u64; 2] { self.shard_info } /// Returns the current connection stage of the shard. pub fn stage(&self) -> ConnectionStage { @@ -387,21 +357,24 @@ impl Shard { /// Returns a `GatewayError::OverloadedShard` if the shard would have too /// many guilds assigned to it. #[allow(cyclomatic_complexity)] - pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> { - match event { - Ok(GatewayEvent::Dispatch(seq, event)) => { + pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>) + -> Result<Option<ShardAction>> { + match *event { + Ok(GatewayEvent::Dispatch(seq, ref event)) => { if seq > self.seq + 1 { warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq); } - match event { + match *event { Event::Ready(ref ready) => { debug!("[Shard {:?}] Received Ready", self.shard_info); self.session_id = Some(ready.ready.session_id.clone()); self.stage = ConnectionStage::Connected; + /* set_client_timeout(&mut self.client)?; + */ }, Event::Resumed(_) => { info!("[Shard {:?}] Resumed", self.shard_info); @@ -421,7 +394,7 @@ impl Shard { self.seq = seq; - Ok(Some(event)) + Ok(None) }, Ok(GatewayEvent::Heartbeat(s)) => { info!("[Shard {:?}] Received shard heartbeat", self.shard_info); @@ -438,24 +411,18 @@ impl Shard { if self.stage == ConnectionStage::Handshake { self.stage = ConnectionStage::Identifying; - self.identify()?; + return Ok(Some(ShardAction::Identify)); } else { warn!( "[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info ); - return self.autoreconnect().and(Ok(None)); + return Ok(Some(ShardAction::Autoreconnect)); } } - let map = json!({ - "d": Value::Null, - "op": OpCode::Heartbeat.num(), - }); - self.client.send_json(&map)?; - - Ok(None) + Ok(Some(ShardAction::Heartbeat)) }, Ok(GatewayEvent::HeartbeatAck) => { self.heartbeat_instants.1 = Some(Instant::now()); @@ -478,14 +445,14 @@ impl Shard { self.heartbeat_interval = Some(interval); } - if self.stage == ConnectionStage::Handshake { - self.identify().and(Ok(None)) + Ok(Some(if self.stage == ConnectionStage::Handshake { + ShardAction::Identify } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); - self.autoreconnect().and(Ok(None)) - } + ShardAction::Autoreconnect + })) }, Ok(GatewayEvent::InvalidateSession(resumable)) => { info!( @@ -493,16 +460,15 @@ impl Shard { self.shard_info, ); - if resumable { - self.resume().and(Ok(None)) + Ok(Some(if resumable { + ShardAction::Resume } else { - self.identify().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), - Err(Error::Gateway(GatewayError::Closed(data))) => { + Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)), + Err(Error::Gateway(GatewayError::Closed(ref data))) => { let num = data.as_ref().map(|d| d.status_code); - let reason = data.map(|d| d.reason); let clean = num == Some(1000); match num { @@ -562,7 +528,7 @@ impl Shard { "[Shard {:?}] Unknown unclean close {}: {:?}", self.shard_info, other, - reason + data.as_ref().map(|d| &d.reason), ); }, _ => {}, @@ -573,14 +539,14 @@ impl Shard { self.session_id.is_some() }).unwrap_or(true); - if resume { - self.resume().or_else(|_| self.reconnect()).and(Ok(None)) + Ok(Some(if resume { + ShardAction::Resume } else { - self.reconnect().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Err(Error::WebSocket(why)) => { - if let WebSocketError::NoDataAvailable = why { + Err(Error::WebSocket(ref why)) => { + if let WebSocketError::NoDataAvailable = *why { if self.heartbeat_instants.1.is_none() { return Ok(None); } @@ -592,44 +558,62 @@ impl Shard { info!("[Shard {:?}] Will attempt to auto-reconnect", self.shard_info); - self.autoreconnect().and(Ok(None)) + Ok(Some(ShardAction::Autoreconnect)) }, - Err(error) => Err(error), + _ => Ok(None), + } + } + + pub fn check_heartbeat(&mut self) -> Result<()> { + let wait = { + let heartbeat_interval = match self.heartbeat_interval { + Some(heartbeat_interval) => heartbeat_interval, + None => return Ok(()), + }; + + StdDuration::from_secs(heartbeat_interval / 1000) + }; + + // If a duration of time less than the heartbeat_interval has passed, + // then don't perform a keepalive or attempt to reconnect. + if let Some(last_sent) = self.heartbeat_instants.0 { + if last_sent.elapsed() <= wait { + return Ok(()); + } + } + + // If the last heartbeat didn't receive an acknowledgement, then + // auto-reconnect. + if !self.last_heartbeat_acknowledged { + debug!( + "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", + self.shard_info, + ); + + return self.reconnect().map_err(|why| { + warn!( + "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", + self.shard_info, + why, + ); + + why + }); + } + + // Otherwise, we're good to heartbeat. + if let Err(why) = self.heartbeat() { + warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); + + self.reconnect() + } else { + trace!("[Shard {:?}] Heartbeated", self.shard_info); + + Ok(()) } } /// Calculates the heartbeat latency between the shard and the gateway. - /// - /// # Examples - /// - /// When using the [`Client`], output the latency in response to a `"~ping"` - /// message handled through [`Client::on_message`]. - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::*; - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "~ping" { - /// if let Some(latency) = ctx.shard.lock().latency() { - /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos()); - /// - /// let _ = msg.channel_id.say(&s); - /// } else { - /// let _ = msg.channel_id.say("N/A"); - /// } - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Client`]: ../struct.Client.html - /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message // Shamelessly stolen from brayzure's commit in eris: // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6> pub fn latency(&self) -> Option<StdDuration> { @@ -640,38 +624,68 @@ impl Shard { } } - /// Shuts down the receiver by attempting to cleanly close the - /// connection. - pub fn shutdown_clean(&mut self) -> Result<()> { - { - let data = CloseData { - status_code: 1000, - reason: String::new(), - }; - - let message = OwnedMessage::Close(Some(data)); - - self.client.send_message(&message)?; + #[cfg(feature = "voice")] + fn voice_dispatch(&mut self, event: &Event) { + if let Event::VoiceStateUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } } - let mut stream = self.client.stream_ref().as_tcp(); + if let Event::VoiceServerUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + #[cfg(feature = "voice")] + pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> { + let mut messages = vec![]; - debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info); + while let Ok(v) = self.manager_rx.try_recv() { + messages.push(v); + } - Ok(()) + messages } - /// Uncleanly shuts down the receiver by not sending a close code. - pub fn shutdown(&mut self) -> Result<()> { - let mut stream = self.client.stream_ref().as_tcp(); + /// Performs a deterministic reconnect. + /// + /// The type of reconnect is deterministic on whether a [`session_id`]. + /// + /// If the `session_id` still exists, then a RESUME is sent. If not, then + /// an IDENTIFY is sent. + /// + /// Note that, if the shard is already in a stage of + /// [`ConnectionStage::Connecting`], then no action will be performed. + /// + /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting + /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id + pub fn autoreconnect(&mut self) -> Result<()> { + if self.stage == ConnectionStage::Connecting { + return Ok(()); + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + if self.session_id().is_some() { + debug!( + "[Shard {:?}] Autoreconnector choosing to resume", + self.shard_info, + ); - Ok(()) + self.resume() + } else { + debug!( + "[Shard {:?}] Autoreconnector choosing to reconnect", + self.shard_info, + ); + + self.reconnect() + } } /// Requests that one or multiple [`Guild`]s be chunked. @@ -710,7 +724,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(2000), None); + /// shard.chunk_guilds(guild_ids, Some(2000), None); /// # Ok(()) /// # } /// # @@ -740,7 +754,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(20), Some("do")); + /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); /// # Ok(()) /// # } /// # @@ -753,280 +767,40 @@ impl Shard { /// ../../model/event/enum.Event.html#variant.GuildMembersChunk /// [`Guild`]: ../../model/struct.Guild.html /// [`Member`]: ../../model/struct.Member.html - pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>) - where It: IntoIterator<Item=T> { + pub fn chunk_guilds<It>( + &mut self, + guild_ids: It, + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { debug!("[Shard {:?}] Requesting member chunks", self.shard_info); - let msg = json!({ - "op": OpCode::GetGuildMembers.num(), - "d": { - "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), - "limit": limit.unwrap_or(0), - "query": query.unwrap_or(""), - }, - }); - - let _ = self.client.send_json(&msg); - } - - /// Calculates the number of guilds that the shard is responsible for. - /// - /// If sharding is not being used (i.e. 1 shard), then the total number of - /// [`Guild`] in the [`Cache`] will be used. - /// - /// **Note**: Requires the `cache` feature be enabled. - /// - /// # Examples - /// - /// Retrieve the number of guilds a shard is responsible for: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// let info = shard.shard_info(); - /// let guilds = shard.guilds_handled(); - /// - /// println!("Shard {:?} is responsible for {} guilds", info, guilds); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`Cache`]: ../ext/cache/struct.Cache.html - /// [`Guild`]: ../model/struct.Guild.html - #[cfg(feature = "cache")] - pub fn guilds_handled(&self) -> u16 { - let cache = CACHE.read(); - - let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]); - - cache - .guilds - .keys() - .filter(|guild_id| { - utils::shard_id(guild_id.0, shard_count) == shard_id - }) - .count() as u16 - } - - #[cfg(feature = "voice")] - fn voice_dispatch(&mut self, event: &Event) { - if let Event::VoiceStateUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_state(&update.voice_state); - } - } - } - - if let Event::VoiceServerUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_server(&update.endpoint, &update.token); - } - } - } - } - - #[cfg(feature = "voice")] - pub(crate) fn cycle_voice_recv(&mut self) { - if let Ok(v) = self.manager_rx.try_recv() { - if let Err(why) = self.client.send_json(&v) { - warn!("[Shard {:?}] Err sending voice msg: {:?}", - self.shard_info, - why); - } - } + self.client.send_chunk_guilds( + guild_ids, + &self.shard_info, + limit, + query, + ) } - pub(crate) fn heartbeat(&mut self) -> Result<()> { - let map = json!({ - "d": self.seq, - "op": OpCode::Heartbeat.num(), - }); - - trace!("[Shard {:?}] Sending heartbeat d: {}", - self.shard_info, - self.seq); - - match self.client.send_json(&map) { - Ok(_) => { - self.heartbeat_instants.0 = Some(Instant::now()); - self.last_heartbeat_acknowledged = false; - - trace!("[Shard {:?}] Successfully heartbeated", - self.shard_info); - - Ok(()) - }, - Err(why) => { - match why { - Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { - debug!("[Shard {:?}] Err heartbeating: {:?}", - self.shard_info, - err); - }, - other => { - warn!("[Shard {:?}] Other err w/ keepalive: {:?}", - self.shard_info, - other); - }, - } - - Err(Error::Gateway(GatewayError::HeartbeatFailed)) - }, - } - } - - pub(crate) fn check_heartbeat(&mut self) -> Result<()> { - let heartbeat_interval = match self.heartbeat_interval { - Some(heartbeat_interval) => heartbeat_interval, - None => return Ok(()), - }; - - let wait = StdDuration::from_secs(heartbeat_interval / 1000); - - // If a duration of time less than the heartbeat_interval has passed, - // then don't perform a keepalive or attempt to reconnect. - if let Some(last_sent) = self.heartbeat_instants.0 { - if last_sent.elapsed() <= wait { - return Ok(()); - } - } - - // If the last heartbeat didn't receive an acknowledgement, then - // auto-reconnect. - if !self.last_heartbeat_acknowledged { - debug!( - "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", - self.shard_info, - ); - - return self.reconnect().map_err(|why| { - warn!( - "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", - self.shard_info, - why, - ); - - why - }); - } - - // Otherwise, we're good to heartbeat. - trace!("[Shard {:?}] Heartbeating", self.shard_info); - - if let Err(why) = self.heartbeat() { - warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); - - self.reconnect() - } else { - trace!("[Shard {:?}] Heartbeated", self.shard_info); - self.heartbeat_instants.0 = Some(Instant::now()); - - Ok(()) - } - } - - pub(crate) fn autoreconnect(&mut self) -> Result<()> { - if self.stage == ConnectionStage::Connecting { - return Ok(()); - } - - if self.session_id.is_some() { - debug!("[Shard {:?}] Autoreconnector choosing to resume", - self.shard_info); - - self.resume() - } else { - debug!("[Shard {:?}] Autoreconnector choosing to reconnect", - self.shard_info); - - self.reconnect() - } - } - - /// Retrieves the `heartbeat_interval`. - #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option<u64> { - self.heartbeat_interval - } - - /// Retrieves the value of when the last heartbeat ack was received. - #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { - self.heartbeat_instants.1 - } - - fn reconnect(&mut self) -> Result<()> { - info!("[Shard {:?}] Attempting to reconnect", self.shard_info); - self.reset(); - - self.initialize() - } - - // Attempts to send a RESUME message. - // - // # Examples - // - // Returns a `GatewayError::NoSessionId` is there is no `session_id`, - // indicating that the shard should instead [`reconnect`]. + // Sets the shard as going into identifying stage, which sets: // - // [`reconnect`]: #method.reconnect - fn resume(&mut self) -> Result<()> { - debug!("Shard {:?}] Attempting to resume", self.shard_info); - - self.initialize()?; - self.stage = ConnectionStage::Resuming; - - self.send_resume().or_else(|why| { - warn!("[Shard {:?}] Err sending resume: {:?}", - self.shard_info, - why); - - self.reconnect() - }) - } - - fn send_resume(&mut self) -> Result<()> { - let session_id = match self.session_id.clone() { - Some(session_id) => session_id, - None => return Err(Error::Gateway(GatewayError::NoSessionId)), - }; + // - the time that the last heartbeat sent as being now + // - the `stage` to `Identifying` + pub fn identify(&mut self) -> Result<()> { + self.client.send_identify(&self.shard_info, &self.token.lock())?; - debug!("[Shard {:?}] Sending resume; seq: {}", - self.shard_info, - self.seq); + self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; - self.client.send_json(&json!({ - "op": OpCode::Resume.num(), - "d": { - "session_id": session_id, - "seq": self.seq, - "token": &*self.token.lock(), - }, - })) + Ok(()) } /// Initializes a new WebSocket client. /// /// This will set the stage of the shard before and after instantiation of /// the client. - fn initialize(&mut self) -> Result<()> { + pub fn initialize(&mut self) -> Result<WsClient> { debug!("[Shard {:?}] Initializing", self.shard_info); // We need to do two, sort of three things here: @@ -1038,38 +812,15 @@ impl Shard { // This is used to accurately assess whether the state of the shard is // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; - self.client = connect(&self.ws_url.lock())?; + let mut client = connect(&self.ws_url.lock())?; self.stage = ConnectionStage::Handshake; - Ok(()) - } - - fn identify(&mut self) -> Result<()> { - let identification = json!({ - "op": OpCode::Identify.num(), - "d": { - "compression": true, - "large_threshold": constants::LARGE_THRESHOLD, - "shard": self.shard_info, - "token": &*self.token.lock(), - "v": constants::GATEWAY_VERSION, - "properties": { - "$browser": "serenity", - "$device": "serenity", - "$os": consts::OS, - }, - }, - }); - - self.heartbeat_instants.0 = Some(Instant::now()); - self.stage = ConnectionStage::Identifying; + let _ = set_client_timeout(&mut client); - debug!("[Shard {:?}] Identifying", self.shard_info); - - self.client.send_json(&identification) + Ok(client) } - fn reset(&mut self) { + pub fn reset(&mut self) { self.heartbeat_instants = (Some(Instant::now()), None); self.heartbeat_interval = None; self.last_heartbeat_acknowledged = true; @@ -1078,42 +829,39 @@ impl Shard { self.seq = 0; } - fn update_presence(&mut self) { - let (ref game, status) = self.current_presence; - let now = Utc::now().timestamp() as u64; - - let msg = json!({ - "op": OpCode::StatusUpdate.num(), - "d": { - "afk": false, - "since": now, - "status": status.name(), - "game": game.as_ref().map(|x| json!({ - "name": x.name, - "type": x.kind, - "url": x.url, - })), - }, - }); + pub fn resume(&mut self) -> Result<()> { + debug!("Shard {:?}] Attempting to resume", self.shard_info); - debug!("[Shard {:?}] Sending presence update", self.shard_info); + self.client = self.initialize()?; + self.stage = ConnectionStage::Resuming; - if let Err(why) = self.client.send_json(&msg) { - warn!("[Shard {:?}] Err sending presence update: {:?}", - self.shard_info, - why); + match self.session_id.as_ref() { + Some(session_id) => { + self.client.send_resume( + &self.shard_info, + session_id, + &self.seq, + &self.token.lock(), + ) + }, + None => Err(Error::Gateway(GatewayError::NoSessionId)), } + } - #[cfg(feature = "cache")] - { - let mut cache = CACHE.write(); - let current_user_id = cache.user.id; + pub fn reconnect(&mut self) -> Result<()> { + info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); - cache.presences.get_mut(¤t_user_id).map(|presence| { - presence.game = game.clone(); - presence.last_modified = Some(now); - }); - } + self.reset(); + self.client = self.initialize()?; + + Ok(()) + } + + pub fn update_presence(&mut self) -> Result<()> { + self.client.send_presence_update( + &self.shard_info, + &self.current_presence, + ) } } @@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> { Error::Gateway(GatewayError::BuildingUrl) }) } - -/// Tries to connect and upon failure, retries. -fn connecting(uri: &str) -> WsClient { - let waiting_time = 30; - - loop { - match connect(&uri) { - Ok(client) => return client, - Err(why) => { - warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time); - thread::sleep(StdDuration::from_secs(waiting_time)); - }, - }; - } -} diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs new file mode 100644 index 0000000..873d114 --- /dev/null +++ b/src/gateway/ws_client_ext.rs @@ -0,0 +1,133 @@ +use chrono::Utc; +use constants::{self, OpCode}; +use gateway::{CurrentPresence, WsClient}; +use internal::prelude::*; +use internal::ws_impl::SenderExt; +use model::GuildId; +use std::env::consts; + +pub trait WebSocketGatewayClientExt { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId>; + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()>; + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()>; + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()>; + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()>; +} + +impl WebSocketGatewayClientExt for WsClient { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { + debug!("[Shard {:?}] Requesting member chunks", shard_info); + + self.send_json(&json!({ + "op": OpCode::GetGuildMembers.num(), + "d": { + "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), + "limit": limit.unwrap_or(0), + "query": query.unwrap_or(""), + }, + })).map_err(From::from) + } + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()> { + trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq); + + self.send_json(&json!({ + "d": seq, + "op": OpCode::Heartbeat.num(), + })).map_err(From::from) + } + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()> { + debug!("[Shard {:?}] Identifying", shard_info); + + self.send_json(&json!({ + "op": OpCode::Identify.num(), + "d": { + "compression": true, + "large_threshold": constants::LARGE_THRESHOLD, + "shard": shard_info, + "token": token, + "v": constants::GATEWAY_VERSION, + "properties": { + "$browser": "serenity", + "$device": "serenity", + "$os": consts::OS, + }, + }, + })) + } + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()> { + let &(ref game, ref status) = current_presence; + let now = Utc::now().timestamp() as u64; + + debug!("[Shard {:?}] Sending presence update", shard_info); + + self.send_json(&json!({ + "op": OpCode::StatusUpdate.num(), + "d": { + "afk": false, + "since": now, + "status": status.name(), + "game": game.as_ref().map(|x| json!({ + "name": x.name, + "type": x.kind, + "url": x.url, + })), + }, + })) + } + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()> { + debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq); + + self.send_json(&json!({ + "op": OpCode::Resume.num(), + "d": { + "session_id": session_id, + "seq": seq, + "token": token, + }, + })).map_err(From::from) + } +} |