aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/client')
-rw-r--r--src/client/bridge/gateway/mod.rs114
-rw-r--r--src/client/bridge/gateway/shard_manager.rs283
-rw-r--r--src/client/bridge/gateway/shard_manager_monitor.rs54
-rw-r--r--src/client/bridge/gateway/shard_messenger.rs276
-rw-r--r--src/client/bridge/gateway/shard_queuer.rs56
-rw-r--r--src/client/bridge/gateway/shard_runner.rs385
-rw-r--r--src/client/bridge/gateway/shard_runner_message.rs43
-rw-r--r--src/client/bridge/mod.rs9
-rw-r--r--src/client/context.rs65
-rw-r--r--src/client/dispatch.rs132
-rw-r--r--src/client/mod.rs142
11 files changed, 1256 insertions, 303 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs
index 0b873aa..752b015 100644
--- a/src/client/bridge/gateway/mod.rs
+++ b/src/client/bridge/gateway/mod.rs
@@ -1,28 +1,113 @@
+//! The client gateway bridge is support essential for the [`client`] module.
+//!
+//! This is made available for user use if one wishes to be lower-level or avoid
+//! the higher functionality of the [`Client`].
+//!
+//! Of interest are three pieces:
+//!
+//! ### [`ShardManager`]
+//!
+//! The shard manager is responsible for being a clean interface between the
+//! user and the shard runners, providing essential functions such as
+//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`]
+//! to restart a shard.
+//!
+//! If you are using the `Client`, this is likely the only piece of interest to
+//! you. Refer to [its documentation][`ShardManager`] for more information.
+//!
+//! ### [`ShardQueuer`]
+//!
+//! The shard queuer is a light wrapper around an mpsc receiver that receives
+//! [`ShardManagerMessage`]s. It should be run in its own thread so it can
+//! receive messages to start shards in a queue.
+//!
+//! Refer to [its documentation][`ShardQueuer`] for more information.
+//!
+//! ### [`ShardRunner`]
+//!
+//! The shard runner is responsible for actually running a shard and
+//! communicating with its respective WebSocket client.
+//!
+//! It is performs all actions such as sending a presence update over the client
+//! and, with the help of the [`Shard`], will be able to determine what to do.
+//! This is, for example, whether to reconnect, resume, or identify with the
+//! gateway.
+//!
+//! ### In Conclusion
+//!
+//! For almost every - if not every - use case, you only need to _possibly_ be
+//! concerned about the [`ShardManager`] in this module.
+//!
+//! [`Client`]: ../../struct.Client.html
+//! [`client`]: ../..
+//! [`Shard`]: ../../../gateway/struct.Shard.html
+//! [`ShardManager`]: struct.ShardManager.html
+//! [`ShardManager::restart`]: struct.ShardManager.html#method.restart
+//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown
+//! [`ShardQueuer`]: struct.ShardQueuer.html
+//! [`ShardRunner`]: struct.ShardRunner.html
+
mod shard_manager;
+mod shard_manager_monitor;
+mod shard_messenger;
mod shard_queuer;
mod shard_runner;
+mod shard_runner_message;
pub use self::shard_manager::ShardManager;
+pub use self::shard_manager_monitor::ShardManagerMonitor;
+pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::ShardQueuer;
pub use self::shard_runner::ShardRunner;
+pub use self::shard_runner_message::ShardRunnerMessage;
-use gateway::Shard;
-use parking_lot::Mutex;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::mpsc::Sender;
-use std::sync::Arc;
-type Parked<T> = Arc<Mutex<T>>;
-type LockedShard = Parked<Shard>;
+/// A message either for a [`ShardManager`] or a [`ShardRunner`].
+///
+/// [`ShardManager`]: struct.ShardManager.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+pub enum ShardClientMessage {
+ /// A message intended to be worked with by a [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Manager(ShardManagerMessage),
+ /// A message intended to be worked with by a [`ShardRunner`].
+ ///
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ Runner(ShardRunnerMessage),
+}
+/// A message for a [`ShardManager`] relating to an operation with a shard.
+///
+/// [`ShardManager`]: struct.ShardManager.html
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub enum ShardManagerMessage {
+ /// Indicator that a [`ShardManagerMonitor`] should restart a shard.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Restart(ShardId),
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard
+ /// without bringing it back up.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Shutdown(ShardId),
- #[allow(dead_code)]
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards
+ /// and end its monitoring process for the [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
ShutdownAll,
}
+/// A message to be sent to the [`ShardQueuer`].
+///
+/// This should usually be wrapped in a [`ShardClientMessage`].
+///
+/// [`ShardClientMessage`]: enum.ShardClientMessage.html
+/// [`ShardQueuer`]: enum.ShardQueuer.html
+#[derive(Clone, Debug)]
pub enum ShardQueuerMessage {
/// Message to start a shard, where the 0-index element is the ID of the
/// Shard to start and the 1-index element is the total shards in use.
@@ -31,8 +116,8 @@ pub enum ShardQueuerMessage {
Shutdown,
}
-// A light tuplestruct wrapper around a u64 to verify type correctness when
-// working with the IDs of shards.
+/// A light tuplestruct wrapper around a u64 to verify type correctness when
+/// working with the IDs of shards.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ShardId(pub u64);
@@ -42,7 +127,16 @@ impl Display for ShardId {
}
}
+/// Information about a [`ShardRunner`].
+///
+/// The [`ShardId`] is not included because, as it stands, you probably already
+/// know the Id if you obtained this.
+///
+/// [`ShardId`]: struct.ShardId.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+#[derive(Debug)]
pub struct ShardRunnerInfo {
- pub runner_tx: Sender<ShardManagerMessage>,
- pub shard: Arc<Mutex<Shard>>,
+ /// The channel used to communicate with the shard runner, telling it
+ /// what to do with regards to its status.
+ pub runner_tx: Sender<ShardClientMessage>,
}
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index 6e3b285..2cf45fd 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,13 +1,15 @@
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use super::super::super::EventHandler;
use super::{
+ ShardClientMessage,
ShardId,
ShardManagerMessage,
+ ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
@@ -18,8 +20,80 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
+/// A manager for handling the status of shards by starting them, restarting
+/// them, and stopping them when required.
+///
+/// **Note**: The [`Client`] internally uses a shard manager. If you are using a
+/// Client, then you do not need to make one of these.
+///
+/// # Examples
+///
+/// Initialize a shard manager with a framework responsible for shards 0 through
+/// 2, of 5 total shards:
+///
+/// ```rust,no_run
+/// extern crate parking_lot;
+/// extern crate serenity;
+/// extern crate threadpool;
+/// extern crate typemap;
+///
+/// # use std::error::Error;
+/// #
+/// # #[cfg(feature = "framework")]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #
+/// use parking_lot::Mutex;
+/// use serenity::client::bridge::gateway::ShardManager;
+/// use serenity::client::EventHandler;
+/// use serenity::http;
+/// use std::sync::Arc;
+/// use std::env;
+/// use threadpool::ThreadPool;
+/// use typemap::ShareMap;
+///
+/// struct Handler;
+///
+/// impl EventHandler for Handler { }
+///
+/// let token = env::var("DISCORD_TOKEN")?;
+/// http::set_token(&token);
+/// let token = Arc::new(Mutex::new(token));
+///
+/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url));
+/// let data = Arc::new(Mutex::new(ShareMap::custom()));
+/// let event_handler = Arc::new(Handler);
+/// let framework = Arc::new(Mutex::new(None));
+/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5);
+///
+/// ShardManager::new(
+/// 0, // the shard index to start initiating from
+/// 3, // the number of shards to initiate (this initiates 0, 1, and 2)
+/// 5, // the total number of shards in use
+/// gateway_url,
+/// token,
+/// data,
+/// event_handler,
+/// framework,
+/// threadpool,
+/// );
+/// # Ok(())
+/// # }
+/// #
+/// # #[cfg(not(feature = "framework"))]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Client`]: ../../struct.Client.html
+#[derive(Debug)]
pub struct ShardManager {
- pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The shard runners currently managed.
+ runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// The index of the first shard to initialize, 0-indexed.
shard_index: u64,
/// The number of shards to initialize.
@@ -27,10 +101,11 @@ pub struct ShardManager {
/// The total shards in use, 1-indexed.
shard_total: u64,
shard_queuer: Sender<ShardQueuerMessage>,
- thread_rx: Receiver<ShardManagerMessage>,
}
impl ShardManager {
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(feature = "framework")]
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new<H>(
@@ -43,7 +118,7 @@ impl ShardManager {
event_handler: Arc<H>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
@@ -66,16 +141,22 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
}
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(not(feature = "framework"))]
pub fn new<H>(
shard_index: u64,
@@ -86,21 +167,21 @@ impl ShardManager {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
let runners = Arc::new(Mutex::new(HashMap::new()));
let mut shard_queuer = ShardQueuer {
- data: data.clone(),
- event_handler: event_handler.clone(),
+ data: Arc::clone(&data),
+ event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
- runners: runners.clone(),
+ runners: Arc::clone(&runners),
rx: shard_queue_rx,
- token: token.clone(),
- ws_url: ws_url.clone(),
+ token: Arc::clone(&token),
+ ws_url: Arc::clone(&ws_url),
threadpool,
};
@@ -108,21 +189,41 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
+ }
+
+ /// Returns whether the shard manager contains either an active instance of
+ /// a shard runner responsible for the given ID.
+ ///
+ /// If a shard has been queued but has not yet been initiated, then this
+ /// will return `false`. Consider double-checking [`is_responsible_for`] to
+ /// determine whether this shard manager is responsible for the given shard.
+ ///
+ /// [`is_responsible_for`]: #method.is_responsible_for
+ pub fn has(&self, shard_id: ShardId) -> bool {
+ self.runners.lock().contains_key(&shard_id)
}
+ /// Initializes all shards that the manager is responsible for.
+ ///
+ /// This will communicate shard boots with the [`ShardQueuer`] so that they
+ /// are properly queued.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
pub fn initialize(&mut self) -> Result<()> {
let shard_to = self.shard_index + self.shard_init;
- debug!("{}, {}", self.shard_index, self.shard_init);
-
for shard_id in self.shard_index..shard_to {
let shard_total = self.shard_total;
@@ -132,39 +233,52 @@ impl ShardManager {
Ok(())
}
- pub fn run(&mut self) {
- while let Ok(value) = self.thread_rx.recv() {
- match value {
- ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
- ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
- ShardManagerMessage::ShutdownAll => {
- self.shutdown_all();
-
- break;
- },
- }
- }
- }
-
- pub fn shutdown_all(&mut self) {
- info!("Shutting down all shards");
- let keys = {
- self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
- };
+ /// Sets the new sharding information for the manager.
+ ///
+ /// This will shutdown all existing shards.
+ ///
+ /// This will _not_ instantiate the new shards.
+ pub fn set_shards(&mut self, index: u64, init: u64, total: u64) {
+ self.shutdown_all();
- for shard_id in keys {
- self.shutdown(shard_id);
- }
+ self.shard_index = index;
+ self.shard_init = init;
+ self.shard_total = total;
}
- fn boot(&mut self, shard_info: [ShardId; 2]) {
- info!("Telling shard queuer to start shard {}", shard_info[0]);
-
- let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
- let _ = self.shard_queuer.send(msg);
- }
-
- fn restart(&mut self, shard_id: ShardId) {
+ /// Restarts a shard runner.
+ ///
+ /// This sends a shutdown signal to a shard's associated [`ShardRunner`],
+ /// and then queues a initialization of a shard runner for the same shard
+ /// via the [`ShardQueuer`].
+ ///
+ /// # Examples
+ ///
+ /// Creating a client and then restarting a shard by ID:
+ ///
+ /// _(note: in reality this precise code doesn't have an effect since the
+ /// shard would not yet have been initialized via [`initialize`], but the
+ /// concept is the same)_
+ ///
+ /// ```rust,no_run
+ /// use serenity::client::bridge::gateway::ShardId;
+ /// use serenity::client::{Client, EventHandler};
+ /// use std::env;
+ ///
+ /// struct Handler;
+ ///
+ /// impl EventHandler for Handler { }
+ ///
+ /// let token = env::var("DISCORD_TOKEN").unwrap();
+ /// let mut client = Client::new(&token, Handler).unwrap();
+ ///
+ /// // restart shard ID 7
+ /// client.shard_manager.lock().restart(ShardId(7));
+ /// ```
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn restart(&mut self, shard_id: ShardId) {
info!("Restarting shard {}", shard_id);
self.shutdown(shard_id);
@@ -173,23 +287,88 @@ impl ShardManager {
self.boot([shard_id, ShardId(shard_total)]);
}
- fn shutdown(&mut self, shard_id: ShardId) {
+ /// Returns the [`ShardId`]s of the shards that have been instantiated and
+ /// currently have a valid [`ShardRunner`].
+ ///
+ /// [`ShardId`]: struct.ShardId.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn shards_instantiated(&self) -> Vec<ShardId> {
+ self.runners.lock().keys().cloned().collect()
+ }
+
+ /// Attempts to shut down the shard runner by Id.
+ ///
+ /// Returns a boolean indicating whether a shard runner was present. This is
+ /// _not_ necessary an indicator of whether the shard runner was
+ /// successfully shut down.
+ ///
+ /// **Note**: If the receiving end of an mpsc channel - theoretically owned
+ /// by the shard runner - no longer exists, then the shard runner will not
+ /// know it should shut down. This _should never happen_. It may already be
+ /// stopped.
+ pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
info!("Shutting down shard {}", shard_id);
if let Some(runner) = self.runners.lock().get(&shard_id) {
- let msg = ShardManagerMessage::Shutdown(shard_id);
+ let shutdown = ShardManagerMessage::Shutdown(shard_id);
+ let msg = ShardClientMessage::Manager(shutdown);
if let Err(why) = runner.runner_tx.send(msg) {
- warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ warn!(
+ "Failed to cleanly shutdown shard {}: {:?}",
+ shard_id,
+ why,
+ );
}
}
- self.runners.lock().remove(&shard_id);
+ self.runners.lock().remove(&shard_id).is_some()
+ }
+
+ /// Sends a shutdown message for all shards that the manager is responsible
+ /// for that are still known to be running.
+ ///
+ /// If you only need to shutdown a select number of shards, prefer looping
+ /// over the [`shutdown`] method.
+ ///
+ /// [`shutdown`]: #method.shutdown
+ pub fn shutdown_all(&mut self) {
+ let keys = {
+ let runners = self.runners.lock();
+
+ if runners.is_empty() {
+ return;
+ }
+
+ runners.keys().cloned().collect::<Vec<_>>()
+ };
+
+ info!("Shutting down all shards");
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
}
}
impl Drop for ShardManager {
+ /// A custom drop implementation to clean up after the manager.
+ ///
+ /// This shuts down all active [`ShardRunner`]s and attempts to tell the
+ /// [`ShardQueuer`] to shutdown.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
fn drop(&mut self) {
+ self.shutdown_all();
+
if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
warn!("Failed to send shutdown to shard queuer: {:?}", why);
}
diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs
new file mode 100644
index 0000000..4a64473
--- /dev/null
+++ b/src/client/bridge/gateway/shard_manager_monitor.rs
@@ -0,0 +1,54 @@
+use parking_lot::Mutex;
+use std::sync::mpsc::Receiver;
+use std::sync::Arc;
+use super::{ShardManager, ShardManagerMessage};
+
+/// The shard manager monitor does what it says on the tin -- it monitors the
+/// shard manager and performs actions on it as received.
+///
+/// The monitor is essentially responsible for running in its own thread and
+/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or
+/// shutdown everything entirely.
+///
+/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html
+#[derive(Debug)]
+pub struct ShardManagerMonitor {
+ /// An clone of the Arc to the manager itself.
+ pub manager: Arc<Mutex<ShardManager>>,
+ /// The mpsc Receiver channel to receive shard manager messages over.
+ pub rx: Receiver<ShardManagerMessage>,
+}
+
+impl ShardManagerMonitor {
+ /// "Runs" the monitor, waiting for messages over the Receiver.
+ ///
+ /// This should be called in its own thread due to its blocking, looped
+ /// nature.
+ ///
+ /// This will continue running until either:
+ ///
+ /// - a [`ShardManagerMessage::ShutdownAll`] has been received
+ /// - an error is returned while receiving a message from the
+ /// channel (probably indicating that the shard manager should stop anyway)
+ ///
+ /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll
+ pub fn run(&mut self) {
+ debug!("Starting shard manager worker");
+
+ while let Ok(value) = self.rx.recv() {
+ match value {
+ ShardManagerMessage::Restart(shard_id) => {
+ self.manager.lock().restart(shard_id);
+ },
+ ShardManagerMessage::Shutdown(shard_id) => {
+ self.manager.lock().shutdown(shard_id);
+ },
+ ShardManagerMessage::ShutdownAll => {
+ self.manager.lock().shutdown_all();
+
+ break;
+ },
+ }
+ }
+ }
+}
diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs
new file mode 100644
index 0000000..069c838
--- /dev/null
+++ b/src/client/bridge/gateway/shard_messenger.rs
@@ -0,0 +1,276 @@
+use model::{Game, GuildId, OnlineStatus};
+use super::{ShardClientMessage, ShardRunnerMessage};
+use std::sync::mpsc::{SendError, Sender};
+use websocket::message::OwnedMessage;
+
+/// A lightweight wrapper around an mpsc sender.
+///
+/// This is used to cleanly communicate with a shard's respective
+/// [`ShardRunner`]. This can be used for actions such as setting the game via
+/// [`set_game`] or shutting down via [`shutdown`].
+///
+/// [`ShardRunner`]: struct.ShardRunner.html
+/// [`set_game`]: #method.set_game
+/// [`shutdown`]: #method.shutdown
+#[derive(Clone, Debug)]
+pub struct ShardMessenger {
+ tx: Sender<ShardClientMessage>,
+}
+
+impl ShardMessenger {
+ /// Creates a new shard messenger.
+ ///
+ /// If you are using the [`Client`], you do not need to do this.
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ #[inline]
+ pub fn new(tx: Sender<ShardClientMessage>) -> Self {
+ Self {
+ tx,
+ }
+ }
+
+ /// Requests that one or multiple [`Guild`]s be chunked.
+ ///
+ /// This will ask the gateway to start sending member chunks for large
+ /// guilds (250 members+). If a guild is over 250 members, then a full
+ /// member list will not be downloaded, and must instead be requested to be
+ /// sent in "chunks" containing members.
+ ///
+ /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each
+ /// chunk only contains a partial amount of the total members.
+ ///
+ /// If the `cache` feature is enabled, the cache will automatically be
+ /// updated with member chunks.
+ ///
+ /// # Examples
+ ///
+ /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not
+ /// specifying a query parameter:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// Chunk a single guild by Id, limiting to 20 members, and specifying a
+ /// query parameter of `"do"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`Event::GuildMembersChunk`]:
+ /// ../../model/event/enum.Event.html#variant.GuildMembersChunk
+ /// [`Guild`]: ../../model/struct.Guild.html
+ /// [`Member`]: ../../model/struct.Member.html
+ pub fn chunk_guilds<It>(
+ &self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<String>,
+ ) where It: IntoIterator<Item=GuildId> {
+ let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>();
+
+ let _ = self.send(ShardRunnerMessage::ChunkGuilds {
+ guild_ids: guilds,
+ limit,
+ query,
+ });
+ }
+
+ /// Sets the user's current game, if any.
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current game to playing `"Heroes of the Storm"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::Game;
+ ///
+ /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_game(&self, game: Option<Game>) {
+ let _ = self.send(ShardRunnerMessage::SetGame(game));
+ }
+
+ /// Sets the user's full presence information.
+ ///
+ /// Consider using the individual setters if you only need to modify one of
+ /// these.
+ ///
+ /// # Examples
+ ///
+ /// Set the current user as playing `"Heroes of the Storm"` and being
+ /// online:
+ ///
+ /// ```rust,ignore
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::{Game, OnlineStatus};
+ ///
+ /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_presence(&self, game: Option<Game>, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetPresence(status, game));
+ }
+
+ /// Sets the user's current online status.
+ ///
+ /// Note that [`Offline`] is not a valid online status, so it is
+ /// automatically converted to [`Invisible`].
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current online status for the shard to [`DoNotDisturb`].
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::OnlineStatus;
+ ///
+ /// shard.set_status(OnlineStatus::DoNotDisturb);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
+ /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
+ pub fn set_status(&self, mut online_status: OnlineStatus) {
+ if online_status == OnlineStatus::Offline {
+ online_status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetStatus(online_status));
+ }
+
+ /// Shuts down the websocket by attempting to cleanly close the
+ /// connection.
+ pub fn shutdown_clean(&self) {
+ let _ = self.send(ShardRunnerMessage::Close(1000, None));
+ }
+
+ /// Sends a raw message over the WebSocket.
+ ///
+ /// The given message is not mutated in any way, and is sent as-is.
+ ///
+ /// You should only use this if you know what you're doing. If you're
+ /// wanting to, for example, send a presence update, prefer the usage of
+ /// the [`set_presence`] method.
+ ///
+ /// [`set_presence`]: #method.set_presence
+ pub fn websocket_message(&self, message: OwnedMessage) {
+ let _ = self.send(ShardRunnerMessage::Message(message));
+ }
+
+ #[inline]
+ fn send(&self, msg: ShardRunnerMessage)
+ -> Result<(), SendError<ShardClientMessage>> {
+ self.tx.send(ShardClientMessage::Runner(msg))
+ }
+}
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
index cb3f749..bd02532 100644
--- a/src/client/bridge/gateway/shard_queuer.rs
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -27,20 +27,69 @@ use framework::Framework;
/// blocking nature of the loop itself as well as a 5 second thread sleep
/// between shard starts.
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
+ /// A copy of [`Client::data`] to be given to runners for contextual
+ /// dispatching.
+ ///
+ /// [`Client::data`]: ../../struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
+ /// A reference to an `EventHandler`, such as the one given to the
+ /// [`Client`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
pub event_handler: Arc<H>,
+ /// A copy of the framework
#[cfg(feature = "framework")]
pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ /// The instant that a shard was last started.
+ ///
+ /// This is used to determine how long to wait between shard IDENTIFYs.
pub last_start: Option<Instant>,
+ /// A copy of the sender channel to communicate with the
+ /// [`ShardManagerMonitor`].
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
+ /// A copy of the map of shard runners.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// A receiver channel for the shard queuer to be told to start shards.
pub rx: Receiver<ShardQueuerMessage>,
+ /// A copy of a threadpool to give shard runners.
+ ///
+ /// For example, when using the [`Client`], this will be a copy of
+ /// [`Client::threadpool`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool
pub threadpool: ThreadPool,
+ /// A copy of the token to connect with.
pub token: Arc<Mutex<String>>,
+ /// A copy of the URI to use to connect to the gateway.
pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
+ /// Begins the shard queuer loop.
+ ///
+ /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s,
+ /// blocking for messages on what to do.
+ ///
+ /// If a [`ShardQueuerMessage::Start`] is received, this will:
+ ///
+ /// 1. Check how much time has passed since the last shard was started
+ /// 2. If the amount of time is less than the ratelimit, it will sleep until
+ /// that time has passed
+ /// 3. Start the shard by ID
+ ///
+ /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and
+ /// the loop will be over.
+ ///
+ /// **Note**: This should be run in its own thread due to the blocking
+ /// nature of the loop.
+ ///
+ /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html
+ /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown
+ /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
+ /// [`rx`]: #structfield.rx
pub fn run(&mut self) {
while let Ok(msg) = self.rx.recv() {
match msg {
@@ -80,16 +129,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
let shard_info = [shard_id.0, shard_total.0];
+
let shard = Shard::new(
Arc::clone(&self.ws_url),
Arc::clone(&self.token),
shard_info,
)?;
- let locked = Arc::new(Mutex::new(shard));
let mut runner = feature_framework! {{
ShardRunner::new(
- Arc::clone(&locked),
+ shard,
self.manager_tx.clone(),
Arc::clone(&self.framework),
Arc::clone(&self.data),
@@ -98,7 +147,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
)
} else {
ShardRunner::new(
- locked.clone(),
+ shard,
self.manager_tx.clone(),
self.data.clone(),
self.event_handler.clone(),
@@ -108,7 +157,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
let runner_info = ShardRunnerInfo {
runner_tx: runner.runner_tx(),
- shard: locked,
};
thread::spawn(move || {
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index b14e48b..a5fde3d 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -1,35 +1,45 @@
+use gateway::{Shard, ShardAction};
use internal::prelude::*;
use internal::ws_impl::ReceiverExt;
use model::event::{Event, GatewayEvent};
use parking_lot::Mutex;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use super::super::super::{EventHandler, dispatch};
-use super::{LockedShard, ShardId, ShardManagerMessage};
+use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage};
use threadpool::ThreadPool;
use typemap::ShareMap;
+use websocket::message::{CloseData, OwnedMessage};
use websocket::WebSocketError;
#[cfg(feature = "framework")]
use framework::Framework;
+#[cfg(feature = "voice")]
+use internal::ws_impl::SenderExt;
+/// A runner for managing a [`Shard`] and its respective WebSocket client.
+///
+/// [`Shard`]: ../../../gateway/struct.Shard.html
pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
#[cfg(feature = "framework")]
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
manager_tx: Sender<ShardManagerMessage>,
- runner_rx: Receiver<ShardManagerMessage>,
- runner_tx: Sender<ShardManagerMessage>,
- shard: LockedShard,
- shard_info: [u64; 2],
+ // channel to receive messages from the shard manager and dispatches
+ runner_rx: Receiver<ShardClientMessage>,
+ // channel to send messages to the shard runner from the shard manager
+ runner_tx: Sender<ShardClientMessage>,
+ shard: Shard,
threadpool: ThreadPool,
}
impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
+ /// Creates a new runner for a Shard.
+ #[allow(too_many_arguments)]
#[cfg(feature = "framework")]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
data: Arc<Mutex<ShareMap>>,
@@ -37,7 +47,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -47,21 +56,20 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
framework,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Creates a new runner for a Shard.
#[cfg(not(feature = "framework"))]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -70,90 +78,276 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
event_handler,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Starts the runner's loop to receive events.
+ ///
+ /// This runs a loop that performs the following in each iteration:
+ ///
+ /// 1. checks the receiver for [`ShardRunnerMessage`]s, possibly from the
+ /// [`ShardManager`], and if there is one, acts on it.
+ ///
+ /// 2. checks if a heartbeat should be sent to the discord Gateway, and if
+ /// so, sends one.
+ ///
+ /// 3. attempts to retrieve a message from the WebSocket, processing it into
+ /// a [`GatewayEvent`]. This will block for 100ms before assuming there is
+ /// no message available.
+ ///
+ /// 4. Checks with the [`Shard`] to determine if the gateway event is
+ /// specifying an action to take (e.g. resuming, reconnecting, heartbeating)
+ /// and then performs that action, if any.
+ ///
+ /// 5. Dispatches the event via the Client.
+ ///
+ /// 6. Go back to 1.
+ ///
+ /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html
+ /// [`Shard`]: ../../../gateway/struct.Shard.html
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html
pub fn run(&mut self) -> Result<()> {
- debug!("[ShardRunner {:?}] Running", self.shard_info);
+ debug!("[ShardRunner {:?}] Running", self.shard.shard_info());
loop {
- {
- let mut shard = self.shard.lock();
- let incoming = self.runner_rx.try_recv();
+ if !self.recv()? {
+ return Ok(());
+ }
- // Check for an incoming message over the runner channel.
- //
- // If the message is to shutdown, first verify the ID so we know
- // for certain this runner is to shutdown.
- if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
- if id.0 == self.shard_info[0] {
- let _ = shard.shutdown_clean();
+ // check heartbeat
+ if let Err(why) = self.shard.check_heartbeat() {
+ warn!(
+ "[ShardRunner {:?}] Error heartbeating: {:?}",
+ self.shard.shard_info(),
+ why,
+ );
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+
+ return self.request_restart();
+ }
- return Ok(());
+ #[cfg(feature = "voice")]
+ {
+ for message in self.shard.cycle_voice_recv() {
+ if let Err(why) = self.shard.client.send_json(&message) {
+ println!("Err sending from voice over WS: {:?}", why);
}
}
+ }
- if let Err(why) = shard.check_heartbeat() {
- error!("Failed to heartbeat and reconnect: {:?}", why);
+ let (event, action, successful) = self.recv_event();
- return self.request_restart();
- }
-
- #[cfg(feature = "voice")]
- {
- shard.cycle_voice_recv();
- }
+ if let Some(action) = action {
+ let _ = self.action(action);
}
- let (event, successful) = self.recv_event();
-
if let Some(event) = event {
- let data = Arc::clone(&self.data);
- let event_handler = Arc::clone(&self.event_handler);
- let shard = Arc::clone(&self.shard);
-
- feature_framework! {{
- let framework = Arc::clone(&self.framework);
-
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- framework,
- data,
- event_handler,
- );
- });
- } else {
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- data,
- event_handler,
- );
- });
- }}
+ self.dispatch(event);
}
- if !successful && !self.shard.lock().stage().is_connecting() {
+ if !successful && !self.shard.stage().is_connecting() {
return self.request_restart();
}
}
}
- pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> {
+ /// Clones the internal copy of the Sender to the shard runner.
+ pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> {
self.runner_tx.clone()
}
+ /// Takes an action that a [`Shard`] has determined should happen and then
+ /// does it.
+ ///
+ /// For example, if the shard says that an Identify message needs to be
+ /// sent, this will do that.
+ ///
+ /// # Errors
+ ///
+ /// Returns
+ fn action(&mut self, action: ShardAction) -> Result<()> {
+ match action {
+ ShardAction::Autoreconnect => self.shard.autoreconnect(),
+ ShardAction::Heartbeat => self.shard.heartbeat(),
+ ShardAction::Identify => self.shard.identify(),
+ ShardAction::Reconnect => self.shard.reconnect(),
+ ShardAction::Resume => self.shard.resume(),
+ }
+ }
+
+ // Checks if the ID received to shutdown is equivalent to the ID of the
+ // shard this runner is responsible. If so, it shuts down the WebSocket
+ // client.
+ //
+ // Returns whether the WebSocket client is still active.
+ //
+ // If true, the WebSocket client was _not_ shutdown. If false, it was.
+ fn checked_shutdown(&mut self, id: ShardId) -> bool {
+ // First verify the ID so we know for certain this runner is
+ // to shutdown.
+ if id.0 != self.shard.shard_info()[0] {
+ // Not meant for this runner for some reason, don't
+ // shutdown.
+ return true;
+ }
+
+ let close_data = CloseData::new(1000, String::new());
+ let msg = OwnedMessage::Close(Some(close_data));
+ let _ = self.shard.client.send_message(&msg);
+
+ false
+ }
+
+ fn dispatch(&self, event: Event) {
+ let data = Arc::clone(&self.data);
+ let event_handler = Arc::clone(&self.event_handler);
+ let runner_tx = self.runner_tx.clone();
+ let shard_id = self.shard.shard_info()[0];
+
+ feature_framework! {{
+ let framework = Arc::clone(&self.framework);
+
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ framework,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ } else {
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ }}
+ }
+
+ // Handles a received value over the shard runner rx channel.
+ //
+ // Returns a boolean on whether the shard runner can continue.
+ //
+ // This always returns true, except in the case that the shard manager asked
+ // the runner to shutdown.
+ fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool {
+ match value {
+ ShardClientMessage::Manager(x) => match x {
+ ShardManagerMessage::Restart(id) |
+ ShardManagerMessage::Shutdown(id) => {
+ self.checked_shutdown(id)
+ },
+ ShardManagerMessage::ShutdownAll => {
+ // This variant should never be received.
+ warn!(
+ "[ShardRunner {:?}] Received a ShutdownAll?",
+ self.shard.shard_info(),
+ );
+
+ true
+ },
+ }
+ ShardClientMessage::Runner(x) => match x {
+ ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => {
+ self.shard.chunk_guilds(
+ guild_ids,
+ limit,
+ query.as_ref().map(String::as_str),
+ ).is_ok()
+ },
+ ShardRunnerMessage::Close(code, reason) => {
+ let reason = reason.unwrap_or_else(String::new);
+ let data = CloseData::new(code, reason);
+ let msg = OwnedMessage::Close(Some(data));
+
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::Message(msg) => {
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::SetGame(game) => {
+ // To avoid a clone of `game`, we do a little bit of
+ // trickery here:
+ //
+ // First, we obtain a reference to the current presence of
+ // the shard, and create a new presence tuple of the new
+ // game we received over the channel as well as the online
+ // status that the shard already had.
+ //
+ // We then (attempt to) send the websocket message with the
+ // status update, expressively returning:
+ //
+ // - whether the message successfully sent
+ // - the original game we received over the channel
+ self.shard.set_game(game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetPresence(status, game) => {
+ self.shard.set_presence(status, game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetStatus(status) => {
+ self.shard.set_status(status);
+
+ self.shard.update_presence().is_ok()
+ },
+ },
+ }
+ }
+
+ // Receives values over the internal shard runner rx channel and handles
+ // them.
+ //
+ // This will loop over values until there is no longer one.
+ //
+ // Requests a restart if the sending half of the channel disconnects. This
+ // should _never_ happen, as the sending half is kept on the runner.
+
+ // Returns whether the shard runner is in a state that can continue.
+ fn recv(&mut self) -> Result<bool> {
+ loop {
+ match self.runner_rx.try_recv() {
+ Ok(value) => {
+ if !self.handle_rx_value(value) {
+ return Ok(false);
+ }
+ },
+ Err(TryRecvError::Disconnected) => {
+ warn!(
+ "[ShardRunner {:?}] Sending half DC; restarting",
+ self.shard.shard_info(),
+ );
+
+ let _ = self.request_restart();
+
+ return Ok(false);
+ },
+ Err(TryRecvError::Empty) => break,
+ }
+ }
+
+ // There are no longer any values available.
+
+ Ok(true)
+ }
+
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
- fn recv_event(&mut self) -> (Option<Event>, bool) {
- let mut shard = self.shard.lock();
-
- let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) {
+ let gw_event = match self.shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
@@ -161,58 +355,69 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- return (None, true);
+ {
+ let last = self.shard.last_heartbeat_ack();
+ let interval = self.shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
+
+ if seconds_passed <= interval_in_secs * 2 {
+ return (None, None, true);
+ }
+ } else {
+ return (None, None, true);
}
- } else {
- return (None, true);
}
debug!("Attempting to auto-reconnect");
- if let Err(why) = shard.autoreconnect() {
+ if let Err(why) = self.shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
}
- return (None, true);
+ return (None, None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies this will be
// hit every iteration.
- return (None, false);
+ return (None, None, false);
},
other => other,
};
let event = match gw_event {
Ok(Some(event)) => Ok(event),
- Ok(None) => return (None, true),
+ Ok(None) => return (None, None, true),
Err(why) => Err(why),
};
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => return (None, true),
+ let action = match self.shard.handle_event(&event) {
+ Ok(Some(action)) => Some(action),
+ Ok(None) => None,
Err(why) => {
error!("Shard handler received err: {:?}", why);
- return (None, true);
+ return (None, None, true);
},
- };
+ };
+
+ let event = match event {
+ Ok(GatewayEvent::Dispatch(_, event)) => Some(event),
+ _ => None,
+ };
- (Some(event), true)
+ (event, action, true)
}
fn request_restart(&self) -> Result<()> {
- debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
- let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+ let shard_id = ShardId(self.shard.shard_info()[0]);
+ let msg = ShardManagerMessage::Restart(shard_id);
let _ = self.manager_tx.send(msg);
Ok(())
diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs
new file mode 100644
index 0000000..e6458eb
--- /dev/null
+++ b/src/client/bridge/gateway/shard_runner_message.rs
@@ -0,0 +1,43 @@
+use model::{Game, GuildId, OnlineStatus};
+use websocket::message::OwnedMessage;
+
+/// A message to send from a shard over a WebSocket.
+pub enum ShardRunnerMessage {
+ /// Indicates that the client is to send a member chunk message.
+ ChunkGuilds {
+ /// The IDs of the [`Guild`]s to chunk.
+ ///
+ /// [`Guild`]: ../../../model/struct.Guild.html
+ guild_ids: Vec<GuildId>,
+ /// The maximum number of members to receive [`GuildMembersChunkEvent`]s
+ /// for.
+ ///
+ /// [`GuildMembersChunkEvent`]: ../../../model/event/GuildMembersChunkEvent.html
+ limit: Option<u16>,
+ /// Text to filter members by.
+ ///
+ /// For example, a query of `"s"` will cause only [`Member`]s whose
+ /// usernames start with `"s"` to be chunked.
+ ///
+ /// [`Member`]: ../../../model/struct.Member.html
+ query: Option<String>,
+ },
+ /// Indicates that the client is to close with the given status code and
+ /// reason.
+ ///
+ /// You should rarely - if _ever_ - need this, but the option is available.
+ /// Prefer to use the [`ShardManager`] to shutdown WebSocket clients if you
+ /// are intending to send a 1000 close code.
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Close(u16, Option<String>),
+ /// Indicates that the client is to send a custom WebSocket message.
+ Message(OwnedMessage),
+ /// Indicates that the client is to update the shard's presence's game.
+ SetGame(Option<Game>),
+ /// Indicates that the client is to update the shard's presence in its
+ /// entirity.
+ SetPresence(OnlineStatus, Option<Game>),
+ /// Indicates that the client is to update the shard's presence's status.
+ SetStatus(OnlineStatus),
+}
diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs
index 4f27526..ea18d73 100644
--- a/src/client/bridge/mod.rs
+++ b/src/client/bridge/mod.rs
@@ -1 +1,10 @@
+//! A collection of bridged support between the [`client`] module and other
+//! modules.
+//!
+//! **Warning**: You likely _do not_ need to mess with anything in here. Beware.
+//! This is lower-level functionality abstracted by the [`Client`].
+//!
+//! [`Client`]: ../struct.Client.html
+//! [`client`]: ../
+
pub mod gateway;
diff --git a/src/client/context.rs b/src/client/context.rs
index a60aaa0..9b89cde 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -1,7 +1,7 @@
-use Result;
+use client::bridge::gateway::{ShardClientMessage, ShardMessenger};
+use std::sync::mpsc::Sender;
use std::sync::Arc;
use typemap::ShareMap;
-use gateway::Shard;
use model::*;
use parking_lot::Mutex;
@@ -12,7 +12,7 @@ use internal::prelude::*;
#[cfg(feature = "builder")]
use builder::EditProfile;
#[cfg(feature = "builder")]
-use {http, utils};
+use {Result, http, utils};
#[cfg(feature = "builder")]
use std::collections::HashMap;
@@ -38,21 +38,23 @@ pub struct Context {
///
/// [`Client::data`]: struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
- /// The associated shard which dispatched the event handler.
- ///
- /// Note that if you are sharding, in relevant terms, this is the shard
- /// which received the event being dispatched.
- pub shard: Arc<Mutex<Shard>>,
+ /// The messenger to communicate with the shard runner.
+ pub shard: ShardMessenger,
+ /// The ID of the shard this context is related to.
+ pub shard_id: u64,
}
impl Context {
/// Create a new Context to be passed to an event handler.
- pub(crate) fn new(shard: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>)
- -> Context {
+ pub(crate) fn new(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+ ) -> Context {
Context {
+ shard: ShardMessenger::new(runner_tx),
+ shard_id,
data,
- shard,
}
}
@@ -110,6 +112,7 @@ impl Context {
http::edit_profile(&edited)
}
+
/// Sets the current user as being [`Online`]. This maintains the current
/// game.
///
@@ -137,9 +140,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn online(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Online);
+ self.shard.set_status(OnlineStatus::Online);
}
/// Sets the current user as being [`Idle`]. This maintains the current
@@ -168,9 +171,9 @@ impl Context {
/// ```
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn idle(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Idle);
+ self.shard.set_status(OnlineStatus::Idle);
}
/// Sets the current user as being [`DoNotDisturb`]. This maintains the
@@ -199,9 +202,9 @@ impl Context {
/// ```
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ #[inline]
pub fn dnd(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::DoNotDisturb);
+ self.shard.set_status(OnlineStatus::DoNotDisturb);
}
/// Sets the current user as being [`Invisible`]. This maintains the current
@@ -231,9 +234,9 @@ impl Context {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
+ #[inline]
pub fn invisible(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Invisible);
+ self.shard.set_status(OnlineStatus::Invisible);
}
/// "Resets" the current user's presence, by setting the game to `None` and
@@ -265,9 +268,9 @@ impl Context {
/// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
+ #[inline]
pub fn reset_presence(&self) {
- let mut shard = self.shard.lock();
- shard.set_presence(None, OnlineStatus::Online);
+ self.shard.set_presence(None, OnlineStatus::Online);
}
/// Sets the current game, defaulting to an online status of [`Online`].
@@ -303,9 +306,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn set_game(&self, game: Game) {
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current game, passing in only its name. This will automatically
@@ -351,8 +354,7 @@ impl Context {
url: None,
};
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current user's presence, providing all fields to be passed.
@@ -406,9 +408,9 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn set_presence(&self, game: Option<Game>, status: OnlineStatus) {
- let mut shard = self.shard.lock();
- shard.set_presence(game, status);
+ self.shard.set_presence(game, status);
}
/// Disconnects the shard from the websocket, essentially "quiting" it.
@@ -417,9 +419,8 @@ impl Context {
/// until [`Client::start`] and vice versa are called again.
///
/// [`Client::start`]: ./struct.Client.html#method.start
- pub fn quit(&self) -> Result<()> {
- let mut shard = self.shard.lock();
-
- shard.shutdown_clean()
+ #[inline]
+ pub fn quit(&self) {
+ self.shard.shutdown_clean();
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 827875e..9545a6f 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,9 +1,10 @@
use std::sync::Arc;
use parking_lot::Mutex;
+use super::bridge::gateway::ShardClientMessage;
use super::event_handler::EventHandler;
use super::Context;
+use std::sync::mpsc::Sender;
use typemap::ShareMap;
-use gateway::Shard;
use model::event::Event;
use model::{Channel, Message};
@@ -35,19 +36,26 @@ macro_rules! now {
() => (Utc::now().time().second() * 1000)
}
-fn context(conn: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>) -> Context {
- Context::new(conn, data)
+fn context(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) -> Context {
+ Context::new(data, runner_tx, shard_id)
}
#[cfg(feature = "framework")]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(
context.clone(),
event.message.clone(),
@@ -58,21 +66,24 @@ pub fn dispatch<H: EventHandler + 'static>(event: Event,
framework.dispatch(context, event.message);
}
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
#[cfg(not(feature = "framework"))]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(context, event.message, event_handler);
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
@@ -91,10 +102,13 @@ fn dispatch_message<H>(
}
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)]
-fn handle_event<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+fn handle_event<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
#[cfg(feature = "cache")]
let mut last_guild_create_time = now!();
@@ -113,7 +127,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
// This different channel_create dispatching is only due to the fact that
// each time the bot receives a dm, this event is also fired.
@@ -134,7 +148,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelDelete(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
match event.channel {
Channel::Private(_) | Channel::Group(_) => {},
@@ -147,28 +161,28 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
},
Event::ChannelPinsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_pins_update(context, event);
},
Event::ChannelRecipientAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_addition(context, event.channel_id, event.user);
},
Event::ChannelRecipientRemove(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_removal(context, event.channel_id, event.user);
},
Event::ChannelUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read().channel(event.channel.id());
@@ -178,12 +192,12 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::GuildBanAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_addition(context, event.guild_id, event.user);
},
Event::GuildBanRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_removal(context, event.guild_id, event.user);
},
@@ -204,7 +218,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let cache = CACHE.read();
if cache.unavailable_guilds.is_empty() {
- let context = context(Arc::clone(&conn), Arc::clone(&data));
+ let context = context(Arc::clone(&data), runner_tx.clone(), shard_id);
let guild_amount = cache
.guilds
@@ -216,7 +230,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
}
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_create(context, event.guild, _is_new);
@@ -226,7 +240,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildDelete(mut event) => {
let _full = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_delete(context, event.guild, _full);
@@ -237,25 +251,25 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildEmojisUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
},
Event::GuildIntegrationsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_integrations_update(context, event.guild_id);
},
Event::GuildMemberAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_member_addition(context, event.guild_id, event.member);
},
Event::GuildMemberRemove(mut event) => {
let _member = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
@@ -265,7 +279,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildMemberUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
// This is safe to unwrap, as the update would have created
@@ -284,20 +298,20 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildMembersChunk(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_members_chunk(context, event.guild_id, event.members);
},
Event::GuildRoleCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_role_create(context, event.guild_id, event.role);
},
Event::GuildRoleDelete(mut event) => {
let _role = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
@@ -307,7 +321,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildRoleUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_update(context, event.guild_id, _before, event.role);
@@ -318,14 +332,14 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildUnavailable(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_unavailable(context, event.guild_id);
},
Event::GuildUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read()
@@ -341,46 +355,46 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
// Already handled by the framework check macro
Event::MessageCreate(_) => {},
Event::MessageDeleteBulk(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete_bulk(context, event.channel_id, event.ids);
},
Event::MessageDelete(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete(context, event.channel_id, event.message_id);
},
Event::MessageUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_update(context, event);
},
Event::PresencesReplace(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_replace(context, event.presences);
},
Event::PresenceUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_update(context, event);
},
Event::ReactionAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_add(context, event.reaction);
},
Event::ReactionRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove(context, event.reaction);
},
Event::ReactionRemoveAll(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
},
@@ -393,35 +407,35 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let _ = wait_for_guilds()
.map(move |_| {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
});
} else {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
}
}
},
Event::Resumed(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.resume(context, event);
},
Event::TypingStart(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.typing_start(context, event);
},
Event::Unknown(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.unknown(context, event.kind, event.value);
},
Event::UserUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.user_update(context, _before.unwrap(), event.current_user);
@@ -430,19 +444,19 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::VoiceServerUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_server_update(context, event);
},
Event::VoiceStateUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_state_update(context, event.guild_id, event.voice_state);
},
Event::WebhookUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.webhook_update(context, event.guild_id, event.channel_id);
},
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 00a305a..2a97c91 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -37,13 +37,11 @@ pub use http as rest;
#[cfg(feature = "cache")]
pub use CACHE;
-use self::bridge::gateway::{ShardId, ShardManager, ShardRunnerInfo};
+use self::bridge::gateway::{ShardManager, ShardManagerMonitor};
use self::dispatch::dispatch;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::mem;
use threadpool::ThreadPool;
use typemap::ShareMap;
use http;
@@ -103,8 +101,7 @@ impl CloseHandle {
/// [`on_message`]: #method.on_message
/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
/// [sharding docs]: gateway/index.html#sharding
-#[derive(Clone)]
-pub struct Client<H: EventHandler + Send + Sync + 'static> {
+pub struct Client {
/// A ShareMap which requires types to be Send + Sync. This is a map that
/// can be safely shared across contexts.
///
@@ -191,7 +188,6 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`on_ready`]: #method.on_ready
- event_handler: Arc<H>,
#[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
/// A HashMap of all shards instantiated by the Client.
///
@@ -224,12 +220,12 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?;
///
- /// let shard_runners = client.shard_runners.clone();
+ /// let shard_manager = client.shard_manager.clone();
///
/// thread::spawn(move || {
/// loop {
/// println!("Shard count instantiated: {}",
- /// shard_runners.lock().len());
+ /// shard_manager.lock().shards_instantiated().len());
///
/// thread::sleep(Duration::from_millis(5000));
/// }
@@ -244,16 +240,26 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Client::start_shard`]: #method.start_shard
/// [`Client::start_shards`]: #method.start_shards
- pub shard_runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ pub shard_manager: Arc<Mutex<ShardManager>>,
+ shard_manager_worker: ShardManagerMonitor,
/// The threadpool shared by all shards.
///
/// Defaults to 5 threads, which should suffice small bots. Consider
/// increasing this number as your bot grows.
pub threadpool: ThreadPool,
- token: Arc<Mutex<String>>,
+ /// The token in use by the client.
+ pub token: Arc<Mutex<String>>,
+ /// URI that the client's shards will use to connect to the gateway.
+ ///
+ /// This is likely not important for production usage and is, at best, used
+ /// for debugging.
+ ///
+ /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated
+ /// value available.
+ pub ws_uri: Arc<Mutex<String>>,
}
-impl<H: EventHandler + Send + Sync + 'static> Client<H> {
+impl Client {
/// Creates a Client for a bot user.
///
/// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -283,7 +289,8 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(token: &str, handler: H) -> Result<Self> {
+ pub fn new<H>(token: &str, handler: H) -> Result<Self>
+ where H: EventHandler + Send + Sync + 'static {
let token = if token.starts_with("Bot ") {
token.to_string()
} else {
@@ -295,23 +302,53 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let name = "serenity client".to_owned();
let threadpool = ThreadPool::with_name(name, 5);
+ let url = Arc::new(Mutex::new(http::get_gateway()?.url));
+ let data = Arc::new(Mutex::new(ShareMap::custom()));
+ let event_handler = Arc::new(handler);
Ok(feature_framework! {{
+ let framework = Arc::new(Mutex::new(None));
+
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ Arc::clone(&locked),
+ Arc::clone(&data),
+ Arc::clone(&event_handler),
+ Arc::clone(&framework),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
framework: Arc::new(Mutex::new(None)),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
} else {
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ locked.clone(),
+ data.clone(),
+ Arc::clone(&event_handler),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
}})
}
@@ -462,7 +499,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
///
/// [gateway docs]: gateway/index.html#sharding
pub fn start(&mut self) -> Result<()> {
- self.start_connection([0, 0, 1], http::get_gateway()?.url)
+ self.start_connection([0, 0, 1])
}
/// Establish the connection(s) and start listening for events.
@@ -514,15 +551,13 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
/// [gateway docs]: gateway/index.html#sharding
pub fn start_autosharded(&mut self) -> Result<()> {
- let mut res = http::get_bot_gateway()?;
-
- let x = res.shards as u64 - 1;
- let y = res.shards as u64;
- let url = mem::replace(&mut res.url, String::default());
+ let (x, y) = {
+ let res = http::get_bot_gateway()?;
- drop(res);
+ (res.shards as u64 - 1, res.shards as u64)
+ };
- self.start_connection([0, x, y], url)
+ self.start_connection([0, x, y])
}
/// Establish a sharded connection and start listening for events.
@@ -603,7 +638,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_autosharded`]: #method.start_autosharded
/// [gateway docs]: gateway/index.html#sharding
pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
- self.start_connection([shard, shard, shards], http::get_gateway()?.url)
+ self.start_connection([shard, shard, shards])
}
/// Establish sharded connections and start listening for events.
@@ -657,10 +692,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shard_range`]: #method.start_shard_range
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
- self.start_connection(
- [0, total_shards - 1, total_shards],
- http::get_gateway()?.url,
- )
+ self.start_connection([0, total_shards - 1, total_shards])
}
/// Establish a range of sharded connections and start listening for events.
@@ -730,7 +762,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shards`]: #method.start_shards
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
- self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
+ self.start_connection([range[0], range[1], total_shards])
}
/// Returns a thread-safe handle for closing shards.
@@ -749,7 +781,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
// an error.
//
// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
- fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> {
+ fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> {
HANDLE_STILL.store(true, Ordering::Relaxed);
// Update the framework's current user if the feature is enabled.
@@ -764,39 +796,37 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
}
}
- let gateway_url = Arc::new(Mutex::new(url));
+ {
+ let mut manager = self.shard_manager.lock();
+
+ let init = shard_data[1] - shard_data[0] + 1;
- let mut manager = ShardManager::new(
- shard_data[0],
- shard_data[1] - shard_data[0] + 1,
- shard_data[2],
- Arc::clone(&gateway_url),
- Arc::clone(&self.token),
- Arc::clone(&self.data),
- Arc::clone(&self.event_handler),
- #[cfg(feature = "framework")]
- Arc::clone(&self.framework),
- self.threadpool.clone(),
- );
+ manager.set_shards(shard_data[0], init, shard_data[2]);
- self.shard_runners = Arc::clone(&manager.runners);
+ debug!(
+ "Initializing shard info: {} - {}/{}",
+ shard_data[0],
+ init,
+ shard_data[2],
+ );
- if let Err(why) = manager.initialize() {
- error!("Failed to boot a shard: {:?}", why);
- info!("Shutting down all shards");
+ if let Err(why) = manager.initialize() {
+ error!("Failed to boot a shard: {:?}", why);
+ info!("Shutting down all shards");
- manager.shutdown_all();
+ manager.shutdown_all();
- return Err(Error::Client(ClientError::ShardBootFailure));
+ return Err(Error::Client(ClientError::ShardBootFailure));
+ }
}
- manager.run();
+ self.shard_manager_worker.run();
Err(Error::Client(ClientError::Shutdown))
}
}
-impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
+impl Drop for Client {
fn drop(&mut self) { self.close_handle().close(); }
}