| author | Zeyla Hellyer <[email protected]> | 2017-11-03 07:13:24 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-11-03 07:13:38 -0700 |
| commit | b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5 (patch) | |
| tree | 17eb07c8218f1e145d5eb3fba353fd1486b3874d /src/client | |
| parent | Make the Client return a Result (diff) | |
Redo client internals + gateway
This commit is a rewrite of the client module's internals and the
gateway.
The main benefit of this is that there are now either 0 or 1 lock retrievals
per event received, and that the ability to use the ShardManager both
internally and in userland code has been improved.
The primary rework is in the `serenity::client` module, which now
includes a few more structures, some changes to existing ones, and more
functionality (such as to the `ShardManager`).
The two notable additions to the client-gateway bridge are the
`ShardMessenger` and `ShardManagerMonitor`.
The `ShardMessenger` is a simple interface for users to interact with
shards. The user is given one of these in the
`serenity::client::Context` passed to dispatches of the
`serenity::client::EventHandler`. It can be used to update the presence
of a shard, send a guild chunk message, or send a user-defined WebSocket
message.
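The messenger described above is essentially a thin, clonable handle over the runner's mpsc channel. A minimal std-only sketch of that idea follows; the message variants and the `String`-based game are simplified stand-ins, not the real serenity types:

```rust
use std::sync::mpsc::{channel, Sender};

// Simplified stand-ins for the commit's message types.
#[derive(Debug, PartialEq)]
enum ShardRunnerMessage {
    SetGame(Option<String>),
}

// A minimal ShardMessenger: just a clonable handle over the runner's channel.
#[derive(Clone)]
struct ShardMessenger {
    tx: Sender<ShardRunnerMessage>,
}

impl ShardMessenger {
    fn set_game(&self, game: Option<String>) {
        // Send errors are ignored: if the runner is gone, there is
        // nothing useful to do with the message anyway.
        let _ = self.tx.send(ShardRunnerMessage::SetGame(game));
    }
}

fn main() {
    let (tx, rx) = channel();
    let messenger = ShardMessenger { tx };
    messenger.set_game(Some("Heroes of the Storm".to_owned()));

    // The runner side receives the message over the channel.
    assert_eq!(
        rx.recv().unwrap(),
        ShardRunnerMessage::SetGame(Some("Heroes of the Storm".to_owned()))
    );
}
```

Because the handle is `Clone`, it can be stored in every `Context` cheaply while all clones feed the same runner.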
The `ShardManagerMonitor` is a loop run in its own thread (potentially
the main thread) that receives messages over an mpsc channel telling it
what to do with shards via the `ShardManager`. For example, it may
receive a message to shut down a single shard, restart a single shard,
or shut everything down entirely.
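The monitor loop can be sketched with only std primitives. The enum name and variant set mirror the commit's `ShardManagerMessage`; the log-returning `run_monitor` function is a hypothetical stand-in for the real monitor, which instead calls into a shared `ShardManager`:

```rust
use std::sync::mpsc::{channel, Receiver};

// Mirrors the commit's message enum; ShardId is simplified to a bare u64.
enum ShardManagerMessage {
    Restart(u64),
    Shutdown(u64),
    ShutdownAll,
}

// Stand-in monitor loop: runs until ShutdownAll arrives or the channel
// closes (the sender being dropped), then returns what it did.
fn run_monitor(rx: Receiver<ShardManagerMessage>) -> Vec<String> {
    let mut log = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            ShardManagerMessage::Restart(id) => log.push(format!("restart {}", id)),
            ShardManagerMessage::Shutdown(id) => log.push(format!("shutdown {}", id)),
            ShardManagerMessage::ShutdownAll => {
                log.push("shutdown all".to_owned());
                break;
            },
        }
    }
    log
}

fn main() {
    let (tx, rx) = channel();
    tx.send(ShardManagerMessage::Restart(7)).unwrap();
    tx.send(ShardManagerMessage::ShutdownAll).unwrap();
    assert_eq!(run_monitor(rx), vec!["restart 7", "shutdown all"]);
}
```

Running this in a dedicated thread is what keeps the blocking `recv` loop out of the caller's way.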
In most applications, users will not interact with the
`ShardManagerMonitor` directly; users of the `serenity::client::Client`
interact only with the `ShardMessenger`.
The `ShardManager` is now directly usable by and available to the user,
and contains public functions for initializing, restarting, and shutting
down individual shards, as well as shutting down all shards completely.
It also contains utility functions, such as determining whether the
`ShardManager` is responsible for a shard of a given ID and listing the
IDs of shards currently active (i.e. having an associated
`ShardRunner`). It can be found at
`serenity::client::Client::shard_manager`.
Speaking of the `ShardRunner`, it no longer holds a clone of an Arc to
its assigned `serenity::gateway::Shard`; it now owns the Shard outright.
This means that to operate on the shard, a `ShardRunner` no longer has
to repeatedly acquire a lock. This reduces the number of lock retrievals
per event dispatch cycle from 3 or 4 (depending on event type) to 0 or 1
(depending on whether the event is a message create _and_ whether the
framework is in use). To interact with the Shard, one must now go
through the previously mentioned `ShardMessenger`, which the
`ShardRunner` checks for messages on each iteration of its loop.
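The zero-lock shape of that loop can be sketched with std channels: the runner is the sole owner of its shard and drains pending messenger messages non-blockingly each iteration. The `Shard` struct and variant names here are simplified stand-ins:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Stand-in for the owned Shard: no Arc<Mutex<_>> needed, since the
// runner is the sole owner and mutates it directly.
struct Shard {
    game: Option<String>,
}

enum ShardRunnerMessage {
    SetGame(Option<String>),
    Shutdown,
}

// One "iteration" of the runner loop: drain pending messenger messages
// without blocking, then (in the real runner) read from the WebSocket.
// Returns false once the runner should stop.
fn drain_messages(shard: &mut Shard, rx: &Receiver<ShardRunnerMessage>) -> bool {
    loop {
        match rx.try_recv() {
            Ok(ShardRunnerMessage::SetGame(game)) => shard.game = game,
            Ok(ShardRunnerMessage::Shutdown) => return false,
            Err(TryRecvError::Empty) => return true,
            Err(TryRecvError::Disconnected) => return false,
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut shard = Shard { game: None };

    tx.send(ShardRunnerMessage::SetGame(Some("chess".to_owned()))).unwrap();
    assert!(drain_messages(&mut shard, &rx));
    assert_eq!(shard.game.as_deref(), Some("chess"));

    tx.send(ShardRunnerMessage::Shutdown).unwrap();
    assert!(!drain_messages(&mut shard, &rx));
}
```

`try_recv` is what makes this lock-free from the runner's perspective: mutation of the shard happens only on the owning thread, with the channel as the sole synchronization point.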
`serenity::client::Context` is now slightly different: instead of the
`shard` field being an `Arc<Mutex<Shard>>`, it is an instance of a
`ShardMessenger`. The interface is the same (minus some Shard-specific
methods such as `latency`), and `Context`'s shortcuts still exist (such
as `Context::online` and `Context::set_game`). It additionally includes
a `Context::shard_id` field, a u64 containing the ID of the shard that
the event was dispatched from.
`serenity::client::Client` has one renamed field, one field that is now
public, and one new field. `Client::shard_runners` is now
`Client::shard_manager`, of type `Arc<Mutex<ShardManager>>`. The
`Client::token` field is now public; it can, for example, be mutated on
token resets if you know what you're doing. `Client::ws_uri` is new and
contains the URI for shards to use when connecting to the gateway.
Otherwise, the Client's usage is unchanged.
`serenity::gateway::Shard` has a couple of minor changes and many more
public methods and fields. The `autoreconnect`, `check_heartbeat`,
`handle_event`, `heartbeat`, `identify`, `initialize`, `reset`,
`resume`, `reconnect`, and `update_presence` methods are now public, as
is the `token` struct field. There are new getters for various struct
fields, such as `heartbeat_instants` and `last_heartbeat_ack`.
The breaking change on the `Shard` is that `Shard::handle_event` now
takes an event by reference and returns `Result<Option<ShardAction>>`
instead of `Result<Option<Event>>`.
`serenity::gateway::ShardAction` is a light enum describing an action
that the caller _should_/_must_ perform on the shard, e.g. reconnecting
or identifying. This is determined by `Shard::handle_event`.
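This decision-as-value pattern can be sketched as a pure function. The commit does not list `ShardAction`'s variants or `handle_event`'s exact event handling, so the variants, the `Event` enum, and the mapping below are plausible assumptions for illustration only:

```rust
// Assumed variants, based on the actions the commit message mentions.
#[derive(Debug, PartialEq)]
enum ShardAction {
    Identify,
    Reconnect,
}

// Hypothetical event type standing in for the gateway's event enum.
enum Event {
    InvalidSession,
    Reconnect,
    MessageCreate(String),
}

// In the spirit of Shard::handle_event: inspect the event and return
// what the caller should do, rather than doing it internally.
fn handle_event(event: &Event) -> Option<ShardAction> {
    match *event {
        Event::InvalidSession => Some(ShardAction::Identify),
        Event::Reconnect => Some(ShardAction::Reconnect),
        // Most events require no gateway-level action from the caller.
        Event::MessageCreate(_) => None,
    }
}

fn main() {
    assert_eq!(handle_event(&Event::Reconnect), Some(ShardAction::Reconnect));
    assert_eq!(handle_event(&Event::MessageCreate("hi".to_owned())), None);
}
```

Returning the action instead of performing it keeps `handle_event` side-effect free and lets the runner decide how and when to act.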
In total, there aren't too many breaking changes that most userland use
cases have to deal with -- at most, changing some usage of `Context`.
Retrieving information such as a Shard's latency is currently no longer
possible, but work will be done to make this functionality available
again.
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/mod.rs | 114 |
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 283 |
| -rw-r--r-- | src/client/bridge/gateway/shard_manager_monitor.rs | 54 |
| -rw-r--r-- | src/client/bridge/gateway/shard_messenger.rs | 276 |
| -rw-r--r-- | src/client/bridge/gateway/shard_queuer.rs | 56 |
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 385 |
| -rw-r--r-- | src/client/bridge/gateway/shard_runner_message.rs | 43 |
| -rw-r--r-- | src/client/bridge/mod.rs | 9 |
| -rw-r--r-- | src/client/context.rs | 65 |
| -rw-r--r-- | src/client/dispatch.rs | 132 |
| -rw-r--r-- | src/client/mod.rs | 142 |
11 files changed, 1256 insertions, 303 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs index 0b873aa..752b015 100644 --- a/src/client/bridge/gateway/mod.rs +++ b/src/client/bridge/gateway/mod.rs @@ -1,28 +1,113 @@ +//! The client gateway bridge is support essential for the [`client`] module. +//! +//! This is made available for user use if one wishes to be lower-level or avoid +//! the higher functionality of the [`Client`]. +//! +//! Of interest are three pieces: +//! +//! ### [`ShardManager`] +//! +//! The shard manager is responsible for being a clean interface between the +//! user and the shard runners, providing essential functions such as +//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`] +//! to restart a shard. +//! +//! If you are using the `Client`, this is likely the only piece of interest to +//! you. Refer to [its documentation][`ShardManager`] for more information. +//! +//! ### [`ShardQueuer`] +//! +//! The shard queuer is a light wrapper around an mpsc receiver that receives +//! [`ShardManagerMessage`]s. It should be run in its own thread so it can +//! receive messages to start shards in a queue. +//! +//! Refer to [its documentation][`ShardQueuer`] for more information. +//! +//! ### [`ShardRunner`] +//! +//! The shard runner is responsible for actually running a shard and +//! communicating with its respective WebSocket client. +//! +//! It is performs all actions such as sending a presence update over the client +//! and, with the help of the [`Shard`], will be able to determine what to do. +//! This is, for example, whether to reconnect, resume, or identify with the +//! gateway. +//! +//! ### In Conclusion +//! +//! For almost every - if not every - use case, you only need to _possibly_ be +//! concerned about the [`ShardManager`] in this module. +//! +//! [`Client`]: ../../struct.Client.html +//! [`client`]: ../.. +//! [`Shard`]: ../../../gateway/struct.Shard.html +//! [`ShardManager`]: struct.ShardManager.html +//! 
[`ShardManager::restart`]: struct.ShardManager.html#method.restart +//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown +//! [`ShardQueuer`]: struct.ShardQueuer.html +//! [`ShardRunner`]: struct.ShardRunner.html + mod shard_manager; +mod shard_manager_monitor; +mod shard_messenger; mod shard_queuer; mod shard_runner; +mod shard_runner_message; pub use self::shard_manager::ShardManager; +pub use self::shard_manager_monitor::ShardManagerMonitor; +pub use self::shard_messenger::ShardMessenger; pub use self::shard_queuer::ShardQueuer; pub use self::shard_runner::ShardRunner; +pub use self::shard_runner_message::ShardRunnerMessage; -use gateway::Shard; -use parking_lot::Mutex; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::mpsc::Sender; -use std::sync::Arc; -type Parked<T> = Arc<Mutex<T>>; -type LockedShard = Parked<Shard>; +/// A message either for a [`ShardManager`] or a [`ShardRunner`]. +/// +/// [`ShardManager`]: struct.ShardManager.html +/// [`ShardRunner`]: struct.ShardRunner.html +pub enum ShardClientMessage { + /// A message intended to be worked with by a [`ShardManager`]. + /// + /// [`ShardManager`]: struct.ShardManager.html + Manager(ShardManagerMessage), + /// A message intended to be worked with by a [`ShardRunner`]. + /// + /// [`ShardRunner`]: struct.ShardRunner.html + Runner(ShardRunnerMessage), +} +/// A message for a [`ShardManager`] relating to an operation with a shard. +/// +/// [`ShardManager`]: struct.ShardManager.html #[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub enum ShardManagerMessage { + /// Indicator that a [`ShardManagerMonitor`] should restart a shard. + /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html Restart(ShardId), + /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard + /// without bringing it back up. 
+ /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html Shutdown(ShardId), - #[allow(dead_code)] + /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards + /// and end its monitoring process for the [`ShardManager`]. + /// + /// [`ShardManager`]: struct.ShardManager.html + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html ShutdownAll, } +/// A message to be sent to the [`ShardQueuer`]. +/// +/// This should usually be wrapped in a [`ShardClientMessage`]. +/// +/// [`ShardClientMessage`]: enum.ShardClientMessage.html +/// [`ShardQueuer`]: enum.ShardQueuer.html +#[derive(Clone, Debug)] pub enum ShardQueuerMessage { /// Message to start a shard, where the 0-index element is the ID of the /// Shard to start and the 1-index element is the total shards in use. @@ -31,8 +116,8 @@ pub enum ShardQueuerMessage { Shutdown, } -// A light tuplestruct wrapper around a u64 to verify type correctness when -// working with the IDs of shards. +/// A light tuplestruct wrapper around a u64 to verify type correctness when +/// working with the IDs of shards. #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct ShardId(pub u64); @@ -42,7 +127,16 @@ impl Display for ShardId { } } +/// Information about a [`ShardRunner`]. +/// +/// The [`ShardId`] is not included because, as it stands, you probably already +/// know the Id if you obtained this. +/// +/// [`ShardId`]: struct.ShardId.html +/// [`ShardRunner`]: struct.ShardRunner.html +#[derive(Debug)] pub struct ShardRunnerInfo { - pub runner_tx: Sender<ShardManagerMessage>, - pub shard: Arc<Mutex<Shard>>, + /// The channel used to communicate with the shard runner, telling it + /// what to do with regards to its status. 
+ pub runner_tx: Sender<ShardClientMessage>, } diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index 6e3b285..2cf45fd 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -1,13 +1,15 @@ use internal::prelude::*; use parking_lot::Mutex; use std::collections::HashMap; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Sender}; use std::sync::Arc; use std::thread; use super::super::super::EventHandler; use super::{ + ShardClientMessage, ShardId, ShardManagerMessage, + ShardManagerMonitor, ShardQueuer, ShardQueuerMessage, ShardRunnerInfo, @@ -18,8 +20,80 @@ use typemap::ShareMap; #[cfg(feature = "framework")] use framework::Framework; +/// A manager for handling the status of shards by starting them, restarting +/// them, and stopping them when required. +/// +/// **Note**: The [`Client`] internally uses a shard manager. If you are using a +/// Client, then you do not need to make one of these. 
+/// +/// # Examples +/// +/// Initialize a shard manager with a framework responsible for shards 0 through +/// 2, of 5 total shards: +/// +/// ```rust,no_run +/// extern crate parking_lot; +/// extern crate serenity; +/// extern crate threadpool; +/// extern crate typemap; +/// +/// # use std::error::Error; +/// # +/// # #[cfg(feature = "framework")] +/// # fn try_main() -> Result<(), Box<Error>> { +/// # +/// use parking_lot::Mutex; +/// use serenity::client::bridge::gateway::ShardManager; +/// use serenity::client::EventHandler; +/// use serenity::http; +/// use std::sync::Arc; +/// use std::env; +/// use threadpool::ThreadPool; +/// use typemap::ShareMap; +/// +/// struct Handler; +/// +/// impl EventHandler for Handler { } +/// +/// let token = env::var("DISCORD_TOKEN")?; +/// http::set_token(&token); +/// let token = Arc::new(Mutex::new(token)); +/// +/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url)); +/// let data = Arc::new(Mutex::new(ShareMap::custom())); +/// let event_handler = Arc::new(Handler); +/// let framework = Arc::new(Mutex::new(None)); +/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5); +/// +/// ShardManager::new( +/// 0, // the shard index to start initiating from +/// 3, // the number of shards to initiate (this initiates 0, 1, and 2) +/// 5, // the total number of shards in use +/// gateway_url, +/// token, +/// data, +/// event_handler, +/// framework, +/// threadpool, +/// ); +/// # Ok(()) +/// # } +/// # +/// # #[cfg(not(feature = "framework"))] +/// # fn try_main() -> Result<(), Box<Error>> { +/// # Ok(()) +/// # } +/// # +/// # fn main() { +/// # try_main().unwrap(); +/// # } +/// ``` +/// +/// [`Client`]: ../../struct.Client.html +#[derive(Debug)] pub struct ShardManager { - pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// The shard runners currently managed. + runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, /// The index of the first shard to initialize, 0-indexed. 
shard_index: u64, /// The number of shards to initialize. @@ -27,10 +101,11 @@ pub struct ShardManager { /// The total shards in use, 1-indexed. shard_total: u64, shard_queuer: Sender<ShardQueuerMessage>, - thread_rx: Receiver<ShardManagerMessage>, } impl ShardManager { + /// Creates a new shard manager, returning both the manager and a monitor + /// for usage in a separate thread. #[cfg(feature = "framework")] #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new<H>( @@ -43,7 +118,7 @@ impl ShardManager { event_handler: Arc<H>, framework: Arc<Mutex<Option<Box<Framework + Send>>>>, threadpool: ThreadPool, - ) -> Self where H: EventHandler + Send + Sync + 'static { + ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static { let (thread_tx, thread_rx) = mpsc::channel(); let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); @@ -66,16 +141,22 @@ impl ShardManager { shard_queuer.run(); }); - Self { + let manager = Arc::new(Mutex::new(Self { shard_queuer: shard_queue_tx, - thread_rx: thread_rx, runners, shard_index, shard_init, shard_total, - } + })); + + (Arc::clone(&manager), ShardManagerMonitor { + rx: thread_rx, + manager, + }) } + /// Creates a new shard manager, returning both the manager and a monitor + /// for usage in a separate thread. 
#[cfg(not(feature = "framework"))] pub fn new<H>( shard_index: u64, @@ -86,21 +167,21 @@ impl ShardManager { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, threadpool: ThreadPool, - ) -> Self where H: EventHandler + Send + Sync + 'static { + ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static { let (thread_tx, thread_rx) = mpsc::channel(); let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); let runners = Arc::new(Mutex::new(HashMap::new())); let mut shard_queuer = ShardQueuer { - data: data.clone(), - event_handler: event_handler.clone(), + data: Arc::clone(&data), + event_handler: Arc::clone(&event_handler), last_start: None, manager_tx: thread_tx.clone(), - runners: runners.clone(), + runners: Arc::clone(&runners), rx: shard_queue_rx, - token: token.clone(), - ws_url: ws_url.clone(), + token: Arc::clone(&token), + ws_url: Arc::clone(&ws_url), threadpool, }; @@ -108,21 +189,41 @@ impl ShardManager { shard_queuer.run(); }); - Self { + let manager = Arc::new(Mutex::new(Self { shard_queuer: shard_queue_tx, - thread_rx: thread_rx, runners, shard_index, shard_init, shard_total, - } + })); + + (Arc::clone(&manager), ShardManagerMonitor { + rx: thread_rx, + manager, + }) + } + + /// Returns whether the shard manager contains either an active instance of + /// a shard runner responsible for the given ID. + /// + /// If a shard has been queued but has not yet been initiated, then this + /// will return `false`. Consider double-checking [`is_responsible_for`] to + /// determine whether this shard manager is responsible for the given shard. + /// + /// [`is_responsible_for`]: #method.is_responsible_for + pub fn has(&self, shard_id: ShardId) -> bool { + self.runners.lock().contains_key(&shard_id) } + /// Initializes all shards that the manager is responsible for. + /// + /// This will communicate shard boots with the [`ShardQueuer`] so that they + /// are properly queued. 
+ /// + /// [`ShardQueuer`]: struct.ShardQueuer.html pub fn initialize(&mut self) -> Result<()> { let shard_to = self.shard_index + self.shard_init; - debug!("{}, {}", self.shard_index, self.shard_init); - for shard_id in self.shard_index..shard_to { let shard_total = self.shard_total; @@ -132,39 +233,52 @@ impl ShardManager { Ok(()) } - pub fn run(&mut self) { - while let Ok(value) = self.thread_rx.recv() { - match value { - ShardManagerMessage::Restart(shard_id) => self.restart(shard_id), - ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id), - ShardManagerMessage::ShutdownAll => { - self.shutdown_all(); - - break; - }, - } - } - } - - pub fn shutdown_all(&mut self) { - info!("Shutting down all shards"); - let keys = { - self.runners.lock().keys().cloned().collect::<Vec<ShardId>>() - }; + /// Sets the new sharding information for the manager. + /// + /// This will shutdown all existing shards. + /// + /// This will _not_ instantiate the new shards. + pub fn set_shards(&mut self, index: u64, init: u64, total: u64) { + self.shutdown_all(); - for shard_id in keys { - self.shutdown(shard_id); - } + self.shard_index = index; + self.shard_init = init; + self.shard_total = total; } - fn boot(&mut self, shard_info: [ShardId; 2]) { - info!("Telling shard queuer to start shard {}", shard_info[0]); - - let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); - let _ = self.shard_queuer.send(msg); - } - - fn restart(&mut self, shard_id: ShardId) { + /// Restarts a shard runner. + /// + /// This sends a shutdown signal to a shard's associated [`ShardRunner`], + /// and then queues a initialization of a shard runner for the same shard + /// via the [`ShardQueuer`]. 
+ /// + /// # Examples + /// + /// Creating a client and then restarting a shard by ID: + /// + /// _(note: in reality this precise code doesn't have an effect since the + /// shard would not yet have been initialized via [`initialize`], but the + /// concept is the same)_ + /// + /// ```rust,no_run + /// use serenity::client::bridge::gateway::ShardId; + /// use serenity::client::{Client, EventHandler}; + /// use std::env; + /// + /// struct Handler; + /// + /// impl EventHandler for Handler { } + /// + /// let token = env::var("DISCORD_TOKEN").unwrap(); + /// let mut client = Client::new(&token, Handler).unwrap(); + /// + /// // restart shard ID 7 + /// client.shard_manager.lock().restart(ShardId(7)); + /// ``` + /// + /// [`ShardQueuer`]: struct.ShardQueuer.html + /// [`ShardRunner`]: struct.ShardRunner.html + pub fn restart(&mut self, shard_id: ShardId) { info!("Restarting shard {}", shard_id); self.shutdown(shard_id); @@ -173,23 +287,88 @@ impl ShardManager { self.boot([shard_id, ShardId(shard_total)]); } - fn shutdown(&mut self, shard_id: ShardId) { + /// Returns the [`ShardId`]s of the shards that have been instantiated and + /// currently have a valid [`ShardRunner`]. + /// + /// [`ShardId`]: struct.ShardId.html + /// [`ShardRunner`]: struct.ShardRunner.html + pub fn shards_instantiated(&self) -> Vec<ShardId> { + self.runners.lock().keys().cloned().collect() + } + + /// Attempts to shut down the shard runner by Id. + /// + /// Returns a boolean indicating whether a shard runner was present. This is + /// _not_ necessary an indicator of whether the shard runner was + /// successfully shut down. + /// + /// **Note**: If the receiving end of an mpsc channel - theoretically owned + /// by the shard runner - no longer exists, then the shard runner will not + /// know it should shut down. This _should never happen_. It may already be + /// stopped. 
+ pub fn shutdown(&mut self, shard_id: ShardId) -> bool { info!("Shutting down shard {}", shard_id); if let Some(runner) = self.runners.lock().get(&shard_id) { - let msg = ShardManagerMessage::Shutdown(shard_id); + let shutdown = ShardManagerMessage::Shutdown(shard_id); + let msg = ShardClientMessage::Manager(shutdown); if let Err(why) = runner.runner_tx.send(msg) { - warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why); + warn!( + "Failed to cleanly shutdown shard {}: {:?}", + shard_id, + why, + ); } } - self.runners.lock().remove(&shard_id); + self.runners.lock().remove(&shard_id).is_some() + } + + /// Sends a shutdown message for all shards that the manager is responsible + /// for that are still known to be running. + /// + /// If you only need to shutdown a select number of shards, prefer looping + /// over the [`shutdown`] method. + /// + /// [`shutdown`]: #method.shutdown + pub fn shutdown_all(&mut self) { + let keys = { + let runners = self.runners.lock(); + + if runners.is_empty() { + return; + } + + runners.keys().cloned().collect::<Vec<_>>() + }; + + info!("Shutting down all shards"); + + for shard_id in keys { + self.shutdown(shard_id); + } + } + + fn boot(&mut self, shard_info: [ShardId; 2]) { + info!("Telling shard queuer to start shard {}", shard_info[0]); + + let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); + let _ = self.shard_queuer.send(msg); } } impl Drop for ShardManager { + /// A custom drop implementation to clean up after the manager. + /// + /// This shuts down all active [`ShardRunner`]s and attempts to tell the + /// [`ShardQueuer`] to shutdown. 
+ /// + /// [`ShardQueuer`]: struct.ShardQueuer.html + /// [`ShardRunner`]: struct.ShardRunner.html fn drop(&mut self) { + self.shutdown_all(); + if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) { warn!("Failed to send shutdown to shard queuer: {:?}", why); } diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs new file mode 100644 index 0000000..4a64473 --- /dev/null +++ b/src/client/bridge/gateway/shard_manager_monitor.rs @@ -0,0 +1,54 @@ +use parking_lot::Mutex; +use std::sync::mpsc::Receiver; +use std::sync::Arc; +use super::{ShardManager, ShardManagerMessage}; + +/// The shard manager monitor does what it says on the tin -- it monitors the +/// shard manager and performs actions on it as received. +/// +/// The monitor is essentially responsible for running in its own thread and +/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or +/// shutdown everything entirely. +/// +/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html +#[derive(Debug)] +pub struct ShardManagerMonitor { + /// An clone of the Arc to the manager itself. + pub manager: Arc<Mutex<ShardManager>>, + /// The mpsc Receiver channel to receive shard manager messages over. + pub rx: Receiver<ShardManagerMessage>, +} + +impl ShardManagerMonitor { + /// "Runs" the monitor, waiting for messages over the Receiver. + /// + /// This should be called in its own thread due to its blocking, looped + /// nature. 
+ /// + /// This will continue running until either: + /// + /// - a [`ShardManagerMessage::ShutdownAll`] has been received + /// - an error is returned while receiving a message from the + /// channel (probably indicating that the shard manager should stop anyway) + /// + /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll + pub fn run(&mut self) { + debug!("Starting shard manager worker"); + + while let Ok(value) = self.rx.recv() { + match value { + ShardManagerMessage::Restart(shard_id) => { + self.manager.lock().restart(shard_id); + }, + ShardManagerMessage::Shutdown(shard_id) => { + self.manager.lock().shutdown(shard_id); + }, + ShardManagerMessage::ShutdownAll => { + self.manager.lock().shutdown_all(); + + break; + }, + } + } + } +} diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs new file mode 100644 index 0000000..069c838 --- /dev/null +++ b/src/client/bridge/gateway/shard_messenger.rs @@ -0,0 +1,276 @@ +use model::{Game, GuildId, OnlineStatus}; +use super::{ShardClientMessage, ShardRunnerMessage}; +use std::sync::mpsc::{SendError, Sender}; +use websocket::message::OwnedMessage; + +/// A lightweight wrapper around an mpsc sender. +/// +/// This is used to cleanly communicate with a shard's respective +/// [`ShardRunner`]. This can be used for actions such as setting the game via +/// [`set_game`] or shutting down via [`shutdown`]. +/// +/// [`ShardRunner`]: struct.ShardRunner.html +/// [`set_game`]: #method.set_game +/// [`shutdown`]: #method.shutdown +#[derive(Clone, Debug)] +pub struct ShardMessenger { + tx: Sender<ShardClientMessage>, +} + +impl ShardMessenger { + /// Creates a new shard messenger. + /// + /// If you are using the [`Client`], you do not need to do this. 
+ /// + /// [`Client`]: ../../struct.Client.html + #[inline] + pub fn new(tx: Sender<ShardClientMessage>) -> Self { + Self { + tx, + } + } + + /// Requests that one or multiple [`Guild`]s be chunked. + /// + /// This will ask the gateway to start sending member chunks for large + /// guilds (250 members+). If a guild is over 250 members, then a full + /// member list will not be downloaded, and must instead be requested to be + /// sent in "chunks" containing members. + /// + /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each + /// chunk only contains a partial amount of the total members. + /// + /// If the `cache` feature is enabled, the cache will automatically be + /// updated with member chunks. + /// + /// # Examples + /// + /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not + /// specifying a query parameter: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; + /// # + /// use serenity::model::GuildId; + /// + /// let guild_ids = vec![GuildId(81384788765712384)]; + /// + /// shard.chunk_guilds(guild_ids, Some(2000), None); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// Chunk a single guild by Id, limiting to 20 members, and specifying a + /// query parameter of `"do"`: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = 
Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; + /// # + /// use serenity::model::GuildId; + /// + /// let guild_ids = vec![GuildId(81384788765712384)]; + /// + /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// [`Event::GuildMembersChunk`]: + /// ../../model/event/enum.Event.html#variant.GuildMembersChunk + /// [`Guild`]: ../../model/struct.Guild.html + /// [`Member`]: ../../model/struct.Member.html + pub fn chunk_guilds<It>( + &self, + guild_ids: It, + limit: Option<u16>, + query: Option<String>, + ) where It: IntoIterator<Item=GuildId> { + let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>(); + + let _ = self.send(ShardRunnerMessage::ChunkGuilds { + guild_ids: guilds, + limit, + query, + }); + } + + /// Sets the user's current game, if any. + /// + /// Other presence settings are maintained. + /// + /// # Examples + /// + /// Setting the current game to playing `"Heroes of the Storm"`: + /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::Game; + /// + /// shard.set_game(Some(Game::playing("Heroes of the Storm"))); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + pub fn set_game(&self, game: Option<Game>) { + let _ = self.send(ShardRunnerMessage::SetGame(game)); + } + + /// Sets the user's full presence information. 
+ /// + /// Consider using the individual setters if you only need to modify one of + /// these. + /// + /// # Examples + /// + /// Set the current user as playing `"Heroes of the Storm"` and being + /// online: + /// + /// ```rust,ignore + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::{Game, OnlineStatus}; + /// + /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + pub fn set_presence(&self, game: Option<Game>, mut status: OnlineStatus) { + if status == OnlineStatus::Offline { + status = OnlineStatus::Invisible; + } + + let _ = self.send(ShardRunnerMessage::SetPresence(status, game)); + } + + /// Sets the user's current online status. + /// + /// Note that [`Offline`] is not a valid online status, so it is + /// automatically converted to [`Invisible`]. + /// + /// Other presence settings are maintained. + /// + /// # Examples + /// + /// Setting the current online status for the shard to [`DoNotDisturb`]. 
+ /// + /// ```rust,no_run + /// # extern crate parking_lot; + /// # extern crate serenity; + /// # + /// # use parking_lot::Mutex; + /// # use serenity::client::gateway::Shard; + /// # use std::error::Error; + /// # use std::sync::Arc; + /// # + /// # fn try_main() -> Result<(), Box<Error>> { + /// # let mutex = Arc::new(Mutex::new("".to_string())); + /// # + /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # + /// use serenity::model::OnlineStatus; + /// + /// shard.set_status(OnlineStatus::DoNotDisturb); + /// # Ok(()) + /// # } + /// # + /// # fn main() { + /// # try_main().unwrap(); + /// # } + /// ``` + /// + /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb + /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible + /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline + pub fn set_status(&self, mut online_status: OnlineStatus) { + if online_status == OnlineStatus::Offline { + online_status = OnlineStatus::Invisible; + } + + let _ = self.send(ShardRunnerMessage::SetStatus(online_status)); + } + + /// Shuts down the websocket by attempting to cleanly close the + /// connection. + pub fn shutdown_clean(&self) { + let _ = self.send(ShardRunnerMessage::Close(1000, None)); + } + + /// Sends a raw message over the WebSocket. + /// + /// The given message is not mutated in any way, and is sent as-is. + /// + /// You should only use this if you know what you're doing. If you're + /// wanting to, for example, send a presence update, prefer the usage of + /// the [`set_presence`] method. 
+ /// + /// [`set_presence`]: #method.set_presence + pub fn websocket_message(&self, message: OwnedMessage) { + let _ = self.send(ShardRunnerMessage::Message(message)); + } + + #[inline] + fn send(&self, msg: ShardRunnerMessage) + -> Result<(), SendError<ShardClientMessage>> { + self.tx.send(ShardClientMessage::Runner(msg)) + } +} diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs index cb3f749..bd02532 100644 --- a/src/client/bridge/gateway/shard_queuer.rs +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -27,20 +27,69 @@ use framework::Framework; /// blocking nature of the loop itself as well as a 5 second thread sleep /// between shard starts. pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> { + /// A copy of [`Client::data`] to be given to runners for contextual + /// dispatching. + /// + /// [`Client::data`]: ../../struct.Client.html#structfield.data pub data: Arc<Mutex<ShareMap>>, + /// A reference to an `EventHandler`, such as the one given to the + /// [`Client`]. + /// + /// [`Client`]: ../../struct.Client.html pub event_handler: Arc<H>, + /// A copy of the framework #[cfg(feature = "framework")] pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + /// The instant that a shard was last started. + /// + /// This is used to determine how long to wait between shard IDENTIFYs. pub last_start: Option<Instant>, + /// A copy of the sender channel to communicate with the + /// [`ShardManagerMonitor`]. + /// + /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html pub manager_tx: Sender<ShardManagerMessage>, + /// A copy of the map of shard runners. pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// A receiver channel for the shard queuer to be told to start shards. pub rx: Receiver<ShardQueuerMessage>, + /// A copy of a threadpool to give shard runners. + /// + /// For example, when using the [`Client`], this will be a copy of + /// [`Client::threadpool`]. 
+ /// + /// [`Client`]: ../../struct.Client.html + /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool pub threadpool: ThreadPool, + /// A copy of the token to connect with. pub token: Arc<Mutex<String>>, + /// A copy of the URI to use to connect to the gateway. pub ws_url: Arc<Mutex<String>>, } impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { + /// Begins the shard queuer loop. + /// + /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s, + /// blocking for messages on what to do. + /// + /// If a [`ShardQueuerMessage::Start`] is received, this will: + /// + /// 1. Check how much time has passed since the last shard was started + /// 2. If the amount of time is less than the ratelimit, it will sleep until + /// that time has passed + /// 3. Start the shard by ID + /// + /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and + /// the loop will be over. + /// + /// **Note**: This should be run in its own thread due to the blocking + /// nature of the loop. + /// + /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html + /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown + /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start + /// [`rx`]: #structfield.rx pub fn run(&mut self) { while let Ok(msg) = self.rx.recv() { match msg { @@ -80,16 +129,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> { let shard_info = [shard_id.0, shard_total.0]; + let shard = Shard::new( Arc::clone(&self.ws_url), Arc::clone(&self.token), shard_info, )?; - let locked = Arc::new(Mutex::new(shard)); let mut runner = feature_framework! 
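The ratelimited start sequence documented above (check the time elapsed since the last start, sleep out the remainder, then start the shard) can be sketched with std types alone. This is an editor's illustration, not code from the patch: `remaining_wait` is a hypothetical helper, and the 5-second gap mirrors the "5 second thread sleep" mentioned in the struct docs.

```rust
use std::time::{Duration, Instant};

// Assumed minimum gap between shard starts (matches the 5 second
// sleep described in the ShardQueuer docs above).
const WAIT_BETWEEN_STARTS: Duration = Duration::from_secs(5);

// Hypothetical helper: how much longer the queuer should sleep before
// it may start the next shard, given when the last shard was started.
fn remaining_wait(last_start: Option<Instant>) -> Duration {
    match last_start {
        // Subtract the elapsed time from the required gap, saturating
        // at zero once enough time has already passed.
        Some(started) => WAIT_BETWEEN_STARTS
            .checked_sub(started.elapsed())
            .unwrap_or(Duration::from_secs(0)),
        // No shard has been started yet, so there is nothing to wait on.
        None => Duration::from_secs(0),
    }
}
```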
{{ ShardRunner::new( - Arc::clone(&locked), + shard, self.manager_tx.clone(), Arc::clone(&self.framework), Arc::clone(&self.data), @@ -98,7 +147,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { ) } else { ShardRunner::new( - locked.clone(), + shard, self.manager_tx.clone(), self.data.clone(), self.event_handler.clone(), @@ -108,7 +157,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { let runner_info = ShardRunnerInfo { runner_tx: runner.runner_tx(), - shard: locked, }; thread::spawn(move || { diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index b14e48b..a5fde3d 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -1,35 +1,45 @@ +use gateway::{Shard, ShardAction}; use internal::prelude::*; use internal::ws_impl::ReceiverExt; use model::event::{Event, GatewayEvent}; use parking_lot::Mutex; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::mpsc::{self, Receiver, Sender, TryRecvError}; use std::sync::Arc; use super::super::super::{EventHandler, dispatch}; -use super::{LockedShard, ShardId, ShardManagerMessage}; +use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage}; use threadpool::ThreadPool; use typemap::ShareMap; +use websocket::message::{CloseData, OwnedMessage}; use websocket::WebSocketError; #[cfg(feature = "framework")] use framework::Framework; +#[cfg(feature = "voice")] +use internal::ws_impl::SenderExt; +/// A runner for managing a [`Shard`] and its respective WebSocket client. 
+/// +/// [`Shard`]: ../../../gateway/struct.Shard.html pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, #[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>, manager_tx: Sender<ShardManagerMessage>, - runner_rx: Receiver<ShardManagerMessage>, - runner_tx: Sender<ShardManagerMessage>, - shard: LockedShard, - shard_info: [u64; 2], + // channel to receive messages from the shard manager and dispatches + runner_rx: Receiver<ShardClientMessage>, + // channel to send messages to the shard runner from the shard manager + runner_tx: Sender<ShardClientMessage>, + shard: Shard, threadpool: ThreadPool, } impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { + /// Creates a new runner for a Shard. + #[allow(too_many_arguments)] #[cfg(feature = "framework")] pub fn new( - shard: LockedShard, + shard: Shard, manager_tx: Sender<ShardManagerMessage>, framework: Arc<Mutex<Option<Box<Framework + Send>>>>, data: Arc<Mutex<ShareMap>>, @@ -37,7 +47,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { threadpool: ThreadPool, ) -> Self { let (tx, rx) = mpsc::channel(); - let shard_info = shard.lock().shard_info(); Self { runner_rx: rx, @@ -47,21 +56,20 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { framework, manager_tx, shard, - shard_info, threadpool, } } + /// Creates a new runner for a Shard. #[cfg(not(feature = "framework"))] pub fn new( - shard: LockedShard, + shard: Shard, manager_tx: Sender<ShardManagerMessage>, data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, threadpool: ThreadPool, ) -> Self { let (tx, rx) = mpsc::channel(); - let shard_info = shard.lock().shard_info(); Self { runner_rx: rx, @@ -70,90 +78,276 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { event_handler, manager_tx, shard, - shard_info, threadpool, } } + /// Starts the runner's loop to receive events. 
+ /// + /// This runs a loop that performs the following in each iteration: + /// + /// 1. Checks the receiver for [`ShardRunnerMessage`]s, possibly from the + /// [`ShardManager`], and if there is one, acts on it. + /// + /// 2. Checks if a heartbeat should be sent to the Discord gateway, and if + /// so, sends one. + /// + /// 3. Attempts to retrieve a message from the WebSocket, processing it into + /// a [`GatewayEvent`]. This will block for 100ms before assuming there is + /// no message available. + /// + /// 4. Checks with the [`Shard`] to determine if the gateway event is + /// specifying an action to take (e.g. resuming, reconnecting, heartbeating) + /// and then performs that action, if any. + /// + /// 5. Dispatches the event via the Client. + /// + /// 6. Goes back to step 1. + /// + /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html + /// [`Shard`]: ../../../gateway/struct.Shard.html + /// [`ShardManager`]: struct.ShardManager.html + /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html pub fn run(&mut self) -> Result<()> { - debug!("[ShardRunner {:?}] Running", self.shard_info); + debug!("[ShardRunner {:?}] Running", self.shard.shard_info()); loop { - { - let mut shard = self.shard.lock(); - let incoming = self.runner_rx.try_recv(); + if !self.recv()? { + return Ok(()); + } - // Check for an incoming message over the runner channel. - // - // If the message is to shutdown, first verify the ID so we know - // for certain this runner is to shutdown. 
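The numbered loop documented above can be condensed into a testable skeleton. Everything here is an editor's sketch, not serenity API: the closures stand in for the runner's own methods, `u32` stands in for `Event`, and `Some(success)`/`None` model "return from `run`" versus "keep looping".

```rust
// Sketch of one iteration of the runner loop. Returns None to keep
// looping, or Some(success) to model `run` returning.
fn run_iteration<R, H, E, D>(
    recv_ok: R,       // step 1: drain the runner rx channel
    heartbeat_ok: H,  // step 2: heartbeat check
    recv_event: E,    // step 3: (event, read_successful)
    dispatch: &mut D, // step 5: dispatch the event
) -> Option<bool>
where
    R: Fn() -> bool,
    H: Fn() -> bool,
    E: Fn() -> (Option<u32>, bool),
    D: FnMut(u32),
{
    if !recv_ok() {
        // The manager asked this runner to shut down.
        return Some(true);
    }

    if !heartbeat_ok() {
        // Heartbeating failed; a restart would be requested.
        return Some(false);
    }

    let (event, successful) = recv_event();

    if let Some(event) = event {
        dispatch(event);
    }

    if !successful {
        // The WebSocket read failed; request a restart.
        return Some(false);
    }

    None
}
```

The real loop also consults `stage().is_connecting()` before restarting, and steps 4 and 6 are folded into the caller here; this sketch only shows the ordering.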
- if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == self.shard_info[0] { - let _ = shard.shutdown_clean(); + // check heartbeat + if let Err(why) = self.shard.check_heartbeat() { + warn!( + "[ShardRunner {:?}] Error heartbeating: {:?}", + self.shard.shard_info(), + why, + ); + debug!( + "[ShardRunner {:?}] Requesting restart", + self.shard.shard_info(), + ); + + return self.request_restart(); + } - return Ok(()); + #[cfg(feature = "voice")] + { + for message in self.shard.cycle_voice_recv() { + if let Err(why) = self.shard.client.send_json(&message) { + println!("Err sending from voice over WS: {:?}", why); } } + } - if let Err(why) = shard.check_heartbeat() { - error!("Failed to heartbeat and reconnect: {:?}", why); + let (event, action, successful) = self.recv_event(); - return self.request_restart(); - } - - #[cfg(feature = "voice")] - { - shard.cycle_voice_recv(); - } + if let Some(action) = action { + let _ = self.action(action); } - let (event, successful) = self.recv_event(); - if let Some(event) = event { - let data = Arc::clone(&self.data); - let event_handler = Arc::clone(&self.event_handler); - let shard = Arc::clone(&self.shard); - - feature_framework! {{ - let framework = Arc::clone(&self.framework); - - self.threadpool.execute(|| { - dispatch( - event, - shard, - framework, - data, - event_handler, - ); - }); - } else { - self.threadpool.execute(|| { - dispatch( - event, - shard, - data, - event_handler, - ); - }); - }} + self.dispatch(event); } - if !successful && !self.shard.lock().stage().is_connecting() { + if !successful && !self.shard.stage().is_connecting() { return self.request_restart(); } } } - pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> { + /// Clones the internal copy of the Sender to the shard runner. + pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> { self.runner_tx.clone() } + /// Takes an action that a [`Shard`] has determined should happen and then + /// does it. 
+ /// + /// For example, if the shard says that an Identify message needs to be + /// sent, this will do that. + /// + /// # Errors + /// + /// Returns an error if performing the respective shard action fails. + fn action(&mut self, action: ShardAction) -> Result<()> { + match action { + ShardAction::Autoreconnect => self.shard.autoreconnect(), + ShardAction::Heartbeat => self.shard.heartbeat(), + ShardAction::Identify => self.shard.identify(), + ShardAction::Reconnect => self.shard.reconnect(), + ShardAction::Resume => self.shard.resume(), + } + } + + // Checks if the ID received in a shutdown request matches the ID of the + // shard this runner is responsible for. If so, it shuts down the WebSocket + // client. + // + // Returns whether the WebSocket client is still active. + // + // If true, the WebSocket client was _not_ shut down. If false, it was. + fn checked_shutdown(&mut self, id: ShardId) -> bool { + // First verify the ID so we know for certain this runner is + // to shut down. + if id.0 != self.shard.shard_info()[0] { + // Not meant for this runner for some reason; don't + // shut down. + return true; + } + + let close_data = CloseData::new(1000, String::new()); + let msg = OwnedMessage::Close(Some(close_data)); + let _ = self.shard.client.send_message(&msg); + + false + } + + fn dispatch(&self, event: Event) { + let data = Arc::clone(&self.data); + let event_handler = Arc::clone(&self.event_handler); + let runner_tx = self.runner_tx.clone(); + let shard_id = self.shard.shard_info()[0]; + + feature_framework! {{ + let framework = Arc::clone(&self.framework); + + self.threadpool.execute(move || { + dispatch( + event, + framework, + data, + event_handler, + runner_tx, + shard_id, + ); + }); + } else { + self.threadpool.execute(move || { + dispatch( + event, + data, + event_handler, + runner_tx, + shard_id, + ); + }); + }} + } + + // Handles a received value over the shard runner rx channel. + // + // Returns whether the shard runner can continue. 
+ // + // This always returns true, except in the case that the shard manager asked + // the runner to shut down. + fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool { + match value { + ShardClientMessage::Manager(x) => match x { + ShardManagerMessage::Restart(id) | + ShardManagerMessage::Shutdown(id) => { + self.checked_shutdown(id) + }, + ShardManagerMessage::ShutdownAll => { + // This variant should never be received. + warn!( + "[ShardRunner {:?}] Received a ShutdownAll?", + self.shard.shard_info(), + ); + + true + }, + }, + ShardClientMessage::Runner(x) => match x { + ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { + self.shard.chunk_guilds( + guild_ids, + limit, + query.as_ref().map(String::as_str), + ).is_ok() + }, + ShardRunnerMessage::Close(code, reason) => { + let reason = reason.unwrap_or_else(String::new); + let data = CloseData::new(code, reason); + let msg = OwnedMessage::Close(Some(data)); + + self.shard.client.send_message(&msg).is_ok() + }, + ShardRunnerMessage::Message(msg) => { + self.shard.client.send_message(&msg).is_ok() + }, + ShardRunnerMessage::SetGame(game) => { + // Set the game on the shard's presence, then (attempt + // to) send the full presence update over the WebSocket, + // returning whether it was successfully sent. + self.shard.set_game(game); + + self.shard.update_presence().is_ok() + }, + ShardRunnerMessage::SetPresence(status, game) => { + self.shard.set_presence(status, game); + + self.shard.update_presence().is_ok() + }, + ShardRunnerMessage::SetStatus(status) => { + self.shard.set_status(status); + + self.shard.update_presence().is_ok() + }, + }, + } + } + + // Receives values over the internal shard runner rx channel and handles + // them. + // + // This will loop over values until none remain. + // + // Requests a restart if the sending half of the channel disconnects. This + // should _never_ happen, as the sending half is kept on the runner. + // + // Returns whether the shard runner is in a state that can continue. + fn recv(&mut self) -> Result<bool> { + loop { + match self.runner_rx.try_recv() { + Ok(value) => { + if !self.handle_rx_value(value) { + return Ok(false); + } + }, + Err(TryRecvError::Disconnected) => { + warn!( + "[ShardRunner {:?}] Sending half DC; restarting", + self.shard.shard_info(), + ); + + let _ = self.request_restart(); + + return Ok(false); + }, + Err(TryRecvError::Empty) => break, + } + } + + // There are no longer any values available. + + Ok(true) + } + /// Returns a received event, as well as whether reading the potentially /// present event was successful. - fn recv_event(&mut self) -> (Option<Event>, bool) { - let mut shard = self.shard.lock(); - - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) { + let gw_event = match self.shard.client.recv_json(GatewayEvent::decode) { Err(Error::WebSocket(WebSocketError::IoError(_))) => { // Check that an amount of time at least double the // heartbeat_interval has passed. 
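The `recv` function above is a standard non-blocking drain of an mpsc channel, distinguishing "empty for now" from "sender gone". The same pattern can be shown in isolation with std types; the generic `drain` helper is an editor's illustration, not part of the patch.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// Drains every currently-queued value without blocking. Returns false
// once the sending half has disconnected (the runner would then request
// a restart), and true when the queue is merely empty for now.
fn drain<T, F: FnMut(T)>(rx: &Receiver<T>, handle: &mut F) -> bool {
    loop {
        match rx.try_recv() {
            Ok(value) => handle(value),
            Err(TryRecvError::Disconnected) => return false,
            Err(TryRecvError::Empty) => return true,
        }
    }
}
```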
@@ -161,58 +355,69 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // If not, continue on trying to receive messages. // // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - return (None, true); + { + let last = self.shard.last_heartbeat_ack(); + let interval = self.shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; + + if seconds_passed <= interval_in_secs * 2 { + return (None, None, true); + } + } else { + return (None, None, true); } - } else { - return (None, true); } debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { + if let Err(why) = self.shard.autoreconnect() { error!("Failed to auto-reconnect: {:?}", why); } - return (None, true); + return (None, None, true); }, Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { // This is hit when the websocket client dies; this will be 
- return (None, false); + return (None, None, false); }, other => other, }; let event = match gw_event { Ok(Some(event)) => Ok(event), - Ok(None) => return (None, true), + Ok(None) => return (None, None, true), Err(why) => Err(why), }; - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => return (None, true), + let action = match self.shard.handle_event(&event) { + Ok(Some(action)) => Some(action), + Ok(None) => None, Err(why) => { error!("Shard handler received err: {:?}", why); - return (None, true); + return (None, None, true); }, - }; + }; + + let event = match event { + Ok(GatewayEvent::Dispatch(_, event)) => Some(event), + _ => None, + }; - (Some(event), true) + (event, action, true) } fn request_restart(&self) -> Result<()> { - debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); - let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + debug!( + "[ShardRunner {:?}] Requesting restart", + self.shard.shard_info(), + ); + let shard_id = ShardId(self.shard.shard_info()[0]); + let msg = ShardManagerMessage::Restart(shard_id); let _ = self.manager_tx.send(msg); Ok(()) diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs new file mode 100644 index 0000000..e6458eb --- /dev/null +++ b/src/client/bridge/gateway/shard_runner_message.rs @@ -0,0 +1,43 @@ +use model::{Game, GuildId, OnlineStatus}; +use websocket::message::OwnedMessage; + +/// A message to send from a shard over a WebSocket. +pub enum ShardRunnerMessage { + /// Indicates that the client is to send a member chunk message. + ChunkGuilds { + /// The IDs of the [`Guild`]s to chunk. + /// + /// [`Guild`]: ../../../model/struct.Guild.html + guild_ids: Vec<GuildId>, + /// The maximum number of members to receive [`GuildMembersChunkEvent`]s + /// for. 
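The staleness rule in `recv_event`'s `IoError` arm reduces to a small pure function: attempt an auto-reconnect only when more than two heartbeat intervals have elapsed since the last heartbeat ACK. `should_reconnect` is an editor's distillation in which plain integers replace the `Instant` arithmetic of the real code.

```rust
// Distilled staleness check: on an IoError from the socket, attempt an
// auto-reconnect only once more than two heartbeat intervals have
// passed since the last heartbeat ACK. If either value is missing,
// keep trying to receive messages instead.
fn should_reconnect(
    secs_since_last_ack: Option<u64>,
    heartbeat_interval_ms: Option<u64>,
) -> bool {
    match (secs_since_last_ack, heartbeat_interval_ms) {
        (Some(passed), Some(interval_ms)) => passed > (interval_ms / 1000) * 2,
        _ => false,
    }
}
```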
+ /// + /// [`GuildMembersChunkEvent`]: ../../../model/event/struct.GuildMembersChunkEvent.html + limit: Option<u16>, + /// Text to filter members by. + /// + /// For example, a query of `"s"` will cause only [`Member`]s whose + /// usernames start with `"s"` to be chunked. + /// + /// [`Member`]: ../../../model/struct.Member.html + query: Option<String>, + }, + /// Indicates that the client is to close with the given status code and + /// reason. + /// + /// You should rarely - if _ever_ - need this, but the option is available. + /// Prefer to use the [`ShardManager`] to shut down WebSocket clients if you + /// are intending to send a 1000 close code. + /// + /// [`ShardManager`]: struct.ShardManager.html + Close(u16, Option<String>), + /// Indicates that the client is to send a custom WebSocket message. + Message(OwnedMessage), + /// Indicates that the client is to update the shard's presence's game. + SetGame(Option<Game>), + /// Indicates that the client is to update the shard's presence in its + /// entirety. + SetPresence(OnlineStatus, Option<Game>), + /// Indicates that the client is to update the shard's presence's status. + SetStatus(OnlineStatus), +} diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs index 4f27526..ea18d73 100644 --- a/src/client/bridge/mod.rs +++ b/src/client/bridge/mod.rs @@ -1 +1,10 @@ +//! A collection of bridged support between the [`client`] module and other +//! modules. +//! +//! **Warning**: You likely _do not_ need to mess with anything in here. Beware. +//! This is lower-level functionality abstracted by the [`Client`]. +//! +//! [`Client`]: ../struct.Client.html +//! 
[`client`]: ../ + pub mod gateway; diff --git a/src/client/context.rs b/src/client/context.rs index a60aaa0..9b89cde 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -1,7 +1,7 @@ -use Result; +use client::bridge::gateway::{ShardClientMessage, ShardMessenger}; +use std::sync::mpsc::Sender; use std::sync::Arc; use typemap::ShareMap; -use gateway::Shard; use model::*; use parking_lot::Mutex; @@ -12,7 +12,7 @@ use internal::prelude::*; #[cfg(feature = "builder")] use builder::EditProfile; #[cfg(feature = "builder")] -use {http, utils}; +use {Result, http, utils}; #[cfg(feature = "builder")] use std::collections::HashMap; @@ -38,21 +38,23 @@ pub struct Context { /// /// [`Client::data`]: struct.Client.html#structfield.data pub data: Arc<Mutex<ShareMap>>, - /// The associated shard which dispatched the event handler. - /// - /// Note that if you are sharding, in relevant terms, this is the shard - /// which received the event being dispatched. - pub shard: Arc<Mutex<Shard>>, + /// The messenger to communicate with the shard runner. + pub shard: ShardMessenger, + /// The ID of the shard this context is related to. + pub shard_id: u64, } impl Context { /// Create a new Context to be passed to an event handler. - pub(crate) fn new(shard: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>) - -> Context { + pub(crate) fn new( + data: Arc<Mutex<ShareMap>>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, + ) -> Context { Context { + shard: ShardMessenger::new(runner_tx), + shard_id, data, - shard, } } @@ -110,6 +112,7 @@ impl Context { http::edit_profile(&edited) } + /// Sets the current user as being [`Online`]. This maintains the current /// game. 
/// @@ -137,9 +140,9 @@ impl Context { /// ``` /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online + #[inline] pub fn online(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Online); + self.shard.set_status(OnlineStatus::Online); } /// Sets the current user as being [`Idle`]. This maintains the current @@ -168,9 +171,9 @@ impl Context { /// ``` /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle + #[inline] pub fn idle(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Idle); + self.shard.set_status(OnlineStatus::Idle); } /// Sets the current user as being [`DoNotDisturb`]. This maintains the @@ -199,9 +202,9 @@ impl Context { /// ``` /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb + #[inline] pub fn dnd(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::DoNotDisturb); + self.shard.set_status(OnlineStatus::DoNotDisturb); } /// Sets the current user as being [`Invisible`]. This maintains the current @@ -231,9 +234,9 @@ impl Context { /// /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible + #[inline] pub fn invisible(&self) { - let mut shard = self.shard.lock(); - shard.set_status(OnlineStatus::Invisible); + self.shard.set_status(OnlineStatus::Invisible); } /// "Resets" the current user's presence, by setting the game to `None` and @@ -265,9 +268,9 @@ impl Context { /// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence + #[inline] pub fn reset_presence(&self) { - let mut shard = self.shard.lock(); - shard.set_presence(None, OnlineStatus::Online); + self.shard.set_presence(None, OnlineStatus::Online); } /// Sets the current game, defaulting to an online status of [`Online`]. 
@@ -303,9 +306,9 @@ impl Context { /// ``` /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online + #[inline] pub fn set_game(&self, game: Game) { - let mut shard = self.shard.lock(); - shard.set_presence(Some(game), OnlineStatus::Online); + self.shard.set_presence(Some(game), OnlineStatus::Online); } /// Sets the current game, passing in only its name. This will automatically @@ -351,8 +354,7 @@ impl Context { url: None, }; - let mut shard = self.shard.lock(); - shard.set_presence(Some(game), OnlineStatus::Online); + self.shard.set_presence(Some(game), OnlineStatus::Online); } /// Sets the current user's presence, providing all fields to be passed. @@ -406,9 +408,9 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle + #[inline] pub fn set_presence(&self, game: Option<Game>, status: OnlineStatus) { - let mut shard = self.shard.lock(); - shard.set_presence(game, status); + self.shard.set_presence(game, status); } /// Disconnects the shard from the websocket, essentially "quitting" it. /// @@ -417,9 +419,8 @@ impl Context { /// until [`Client::start`] and vice versa are called again. /// /// [`Client::start`]: ./struct.Client.html#method.start - pub fn quit(&self) -> Result<()> { - let mut shard = self.shard.lock(); - - shard.shutdown_clean() + #[inline] + pub fn quit(&self) { + self.shard.shutdown_clean(); + } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 827875e..9545a6f 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,9 +1,10 @@ use std::sync::Arc; use parking_lot::Mutex; +use super::bridge::gateway::ShardClientMessage; use super::event_handler::EventHandler; use super::Context; +use std::sync::mpsc::Sender; use typemap::ShareMap; -use gateway::Shard; use model::event::Event; use model::{Channel, Message}; @@ -35,19 +36,26 @@ macro_rules! 
now { () => (Utc::now().time().second() * 1000) } -fn context(conn: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>) -> Context { - Context::new(conn, data) +fn context( + data: Arc<Mutex<ShareMap>>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) -> Context { + Context::new(data, runner_tx, shard_id) } #[cfg(feature = "framework")] -pub fn dispatch<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +pub fn dispatch<H: EventHandler + 'static>( + event: Event, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { match event { Event::MessageCreate(event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); dispatch_message( context.clone(), event.message.clone(), @@ -58,21 +66,24 @@ pub fn dispatch<H: EventHandler + 'static>(event: Event, framework.dispatch(context, event.message); } }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, data, event_handler, runner_tx, shard_id), } } #[cfg(not(feature = "framework"))] -pub fn dispatch<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +pub fn dispatch<H: EventHandler + 'static>( + event: Event, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { match event { Event::MessageCreate(event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); dispatch_message(context, event.message, event_handler); }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, data, event_handler, runner_tx, shard_id), } } @@ -91,10 +102,13 @@ fn dispatch_message<H>( } 
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event<H: EventHandler + 'static>(event: Event, - conn: Arc<Mutex<Shard>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>) { +fn handle_event<H: EventHandler + 'static>( + event: Event, + data: Arc<Mutex<ShareMap>>, + event_handler: Arc<H>, + runner_tx: Sender<ShardClientMessage>, + shard_id: u64, +) { #[cfg(feature = "cache")] let mut last_guild_create_time = now!(); @@ -113,7 +127,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::ChannelCreate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); // This different channel_create dispatching is only due to the fact that // each time the bot receives a dm, this event is also fired. @@ -134,7 +148,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event, Event::ChannelDelete(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); match event.channel { Channel::Private(_) | Channel::Group(_) => {}, @@ -147,28 +161,28 @@ fn handle_event<H: EventHandler + 'static>(event: Event, } }, Event::ChannelPinsUpdate(mut event) => { - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_pins_update(context, event); }, Event::ChannelRecipientAdd(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_recipient_addition(context, event.channel_id, event.user); }, Event::ChannelRecipientRemove(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); event_handler.channel_recipient_removal(context, event.channel_id, event.user); }, Event::ChannelUpdate(mut event) => { update!(event); - let context = context(conn, data); + let context = context(data, runner_tx, shard_id); feature_cache! 
             {{
                 let before = CACHE.read().channel(event.channel.id());
@@ -178,12 +192,12 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
             }}
         },
         Event::GuildBanAdd(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_ban_addition(context, event.guild_id, event.user);
         },
         Event::GuildBanRemove(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_ban_removal(context, event.guild_id, event.user);
         },
@@ -204,7 +218,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
             let cache = CACHE.read();

             if cache.unavailable_guilds.is_empty() {
-                let context = context(Arc::clone(&conn), Arc::clone(&data));
+                let context = context(Arc::clone(&data), runner_tx.clone(), shard_id);

                 let guild_amount = cache
                     .guilds
@@ -216,7 +230,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
                 }
             }

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.guild_create(context, event.guild, _is_new);
@@ -226,7 +240,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         },
         Event::GuildDelete(mut event) => {
             let _full = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.guild_delete(context, event.guild, _full);
@@ -237,25 +251,25 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         Event::GuildEmojisUpdate(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
         },
         Event::GuildIntegrationsUpdate(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_integrations_update(context, event.guild_id);
         },
         Event::GuildMemberAdd(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_member_addition(context, event.guild_id, event.member);
         },
         Event::GuildMemberRemove(mut event) => {
             let _member = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
@@ -265,7 +279,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         },
         Event::GuildMemberUpdate(mut event) => {
             let _before = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 // This is safe to unwrap, as the update would have created
@@ -284,20 +298,20 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         Event::GuildMembersChunk(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_members_chunk(context, event.guild_id, event.members);
         },
         Event::GuildRoleCreate(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_role_create(context, event.guild_id, event.role);
         },
         Event::GuildRoleDelete(mut event) => {
             let _role = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
@@ -307,7 +321,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         },
         Event::GuildRoleUpdate(mut event) => {
             let _before = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.guild_role_update(context, event.guild_id, _before, event.role);
@@ -318,14 +332,14 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         Event::GuildUnavailable(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.guild_unavailable(context, event.guild_id);
         },
         Event::GuildUpdate(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 let before = CACHE.read()
@@ -341,46 +355,46 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
         // Already handled by the framework check macro
         Event::MessageCreate(_) => {},
         Event::MessageDeleteBulk(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.message_delete_bulk(context, event.channel_id, event.ids);
         },
         Event::MessageDelete(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.message_delete(context, event.channel_id, event.message_id);
         },
         Event::MessageUpdate(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.message_update(context, event);
         },
         Event::PresencesReplace(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.presence_replace(context, event.presences);
         },
         Event::PresenceUpdate(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.presence_update(context, event);
         },
         Event::ReactionAdd(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.reaction_add(context, event.reaction);
         },
         Event::ReactionRemove(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.reaction_remove(context, event.reaction);
         },
         Event::ReactionRemoveAll(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
         },
@@ -393,35 +407,35 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
                 let _ = wait_for_guilds()
                     .map(move |_| {
-                        let context = context(conn, data);
+                        let context = context(data, runner_tx, shard_id);

                         event_handler.ready(context, event.ready);
                     });
                 } else {
-                    let context = context(conn, data);
+                    let context = context(data, runner_tx, shard_id);

                     event_handler.ready(context, event.ready);
                 }
             }
         },
         Event::Resumed(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.resume(context, event);
         },
         Event::TypingStart(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.typing_start(context, event);
         },
         Event::Unknown(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.unknown(context, event.kind, event.value);
         },
         Event::UserUpdate(mut event) => {
             let _before = update!(event);
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             feature_cache! {{
                 event_handler.user_update(context, _before.unwrap(), event.current_user);
@@ -430,19 +444,19 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
             }}
         },
         Event::VoiceServerUpdate(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.voice_server_update(context, event);
         },
         Event::VoiceStateUpdate(mut event) => {
             update!(event);

-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.voice_state_update(context, event.guild_id, event.voice_state);
         },
         Event::WebhookUpdate(mut event) => {
-            let context = context(conn, data);
+            let context = context(data, runner_tx, shard_id);

             event_handler.webhook_update(context, event.guild_id, event.channel_id);
         },
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 00a305a..2a97c91 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -37,13 +37,11 @@ pub use http as rest;
 #[cfg(feature = "cache")]
 pub use CACHE;

-use self::bridge::gateway::{ShardId, ShardManager, ShardRunnerInfo};
+use self::bridge::gateway::{ShardManager, ShardManagerMonitor};
 use self::dispatch::dispatch;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
 use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::mem;
 use threadpool::ThreadPool;
 use typemap::ShareMap;
 use http;
@@ -103,8 +101,7 @@ impl CloseHandle {
 /// [`on_message`]: #method.on_message
 /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
 /// [sharding docs]: gateway/index.html#sharding
-#[derive(Clone)]
-pub struct Client<H: EventHandler + Send + Sync + 'static> {
+pub struct Client {
     /// A ShareMap which requires types to be Send + Sync. This is a map that
     /// can be safely shared across contexts.
     ///
@@ -191,7 +188,6 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
     ///
     /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
     /// [`on_ready`]: #method.on_ready
-    event_handler: Arc<H>,
     #[cfg(feature = "framework")]
     framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
     /// A HashMap of all shards instantiated by the Client.
     ///
@@ -224,12 +220,12 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
     ///
     /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?;
     ///
-    /// let shard_runners = client.shard_runners.clone();
+    /// let shard_manager = client.shard_manager.clone();
     ///
     /// thread::spawn(move || {
     ///     loop {
     ///         println!("Shard count instantiated: {}",
-    ///                  shard_runners.lock().len());
+    ///                  shard_manager.lock().shards_instantiated().len());
     ///
     ///         thread::sleep(Duration::from_millis(5000));
     ///     }
@@ -244,16 +240,26 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
     ///
     /// [`Client::start_shard`]: #method.start_shard
     /// [`Client::start_shards`]: #method.start_shards
-    pub shard_runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+    pub shard_manager: Arc<Mutex<ShardManager>>,
+    shard_manager_worker: ShardManagerMonitor,
     /// The threadpool shared by all shards.
     ///
     /// Defaults to 5 threads, which should suffice small bots. Consider
     /// increasing this number as your bot grows.
     pub threadpool: ThreadPool,
-    token: Arc<Mutex<String>>,
+    /// The token in use by the client.
+    pub token: Arc<Mutex<String>>,
+    /// URI that the client's shards will use to connect to the gateway.
+    ///
+    /// This is likely not important for production usage and is, at best, used
+    /// for debugging.
+    ///
+    /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated
+    /// value available.
+    pub ws_uri: Arc<Mutex<String>>,
 }

-impl<H: EventHandler + Send + Sync + 'static> Client<H> {
+impl Client {
     /// Creates a Client for a bot user.
     ///
     /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -283,7 +289,8 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     /// # try_main().unwrap();
     /// # }
     /// ```
-    pub fn new(token: &str, handler: H) -> Result<Self> {
+    pub fn new<H>(token: &str, handler: H) -> Result<Self>
+        where H: EventHandler + Send + Sync + 'static {
         let token = if token.starts_with("Bot ") {
             token.to_string()
         } else {
@@ -295,23 +302,53 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
         let name = "serenity client".to_owned();
         let threadpool = ThreadPool::with_name(name, 5);
+        let url = Arc::new(Mutex::new(http::get_gateway()?.url));
+        let data = Arc::new(Mutex::new(ShareMap::custom()));
+        let event_handler = Arc::new(handler);

         Ok(feature_framework! {{
+            let framework = Arc::new(Mutex::new(None));
+
+            let (shard_manager, shard_manager_worker) = ShardManager::new(
+                0,
+                0,
+                0,
+                Arc::clone(&url),
+                Arc::clone(&locked),
+                Arc::clone(&data),
+                Arc::clone(&event_handler),
+                Arc::clone(&framework),
+                threadpool.clone(),
+            );
+
             Client {
-                data: Arc::new(Mutex::new(ShareMap::custom())),
-                event_handler: Arc::new(handler),
                 framework: Arc::new(Mutex::new(None)),
-                shard_runners: Arc::new(Mutex::new(HashMap::new())),
-                threadpool,
                 token: locked,
+                ws_uri: url,
+                data,
+                shard_manager,
+                shard_manager_worker,
+                threadpool,
             }
         } else {
+            let (shard_manager, shard_manager_worker) = ShardManager::new(
+                0,
+                0,
+                0,
+                Arc::clone(&url),
+                locked.clone(),
+                data.clone(),
+                Arc::clone(&event_handler),
+                threadpool.clone(),
+            );
+
             Client {
-                data: Arc::new(Mutex::new(ShareMap::custom())),
-                event_handler: Arc::new(handler),
-                shard_runners: Arc::new(Mutex::new(HashMap::new())),
-                threadpool,
                 token: locked,
+                ws_uri: url,
+                data,
+                shard_manager,
+                shard_manager_worker,
+                threadpool,
             }
         }})
     }
@@ -462,7 +499,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     ///
     /// [gateway docs]: gateway/index.html#sharding
     pub fn start(&mut self) -> Result<()> {
-        self.start_connection([0, 0, 1], http::get_gateway()?.url)
+        self.start_connection([0, 0, 1])
     }

     /// Establish the connection(s) and start listening for events.
@@ -514,15 +551,13 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
     /// [gateway docs]: gateway/index.html#sharding
     pub fn start_autosharded(&mut self) -> Result<()> {
-        let mut res = http::get_bot_gateway()?;
-
-        let x = res.shards as u64 - 1;
-        let y = res.shards as u64;
-        let url = mem::replace(&mut res.url, String::default());
+        let (x, y) = {
+            let res = http::get_bot_gateway()?;

-        drop(res);
+            (res.shards as u64 - 1, res.shards as u64)
+        };

-        self.start_connection([0, x, y], url)
+        self.start_connection([0, x, y])
     }

     /// Establish a sharded connection and start listening for events.
@@ -603,7 +638,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     /// [`start_autosharded`]: #method.start_autosharded
     /// [gateway docs]: gateway/index.html#sharding
     pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
-        self.start_connection([shard, shard, shards], http::get_gateway()?.url)
+        self.start_connection([shard, shard, shards])
     }

     /// Establish sharded connections and start listening for events.
@@ -657,10 +692,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     /// [`start_shard_range`]: #method.start_shard_range
     /// [Gateway docs]: gateway/index.html#sharding
     pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
-        self.start_connection(
-            [0, total_shards - 1, total_shards],
-            http::get_gateway()?.url,
-        )
+        self.start_connection([0, total_shards - 1, total_shards])
     }

     /// Establish a range of sharded connections and start listening for events.
@@ -730,7 +762,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     /// [`start_shards`]: #method.start_shards
     /// [Gateway docs]: gateway/index.html#sharding
     pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
-        self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
+        self.start_connection([range[0], range[1], total_shards])
     }

     /// Returns a thread-safe handle for closing shards.
@@ -749,7 +781,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
     // an error.
     //
     // [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
-    fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> {
+    fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> {
         HANDLE_STILL.store(true, Ordering::Relaxed);

         // Update the framework's current user if the feature is enabled.
@@ -764,39 +796,37 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
             }
         }

-        let gateway_url = Arc::new(Mutex::new(url));
+        {
+            let mut manager = self.shard_manager.lock();
+
+            let init = shard_data[1] - shard_data[0] + 1;

-        let mut manager = ShardManager::new(
-            shard_data[0],
-            shard_data[1] - shard_data[0] + 1,
-            shard_data[2],
-            Arc::clone(&gateway_url),
-            Arc::clone(&self.token),
-            Arc::clone(&self.data),
-            Arc::clone(&self.event_handler),
-            #[cfg(feature = "framework")]
-            Arc::clone(&self.framework),
-            self.threadpool.clone(),
-        );
+            manager.set_shards(shard_data[0], init, shard_data[2]);

-        self.shard_runners = Arc::clone(&manager.runners);
+            debug!(
+                "Initializing shard info: {} - {}/{}",
+                shard_data[0],
+                init,
+                shard_data[2],
+            );

-        if let Err(why) = manager.initialize() {
-            error!("Failed to boot a shard: {:?}", why);
-            info!("Shutting down all shards");
+            if let Err(why) = manager.initialize() {
+                error!("Failed to boot a shard: {:?}", why);
+                info!("Shutting down all shards");

-            manager.shutdown_all();
+                manager.shutdown_all();

-            return Err(Error::Client(ClientError::ShardBootFailure));
+                return Err(Error::Client(ClientError::ShardBootFailure));
+            }
         }

-        manager.run();
+        self.shard_manager_worker.run();

         Err(Error::Client(ClientError::Shutdown))
     }
 }

-impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
+impl Drop for Client {
     fn drop(&mut self) {
         self.close_handle().close();
     }
 }
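The commit splits shard control into a `ShardManager` (the handle user code holds) and a `ShardManagerMonitor` (a loop, run in its own thread, that receives instructions over an mpsc channel and acts on shards). The shape of that split can be sketched with a self-contained, std-only toy; the `ShardManagerMessage` enum and `run_monitor` function below are illustrative stand-ins, not serenity's actual API:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type mirroring the kinds of instructions the commit
// says the monitor receives: restart one shard, shut one down, or shut
// everything down.
enum ShardManagerMessage {
    Restart(u64),
    Shutdown(u64),
    ShutdownAll,
}

// Toy "monitor" loop: drain the channel and act on each message until a
// ShutdownAll arrives, returning a log of the actions taken.
fn run_monitor(rx: mpsc::Receiver<ShardManagerMessage>) -> Vec<String> {
    let mut log = Vec::new();

    for msg in rx {
        match msg {
            ShardManagerMessage::Restart(id) => log.push(format!("restart {}", id)),
            ShardManagerMessage::Shutdown(id) => log.push(format!("shutdown {}", id)),
            ShardManagerMessage::ShutdownAll => {
                log.push("shutdown all".to_owned());

                break;
            },
        }
    }

    log
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // The monitor runs in its own thread, as the commit describes; the
    // sending half plays the role of the manager/messenger side.
    let monitor = thread::spawn(move || run_monitor(rx));

    tx.send(ShardManagerMessage::Restart(0)).unwrap();
    tx.send(ShardManagerMessage::Shutdown(1)).unwrap();
    tx.send(ShardManagerMessage::ShutdownAll).unwrap();

    let log = monitor.join().unwrap();
    assert_eq!(log, vec!["restart 0", "shutdown 1", "shutdown all"]);
}
```

This matches the control flow in `start_connection` above: the caller only sends messages (here via `tx`), while the monitor (`self.shard_manager_worker.run()` in the real code) owns the receive loop until shutdown.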