Diffstat (limited to 'src/client/bridge/gateway/shard_manager.rs')
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 283 |
1 file changed, 231 insertions(+), 52 deletions(-)
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index 6e3b285..2cf45fd 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,13 +1,15 @@
 use internal::prelude::*;
 use parking_lot::Mutex;
 use std::collections::HashMap;
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::mpsc::{self, Sender};
 use std::sync::Arc;
 use std::thread;
 use super::super::super::EventHandler;
 use super::{
+    ShardClientMessage,
     ShardId,
     ShardManagerMessage,
+    ShardManagerMonitor,
     ShardQueuer,
     ShardQueuerMessage,
     ShardRunnerInfo,
@@ -18,8 +20,80 @@ use typemap::ShareMap;
 #[cfg(feature = "framework")]
 use framework::Framework;

+/// A manager for handling the status of shards by starting them, restarting
+/// them, and stopping them when required.
+///
+/// **Note**: The [`Client`] internally uses a shard manager. If you are using a
+/// Client, then you do not need to make one of these.
+///
+/// # Examples
+///
+/// Initialize a shard manager with a framework responsible for shards 0 through
+/// 2, of 5 total shards:
+///
+/// ```rust,no_run
+/// extern crate parking_lot;
+/// extern crate serenity;
+/// extern crate threadpool;
+/// extern crate typemap;
+///
+/// # use std::error::Error;
+/// #
+/// # #[cfg(feature = "framework")]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #
+/// use parking_lot::Mutex;
+/// use serenity::client::bridge::gateway::ShardManager;
+/// use serenity::client::EventHandler;
+/// use serenity::http;
+/// use std::sync::Arc;
+/// use std::env;
+/// use threadpool::ThreadPool;
+/// use typemap::ShareMap;
+///
+/// struct Handler;
+///
+/// impl EventHandler for Handler { }
+///
+/// let token = env::var("DISCORD_TOKEN")?;
+/// http::set_token(&token);
+/// let token = Arc::new(Mutex::new(token));
+///
+/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url));
+/// let data = Arc::new(Mutex::new(ShareMap::custom()));
+/// let event_handler = Arc::new(Handler);
+/// let framework = Arc::new(Mutex::new(None));
+/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5);
+///
+/// ShardManager::new(
+///     0, // the shard index to start initiating from
+///     3, // the number of shards to initiate (this initiates 0, 1, and 2)
+///     5, // the total number of shards in use
+///     gateway_url,
+///     token,
+///     data,
+///     event_handler,
+///     framework,
+///     threadpool,
+/// );
+/// # Ok(())
+/// # }
+/// #
+/// # #[cfg(not(feature = "framework"))]
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// #     Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// #     try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Client`]: ../../struct.Client.html
+#[derive(Debug)]
 pub struct ShardManager {
-    pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
+    /// The shard runners currently managed.
+    runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
     /// The index of the first shard to initialize, 0-indexed.
     shard_index: u64,
     /// The number of shards to initialize.
@@ -27,10 +101,11 @@ pub struct ShardManager {
     /// The total shards in use, 1-indexed.
     shard_total: u64,
     shard_queuer: Sender<ShardQueuerMessage>,
-    thread_rx: Receiver<ShardManagerMessage>,
 }

 impl ShardManager {
+    /// Creates a new shard manager, returning both the manager and a monitor
+    /// for usage in a separate thread.
     #[cfg(feature = "framework")]
     #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     pub fn new<H>(
@@ -43,7 +118,7 @@ impl ShardManager {
         event_handler: Arc<H>,
         framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
         threadpool: ThreadPool,
-    ) -> Self where H: EventHandler + Send + Sync + 'static {
+    ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
         let (thread_tx, thread_rx) = mpsc::channel();
         let (shard_queue_tx, shard_queue_rx) = mpsc::channel();

@@ -66,16 +141,22 @@ impl ShardManager {
             shard_queuer.run();
         });

-        Self {
+        let manager = Arc::new(Mutex::new(Self {
             shard_queuer: shard_queue_tx,
-            thread_rx: thread_rx,
             runners,
             shard_index,
             shard_init,
             shard_total,
-        }
+        }));
+
+        (Arc::clone(&manager), ShardManagerMonitor {
+            rx: thread_rx,
+            manager,
+        })
     }

+    /// Creates a new shard manager, returning both the manager and a monitor
+    /// for usage in a separate thread.
     #[cfg(not(feature = "framework"))]
     pub fn new<H>(
         shard_index: u64,
@@ -86,21 +167,21 @@ impl ShardManager {
         data: Arc<Mutex<ShareMap>>,
         event_handler: Arc<H>,
         threadpool: ThreadPool,
-    ) -> Self where H: EventHandler + Send + Sync + 'static {
+    ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static {
         let (thread_tx, thread_rx) = mpsc::channel();
         let (shard_queue_tx, shard_queue_rx) = mpsc::channel();

         let runners = Arc::new(Mutex::new(HashMap::new()));

         let mut shard_queuer = ShardQueuer {
-            data: data.clone(),
-            event_handler: event_handler.clone(),
+            data: Arc::clone(&data),
+            event_handler: Arc::clone(&event_handler),
             last_start: None,
             manager_tx: thread_tx.clone(),
-            runners: runners.clone(),
+            runners: Arc::clone(&runners),
             rx: shard_queue_rx,
-            token: token.clone(),
-            ws_url: ws_url.clone(),
+            token: Arc::clone(&token),
+            ws_url: Arc::clone(&ws_url),
             threadpool,
         };

@@ -108,21 +189,41 @@ impl ShardManager {
             shard_queuer.run();
         });

-        Self {
+        let manager = Arc::new(Mutex::new(Self {
             shard_queuer: shard_queue_tx,
-            thread_rx: thread_rx,
             runners,
             shard_index,
             shard_init,
             shard_total,
-        }
+        }));
+
+        (Arc::clone(&manager), ShardManagerMonitor {
+            rx: thread_rx,
+            manager,
+        })
+    }
+
+    /// Returns whether the shard manager contains an active instance of
+    /// a shard runner responsible for the given ID.
+    ///
+    /// If a shard has been queued but has not yet been initiated, then this
+    /// will return `false`. Consider double-checking [`is_responsible_for`] to
+    /// determine whether this shard manager is responsible for the given shard.
+    ///
+    /// [`is_responsible_for`]: #method.is_responsible_for
+    pub fn has(&self, shard_id: ShardId) -> bool {
+        self.runners.lock().contains_key(&shard_id)
     }

+    /// Initializes all shards that the manager is responsible for.
+    ///
+    /// This will communicate shard boots with the [`ShardQueuer`] so that they
+    /// are properly queued.
+    ///
+    /// [`ShardQueuer`]: struct.ShardQueuer.html
     pub fn initialize(&mut self) -> Result<()> {
         let shard_to = self.shard_index + self.shard_init;

-        debug!("{}, {}", self.shard_index, self.shard_init);
-
         for shard_id in self.shard_index..shard_to {
             let shard_total = self.shard_total;

@@ -132,39 +233,52 @@ impl ShardManager {
         Ok(())
     }

-    pub fn run(&mut self) {
-        while let Ok(value) = self.thread_rx.recv() {
-            match value {
-                ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
-                ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
-                ShardManagerMessage::ShutdownAll => {
-                    self.shutdown_all();
-
-                    break;
-                },
-            }
-        }
-    }
-
-    pub fn shutdown_all(&mut self) {
-        info!("Shutting down all shards");
-        let keys = {
-            self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
-        };
+    /// Sets the new sharding information for the manager.
+    ///
+    /// This will shut down all existing shards.
+    ///
+    /// This will _not_ instantiate the new shards.
+    pub fn set_shards(&mut self, index: u64, init: u64, total: u64) {
+        self.shutdown_all();

-        for shard_id in keys {
-            self.shutdown(shard_id);
-        }
+        self.shard_index = index;
+        self.shard_init = init;
+        self.shard_total = total;
     }

-    fn boot(&mut self, shard_info: [ShardId; 2]) {
-        info!("Telling shard queuer to start shard {}", shard_info[0]);
-
-        let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
-        let _ = self.shard_queuer.send(msg);
-    }
-
-    fn restart(&mut self, shard_id: ShardId) {
+    /// Restarts a shard runner.
+    ///
+    /// This sends a shutdown signal to a shard's associated [`ShardRunner`],
+    /// and then queues an initialization of a shard runner for the same shard
+    /// via the [`ShardQueuer`].
+    ///
+    /// # Examples
+    ///
+    /// Creating a client and then restarting a shard by ID:
+    ///
+    /// _(note: in reality this precise code doesn't have an effect since the
+    /// shard would not yet have been initialized via [`initialize`], but the
+    /// concept is the same)_
+    ///
+    /// ```rust,no_run
+    /// use serenity::client::bridge::gateway::ShardId;
+    /// use serenity::client::{Client, EventHandler};
+    /// use std::env;
+    ///
+    /// struct Handler;
+    ///
+    /// impl EventHandler for Handler { }
+    ///
+    /// let token = env::var("DISCORD_TOKEN").unwrap();
+    /// let mut client = Client::new(&token, Handler).unwrap();
+    ///
+    /// // restart shard ID 7
+    /// client.shard_manager.lock().restart(ShardId(7));
+    /// ```
+    ///
+    /// [`ShardQueuer`]: struct.ShardQueuer.html
+    /// [`ShardRunner`]: struct.ShardRunner.html
+    pub fn restart(&mut self, shard_id: ShardId) {
         info!("Restarting shard {}", shard_id);

         self.shutdown(shard_id);
@@ -173,23 +287,88 @@ impl ShardManager {
         self.boot([shard_id, ShardId(shard_total)]);
     }

-    fn shutdown(&mut self, shard_id: ShardId) {
+    /// Returns the [`ShardId`]s of the shards that have been instantiated and
+    /// currently have a valid [`ShardRunner`].
+    ///
+    /// [`ShardId`]: struct.ShardId.html
+    /// [`ShardRunner`]: struct.ShardRunner.html
+    pub fn shards_instantiated(&self) -> Vec<ShardId> {
+        self.runners.lock().keys().cloned().collect()
+    }
+
+    /// Attempts to shut down the shard runner by ID.
+    ///
+    /// Returns a boolean indicating whether a shard runner was present. This is
+    /// _not_ necessarily an indicator of whether the shard runner was
+    /// successfully shut down.
+    ///
+    /// **Note**: If the receiving end of an mpsc channel - theoretically owned
+    /// by the shard runner - no longer exists, then the shard runner will not
+    /// know it should shut down. This _should never happen_. It may already be
+    /// stopped.
+    pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
         info!("Shutting down shard {}", shard_id);

         if let Some(runner) = self.runners.lock().get(&shard_id) {
-            let msg = ShardManagerMessage::Shutdown(shard_id);
+            let shutdown = ShardManagerMessage::Shutdown(shard_id);
+            let msg = ShardClientMessage::Manager(shutdown);

             if let Err(why) = runner.runner_tx.send(msg) {
-                warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+                warn!(
+                    "Failed to cleanly shutdown shard {}: {:?}",
+                    shard_id,
+                    why,
+                );
             }
         }

-        self.runners.lock().remove(&shard_id);
+        self.runners.lock().remove(&shard_id).is_some()
+    }
+
+    /// Sends a shutdown message for all shards that the manager is responsible
+    /// for that are still known to be running.
+    ///
+    /// If you only need to shut down a select number of shards, prefer looping
+    /// over the [`shutdown`] method.
+    ///
+    /// [`shutdown`]: #method.shutdown
+    pub fn shutdown_all(&mut self) {
+        let keys = {
+            let runners = self.runners.lock();
+
+            if runners.is_empty() {
+                return;
+            }
+
+            runners.keys().cloned().collect::<Vec<_>>()
+        };
+
+        info!("Shutting down all shards");
+
+        for shard_id in keys {
+            self.shutdown(shard_id);
+        }
+    }
+
+    fn boot(&mut self, shard_info: [ShardId; 2]) {
+        info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+        let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+        let _ = self.shard_queuer.send(msg);
     }
 }

 impl Drop for ShardManager {
+    /// A custom drop implementation to clean up after the manager.
+    ///
+    /// This shuts down all active [`ShardRunner`]s and attempts to tell the
+    /// [`ShardQueuer`] to shut down.
+    ///
+    /// [`ShardQueuer`]: struct.ShardQueuer.html
+    /// [`ShardRunner`]: struct.ShardRunner.html
     fn drop(&mut self) {
+        self.shutdown_all();
+
         if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
             warn!("Failed to send shutdown to shard queuer: {:?}", why);
         }
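The diff above also changes how callers drive the manager: `new` now hands back an `Arc<Mutex<ShardManager>>` together with a `ShardManagerMonitor` that owns the message receiver, the old `run` loop is removed in favour of that monitor, and `restart` and `shutdown` are made public alongside the new `has`, `set_shards`, and `shards_instantiated` methods. The snippet below is a minimal usage sketch, not part of this commit: it follows the pattern of the `restart` doc example in the diff, and the choice of shard ID 7 and the `DISCORD_TOKEN` environment variable are illustrative assumptions.

```rust
extern crate serenity;

use serenity::client::bridge::gateway::ShardId;
use serenity::client::{Client, EventHandler};
use std::env;

struct Handler;

impl EventHandler for Handler { }

fn main() {
    // Assumed: a DISCORD_TOKEN environment variable, as in the doc examples above.
    let token = env::var("DISCORD_TOKEN").unwrap();
    let mut client = Client::new(&token, Handler).unwrap();

    // `client.shard_manager` is the Arc<Mutex<ShardManager>> half of the pair
    // returned by `ShardManager::new`; lock it to reach the manager's methods.
    let mut manager = client.shard_manager.lock();

    // Shard 7 is an arbitrary example ID; only restart it if a runner for it
    // has actually been booted.
    if manager.has(ShardId(7)) {
        manager.restart(ShardId(7));
    }

    // Report how many runners currently exist, then shut them all down.
    println!("{} shard runners active", manager.shards_instantiated().len());
    manager.shutdown_all();
}
```

Since `shutdown` now returns a `bool`, a caller can also distinguish a shutdown request sent to a live runner from a no-op on an unknown shard ID, and `shutdown_all` simply returns early when no runners exist.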