aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/bridge/gateway/shard_manager.rs')
-rw-r--r--src/client/bridge/gateway/shard_manager.rs283
1 files changed, 231 insertions, 52 deletions
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index 6e3b285..2cf45fd 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,13 +1,15 @@
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use super::super::super::EventHandler;
use super::{
+ ShardClientMessage,
ShardId,
ShardManagerMessage,
+ ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
@@ -18,8 +20,80 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
+/// A manager for handling the status of shards by starting them, restarting
+/// them, and stopping them when required.
+///
+/// **Note**: The [`Client`] internally uses a shard manager. If you are using a
+/// Client, then you do not need to make one of these.
+///
+/// # Examples
+///
+/// Initialize a shard manager with a framework responsible for shards 0 through
+/// 2, of 5 total shards:
+///
+/// ```rust,no_run
+/// extern crate parking_lot;
+/// extern crate serenity;
+/// extern crate threadpool;
+/// extern crate typemap;
+///
+/// # use std::error::Error;
+/// #
+/// # #[cfg(feature = "framework")]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #
+/// use parking_lot::Mutex;
+/// use serenity::client::bridge::gateway::ShardManager;
+/// use serenity::client::EventHandler;
+/// use serenity::http;
+/// use std::sync::Arc;
+/// use std::env;
+/// use threadpool::ThreadPool;
+/// use typemap::ShareMap;
+///
+/// struct Handler;
+///
+/// impl EventHandler for Handler { }
+///
+/// let token = env::var("DISCORD_TOKEN")?;
+/// http::set_token(&token);
+/// let token = Arc::new(Mutex::new(token));
+///
+/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url));
+/// let data = Arc::new(Mutex::new(ShareMap::custom()));
+/// let event_handler = Arc::new(Handler);
+/// let framework = Arc::new(Mutex::new(None));
+/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5);
+///
+/// ShardManager::new(
+/// 0, // the shard index to start initiating from
+/// 3, // the number of shards to initiate (this initiates 0, 1, and 2)
+/// 5, // the total number of shards in use
+/// gateway_url,
+/// token,
+/// data,
+/// event_handler,
+/// framework,
+/// threadpool,
+/// );
+/// # Ok(())
+/// # }
+/// #
+/// # #[cfg(not(feature = "framework"))]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Client`]: ../../struct.Client.html
+#[derive(Debug)]
pub struct ShardManager {
- pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The shard runners currently managed.
+ runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// The index of the first shard to initialize, 0-indexed.
shard_index: u64,
/// The number of shards to initialize.
@@ -27,10 +101,11 @@ pub struct ShardManager {
/// The total shards in use, 1-indexed.
shard_total: u64,
shard_queuer: Sender<ShardQueuerMessage>,
- thread_rx: Receiver<ShardManagerMessage>,
}
impl ShardManager {
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(feature = "framework")]
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new<H>(
@@ -43,7 +118,7 @@ impl ShardManager {
event_handler: Arc<H>,
framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
@@ -66,16 +141,22 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
}
+ /// Creates a new shard manager, returning both the manager and a monitor
+ /// for usage in a separate thread.
#[cfg(not(feature = "framework"))]
pub fn new<H>(
shard_index: u64,
@@ -86,21 +167,21 @@ impl ShardManager {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
threadpool: ThreadPool,
- ) -> Self where H: EventHandler + Send + Sync + 'static {
+ ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
let (thread_tx, thread_rx) = mpsc::channel();
let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
let runners = Arc::new(Mutex::new(HashMap::new()));
let mut shard_queuer = ShardQueuer {
- data: data.clone(),
- event_handler: event_handler.clone(),
+ data: Arc::clone(&data),
+ event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
- runners: runners.clone(),
+ runners: Arc::clone(&runners),
rx: shard_queue_rx,
- token: token.clone(),
- ws_url: ws_url.clone(),
+ token: Arc::clone(&token),
+ ws_url: Arc::clone(&ws_url),
threadpool,
};
@@ -108,21 +189,41 @@ impl ShardManager {
shard_queuer.run();
});
- Self {
+ let manager = Arc::new(Mutex::new(Self {
shard_queuer: shard_queue_tx,
- thread_rx: thread_rx,
runners,
shard_index,
shard_init,
shard_total,
- }
+ }));
+
+ (Arc::clone(&manager), ShardManagerMonitor {
+ rx: thread_rx,
+ manager,
+ })
+ }
+
+ /// Returns whether the shard manager contains either an active instance of
+ /// a shard runner responsible for the given ID.
+ ///
+ /// If a shard has been queued but has not yet been initiated, then this
+ /// will return `false`. Consider double-checking [`is_responsible_for`] to
+ /// determine whether this shard manager is responsible for the given shard.
+ ///
+ /// [`is_responsible_for`]: #method.is_responsible_for
+ pub fn has(&self, shard_id: ShardId) -> bool {
+ self.runners.lock().contains_key(&shard_id)
}
+ /// Initializes all shards that the manager is responsible for.
+ ///
+ /// This will communicate shard boots with the [`ShardQueuer`] so that they
+ /// are properly queued.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
pub fn initialize(&mut self) -> Result<()> {
let shard_to = self.shard_index + self.shard_init;
- debug!("{}, {}", self.shard_index, self.shard_init);
-
for shard_id in self.shard_index..shard_to {
let shard_total = self.shard_total;
@@ -132,39 +233,52 @@ impl ShardManager {
Ok(())
}
- pub fn run(&mut self) {
- while let Ok(value) = self.thread_rx.recv() {
- match value {
- ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
- ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
- ShardManagerMessage::ShutdownAll => {
- self.shutdown_all();
-
- break;
- },
- }
- }
- }
-
- pub fn shutdown_all(&mut self) {
- info!("Shutting down all shards");
- let keys = {
- self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
- };
+ /// Sets the new sharding information for the manager.
+ ///
+ /// This will shutdown all existing shards.
+ ///
+ /// This will _not_ instantiate the new shards.
+ pub fn set_shards(&mut self, index: u64, init: u64, total: u64) {
+ self.shutdown_all();
- for shard_id in keys {
- self.shutdown(shard_id);
- }
+ self.shard_index = index;
+ self.shard_init = init;
+ self.shard_total = total;
}
- fn boot(&mut self, shard_info: [ShardId; 2]) {
- info!("Telling shard queuer to start shard {}", shard_info[0]);
-
- let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
- let _ = self.shard_queuer.send(msg);
- }
-
- fn restart(&mut self, shard_id: ShardId) {
+ /// Restarts a shard runner.
+ ///
+ /// This sends a shutdown signal to a shard's associated [`ShardRunner`],
+ /// and then queues a initialization of a shard runner for the same shard
+ /// via the [`ShardQueuer`].
+ ///
+ /// # Examples
+ ///
+ /// Creating a client and then restarting a shard by ID:
+ ///
+ /// _(note: in reality this precise code doesn't have an effect since the
+ /// shard would not yet have been initialized via [`initialize`], but the
+ /// concept is the same)_
+ ///
+ /// ```rust,no_run
+ /// use serenity::client::bridge::gateway::ShardId;
+ /// use serenity::client::{Client, EventHandler};
+ /// use std::env;
+ ///
+ /// struct Handler;
+ ///
+ /// impl EventHandler for Handler { }
+ ///
+ /// let token = env::var("DISCORD_TOKEN").unwrap();
+ /// let mut client = Client::new(&token, Handler).unwrap();
+ ///
+ /// // restart shard ID 7
+ /// client.shard_manager.lock().restart(ShardId(7));
+ /// ```
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn restart(&mut self, shard_id: ShardId) {
info!("Restarting shard {}", shard_id);
self.shutdown(shard_id);
@@ -173,23 +287,88 @@ impl ShardManager {
self.boot([shard_id, ShardId(shard_total)]);
}
- fn shutdown(&mut self, shard_id: ShardId) {
+ /// Returns the [`ShardId`]s of the shards that have been instantiated and
+ /// currently have a valid [`ShardRunner`].
+ ///
+ /// [`ShardId`]: struct.ShardId.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
+ pub fn shards_instantiated(&self) -> Vec<ShardId> {
+ self.runners.lock().keys().cloned().collect()
+ }
+
+ /// Attempts to shut down the shard runner by Id.
+ ///
+ /// Returns a boolean indicating whether a shard runner was present. This is
+ /// _not_ necessary an indicator of whether the shard runner was
+ /// successfully shut down.
+ ///
+ /// **Note**: If the receiving end of an mpsc channel - theoretically owned
+ /// by the shard runner - no longer exists, then the shard runner will not
+ /// know it should shut down. This _should never happen_. It may already be
+ /// stopped.
+ pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
info!("Shutting down shard {}", shard_id);
if let Some(runner) = self.runners.lock().get(&shard_id) {
- let msg = ShardManagerMessage::Shutdown(shard_id);
+ let shutdown = ShardManagerMessage::Shutdown(shard_id);
+ let msg = ShardClientMessage::Manager(shutdown);
if let Err(why) = runner.runner_tx.send(msg) {
- warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ warn!(
+ "Failed to cleanly shutdown shard {}: {:?}",
+ shard_id,
+ why,
+ );
}
}
- self.runners.lock().remove(&shard_id);
+ self.runners.lock().remove(&shard_id).is_some()
+ }
+
+ /// Sends a shutdown message for all shards that the manager is responsible
+ /// for that are still known to be running.
+ ///
+ /// If you only need to shutdown a select number of shards, prefer looping
+ /// over the [`shutdown`] method.
+ ///
+ /// [`shutdown`]: #method.shutdown
+ pub fn shutdown_all(&mut self) {
+ let keys = {
+ let runners = self.runners.lock();
+
+ if runners.is_empty() {
+ return;
+ }
+
+ runners.keys().cloned().collect::<Vec<_>>()
+ };
+
+ info!("Shutting down all shards");
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
}
}
impl Drop for ShardManager {
+ /// A custom drop implementation to clean up after the manager.
+ ///
+ /// This shuts down all active [`ShardRunner`]s and attempts to tell the
+ /// [`ShardQueuer`] to shutdown.
+ ///
+ /// [`ShardQueuer`]: struct.ShardQueuer.html
+ /// [`ShardRunner`]: struct.ShardRunner.html
fn drop(&mut self) {
+ self.shutdown_all();
+
if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
warn!("Failed to send shutdown to shard queuer: {:?}", why);
}