aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-12-16 12:21:13 -0800
committerZeyla Hellyer <[email protected]>2017-12-16 12:21:13 -0800
commit8d685039d89fd2130e88c9a9e0421492a3e99da6 (patch)
treec6073ee51d0c12962d1f1e4c0ed048bed6e72886 /src
parentBreak up the model module (diff)
downloadserenity-8d685039d89fd2130e88c9a9e0421492a3e99da6.tar.xz
serenity-8d685039d89fd2130e88c9a9e0421492a3e99da6.zip
Attempt to restart failed shard boots
When the ShardQueuer fails to start a shard (for example due to a network error, an issue on Discord's side, or Cloudflare), it now pushes the shard's ID and shard total onto a retry queue. The queuer waits up to 5 seconds for a message on its receiver; if none arrives before the timeout, it starts the next queued shard, if there is one. This should fix a number of reconnection issues.
Diffstat (limited to 'src')
-rw-r--r--src/client/bridge/gateway/shard_manager.rs4
-rw-r--r--src/client/bridge/gateway/shard_queuer.rs62
2 files changed, 48 insertions, 18 deletions
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
index fbfd2f6..99cdec7 100644
--- a/src/client/bridge/gateway/shard_manager.rs
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -1,6 +1,6 @@
use internal::prelude::*;
use parking_lot::Mutex;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
@@ -135,6 +135,7 @@ impl ShardManager {
framework: Arc::clone(&framework),
last_start: None,
manager_tx: thread_tx.clone(),
+ queue: VecDeque::new(),
runners: Arc::clone(&runners),
rx: shard_queue_rx,
token: Arc::clone(&token),
@@ -184,6 +185,7 @@ impl ShardManager {
event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
+ queue: VecDeque::new(),
runners: Arc::clone(&runners),
rx: shard_queue_rx,
token: Arc::clone(&token),
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
index bd02532..30a0906 100644
--- a/src/client/bridge/gateway/shard_queuer.rs
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -1,8 +1,8 @@
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::sync::mpsc::{Receiver, Sender};
+use std::collections::{HashMap, VecDeque};
+use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
@@ -20,6 +20,8 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
+const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5;
+
/// The shard queuer is a simple loop that runs indefinitely to manage the
/// startup of shards.
///
@@ -49,6 +51,10 @@ pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
///
/// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
+ /// The shards that are queued for booting.
+ ///
+ /// This will typically be filled with previously failed boots.
+ pub queue: VecDeque<(u64, u64)>,
/// A copy of the map of shard runners.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// A receiver channel for the shard queuer to be told to start shards.
@@ -91,18 +97,27 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
/// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
/// [`rx`]: #structfield.rx
pub fn run(&mut self) {
- while let Ok(msg) = self.rx.recv() {
- match msg {
- ShardQueuerMessage::Shutdown => break,
- ShardQueuerMessage::Start(shard_id, shard_total) => {
- self.check_last_start();
-
- if let Err(why) = self.start(shard_id, shard_total) {
- warn!("Err starting shard {}: {:?}", shard_id, why);
- }
-
- self.last_start = Some(Instant::now());
+ // The duration to timeout from reads over the Rx channel. This can be
+ // done in a loop, and if the read times out then a shard can be
+ // started if one is presently waiting in the queue.
+ let wait_duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);
+
+ loop {
+ match self.rx.recv_timeout(wait_duration) {
+ Ok(ShardQueuerMessage::Shutdown) => break,
+ Ok(ShardQueuerMessage::Start(id, total)) => {
+ self.checked_start(id.0, total.0);
+ },
+ Err(RecvTimeoutError::Disconnected) => {
+ // If the sender half has disconnected then the queuer's
+ // lifespan has passed and can shutdown.
+ break;
},
+ Err(RecvTimeoutError::Timeout) => {
+ if let Some((id, total)) = self.queue.pop_front() {
+ self.checked_start(id, total);
+ }
+ }
}
}
}
@@ -115,7 +130,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
// We must wait 5 seconds between IDENTIFYs to avoid session
// invalidations.
- let duration = Duration::from_secs(5);
+ let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);
let elapsed = instant.elapsed();
if elapsed >= duration {
@@ -127,8 +142,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
thread::sleep(to_sleep);
}
- fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
- let shard_info = [shard_id.0, shard_total.0];
+ fn checked_start(&mut self, id: u64, total: u64) {
+ self.check_last_start();
+
+ if let Err(why) = self.start(id, total) {
+ warn!("Err starting shard {}: {:?}", id, why);
+ info!("Re-queueing start of shard {}", id);
+
+ self.queue.push_back((id, total));
+ }
+
+ self.last_start = Some(Instant::now());
+ }
+
+ fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> {
+ let shard_info = [shard_id, shard_total];
let shard = Shard::new(
Arc::clone(&self.ws_url),
@@ -163,7 +191,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
let _ = runner.run();
});
- self.runners.lock().insert(shard_id, runner_info);
+ self.runners.lock().insert(ShardId(shard_id), runner_info);
Ok(())
}