aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_queuer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/bridge/gateway/shard_queuer.rs')
-rw-r--r--src/client/bridge/gateway/shard_queuer.rs118
1 files changed, 118 insertions, 0 deletions
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
new file mode 100644
index 0000000..8d3dbe1
--- /dev/null
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -0,0 +1,118 @@
+use framework::Framework;
+use gateway::Shard;
+use internal::prelude::*;
+use parking_lot::Mutex as ParkingLotMutex;
+use std::collections::HashMap;
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::{Duration, Instant};
+use super::super::super::EventHandler;
+use super::{
+ ShardId,
+ ShardManagerMessage,
+ ShardQueuerMessage,
+ ShardRunner,
+ ShardRunnerInfo,
+};
+use typemap::ShareMap;
+
+/// The shard queuer is a simple loop that runs indefinitely to manage the
+/// startup of shards.
+///
+/// A shard queuer instance _should_ be run in its own thread, due to the
+/// blocking nature of the loop itself as well as a 5 second thread sleep
+/// between shard starts.
+pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
+ pub data: Arc<ParkingLotMutex<ShareMap>>,
+ pub event_handler: Arc<H>,
+ #[cfg(feature = "framework")]
+ pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ pub last_start: Option<Instant>,
+ pub manager_tx: Sender<ShardManagerMessage>,
+ pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ pub rx: Receiver<ShardQueuerMessage>,
+ pub token: Arc<Mutex<String>>,
+ pub ws_url: Arc<Mutex<String>>,
+}
+
+impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
+ pub fn run(&mut self) {
+ loop {
+ let msg = match self.rx.recv() {
+ Ok(msg) => msg,
+ Err(_) => {
+ break;
+ }
+ };
+
+ match msg {
+ ShardQueuerMessage::Shutdown => break,
+ ShardQueuerMessage::Start(shard_id, shard_total) => {
+ self.check_last_start();
+
+ if let Err(why) = self.start(shard_id, shard_total) {
+ warn!("Err starting shard {}: {:?}", shard_id, why);
+ }
+
+ self.last_start = Some(Instant::now());
+ },
+ }
+ }
+ }
+
+ fn check_last_start(&mut self) {
+ let instant = match self.last_start {
+ Some(instant) => instant,
+ None => return,
+ };
+
+ // We must wait 5 seconds between IDENTIFYs to avoid session
+ // invalidations.
+ let duration = Duration::from_secs(5);
+ let elapsed = instant.elapsed();
+
+ if elapsed >= duration {
+ return;
+ }
+
+ let to_sleep = duration - elapsed;
+
+ thread::sleep(to_sleep);
+ }
+
+ fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
+ let shard_info = [shard_id.0, shard_total.0];
+ let shard = Shard::new(self.ws_url.clone(), self.token.clone(), shard_info)?;
+ let locked = Arc::new(ParkingLotMutex::new(shard));
+
+ let mut runner = feature_framework! {{
+ ShardRunner::new(
+ locked.clone(),
+ self.manager_tx.clone(),
+ self.framework.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ )
+ } else {
+ ShardRunner::new(
+ locked.clone(),
+ self.manager_tx.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ )
+ }};
+
+ let runner_info = ShardRunnerInfo {
+ runner_tx: runner.runner_tx(),
+ };
+
+ thread::spawn(move || {
+ let _ = runner.run();
+ });
+
+ self.runners.lock().insert(shard_id, runner_info);
+
+ Ok(())
+ }
+}