aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client/bridge/gateway/mod.rs114
-rw-r--r--src/client/bridge/gateway/shard_manager.rs283
-rw-r--r--src/client/bridge/gateway/shard_manager_monitor.rs54
-rw-r--r--src/client/bridge/gateway/shard_messenger.rs276
-rw-r--r--src/client/bridge/gateway/shard_queuer.rs56
-rw-r--r--src/client/bridge/gateway/shard_runner.rs385
-rw-r--r--src/client/bridge/gateway/shard_runner_message.rs43
-rw-r--r--src/client/bridge/mod.rs9
-rw-r--r--src/client/context.rs65
-rw-r--r--src/client/dispatch.rs132
-rw-r--r--src/client/mod.rs142
-rw-r--r--src/gateway/mod.rs17
-rw-r--r--src/gateway/shard.rs889
-rw-r--r--src/gateway/ws_client_ext.rs133
14 files changed, 1717 insertions, 881 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs
index 0b873aa..752b015 100644
--- a/src/client/bridge/gateway/mod.rs
+++ b/src/client/bridge/gateway/mod.rs
@@ -1,28 +1,113 @@
+//! The client gateway bridge is support essential for the [`client`] module.
+//!
+//! This is made available for user use if one wishes to be lower-level or avoid
+//! the higher functionality of the [`Client`].
+//!
+//! Of interest are three pieces:
+//!
+//! ### [`ShardManager`]
+//!
+//! The shard manager is responsible for being a clean interface between the
+//! user and the shard runners, providing essential functions such as
+//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`]
+//! to restart a shard.
+//!
+//! If you are using the `Client`, this is likely the only piece of interest to
+//! you. Refer to [its documentation][`ShardManager`] for more information.
+//!
+//! ### [`ShardQueuer`]
+//!
+//! The shard queuer is a light wrapper around an mpsc receiver that receives
+//! [`ShardManagerMessage`]s. It should be run in its own thread so it can
+//! receive messages to start shards in a queue.
+//!
+//! Refer to [its documentation][`ShardQueuer`] for more information.
+//!
+//! ### [`ShardRunner`]
+//!
+//! The shard runner is responsible for actually running a shard and
+//! communicating with its respective WebSocket client.
+//!
+//! It is performs all actions such as sending a presence update over the client
+//! and, with the help of the [`Shard`], will be able to determine what to do.
+//! This is, for example, whether to reconnect, resume, or identify with the
+//! gateway.
+//!
+//! ### In Conclusion
+//!
+//! For almost every - if not every - use case, you only need to _possibly_ be
+//! concerned about the [`ShardManager`] in this module.
+//!
+//! [`Client`]: ../../struct.Client.html
+//! [`client`]: ../..
+//! [`Shard`]: ../../../gateway/struct.Shard.html
+//! [`ShardManager`]: struct.ShardManager.html
+//! [`ShardManager::restart`]: struct.ShardManager.html#method.restart
+//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown
+//! [`ShardQueuer`]: struct.ShardQueuer.html
+//! [`ShardRunner`]: struct.ShardRunner.html
+
mod shard_manager;
+mod shard_manager_monitor;
+mod shard_messenger;
mod shard_queuer;
mod shard_runner;
+mod shard_runner_message;
pub use self::shard_manager::ShardManager;
+pub use self::shard_manager_monitor::ShardManagerMonitor;
+pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::ShardQueuer;
pub use self::shard_runner::ShardRunner;
+pub use self::shard_runner_message::ShardRunnerMessage;
-use gateway::Shard;
-use parking_lot::Mutex;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::mpsc::Sender;
-use std::sync::Arc;
-type Parked<T> = Arc<Mutex<T>>;
-type LockedShard = Parked<Shard>;
+/// A message either for a [`ShardManager`] or a [`ShardRunner`].
+///
+/// [`ShardManager`]: struct.ShardManager.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+pub enum ShardClientMessage {
+ /// A message intended to be worked with by a [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Manager(ShardManagerMessage),
+ /// A message intended to be worked with by a [`ShardRunner`].
+ ///
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ Runner(ShardRunnerMessage),
+}
+/// A message for a [`ShardManager`] relating to an operation with a shard.
+///
+/// [`ShardManager`]: struct.ShardManager.html
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub enum ShardManagerMessage {
+ /// Indicator that a [`ShardManagerMonitor`] should restart a shard.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Restart(ShardId),
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard
+ /// without bringing it back up.
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Shutdown(ShardId),
- #[allow(dead_code)]
+ /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards
+ /// and end its monitoring process for the [`ShardManager`].
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
ShutdownAll,
}
+/// A message to be sent to the [`ShardQueuer`].
+///
+/// This should usually be wrapped in a [`ShardClientMessage`].
+///
+/// [`ShardClientMessage`]: enum.ShardClientMessage.html
+/// [`ShardQueuer`]: enum.ShardQueuer.html
+#[derive(Clone, Debug)]
pub enum ShardQueuerMessage {
/// Message to start a shard, where the 0-index element is the ID of the
/// Shard to start and the 1-index element is the total shards in use.
@@ -31,8 +116,8 @@ pub enum ShardQueuerMessage {
Shutdown,
}
-// A light tuplestruct wrapper around a u64 to verify type correctness when
-// working with the IDs of shards.
+/// A light tuplestruct wrapper around a u64 to verify type correctness when
+/// working with the IDs of shards.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub struct ShardId(pub u64);
@@ -42,7 +127,16 @@ impl Display for ShardId {
}
}
+/// Information about a [`ShardRunner`].
+///
+/// The [`ShardId`] is not included because, as it stands, you probably already
+/// know the Id if you obtained this.
+///
+/// [`ShardId`]: struct.ShardId.html
+/// [`ShardRunner`]: struct.ShardRunner.html
+#[derive(Debug)]
pub struct ShardRunnerInfo {
- pub runner_tx: Sender<ShardManagerMessage>,
- pub shard: Arc<Mutex<Shard>>,
+ /// The channel used to communicate with the shard runner, telling it
+ /// what to do with regards to its status.
+ pub runner_tx: Sender<ShardClientMessage>,
}
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index 6e3b285..2cf45fd 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,13 +1,15 @@
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use super::super::super::EventHandler;
use super::{
+ ShardClientMessage,
ShardId,
ShardManagerMessage,
+ ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
@@ -18,8 +20,80 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
+/// A manager for handling the status of shards by starting them, restarting
+/// them, and stopping them when required.
+///
+/// **Note**: The [`Client`] internally uses a shard manager. If you are using a
+/// Client, then you do not need to make one of these.
+///
+/// # Examples
+///
+/// Initialize a shard manager with a framework responsible for shards 0 through
+/// 2, of 5 total shards:
+///
+/// ```rust,no_run
+/// extern crate parking_lot;
+/// extern crate serenity;
+/// extern crate threadpool;
+/// extern crate typemap;
+///
+/// # use std::error::Error;
+/// #
+/// # #[cfg(feature = "framework")]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #
+/// use parking_lot::Mutex;
+/// use serenity::client::bridge::gateway::ShardManager;
+/// use serenity::client::EventHandler;
+/// use serenity::http;
+/// use std::sync::Arc;
+/// use std::env;
+/// use threadpool::ThreadPool;
+/// use typemap::ShareMap;
+///
+/// struct Handler;
+///
+/// impl EventHandler for Handler { }
+///
+/// let token = env::var("DISCORD_TOKEN")?;
+/// http::set_token(&token);
+/// let token = Arc::new(Mutex::new(token));
+///
+/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url));
+/// let data = Arc::new(Mutex::new(ShareMap::custom()));
+/// let event_handler = Arc::new(Handler);
+/// let framework = Arc::new(Mutex::new(None));
+/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5);
+///
+/// ShardManager::new(
+/// 0, // the shard index to start initiating from
+/// 3, // the number of shards to initiate (this initiates 0, 1, and 2)
+/// 5, // the total number of shards in use
+/// gateway_url,
+/// token,
+/// data,
+/// event_handler,
+/// framework,
+/// threadpool,
+/// );
+/// # Ok(())
+/// # }
+/// #
+/// # #[cfg(not(feature = "framework"))]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Client`]: ../../struct.Client.html
+#[derive(Debug)]
pub struct ShardManager {
- pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The shard runners currently managed.
+ runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// The index of the first shard to initialize, 0-indexed.
shard_index: u64,
/// The number of shards to initialize.
@@ -27,10 +101,11 @@ pub struct ShardManager {
/// The total shards in use, 1-indexed.
shard_total: u64,
shard_queuer: Sender<ShardQueuerMessage>,
- thread_rx: Receiver<ShardManagerMessage>,
}
impl ShardManager {
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(feature = "framework")]
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new<H>(
@@ -43,7 +118,7 @@ impl ShardManager {
event_handler: Arc<H>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
@@ -66,16 +141,22 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
}
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(not(feature = "framework"))]
pub fn new<H>(
shard_index: u64,
@@ -86,21 +167,21 @@ impl ShardManager {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
let runners = Arc::new(Mutex::new(HashMap::new()));
let mut shard_queuer = ShardQueuer {
- data: data.clone(),
- event_handler: event_handler.clone(),
+ data: Arc::clone(&data),
+ event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
- runners: runners.clone(),
+ runners: Arc::clone(&runners),
rx: shard_queue_rx,
- token: token.clone(),
- ws_url: ws_url.clone(),
+ token: Arc::clone(&token),
+ ws_url: Arc::clone(&ws_url),
threadpool,
};
@@ -108,21 +189,41 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
+ }
+
+ /// Returns whether the shard manager contains either an active instance of
+ /// a shard runner responsible for the given ID.
+ ///
+ /// If a shard has been queued but has not yet been initiated, then this
+ /// will return `false`. Consider double-checking [`is_responsible_for`] to
+ /// determine whether this shard manager is responsible for the given shard.
+ ///
+ /// [`is_responsible_for`]: #method.is_responsible_for
+ pub fn has(&self, shard_id: ShardId) -> bool {
+ self.runners.lock().contains_key(&shard_id)
}
+ /// Initializes all shards that the manager is responsible for.
+ ///
+ /// This will communicate shard boots with the [`ShardQueuer`] so that they
+ /// are properly queued.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
pub fn initialize(&mut self) -> Result<()> {
let shard_to = self.shard_index + self.shard_init;
- debug!("{}, {}", self.shard_index, self.shard_init);
-
for shard_id in self.shard_index..shard_to {
let shard_total = self.shard_total;
@@ -132,39 +233,52 @@ impl ShardManager {
Ok(())
}
- pub fn run(&mut self) {
- while let Ok(value) = self.thread_rx.recv() {
- match value {
- ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
- ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
- ShardManagerMessage::ShutdownAll => {
- self.shutdown_all();
-
- break;
- },
- }
- }
- }
-
- pub fn shutdown_all(&mut self) {
- info!("Shutting down all shards");
- let keys = {
- self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
- };
+ /// Sets the new sharding information for the manager.
+ ///
+ /// This will shutdown all existing shards.
+ ///
+ /// This will _not_ instantiate the new shards.
+ pub fn set_shards(&mut self, index: u64, init: u64, total: u64) {
+ self.shutdown_all();
- for shard_id in keys {
- self.shutdown(shard_id);
- }
+ self.shard_index = index;
+ self.shard_init = init;
+ self.shard_total = total;
}
- fn boot(&mut self, shard_info: [ShardId; 2]) {
- info!("Telling shard queuer to start shard {}", shard_info[0]);
-
- let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
- let _ = self.shard_queuer.send(msg);
- }
-
- fn restart(&mut self, shard_id: ShardId) {
+ /// Restarts a shard runner.
+ ///
+ /// This sends a shutdown signal to a shard's associated [`ShardRunner`],
+ /// and then queues a initialization of a shard runner for the same shard
+ /// via the [`ShardQueuer`].
+ ///
+ /// # Examples
+ ///
+ /// Creating a client and then restarting a shard by ID:
+ ///
+ /// _(note: in reality this precise code doesn't have an effect since the
+ /// shard would not yet have been initialized via [`initialize`], but the
+ /// concept is the same)_
+ ///
+ /// ```rust,no_run
+ /// use serenity::client::bridge::gateway::ShardId;
+ /// use serenity::client::{Client, EventHandler};
+ /// use std::env;
+ ///
+ /// struct Handler;
+ ///
+ /// impl EventHandler for Handler { }
+ ///
+ /// let token = env::var("DISCORD_TOKEN").unwrap();
+ /// let mut client = Client::new(&token, Handler).unwrap();
+ ///
+ /// // restart shard ID 7
+ /// client.shard_manager.lock().restart(ShardId(7));
+ /// ```
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn restart(&mut self, shard_id: ShardId) {
info!("Restarting shard {}", shard_id);
self.shutdown(shard_id);
@@ -173,23 +287,88 @@ impl ShardManager {
self.boot([shard_id, ShardId(shard_total)]);
}
- fn shutdown(&mut self, shard_id: ShardId) {
+ /// Returns the [`ShardId`]s of the shards that have been instantiated and
+ /// currently have a valid [`ShardRunner`].
+ ///
+ /// [`ShardId`]: struct.ShardId.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn shards_instantiated(&self) -> Vec<ShardId> {
+ self.runners.lock().keys().cloned().collect()
+ }
+
+ /// Attempts to shut down the shard runner by Id.
+ ///
+ /// Returns a boolean indicating whether a shard runner was present. This is
+ /// _not_ necessary an indicator of whether the shard runner was
+ /// successfully shut down.
+ ///
+ /// **Note**: If the receiving end of an mpsc channel - theoretically owned
+ /// by the shard runner - no longer exists, then the shard runner will not
+ /// know it should shut down. This _should never happen_. It may already be
+ /// stopped.
+ pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
info!("Shutting down shard {}", shard_id);
if let Some(runner) = self.runners.lock().get(&shard_id) {
- let msg = ShardManagerMessage::Shutdown(shard_id);
+ let shutdown = ShardManagerMessage::Shutdown(shard_id);
+ let msg = ShardClientMessage::Manager(shutdown);
if let Err(why) = runner.runner_tx.send(msg) {
- warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ warn!(
+ "Failed to cleanly shutdown shard {}: {:?}",
+ shard_id,
+ why,
+ );
}
}
- self.runners.lock().remove(&shard_id);
+ self.runners.lock().remove(&shard_id).is_some()
+ }
+
+ /// Sends a shutdown message for all shards that the manager is responsible
+ /// for that are still known to be running.
+ ///
+ /// If you only need to shutdown a select number of shards, prefer looping
+ /// over the [`shutdown`] method.
+ ///
+ /// [`shutdown`]: #method.shutdown
+ pub fn shutdown_all(&mut self) {
+ let keys = {
+ let runners = self.runners.lock();
+
+ if runners.is_empty() {
+ return;
+ }
+
+ runners.keys().cloned().collect::<Vec<_>>()
+ };
+
+ info!("Shutting down all shards");
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
}
}
impl Drop for ShardManager {
+ /// A custom drop implementation to clean up after the manager.
+ ///
+ /// This shuts down all active [`ShardRunner`]s and attempts to tell the
+ /// [`ShardQueuer`] to shutdown.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
fn drop(&mut self) {
+ self.shutdown_all();
+
if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
warn!("Failed to send shutdown to shard queuer: {:?}", why);
}
diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs
new file mode 100644
index 0000000..4a64473
--- /dev/null
+++ b/src/client/bridge/gateway/shard_manager_monitor.rs
@@ -0,0 +1,54 @@
+use parking_lot::Mutex;
+use std::sync::mpsc::Receiver;
+use std::sync::Arc;
+use super::{ShardManager, ShardManagerMessage};
+
+/// The shard manager monitor does what it says on the tin -- it monitors the
+/// shard manager and performs actions on it as received.
+///
+/// The monitor is essentially responsible for running in its own thread and
+/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or
+/// shutdown everything entirely.
+///
+/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html
+#[derive(Debug)]
+pub struct ShardManagerMonitor {
+ /// An clone of the Arc to the manager itself.
+ pub manager: Arc<Mutex<ShardManager>>,
+ /// The mpsc Receiver channel to receive shard manager messages over.
+ pub rx: Receiver<ShardManagerMessage>,
+}
+
+impl ShardManagerMonitor {
+ /// "Runs" the monitor, waiting for messages over the Receiver.
+ ///
+ /// This should be called in its own thread due to its blocking, looped
+ /// nature.
+ ///
+ /// This will continue running until either:
+ ///
+ /// - a [`ShardManagerMessage::ShutdownAll`] has been received
+ /// - an error is returned while receiving a message from the
+ /// channel (probably indicating that the shard manager should stop anyway)
+ ///
+ /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll
+ pub fn run(&mut self) {
+ debug!("Starting shard manager worker");
+
+ while let Ok(value) = self.rx.recv() {
+ match value {
+ ShardManagerMessage::Restart(shard_id) => {
+ self.manager.lock().restart(shard_id);
+ },
+ ShardManagerMessage::Shutdown(shard_id) => {
+ self.manager.lock().shutdown(shard_id);
+ },
+ ShardManagerMessage::ShutdownAll => {
+ self.manager.lock().shutdown_all();
+
+ break;
+ },
+ }
+ }
+ }
+}
diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs
new file mode 100644
index 0000000..069c838
--- /dev/null
+++ b/src/client/bridge/gateway/shard_messenger.rs
@@ -0,0 +1,276 @@
+use model::{Game, GuildId, OnlineStatus};
+use super::{ShardClientMessage, ShardRunnerMessage};
+use std::sync::mpsc::{SendError, Sender};
+use websocket::message::OwnedMessage;
+
+/// A lightweight wrapper around an mpsc sender.
+///
+/// This is used to cleanly communicate with a shard's respective
+/// [`ShardRunner`]. This can be used for actions such as setting the game via
+/// [`set_game`] or shutting down via [`shutdown`].
+///
+/// [`ShardRunner`]: struct.ShardRunner.html
+/// [`set_game`]: #method.set_game
+/// [`shutdown`]: #method.shutdown
+#[derive(Clone, Debug)]
+pub struct ShardMessenger {
+ tx: Sender<ShardClientMessage>,
+}
+
+impl ShardMessenger {
+ /// Creates a new shard messenger.
+ ///
+ /// If you are using the [`Client`], you do not need to do this.
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ #[inline]
+ pub fn new(tx: Sender<ShardClientMessage>) -> Self {
+ Self {
+ tx,
+ }
+ }
+
+ /// Requests that one or multiple [`Guild`]s be chunked.
+ ///
+ /// This will ask the gateway to start sending member chunks for large
+ /// guilds (250 members+). If a guild is over 250 members, then a full
+ /// member list will not be downloaded, and must instead be requested to be
+ /// sent in "chunks" containing members.
+ ///
+ /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each
+ /// chunk only contains a partial amount of the total members.
+ ///
+ /// If the `cache` feature is enabled, the cache will automatically be
+ /// updated with member chunks.
+ ///
+ /// # Examples
+ ///
+ /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not
+ /// specifying a query parameter:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// Chunk a single guild by Id, limiting to 20 members, and specifying a
+ /// query parameter of `"do"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?;
+ /// #
+ /// use serenity::model::GuildId;
+ ///
+ /// let guild_ids = vec![GuildId(81384788765712384)];
+ ///
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`Event::GuildMembersChunk`]:
+ /// ../../model/event/enum.Event.html#variant.GuildMembersChunk
+ /// [`Guild`]: ../../model/struct.Guild.html
+ /// [`Member`]: ../../model/struct.Member.html
+ pub fn chunk_guilds<It>(
+ &self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<String>,
+ ) where It: IntoIterator<Item=GuildId> {
+ let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>();
+
+ let _ = self.send(ShardRunnerMessage::ChunkGuilds {
+ guild_ids: guilds,
+ limit,
+ query,
+ });
+ }
+
+ /// Sets the user's current game, if any.
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current game to playing `"Heroes of the Storm"`:
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::Game;
+ ///
+ /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_game(&self, game: Option<Game>) {
+ let _ = self.send(ShardRunnerMessage::SetGame(game));
+ }
+
+ /// Sets the user's full presence information.
+ ///
+ /// Consider using the individual setters if you only need to modify one of
+ /// these.
+ ///
+ /// # Examples
+ ///
+ /// Set the current user as playing `"Heroes of the Storm"` and being
+ /// online:
+ ///
+ /// ```rust,ignore
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::{Game, OnlineStatus};
+ ///
+ /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn set_presence(&self, game: Option<Game>, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetPresence(status, game));
+ }
+
+ /// Sets the user's current online status.
+ ///
+ /// Note that [`Offline`] is not a valid online status, so it is
+ /// automatically converted to [`Invisible`].
+ ///
+ /// Other presence settings are maintained.
+ ///
+ /// # Examples
+ ///
+ /// Setting the current online status for the shard to [`DoNotDisturb`].
+ ///
+ /// ```rust,no_run
+ /// # extern crate parking_lot;
+ /// # extern crate serenity;
+ /// #
+ /// # use parking_lot::Mutex;
+ /// # use serenity::client::gateway::Shard;
+ /// # use std::error::Error;
+ /// # use std::sync::Arc;
+ /// #
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// # let mutex = Arc::new(Mutex::new("".to_string()));
+ /// #
+ /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// #
+ /// use serenity::model::OnlineStatus;
+ ///
+ /// shard.set_status(OnlineStatus::DoNotDisturb);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
+ /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
+ pub fn set_status(&self, mut online_status: OnlineStatus) {
+ if online_status == OnlineStatus::Offline {
+ online_status = OnlineStatus::Invisible;
+ }
+
+ let _ = self.send(ShardRunnerMessage::SetStatus(online_status));
+ }
+
+ /// Shuts down the websocket by attempting to cleanly close the
+ /// connection.
+ pub fn shutdown_clean(&self) {
+ let _ = self.send(ShardRunnerMessage::Close(1000, None));
+ }
+
+ /// Sends a raw message over the WebSocket.
+ ///
+ /// The given message is not mutated in any way, and is sent as-is.
+ ///
+ /// You should only use this if you know what you're doing. If you're
+ /// wanting to, for example, send a presence update, prefer the usage of
+ /// the [`set_presence`] method.
+ ///
+ /// [`set_presence`]: #method.set_presence
+ pub fn websocket_message(&self, message: OwnedMessage) {
+ let _ = self.send(ShardRunnerMessage::Message(message));
+ }
+
+ #[inline]
+ fn send(&self, msg: ShardRunnerMessage)
+ -> Result<(), SendError<ShardClientMessage>> {
+ self.tx.send(ShardClientMessage::Runner(msg))
+ }
+}
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
index cb3f749..bd02532 100644
--- a/src/client/bridge/gateway/shard_queuer.rs
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -27,20 +27,69 @@ use framework::Framework;
/// blocking nature of the loop itself as well as a 5 second thread sleep
/// between shard starts.
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
+ /// A copy of [`Client::data`] to be given to runners for contextual
+ /// dispatching.
+ ///
+ /// [`Client::data`]: ../../struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
+ /// A reference to an `EventHandler`, such as the one given to the
+ /// [`Client`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
pub event_handler: Arc<H>,
+ /// A copy of the framework
#[cfg(feature = "framework")]
pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ /// The instant that a shard was last started.
+ ///
+ /// This is used to determine how long to wait between shard IDENTIFYs.
pub last_start: Option<Instant>,
+ /// A copy of the sender channel to communicate with the
+ /// [`ShardManagerMonitor`].
+ ///
+ /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
+ /// A copy of the map of shard runners.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// A receiver channel for the shard queuer to be told to start shards.
pub rx: Receiver<ShardQueuerMessage>,
+ /// A copy of a threadpool to give shard runners.
+ ///
+ /// For example, when using the [`Client`], this will be a copy of
+ /// [`Client::threadpool`].
+ ///
+ /// [`Client`]: ../../struct.Client.html
+ /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool
pub threadpool: ThreadPool,
+ /// A copy of the token to connect with.
pub token: Arc<Mutex<String>>,
+ /// A copy of the URI to use to connect to the gateway.
pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
+ /// Begins the shard queuer loop.
+ ///
+ /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s,
+ /// blocking for messages on what to do.
+ ///
+ /// If a [`ShardQueuerMessage::Start`] is received, this will:
+ ///
+ /// 1. Check how much time has passed since the last shard was started
+ /// 2. If the amount of time is less than the ratelimit, it will sleep until
+ /// that time has passed
+ /// 3. Start the shard by ID
+ ///
+ /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and
+ /// the loop will be over.
+ ///
+ /// **Note**: This should be run in its own thread due to the blocking
+ /// nature of the loop.
+ ///
+ /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html
+ /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown
+ /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
+ /// [`rx`]: #structfield.rx
pub fn run(&mut self) {
while let Ok(msg) = self.rx.recv() {
match msg {
@@ -80,16 +129,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
let shard_info = [shard_id.0, shard_total.0];
+
let shard = Shard::new(
Arc::clone(&self.ws_url),
Arc::clone(&self.token),
shard_info,
)?;
- let locked = Arc::new(Mutex::new(shard));
let mut runner = feature_framework! {{
ShardRunner::new(
- Arc::clone(&locked),
+ shard,
self.manager_tx.clone(),
Arc::clone(&self.framework),
Arc::clone(&self.data),
@@ -98,7 +147,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
)
} else {
ShardRunner::new(
- locked.clone(),
+ shard,
self.manager_tx.clone(),
self.data.clone(),
self.event_handler.clone(),
@@ -108,7 +157,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
let runner_info = ShardRunnerInfo {
runner_tx: runner.runner_tx(),
- shard: locked,
};
thread::spawn(move || {
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index b14e48b..a5fde3d 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -1,35 +1,45 @@
+use gateway::{Shard, ShardAction};
use internal::prelude::*;
use internal::ws_impl::ReceiverExt;
use model::event::{Event, GatewayEvent};
use parking_lot::Mutex;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use super::super::super::{EventHandler, dispatch};
-use super::{LockedShard, ShardId, ShardManagerMessage};
+use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage};
use threadpool::ThreadPool;
use typemap::ShareMap;
+use websocket::message::{CloseData, OwnedMessage};
use websocket::WebSocketError;
#[cfg(feature = "framework")]
use framework::Framework;
+#[cfg(feature = "voice")]
+use internal::ws_impl::SenderExt;
+/// A runner for managing a [`Shard`] and its respective WebSocket client.
+///
+/// [`Shard`]: ../../../gateway/struct.Shard.html
pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
#[cfg(feature = "framework")]
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
manager_tx: Sender<ShardManagerMessage>,
- runner_rx: Receiver<ShardManagerMessage>,
- runner_tx: Sender<ShardManagerMessage>,
- shard: LockedShard,
- shard_info: [u64; 2],
+ // channel to receive messages from the shard manager and dispatches
+ runner_rx: Receiver<ShardClientMessage>,
+ // channel to send messages to the shard runner from the shard manager
+ runner_tx: Sender<ShardClientMessage>,
+ shard: Shard,
threadpool: ThreadPool,
}
impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
+ /// Creates a new runner for a Shard.
+ #[allow(too_many_arguments)]
#[cfg(feature = "framework")]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
data: Arc<Mutex<ShareMap>>,
@@ -37,7 +47,6 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -47,21 +56,20 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
framework,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Creates a new runner for a Shard.
#[cfg(not(feature = "framework"))]
pub fn new(
- shard: LockedShard,
+ shard: Shard,
manager_tx: Sender<ShardManagerMessage>,
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
) -> Self {
let (tx, rx) = mpsc::channel();
- let shard_info = shard.lock().shard_info();
Self {
runner_rx: rx,
@@ -70,90 +78,276 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
event_handler,
manager_tx,
shard,
- shard_info,
threadpool,
}
}
+ /// Starts the runner's loop to receive events.
+ ///
+ /// This runs a loop that performs the following in each iteration:
+ ///
+ /// 1. checks the receiver for [`ShardRunnerMessage`]s, possibly from the
+ /// [`ShardManager`], and if there is one, acts on it.
+ ///
+ /// 2. checks if a heartbeat should be sent to the discord Gateway, and if
+ /// so, sends one.
+ ///
+ /// 3. attempts to retrieve a message from the WebSocket, processing it into
+ /// a [`GatewayEvent`]. This will block for 100ms before assuming there is
+ /// no message available.
+ ///
+ /// 4. Checks with the [`Shard`] to determine if the gateway event is
+ /// specifying an action to take (e.g. resuming, reconnecting, heartbeating)
+ /// and then performs that action, if any.
+ ///
+ /// 5. Dispatches the event via the Client.
+ ///
+ /// 6. Go back to 1.
+ ///
+ /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html
+ /// [`Shard`]: ../../../gateway/struct.Shard.html
+ /// [`ShardManager`]: struct.ShardManager.html
+ /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html
pub fn run(&mut self) -> Result<()> {
- debug!("[ShardRunner {:?}] Running", self.shard_info);
+ debug!("[ShardRunner {:?}] Running", self.shard.shard_info());
loop {
- {
- let mut shard = self.shard.lock();
- let incoming = self.runner_rx.try_recv();
+ if !self.recv()? {
+ return Ok(());
+ }
- // Check for an incoming message over the runner channel.
- //
- // If the message is to shutdown, first verify the ID so we know
- // for certain this runner is to shutdown.
- if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
- if id.0 == self.shard_info[0] {
- let _ = shard.shutdown_clean();
+ // check heartbeat
+ if let Err(why) = self.shard.check_heartbeat() {
+ warn!(
+ "[ShardRunner {:?}] Error heartbeating: {:?}",
+ self.shard.shard_info(),
+ why,
+ );
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+
+ return self.request_restart();
+ }
- return Ok(());
+ #[cfg(feature = "voice")]
+ {
+ for message in self.shard.cycle_voice_recv() {
+ if let Err(why) = self.shard.client.send_json(&message) {
+ println!("Err sending from voice over WS: {:?}", why);
}
}
+ }
- if let Err(why) = shard.check_heartbeat() {
- error!("Failed to heartbeat and reconnect: {:?}", why);
+ let (event, action, successful) = self.recv_event();
- return self.request_restart();
- }
-
- #[cfg(feature = "voice")]
- {
- shard.cycle_voice_recv();
- }
+ if let Some(action) = action {
+ let _ = self.action(action);
}
- let (event, successful) = self.recv_event();
-
if let Some(event) = event {
- let data = Arc::clone(&self.data);
- let event_handler = Arc::clone(&self.event_handler);
- let shard = Arc::clone(&self.shard);
-
- feature_framework! {{
- let framework = Arc::clone(&self.framework);
-
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- framework,
- data,
- event_handler,
- );
- });
- } else {
- self.threadpool.execute(|| {
- dispatch(
- event,
- shard,
- data,
- event_handler,
- );
- });
- }}
+ self.dispatch(event);
}
- if !successful && !self.shard.lock().stage().is_connecting() {
+ if !successful && !self.shard.stage().is_connecting() {
return self.request_restart();
}
}
}
- pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> {
+ /// Clones the internal copy of the Sender to the shard runner.
+ pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> {
self.runner_tx.clone()
}
+ /// Takes an action that a [`Shard`] has determined should happen and then
+ /// does it.
+ ///
+ /// For example, if the shard says that an Identify message needs to be
+ /// sent, this will do that.
+ ///
+ /// # Errors
+ ///
+ /// Returns
+ fn action(&mut self, action: ShardAction) -> Result<()> {
+ match action {
+ ShardAction::Autoreconnect => self.shard.autoreconnect(),
+ ShardAction::Heartbeat => self.shard.heartbeat(),
+ ShardAction::Identify => self.shard.identify(),
+ ShardAction::Reconnect => self.shard.reconnect(),
+ ShardAction::Resume => self.shard.resume(),
+ }
+ }
+
+ // Checks if the ID received to shutdown is equivalent to the ID of the
+ // shard this runner is responsible. If so, it shuts down the WebSocket
+ // client.
+ //
+ // Returns whether the WebSocket client is still active.
+ //
+ // If true, the WebSocket client was _not_ shutdown. If false, it was.
+ fn checked_shutdown(&mut self, id: ShardId) -> bool {
+ // First verify the ID so we know for certain this runner is
+ // to shutdown.
+ if id.0 != self.shard.shard_info()[0] {
+ // Not meant for this runner for some reason, don't
+ // shutdown.
+ return true;
+ }
+
+ let close_data = CloseData::new(1000, String::new());
+ let msg = OwnedMessage::Close(Some(close_data));
+ let _ = self.shard.client.send_message(&msg);
+
+ false
+ }
+
+ fn dispatch(&self, event: Event) {
+ let data = Arc::clone(&self.data);
+ let event_handler = Arc::clone(&self.event_handler);
+ let runner_tx = self.runner_tx.clone();
+ let shard_id = self.shard.shard_info()[0];
+
+ feature_framework! {{
+ let framework = Arc::clone(&self.framework);
+
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ framework,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ } else {
+ self.threadpool.execute(move || {
+ dispatch(
+ event,
+ data,
+ event_handler,
+ runner_tx,
+ shard_id,
+ );
+ });
+ }}
+ }
+
+ // Handles a received value over the shard runner rx channel.
+ //
+ // Returns a boolean on whether the shard runner can continue.
+ //
+ // This always returns true, except in the case that the shard manager asked
+ // the runner to shutdown.
+ fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool {
+ match value {
+ ShardClientMessage::Manager(x) => match x {
+ ShardManagerMessage::Restart(id) |
+ ShardManagerMessage::Shutdown(id) => {
+ self.checked_shutdown(id)
+ },
+ ShardManagerMessage::ShutdownAll => {
+ // This variant should never be received.
+ warn!(
+ "[ShardRunner {:?}] Received a ShutdownAll?",
+ self.shard.shard_info(),
+ );
+
+ true
+ },
+ }
+ ShardClientMessage::Runner(x) => match x {
+ ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => {
+ self.shard.chunk_guilds(
+ guild_ids,
+ limit,
+ query.as_ref().map(String::as_str),
+ ).is_ok()
+ },
+ ShardRunnerMessage::Close(code, reason) => {
+ let reason = reason.unwrap_or_else(String::new);
+ let data = CloseData::new(code, reason);
+ let msg = OwnedMessage::Close(Some(data));
+
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::Message(msg) => {
+ self.shard.client.send_message(&msg).is_ok()
+ },
+ ShardRunnerMessage::SetGame(game) => {
+ // To avoid a clone of `game`, we do a little bit of
+ // trickery here:
+ //
+ // First, we obtain a reference to the current presence of
+ // the shard, and create a new presence tuple of the new
+ // game we received over the channel as well as the online
+ // status that the shard already had.
+ //
+ // We then (attempt to) send the websocket message with the
+ // status update, expressively returning:
+ //
+ // - whether the message successfully sent
+ // - the original game we received over the channel
+ self.shard.set_game(game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetPresence(status, game) => {
+ self.shard.set_presence(status, game);
+
+ self.shard.update_presence().is_ok()
+ },
+ ShardRunnerMessage::SetStatus(status) => {
+ self.shard.set_status(status);
+
+ self.shard.update_presence().is_ok()
+ },
+ },
+ }
+ }
+
+ // Receives values over the internal shard runner rx channel and handles
+ // them.
+ //
+ // This will loop over values until there is no longer one.
+ //
+ // Requests a restart if the sending half of the channel disconnects. This
+ // should _never_ happen, as the sending half is kept on the runner.
+
+ // Returns whether the shard runner is in a state that can continue.
+ fn recv(&mut self) -> Result<bool> {
+ loop {
+ match self.runner_rx.try_recv() {
+ Ok(value) => {
+ if !self.handle_rx_value(value) {
+ return Ok(false);
+ }
+ },
+ Err(TryRecvError::Disconnected) => {
+ warn!(
+ "[ShardRunner {:?}] Sending half DC; restarting",
+ self.shard.shard_info(),
+ );
+
+ let _ = self.request_restart();
+
+ return Ok(false);
+ },
+ Err(TryRecvError::Empty) => break,
+ }
+ }
+
+ // There are no longer any values available.
+
+ Ok(true)
+ }
+
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
- fn recv_event(&mut self) -> (Option<Event>, bool) {
- let mut shard = self.shard.lock();
-
- let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) {
+ let gw_event = match self.shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
@@ -161,58 +355,69 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- return (None, true);
+ {
+ let last = self.shard.last_heartbeat_ack();
+ let interval = self.shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
+
+ if seconds_passed <= interval_in_secs * 2 {
+ return (None, None, true);
+ }
+ } else {
+ return (None, None, true);
}
- } else {
- return (None, true);
}
debug!("Attempting to auto-reconnect");
- if let Err(why) = shard.autoreconnect() {
+ if let Err(why) = self.shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
}
- return (None, true);
+ return (None, None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies this will be
// hit every iteration.
- return (None, false);
+ return (None, None, false);
},
other => other,
};
let event = match gw_event {
Ok(Some(event)) => Ok(event),
- Ok(None) => return (None, true),
+ Ok(None) => return (None, None, true),
Err(why) => Err(why),
};
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => return (None, true),
+ let action = match self.shard.handle_event(&event) {
+ Ok(Some(action)) => Some(action),
+ Ok(None) => None,
Err(why) => {
error!("Shard handler received err: {:?}", why);
- return (None, true);
+ return (None, None, true);
},
- };
+ };
+
+ let event = match event {
+ Ok(GatewayEvent::Dispatch(_, event)) => Some(event),
+ _ => None,
+ };
- (Some(event), true)
+ (event, action, true)
}
fn request_restart(&self) -> Result<()> {
- debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
- let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
+ debug!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard.shard_info(),
+ );
+ let shard_id = ShardId(self.shard.shard_info()[0]);
+ let msg = ShardManagerMessage::Restart(shard_id);
let _ = self.manager_tx.send(msg);
Ok(())
diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs
new file mode 100644
index 0000000..e6458eb
--- /dev/null
+++ b/src/client/bridge/gateway/shard_runner_message.rs
@@ -0,0 +1,43 @@
+use model::{Game, GuildId, OnlineStatus};
+use websocket::message::OwnedMessage;
+
+/// A message to send from a shard over a WebSocket.
+pub enum ShardRunnerMessage {
+ /// Indicates that the client is to send a member chunk message.
+ ChunkGuilds {
+ /// The IDs of the [`Guild`]s to chunk.
+ ///
+ /// [`Guild`]: ../../../model/struct.Guild.html
+ guild_ids: Vec<GuildId>,
+ /// The maximum number of members to receive [`GuildMembersChunkEvent`]s
+ /// for.
+ ///
+ /// [`GuildMembersChunkEvent`]: ../../../model/event/GuildMembersChunkEvent.html
+ limit: Option<u16>,
+ /// Text to filter members by.
+ ///
+ /// For example, a query of `"s"` will cause only [`Member`]s whose
+ /// usernames start with `"s"` to be chunked.
+ ///
+ /// [`Member`]: ../../../model/struct.Member.html
+ query: Option<String>,
+ },
+ /// Indicates that the client is to close with the given status code and
+ /// reason.
+ ///
+ /// You should rarely - if _ever_ - need this, but the option is available.
+ /// Prefer to use the [`ShardManager`] to shutdown WebSocket clients if you
+ /// are intending to send a 1000 close code.
+ ///
+ /// [`ShardManager`]: struct.ShardManager.html
+ Close(u16, Option<String>),
+ /// Indicates that the client is to send a custom WebSocket message.
+ Message(OwnedMessage),
+ /// Indicates that the client is to update the shard's presence's game.
+ SetGame(Option<Game>),
+ /// Indicates that the client is to update the shard's presence in its
+ /// entirity.
+ SetPresence(OnlineStatus, Option<Game>),
+ /// Indicates that the client is to update the shard's presence's status.
+ SetStatus(OnlineStatus),
+}
diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs
index 4f27526..ea18d73 100644
--- a/src/client/bridge/mod.rs
+++ b/src/client/bridge/mod.rs
@@ -1 +1,10 @@
+//! A collection of bridged support between the [`client`] module and other
+//! modules.
+//!
+//! **Warning**: You likely _do not_ need to mess with anything in here. Beware.
+//! This is lower-level functionality abstracted by the [`Client`].
+//!
+//! [`Client`]: ../struct.Client.html
+//! [`client`]: ../
+
pub mod gateway;
diff --git a/src/client/context.rs b/src/client/context.rs
index a60aaa0..9b89cde 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -1,7 +1,7 @@
-use Result;
+use client::bridge::gateway::{ShardClientMessage, ShardMessenger};
+use std::sync::mpsc::Sender;
use std::sync::Arc;
use typemap::ShareMap;
-use gateway::Shard;
use model::*;
use parking_lot::Mutex;
@@ -12,7 +12,7 @@ use internal::prelude::*;
#[cfg(feature = "builder")]
use builder::EditProfile;
#[cfg(feature = "builder")]
-use {http, utils};
+use {Result, http, utils};
#[cfg(feature = "builder")]
use std::collections::HashMap;
@@ -38,21 +38,23 @@ pub struct Context {
///
/// [`Client::data`]: struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
- /// The associated shard which dispatched the event handler.
- ///
- /// Note that if you are sharding, in relevant terms, this is the shard
- /// which received the event being dispatched.
- pub shard: Arc<Mutex<Shard>>,
+ /// The messenger to communicate with the shard runner.
+ pub shard: ShardMessenger,
+ /// The ID of the shard this context is related to.
+ pub shard_id: u64,
}
impl Context {
/// Create a new Context to be passed to an event handler.
- pub(crate) fn new(shard: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>)
- -> Context {
+ pub(crate) fn new(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+ ) -> Context {
Context {
+ shard: ShardMessenger::new(runner_tx),
+ shard_id,
data,
- shard,
}
}
@@ -110,6 +112,7 @@ impl Context {
http::edit_profile(&edited)
}
+
/// Sets the current user as being [`Online`]. This maintains the current
/// game.
///
@@ -137,9 +140,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn online(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Online);
+ self.shard.set_status(OnlineStatus::Online);
}
/// Sets the current user as being [`Idle`]. This maintains the current
@@ -168,9 +171,9 @@ impl Context {
/// ```
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn idle(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Idle);
+ self.shard.set_status(OnlineStatus::Idle);
}
/// Sets the current user as being [`DoNotDisturb`]. This maintains the
@@ -199,9 +202,9 @@ impl Context {
/// ```
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
+ #[inline]
pub fn dnd(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::DoNotDisturb);
+ self.shard.set_status(OnlineStatus::DoNotDisturb);
}
/// Sets the current user as being [`Invisible`]. This maintains the current
@@ -231,9 +234,9 @@ impl Context {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
+ #[inline]
pub fn invisible(&self) {
- let mut shard = self.shard.lock();
- shard.set_status(OnlineStatus::Invisible);
+ self.shard.set_status(OnlineStatus::Invisible);
}
/// "Resets" the current user's presence, by setting the game to `None` and
@@ -265,9 +268,9 @@ impl Context {
/// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
+ #[inline]
pub fn reset_presence(&self) {
- let mut shard = self.shard.lock();
- shard.set_presence(None, OnlineStatus::Online);
+ self.shard.set_presence(None, OnlineStatus::Online);
}
/// Sets the current game, defaulting to an online status of [`Online`].
@@ -303,9 +306,9 @@ impl Context {
/// ```
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
+ #[inline]
pub fn set_game(&self, game: Game) {
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current game, passing in only its name. This will automatically
@@ -351,8 +354,7 @@ impl Context {
url: None,
};
- let mut shard = self.shard.lock();
- shard.set_presence(Some(game), OnlineStatus::Online);
+ self.shard.set_presence(Some(game), OnlineStatus::Online);
}
/// Sets the current user's presence, providing all fields to be passed.
@@ -406,9 +408,9 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
+ #[inline]
pub fn set_presence(&self, game: Option<Game>, status: OnlineStatus) {
- let mut shard = self.shard.lock();
- shard.set_presence(game, status);
+ self.shard.set_presence(game, status);
}
/// Disconnects the shard from the websocket, essentially "quiting" it.
@@ -417,9 +419,8 @@ impl Context {
/// until [`Client::start`] and vice versa are called again.
///
/// [`Client::start`]: ./struct.Client.html#method.start
- pub fn quit(&self) -> Result<()> {
- let mut shard = self.shard.lock();
-
- shard.shutdown_clean()
+ #[inline]
+ pub fn quit(&self) {
+ self.shard.shutdown_clean();
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 827875e..9545a6f 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,9 +1,10 @@
use std::sync::Arc;
use parking_lot::Mutex;
+use super::bridge::gateway::ShardClientMessage;
use super::event_handler::EventHandler;
use super::Context;
+use std::sync::mpsc::Sender;
use typemap::ShareMap;
-use gateway::Shard;
use model::event::Event;
use model::{Channel, Message};
@@ -35,19 +36,26 @@ macro_rules! now {
() => (Utc::now().time().second() * 1000)
}
-fn context(conn: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>) -> Context {
- Context::new(conn, data)
+fn context(
+ data: Arc<Mutex<ShareMap>>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) -> Context {
+ Context::new(data, runner_tx, shard_id)
}
#[cfg(feature = "framework")]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(
context.clone(),
event.message.clone(),
@@ -58,21 +66,24 @@ pub fn dispatch<H: EventHandler + 'static>(event: Event,
framework.dispatch(context, event.message);
}
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
#[cfg(not(feature = "framework"))]
-pub fn dispatch<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+pub fn dispatch<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
match event {
Event::MessageCreate(event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
dispatch_message(context, event.message, event_handler);
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, data, event_handler, runner_tx, shard_id),
}
}
@@ -91,10 +102,13 @@ fn dispatch_message<H>(
}
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)]
-fn handle_event<H: EventHandler + 'static>(event: Event,
- conn: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>) {
+fn handle_event<H: EventHandler + 'static>(
+ event: Event,
+ data: Arc<Mutex<ShareMap>>,
+ event_handler: Arc<H>,
+ runner_tx: Sender<ShardClientMessage>,
+ shard_id: u64,
+) {
#[cfg(feature = "cache")]
let mut last_guild_create_time = now!();
@@ -113,7 +127,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
// This different channel_create dispatching is only due to the fact that
// each time the bot receives a dm, this event is also fired.
@@ -134,7 +148,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::ChannelDelete(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
match event.channel {
Channel::Private(_) | Channel::Group(_) => {},
@@ -147,28 +161,28 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
},
Event::ChannelPinsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_pins_update(context, event);
},
Event::ChannelRecipientAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_addition(context, event.channel_id, event.user);
},
Event::ChannelRecipientRemove(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.channel_recipient_removal(context, event.channel_id, event.user);
},
Event::ChannelUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read().channel(event.channel.id());
@@ -178,12 +192,12 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::GuildBanAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_addition(context, event.guild_id, event.user);
},
Event::GuildBanRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_ban_removal(context, event.guild_id, event.user);
},
@@ -204,7 +218,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let cache = CACHE.read();
if cache.unavailable_guilds.is_empty() {
- let context = context(Arc::clone(&conn), Arc::clone(&data));
+ let context = context(Arc::clone(&data), runner_tx.clone(), shard_id);
let guild_amount = cache
.guilds
@@ -216,7 +230,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}
}
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_create(context, event.guild, _is_new);
@@ -226,7 +240,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildDelete(mut event) => {
let _full = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_delete(context, event.guild, _full);
@@ -237,25 +251,25 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildEmojisUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
},
Event::GuildIntegrationsUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_integrations_update(context, event.guild_id);
},
Event::GuildMemberAdd(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_member_addition(context, event.guild_id, event.member);
},
Event::GuildMemberRemove(mut event) => {
let _member = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
@@ -265,7 +279,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildMemberUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
// This is safe to unwrap, as the update would have created
@@ -284,20 +298,20 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildMembersChunk(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_members_chunk(context, event.guild_id, event.members);
},
Event::GuildRoleCreate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_role_create(context, event.guild_id, event.role);
},
Event::GuildRoleDelete(mut event) => {
let _role = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
@@ -307,7 +321,7 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
},
Event::GuildRoleUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.guild_role_update(context, event.guild_id, _before, event.role);
@@ -318,14 +332,14 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
Event::GuildUnavailable(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.guild_unavailable(context, event.guild_id);
},
Event::GuildUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
let before = CACHE.read()
@@ -341,46 +355,46 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
// Already handled by the framework check macro
Event::MessageCreate(_) => {},
Event::MessageDeleteBulk(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete_bulk(context, event.channel_id, event.ids);
},
Event::MessageDelete(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_delete(context, event.channel_id, event.message_id);
},
Event::MessageUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.message_update(context, event);
},
Event::PresencesReplace(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_replace(context, event.presences);
},
Event::PresenceUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.presence_update(context, event);
},
Event::ReactionAdd(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_add(context, event.reaction);
},
Event::ReactionRemove(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove(context, event.reaction);
},
Event::ReactionRemoveAll(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
},
@@ -393,35 +407,35 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
let _ = wait_for_guilds()
.map(move |_| {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
});
} else {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.ready(context, event.ready);
}
}
},
Event::Resumed(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.resume(context, event);
},
Event::TypingStart(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.typing_start(context, event);
},
Event::Unknown(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.unknown(context, event.kind, event.value);
},
Event::UserUpdate(mut event) => {
let _before = update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
feature_cache! {{
event_handler.user_update(context, _before.unwrap(), event.current_user);
@@ -430,19 +444,19 @@ fn handle_event<H: EventHandler + 'static>(event: Event,
}}
},
Event::VoiceServerUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_server_update(context, event);
},
Event::VoiceStateUpdate(mut event) => {
update!(event);
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.voice_state_update(context, event.guild_id, event.voice_state);
},
Event::WebhookUpdate(mut event) => {
- let context = context(conn, data);
+ let context = context(data, runner_tx, shard_id);
event_handler.webhook_update(context, event.guild_id, event.channel_id);
},
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 00a305a..2a97c91 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -37,13 +37,11 @@ pub use http as rest;
#[cfg(feature = "cache")]
pub use CACHE;
-use self::bridge::gateway::{ShardId, ShardManager, ShardRunnerInfo};
+use self::bridge::gateway::{ShardManager, ShardManagerMonitor};
use self::dispatch::dispatch;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::mem;
use threadpool::ThreadPool;
use typemap::ShareMap;
use http;
@@ -103,8 +101,7 @@ impl CloseHandle {
/// [`on_message`]: #method.on_message
/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
/// [sharding docs]: gateway/index.html#sharding
-#[derive(Clone)]
-pub struct Client<H: EventHandler + Send + Sync + 'static> {
+pub struct Client {
/// A ShareMap which requires types to be Send + Sync. This is a map that
/// can be safely shared across contexts.
///
@@ -191,7 +188,6 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`on_ready`]: #method.on_ready
- event_handler: Arc<H>,
#[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
/// A HashMap of all shards instantiated by the Client.
///
@@ -224,12 +220,12 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?;
///
- /// let shard_runners = client.shard_runners.clone();
+ /// let shard_manager = client.shard_manager.clone();
///
/// thread::spawn(move || {
/// loop {
/// println!("Shard count instantiated: {}",
- /// shard_runners.lock().len());
+ /// shard_manager.lock().shards_instantiated().len());
///
/// thread::sleep(Duration::from_millis(5000));
/// }
@@ -244,16 +240,26 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
///
/// [`Client::start_shard`]: #method.start_shard
/// [`Client::start_shards`]: #method.start_shards
- pub shard_runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ pub shard_manager: Arc<Mutex<ShardManager>>,
+ shard_manager_worker: ShardManagerMonitor,
/// The threadpool shared by all shards.
///
/// Defaults to 5 threads, which should suffice small bots. Consider
/// increasing this number as your bot grows.
pub threadpool: ThreadPool,
- token: Arc<Mutex<String>>,
+ /// The token in use by the client.
+ pub token: Arc<Mutex<String>>,
+ /// URI that the client's shards will use to connect to the gateway.
+ ///
+ /// This is likely not important for production usage and is, at best, used
+ /// for debugging.
+ ///
+ /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated
+ /// value available.
+ pub ws_uri: Arc<Mutex<String>>,
}
-impl<H: EventHandler + Send + Sync + 'static> Client<H> {
+impl Client {
/// Creates a Client for a bot user.
///
/// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -283,7 +289,8 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(token: &str, handler: H) -> Result<Self> {
+ pub fn new<H>(token: &str, handler: H) -> Result<Self>
+ where H: EventHandler + Send + Sync + 'static {
let token = if token.starts_with("Bot ") {
token.to_string()
} else {
@@ -295,23 +302,53 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let name = "serenity client".to_owned();
let threadpool = ThreadPool::with_name(name, 5);
+ let url = Arc::new(Mutex::new(http::get_gateway()?.url));
+ let data = Arc::new(Mutex::new(ShareMap::custom()));
+ let event_handler = Arc::new(handler);
Ok(feature_framework! {{
+ let framework = Arc::new(Mutex::new(None));
+
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ Arc::clone(&locked),
+ Arc::clone(&data),
+ Arc::clone(&event_handler),
+ Arc::clone(&framework),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
framework: Arc::new(Mutex::new(None)),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
} else {
+ let (shard_manager, shard_manager_worker) = ShardManager::new(
+ 0,
+ 0,
+ 0,
+ Arc::clone(&url),
+ locked.clone(),
+ data.clone(),
+ Arc::clone(&event_handler),
+ threadpool.clone(),
+ );
+
Client {
- data: Arc::new(Mutex::new(ShareMap::custom())),
- event_handler: Arc::new(handler),
- shard_runners: Arc::new(Mutex::new(HashMap::new())),
- threadpool,
token: locked,
+ ws_uri: url,
+ data,
+ shard_manager,
+ shard_manager_worker,
+ threadpool,
}
}})
}
@@ -462,7 +499,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
///
/// [gateway docs]: gateway/index.html#sharding
pub fn start(&mut self) -> Result<()> {
- self.start_connection([0, 0, 1], http::get_gateway()?.url)
+ self.start_connection([0, 0, 1])
}
/// Establish the connection(s) and start listening for events.
@@ -514,15 +551,13 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
/// [gateway docs]: gateway/index.html#sharding
pub fn start_autosharded(&mut self) -> Result<()> {
- let mut res = http::get_bot_gateway()?;
-
- let x = res.shards as u64 - 1;
- let y = res.shards as u64;
- let url = mem::replace(&mut res.url, String::default());
+ let (x, y) = {
+ let res = http::get_bot_gateway()?;
- drop(res);
+ (res.shards as u64 - 1, res.shards as u64)
+ };
- self.start_connection([0, x, y], url)
+ self.start_connection([0, x, y])
}
/// Establish a sharded connection and start listening for events.
@@ -603,7 +638,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_autosharded`]: #method.start_autosharded
/// [gateway docs]: gateway/index.html#sharding
pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
- self.start_connection([shard, shard, shards], http::get_gateway()?.url)
+ self.start_connection([shard, shard, shards])
}
/// Establish sharded connections and start listening for events.
@@ -657,10 +692,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shard_range`]: #method.start_shard_range
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
- self.start_connection(
- [0, total_shards - 1, total_shards],
- http::get_gateway()?.url,
- )
+ self.start_connection([0, total_shards - 1, total_shards])
}
/// Establish a range of sharded connections and start listening for events.
@@ -730,7 +762,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// [`start_shards`]: #method.start_shards
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
- self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
+ self.start_connection([range[0], range[1], total_shards])
}
/// Returns a thread-safe handle for closing shards.
@@ -749,7 +781,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
// an error.
//
// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
- fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> {
+ fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> {
HANDLE_STILL.store(true, Ordering::Relaxed);
// Update the framework's current user if the feature is enabled.
@@ -764,39 +796,37 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
}
}
- let gateway_url = Arc::new(Mutex::new(url));
+ {
+ let mut manager = self.shard_manager.lock();
+
+ let init = shard_data[1] - shard_data[0] + 1;
- let mut manager = ShardManager::new(
- shard_data[0],
- shard_data[1] - shard_data[0] + 1,
- shard_data[2],
- Arc::clone(&gateway_url),
- Arc::clone(&self.token),
- Arc::clone(&self.data),
- Arc::clone(&self.event_handler),
- #[cfg(feature = "framework")]
- Arc::clone(&self.framework),
- self.threadpool.clone(),
- );
+ manager.set_shards(shard_data[0], init, shard_data[2]);
- self.shard_runners = Arc::clone(&manager.runners);
+ debug!(
+ "Initializing shard info: {} - {}/{}",
+ shard_data[0],
+ init,
+ shard_data[2],
+ );
- if let Err(why) = manager.initialize() {
- error!("Failed to boot a shard: {:?}", why);
- info!("Shutting down all shards");
+ if let Err(why) = manager.initialize() {
+ error!("Failed to boot a shard: {:?}", why);
+ info!("Shutting down all shards");
- manager.shutdown_all();
+ manager.shutdown_all();
- return Err(Error::Client(ClientError::ShardBootFailure));
+ return Err(Error::Client(ClientError::ShardBootFailure));
+ }
}
- manager.run();
+ self.shard_manager_worker.run();
Err(Error::Client(ClientError::Shutdown))
}
}
-impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
+impl Drop for Client {
fn drop(&mut self) { self.close_handle().close(); }
}
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index b593f5c..147808e 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -51,9 +51,18 @@
mod error;
mod shard;
+mod ws_client_ext;
pub use self::error::Error as GatewayError;
pub use self::shard::Shard;
+pub use self::ws_client_ext::WebSocketGatewayClientExt;
+
+use model::{Game, OnlineStatus};
+use websocket::sync::client::Client;
+use websocket::sync::stream::{TcpStream, TlsStream};
+
+pub type CurrentPresence = (Option<Game>, OnlineStatus);
+pub type WsClient = Client<TlsStream<TcpStream>>;
/// Indicates the current connection stage of a [`Shard`].
///
@@ -136,3 +145,11 @@ impl ConnectionStage {
}
}
}
+
+pub enum ShardAction {
+ Autoreconnect,
+ Heartbeat,
+ Identify,
+ Reconnect,
+ Resume,
+}
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index c644eac..ef05cf4 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -1,39 +1,31 @@
-use chrono::Utc;
use parking_lot::Mutex;
-use serde_json::Value;
-use std::env::consts;
-use std::io::Write;
-use std::net::Shutdown;
use std::sync::Arc;
use std::time::{Duration as StdDuration, Instant};
-use std::thread;
-use super::{ConnectionStage, GatewayError};
+use super::{
+ ConnectionStage,
+ CurrentPresence,
+ ShardAction,
+ GatewayError,
+ WsClient,
+ WebSocketGatewayClientExt,
+};
use websocket::client::Url;
-use websocket::message::{CloseData, OwnedMessage};
use websocket::stream::sync::AsTcpStream;
-use websocket::sync::client::{Client, ClientBuilder};
-use websocket::sync::stream::{TcpStream, TlsStream};
+use websocket::sync::client::ClientBuilder;
use websocket::WebSocketError;
-use constants::{self, close_codes, OpCode};
+use constants::{self, close_codes};
use internal::prelude::*;
-use internal::ws_impl::SenderExt;
use model::event::{Event, GatewayEvent};
use model::{Game, GuildId, OnlineStatus};
#[cfg(feature = "voice")]
+use serde_json::Value;
+#[cfg(feature = "voice")]
use std::sync::mpsc::{self, Receiver as MpscReceiver};
-#[cfg(feature = "cache")]
-use client::CACHE;
#[cfg(feature = "voice")]
use voice::Manager as VoiceManager;
#[cfg(feature = "voice")]
use http;
-#[cfg(feature = "cache")]
-use utils;
-
-pub type WsClient = Client<TlsStream<TcpStream>>;
-
-type CurrentPresence = (Option<Game>, OnlineStatus);
/// A Shard is a higher-level handler for a websocket connection to Discord's
/// gateway. The shard allows for sending and receiving messages over the
@@ -91,7 +83,7 @@ pub struct Shard {
session_id: Option<String>,
shard_info: [u64; 2],
stage: ConnectionStage,
- token: Arc<Mutex<String>>,
+ pub token: Arc<Mutex<String>>,
ws_url: Arc<Mutex<String>>,
/// The voice connections that this Shard is responsible for. The Shard will
/// update the voice connections' states.
@@ -138,11 +130,14 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(ws_url: Arc<Mutex<String>>,
- token: Arc<Mutex<String>>,
- shard_info: [u64; 2])
- -> Result<Shard> {
- let client = connecting(&*ws_url.lock());
+ pub fn new(
+ ws_url: Arc<Mutex<String>>,
+ token: Arc<Mutex<String>>,
+ shard_info: [u64; 2],
+ ) -> Result<Shard> {
+ let mut client = connect(&*ws_url.lock())?;
+
+ let _ = set_client_timeout(&mut client);
let current_presence = (None, OnlineStatus::Online);
let heartbeat_instants = (None, None);
@@ -159,6 +154,8 @@ impl Shard {
let user = http::get_current_user()?;
Shard {
+ manager: VoiceManager::new(tx, user.id),
+ manager_rx: rx,
client,
current_presence,
heartbeat_instants,
@@ -170,8 +167,6 @@ impl Shard {
shard_info,
session_id,
ws_url,
- manager: VoiceManager::new(tx, user.id),
- manager_rx: rx,
}
} else {
Shard {
@@ -191,137 +186,122 @@ impl Shard {
})
}
- /// Retrieves a copy of the current shard information.
- ///
- /// The first element is the _current_ shard - 0-indexed - while the second
- /// element is the _total number_ of shards -- 1-indexed.
- ///
- /// For example, if using 3 shards in total, and if this is shard 1, then it
- /// can be read as "the second of three shards".
- ///
- /// # Examples
- ///
- /// Retrieving the shard info for the second shard, out of two shards total:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
- /// #
- /// assert_eq!(shard.shard_info(), [1, 2]);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
+ /// Retrieves the current presence of the shard.
+ #[inline]
+ pub fn current_presence(&self) -> &CurrentPresence {
+ &self.current_presence
+ }
- /// Sets the user's current game, if any.
+ /// Retrieves the heartbeat instants of the shard.
///
- /// Other presence settings are maintained.
+ /// This is the time of when a heartbeat was sent and when an
+ /// acknowledgement was last received.
+ #[inline]
+ pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) {
+ &self.heartbeat_instants
+ }
+
+ /// Retrieves the value of when the last heartbeat was sent.
+ #[inline]
+ pub fn last_heartbeat_sent(&self) -> Option<&Instant> {
+ self.heartbeat_instants.0.as_ref()
+ }
+
+ /// Retrieves the value of when the last heartbeat ack was received.
+ #[inline]
+ pub fn last_heartbeat_ack(&self) -> Option<&Instant> {
+ self.heartbeat_instants.1.as_ref()
+ }
+
+ /// Sends a heartbeat to the gateway with the current sequence.
///
- /// # Examples
+ /// This sets the last heartbeat time to now, and
+ /// `last_heartbeat_acknowledged` to `false`.
///
- /// Setting the current game to playing `"Heroes of the Storm"`:
+ /// # Errors
///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::Game;
+ /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending
+ /// a heartbeat.
///
- /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
+ /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed
+ pub fn heartbeat(&mut self) -> Result<()> {
+ match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) {
+ Ok(()) => {
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.last_heartbeat_acknowledged = false;
+
+ Ok(())
+ },
+ Err(why) => {
+ match why {
+ Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
+ debug!("[Shard {:?}] Err heartbeating: {:?}",
+ self.shard_info,
+ err);
+ },
+ other => {
+ warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
+ self.shard_info,
+ other);
+ },
+ }
+
+ Err(Error::Gateway(GatewayError::HeartbeatFailed))
+ }
+ }
+ }
+
+ #[inline]
+ pub fn heartbeat_interval(&self) -> Option<&u64> {
+ self.heartbeat_interval.as_ref()
+ }
+
+ #[inline]
+ pub fn last_heartbeat_acknowledged(&self) -> bool {
+ self.last_heartbeat_acknowledged
+ }
+
+ #[inline]
+ pub fn seq(&self) -> u64 {
+ self.seq
+ }
+
+ #[inline]
+ pub fn session_id(&self) -> Option<&String> {
+ self.session_id.as_ref()
+ }
+
+ #[inline]
pub fn set_game(&mut self, game: Option<Game>) {
self.current_presence.0 = game;
+ }
- self.update_presence();
+ #[inline]
+ pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) {
+ self.set_game(game);
+ self.set_status(status);
}
- /// Sets the user's current online status.
- ///
- /// Note that [`Offline`] is not a valid online status, so it is
- /// automatically converted to [`Invisible`].
- ///
- /// Other presence settings are maintained.
- ///
- /// # Examples
- ///
- /// Setting the current online status for the shard to [`DoNotDisturb`].
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::OnlineStatus;
- ///
- /// shard.set_status(OnlineStatus::DoNotDisturb);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
- /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
- /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
- pub fn set_status(&mut self, online_status: OnlineStatus) {
- self.current_presence.1 = match online_status {
- OnlineStatus::Offline => OnlineStatus::Invisible,
- other => other,
- };
+ #[inline]
+ pub fn set_status(&mut self, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
- self.update_presence();
+ self.current_presence.1 = status;
}
- /// Sets the user's full presence information.
+ /// Retrieves a copy of the current shard information.
///
- /// Consider using the individual setters if you only need to modify one of
- /// these.
+ /// The first element is the _current_ shard - 0-indexed - while the second
+ /// element is the _total number_ of shards -- 1-indexed.
+ ///
+ /// For example, if using 3 shards in total, and if this is shard 1, then it
+ /// can be read as "the second of three shards".
///
/// # Examples
///
- /// Set the current user as playing `"Heroes of the Storm"` and being
- /// online:
+ /// Retrieving the shard info for the second shard, out of two shards total:
///
/// ```rust,no_run
/// # extern crate parking_lot;
@@ -335,11 +315,9 @@ impl Shard {
/// # fn try_main() -> Result<(), Box<Error>> {
/// # let mutex = Arc::new(Mutex::new("".to_string()));
/// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
/// #
- /// use serenity::model::{Game, OnlineStatus};
- ///
- /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// assert_eq!(shard.shard_info(), [1, 2]);
/// # Ok(())
/// # }
/// #
@@ -347,15 +325,7 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) {
- if status == OnlineStatus::Offline {
- status = OnlineStatus::Invisible;
- }
-
- self.current_presence = (game, status);
-
- self.update_presence();
- }
+ pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
/// Returns the current connection stage of the shard.
pub fn stage(&self) -> ConnectionStage {
@@ -387,21 +357,24 @@ impl Shard {
/// Returns a `GatewayError::OverloadedShard` if the shard would have too
/// many guilds assigned to it.
#[allow(cyclomatic_complexity)]
- pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> {
- match event {
- Ok(GatewayEvent::Dispatch(seq, event)) => {
+ pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>)
+ -> Result<Option<ShardAction>> {
+ match *event {
+ Ok(GatewayEvent::Dispatch(seq, ref event)) => {
if seq > self.seq + 1 {
warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq);
}
- match event {
+ match *event {
Event::Ready(ref ready) => {
debug!("[Shard {:?}] Received Ready", self.shard_info);
self.session_id = Some(ready.ready.session_id.clone());
self.stage = ConnectionStage::Connected;
+ /*
set_client_timeout(&mut self.client)?;
+ */
},
Event::Resumed(_) => {
info!("[Shard {:?}] Resumed", self.shard_info);
@@ -421,7 +394,7 @@ impl Shard {
self.seq = seq;
- Ok(Some(event))
+ Ok(None)
},
Ok(GatewayEvent::Heartbeat(s)) => {
info!("[Shard {:?}] Received shard heartbeat", self.shard_info);
@@ -438,24 +411,18 @@ impl Shard {
if self.stage == ConnectionStage::Handshake {
self.stage = ConnectionStage::Identifying;
- self.identify()?;
+ return Ok(Some(ShardAction::Identify));
} else {
warn!(
"[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting",
self.shard_info
);
- return self.autoreconnect().and(Ok(None));
+ return Ok(Some(ShardAction::Autoreconnect));
}
}
- let map = json!({
- "d": Value::Null,
- "op": OpCode::Heartbeat.num(),
- });
- self.client.send_json(&map)?;
-
- Ok(None)
+ Ok(Some(ShardAction::Heartbeat))
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());
@@ -478,14 +445,14 @@ impl Shard {
self.heartbeat_interval = Some(interval);
}
- if self.stage == ConnectionStage::Handshake {
- self.identify().and(Ok(None))
+ Ok(Some(if self.stage == ConnectionStage::Handshake {
+ ShardAction::Identify
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
- self.autoreconnect().and(Ok(None))
- }
+ ShardAction::Autoreconnect
+ }))
},
Ok(GatewayEvent::InvalidateSession(resumable)) => {
info!(
@@ -493,16 +460,15 @@ impl Shard {
self.shard_info,
);
- if resumable {
- self.resume().and(Ok(None))
+ Ok(Some(if resumable {
+ ShardAction::Resume
} else {
- self.identify().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
- Err(Error::Gateway(GatewayError::Closed(data))) => {
+ Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)),
+ Err(Error::Gateway(GatewayError::Closed(ref data))) => {
let num = data.as_ref().map(|d| d.status_code);
- let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
match num {
@@ -562,7 +528,7 @@ impl Shard {
"[Shard {:?}] Unknown unclean close {}: {:?}",
self.shard_info,
other,
- reason
+ data.as_ref().map(|d| &d.reason),
);
},
_ => {},
@@ -573,14 +539,14 @@ impl Shard {
self.session_id.is_some()
}).unwrap_or(true);
- if resume {
- self.resume().or_else(|_| self.reconnect()).and(Ok(None))
+ Ok(Some(if resume {
+ ShardAction::Resume
} else {
- self.reconnect().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Err(Error::WebSocket(why)) => {
- if let WebSocketError::NoDataAvailable = why {
+ Err(Error::WebSocket(ref why)) => {
+ if let WebSocketError::NoDataAvailable = *why {
if self.heartbeat_instants.1.is_none() {
return Ok(None);
}
@@ -592,44 +558,62 @@ impl Shard {
info!("[Shard {:?}] Will attempt to auto-reconnect",
self.shard_info);
- self.autoreconnect().and(Ok(None))
+ Ok(Some(ShardAction::Autoreconnect))
},
- Err(error) => Err(error),
+ _ => Ok(None),
+ }
+ }
+
+ pub fn check_heartbeat(&mut self) -> Result<()> {
+ let wait = {
+ let heartbeat_interval = match self.heartbeat_interval {
+ Some(heartbeat_interval) => heartbeat_interval,
+ None => return Ok(()),
+ };
+
+ StdDuration::from_secs(heartbeat_interval / 1000)
+ };
+
+ // If a duration of time less than the heartbeat_interval has passed,
+ // then don't perform a keepalive or attempt to reconnect.
+ if let Some(last_sent) = self.heartbeat_instants.0 {
+ if last_sent.elapsed() <= wait {
+ return Ok(());
+ }
+ }
+
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // auto-reconnect.
+ if !self.last_heartbeat_acknowledged {
+ debug!(
+ "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
+ self.shard_info,
+ );
+
+ return self.reconnect().map_err(|why| {
+ warn!(
+ "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
+ self.shard_info,
+ why,
+ );
+
+ why
+ });
+ }
+
+ // Otherwise, we're good to heartbeat.
+ if let Err(why) = self.heartbeat() {
+ warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
+
+ self.reconnect()
+ } else {
+ trace!("[Shard {:?}] Heartbeated", self.shard_info);
+
+ Ok(())
}
}
/// Calculates the heartbeat latency between the shard and the gateway.
- ///
- /// # Examples
- ///
- /// When using the [`Client`], output the latency in response to a `"~ping"`
- /// message handled through [`Client::on_message`].
- ///
- /// ```rust,no_run
- /// # use serenity::prelude::*;
- /// # use serenity::model::*;
- /// struct Handler;
- ///
- /// impl EventHandler for Handler {
- /// fn message(&self, ctx: Context, msg: Message) {
- /// if msg.content == "~ping" {
- /// if let Some(latency) = ctx.shard.lock().latency() {
- /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos());
- ///
- /// let _ = msg.channel_id.say(&s);
- /// } else {
- /// let _ = msg.channel_id.say("N/A");
- /// }
- /// }
- /// }
- /// }
- /// let mut client = Client::new("token", Handler).unwrap();
- ///
- /// client.start().unwrap();
- /// ```
- ///
- /// [`Client`]: ../struct.Client.html
- /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message
// Shamelessly stolen from brayzure's commit in eris:
// <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
pub fn latency(&self) -> Option<StdDuration> {
@@ -640,38 +624,68 @@ impl Shard {
}
}
- /// Shuts down the receiver by attempting to cleanly close the
- /// connection.
- pub fn shutdown_clean(&mut self) -> Result<()> {
- {
- let data = CloseData {
- status_code: 1000,
- reason: String::new(),
- };
-
- let message = OwnedMessage::Close(Some(data));
-
- self.client.send_message(&message)?;
+ #[cfg(feature = "voice")]
+ fn voice_dispatch(&mut self, event: &Event) {
+ if let Event::VoiceStateUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_state(&update.voice_state);
+ }
+ }
}
- let mut stream = self.client.stream_ref().as_tcp();
+ if let Event::VoiceServerUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_server(&update.endpoint, &update.token);
+ }
+ }
+ }
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ #[cfg(feature = "voice")]
+ pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> {
+ let mut messages = vec![];
- debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info);
+ while let Ok(v) = self.manager_rx.try_recv() {
+ messages.push(v);
+ }
- Ok(())
+ messages
}
- /// Uncleanly shuts down the receiver by not sending a close code.
- pub fn shutdown(&mut self) -> Result<()> {
- let mut stream = self.client.stream_ref().as_tcp();
+ /// Performs a deterministic reconnect.
+ ///
+ /// The type of reconnect is deterministic on whether a [`session_id`].
+ ///
+ /// If the `session_id` still exists, then a RESUME is sent. If not, then
+ /// an IDENTIFY is sent.
+ ///
+ /// Note that, if the shard is already in a stage of
+ /// [`ConnectionStage::Connecting`], then no action will be performed.
+ ///
+ /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting
+ /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id
+ pub fn autoreconnect(&mut self) -> Result<()> {
+ if self.stage == ConnectionStage::Connecting {
+ return Ok(());
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ if self.session_id().is_some() {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to resume",
+ self.shard_info,
+ );
- Ok(())
+ self.resume()
+ } else {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to reconnect",
+ self.shard_info,
+ );
+
+ self.reconnect()
+ }
}
/// Requests that one or multiple [`Guild`]s be chunked.
@@ -710,7 +724,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(2000), None);
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
/// # Ok(())
/// # }
/// #
@@ -740,7 +754,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(20), Some("do"));
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
/// # Ok(())
/// # }
/// #
@@ -753,280 +767,40 @@ impl Shard {
/// ../../model/event/enum.Event.html#variant.GuildMembersChunk
/// [`Guild`]: ../../model/struct.Guild.html
/// [`Member`]: ../../model/struct.Member.html
- pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>)
- where It: IntoIterator<Item=T> {
+ pub fn chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);
- let msg = json!({
- "op": OpCode::GetGuildMembers.num(),
- "d": {
- "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
- "limit": limit.unwrap_or(0),
- "query": query.unwrap_or(""),
- },
- });
-
- let _ = self.client.send_json(&msg);
- }
-
- /// Calculates the number of guilds that the shard is responsible for.
- ///
- /// If sharding is not being used (i.e. 1 shard), then the total number of
- /// [`Guild`] in the [`Cache`] will be used.
- ///
- /// **Note**: Requires the `cache` feature be enabled.
- ///
- /// # Examples
- ///
- /// Retrieve the number of guilds a shard is responsible for:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// let info = shard.shard_info();
- /// let guilds = shard.guilds_handled();
- ///
- /// println!("Shard {:?} is responsible for {} guilds", info, guilds);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`Cache`]: ../ext/cache/struct.Cache.html
- /// [`Guild`]: ../model/struct.Guild.html
- #[cfg(feature = "cache")]
- pub fn guilds_handled(&self) -> u16 {
- let cache = CACHE.read();
-
- let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]);
-
- cache
- .guilds
- .keys()
- .filter(|guild_id| {
- utils::shard_id(guild_id.0, shard_count) == shard_id
- })
- .count() as u16
- }
-
- #[cfg(feature = "voice")]
- fn voice_dispatch(&mut self, event: &Event) {
- if let Event::VoiceStateUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_state(&update.voice_state);
- }
- }
- }
-
- if let Event::VoiceServerUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_server(&update.endpoint, &update.token);
- }
- }
- }
- }
-
- #[cfg(feature = "voice")]
- pub(crate) fn cycle_voice_recv(&mut self) {
- if let Ok(v) = self.manager_rx.try_recv() {
- if let Err(why) = self.client.send_json(&v) {
- warn!("[Shard {:?}] Err sending voice msg: {:?}",
- self.shard_info,
- why);
- }
- }
+ self.client.send_chunk_guilds(
+ guild_ids,
+ &self.shard_info,
+ limit,
+ query,
+ )
}
- pub(crate) fn heartbeat(&mut self) -> Result<()> {
- let map = json!({
- "d": self.seq,
- "op": OpCode::Heartbeat.num(),
- });
-
- trace!("[Shard {:?}] Sending heartbeat d: {}",
- self.shard_info,
- self.seq);
-
- match self.client.send_json(&map) {
- Ok(_) => {
- self.heartbeat_instants.0 = Some(Instant::now());
- self.last_heartbeat_acknowledged = false;
-
- trace!("[Shard {:?}] Successfully heartbeated",
- self.shard_info);
-
- Ok(())
- },
- Err(why) => {
- match why {
- Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
- debug!("[Shard {:?}] Err heartbeating: {:?}",
- self.shard_info,
- err);
- },
- other => {
- warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
- self.shard_info,
- other);
- },
- }
-
- Err(Error::Gateway(GatewayError::HeartbeatFailed))
- },
- }
- }
-
- pub(crate) fn check_heartbeat(&mut self) -> Result<()> {
- let heartbeat_interval = match self.heartbeat_interval {
- Some(heartbeat_interval) => heartbeat_interval,
- None => return Ok(()),
- };
-
- let wait = StdDuration::from_secs(heartbeat_interval / 1000);
-
- // If a duration of time less than the heartbeat_interval has passed,
- // then don't perform a keepalive or attempt to reconnect.
- if let Some(last_sent) = self.heartbeat_instants.0 {
- if last_sent.elapsed() <= wait {
- return Ok(());
- }
- }
-
- // If the last heartbeat didn't receive an acknowledgement, then
- // auto-reconnect.
- if !self.last_heartbeat_acknowledged {
- debug!(
- "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
- self.shard_info,
- );
-
- return self.reconnect().map_err(|why| {
- warn!(
- "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
- self.shard_info,
- why,
- );
-
- why
- });
- }
-
- // Otherwise, we're good to heartbeat.
- trace!("[Shard {:?}] Heartbeating", self.shard_info);
-
- if let Err(why) = self.heartbeat() {
- warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
-
- self.reconnect()
- } else {
- trace!("[Shard {:?}] Heartbeated", self.shard_info);
- self.heartbeat_instants.0 = Some(Instant::now());
-
- Ok(())
- }
- }
-
- pub(crate) fn autoreconnect(&mut self) -> Result<()> {
- if self.stage == ConnectionStage::Connecting {
- return Ok(());
- }
-
- if self.session_id.is_some() {
- debug!("[Shard {:?}] Autoreconnector choosing to resume",
- self.shard_info);
-
- self.resume()
- } else {
- debug!("[Shard {:?}] Autoreconnector choosing to reconnect",
- self.shard_info);
-
- self.reconnect()
- }
- }
-
- /// Retrieves the `heartbeat_interval`.
- #[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
- self.heartbeat_interval
- }
-
- /// Retrieves the value of when the last heartbeat ack was received.
- #[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
- self.heartbeat_instants.1
- }
-
- fn reconnect(&mut self) -> Result<()> {
- info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
- self.reset();
-
- self.initialize()
- }
-
- // Attempts to send a RESUME message.
- //
- // # Examples
- //
- // Returns a `GatewayError::NoSessionId` is there is no `session_id`,
- // indicating that the shard should instead [`reconnect`].
+ // Sets the shard as going into identifying stage, which sets:
//
- // [`reconnect`]: #method.reconnect
- fn resume(&mut self) -> Result<()> {
- debug!("Shard {:?}] Attempting to resume", self.shard_info);
-
- self.initialize()?;
- self.stage = ConnectionStage::Resuming;
-
- self.send_resume().or_else(|why| {
- warn!("[Shard {:?}] Err sending resume: {:?}",
- self.shard_info,
- why);
-
- self.reconnect()
- })
- }
-
- fn send_resume(&mut self) -> Result<()> {
- let session_id = match self.session_id.clone() {
- Some(session_id) => session_id,
- None => return Err(Error::Gateway(GatewayError::NoSessionId)),
- };
+ // - the time that the last heartbeat sent as being now
+ // - the `stage` to `Identifying`
+ pub fn identify(&mut self) -> Result<()> {
+ self.client.send_identify(&self.shard_info, &self.token.lock())?;
- debug!("[Shard {:?}] Sending resume; seq: {}",
- self.shard_info,
- self.seq);
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
- self.client.send_json(&json!({
- "op": OpCode::Resume.num(),
- "d": {
- "session_id": session_id,
- "seq": self.seq,
- "token": &*self.token.lock(),
- },
- }))
+ Ok(())
}
/// Initializes a new WebSocket client.
///
/// This will set the stage of the shard before and after instantiation of
/// the client.
- fn initialize(&mut self) -> Result<()> {
+ pub fn initialize(&mut self) -> Result<WsClient> {
debug!("[Shard {:?}] Initializing", self.shard_info);
// We need to do two, sort of three things here:
@@ -1038,38 +812,15 @@ impl Shard {
// This is used to accurately assess whether the state of the shard is
// accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
- self.client = connect(&self.ws_url.lock())?;
+ let mut client = connect(&self.ws_url.lock())?;
self.stage = ConnectionStage::Handshake;
- Ok(())
- }
-
- fn identify(&mut self) -> Result<()> {
- let identification = json!({
- "op": OpCode::Identify.num(),
- "d": {
- "compression": true,
- "large_threshold": constants::LARGE_THRESHOLD,
- "shard": self.shard_info,
- "token": &*self.token.lock(),
- "v": constants::GATEWAY_VERSION,
- "properties": {
- "$browser": "serenity",
- "$device": "serenity",
- "$os": consts::OS,
- },
- },
- });
-
- self.heartbeat_instants.0 = Some(Instant::now());
- self.stage = ConnectionStage::Identifying;
+ let _ = set_client_timeout(&mut client);
- debug!("[Shard {:?}] Identifying", self.shard_info);
-
- self.client.send_json(&identification)
+ Ok(client)
}
- fn reset(&mut self) {
+ pub fn reset(&mut self) {
self.heartbeat_instants = (Some(Instant::now()), None);
self.heartbeat_interval = None;
self.last_heartbeat_acknowledged = true;
@@ -1078,42 +829,39 @@ impl Shard {
self.seq = 0;
}
- fn update_presence(&mut self) {
- let (ref game, status) = self.current_presence;
- let now = Utc::now().timestamp() as u64;
-
- let msg = json!({
- "op": OpCode::StatusUpdate.num(),
- "d": {
- "afk": false,
- "since": now,
- "status": status.name(),
- "game": game.as_ref().map(|x| json!({
- "name": x.name,
- "type": x.kind,
- "url": x.url,
- })),
- },
- });
+ pub fn resume(&mut self) -> Result<()> {
+ debug!("Shard {:?}] Attempting to resume", self.shard_info);
- debug!("[Shard {:?}] Sending presence update", self.shard_info);
+ self.client = self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
- if let Err(why) = self.client.send_json(&msg) {
- warn!("[Shard {:?}] Err sending presence update: {:?}",
- self.shard_info,
- why);
+ match self.session_id.as_ref() {
+ Some(session_id) => {
+ self.client.send_resume(
+ &self.shard_info,
+ session_id,
+ &self.seq,
+ &self.token.lock(),
+ )
+ },
+ None => Err(Error::Gateway(GatewayError::NoSessionId)),
}
+ }
- #[cfg(feature = "cache")]
- {
- let mut cache = CACHE.write();
- let current_user_id = cache.user.id;
+ pub fn reconnect(&mut self) -> Result<()> {
+ info!("[Shard {:?}] Attempting to reconnect", self.shard_info());
- cache.presences.get_mut(&current_user_id).map(|presence| {
- presence.game = game.clone();
- presence.last_modified = Some(now);
- });
- }
+ self.reset();
+ self.client = self.initialize()?;
+
+ Ok(())
+ }
+
+ pub fn update_presence(&mut self) -> Result<()> {
+ self.client.send_presence_update(
+ &self.shard_info,
+ &self.current_presence,
+ )
}
}
@@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> {
Error::Gateway(GatewayError::BuildingUrl)
})
}
-
-/// Tries to connect and upon failure, retries.
-fn connecting(uri: &str) -> WsClient {
- let waiting_time = 30;
-
- loop {
- match connect(&uri) {
- Ok(client) => return client,
- Err(why) => {
- warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time);
- thread::sleep(StdDuration::from_secs(waiting_time));
- },
- };
- }
-}
diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs
new file mode 100644
index 0000000..873d114
--- /dev/null
+++ b/src/gateway/ws_client_ext.rs
@@ -0,0 +1,133 @@
+use chrono::Utc;
+use constants::{self, OpCode};
+use gateway::{CurrentPresence, WsClient};
+use internal::prelude::*;
+use internal::ws_impl::SenderExt;
+use model::GuildId;
+use std::env::consts;
+
+pub trait WebSocketGatewayClientExt {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId>;
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()>;
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()>;
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()>;
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()>;
+}
+
+impl WebSocketGatewayClientExt for WsClient {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
+ debug!("[Shard {:?}] Requesting member chunks", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::GetGuildMembers.num(),
+ "d": {
+ "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
+ "limit": limit.unwrap_or(0),
+ "query": query.unwrap_or(""),
+ },
+ })).map_err(From::from)
+ }
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()> {
+ trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq);
+
+ self.send_json(&json!({
+ "d": seq,
+ "op": OpCode::Heartbeat.num(),
+ })).map_err(From::from)
+ }
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()> {
+ debug!("[Shard {:?}] Identifying", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::Identify.num(),
+ "d": {
+ "compression": true,
+ "large_threshold": constants::LARGE_THRESHOLD,
+ "shard": shard_info,
+ "token": token,
+ "v": constants::GATEWAY_VERSION,
+ "properties": {
+ "$browser": "serenity",
+ "$device": "serenity",
+ "$os": consts::OS,
+ },
+ },
+ }))
+ }
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()> {
+ let &(ref game, ref status) = current_presence;
+ let now = Utc::now().timestamp() as u64;
+
+ debug!("[Shard {:?}] Sending presence update", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::StatusUpdate.num(),
+ "d": {
+ "afk": false,
+ "since": now,
+ "status": status.name(),
+ "game": game.as_ref().map(|x| json!({
+ "name": x.name,
+ "type": x.kind,
+ "url": x.url,
+ })),
+ },
+ }))
+ }
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()> {
+ debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq);
+
+ self.send_json(&json!({
+ "op": OpCode::Resume.num(),
+ "d": {
+ "session_id": session_id,
+ "seq": seq,
+ "token": token,
+ },
+ })).map_err(From::from)
+ }
+}