diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-24 15:48:02 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-09-24 15:53:23 -0700 |
| commit | 6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch) | |
| tree | e3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client/bridge/gateway/shard_manager.rs | |
| parent | Use $crate for CommandError (diff) | |
| download | serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip | |
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client/bridge/gateway/shard_manager.rs')
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 171 |
1 file changed, 171 insertions, 0 deletions
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs new file mode 100644 index 0000000..0ac9d19 --- /dev/null +++ b/src/client/bridge/gateway/shard_manager.rs @@ -0,0 +1,171 @@ +use internal::prelude::*; +use parking_lot::Mutex as ParkingLotMutex; +use std::collections::HashMap; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread; +use super::super::super::EventHandler; +use super::{ + ShardId, + ShardManagerMessage, + ShardQueuer, + ShardQueuerMessage, + ShardRunnerInfo, +}; +use typemap::ShareMap; + +#[cfg(feature = "framework")] +use framework::Framework; + +pub struct ShardManager { + #[cfg(feature = "framework")] + runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// The index of the first shard to initialize, 0-indexed. + shard_index: u64, + /// The number of shards to initialize. + shard_init: u64, + /// The total shards in use, 1-indexed. + shard_total: u64, + shard_queuer: Sender<ShardQueuerMessage>, + thread_rx: Receiver<ShardManagerMessage>, +} + +impl ShardManager { + #[cfg(feature = "framework")] + pub fn new<H>( + shard_index: u64, + shard_init: u64, + shard_total: u64, + ws_url: Arc<Mutex<String>>, + token: Arc<Mutex<String>>, + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + ) -> Self where H: EventHandler + Send + Sync + 'static { + let (thread_tx, thread_rx) = mpsc::channel(); + let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); + + let runners = Arc::new(ParkingLotMutex::new(HashMap::new())); + + let mut shard_queuer = feature_framework! 
{{ + ShardQueuer { + data: data.clone(), + event_handler: event_handler.clone(), + framework: framework.clone(), + last_start: None, + manager_tx: thread_tx.clone(), + runners: runners.clone(), + rx: shard_queue_rx, + token: token.clone(), + ws_url: ws_url.clone(), + } + } else { + ShardQueuer { + data: data.clone(), + event_handler: event_handler.clone(), + last_start: None, + manager_tx: thread_tx.clone(), + runners: runners.clone(), + rx: shard_queue_rx, + rx: shard_queue_rx, + token: token.clone(), + ws_url: ws_url.clone(), + } + }}; + + thread::spawn(move || { + shard_queuer.run(); + }); + + Self { + shard_queuer: shard_queue_tx, + thread_rx: thread_rx, + runners, + shard_index, + shard_init, + shard_total, + } + } + + pub fn initialize(&mut self) -> Result<()> { + let shard_to = self.shard_index + self.shard_init; + + debug!("{}, {}", self.shard_index, self.shard_init); + + for shard_id in self.shard_index..shard_to { + let shard_total = self.shard_total; + + self.boot([ShardId(shard_id), ShardId(shard_total)]); + } + + Ok(()) + } + + pub fn run(&mut self) { + loop { + let value = match self.thread_rx.recv() { + Ok(value) => value, + Err(_) => break, + }; + + match value { + ShardManagerMessage::Restart(shard_id) => self.restart(shard_id), + ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id), + ShardManagerMessage::ShutdownAll => { + self.shutdown_all(); + + break; + }, + } + } + } + + pub fn shutdown_all(&mut self) { + info!("Shutting down all shards"); + let keys = { + self.runners.lock().keys().cloned().collect::<Vec<ShardId>>() + }; + + for shard_id in keys { + self.shutdown(shard_id); + } + } + + fn boot(&mut self, shard_info: [ShardId; 2]) { + info!("Telling shard queuer to start shard {}", shard_info[0]); + + let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); + let _ = self.shard_queuer.send(msg); + } + + fn restart(&mut self, shard_id: ShardId) { + info!("Restarting shard {}", shard_id); + self.shutdown(shard_id); + + 
let shard_total = self.shard_total; + + self.boot([shard_id, ShardId(shard_total)]); + } + + fn shutdown(&mut self, shard_id: ShardId) { + info!("Shutting down shard {}", shard_id); + + if let Some(runner) = self.runners.lock().get(&shard_id) { + let msg = ShardManagerMessage::Shutdown(shard_id); + + if let Err(why) = runner.runner_tx.send(msg) { + warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why); + } + } + + self.runners.lock().remove(&shard_id); + } +} + +impl Drop for ShardManager { + fn drop(&mut self) { + if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) { + warn!("Failed to send shutdown to shard queuer: {:?}", why); + } + } +} |