author    Zeyla Hellyer <[email protected]>  2017-11-03 07:13:24 -0700
committer Zeyla Hellyer <[email protected]>  2017-11-03 07:13:38 -0700
commit    b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5 (patch)
tree      17eb07c8218f1e145d5eb3fba353fd1486b3874d /src
parent    Make the Client return a Result (diff)
download  serenity-b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5.tar.xz
          serenity-b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5.zip
Redo client internals + gateway
This commit is a rewrite of the client module's internals and the gateway. The main benefits are that there are either 0 or 1 lock retrievals per event received, and that the ability to use the ShardManager both internally and in userland code has been improved.

The primary rework is in the `serenity::client` module, which now includes a few more structures, some changes to existing ones, and more functionality (such as on the `ShardManager`). The two notable additions to the client-gateway bridge are the `ShardMessenger` and the `ShardManagerMonitor`.

The `ShardMessenger` is a simple interface for interacting with shards. The user is given one of these in the `serenity::client::Context` in dispatches to the `serenity::client::EventHandler`. It can be used for updating the presence of a shard, sending a guild chunk message, or sending a user-defined WebSocket message.

The `ShardManagerMonitor` is a loop run in its own thread, potentially the main thread, that is responsible for receiving messages over an mpsc channel on what to do with shards via the `ShardManager`. For example, it will receive a message to shut down a single shard, restart a single shard, or shut down everything. In most applications, users will not interact with the `ShardManagerMonitor`; users of the `serenity::client::Client` interact only with the `ShardMessenger`.

The `ShardManager` is now usable by the user and available to them, and contains public functions for shutdowns, initializations, restarts, and complete shutdowns of shards. It contains utility functions such as determining whether the `ShardManager` is responsible for a shard of a given ID, and listing the IDs of shards currently active (those having an associated `ShardRunner`). It can be found at `serenity::client::Client::shard_manager`.

Speaking of the `ShardRunner`, it no longer owns a clone of an Arc to its assigned `serenity::gateway::Shard`; it now completely owns the Shard.
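The monitor-plus-manager split described above can be sketched with plain `std::sync::mpsc` primitives. The names mirror serenity's, but the code below is an illustrative stand-in under simplified assumptions, not the crate's actual implementation:

```rust
use std::sync::mpsc::Receiver;

// Hypothetical stand-ins for the messages a ShardManagerMonitor receives.
#[derive(Debug)]
enum ShardManagerMessage {
    Restart(u64),
    Shutdown(u64),
    ShutdownAll,
}

// Runs the monitor loop until ShutdownAll arrives or the channel closes,
// returning a log of the actions it would have delegated to the manager.
fn run_monitor(rx: Receiver<ShardManagerMessage>) -> Vec<String> {
    let mut actions = Vec::new();

    while let Ok(msg) = rx.recv() {
        match msg {
            ShardManagerMessage::Restart(id) => actions.push(format!("restart {}", id)),
            ShardManagerMessage::Shutdown(id) => actions.push(format!("shutdown {}", id)),
            ShardManagerMessage::ShutdownAll => {
                actions.push("shutdown all".to_owned());

                break;
            },
        }
    }

    actions
}
```

Because the loop blocks on `recv`, it is meant to be spawned on its own thread, exactly as the real monitor is.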
This means that in order to operate the shard, a `ShardRunner` no longer has to repeatedly retrieve a lock to it. This reduces the number of lock retrievals per event dispatching cycle from 3 or 4 (depending on event type) to 0 or 1 (depending on whether the event is a message create _and_ whether the framework is in use). To interact with the Shard, one must now go through the previously mentioned `ShardMessenger`, which the `ShardRunner` checks for messages in a loop.

`serenity::client::Context` is now slightly different. Instead of the `shard` field being an `Arc<Mutex<Shard>>`, it is an instance of a `ShardMessenger`. The interface is the same (minus some Shard-specific methods like `latency`), and `Context`'s shortcuts still exist (like `Context::online` or `Context::set_game`). It additionally includes a `Context::shard_id` field, a u64 containing the ID of the shard that the event was dispatched from.

`serenity::client::Client` has one renamed field, one field that is now public, and one new field. `Client::shard_runners` is now `Client::shard_manager`, of type `Arc<Mutex<ShardManager>>`. The `Client::token` field is now public; this can, for example, be mutated on token resets if you know what you're doing. `Client::ws_uri` is new and contains the URI for shards to use when connecting to the gateway. Otherwise, the Client's usage is unchanged.

`serenity::gateway::Shard` has a couple of minor changes and many more public methods and fields. The `autoreconnect`, `check_heartbeat`, `handle_event`, `heartbeat`, `identify`, `initialize`, `reset`, `resume`, `reconnect`, and `update_presence` methods are now public. The `token` structfield is now public. There are new getters for various structfields, such as `heartbeat_instants` and `last_heartbeat_ack`. The breaking change on the `Shard` is that `Shard::handle_event` now takes an event by reference and, instead of returning `Result<Option<Event>>`, returns `Result<Option<ShardAction>>`.
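The `ShardMessenger`'s role as a thin, clonable wrapper over a sender can be sketched as follows. The message variants here are simplified placeholders for illustration, not serenity's real types:

```rust
use std::sync::mpsc::{SendError, Sender};

// Simplified placeholders for the real message enums.
enum ShardRunnerMessage {
    SetStatus(String),
}

enum ShardClientMessage {
    Runner(ShardRunnerMessage),
}

// A cheaply clonable handle handed out in each Context; every public method
// wraps its payload in a ShardClientMessage and pushes it down the channel.
#[derive(Clone)]
struct ShardMessenger {
    tx: Sender<ShardClientMessage>,
}

impl ShardMessenger {
    fn set_status(&self, status: &str) {
        // Send errors are ignored: if the runner is gone, there is no one
        // left to act on the message anyway.
        let _ = self.send(ShardRunnerMessage::SetStatus(status.to_owned()));
    }

    fn send(&self, msg: ShardRunnerMessage)
        -> Result<(), SendError<ShardClientMessage>> {
        self.tx.send(ShardClientMessage::Runner(msg))
    }
}
```

Because the messenger owns only a `Sender`, no lock on the shard itself is ever taken from userland, which is the point of the rework.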
`serenity::gateway::ShardAction` is a light enum describing an action that someone _should_/_must_ perform on the shard, e.g. reconnecting or identifying. This is determined by `Shard::handle_event`.

In total, there aren't too many breaking changes that most userland use cases have to deal with: at most, changing some usage of `Context`. Retrieving information like a Shard's latency is currently no longer possible, but work will be done to make this functionality available again.
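The `ShardAction` pattern, where event handling returns a recommended action instead of performing it, can be sketched like so. The variant names and the opcode mapping below are illustrative (the opcode numbers follow the public Discord gateway documentation), not the crate's actual decision logic:

```rust
// Illustrative stand-in: the action a shard runner should take next.
#[derive(Debug, PartialEq)]
enum ShardAction {
    Reconnect,
    Identify,
    Heartbeat,
}

// Maps a received gateway opcode to an optional action for the caller to
// perform. Returning the action rather than acting on it immediately is
// what lets event handling stay lock-free.
fn handle_opcode(op: u8) -> Option<ShardAction> {
    match op {
        1 => Some(ShardAction::Heartbeat), // heartbeat request
        7 => Some(ShardAction::Reconnect), // server asked us to reconnect
        9 => Some(ShardAction::Identify),  // invalid session: re-identify
        _ => None,                         // nothing for the caller to do
    }
}
```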
Diffstat (limited to 'src')
-rw-r--r--  src/client/bridge/gateway/mod.rs                    114
-rw-r--r--  src/client/bridge/gateway/shard_manager.rs          283
-rw-r--r--  src/client/bridge/gateway/shard_manager_monitor.rs   54
-rw-r--r--  src/client/bridge/gateway/shard_messenger.rs        276
-rw-r--r--  src/client/bridge/gateway/shard_queuer.rs            56
-rw-r--r--  src/client/bridge/gateway/shard_runner.rs           385
-rw-r--r--  src/client/bridge/gateway/shard_runner_message.rs    43
-rw-r--r--  src/client/bridge/mod.rs                              9
-rw-r--r--  src/client/context.rs                                65
-rw-r--r--  src/client/dispatch.rs                              132
-rw-r--r--  src/client/mod.rs                                   142
-rw-r--r--  src/gateway/mod.rs                                   17
-rw-r--r--  src/gateway/shard.rs                                889
-rw-r--r--  src/gateway/ws_client_ext.rs                        133
14 files changed, 1717 insertions, 881 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs
index 0b873aa..752b015 100644
--- a/src/client/bridge/gateway/mod.rs
+++ b/src/client/bridge/gateway/mod.rs
@@ -1,28 +1,113 @@
+//! The client-gateway bridge is essential support for the [`client`] module.
+//!
+//! This is made available for users who wish to work at a lower level or to
+//! avoid the higher-level functionality of the [`Client`].
+//!
+//! Of interest are three pieces:
+//!
+//! ### [`ShardManager`]
+//!
+//! The shard manager is responsible for being a clean interface between the
+//! user and the shard runners, providing essential functions such as
+//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`]
+//! to restart a shard.
+//!
+//! If you are using the `Client`, this is likely the only piece of interest to
+//! you. Refer to [its documentation][`ShardManager`] for more information.
+//!
+//! ### [`ShardQueuer`]
+//!
+//! The shard queuer is a light wrapper around an mpsc receiver that receives
+//! [`ShardQueuerMessage`]s. It should be run in its own thread so it can
+//! receive messages to start shards in a queue.
+//!
+//! Refer to [its documentation][`ShardQueuer`] for more information.
+//!
+//! ### [`ShardRunner`]
+//!
+//! The shard runner is responsible for actually running a shard and
+//! communicating with its respective WebSocket client.
+//!
+//! It performs all actions, such as sending a presence update over the
+//! client, and, with the help of the [`Shard`], determines what to do.
+//! This includes, for example, whether to reconnect, resume, or identify
+//! with the gateway.
+//!
+//! ### In Conclusion
+//!
+//! For almost every - if not every - use case, you only need to _possibly_ be
+//! concerned about the [`ShardManager`] in this module.
+//!
+//! [`Client`]: ../../struct.Client.html
+//! [`client`]: ../..
+//! [`Shard`]: ../../../gateway/struct.Shard.html
+//! [`ShardManager`]: struct.ShardManager.html
+//! [`ShardManager::restart`]: struct.ShardManager.html#method.restart
+//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown
+//! [`ShardQueuer`]: struct.ShardQueuer.html
+//! [`ShardRunner`]: struct.ShardRunner.html
+
mod shard_manager;
+mod shard_manager_monitor;
+mod shard_messenger;
mod shard_queuer;
mod shard_runner;
+mod shard_runner_message;
pub use self::shard_manager::ShardManager;
+pub use self::shard_manager_monitor::ShardManagerMonitor;
+pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::ShardQueuer;
pub use self::shard_runner::ShardRunner;
+pub use self::shard_runner_message::ShardRunnerMessage;
-use gateway::Shard;
-use parking_lot::Mutex;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::mpsc::Sender;
-use std::sync::Arc;
-type Parked<T> = Arc<Mutex<T>>;
-type LockedShard = Parked<Shard>;
+/// A message either for a [`ShardManager`] or a [`ShardRunner`].
+///
+/// [`ShardManager`]: struct.ShardManager.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+pub enum ShardClientMessage {
+ /// A message intended to be worked with by a [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Manager(ShardManagerMessage),
+ /// A message intended to be worked with by a [`ShardRunner`].
+ ///
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ Runner(ShardRunnerMessage),
+}
+/// A message for a [`ShardManager`] relating to an operation with a shard.
+///
+/// [`ShardManager`]: struct.ShardManager.html
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub enum ShardManagerMessage {
+ /// Indicator that a [`ShardManagerMonitor`] should restart a shard.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Restart(ShardId),
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard
+ /// without bringing it back up.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Shutdown(ShardId),
- #[allow(dead_code)]
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards
+ /// and end its monitoring process for the [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
ShutdownAll,
}
+/// A message to be sent to the [`ShardQueuer`].
+///
+/// This should usually be wrapped in a [`ShardClientMessage`].
+///
+/// [`ShardClientMessage`]: enum.ShardClientMessage.html
+/// [`ShardQueuer`]: struct.ShardQueuer.html
+#[derive(Clone, Debug)]
pub enum ShardQueuerMessage {
/// Message to start a shard, where the 0-index element is the ID of the
/// Shard to start and the 1-index element is the total shards in use.
@@ -31,8 +116,8 @@ pub enum ShardQueuerMessage {
Shutdown,
}
-// A light tuplestruct wrapper around a u64 to verify type correctness when
-// working with the IDs of shards.
+/// A light tuplestruct wrapper around a u64 to verify type correctness when
+/// working with the IDs of shards.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ShardId(pub u64);
@@ -42,7 +127,16 @@ impl Display for ShardId {
}
}
+/// Information about a [`ShardRunner`].
+///
+/// The [`ShardId`] is not included because, as it stands, you probably already
+/// know the Id if you obtained this.
+///
+/// [`ShardId`]: struct.ShardId.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+#[derive(Debug)]
pub struct ShardRunnerInfo {
- pub runner_tx: Sender<ShardManagerMessage>,
- pub shard: Arc<Mutex<Shard>>,
+ /// The channel used to communicate with the shard runner, telling it
+ /// what to do with regards to its status.
+ pub runner_tx: Sender<ShardClientMessage>,
}
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index 6e3b285..2cf45fd 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,13 +1,15 @@
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use super::super::super::EventHandler;
use super::{
+ ShardClientMessage,
ShardId,
ShardManagerMessage,
+ ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
@@ -18,8 +20,80 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
+/// A manager for handling the status of shards by starting them, restarting
+/// them, and stopping them when required.
+///
+/// **Note**: The [`Client`] internally uses a shard manager. If you are using a
+/// Client, then you do not need to make one of these.
+///
+/// # Examples
+///
+/// Initialize a shard manager with a framework responsible for shards 0 through
+/// 2, of 5 total shards:
+///
+/// ```rust,no_run
+/// extern crate parking_lot;
+/// extern crate serenity;
+/// extern crate threadpool;
+/// extern crate typemap;
+///
+/// # use std::error::Error;
+/// #
+/// # #[cfg(feature = "framework")]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #
+/// use parking_lot::Mutex;
+/// use serenity::client::bridge::gateway::ShardManager;
+/// use serenity::client::EventHandler;
+/// use serenity::http;
+/// use std::sync::Arc;
+/// use std::env;
+/// use threadpool::ThreadPool;
+/// use typemap::ShareMap;
+///
+/// struct Handler;
+///
+/// impl EventHandler for Handler { }
+///
+/// let token = env::var("DISCORD_TOKEN")?;
+/// http::set_token(&token);
+/// let token = Arc::new(Mutex::new(token));
+///
+/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url));
+/// let data = Arc::new(Mutex::new(ShareMap::custom()));
+/// let event_handler = Arc::new(Handler);
+/// let framework = Arc::new(Mutex::new(None));
+/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5);
+///
+/// ShardManager::new(
+/// 0, // the shard index to start initiating from
+/// 3, // the number of shards to initiate (this initiates 0, 1, and 2)
+/// 5, // the total number of shards in use
+/// gateway_url,
+/// token,
+/// data,
+/// event_handler,
+/// framework,
+/// threadpool,
+/// );
+/// # Ok(())
+/// # }
+/// #
+/// # #[cfg(not(feature = "framework"))]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Client`]: ../../struct.Client.html
+#[derive(Debug)]
pub struct ShardManager {
- pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The shard runners currently managed.
+ runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// The index of the first shard to initialize, 0-indexed.
shard_index: u64,
/// The number of shards to initialize.
@@ -27,10 +101,11 @@ pub struct ShardManager {
/// The total shards in use, 1-indexed.
shard_total: u64,
shard_queuer: Sender<ShardQueuerMessage>,
- thread_rx: Receiver<ShardManagerMessage>,
}
impl ShardManager {
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(feature = "framework")]
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new<H>(
@@ -43,7 +118,7 @@ impl ShardManager {
event_handler: Arc<H>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
@@ -66,16 +141,22 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
}
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(not(feature = "framework"))]
pub fn new<H>(
shard_index: u64,
@@ -86,21 +167,21 @@ impl ShardManager {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
let runners = Arc::new(Mutex::new(HashMap::new()));
let mut shard_queuer = ShardQueuer {
- data: data.clone(),
- event_handler: event_handler.clone(),
+ data: Arc::clone(&data),
+ event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
- runners: runners.clone(),
+ runners: Arc::clone(&runners),
rx: shard_queue_rx,
- token: token.clone(),
- ws_url: ws_url.clone(),
+ token: Arc::clone(&token),
+ ws_url: Arc::clone(&ws_url),
threadpool,
};
@@ -108,21 +189,41 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
+ }
+
+ /// Returns whether the shard manager contains an active instance of a
+ /// shard runner responsible for the given ID.
+ ///
+ /// If a shard has been queued but has not yet been initiated, then this
+ /// will return `false`. Consider double-checking [`is_responsible_for`] to
+ /// determine whether this shard manager is responsible for the given shard.
+ ///
+ /// [`is_responsible_for`]: #method.is_responsible_for
+ pub fn has(&self, shard_id: ShardId) -> bool {
+ self.runners.lock().contains_key(&shard_id)
}
+ /// Initializes all shards that the manager is responsible for.
+ ///
+ /// This will communicate shard boots with the [`ShardQueuer`] so that they
+ /// are properly queued.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
pub fn initialize(&mut self) -> Result<()> {
let shard_to = self.shard_index + self.shard_init;
- debug!("{}, {}", self.shard_index, self.shard_init);
-
for shard_id in self.shard_index..shard_to {
let shard_total = self.shard_total;
@@ -132,39 +233,52 @@ impl ShardManager {
Ok(())
}
- pub fn run(&mut self) {
- while let Ok(value) = self.thread_rx.recv() {
- match value {
- ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
- ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
- ShardManagerMessage::ShutdownAll => {
- self.shutdown_all();
-
- break;
- },
- }
- }
- }
-
- pub fn shutdown_all(&mut self) {
- info!("Shutting down all shards");
- let keys = {
- self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
- };
+ /// Sets the new sharding information for the manager.
+ ///
+ /// This will shutdown all existing shards.
+ ///
+ /// This will _not_ instantiate the new shards.
+ pub fn set_shards(&mut self, index: u64, init: u64, total: u64) {
+ self.shutdown_all();
- for shard_id in keys {
- self.shutdown(shard_id);
- }
+ self.shard_index = index;
+ self.shard_init = init;
+ self.shard_total = total;
}
- fn boot(&mut self, shard_info: [ShardId; 2]) {
- info!("Telling shard queuer to start shard {}", shard_info[0]);
-
- let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
- let _ = self.shard_queuer.send(msg);
- }
-
- fn restart(&mut self, shard_id: ShardId) {
+ /// Restarts a shard runner.
+ ///
+ /// This sends a shutdown signal to a shard's associated [`ShardRunner`],
+ /// and then queues an initialization of a shard runner for the same shard
+ /// via the [`ShardQueuer`].
+ ///
+ /// # Examples
+ ///
+ /// Creating a client and then restarting a shard by ID:
+ ///
+ /// _(note: in reality this precise code doesn't have an effect since the
+ /// shard would not yet have been initialized via [`initialize`], but the
+ /// concept is the same)_
+ ///
+ /// ```rust,no_run
+ /// use serenity::client::bridge::gateway::ShardId;
+ /// use serenity::client::{Client, EventHandler};
+ /// use std::env;
+ ///
+ /// struct Handler;
+ ///
+ /// impl EventHandler for Handler { }
+ ///
+ /// let token = env::var("DISCORD_TOKEN").unwrap();
+ /// let mut client = Client::new(&token, Handler).unwrap();
+ ///
+ /// // restart shard ID 7
+ /// client.shard_manager.lock().restart(ShardId(7));
+ /// ```
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn restart(&mut self, shard_id: ShardId) {
info!("Restarting shard {}", shard_id);
self.shutdown(shard_id);
@@ -173,23 +287,88 @@ impl ShardManager {
self.boot([shard_id, ShardId(shard_total)]);
}
- fn shutdown(&mut self, shard_id: ShardId) {
+ /// Returns the [`ShardId`]s of the shards that have been instantiated and
+ /// currently have a valid [`ShardRunner`].
+ ///
+ /// [`ShardId`]: struct.ShardId.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn shards_instantiated(&self) -> Vec<ShardId> {
+ self.runners.lock().keys().cloned().collect()
+ }
+
+ /// Attempts to shut down the shard runner by Id.
+ ///
+ /// Returns a boolean indicating whether a shard runner was present. This is
+ /// _not_ necessarily an indicator of whether the shard runner was
+ /// successfully shut down.
+ ///
+ /// **Note**: If the receiving end of an mpsc channel - theoretically owned
+ /// by the shard runner - no longer exists, then the shard runner will not
+ /// know it should shut down. This _should never happen_. It may already be
+ /// stopped.
+ pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
info!("Shutting down shard {}", shard_id);
if let Some(runner) = self.runners.lock().get(&shard_id) {
- let msg = ShardManagerMessage::Shutdown(shard_id);
+ let shutdown = ShardManagerMessage::Shutdown(shard_id);
+ let msg = ShardClientMessage::Manager(shutdown);
if let Err(why) = runner.runner_tx.send(msg) {
- warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ warn!(
+ "Failed to cleanly shutdown shard {}: {:?}",
+ shard_id,
+ why,
+ );
}
}
- self.runners.lock().remove(&shard_id);
+ self.runners.lock().remove(&shard_id).is_some()
+ }
+
+ /// Sends a shutdown message for all shards that the manager is responsible
+ /// for that are still known to be running.
+ ///
+ /// If you only need to shutdown a select number of shards, prefer looping
+ /// over the [`shutdown`] method.
+ ///
+ /// [`shutdown`]: #method.shutdown
+ pub fn shutdown_all(&mut self) {
+ let keys = {
+ let runners = self.runners.lock();
+
+ if runners.is_empty() {
+ return;
+ }
+
+ runners.keys().cloned().collect::<Vec<_>>()
+ };
+
+ info!("Shutting down all shards");
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
}
}
impl Drop for ShardManager {
+ /// A custom drop implementation to clean up after the manager.
+ ///
+ /// This shuts down all active [`ShardRunner`]s and attempts to tell the
+ /// [`ShardQueuer`] to shutdown.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
fn drop(&mut self) {
+ self.shutdown_all();
+
if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
warn!("Failed to send shutdown to shard queuer: {:?}", why);
}
diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs
new file mode 100644
index 0000000..4a64473
--- /dev/null
+++ b/src/client/bridge/gateway/shard_manager_monitor.rs
@@ -0,0 +1,54 @@
+use parking_lot::Mutex;
+use std::sync::mpsc::Receiver;
+use std::sync::Arc;
+use super::{ShardManager, ShardManagerMessage};
+
+/// The shard manager monitor does what it says on the tin -- it monitors the
+/// shard manager and performs actions on it as received.
+///
+/// The monitor is essentially responsible for running in its own thread and
+/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or
+/// shutdown everything entirely.
+///
+/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html
+#[derive(Debug)]
+pub struct ShardManagerMonitor {
+ /// A clone of the Arc to the manager itself.
+ pub manager: Arc<Mutex<ShardManager>>,
+ /// The mpsc Receiver channel to receive shard manager messages over.
+ pub rx: Receiver<ShardManagerMessage>,
+}
+
+impl ShardManagerMonitor {
+ /// "Runs" the monitor, waiting for messages over the Receiver.
+ ///
+ /// This should be called in its own thread due to its blocking, looped
+ /// nature.
+ ///
+ /// This will continue running until either:
+ ///
+ /// - a [`ShardManagerMessage::ShutdownAll`] has been received
+ /// - an error is returned while receiving a message from the
+ /// channel (probably indicating that the shard manager should stop anyway)
+ ///
+ /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll
+ pub fn run(&mut self) {
+ debug!("Starting shard manager worker");
+
+ while let Ok(value) = self.rx.recv() {
+ match value {
+ ShardManagerMessage::Restart(shard_id) => {
+ self.manager.lock().restart(shard_id);
+ },
+ ShardManagerMessage::Shutdown(shard_id) => {
+ self.manager.lock().shutdown(shard_id);
+ },
+ ShardManagerMessage::ShutdownAll => {
+ self.manager.lock().shutdown_all();
+
+ break;
+ },
+ }
+ }
+ }
+}
diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs
new file mode 100644
index 0000000..069c838
--- /dev/null
+++ b/src/client/bridge/gateway/shard_messenger.rs
@@ -0,0 +1,276 @@
+use model::{Game, GuildId, OnlineStatus};
+use super::{ShardClientMessage, ShardRunnerMessage};
+use std::sync::mpsc::{SendError, Sender};
+use websocket::message::OwnedMessage;
+
+/// A lightweight wrapper around an mpsc sender.
+///
+/// This is used to cleanly communicate with a shard's respective
+/// [`ShardRunner`]. This can be used for actions such as setting the game via
+/// [`set_game`] or shutting down via [`shutdown`].
+///
+/// [`ShardRunner`]: struct.ShardRunner.html
+/// [`set_game`]: #method.set_game
+/// [`shutdown`]: #method.shutdown
+#[derive(Clone, Debug)]
+pub struct ShardMessenger {
+ tx: Sender<ShardClientMessage>,
+}
+
+impl ShardMessenger {
+ /// Creates a new shard messenger.
+ ///
+ /// If you are using the [`Client`], you do not need to do this.
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ #[inline]
+ pub fn new(tx: Sender<ShardClientMessage>) -> Self {
+ Self {
+ tx,
+ }
+ }
+
+ /// Requests that one or multiple [`Guild`]s be chunked.
+ ///
+ /// This will ask the gateway to start sending member chunks for large
+ /// guilds (250 members+). If a guild is over 250 members, then a full
+ /// member list will not be downloaded, and must instead be requested to be
+ /// sent in "chunks" containing members.
+ ///
+ /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each
+ /// chunk only contains a partial amount of the total members.
+ ///
+ /// If the `cache` feature is enabled, the cache will automatically be
+ /// updated with member chunks.
+ ///
+ /// # Examples
+ ///
+ /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not
+ /// specifying a query parameter:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// Chunk a single guild by Id, limiting to 20 members, and specifying a
+ /// query parameter of `"do"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`Event::GuildMembersChunk`]:
+ /// ../../model/event/enum.Event.html#variant.GuildMembersChunk
+ /// [`Guild`]: ../../model/struct.Guild.html
+ /// [`Member`]: ../../model/struct.Member.html
+ pub fn chunk_guilds<It>(
+ &self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<String>,
+ ) where It: IntoIterator<Item=GuildId> {
+ let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>();
+
+ let _ = self.send(ShardRunnerMessage::ChunkGuilds {
+ guild_ids: guilds,
+ limit,
+ query,
+ });
+ }
+
+ /// Sets the user's current game, if any.
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current game to playing `"Heroes of the Storm"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::Game;
+ ///
+ /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_game(&self, game: Option<Game>) {
+ let _ = self.send(ShardRunnerMessage::SetGame(game));
+ }
+
+ /// Sets the user's full presence information.
+ ///
+ /// Consider using the individual setters if you only need to modify one of
+ /// these.
+ ///
+ /// # Examples
+ ///
+ /// Set the current user as playing `"Heroes of the Storm"` and being
+ /// online:
+ ///
+ /// ```rust,ignore
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::{Game, OnlineStatus};
+ ///
+ /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_presence(&self, game: Option<Game>, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetPresence(status, game));
+ }
+
+ /// Sets the user's current online status.
+ ///
+ /// Note that [`Offline`] is not a valid online status, so it is
+ /// automatically converted to [`Invisible`].
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current online status for the shard to [`DoNotDisturb`].
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::OnlineStatus;
+ ///
+ /// shard.set_status(OnlineStatus::DoNotDisturb);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
+ /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
+ pub fn set_status(&self, mut online_status: OnlineStatus) {
+ if online_status == OnlineStatus::Offline {
+ online_status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetStatus(online_status));
+ }
+
+ /// Shuts down the WebSocket by attempting to cleanly close the
+ /// connection.
+ pub fn shutdown_clean(&self) {
+ let _ = self.send(ShardRunnerMessage::Close(1000, None));
+ }
+
+ /// Sends a raw message over the WebSocket.
+ ///
+ /// The given message is not mutated in any way, and is sent as-is.
+ ///
+ /// You should only use this if you know what you're doing. If you want
+ /// to, for example, send a presence update, prefer the [`set_presence`]
+ /// method.
+ ///
+ /// [`set_presence`]: #method.set_presence
+ pub fn websocket_message(&self, message: OwnedMessage) {
+ let _ = self.send(ShardRunnerMessage::Message(message));
+ }
+
+ #[inline]
+ fn send(&self, msg: ShardRunnerMessage)
+ -> Result<(), SendError<ShardClientMessage>> {
+ self.tx.send(ShardClientMessage::Runner(msg))
+ }
+}
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
index cb3f749..bd02532 100644
--- a/src/client/bridge/gateway/shard_queuer.rs
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -27,20 +27,69 @@ use framework::Framework;
/// blocking nature of the loop itself as well as a 5-second thread sleep
/// between shard starts.
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
+ /// A copy of [`Client::data`] to be given to runners for contextual
+ /// dispatching.
+ ///
+ /// [`Client::data`]: ../../struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
+ /// A reference to an `EventHandler`, such as the one given to the
+ /// [`Client`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
pub event_handler: Arc<H>,
+ /// A copy of the framework.
#[cfg(feature = "framework")]
pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ /// The instant that a shard was last started.
+ ///
+ /// This is used to determine how long to wait between shard IDENTIFYs.
pub last_start: Option<Instant>,
+ /// A copy of the sender channel to communicate with the
+ /// [`ShardManagerMonitor`].
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
+ /// A copy of the map of shard runners.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// A receiver channel for the shard queuer to be told to start shards.
pub rx: Receiver<ShardQueuerMessage>,
+ /// A copy of a threadpool to give shard runners.
+ ///
+ /// For example, when using the [`Client`], this will be a copy of
+ /// [`Client::threadpool`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool
pub threadpool: ThreadPool,
+ /// A copy of the token to connect with.
pub token: Arc<Mutex<String>>,
+ /// A copy of the URI to use to connect to the gateway.
pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
+ /// Begins the shard queuer loop.
+ ///
+ /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s,
+ /// blocking for messages on what to do.
+ ///
+ /// If a [`ShardQueuerMessage::Start`] is received, this will:
+ ///
+ /// 1. Check how much time has passed since the last shard was started
+ /// 2. If the amount of time is less than the ratelimit, it will sleep until
+ /// that time has passed
+ /// 3. Start the shard by ID
+ ///
+ /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and
+ /// the loop will be over.
+ ///
+ /// **Note**: This should be run in its own thread due to the blocking
+ /// nature of the loop.
+ ///
+ /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html
+ /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown
+ /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
+ /// [`rx`]: #structfield.rx
pub fn run(&mut self) {
while let Ok(msg) = self.rx.recv() {
match msg {
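The queue loop described in the doc comment above spaces out shard starts to respect the gateway's IDENTIFY ratelimit. A minimal sketch of that wait logic using only the standard library (the constant and function names here are illustrative; serenity's bookkeeping lives in the `last_start` field):

```rust
use std::thread;
use std::time::{Duration, Instant};

// The gateway ratelimits IDENTIFY calls, hence the 5-second wait
// between shard starts.
const WAIT_BETWEEN_BOOTS: Duration = Duration::from_secs(5);

// Sleeps off whatever remains of the ratelimit window since the
// previous shard start, then records the new start time.
fn checked_wait(last_start: &mut Option<Instant>) {
    if let Some(instant) = *last_start {
        let elapsed = instant.elapsed();

        if elapsed < WAIT_BETWEEN_BOOTS {
            thread::sleep(WAIT_BETWEEN_BOOTS - elapsed);
        }
    }

    *last_start = Some(Instant::now());
}

fn main() {
    let mut last_start = None;

    // The first start never waits, since no shard has booted yet.
    checked_wait(&mut last_start);
    assert!(last_start.is_some());
}
```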
@@ -80,16 +129,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
let shard_info = [shard_id.0, shard_total.0];
+
let shard = Shard::new(
Arc::clone(&self.ws_url),
Arc::clone(&self.token),
shard_info,
)?;
- let locked = Arc::new(Mutex::new(shard));
let mut runner = feature_framework! {{
ShardRunner::new(
- Arc::clone(&locked),
+ shard,
self.manager_tx.clone(),
Arc::clone(&self.framework),
Arc::clone(&self.data),
@@ -98,7 +147,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
)
} else {
ShardRunner::new(
- locked.clone(),
+ shard,
self.manager_tx.clone(),
self.data.clone(),
self.event_handler.clone(),
@@ -108,7 +157,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
let runner_info = ShardRunnerInfo {
runner_tx: runner.runner_tx(),
- shard: locked,
};
thread::spawn(move || {
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index b14e48b..a5fde3d 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -1,35 +1,45 @@
+use gateway::{Shard, ShardAction};
use internal::prelude::*;
use internal::ws_impl::ReceiverExt;
use model::event::{Event, GatewayEvent};
use parking_lot::Mutex;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use super::super::super::{EventHandler, dispatch};
-use super::{LockedShard, ShardId, ShardManagerMessage};
+use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage};
use threadpool::ThreadPool;
use typemap::ShareMap;
+use websocket::message::{CloseData, OwnedMessage};
use websocket::WebSocketError;
#[cfg(feature = "framework")]
use framework::Framework;
+#[cfg(feature = "voice")]
+use internal::ws_impl::SenderExt;
+/// A runner for managing a [`Shard`] and its respective WebSocket client.
+///
+/// [`Shard`]: ../../../gateway/struct.Shard.html
pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
#[cfg(feature = "framework")]
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
manager_tx: Sender<ShardManagerMessage>,
- runner_rx: Receiver<ShardManagerMessage>,
- runner_tx: Sender<ShardManagerMessage>,
- shard: LockedShard,
- shard_info: [u64; 2],
+ // channel to receive messages from the shard manager and dispatches
+ runner_rx: Receiver<ShardClientMessage>,
+ // channel to send messages to the shard runner from the shard manager
+ runner_tx: Sender<ShardClientMessage>,
+ shard: Shard,
threadpool: ThreadPool,
}
impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
+ /// Creates a new runner for a Shard.
+ #[allow(too_many_arguments)]
#[cfg(feature = "framework")]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
data: Arc<Mutex<ShareMap>>,
@@ -37,7 +47,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -47,21 +56,20 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
framework,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Creates a new runner for a Shard.
#[cfg(not(feature = "framework"))]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -70,90 +78,276 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
event_handler,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Starts the runner's loop to receive events.
+ ///
+ /// This runs a loop that performs the following in each iteration:
+ ///
+ /// 1. checks the receiver for [`ShardRunnerMessage`]s, possibly from the
+ /// [`ShardManager`], and if there is one, acts on it.
+ ///
+ /// 2. checks if a heartbeat should be sent to the Discord gateway, and if
+ /// so, sends one.
+ ///
+ /// 3. attempts to retrieve a message from the WebSocket, processing it into
+ /// a [`GatewayEvent`]. This will block for 100ms before assuming there is
+ /// no message available.
+ ///
+ /// 4. checks with the [`Shard`] to determine if the gateway event is
+ /// specifying an action to take (e.g. resuming, reconnecting, heartbeating)
+ /// and then performs that action, if any.
+ ///
+ /// 5. dispatches the event via the Client.
+ ///
+ /// 6. goes back to step 1.
+ ///
+ /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html
+ /// [`Shard`]: ../../../gateway/struct.Shard.html
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html
pub fn run(&mut self) -> Result<()> {
- debug!("[ShardRunner {:?}] Running", self.shard_info);
+ debug!("[ShardRunner {:?}] Running", self.shard.shard_info());
loop {
- {
- let mut shard = self.shard.lock();
- let incoming = self.runner_rx.try_recv();
+ if !self.recv()? {
+ return Ok(());
+ }
- // Check for an incoming message over the runner channel.
- //
- // If the message is to shutdown, first verify the ID so we know
- // for certain this runner is to shutdown.
- if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
- if id.0 == self.shard_info[0] {
- let _ = shard.shutdown_clean();
+ // check heartbeat
+ if let Err(why) = self.shard.check_heartbeat() {
+ warn!(
+ "[ShardRunner {:?}] Error heartbeating: {:?}",
+ self.shard.shard_info(),
+ why,
+ );
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+
+ return self.request_restart();
+ }
- return Ok(());
+ #[cfg(feature = "voice")]
+ {
+ for message in self.shard.cycle_voice_recv() {
+ if let Err(why) = self.shard.client.send_json(&message) {
+ warn!("Error sending voice message over WS: {:?}", why);
}
}
+ }
- if let Err(why) = shard.check_heartbeat() {
- error!("Failed to heartbeat and reconnect: {:?}", why);
+ let (event, action, successful) = self.recv_event();
- return self.request_restart();
- }
-
- #[cfg(feature = "voice")]
- {
- shard.cycle_voice_recv();
- }
+ if let Some(action) = action {
+ let _ = self.action(action);
}
- let (event, successful) = self.recv_event();
-
if let Some(event) = event {
- let data = Arc::clone(&self.data);
- let event_handler = Arc::clone(&self.event_handler);
- let shard = Arc::clone(&self.shard);
-
- feature_framework! {{
- let framework = Arc::clone(&self.framework);
-
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- framework,
- data,
- event_handler,
- );
- });
- } else {
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- data,
- event_handler,
- );
- });
- }}
+ self.dispatch(event);
}
- if !successful && !self.shard.lock().stage().is_connecting() {
+ if !successful && !self.shard.stage().is_connecting() {
return self.request_restart();
}
}
}
- pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> {
+ /// Clones the internal copy of the Sender to the shard runner.
+ pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> {
self.runner_tx.clone()
}
+ /// Takes an action that a [`Shard`] has determined should happen and then
+ /// does it.
+ ///
+ /// For example, if the shard says that an Identify message needs to be
+ /// sent, this will do that.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if performing the action fails, such as when sending
+ /// a message over the WebSocket fails.
+ fn action(&mut self, action: ShardAction) -> Result<()> {
+ match action {
+ ShardAction::Autoreconnect => self.shard.autoreconnect(),
+ ShardAction::Heartbeat => self.shard.heartbeat(),
+ ShardAction::Identify => self.shard.identify(),
+ ShardAction::Reconnect => self.shard.reconnect(),
+ ShardAction::Resume => self.shard.resume(),
+ }
+ }
+
+ // Checks if the ID received to shut down is equivalent to the ID of the
+ // shard this runner is responsible for. If so, it shuts down the
+ // WebSocket client.
+ //
+ // Returns whether the WebSocket client is still active.
+ //
+ // If true, the WebSocket client was _not_ shut down. If false, it was.
+ fn checked_shutdown(&mut self, id: ShardId) -> bool {
+ // First verify the ID so we know for certain this runner is
+ // to shutdown.
+ if id.0 != self.shard.shard_info()[0] {
+ // Not meant for this runner for some reason; don't shut
+ // down.
+ return true;
+ }
+
+ let close_data = CloseData::new(1000, String::new());
+ let msg = OwnedMessage::Close(Some(close_data));
+ let _ = self.shard.client.send_message(&msg);
+
+ false
+ }
+
+ fn dispatch(&self, event: Event) {
+ let data = Arc::clone(&self.data);
+ let event_handler = Arc::clone(&self.event_handler);
+ let runner_tx = self.runner_tx.clone();
+ let shard_id = self.shard.shard_info()[0];
+
+ feature_framework! {{
+ let framework = Arc::clone(&self.framework);
+
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ framework,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ } else {
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ }}
+ }
+
+ // Handles a received value over the shard runner rx channel.
+ //
+ // Returns a boolean on whether the shard runner can continue.
+ //
+ // This always returns true, except in the case that the shard manager asked
+ // the runner to shut down.
+ fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool {
+ match value {
+ ShardClientMessage::Manager(x) => match x {
+ ShardManagerMessage::Restart(id) |
+ ShardManagerMessage::Shutdown(id) => {
+ self.checked_shutdown(id)
+ },
+ ShardManagerMessage::ShutdownAll => {
+ // This variant should never be received.
+ warn!(
+ "[ShardRunner {:?}] Received a ShutdownAll?",
+ self.shard.shard_info(),
+ );
+
+ true
+ },
+ }
+ ShardClientMessage::Runner(x) => match x {
+ ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => {
+ self.shard.chunk_guilds(
+ guild_ids,
+ limit,
+ query.as_ref().map(String::as_str),
+ ).is_ok()
+ },
+ ShardRunnerMessage::Close(code, reason) => {
+ let reason = reason.unwrap_or_else(String::new);
+ let data = CloseData::new(code, reason);
+ let msg = OwnedMessage::Close(Some(data));
+
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::Message(msg) => {
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::SetGame(game) => {
+ // Update the shard's game state, then send a presence
+ // update over the WebSocket, returning whether the
+ // message sent successfully.
+ self.shard.set_game(game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetPresence(status, game) => {
+ self.shard.set_presence(status, game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetStatus(status) => {
+ self.shard.set_status(status);
+
+ self.shard.update_presence().is_ok()
+ },
+ },
+ }
+ }
+
+ // Receives values over the internal shard runner rx channel and handles
+ // them.
+ //
+ // This will loop over values until none remain.
+ //
+ // Requests a restart if the sending half of the channel disconnects. This
+ // should _never_ happen, as the sending half is kept on the runner.
+ //
+ // Returns whether the shard runner is in a state that can continue.
+ fn recv(&mut self) -> Result<bool> {
+ loop {
+ match self.runner_rx.try_recv() {
+ Ok(value) => {
+ if !self.handle_rx_value(value) {
+ return Ok(false);
+ }
+ },
+ Err(TryRecvError::Disconnected) => {
+ warn!(
+ "[ShardRunner {:?}] Sending half DC; restarting",
+ self.shard.shard_info(),
+ );
+
+ let _ = self.request_restart();
+
+ return Ok(false);
+ },
+ Err(TryRecvError::Empty) => break,
+ }
+ }
+
+ // There are no longer any values available.
+
+ Ok(true)
+ }
+
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
- fn recv_event(&mut self) -> (Option<Event>, bool) {
- let mut shard = self.shard.lock();
-
- let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) {
+ let gw_event = match self.shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
@@ -161,58 +355,69 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- return (None, true);
+ {
+ let last = self.shard.last_heartbeat_ack();
+ let interval = self.shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
+
+ if seconds_passed <= interval_in_secs * 2 {
+ return (None, None, true);
+ }
+ } else {
+ return (None, None, true);
}
- } else {
- return (None, true);
}
debug!("Attempting to auto-reconnect");
- if let Err(why) = shard.autoreconnect() {
+ if let Err(why) = self.shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
}
- return (None, true);
+ return (None, None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies; this will be
// hit on every iteration.
- return (None, false);
+ return (None, None, false);
},
other => other,
};
let event = match gw_event {
Ok(Some(event)) => Ok(event),
- Ok(None) => return (None, true),
+ Ok(None) => return (None, None, true),
Err(why) => Err(why),
};
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => return (None, true),
+ let action = match self.shard.handle_event(&event) {
+ Ok(Some(action)) => Some(action),
+ Ok(None) => None,
Err(why) => {
error!("Shard handler received err: {:?}", why);
- return (None, true);
+ return (None, None, true);
},
- };
+ };
+
+ let event = match event {
+ Ok(GatewayEvent::Dispatch(_, event)) => Some(event),
+ _ => None,
+ };
- (Some(event), true)
+ (event, action, true)
}
fn request_restart(&self) -> Result<()> {
- debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
- let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+ let shard_id = ShardId(self.shard.shard_info()[0]);
+ let msg = ShardManagerMessage::Restart(shard_id);
let _ = self.manager_tx.send(msg);
Ok(())
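The `recv` method above drains the runner's channel without blocking: loop on `try_recv`, stop on `Empty`, and treat `Disconnected` or a shutdown request as a signal that the runner cannot continue. A self-contained sketch of that drain pattern (message names here are illustrative, not serenity's):

```rust
use std::sync::mpsc::{self, TryRecvError};

enum RunnerMessage {
    Work(u32),
    Shutdown,
}

// Drains all currently queued messages without blocking.
//
// Returns `false` once a shutdown is requested or the sending half has
// disconnected, mirroring the shape of `ShardRunner::recv`.
fn drain(rx: &mpsc::Receiver<RunnerMessage>, handled: &mut Vec<u32>) -> bool {
    loop {
        match rx.try_recv() {
            Ok(RunnerMessage::Work(n)) => handled.push(n),
            Ok(RunnerMessage::Shutdown) => return false,
            // The sender was dropped; the runner cannot continue.
            Err(TryRecvError::Disconnected) => return false,
            // No more queued values; resume the main loop.
            Err(TryRecvError::Empty) => return true,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(RunnerMessage::Work(1)).unwrap();
    tx.send(RunnerMessage::Work(2)).unwrap();

    let mut handled = Vec::new();
    assert!(drain(&rx, &mut handled));
    assert_eq!(handled, vec![1, 2]);

    tx.send(RunnerMessage::Shutdown).unwrap();
    assert!(!drain(&rx, &mut handled));
}
```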
diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs
new file mode 100644
index 0000000..e6458eb
--- /dev/null
+++ b/src/client/bridge/gateway/shard_runner_message.rs
@@ -0,0 +1,43 @@
+use model::{Game, GuildId, OnlineStatus};
+use websocket::message::OwnedMessage;
+
+/// A message to send from a shard over a WebSocket.
+pub enum ShardRunnerMessage {
+ /// Indicates that the client is to send a member chunk message.
+ ChunkGuilds {
+ /// The IDs of the [`Guild`]s to chunk.
+ ///
+ /// [`Guild`]: ../../../model/struct.Guild.html
+ guild_ids: Vec<GuildId>,
+ /// The maximum number of members to receive [`GuildMembersChunkEvent`]s
+ /// for.
+ ///
+ /// [`GuildMembersChunkEvent`]: ../../../model/event/struct.GuildMembersChunkEvent.html
+ limit: Option<u16>,
+ /// Text to filter members by.
+ ///
+ /// For example, a query of `"s"` will cause only [`Member`]s whose
+ /// usernames start with `"s"` to be chunked.
+ ///
+ /// [`Member`]: ../../../model/struct.Member.html
+ query: Option<String>,
+ },
+ /// Indicates that the client is to close with the given status code and
+ /// reason.
+ ///
+ /// You should rarely, if _ever_, need this, but the option is available.
+ /// Prefer to use the [`ShardManager`] to shut down WebSocket clients if
+ /// you intend to send a 1000 close code.
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Close(u16, Option<String>),
+ /// Indicates that the client is to send a custom WebSocket message.
+ Message(OwnedMessage),
+ /// Indicates that the client is to update the shard's presence's game.
+ SetGame(Option<Game>),
+ /// Indicates that the client is to update the shard's presence in its
+ /// entirety.
+ SetPresence(OnlineStatus, Option<Game>),
+ /// Indicates that the client is to update the shard's presence's status.
+ SetStatus(OnlineStatus),
+}
diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs
index 4f27526..ea18d73 100644
--- a/src/client/bridge/mod.rs
+++ b/src/client/bridge/mod.rs
@@ -1 +1,10 @@
+//! A collection of bridged support between the [`client`] module and other
+//! modules.
+//!
+//! **Warning**: You likely _do not_ need to interact with anything in here;
+//! this is lower-level functionality abstracted by the [`Client`].
+//!
+//! [`Client`]: ../struct.Client.html
+//! [`client`]: ../
+
pub mod gateway;
diff --git a/src/client/context.rs b/src/client/context.rs
index a60aaa0..9b89cde 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -1,7 +1,7 @@
-use Result;
+use client::bridge::gateway::{ShardClientMessage, ShardMessenger};
+use std::sync::mpsc::Sender;
use std::sync::Arc;
use typemap::ShareMap;
-use gateway::Shard;
use model::*;
use parking_lot::Mutex;
@@ -12,7 +12,7 @@ use internal::prelude::*;
#[cfg(feature = "builder")]
use builder::EditProfile;
#[cfg(feature = "builder")]
-use {http, utils};
+use {Result, http, utils};
#[cfg(feature = "builder")]
use std::collections::HashMap;
@@ -38,21 +38,23 @@ pub struct Context {
///
/// [`Client::data`]: struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
- /// The associated shard which dispatched the event handler.
- ///
- /// Note that if you are sharding, in relevant terms, this is the shard
- /// which received the event being dispatched.
- pub shard: Arc<Mutex<Shard>>,
+ /// The messenger to communicate with the shard runner.
+ pub shard: ShardMessenger,
+ /// The ID of the shard this context is related to.
+ pub shard_id: u64,
}
impl Context {
/// Create a new Context to be passed to an event handler.
- pub(crate) fn new(shard: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>)
- -> Context {
+ pub(crate) fn new(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+ ) -> Context {
Context {
+ shard: ShardMessenger::new(runner_tx),
+ shard_id,
data,
- shard,
}
}
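With the change above, each dispatch builds a `Context` from a clone of the runner's sender plus the shard's ID, rather than sharing an `Arc<Mutex<Shard>>`, so handlers never take a lock to talk to their shard. A pared-down stand-in showing the idea (these types are illustrative, not serenity's):

```rust
use std::sync::mpsc::{self, Sender};

enum ShardMessage {
    SetStatus(String),
}

// A minimal Context: a messenger-style sender plus the owning shard's
// ID, built fresh for each dispatched event.
struct Context {
    tx: Sender<ShardMessage>,
    shard_id: u64,
}

impl Context {
    fn new(tx: Sender<ShardMessage>, shard_id: u64) -> Self {
        Context { tx, shard_id }
    }

    fn set_status(&self, status: &str) {
        // A failed send means the runner is gone; ignore it, as the
        // real ShardMessenger does.
        let _ = self.tx.send(ShardMessage::SetStatus(status.to_string()));
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Each dispatch clones the sender; no lock is ever taken.
    let ctx = Context::new(tx.clone(), 0);
    ctx.set_status("dnd");

    assert_eq!(ctx.shard_id, 0);
    let ShardMessage::SetStatus(status) = rx.recv().unwrap();
    assert_eq!(status, "dnd");
}
```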
@@ -110,6 +112,7 @@ impl Context {
http::edit_profile(&edited)
}
+
/// Sets the current user as being [`Online`]. This maintains the current
/// game.
///
@@ -137,9 +140,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn online(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Online);
+ self.shard.set_status(OnlineStatus::Online);
}
/// Sets the current user as being [`Idle`]. This maintains the current
@@ -168,9 +171,9 @@ impl Context {
/// ```
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn idle(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Idle);
+ self.shard.set_status(OnlineStatus::Idle);
}
/// Sets the current user as being [`DoNotDisturb`]. This maintains the
@@ -199,9 +202,9 @@ impl Context {
/// ```
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ #[inline]
pub fn dnd(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::DoNotDisturb);
+ self.shard.set_status(OnlineStatus::DoNotDisturb);
}
/// Sets the current user as being [`Invisible`]. This maintains the current
@@ -231,9 +234,9 @@ impl Context {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
+ #[inline]
pub fn invisible(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Invisible);
+ self.shard.set_status(OnlineStatus::Invisible);
}
/// "Resets" the current user's presence, by setting the game to `None` and
@@ -265,9 +268,9 @@ impl Context {
/// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
+ #[inline]
pub fn reset_presence(&self) {
- let mut shard = self.shard.lock();
- shard.set_presence(None, OnlineStatus::Online);
+ self.shard.set_presence(None, OnlineStatus::Online);
}
/// Sets the current game, defaulting to an online status of [`Online`].
@@ -303,9 +306,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn set_game(&self, game: Game) {
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current game, passing in only its name. This will automatically
@@ -351,8 +354,7 @@ impl Context {
url: None,
};
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current user's presence, providing all fields to be passed.
@@ -406,9 +408,9 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn set_presence(&self, game: Option<Game>, status: OnlineStatus) {
- let mut shard = self.shard.lock();
- shard.set_presence(game, status);
+ self.shard.set_presence(game, status);
}
/// Disconnects the shard from the WebSocket, essentially "quitting" it.
@@ -417,9 +419,8 @@ impl Context {
/// until [`Client::start`] and vice versa are called again.
///
/// [`Client::start`]: ./struct.Client.html#method.start
- pub fn quit(&self) -> Result<()> {
- let mut shard = self.shard.lock();
-
- shard.shutdown_clean()
+ #[inline]
+ pub fn quit(&self) {
+ self.shard.shutdown_clean();
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 827875e..9545a6f 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,9 +1,10 @@
use std::sync::Arc;
use parking_lot::Mutex;
+use super::bridge::gateway::ShardClientMessage;
use super::event_handler::EventHandler;
use super::Context;
+use std::sync::mpsc::Sender;
use typemap::ShareMap;
-use gateway::Shard;
use model::event::Event;
use model::{Channel, Message};
@@ -35,19 +36,26 @@ macro_rules! now {
() => (Utc::now().time().second() * 1000)
}
-fn context(conn: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>) -> Context {
- Context::new(conn, data)
+fn context(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) -> Context {
+ Context::new(data, runner_tx, shard_id)
}
#[cfg(feature = "framework")]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(
context.clone(),
event.message.clone(),
@@ -58,21 +66,24 @@ pub fn dispatch<H: EventHandler + 'static>(event: Event,
framework.dispatch(context, event.message);
}
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
#[cfg(not(feature = "framework"))]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(context, event.message, event_handler);
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
@@ -91,10 +102,13 @@ fn dispatch_message<H>(
}
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)]
-fn handle_event<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+fn handle_event<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
#[cfg(feature = "cache")]
let mut last_guild_create_time = now!();
@@ -113,7 +127,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
// This different channel_create dispatching is only due to the fact that
// each time the bot receives a dm, this event is also fired.
@@ -134,7 +148,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelDelete(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
match event.channel {
Channel::Private(_) | Channel::Group(_) => {},
@@ -147,28 +161,28 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
},
Event::ChannelPinsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_pins_update(context, event);
},
Event::ChannelRecipientAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_addition(context, event.channel_id, event.user);
},
Event::ChannelRecipientRemove(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_removal(context, event.channel_id, event.user);
},
Event::ChannelUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read().channel(event.channel.id());
@@ -178,12 +192,12 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::GuildBanAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_addition(context, event.guild_id, event.user);
},
Event::GuildBanRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_removal(context, event.guild_id, event.user);
},
@@ -204,7 +218,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let cache = CACHE.read();
if cache.unavailable_guilds.is_empty() {
- let context = context(Arc::clone(&conn), Arc::clone(&data));
+ let context = context(Arc::clone(&data), runner_tx.clone(), shard_id);
let guild_amount = cache
.guilds
@@ -216,7 +230,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
}
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_create(context, event.guild, _is_new);
@@ -226,7 +240,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildDelete(mut event) => {
let _full = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_delete(context, event.guild, _full);
@@ -237,25 +251,25 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildEmojisUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
},
Event::GuildIntegrationsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_integrations_update(context, event.guild_id);
},
Event::GuildMemberAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_member_addition(context, event.guild_id, event.member);
},
Event::GuildMemberRemove(mut event) => {
let _member = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
@@ -265,7 +279,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildMemberUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
// This is safe to unwrap, as the update would have created
@@ -284,20 +298,20 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildMembersChunk(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_members_chunk(context, event.guild_id, event.members);
},
Event::GuildRoleCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_role_create(context, event.guild_id, event.role);
},
Event::GuildRoleDelete(mut event) => {
let _role = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
@@ -307,7 +321,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildRoleUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_update(context, event.guild_id, _before, event.role);
@@ -318,14 +332,14 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildUnavailable(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_unavailable(context, event.guild_id);
},
Event::GuildUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read()
@@ -341,46 +355,46 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
// Already handled by the framework check macro
Event::MessageCreate(_) => {},
Event::MessageDeleteBulk(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete_bulk(context, event.channel_id, event.ids);
},
Event::MessageDelete(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete(context, event.channel_id, event.message_id);
},
Event::MessageUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_update(context, event);
},
Event::PresencesReplace(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_replace(context, event.presences);
},
Event::PresenceUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_update(context, event);
},
Event::ReactionAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_add(context, event.reaction);
},
Event::ReactionRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove(context, event.reaction);
},
Event::ReactionRemoveAll(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
},
@@ -393,35 +407,35 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let _ = wait_for_guilds()
.map(move |_| {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
});
} else {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
}
}
},
Event::Resumed(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.resume(context, event);
},
Event::TypingStart(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.typing_start(context, event);
},
Event::Unknown(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.unknown(context, event.kind, event.value);
},
Event::UserUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.user_update(context, _before.unwrap(), event.current_user);
@@ -430,19 +444,19 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::VoiceServerUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_server_update(context, event);
},
Event::VoiceStateUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_state_update(context, event.guild_id, event.voice_state);
},
Event::WebhookUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.webhook_update(context, event.guild_id, event.channel_id);
},
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 00a305a..2a97c91 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -37,13 +37,11 @@ pub use http as rest;
#[cfg(feature = "cache")]
pub use CACHE;
-use self::bridge::gateway::{ShardId, ShardManager, ShardRunnerInfo};
+use self::bridge::gateway::{ShardManager, ShardManagerMonitor};
use self::dispatch::dispatch;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::mem;
use threadpool::ThreadPool;
use typemap::ShareMap;
use http;
@@ -103,8 +101,7 @@ impl CloseHandle {
/// [`on_message`]: #method.on_message
/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
/// [sharding docs]: gateway/index.html#sharding
-#[derive(Clone)]
-pub struct Client<H: EventHandler + Send + Sync + 'static> {
+pub struct Client {
/// A ShareMap which requires types to be Send + Sync. This is a map that
/// can be safely shared across contexts.
///
@@ -191,7 +188,6 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`on_ready`]: #method.on_ready
- event_handler: Arc<H>,
#[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
/// A HashMap of all shards instantiated by the Client.
///
@@ -224,12 +220,12 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?;
///
- /// let shard_runners = client.shard_runners.clone();
+ /// let shard_manager = client.shard_manager.clone();
///
/// thread::spawn(move || {
/// loop {
/// println!("Shard count instantiated: {}",
- /// shard_runners.lock().len());
+ /// shard_manager.lock().shards_instantiated().len());
///
/// thread::sleep(Duration::from_millis(5000));
/// }
@@ -244,16 +240,26 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Client::start_shard`]: #method.start_shard
/// [`Client::start_shards`]: #method.start_shards
- pub shard_runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ pub shard_manager: Arc<Mutex<ShardManager>>,
+ shard_manager_worker: ShardManagerMonitor,
/// The threadpool shared by all shards.
///
/// Defaults to 5 threads, which should suffice for small bots. Consider
/// increasing this number as your bot grows.
pub threadpool: ThreadPool,
- token: Arc<Mutex<String>>,
+ /// The token in use by the client.
+ pub token: Arc<Mutex<String>>,
+ /// URI that the client's shards will use to connect to the gateway.
+ ///
+ /// This is likely not important for production usage and is, at best, used
+ /// for debugging.
+ ///
+ /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated
+ /// value available.
+ pub ws_uri: Arc<Mutex<String>>,
}
-impl<H: EventHandler + Send + Sync + 'static> Client<H> {
+impl Client {
/// Creates a Client for a bot user.
///
/// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -283,7 +289,8 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(token: &str, handler: H) -> Result<Self> {
+ pub fn new<H>(token: &str, handler: H) -> Result<Self>
+ where H: EventHandler + Send + Sync + 'static {
let token = if token.starts_with("Bot ") {
token.to_string()
} else {
@@ -295,23 +302,53 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let name = "serenity client".to_owned();
let threadpool = ThreadPool::with_name(name, 5);
+ let url = Arc::new(Mutex::new(http::get_gateway()?.url));
+ let data = Arc::new(Mutex::new(ShareMap::custom()));
+ let event_handler = Arc::new(handler);
Ok(feature_framework! {{
+ let framework = Arc::new(Mutex::new(None));
+
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ Arc::clone(&locked),
+ Arc::clone(&data),
+ Arc::clone(&event_handler),
+ Arc::clone(&framework),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
framework: Arc::new(Mutex::new(None)),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
} else {
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ locked.clone(),
+ data.clone(),
+ Arc::clone(&event_handler),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
}})
}
@@ -462,7 +499,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
///
/// [gateway docs]: gateway/index.html#sharding
pub fn start(&mut self) -> Result<()> {
- self.start_connection([0, 0, 1], http::get_gateway()?.url)
+ self.start_connection([0, 0, 1])
}
/// Establish the connection(s) and start listening for events.
@@ -514,15 +551,13 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
/// [gateway docs]: gateway/index.html#sharding
pub fn start_autosharded(&mut self) -> Result<()> {
- let mut res = http::get_bot_gateway()?;
-
- let x = res.shards as u64 - 1;
- let y = res.shards as u64;
- let url = mem::replace(&mut res.url, String::default());
+ let (x, y) = {
+ let res = http::get_bot_gateway()?;
- drop(res);
+ (res.shards as u64 - 1, res.shards as u64)
+ };
- self.start_connection([0, x, y], url)
+ self.start_connection([0, x, y])
}
/// Establish a sharded connection and start listening for events.
@@ -603,7 +638,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_autosharded`]: #method.start_autosharded
/// [gateway docs]: gateway/index.html#sharding
pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
- self.start_connection([shard, shard, shards], http::get_gateway()?.url)
+ self.start_connection([shard, shard, shards])
}
/// Establish sharded connections and start listening for events.
@@ -657,10 +692,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shard_range`]: #method.start_shard_range
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
- self.start_connection(
- [0, total_shards - 1, total_shards],
- http::get_gateway()?.url,
- )
+ self.start_connection([0, total_shards - 1, total_shards])
}
/// Establish a range of sharded connections and start listening for events.
@@ -730,7 +762,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shards`]: #method.start_shards
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
- self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
+ self.start_connection([range[0], range[1], total_shards])
}
/// Returns a thread-safe handle for closing shards.
@@ -749,7 +781,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
// an error.
//
// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
- fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> {
+ fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> {
HANDLE_STILL.store(true, Ordering::Relaxed);
// Update the framework's current user if the feature is enabled.
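Each `start_*` method now passes only the `[low, high, total]` triple to `start_connection`, which derives the number of shards to initialize from the inclusive range (the `init` value computed further down in this hunk). A small sketch of that arithmetic, with a hypothetical helper name:

```rust
// start_connection receives [low, high, total]; the number of shards to
// initialize is derived from the inclusive range, matching the diff's
// `let init = shard_data[1] - shard_data[0] + 1;`.
fn shards_to_initialize(shard_data: [u64; 3]) -> u64 {
    shard_data[1] - shard_data[0] + 1
}

fn main() {
    // start(): [0, 0, 1] is a single shard.
    assert_eq!(shards_to_initialize([0, 0, 1]), 1);
    // start_shards(5): [0, 4, 5] initializes all five shards.
    assert_eq!(shards_to_initialize([0, 4, 5]), 5);
    // start_shard_range([2, 3], 5): shards 2 and 3 out of five.
    assert_eq!(shards_to_initialize([2, 3, 5]), 2);
}
```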
@@ -764,39 +796,37 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
}
}
- let gateway_url = Arc::new(Mutex::new(url));
+ {
+ let mut manager = self.shard_manager.lock();
+
+ let init = shard_data[1] - shard_data[0] + 1;
- let mut manager = ShardManager::new(
- shard_data[0],
- shard_data[1] - shard_data[0] + 1,
- shard_data[2],
- Arc::clone(&gateway_url),
- Arc::clone(&self.token),
- Arc::clone(&self.data),
- Arc::clone(&self.event_handler),
- #[cfg(feature = "framework")]
- Arc::clone(&self.framework),
- self.threadpool.clone(),
- );
+ manager.set_shards(shard_data[0], init, shard_data[2]);
- self.shard_runners = Arc::clone(&manager.runners);
+ debug!(
+ "Initializing shard info: {} - {}/{}",
+ shard_data[0],
+ init,
+ shard_data[2],
+ );
- if let Err(why) = manager.initialize() {
- error!("Failed to boot a shard: {:?}", why);
- info!("Shutting down all shards");
+ if let Err(why) = manager.initialize() {
+ error!("Failed to boot a shard: {:?}", why);
+ info!("Shutting down all shards");
- manager.shutdown_all();
+ manager.shutdown_all();
- return Err(Error::Client(ClientError::ShardBootFailure));
+ return Err(Error::Client(ClientError::ShardBootFailure));
+ }
}
- manager.run();
+ self.shard_manager_worker.run();
Err(Error::Client(ClientError::Shutdown))
}
}
-impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
+impl Drop for Client {
fn drop(&mut self) { self.close_handle().close(); }
}
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index b593f5c..147808e 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -51,9 +51,18 @@
mod error;
mod shard;
+mod ws_client_ext;
pub use self::error::Error as GatewayError;
pub use self::shard::Shard;
+pub use self::ws_client_ext::WebSocketGatewayClientExt;
+
+use model::{Game, OnlineStatus};
+use websocket::sync::client::Client;
+use websocket::sync::stream::{TcpStream, TlsStream};
+
+pub type CurrentPresence = (Option<Game>, OnlineStatus);
+pub type WsClient = Client<TlsStream<TcpStream>>;
/// Indicates the current connection stage of a [`Shard`].
///
@@ -136,3 +145,11 @@ impl ConnectionStage {
}
}
}
+
+pub enum ShardAction {
+ Autoreconnect,
+ Heartbeat,
+ Identify,
+ Reconnect,
+ Resume,
+}
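The new `ShardAction` enum lets `Shard::handle_event` report what should happen instead of doing it itself; the caller (the `ShardRunner`) performs the action. A sketch of two of the decision branches visible in the shard diff below (Hello and InvalidateSession), using stand-in copies of the enums:

```rust
// Stand-ins mirroring gateway::ConnectionStage and the new ShardAction.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ConnectionStage {
    Connected,
    Handshake,
}

#[derive(Debug, PartialEq)]
enum ShardAction {
    Autoreconnect,
    Identify,
    Reconnect,
    Resume,
}

// On Hello: identify during the handshake; a Hello at any other stage
// means the session is stale, so auto-reconnect.
fn on_hello(stage: ConnectionStage) -> ShardAction {
    if stage == ConnectionStage::Handshake {
        ShardAction::Identify
    } else {
        ShardAction::Autoreconnect
    }
}

// On InvalidateSession: resume if the gateway says the session is
// resumable, otherwise start over with a reconnect.
fn on_invalidate_session(resumable: bool) -> ShardAction {
    if resumable {
        ShardAction::Resume
    } else {
        ShardAction::Reconnect
    }
}

fn main() {
    assert_eq!(on_hello(ConnectionStage::Handshake), ShardAction::Identify);
    assert_eq!(on_hello(ConnectionStage::Connected), ShardAction::Autoreconnect);
    assert_eq!(on_invalidate_session(true), ShardAction::Resume);
    assert_eq!(on_invalidate_session(false), ShardAction::Reconnect);
}
```

Returning an action rather than acting in place keeps `handle_event` free of I/O, which is what allows the runner to own the `Shard` outright instead of sharing it behind an `Arc`.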
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index c644eac..ef05cf4 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -1,39 +1,31 @@
-use chrono::Utc;
use parking_lot::Mutex;
-use serde_json::Value;
-use std::env::consts;
-use std::io::Write;
-use std::net::Shutdown;
use std::sync::Arc;
use std::time::{Duration as StdDuration, Instant};
-use std::thread;
-use super::{ConnectionStage, GatewayError};
+use super::{
+ ConnectionStage,
+ CurrentPresence,
+ ShardAction,
+ GatewayError,
+ WsClient,
+ WebSocketGatewayClientExt,
+};
use websocket::client::Url;
-use websocket::message::{CloseData, OwnedMessage};
use websocket::stream::sync::AsTcpStream;
-use websocket::sync::client::{Client, ClientBuilder};
-use websocket::sync::stream::{TcpStream, TlsStream};
+use websocket::sync::client::ClientBuilder;
use websocket::WebSocketError;
-use constants::{self, close_codes, OpCode};
+use constants::{self, close_codes};
use internal::prelude::*;
-use internal::ws_impl::SenderExt;
use model::event::{Event, GatewayEvent};
use model::{Game, GuildId, OnlineStatus};
#[cfg(feature = "voice")]
+use serde_json::Value;
+#[cfg(feature = "voice")]
use std::sync::mpsc::{self, Receiver as MpscReceiver};
-#[cfg(feature = "cache")]
-use client::CACHE;
#[cfg(feature = "voice")]
use voice::Manager as VoiceManager;
#[cfg(feature = "voice")]
use http;
-#[cfg(feature = "cache")]
-use utils;
-
-pub type WsClient = Client<TlsStream<TcpStream>>;
-
-type CurrentPresence = (Option<Game>, OnlineStatus);
/// A Shard is a higher-level handler for a websocket connection to Discord's
/// gateway. The shard allows for sending and receiving messages over the
@@ -91,7 +83,7 @@ pub struct Shard {
session_id: Option<String>,
shard_info: [u64; 2],
stage: ConnectionStage,
- token: Arc<Mutex<String>>,
+ pub token: Arc<Mutex<String>>,
ws_url: Arc<Mutex<String>>,
/// The voice connections that this Shard is responsible for. The Shard will
/// update the voice connections' states.
@@ -138,11 +130,14 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(ws_url: Arc<Mutex<String>>,
- token: Arc<Mutex<String>>,
- shard_info: [u64; 2])
- -> Result<Shard> {
- let client = connecting(&*ws_url.lock());
+ pub fn new(
+ ws_url: Arc<Mutex<String>>,
+ token: Arc<Mutex<String>>,
+ shard_info: [u64; 2],
+ ) -> Result<Shard> {
+ let mut client = connect(&*ws_url.lock())?;
+
+ let _ = set_client_timeout(&mut client);
let current_presence = (None, OnlineStatus::Online);
let heartbeat_instants = (None, None);
@@ -159,6 +154,8 @@ impl Shard {
let user = http::get_current_user()?;
Shard {
+ manager: VoiceManager::new(tx, user.id),
+ manager_rx: rx,
client,
current_presence,
heartbeat_instants,
@@ -170,8 +167,6 @@ impl Shard {
shard_info,
session_id,
ws_url,
- manager: VoiceManager::new(tx, user.id),
- manager_rx: rx,
}
} else {
Shard {
@@ -191,137 +186,122 @@ impl Shard {
})
}
- /// Retrieves a copy of the current shard information.
- ///
- /// The first element is the _current_ shard - 0-indexed - while the second
- /// element is the _total number_ of shards -- 1-indexed.
- ///
- /// For example, if using 3 shards in total, and if this is shard 1, then it
- /// can be read as "the second of three shards".
- ///
- /// # Examples
- ///
- /// Retrieving the shard info for the second shard, out of two shards total:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
- /// #
- /// assert_eq!(shard.shard_info(), [1, 2]);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
+ /// Retrieves the current presence of the shard.
+ #[inline]
+ pub fn current_presence(&self) -> &CurrentPresence {
+ &self.current_presence
+ }
- /// Sets the user's current game, if any.
+ /// Retrieves the heartbeat instants of the shard.
///
- /// Other presence settings are maintained.
+ /// These are the times at which the last heartbeat was sent and at
+ /// which an acknowledgement was last received.
+ #[inline]
+ pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) {
+ &self.heartbeat_instants
+ }
+
+ /// Retrieves the value of when the last heartbeat was sent.
+ #[inline]
+ pub fn last_heartbeat_sent(&self) -> Option<&Instant> {
+ self.heartbeat_instants.0.as_ref()
+ }
+
+ /// Retrieves the value of when the last heartbeat ack was received.
+ #[inline]
+ pub fn last_heartbeat_ack(&self) -> Option<&Instant> {
+ self.heartbeat_instants.1.as_ref()
+ }
+
+ /// Sends a heartbeat to the gateway with the current sequence.
///
- /// # Examples
+ /// This sets the last heartbeat time to now, and
+ /// `last_heartbeat_acknowledged` to `false`.
///
- /// Setting the current game to playing `"Heroes of the Storm"`:
+ /// # Errors
///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::Game;
+ /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending
+ /// a heartbeat.
///
- /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
+ /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed
+ pub fn heartbeat(&mut self) -> Result<()> {
+ match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) {
+ Ok(()) => {
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.last_heartbeat_acknowledged = false;
+
+ Ok(())
+ },
+ Err(why) => {
+ match why {
+ Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
+ debug!("[Shard {:?}] Err heartbeating: {:?}",
+ self.shard_info,
+ err);
+ },
+ other => {
+ warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
+ self.shard_info,
+ other);
+ },
+ }
+
+ Err(Error::Gateway(GatewayError::HeartbeatFailed))
+ }
+ }
+ }
+
+ #[inline]
+ pub fn heartbeat_interval(&self) -> Option<&u64> {
+ self.heartbeat_interval.as_ref()
+ }
+
+ #[inline]
+ pub fn last_heartbeat_acknowledged(&self) -> bool {
+ self.last_heartbeat_acknowledged
+ }
+
+ #[inline]
+ pub fn seq(&self) -> u64 {
+ self.seq
+ }
+
+ #[inline]
+ pub fn session_id(&self) -> Option<&String> {
+ self.session_id.as_ref()
+ }
+
+ #[inline]
pub fn set_game(&mut self, game: Option<Game>) {
self.current_presence.0 = game;
+ }
- self.update_presence();
+ #[inline]
+ pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) {
+ self.set_game(game);
+ self.set_status(status);
}
- /// Sets the user's current online status.
- ///
- /// Note that [`Offline`] is not a valid online status, so it is
- /// automatically converted to [`Invisible`].
- ///
- /// Other presence settings are maintained.
- ///
- /// # Examples
- ///
- /// Setting the current online status for the shard to [`DoNotDisturb`].
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::OnlineStatus;
- ///
- /// shard.set_status(OnlineStatus::DoNotDisturb);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
- /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
- /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
- pub fn set_status(&mut self, online_status: OnlineStatus) {
- self.current_presence.1 = match online_status {
- OnlineStatus::Offline => OnlineStatus::Invisible,
- other => other,
- };
+ #[inline]
+ pub fn set_status(&mut self, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
- self.update_presence();
+ self.current_presence.1 = status;
}
- /// Sets the user's full presence information.
+ /// Retrieves a copy of the current shard information.
///
- /// Consider using the individual setters if you only need to modify one of
- /// these.
+ /// The first element is the _current_ shard - 0-indexed - while the second
+ /// element is the _total number_ of shards -- 1-indexed.
+ ///
+ /// For example, if using 3 shards in total, and if this is shard 1, then it
+ /// can be read as "the second of three shards".
///
/// # Examples
///
- /// Set the current user as playing `"Heroes of the Storm"` and being
- /// online:
+ /// Retrieving the shard info for the second shard, out of two shards total:
///
/// ```rust,no_run
/// # extern crate parking_lot;
@@ -335,11 +315,9 @@ impl Shard {
/// # fn try_main() -> Result<(), Box<Error>> {
/// # let mutex = Arc::new(Mutex::new("".to_string()));
/// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
/// #
- /// use serenity::model::{Game, OnlineStatus};
- ///
- /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// assert_eq!(shard.shard_info(), [1, 2]);
/// # Ok(())
/// # }
/// #
@@ -347,15 +325,7 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) {
- if status == OnlineStatus::Offline {
- status = OnlineStatus::Invisible;
- }
-
- self.current_presence = (game, status);
-
- self.update_presence();
- }
+ pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
/// Returns the current connection stage of the shard.
pub fn stage(&self) -> ConnectionStage {
@@ -387,21 +357,24 @@ impl Shard {
/// Returns a `GatewayError::OverloadedShard` if the shard would have too
/// many guilds assigned to it.
#[allow(cyclomatic_complexity)]
- pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> {
- match event {
- Ok(GatewayEvent::Dispatch(seq, event)) => {
+ pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>)
+ -> Result<Option<ShardAction>> {
+ match *event {
+ Ok(GatewayEvent::Dispatch(seq, ref event)) => {
if seq > self.seq + 1 {
warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq);
}
- match event {
+ match *event {
Event::Ready(ref ready) => {
debug!("[Shard {:?}] Received Ready", self.shard_info);
self.session_id = Some(ready.ready.session_id.clone());
self.stage = ConnectionStage::Connected;
+ /*
set_client_timeout(&mut self.client)?;
+ */
},
Event::Resumed(_) => {
info!("[Shard {:?}] Resumed", self.shard_info);
@@ -421,7 +394,7 @@ impl Shard {
self.seq = seq;
- Ok(Some(event))
+ Ok(None)
},
Ok(GatewayEvent::Heartbeat(s)) => {
info!("[Shard {:?}] Received shard heartbeat", self.shard_info);
@@ -438,24 +411,18 @@ impl Shard {
if self.stage == ConnectionStage::Handshake {
self.stage = ConnectionStage::Identifying;
- self.identify()?;
+ return Ok(Some(ShardAction::Identify));
} else {
warn!(
"[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting",
self.shard_info
);
- return self.autoreconnect().and(Ok(None));
+ return Ok(Some(ShardAction::Autoreconnect));
}
}
- let map = json!({
- "d": Value::Null,
- "op": OpCode::Heartbeat.num(),
- });
- self.client.send_json(&map)?;
-
- Ok(None)
+ Ok(Some(ShardAction::Heartbeat))
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());
@@ -478,14 +445,14 @@ impl Shard {
self.heartbeat_interval = Some(interval);
}
- if self.stage == ConnectionStage::Handshake {
- self.identify().and(Ok(None))
+ Ok(Some(if self.stage == ConnectionStage::Handshake {
+ ShardAction::Identify
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
- self.autoreconnect().and(Ok(None))
- }
+ ShardAction::Autoreconnect
+ }))
},
Ok(GatewayEvent::InvalidateSession(resumable)) => {
info!(
@@ -493,16 +460,15 @@ impl Shard {
self.shard_info,
);
- if resumable {
- self.resume().and(Ok(None))
+ Ok(Some(if resumable {
+ ShardAction::Resume
} else {
- self.identify().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
- Err(Error::Gateway(GatewayError::Closed(data))) => {
+ Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)),
+ Err(Error::Gateway(GatewayError::Closed(ref data))) => {
let num = data.as_ref().map(|d| d.status_code);
- let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
match num {
@@ -562,7 +528,7 @@ impl Shard {
"[Shard {:?}] Unknown unclean close {}: {:?}",
self.shard_info,
other,
- reason
+ data.as_ref().map(|d| &d.reason),
);
},
_ => {},
@@ -573,14 +539,14 @@ impl Shard {
self.session_id.is_some()
}).unwrap_or(true);
- if resume {
- self.resume().or_else(|_| self.reconnect()).and(Ok(None))
+ Ok(Some(if resume {
+ ShardAction::Resume
} else {
- self.reconnect().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Err(Error::WebSocket(why)) => {
- if let WebSocketError::NoDataAvailable = why {
+ Err(Error::WebSocket(ref why)) => {
+ if let WebSocketError::NoDataAvailable = *why {
if self.heartbeat_instants.1.is_none() {
return Ok(None);
}
@@ -592,44 +558,62 @@ impl Shard {
info!("[Shard {:?}] Will attempt to auto-reconnect",
self.shard_info);
- self.autoreconnect().and(Ok(None))
+ Ok(Some(ShardAction::Autoreconnect))
},
- Err(error) => Err(error),
+ _ => Ok(None),
+ }
+ }
+
+ pub fn check_heartbeat(&mut self) -> Result<()> {
+ let wait = {
+ let heartbeat_interval = match self.heartbeat_interval {
+ Some(heartbeat_interval) => heartbeat_interval,
+ None => return Ok(()),
+ };
+
+ StdDuration::from_secs(heartbeat_interval / 1000)
+ };
+
+ // If a duration of time less than the heartbeat_interval has passed,
+ // then don't perform a keepalive or attempt to reconnect.
+ if let Some(last_sent) = self.heartbeat_instants.0 {
+ if last_sent.elapsed() <= wait {
+ return Ok(());
+ }
+ }
+
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // auto-reconnect.
+ if !self.last_heartbeat_acknowledged {
+ debug!(
+ "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
+ self.shard_info,
+ );
+
+ return self.reconnect().map_err(|why| {
+ warn!(
+ "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
+ self.shard_info,
+ why,
+ );
+
+ why
+ });
+ }
+
+ // Otherwise, we're good to heartbeat.
+ if let Err(why) = self.heartbeat() {
+ warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
+
+ self.reconnect()
+ } else {
+ trace!("[Shard {:?}] Heartbeated", self.shard_info);
+
+ Ok(())
}
}
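The added `check_heartbeat` boils down to a three-way decision: skip when no interval is known yet or too little time has elapsed, reconnect when the previous beat was never acknowledged, and otherwise send the next heartbeat. A std-only sketch of that decision logic (`HeartbeatDecision` and `decide` are illustrative names, not serenity API):

```rust
use std::time::{Duration, Instant};

/// Outcome of a heartbeat check; illustrative, not serenity API.
#[derive(Debug, PartialEq)]
enum HeartbeatDecision {
    Skip,
    Reconnect,
    Heartbeat,
}

// Mirrors check_heartbeat: no interval yet or not enough time elapsed
// means do nothing; a missing ACK forces a reconnect; otherwise beat.
fn decide(
    interval_ms: Option<u64>,
    last_sent: Option<Instant>,
    last_acknowledged: bool,
) -> HeartbeatDecision {
    let wait = match interval_ms {
        Some(ms) => Duration::from_secs(ms / 1000),
        None => return HeartbeatDecision::Skip,
    };

    if let Some(sent) = last_sent {
        if sent.elapsed() <= wait {
            return HeartbeatDecision::Skip;
        }
    }

    if !last_acknowledged {
        return HeartbeatDecision::Reconnect;
    }

    HeartbeatDecision::Heartbeat
}

fn main() {
    // No HELLO received yet: nothing to do.
    assert_eq!(decide(None, None, true), HeartbeatDecision::Skip);

    // Interval elapsed but the last beat was never ACKed: reconnect.
    let old = Instant::now().checked_sub(Duration::from_secs(60));
    assert_eq!(decide(Some(41_250), old, false), HeartbeatDecision::Reconnect);

    // Interval elapsed and ACKed: send the next heartbeat.
    assert_eq!(decide(Some(41_250), old, true), HeartbeatDecision::Heartbeat);
}
```

Keeping this decision pure, as above, is what makes the ShardRunner's periodic check easy to reason about: the only side effects happen after the choice is made.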
/// Calculates the heartbeat latency between the shard and the gateway.
- ///
- /// # Examples
- ///
- /// When using the [`Client`], output the latency in response to a `"~ping"`
- /// message handled through [`Client::on_message`].
- ///
- /// ```rust,no_run
- /// # use serenity::prelude::*;
- /// # use serenity::model::*;
- /// struct Handler;
- ///
- /// impl EventHandler for Handler {
- /// fn message(&self, ctx: Context, msg: Message) {
- /// if msg.content == "~ping" {
- /// if let Some(latency) = ctx.shard.lock().latency() {
- /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos());
- ///
- /// let _ = msg.channel_id.say(&s);
- /// } else {
- /// let _ = msg.channel_id.say("N/A");
- /// }
- /// }
- /// }
- /// }
- /// let mut client = Client::new("token", Handler).unwrap();
- ///
- /// client.start().unwrap();
- /// ```
- ///
- /// [`Client`]: ../struct.Client.html
- /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message
// Shamelessly stolen from brayzure's commit in eris:
// <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
pub fn latency(&self) -> Option<StdDuration> {
@@ -640,38 +624,68 @@ impl Shard {
}
}
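The latency calculation credited to eris above is simply the gap between when a heartbeat went out and when its ACK came back, i.e. `heartbeat_instants.1 - heartbeat_instants.0`. A minimal sketch, assuming both instants are tracked as `Option<Instant>` (names are illustrative):

```rust
use std::time::{Duration, Instant};

// Heartbeat latency, eris-style: ACK receipt time minus send time.
// `sent`/`ack` stand in for heartbeat_instants.0 / heartbeat_instants.1.
fn latency(sent: Option<Instant>, ack: Option<Instant>) -> Option<Duration> {
    match (sent, ack) {
        (Some(s), Some(a)) if a >= s => Some(a - s),
        _ => None,
    }
}

fn main() {
    let sent = Instant::now();
    let ack = sent + Duration::from_millis(42);
    assert_eq!(latency(Some(sent), Some(ack)), Some(Duration::from_millis(42)));
    // No ACK yet: latency is unknown, not zero.
    assert_eq!(latency(Some(sent), None), None);
}
```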
- /// Shuts down the receiver by attempting to cleanly close the
- /// connection.
- pub fn shutdown_clean(&mut self) -> Result<()> {
- {
- let data = CloseData {
- status_code: 1000,
- reason: String::new(),
- };
-
- let message = OwnedMessage::Close(Some(data));
-
- self.client.send_message(&message)?;
+ #[cfg(feature = "voice")]
+ fn voice_dispatch(&mut self, event: &Event) {
+ if let Event::VoiceStateUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_state(&update.voice_state);
+ }
+ }
}
- let mut stream = self.client.stream_ref().as_tcp();
+ if let Event::VoiceServerUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_server(&update.endpoint, &update.token);
+ }
+ }
+ }
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ #[cfg(feature = "voice")]
+ pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> {
+ let mut messages = vec![];
- debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info);
+ while let Ok(v) = self.manager_rx.try_recv() {
+ messages.push(v);
+ }
- Ok(())
+ messages
}
- /// Uncleanly shuts down the receiver by not sending a close code.
- pub fn shutdown(&mut self) -> Result<()> {
- let mut stream = self.client.stream_ref().as_tcp();
+ /// Performs a deterministic reconnect.
+ ///
+ /// The type of reconnect is determined by whether a [`session_id`] exists.
+ ///
+ /// If the `session_id` still exists, then a RESUME is sent. If not, then
+ /// an IDENTIFY is sent.
+ ///
+ /// Note that if the shard is already in the
+ /// [`ConnectionStage::Connecting`] stage, no action will be performed.
+ ///
+ /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting
+ /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id
+ pub fn autoreconnect(&mut self) -> Result<()> {
+ if self.stage == ConnectionStage::Connecting {
+ return Ok(());
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ if self.session_id().is_some() {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to resume",
+ self.shard_info,
+ );
- Ok(())
+ self.resume()
+ } else {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to reconnect",
+ self.shard_info,
+ );
+
+ self.reconnect()
+ }
}
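The documented autoreconnect rules reduce to a small pure choice that can be unit-tested without a socket. A sketch under stated assumptions (the `Stage` and `Action` stand-ins are illustrative, not serenity's real types):

```rust
/// Minimal stand-ins for serenity's types; illustrative names only.
#[derive(PartialEq)]
enum Stage { Connecting, Handshake }

#[derive(Debug, PartialEq)]
enum Action { None, Resume, Reconnect }

// Mirrors autoreconnect: a shard mid-connect is left alone, a surviving
// session id means RESUME, and anything else means a full re-IDENTIFY.
fn choose(stage: &Stage, session_id: Option<&str>) -> Action {
    if *stage == Stage::Connecting {
        Action::None
    } else if session_id.is_some() {
        Action::Resume
    } else {
        Action::Reconnect
    }
}

fn main() {
    assert_eq!(choose(&Stage::Connecting, Some("sess")), Action::None);
    assert_eq!(choose(&Stage::Handshake, Some("sess")), Action::Resume);
    assert_eq!(choose(&Stage::Handshake, None), Action::Reconnect);
}
```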
/// Requests that one or multiple [`Guild`]s be chunked.
@@ -710,7 +724,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(2000), None);
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
/// # Ok(())
/// # }
/// #
@@ -740,7 +754,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(20), Some("do"));
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
/// # Ok(())
/// # }
/// #
@@ -753,280 +767,40 @@ impl Shard {
/// ../../model/event/enum.Event.html#variant.GuildMembersChunk
/// [`Guild`]: ../../model/struct.Guild.html
/// [`Member`]: ../../model/struct.Member.html
- pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>)
- where It: IntoIterator<Item=T> {
+ pub fn chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);
- let msg = json!({
- "op": OpCode::GetGuildMembers.num(),
- "d": {
- "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
- "limit": limit.unwrap_or(0),
- "query": query.unwrap_or(""),
- },
- });
-
- let _ = self.client.send_json(&msg);
- }
-
- /// Calculates the number of guilds that the shard is responsible for.
- ///
- /// If sharding is not being used (i.e. 1 shard), then the total number of
- /// [`Guild`] in the [`Cache`] will be used.
- ///
- /// **Note**: Requires the `cache` feature be enabled.
- ///
- /// # Examples
- ///
- /// Retrieve the number of guilds a shard is responsible for:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// let info = shard.shard_info();
- /// let guilds = shard.guilds_handled();
- ///
- /// println!("Shard {:?} is responsible for {} guilds", info, guilds);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`Cache`]: ../ext/cache/struct.Cache.html
- /// [`Guild`]: ../model/struct.Guild.html
- #[cfg(feature = "cache")]
- pub fn guilds_handled(&self) -> u16 {
- let cache = CACHE.read();
-
- let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]);
-
- cache
- .guilds
- .keys()
- .filter(|guild_id| {
- utils::shard_id(guild_id.0, shard_count) == shard_id
- })
- .count() as u16
- }
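The removed `guilds_handled` leans on `utils::shard_id`, which implements Discord's documented sharding formula: the guild's snowflake shifted right 22 bits, modulo the shard count. A standalone sketch for reference:

```rust
// Discord's documented sharding formula, as implemented by
// utils::shard_id: (guild_id >> 22) % shard_count.
fn shard_id(guild_id: u64, shard_count: u64) -> u64 {
    (guild_id >> 22) % shard_count
}

fn main() {
    let guild_id = 81_384_788_765_712_384u64;
    // With a single shard, everything maps to shard 0.
    assert_eq!(shard_id(guild_id, 1), 0);
    // With N shards, the result is always in 0..N.
    assert!(shard_id(guild_id, 3) < 3);
}
```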
-
- #[cfg(feature = "voice")]
- fn voice_dispatch(&mut self, event: &Event) {
- if let Event::VoiceStateUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_state(&update.voice_state);
- }
- }
- }
-
- if let Event::VoiceServerUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_server(&update.endpoint, &update.token);
- }
- }
- }
- }
-
- #[cfg(feature = "voice")]
- pub(crate) fn cycle_voice_recv(&mut self) {
- if let Ok(v) = self.manager_rx.try_recv() {
- if let Err(why) = self.client.send_json(&v) {
- warn!("[Shard {:?}] Err sending voice msg: {:?}",
- self.shard_info,
- why);
- }
- }
+ self.client.send_chunk_guilds(
+ guild_ids,
+ &self.shard_info,
+ limit,
+ query,
+ )
}
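The delegated `send_chunk_guilds` builds an opcode-8 (Request Guild Members) payload, defaulting `limit` to `0` (all members) and `query` to `""` (no name filter). A hedged sketch of the resulting shape using plain `format!` rather than the crate's actual `json!`-based serialization:

```rust
// Illustrative only: the crate builds this with json!, not format!.
// Opcode 8 is Request Guild Members; limit 0 / empty query mean "everyone".
fn chunk_payload(guild_ids: &[u64], limit: Option<u16>, query: Option<&str>) -> String {
    let ids: Vec<String> = guild_ids.iter().map(|id| id.to_string()).collect();
    format!(
        "{{\"op\":8,\"d\":{{\"guild_id\":[{}],\"limit\":{},\"query\":{:?}}}}}",
        ids.join(","),
        limit.unwrap_or(0),
        query.unwrap_or(""),
    )
}

fn main() {
    let p = chunk_payload(&[81_384_788_765_712_384], Some(2000), None);
    assert!(p.contains("\"op\":8"));
    assert!(p.contains("\"limit\":2000"));
    assert!(p.contains("\"query\":\"\""));
}
```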
- pub(crate) fn heartbeat(&mut self) -> Result<()> {
- let map = json!({
- "d": self.seq,
- "op": OpCode::Heartbeat.num(),
- });
-
- trace!("[Shard {:?}] Sending heartbeat d: {}",
- self.shard_info,
- self.seq);
-
- match self.client.send_json(&map) {
- Ok(_) => {
- self.heartbeat_instants.0 = Some(Instant::now());
- self.last_heartbeat_acknowledged = false;
-
- trace!("[Shard {:?}] Successfully heartbeated",
- self.shard_info);
-
- Ok(())
- },
- Err(why) => {
- match why {
- Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
- debug!("[Shard {:?}] Err heartbeating: {:?}",
- self.shard_info,
- err);
- },
- other => {
- warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
- self.shard_info,
- other);
- },
- }
-
- Err(Error::Gateway(GatewayError::HeartbeatFailed))
- },
- }
- }
-
- pub(crate) fn check_heartbeat(&mut self) -> Result<()> {
- let heartbeat_interval = match self.heartbeat_interval {
- Some(heartbeat_interval) => heartbeat_interval,
- None => return Ok(()),
- };
-
- let wait = StdDuration::from_secs(heartbeat_interval / 1000);
-
- // If a duration of time less than the heartbeat_interval has passed,
- // then don't perform a keepalive or attempt to reconnect.
- if let Some(last_sent) = self.heartbeat_instants.0 {
- if last_sent.elapsed() <= wait {
- return Ok(());
- }
- }
-
- // If the last heartbeat didn't receive an acknowledgement, then
- // auto-reconnect.
- if !self.last_heartbeat_acknowledged {
- debug!(
- "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
- self.shard_info,
- );
-
- return self.reconnect().map_err(|why| {
- warn!(
- "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
- self.shard_info,
- why,
- );
-
- why
- });
- }
-
- // Otherwise, we're good to heartbeat.
- trace!("[Shard {:?}] Heartbeating", self.shard_info);
-
- if let Err(why) = self.heartbeat() {
- warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
-
- self.reconnect()
- } else {
- trace!("[Shard {:?}] Heartbeated", self.shard_info);
- self.heartbeat_instants.0 = Some(Instant::now());
-
- Ok(())
- }
- }
-
- pub(crate) fn autoreconnect(&mut self) -> Result<()> {
- if self.stage == ConnectionStage::Connecting {
- return Ok(());
- }
-
- if self.session_id.is_some() {
- debug!("[Shard {:?}] Autoreconnector choosing to resume",
- self.shard_info);
-
- self.resume()
- } else {
- debug!("[Shard {:?}] Autoreconnector choosing to reconnect",
- self.shard_info);
-
- self.reconnect()
- }
- }
-
- /// Retrieves the `heartbeat_interval`.
- #[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
- self.heartbeat_interval
- }
-
- /// Retrieves the value of when the last heartbeat ack was received.
- #[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
- self.heartbeat_instants.1
- }
-
- fn reconnect(&mut self) -> Result<()> {
- info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
- self.reset();
-
- self.initialize()
- }
-
- // Attempts to send a RESUME message.
- //
- // # Examples
- //
- // Returns a `GatewayError::NoSessionId` is there is no `session_id`,
- // indicating that the shard should instead [`reconnect`].
+ // Marks the shard as entering the identifying stage, which sets:
//
- // [`reconnect`]: #method.reconnect
- fn resume(&mut self) -> Result<()> {
- debug!("Shard {:?}] Attempting to resume", self.shard_info);
-
- self.initialize()?;
- self.stage = ConnectionStage::Resuming;
-
- self.send_resume().or_else(|why| {
- warn!("[Shard {:?}] Err sending resume: {:?}",
- self.shard_info,
- why);
-
- self.reconnect()
- })
- }
-
- fn send_resume(&mut self) -> Result<()> {
- let session_id = match self.session_id.clone() {
- Some(session_id) => session_id,
- None => return Err(Error::Gateway(GatewayError::NoSessionId)),
- };
+ // - the time of the last sent heartbeat to now
+ // - the `stage` to `Identifying`
+ pub fn identify(&mut self) -> Result<()> {
+ self.client.send_identify(&self.shard_info, &self.token.lock())?;
- debug!("[Shard {:?}] Sending resume; seq: {}",
- self.shard_info,
- self.seq);
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
- self.client.send_json(&json!({
- "op": OpCode::Resume.num(),
- "d": {
- "session_id": session_id,
- "seq": self.seq,
- "token": &*self.token.lock(),
- },
- }))
+ Ok(())
}
/// Initializes a new WebSocket client.
///
/// This will set the stage of the shard before and after instantiation of
/// the client.
- fn initialize(&mut self) -> Result<()> {
+ pub fn initialize(&mut self) -> Result<WsClient> {
debug!("[Shard {:?}] Initializing", self.shard_info);
// We need to do two, sort of three things here:
@@ -1038,38 +812,15 @@ impl Shard {
// This is used to accurately assess whether the state of the shard is
// accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
- self.client = connect(&self.ws_url.lock())?;
+ let mut client = connect(&self.ws_url.lock())?;
self.stage = ConnectionStage::Handshake;
- Ok(())
- }
-
- fn identify(&mut self) -> Result<()> {
- let identification = json!({
- "op": OpCode::Identify.num(),
- "d": {
- "compression": true,
- "large_threshold": constants::LARGE_THRESHOLD,
- "shard": self.shard_info,
- "token": &*self.token.lock(),
- "v": constants::GATEWAY_VERSION,
- "properties": {
- "$browser": "serenity",
- "$device": "serenity",
- "$os": consts::OS,
- },
- },
- });
-
- self.heartbeat_instants.0 = Some(Instant::now());
- self.stage = ConnectionStage::Identifying;
+ let _ = set_client_timeout(&mut client);
- debug!("[Shard {:?}] Identifying", self.shard_info);
-
- self.client.send_json(&identification)
+ Ok(client)
}
- fn reset(&mut self) {
+ pub fn reset(&mut self) {
self.heartbeat_instants = (Some(Instant::now()), None);
self.heartbeat_interval = None;
self.last_heartbeat_acknowledged = true;
@@ -1078,42 +829,39 @@ impl Shard {
self.seq = 0;
}
- fn update_presence(&mut self) {
- let (ref game, status) = self.current_presence;
- let now = Utc::now().timestamp() as u64;
-
- let msg = json!({
- "op": OpCode::StatusUpdate.num(),
- "d": {
- "afk": false,
- "since": now,
- "status": status.name(),
- "game": game.as_ref().map(|x| json!({
- "name": x.name,
- "type": x.kind,
- "url": x.url,
- })),
- },
- });
+ pub fn resume(&mut self) -> Result<()> {
+ debug!("[Shard {:?}] Attempting to resume", self.shard_info);
- debug!("[Shard {:?}] Sending presence update", self.shard_info);
+ self.client = self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
- if let Err(why) = self.client.send_json(&msg) {
- warn!("[Shard {:?}] Err sending presence update: {:?}",
- self.shard_info,
- why);
+ match self.session_id.as_ref() {
+ Some(session_id) => {
+ self.client.send_resume(
+ &self.shard_info,
+ session_id,
+ &self.seq,
+ &self.token.lock(),
+ )
+ },
+ None => Err(Error::Gateway(GatewayError::NoSessionId)),
}
+ }
- #[cfg(feature = "cache")]
- {
- let mut cache = CACHE.write();
- let current_user_id = cache.user.id;
+ pub fn reconnect(&mut self) -> Result<()> {
+ info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
- cache.presences.get_mut(&current_user_id).map(|presence| {
- presence.game = game.clone();
- presence.last_modified = Some(now);
- });
- }
+ self.reset();
+ self.client = self.initialize()?;
+
+ Ok(())
+ }
+
+ pub fn update_presence(&mut self) -> Result<()> {
+ self.client.send_presence_update(
+ &self.shard_info,
+ &self.current_presence,
+ )
}
}
@@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> {
Error::Gateway(GatewayError::BuildingUrl)
})
}
-
-/// Tries to connect and upon failure, retries.
-fn connecting(uri: &str) -> WsClient {
- let waiting_time = 30;
-
- loop {
- match connect(&uri) {
- Ok(client) => return client,
- Err(why) => {
- warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time);
- thread::sleep(StdDuration::from_secs(waiting_time));
- },
- };
- }
-}
diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs
new file mode 100644
index 0000000..873d114
--- /dev/null
+++ b/src/gateway/ws_client_ext.rs
@@ -0,0 +1,133 @@
+use chrono::Utc;
+use constants::{self, OpCode};
+use gateway::{CurrentPresence, WsClient};
+use internal::prelude::*;
+use internal::ws_impl::SenderExt;
+use model::GuildId;
+use std::env::consts;
+
+pub trait WebSocketGatewayClientExt {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId>;
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()>;
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()>;
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()>;
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()>;
+}
+
+impl WebSocketGatewayClientExt for WsClient {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
+ debug!("[Shard {:?}] Requesting member chunks", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::GetGuildMembers.num(),
+ "d": {
+ "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
+ "limit": limit.unwrap_or(0),
+ "query": query.unwrap_or(""),
+ },
+ })).map_err(From::from)
+ }
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()> {
+ trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq);
+
+ self.send_json(&json!({
+ "d": seq,
+ "op": OpCode::Heartbeat.num(),
+ })).map_err(From::from)
+ }
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()> {
+ debug!("[Shard {:?}] Identifying", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::Identify.num(),
+ "d": {
+ "compression": true,
+ "large_threshold": constants::LARGE_THRESHOLD,
+ "shard": shard_info,
+ "token": token,
+ "v": constants::GATEWAY_VERSION,
+ "properties": {
+ "$browser": "serenity",
+ "$device": "serenity",
+ "$os": consts::OS,
+ },
+ },
+ }))
+ }
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()> {
+ let &(ref game, ref status) = current_presence;
+ let now = Utc::now().timestamp() as u64;
+
+ debug!("[Shard {:?}] Sending presence update", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::StatusUpdate.num(),
+ "d": {
+ "afk": false,
+ "since": now,
+ "status": status.name(),
+ "game": game.as_ref().map(|x| json!({
+ "name": x.name,
+ "type": x.kind,
+ "url": x.url,
+ })),
+ },
+ }))
+ }
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()> {
+ debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq);
+
+ self.send_json(&json!({
+ "op": OpCode::Resume.num(),
+ "d": {
+ "session_id": session_id,
+ "seq": seq,
+ "token": token,
+ },
+ })).map_err(From::from)
+ }
+}