aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_manager.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-09-24 15:48:02 -0700
committerZeyla Hellyer <[email protected]>2017-09-24 15:53:23 -0700
commit6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch)
treee3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client/bridge/gateway/shard_manager.rs
parentUse $crate for CommandError (diff)
downloadserenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz
serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client/bridge/gateway/shard_manager.rs')
-rw-r--r--src/client/bridge/gateway/shard_manager.rs171
1 files changed, 171 insertions, 0 deletions
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
new file mode 100644
index 0000000..0ac9d19
--- /dev/null
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -0,0 +1,171 @@
+use internal::prelude::*;
+use parking_lot::Mutex as ParkingLotMutex;
+use std::collections::HashMap;
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use super::super::super::EventHandler;
+use super::{
+ ShardId,
+ ShardManagerMessage,
+ ShardQueuer,
+ ShardQueuerMessage,
+ ShardRunnerInfo,
+};
+use typemap::ShareMap;
+
+#[cfg(feature = "framework")]
+use framework::Framework;
+
+pub struct ShardManager {
+ #[cfg(feature = "framework")]
+ runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The index of the first shard to initialize, 0-indexed.
+ shard_index: u64,
+ /// The number of shards to initialize.
+ shard_init: u64,
+ /// The total shards in use, 1-indexed.
+ shard_total: u64,
+ shard_queuer: Sender<ShardQueuerMessage>,
+ thread_rx: Receiver<ShardManagerMessage>,
+}
+
+impl ShardManager {
+ #[cfg(feature = "framework")]
+ pub fn new<H>(
+ shard_index: u64,
+ shard_init: u64,
+ shard_total: u64,
+ ws_url: Arc<Mutex<String>>,
+ token: Arc<Mutex<String>>,
+ data: Arc<ParkingLotMutex<ShareMap>>,
+ event_handler: Arc<H>,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ ) -> Self where H: EventHandler + Send + Sync + 'static {
+ let (thread_tx, thread_rx) = mpsc::channel();
+ let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
+
+ let runners = Arc::new(ParkingLotMutex::new(HashMap::new()));
+
+ let mut shard_queuer = feature_framework! {{
+ ShardQueuer {
+ data: data.clone(),
+ event_handler: event_handler.clone(),
+ framework: framework.clone(),
+ last_start: None,
+ manager_tx: thread_tx.clone(),
+ runners: runners.clone(),
+ rx: shard_queue_rx,
+ token: token.clone(),
+ ws_url: ws_url.clone(),
+ }
+ } else {
+ ShardQueuer {
+ data: data.clone(),
+ event_handler: event_handler.clone(),
+ last_start: None,
+ manager_tx: thread_tx.clone(),
+ runners: runners.clone(),
+ rx: shard_queue_rx,
+ rx: shard_queue_rx,
+ token: token.clone(),
+ ws_url: ws_url.clone(),
+ }
+ }};
+
+ thread::spawn(move || {
+ shard_queuer.run();
+ });
+
+ Self {
+ shard_queuer: shard_queue_tx,
+ thread_rx: thread_rx,
+ runners,
+ shard_index,
+ shard_init,
+ shard_total,
+ }
+ }
+
+ pub fn initialize(&mut self) -> Result<()> {
+ let shard_to = self.shard_index + self.shard_init;
+
+ debug!("{}, {}", self.shard_index, self.shard_init);
+
+ for shard_id in self.shard_index..shard_to {
+ let shard_total = self.shard_total;
+
+ self.boot([ShardId(shard_id), ShardId(shard_total)]);
+ }
+
+ Ok(())
+ }
+
+ pub fn run(&mut self) {
+ loop {
+ let value = match self.thread_rx.recv() {
+ Ok(value) => value,
+ Err(_) => break,
+ };
+
+ match value {
+ ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
+ ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
+ ShardManagerMessage::ShutdownAll => {
+ self.shutdown_all();
+
+ break;
+ },
+ }
+ }
+ }
+
+ pub fn shutdown_all(&mut self) {
+ info!("Shutting down all shards");
+ let keys = {
+ self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
+ };
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
+ }
+
+ fn restart(&mut self, shard_id: ShardId) {
+ info!("Restarting shard {}", shard_id);
+ self.shutdown(shard_id);
+
+ let shard_total = self.shard_total;
+
+ self.boot([shard_id, ShardId(shard_total)]);
+ }
+
+ fn shutdown(&mut self, shard_id: ShardId) {
+ info!("Shutting down shard {}", shard_id);
+
+ if let Some(runner) = self.runners.lock().get(&shard_id) {
+ let msg = ShardManagerMessage::Shutdown(shard_id);
+
+ if let Err(why) = runner.runner_tx.send(msg) {
+ warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ }
+ }
+
+ self.runners.lock().remove(&shard_id);
+ }
+}
+
+impl Drop for ShardManager {
+ fn drop(&mut self) {
+ if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
+ warn!("Failed to send shutdown to shard queuer: {:?}", why);
+ }
+ }
+}