diff options
| author | Zeyla Hellyer <[email protected]> | 2017-12-16 12:21:13 -0800 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-12-16 12:21:13 -0800 |
| commit | 8d685039d89fd2130e88c9a9e0421492a3e99da6 (patch) | |
| tree | c6073ee51d0c12962d1f1e4c0ed048bed6e72886 /src | |
| parent | Break up the model module (diff) | |
| download | serenity-8d685039d89fd2130e88c9a9e0421492a3e99da6.tar.xz serenity-8d685039d89fd2130e88c9a9e0421492a3e99da6.zip | |
Attempt to restart failed shard boots
When the ShardQueuer fails to start a shard (such as due to a network
error, an issue on Discord's side, Cloudflare, etc.), it will now push
the shard's ID and shard total onto a queue so the start can be retried.
The queuer now waits up to 5 seconds for a message from the receiver; if
none arrives before the timeout, it attempts to start the next queued
shard (if one is queued).
This should fix a number of reconnection issues.
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 4 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_queuer.rs | 62 |
2 files changed, 48 insertions, 18 deletions
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index fbfd2f6..99cdec7 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -1,6 +1,6 @@ use internal::prelude::*; use parking_lot::Mutex; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::mpsc::{self, Sender}; use std::sync::Arc; use std::thread; @@ -135,6 +135,7 @@ impl ShardManager { framework: Arc::clone(&framework), last_start: None, manager_tx: thread_tx.clone(), + queue: VecDeque::new(), runners: Arc::clone(&runners), rx: shard_queue_rx, token: Arc::clone(&token), @@ -184,6 +185,7 @@ impl ShardManager { event_handler: Arc::clone(&event_handler), last_start: None, manager_tx: thread_tx.clone(), + queue: VecDeque::new(), runners: Arc::clone(&runners), rx: shard_queue_rx, token: Arc::clone(&token), diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs index bd02532..30a0906 100644 --- a/src/client/bridge/gateway/shard_queuer.rs +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -1,8 +1,8 @@ use gateway::Shard; use internal::prelude::*; use parking_lot::Mutex; -use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::collections::{HashMap, VecDeque}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -20,6 +20,8 @@ use typemap::ShareMap; #[cfg(feature = "framework")] use framework::Framework; +const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5; + /// The shard queuer is a simple loop that runs indefinitely to manage the /// startup of shards. /// @@ -49,6 +51,10 @@ pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> { /// /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html pub manager_tx: Sender<ShardManagerMessage>, + /// The shards that are queued for booting. 
+ /// + /// This will typically be filled with previously failed boots. + pub queue: VecDeque<(u64, u64)>, /// A copy of the map of shard runners. pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, /// A receiver channel for the shard queuer to be told to start shards. @@ -91,18 +97,27 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start /// [`rx`]: #structfield.rx pub fn run(&mut self) { - while let Ok(msg) = self.rx.recv() { - match msg { - ShardQueuerMessage::Shutdown => break, - ShardQueuerMessage::Start(shard_id, shard_total) => { - self.check_last_start(); - - if let Err(why) = self.start(shard_id, shard_total) { - warn!("Err starting shard {}: {:?}", shard_id, why); - } - - self.last_start = Some(Instant::now()); + // The duration to timeout from reads over the Rx channel. This can be + // done in a loop, and if the read times out then a shard can be + // started if one is presently waiting in the queue. + let wait_duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); + + loop { + match self.rx.recv_timeout(wait_duration) { + Ok(ShardQueuerMessage::Shutdown) => break, + Ok(ShardQueuerMessage::Start(id, total)) => { + self.checked_start(id.0, total.0); + }, + Err(RecvTimeoutError::Disconnected) => { + // If the sender half has disconnected then the queuer's + // lifespan has passed and can shutdown. + break; }, + Err(RecvTimeoutError::Timeout) => { + if let Some((id, total)) = self.queue.pop_front() { + self.checked_start(id, total); + } + } } } } @@ -115,7 +130,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { // We must wait 5 seconds between IDENTIFYs to avoid session // invalidations. 
- let duration = Duration::from_secs(5); + let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); let elapsed = instant.elapsed(); if elapsed >= duration { @@ -127,8 +142,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { thread::sleep(to_sleep); } - fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> { - let shard_info = [shard_id.0, shard_total.0]; + fn checked_start(&mut self, id: u64, total: u64) { + self.check_last_start(); + + if let Err(why) = self.start(id, total) { + warn!("Err starting shard {}: {:?}", id, why); + info!("Re-queueing start of shard {}", id); + + self.queue.push_back((id, total)); + } + + self.last_start = Some(Instant::now()); + } + + fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> { + let shard_info = [shard_id, shard_total]; let shard = Shard::new( Arc::clone(&self.ws_url), @@ -163,7 +191,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { let _ = runner.run(); }); - self.runners.lock().insert(shard_id, runner_info); + self.runners.lock().insert(ShardId(shard_id), runner_info); Ok(()) } |