about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author	Zeyla Hellyer <[email protected]>	2017-09-24 15:48:02 -0700
committer	Zeyla Hellyer <[email protected]>	2017-09-24 15:53:23 -0700
commit	6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch)
tree	e3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src
parent	Use $crate for CommandError (diff)
download	serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz
serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src')
-rw-r--r--	src/client/bridge/gateway/mod.rs	46
-rw-r--r--	src/client/bridge/gateway/shard_manager.rs	171
-rw-r--r--	src/client/bridge/gateway/shard_queuer.rs	118
-rw-r--r--	src/client/bridge/gateway/shard_runner.rs	171
-rw-r--r--	src/client/bridge/mod.rs	1
-rw-r--r--	src/client/mod.rs	287
-rw-r--r--	src/gateway/shard.rs	16
-rw-r--r--	src/internal/ws_impl.rs	23
8 files changed, 547 insertions, 286 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs
new file mode 100644
index 0000000..0bfd4e6
--- /dev/null
+++ b/src/client/bridge/gateway/mod.rs
@@ -0,0 +1,46 @@
+mod shard_manager;
+mod shard_queuer;
+mod shard_runner;
+
+pub use self::shard_manager::ShardManager;
+pub use self::shard_queuer::ShardQueuer;
+pub use self::shard_runner::ShardRunner;
+
+use gateway::Shard;
+use parking_lot::Mutex;
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+
+type Parked<T> = Arc<Mutex<T>>;
+type LockedShard = Parked<Shard>;
+
+#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
+pub enum ShardManagerMessage {
+ Restart(ShardId),
+ Shutdown(ShardId),
+ ShutdownAll,
+}
+
+pub enum ShardQueuerMessage {
+ /// Message to start a shard, where the 0-index element is the ID of the
+ /// Shard to start and the 1-index element is the total shards in use.
+ Start(ShardId, ShardId),
+ /// Message to shutdown the shard queuer.
+ Shutdown,
+}
+
+// A light tuplestruct wrapper around a u64 to verify type correctness when
+// working with the IDs of shards.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
+pub struct ShardId(pub u64);
+
+impl Display for ShardId {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "{}", self.0)
+ }
+}
+
+pub struct ShardRunnerInfo {
+ runner_tx: Sender<ShardManagerMessage>,
+}
diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs
new file mode 100644
index 0000000..0ac9d19
--- /dev/null
+++ b/src/client/bridge/gateway/shard_manager.rs
@@ -0,0 +1,171 @@
+use internal::prelude::*;
+use parking_lot::Mutex as ParkingLotMutex;
+use std::collections::HashMap;
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use super::super::super::EventHandler;
+use super::{
+ ShardId,
+ ShardManagerMessage,
+ ShardQueuer,
+ ShardQueuerMessage,
+ ShardRunnerInfo,
+};
+use typemap::ShareMap;
+
+#[cfg(feature = "framework")]
+use framework::Framework;
+
+pub struct ShardManager {
+ #[cfg(feature = "framework")]
+ runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ /// The index of the first shard to initialize, 0-indexed.
+ shard_index: u64,
+ /// The number of shards to initialize.
+ shard_init: u64,
+ /// The total shards in use, 1-indexed.
+ shard_total: u64,
+ shard_queuer: Sender<ShardQueuerMessage>,
+ thread_rx: Receiver<ShardManagerMessage>,
+}
+
+impl ShardManager {
+ #[cfg(feature = "framework")]
+ pub fn new<H>(
+ shard_index: u64,
+ shard_init: u64,
+ shard_total: u64,
+ ws_url: Arc<Mutex<String>>,
+ token: Arc<Mutex<String>>,
+ data: Arc<ParkingLotMutex<ShareMap>>,
+ event_handler: Arc<H>,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ ) -> Self where H: EventHandler + Send + Sync + 'static {
+ let (thread_tx, thread_rx) = mpsc::channel();
+ let (shard_queue_tx, shard_queue_rx) = mpsc::channel();
+
+ let runners = Arc::new(ParkingLotMutex::new(HashMap::new()));
+
+ let mut shard_queuer = feature_framework! {{
+ ShardQueuer {
+ data: data.clone(),
+ event_handler: event_handler.clone(),
+ framework: framework.clone(),
+ last_start: None,
+ manager_tx: thread_tx.clone(),
+ runners: runners.clone(),
+ rx: shard_queue_rx,
+ token: token.clone(),
+ ws_url: ws_url.clone(),
+ }
+ } else {
+ ShardQueuer {
+ data: data.clone(),
+ event_handler: event_handler.clone(),
+ last_start: None,
+ manager_tx: thread_tx.clone(),
+ runners: runners.clone(),
+ rx: shard_queue_rx,
+ rx: shard_queue_rx,
+ token: token.clone(),
+ ws_url: ws_url.clone(),
+ }
+ }};
+
+ thread::spawn(move || {
+ shard_queuer.run();
+ });
+
+ Self {
+ shard_queuer: shard_queue_tx,
+ thread_rx: thread_rx,
+ runners,
+ shard_index,
+ shard_init,
+ shard_total,
+ }
+ }
+
+ pub fn initialize(&mut self) -> Result<()> {
+ let shard_to = self.shard_index + self.shard_init;
+
+ debug!("{}, {}", self.shard_index, self.shard_init);
+
+ for shard_id in self.shard_index..shard_to {
+ let shard_total = self.shard_total;
+
+ self.boot([ShardId(shard_id), ShardId(shard_total)]);
+ }
+
+ Ok(())
+ }
+
+ pub fn run(&mut self) {
+ loop {
+ let value = match self.thread_rx.recv() {
+ Ok(value) => value,
+ Err(_) => break,
+ };
+
+ match value {
+ ShardManagerMessage::Restart(shard_id) => self.restart(shard_id),
+ ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id),
+ ShardManagerMessage::ShutdownAll => {
+ self.shutdown_all();
+
+ break;
+ },
+ }
+ }
+ }
+
+ pub fn shutdown_all(&mut self) {
+ info!("Shutting down all shards");
+ let keys = {
+ self.runners.lock().keys().cloned().collect::<Vec<ShardId>>()
+ };
+
+ for shard_id in keys {
+ self.shutdown(shard_id);
+ }
+ }
+
+ fn boot(&mut self, shard_info: [ShardId; 2]) {
+ info!("Telling shard queuer to start shard {}", shard_info[0]);
+
+ let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]);
+ let _ = self.shard_queuer.send(msg);
+ }
+
+ fn restart(&mut self, shard_id: ShardId) {
+ info!("Restarting shard {}", shard_id);
+ self.shutdown(shard_id);
+
+ let shard_total = self.shard_total;
+
+ self.boot([shard_id, ShardId(shard_total)]);
+ }
+
+ fn shutdown(&mut self, shard_id: ShardId) {
+ info!("Shutting down shard {}", shard_id);
+
+ if let Some(runner) = self.runners.lock().get(&shard_id) {
+ let msg = ShardManagerMessage::Shutdown(shard_id);
+
+ if let Err(why) = runner.runner_tx.send(msg) {
+ warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why);
+ }
+ }
+
+ self.runners.lock().remove(&shard_id);
+ }
+}
+
+impl Drop for ShardManager {
+ fn drop(&mut self) {
+ if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) {
+ warn!("Failed to send shutdown to shard queuer: {:?}", why);
+ }
+ }
+}
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
new file mode 100644
index 0000000..8d3dbe1
--- /dev/null
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -0,0 +1,118 @@
+use framework::Framework;
+use gateway::Shard;
+use internal::prelude::*;
+use parking_lot::Mutex as ParkingLotMutex;
+use std::collections::HashMap;
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::{Duration, Instant};
+use super::super::super::EventHandler;
+use super::{
+ ShardId,
+ ShardManagerMessage,
+ ShardQueuerMessage,
+ ShardRunner,
+ ShardRunnerInfo,
+};
+use typemap::ShareMap;
+
+/// The shard queuer is a simple loop that runs indefinitely to manage the
+/// startup of shards.
+///
+/// A shard queuer instance _should_ be run in its own thread, due to the
+/// blocking nature of the loop itself as well as a 5 second thread sleep
+/// between shard starts.
+pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
+ pub data: Arc<ParkingLotMutex<ShareMap>>,
+ pub event_handler: Arc<H>,
+ #[cfg(feature = "framework")]
+ pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ pub last_start: Option<Instant>,
+ pub manager_tx: Sender<ShardManagerMessage>,
+ pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
+ pub rx: Receiver<ShardQueuerMessage>,
+ pub token: Arc<Mutex<String>>,
+ pub ws_url: Arc<Mutex<String>>,
+}
+
+impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
+ pub fn run(&mut self) {
+ loop {
+ let msg = match self.rx.recv() {
+ Ok(msg) => msg,
+ Err(_) => {
+ break;
+ }
+ };
+
+ match msg {
+ ShardQueuerMessage::Shutdown => break,
+ ShardQueuerMessage::Start(shard_id, shard_total) => {
+ self.check_last_start();
+
+ if let Err(why) = self.start(shard_id, shard_total) {
+ warn!("Err starting shard {}: {:?}", shard_id, why);
+ }
+
+ self.last_start = Some(Instant::now());
+ },
+ }
+ }
+ }
+
+ fn check_last_start(&mut self) {
+ let instant = match self.last_start {
+ Some(instant) => instant,
+ None => return,
+ };
+
+ // We must wait 5 seconds between IDENTIFYs to avoid session
+ // invalidations.
+ let duration = Duration::from_secs(5);
+ let elapsed = instant.elapsed();
+
+ if elapsed >= duration {
+ return;
+ }
+
+ let to_sleep = duration - elapsed;
+
+ thread::sleep(to_sleep);
+ }
+
+ fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
+ let shard_info = [shard_id.0, shard_total.0];
+ let shard = Shard::new(self.ws_url.clone(), self.token.clone(), shard_info)?;
+ let locked = Arc::new(ParkingLotMutex::new(shard));
+
+ let mut runner = feature_framework! {{
+ ShardRunner::new(
+ locked.clone(),
+ self.manager_tx.clone(),
+ self.framework.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ )
+ } else {
+ ShardRunner::new(
+ locked.clone(),
+ self.manager_tx.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ )
+ }};
+
+ let runner_info = ShardRunnerInfo {
+ runner_tx: runner.runner_tx(),
+ };
+
+ thread::spawn(move || {
+ let _ = runner.run();
+ });
+
+ self.runners.lock().insert(shard_id, runner_info);
+
+ Ok(())
+ }
+}
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
new file mode 100644
index 0000000..8bdbb35
--- /dev/null
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -0,0 +1,171 @@
+use internal::prelude::*;
+use internal::ws_impl::ReceiverExt;
+use model::event::{Event, GatewayEvent};
+use parking_lot::Mutex as ParkingLotMutex;
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use super::super::super::{EventHandler, dispatch};
+use super::{LockedShard, ShardId, ShardManagerMessage};
+use typemap::ShareMap;
+use websocket::WebSocketError;
+
+#[cfg(feature = "framework")]
+use framework::Framework;
+
+enum EventRetrieval {
+ Some()
+}
+
+pub struct ShardRunner<H: EventHandler + 'static> {
+ data: Arc<ParkingLotMutex<ShareMap>>,
+ event_handler: Arc<H>,
+ #[cfg(feature = "framework")]
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ manager_tx: Sender<ShardManagerMessage>,
+ runner_rx: Receiver<ShardManagerMessage>,
+ runner_tx: Sender<ShardManagerMessage>,
+ shard: LockedShard,
+}
+
+impl<H: EventHandler + 'static> ShardRunner<H> {
+ pub fn new(shard: LockedShard,
+ manager_tx: Sender<ShardManagerMessage>,
+ framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
+ data: Arc<ParkingLotMutex<ShareMap>>,
+ event_handler: Arc<H>) -> Self {
+ let (tx, rx) = mpsc::channel();
+
+ Self {
+ runner_rx: rx,
+ runner_tx: tx,
+ data,
+ event_handler,
+ framework,
+ manager_tx,
+ shard,
+ }
+ }
+
+ pub fn run(&mut self) -> Result<()> {
+ loop {
+ {
+ let mut shard = self.shard.lock();
+ let incoming = self.runner_rx.try_recv();
+
+ // Check for an incoming message over the runner channel.
+ //
+ // If the message is to shutdown, first verify the ID so we know
+ // for certain this runner is to shutdown.
+ if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
+ if id.0 == shard.shard_info()[0] {
+ let _ = shard.shutdown_clean();
+
+ return Ok(());
+ }
+ }
+
+ if let Err(why) = shard.check_heartbeat() {
+ error!("Failed to heartbeat and reconnect: {:?}", why);
+
+ let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0]));
+ let _ = self.manager_tx.send(msg);
+
+ return Ok(());
+ }
+
+ #[cfg(feature = "voice")]
+ {
+ shard.cycle_voice_recv();
+ }
+ }
+
+ let events = self.recv_events();
+
+ for event in events {
+ feature_framework! {{
+ dispatch(event,
+ &self.shard,
+ &self.framework,
+ &self.data,
+ &self.event_handler);
+ } else {
+ dispatch(event,
+ &info.shard,
+ &info.data,
+ &info.event_handler,
+ &handle);
+ }}
+ }
+ }
+ }
+
+ pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> {
+ self.runner_tx.clone()
+ }
+
+ fn recv_events(&mut self) -> Vec<Event> {
+ let mut shard = self.shard.lock();
+
+ let mut events = vec![];
+
+ loop {
+ let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ // Check that an amount of time at least double the
+ // heartbeat_interval has passed.
+ //
+ // If not, continue on trying to receive messages.
+ //
+ // If it has, attempt to auto-reconnect.
+ let last = shard.last_heartbeat_ack();
+ let interval = shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
+
+ if seconds_passed <= interval_in_secs * 2 {
+ break;
+ }
+ } else {
+ break;
+ }
+
+ debug!("Attempting to auto-reconnect");
+
+ if let Err(why) = shard.autoreconnect() {
+ error!("Failed to auto-reconnect: {:?}", why);
+ }
+
+ break;
+ },
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => break,
+ other => other,
+ };
+
+ let event = match gw_event {
+ Ok(Some(event)) => Ok(event),
+ Ok(None) => break,
+ Err(why) => Err(why),
+ };
+
+ let event = match shard.handle_event(event) {
+ Ok(Some(event)) => event,
+ Ok(None) => continue,
+ Err(why) => {
+ error!("Shard handler received err: {:?}", why);
+
+ continue;
+ },
+ };
+
+ events.push(event);
+
+ if events.len() > 5 {
+ break;
+ }
+ };
+
+ events
+ }
+}
diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs
new file mode 100644
index 0000000..4f27526
--- /dev/null
+++ b/src/client/bridge/mod.rs
@@ -0,0 +1 @@
+pub mod gateway;
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 20c2bd6..6ab6951 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -19,6 +19,7 @@
//! [Client examples]: struct.Client.html#examples
#![allow(zero_ptr)]
+mod bridge;
mod context;
mod dispatch;
mod error;
@@ -35,20 +36,17 @@ pub use http as rest;
#[cfg(feature = "cache")]
pub use CACHE;
+use self::bridge::gateway::ShardManager;
use self::dispatch::dispatch;
use std::sync::{self, Arc};
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::time::Duration;
-use std::{mem, thread};
+use std::mem;
use super::gateway::Shard;
use typemap::ShareMap;
-use websocket::result::WebSocketError;
use http;
use internal::prelude::*;
-use internal::ws_impl::ReceiverExt;
-use model::event::*;
#[cfg(feature = "framework")]
use framework::Framework;
@@ -752,283 +750,36 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let gateway_url = Arc::new(sync::Mutex::new(url));
- let shards_index = shard_data[0];
- let shards_total = shard_data[1] + 1;
+ let mut manager = ShardManager::new(
+ shard_data[0],
+ shard_data[1] - shard_data[0] + 1,
+ shard_data[2],
+ gateway_url.clone(),
+ self.token.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ self.framework.clone(),
+ );
- let mut threads = vec![];
+ if let Err(why) = manager.initialize() {
+ error!("Failed to boot a shard: {:?}", why);
+ info!("Shutting down all shards");
- for shard_number in shards_index..shards_total {
- let shard_info = [shard_number, shard_data[2]];
+ manager.shutdown_all();
- let boot_info = BootInfo {
- gateway_url: gateway_url.clone(),
- shard_info: shard_info,
- token: self.token.clone(),
- };
-
- let boot = boot_shard(&boot_info);
-
- match boot {
- Ok(shard) => {
- let shard = Arc::new(Mutex::new(shard));
-
- self.shards.lock().insert(shard_number, shard.clone());
-
- let monitor_info = feature_framework! {{
- MonitorInfo {
- data: self.data.clone(),
- event_handler: self.event_handler.clone(),
- framework: self.framework.clone(),
- gateway_url: gateway_url.clone(),
- shard: shard,
- shard_info: shard_info,
- token: self.token.clone(),
- }
- } else {
- MonitorInfo {
- data: self.data.clone(),
- event_handler: self.event_handler.clone(),
- gateway_url: gateway_url.clone(),
- shard: shard,
- shard_info: shard_info,
- token: self.token.clone(),
- }
- }};
-
- threads.push(thread::spawn(move || {
- monitor_shard(monitor_info);
- }));
- },
- Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why),
- }
-
- // We need to wait at least 5 seconds between IDENTIFYs.
- //
- // Add an extra second as a buffer.
- thread::sleep(Duration::from_secs(6));
+ return Err(Error::Client(ClientError::ShardBootFailure));
}
- for thread in threads {
- let _ = thread.join();
- }
+ manager.run();
Err(Error::Client(ClientError::Shutdown))
}
}
-
impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
fn drop(&mut self) { self.close_handle().close(); }
}
-struct BootInfo {
- gateway_url: Arc<sync::Mutex<String>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-#[cfg(feature = "framework")]
-struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>,
- gateway_url: Arc<sync::Mutex<String>>,
- shard: Arc<Mutex<Shard>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-#[cfg(not(feature = "framework"))]
-struct MonitorInfo<H: EventHandler + Send + 'static> {
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- gateway_url: Arc<sync::Mutex<String>>,
- shard: Arc<Mutex<Shard>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-fn boot_shard(info: &BootInfo) -> Result<Shard> {
- // Make ten attempts to boot the shard, exponentially backing off; if it
- // still doesn't boot after that, accept it as a failure.
- //
- // After three attempts, start re-retrieving the gateway URL. Before that,
- // use the cached one.
- for attempt_number in 1..3u64 {
- let BootInfo {
- ref gateway_url,
- ref token,
- shard_info,
- } = *info;
- // If we've tried over 3 times so far, get a new gateway URL.
- //
- // If doing so fails, count this as a boot attempt.
- if attempt_number > 3 {
- match http::get_gateway() {
- Ok(g) => *gateway_url.lock().unwrap() = g.url,
- Err(why) => {
- warn!("Failed to retrieve gateway URL: {:?}", why);
-
- // Failed -- start over.
- continue;
- },
- }
- }
-
- let attempt = Shard::new(gateway_url.clone(), token.clone(), shard_info);
-
- match attempt {
- Ok(shard) => {
- info!("Successfully booted shard: {:?}", shard_info);
-
- return Ok(shard);
- },
- Err(why) => warn!("Failed to boot shard: {:?}", why),
- }
- }
-
- // Hopefully _never_ happens?
- Err(Error::Client(ClientError::ShardBootFailure))
-}
-
-fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) {
- handle_shard(&mut info);
-
- let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed);
-
- while handle_still {
- let mut boot_successful = false;
-
- for _ in 0..3 {
- let boot = boot_shard(&BootInfo {
- gateway_url: info.gateway_url.clone(),
- shard_info: info.shard_info,
- token: info.token.clone(),
- });
-
- match boot {
- Ok(new_shard) => {
- *info.shard.lock() = new_shard;
-
- boot_successful = true;
-
- break;
- },
- Err(why) => warn!("Failed to boot shard: {:?}", why),
- }
- }
-
- if boot_successful {
- handle_shard(&mut info);
- } else {
- break;
- }
-
- // The shard died: redo the cycle, unless client close was requested.
- handle_still = HANDLE_STILL.load(Ordering::Relaxed);
- }
-
- if handle_still {
- error!("Completely failed to reboot shard");
- } else {
- info!("Client close was requested. Shutting down.");
-
- let mut shard = info.shard.lock();
-
- if let Err(e) = shard.shutdown_clean() {
- error!(
- "Error shutting down shard {:?}: {:?}",
- shard.shard_info(),
- e
- );
- }
- }
-}
-
-fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) {
-
- // This is currently all ducktape. Redo this.
- while HANDLE_STILL.load(Ordering::Relaxed) {
- {
- let mut shard = info.shard.lock();
-
- if let Err(why) = shard.check_heartbeat() {
- error!("Failed to heartbeat and reconnect: {:?}", why);
-
- return;
- }
- }
-
- #[cfg(feature = "voice")]
- {
- let mut shard = info.shard.lock();
-
- shard.cycle_voice_recv();
- }
-
- let event = {
- let mut shard = info.shard.lock();
-
- let event = match shard.client.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- // Check that an amount of time at least double the
- // heartbeat_interval has passed.
- //
- // If not, continue on trying to receive messages.
- //
- // If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- continue;
- }
- } else {
- continue;
- }
-
- debug!("Attempting to auto-reconnect");
-
- if let Err(why) = shard.autoreconnect() {
- error!("Failed to auto-reconnect: {:?}", why);
- }
-
- continue;
- },
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
- other => other,
- };
-
- match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => continue,
- Err(why) => {
- error!("Shard handler received err: {:?}", why);
-
- continue;
- },
- }
- };
-
- feature_framework! {{
- dispatch(event,
- &info.shard,
- &info.framework,
- &info.data,
- &info.event_handler);
- } else {
- dispatch(event,
- &info.shard,
- &info.data,
- &info.event_handler);
- }}
- }
-}
-
/// Validates that a token is likely in a valid format.
///
/// This performs the following checks on a given token:
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index 571d522..890f56e 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -336,6 +336,10 @@ impl Shard {
pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> {
match event {
Ok(GatewayEvent::Dispatch(seq, event)) => {
+ if seq > self.seq + 1 {
+ warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq);
+ }
+
match event {
Event::Ready(ref ready) => {
self.session_id = Some(ready.ready.session_id.clone());
@@ -399,6 +403,8 @@ impl Shard {
self.heartbeat_instants.1 = Some(Instant::now());
self.last_heartbeat_acknowledged = true;
+ trace!("[Shard {}] Received heartbeat ack", self.shard_info[0]);
+
Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
@@ -758,12 +764,13 @@ impl Shard {
self.heartbeat_instants.0 = Some(Instant::now());
self.last_heartbeat_acknowledged = false;
+ trace!("[{:02}] successfully heartbeated", self.shard_info[0]);
+
Ok(())
},
Err(why) => {
match why {
- Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() !=
- Some(32) {
+ Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
debug!(
"[Shard {:?}] Err w/ heartbeating: {:?}",
self.shard_info,
@@ -820,11 +827,14 @@ impl Shard {
}
// Otherwise, we're good to heartbeat.
+ trace!("[{:02}] heartbeating", self.shard_info[0]);
+
if let Err(why) = self.heartbeat() {
warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
self.reconnect()
} else {
+ trace!("[{:02}] heartbeated", self.shard_info[0]);
self.heartbeat_instants.0 = Some(Instant::now());
Ok(())
@@ -924,6 +934,8 @@ impl Shard {
},
});
+ self.heartbeat_instants.0 = Some(Instant::now());
+
self.client.send_json(&identification)
}
diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs
index 70c4cb7..8edb69b 100644
--- a/src/internal/ws_impl.rs
+++ b/src/internal/ws_impl.rs
@@ -7,7 +7,7 @@ use gateway::GatewayError;
use internal::prelude::*;
pub trait ReceiverExt {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T>
+ fn recv_json<F, T>(&mut self, decode: F) -> Result<Option<T>>
where F: Fn(Value) -> Result<T>;
}
@@ -16,11 +16,9 @@ pub trait SenderExt {
}
impl ReceiverExt for WsClient<TlsStream<TcpStream>> {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T>
+ fn recv_json<F, T>(&mut self, decode: F) -> Result<Option<T>>
where F: Fn(Value) -> Result<T> {
- let message = self.recv_message()?;
-
- let res = match message {
+ Ok(match self.recv_message()? {
OwnedMessage::Binary(bytes) => {
let value = serde_json::from_reader(ZlibDecoder::new(&bytes[..]))?;
@@ -30,9 +28,9 @@ impl ReceiverExt for WsClient<TlsStream<TcpStream>> {
warn!("(╯°□°)╯︵ ┻━┻ Error decoding: {}", s);
why
- }))
+ })?)
},
- OwnedMessage::Close(data) => Some(Err(Error::Gateway(GatewayError::Closed(data)))),
+ OwnedMessage::Close(data) => return Err(Error::Gateway(GatewayError::Closed(data))),
OwnedMessage::Text(payload) => {
let value = serde_json::from_str(&payload)?;
@@ -40,7 +38,7 @@ impl ReceiverExt for WsClient<TlsStream<TcpStream>> {
warn!("(╯°□°)╯︵ ┻━┻ Error decoding: {}", payload);
why
- }))
+ })?)
},
OwnedMessage::Ping(x) => {
self.send_message(&OwnedMessage::Pong(x))
@@ -49,14 +47,7 @@ impl ReceiverExt for WsClient<TlsStream<TcpStream>> {
None
},
OwnedMessage::Pong(_) => None,
- };
-
- // As to ignore the `None`s returned from `Ping` and `Pong`.
- // Since they're essentially useless to us anyway.
- match res {
- Some(data) => data,
- None => self.recv_json(decode),
- }
+ })
}
}