aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-08-02 10:42:04 -0600
committerMishio595 <[email protected]>2018-08-02 10:42:04 -0600
commita67e5f7c8c53c773a292459a71479045f4b2a444 (patch)
treedec993d6b5610bcca4ba1a64242b08258f8fa163
parentMerge branch 'upstream' (diff)
parentDon't delay Ready with cache enabled (diff)
downloadserenity-debug.tar.xz
serenity-debug.zip
Merge branch 'debug' of https://github.com/serenity-rs/serenity into debugdebug
-rw-r--r--src/client/bridge/gateway/shard_runner.rs255
-rw-r--r--src/client/dispatch.rs1
-rw-r--r--src/gateway/mod.rs2
-rw-r--r--src/gateway/shard.rs171
4 files changed, 415 insertions, 14 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index ea1fad6..1075925 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -51,8 +51,15 @@ pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> {
impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// Creates a new runner for a Shard.
pub fn new(opt: ShardRunnerOptions<H>) -> Self {
+ trace!("[ShardRunner {:?}] Setting up", opt.shard.shard_info());
+
let (tx, rx) = mpsc::channel();
+ trace!(
+ "[ShardRunner {:?}] Setup channels",
+ opt.shard.shard_info(),
+ );
+
Self {
runner_rx: rx,
runner_tx: tx,
@@ -68,6 +75,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
}
+ #[inline]
+ fn shard_info(&self) -> [u64; 2] {
+ self.shard.shard_info()
+ }
+
/// Starts the runner's loop to receive events.
///
/// This runs a loop that performs the following in each iteration:
@@ -95,40 +107,78 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// [`ShardManager`]: struct.ShardManager.html
/// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html
pub fn run(&mut self) -> Result<()> {
- debug!("[ShardRunner {:?}] Running", self.shard.shard_info());
+ debug!("[ShardRunner {:?}] Running", self.shard_info());
loop {
+ trace!("[ShardRunner {:?}] Recv'ing...", self.shard_info());
if !self.recv()? {
+ trace!("[ShardRunner {:?}] Failed to recv", self.shard_info());
+
return Ok(());
}
// check heartbeat
+ trace!("[ShardRunner {:?}] Checking heartbeat", self.shard_info());
+
if !self.shard.check_heartbeat() {
warn!(
- "[ShardRunner {:?}] Error heartbeating",
+ "[ShardRunner {:?}] Error heartbeating; restarting",
self.shard.shard_info(),
);
return self.request_restart();
}
+ trace!("[ShardRunner {:?}] Checked heartbeat", self.shard_info());
+
let pre = self.shard.stage();
+ trace!(
+ "[ShardRunner {:?}] Receiving event; stage: {:?}",
+ self.shard_info(),
+ pre,
+ );
let (event, action, successful) = self.recv_event();
let post = self.shard.stage();
+ trace!(
+ "[ShardRunner {:?}] Received event; stage: {:?}",
+ self.shard_info(),
+ post,
+ );
if post != pre {
+ trace!("[ShardRunner {:?}] Stages changed", self.shard_info());
self.update_manager();
+ trace!(
+ "[ShardRunner {:?}] Updated shard manager",
+ self.shard_info(),
+ );
let e = ClientEvent::ShardStageUpdate(ShardStageUpdateEvent {
new: post,
old: pre,
shard_id: ShardId(self.shard.shard_info()[0]),
});
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ e,
+ );
self.dispatch(DispatchEvent::Client(e));
+ trace!("[ShardRunner {:?}] Dispatched", self.shard_info());
}
+ trace!(
+ "[ShardRunner {:?}] Action: {:?}",
+ self.shard_info(),
+ action,
+ );
+
match action {
Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => {
+ trace!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard_info(),
+ );
return self.request_restart()
},
Some(other) => {
@@ -138,9 +188,23 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
if let Some(event) = event {
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ event,
+ );
+
self.dispatch(DispatchEvent::Model(event));
+ trace!("[ShardRunner {:?}] Dispatched", self.shard_info());
}
+ trace!(
+ "[ShardRunner {:?}] Successful: {:?}; connecting?: {:?}",
+ self.shard_info(),
+ successful,
+ self.shard.stage().is_connecting(),
+ );
+
if !successful && !self.shard.stage().is_connecting() {
return self.request_restart();
}
@@ -187,6 +251,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
if id.0 != self.shard.shard_info()[0] {
// Not meant for this runner for some reason, don't
// shutdown.
+ debug!(
+ "[ShardRunner {:?}] Received message meant for {:?}?",
+ self.shard_info(),
+ id.0,
+ );
+
return true;
}
@@ -199,6 +269,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
#[inline]
fn dispatch(&self, event: DispatchEvent) {
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ event,
+ );
+
dispatch(
event,
#[cfg(feature = "framework")]
@@ -218,10 +294,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// This always returns true, except in the case that the shard manager asked
// the runner to shutdown.
fn handle_rx_value(&mut self, value: InterMessage) -> bool {
+ trace!(
+ "[ShardRunner {:?}] Handling rx value: {:?}",
+ self.shard_info(),
+ value,
+ );
+
match value {
InterMessage::Client(ShardClientMessage::Manager(x)) => match x {
ShardManagerMessage::Restart(id) |
ShardManagerMessage::Shutdown(id) => {
+ trace!(
+ "[ShardRunner {:?}] Received checked shutdown",
+ self.shard_info(),
+ );
+
self.checked_shutdown(id)
},
ShardManagerMessage::ShutdownAll => {
@@ -242,6 +329,14 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
},
InterMessage::Client(ShardClientMessage::Runner(x)) => match x {
ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => {
+ trace!(
+ "[ShardRunner {:?}] Received rx guild chunk: {:?}; {:?}; {:?}",
+ self.shard_info(),
+ guild_ids,
+ limit,
+ query,
+ );
+
self.shard.chunk_guilds(
guild_ids,
limit,
@@ -253,12 +348,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let data = CloseData::new(code, reason);
let msg = OwnedMessage::Close(Some(data));
+ trace!(
+ "[ShardRunner {:?}] Received rx close: {:?}",
+ self.shard_info(),
+ msg,
+ );
+
self.shard.client.send_message(&msg).is_ok()
},
ShardRunnerMessage::Message(msg) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx heartbeat: {:?}",
+ self.shard_info(),
+ msg,
+ );
+
self.shard.client.send_message(&msg).is_ok()
},
ShardRunnerMessage::SetGame(game) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set game: {:?}",
+ self.shard_info(),
+ game,
+ );
// To avoid a clone of `game`, we do a little bit of
// trickery here:
//
@@ -277,17 +389,35 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
self.shard.update_presence().is_ok()
},
ShardRunnerMessage::SetPresence(status, game) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set presence: {:?}; {:?}",
+ self.shard_info(),
+ status,
+ game,
+ );
+
self.shard.set_presence(status, game);
self.shard.update_presence().is_ok()
},
ShardRunnerMessage::SetStatus(status) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set status: {:?}",
+ self.shard_info(),
+ status,
+ );
+
self.shard.set_status(status);
self.shard.update_presence().is_ok()
},
},
InterMessage::Json(value) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx json: {}",
+ self.shard_info(),
+ value,
+ );
// Value must be forwarded over the websocket
self.shard.client.send_json(&value).is_ok()
},
@@ -296,6 +426,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
#[cfg(feature = "voice")]
fn handle_voice_event(&self, event: &Event) {
+ trace!(
+ "[ShardRunner {:?}] Handling voice event: {:?}",
+ self.shard_info(),
+ event,
+ );
+
match *event {
Event::Ready(_) => {
self.voice_manager.lock().set(
@@ -337,6 +473,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// Returns whether the shard runner is in a state that can continue.
fn recv(&mut self) -> Result<bool> {
+ trace!(
+ "[ShardRunner {:?}] Starting recv",
+ self.shard_info(),
+ );
+
loop {
match self.runner_rx.try_recv() {
Ok(value) => {
@@ -352,13 +493,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let _ = self.request_restart();
+ trace!(
+ "[ShardRunner {:?}] Recv: requested restart",
+ self.shard_info(),
+ );
+
return Ok(false);
},
- Err(TryRecvError::Empty) => break,
+ Err(TryRecvError::Empty) => {
+ trace!(
+ "[ShardRunner {:?}] Recv empty",
+ self.shard_info(),
+ );
+
+ break;
+ },
}
}
// There are no longer any values available.
+ trace!(
+ "[ShardRunner {:?}] Recv: no values available",
+ self.shard_info(),
+ );
Ok(true)
}
@@ -366,12 +523,36 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) {
+ trace!(
+ "[ShardRunner {:?}] Recv event",
+ self.shard_info(),
+ );
+
let gw_event = match self.shard.client.recv_json() {
Ok(Some(value)) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: value: {:?}",
+ self.shard_info(),
+ value,
+ );
+
GatewayEvent::deserialize(value).map(Some).map_err(From::from)
},
- Ok(None) => Ok(None),
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ Ok(None) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: Ok(None)",
+ self.shard_info(),
+ );
+
+ Ok(None)
+ },
+ Err(Error::WebSocket(WebSocketError::IoError(why))) => {
+ trace!(
+ "[ShardRunner {:?}] recv event: ws io err: {:?}",
+ self.shard_info(),
+ why,
+ );
+
// Check that an amount of time at least double the
// heartbeat_interval has passed.
//
@@ -387,18 +568,46 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let interval_in_secs = interval / 1000;
if seconds_passed <= interval_in_secs * 2 {
+ trace!(
+ "[ShardRunner {:?}] Recv event: seconds_passed <",
+ self.shard_info(),
+ );
+
return (None, None, true);
}
} else {
+ trace!(
+ "[ShardRunner {:?}] Recv event: no interval; last: {:?}; interval: {:?}",
+ self.shard_info(),
+ last,
+ interval,
+ );
+
return (None, None, true);
}
}
- debug!("Attempting to auto-reconnect");
+ debug!(
+ "[ShardRunner {:?}] Attempting to auto-reconnect: {:?}",
+ self.shard_info(),
+ self.shard.reconnection_type(),
+ );
match self.shard.reconnection_type() {
- ReconnectType::Reidentify => return (None, None, false),
+ ReconnectType::Reidentify => {
+ trace!(
+ "[ShardRunner {:?}] Reconnection type: reidentify",
+ self.shard_info(),
+ );
+
+ return (None, None, false);
+ },
ReconnectType::Resume => {
+ trace!(
+ "[ShardRunner {:?}] Reconnection type: resume",
+ self.shard_info(),
+ );
+
if let Err(why) = self.shard.resume() {
warn!("Failed to resume: {:?}", why);
@@ -410,6 +619,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
return (None, None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: no data available",
+ self.shard_info(),
+ );
+
// This is hit when the websocket client dies this will be
// hit every iteration.
return (None, None, false);
@@ -423,6 +637,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
Err(why) => Err(why),
};
+ trace!(
+ "[ShardRunner {:?}] Recv event: done: {:?}",
+ self.shard_info(),
+ event,
+ );
+
let action = match self.shard.handle_event(&event) {
Ok(Some(action)) => Some(action),
Ok(None) => None,
@@ -433,7 +653,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
},
};
+ trace!(
+ "[ShardRunner {:?}] Recv event: handle action: {:?}",
+ self.shard_info(),
+ action,
+ );
+
if let Ok(GatewayEvent::HeartbeatAck) = event {
+ trace!(
+ "[ShardRunner {:?}] Recv event: heartbeatack",
+ self.shard_info(),
+ );
+
self.update_manager();
}
@@ -472,11 +703,17 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
fn update_manager(&self) {
- let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate {
+ if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShardUpdate {
id: ShardId(self.shard.shard_info()[0]),
latency: self.shard.latency(),
stage: self.shard.stage(),
- });
+ }) {
+ warn!(
+ "[ShardRunner {:?}] Error updating manager: {:?}",
+ self.shard_info(),
+ why,
+ );
+ }
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 663a283..18ff63e 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -42,6 +42,7 @@ fn context(
Context::new(Arc::clone(data), runner_tx.clone(), shard_id)
}
+#[derive(Debug)]
pub(crate) enum DispatchEvent {
Client(ClientEvent),
Model(Event),
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index ffacba7..f20d7cd 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -185,6 +185,7 @@ pub enum InterMessage {
Json(Value),
}
+#[derive(Clone, Debug)]
pub enum ShardAction {
Heartbeat,
Identify,
@@ -192,6 +193,7 @@ pub enum ShardAction {
}
/// The type of reconnection that should be performed.
+#[derive(Clone, Debug)]
pub enum ReconnectType {
/// Indicator that a new connection should be made by sending an IDENTIFY.
Reidentify,
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index 13944e6..a42ca68 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -8,6 +8,7 @@ use model::{
};
use parking_lot::Mutex;
use std::{
+ fmt::{Debug, Formatter, Result as FmtResult},
sync::Arc,
time::{Duration as StdDuration, Instant}
};
@@ -138,7 +139,9 @@ impl Shard {
) -> Result<Shard> {
let mut client = connect(&*ws_url.lock())?;
- let _ = set_client_timeout(&mut client);
+ if let Err(why) = set_client_timeout(&mut client) {
+ warn!("[Shard {:?}] Error setting timeout: {:?}", shard_info, why);
+ }
let current_presence = (None, OnlineStatus::Online);
let heartbeat_instants = (None, None);
@@ -168,8 +171,12 @@ impl Shard {
/// Retrieves the current presence of the shard.
#[inline]
pub fn current_presence(&self) -> &CurrentPresence {
+ trace!("[Shard {:?}] Getting current presence", self.shard_info);
+ trace!("[Shard {:?}] State0: {:?}", self.shard_info, self);
+
&self.current_presence
}
+
/// Whether the shard has permanently shutdown.
///
/// This should normally happen due to manual calling of [`shutdown`] or
@@ -179,6 +186,9 @@ impl Shard {
/// [`shutdown_clean`]: #method.shutdown_clean
#[inline]
pub fn is_shutdown(&self) -> bool {
+ trace!("[Shard {:?}] Getting whether shutdown", self.shard_info);
+ trace!("[Shard {:?}] State01: {:?}", self.shard_info, self);
+
self.shutdown
}
@@ -188,18 +198,27 @@ impl Shard {
/// acknowledgement was last received.
#[inline]
pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) {
+ trace!("[Shard {:?}] Getting heartbeat instants", self.shard_info);
+ trace!("[Shard {:?}] State02: {:?}", self.shard_info, self);
+
&self.heartbeat_instants
}
/// Retrieves the value of when the last heartbeat was sent.
#[inline]
pub fn last_heartbeat_sent(&self) -> Option<&Instant> {
+ trace!("[Shard {:?}] Getting last heartbeat sent", self.shard_info);
+ trace!("[Shard {:?}] State03: {:?}", self.shard_info, self);
+
self.heartbeat_instants.0.as_ref()
}
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
pub fn last_heartbeat_ack(&self) -> Option<&Instant> {
+ trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State04: {:?}", self.shard_info, self);
+
self.heartbeat_instants.1.as_ref()
}
@@ -215,11 +234,17 @@ impl Shard {
///
/// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed
pub fn heartbeat(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Heartbeating", self.shard_info);
+ trace!("[Shard {:?}] State05: {:?}", self.shard_info, self);
+
match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) {
Ok(()) => {
self.heartbeat_instants.0 = Some(Instant::now());
self.last_heartbeat_acknowledged = false;
+ trace!("[Shard {:?}] Heartbeated", self.shard_info);
+ trace!("[Shard {:?}] State06: {:?}", self.shard_info, self);
+
Ok(())
},
Err(why) => {
@@ -243,21 +268,33 @@ impl Shard {
#[inline]
pub fn heartbeat_interval(&self) -> Option<&u64> {
+ trace!("[Shard {:?}] Getting heartbeat interval", self.shard_info);
+ trace!("[Shard {:?}] State07: {:?}", self.shard_info, self);
+
self.heartbeat_interval.as_ref()
}
#[inline]
pub fn last_heartbeat_acknowledged(&self) -> bool {
+ trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State08: {:?}", self.shard_info, self);
+
self.last_heartbeat_acknowledged
}
#[inline]
pub fn seq(&self) -> u64 {
+ trace!("[Shard {:?}] Getting seq", self.shard_info);
+ trace!("[Shard {:?}] State09: {:?}", self.shard_info, self);
+
self.seq
}
#[inline]
pub fn session_id(&self) -> Option<&String> {
+ trace!("[Shard {:?}] Getting session_id", self.shard_info);
+ trace!("[Shard {:?}] State10: {:?}", self.shard_info, self);
+
self.session_id.as_ref()
}
@@ -282,11 +319,17 @@ impl Shard {
/// ```
#[inline]
pub fn set_game(&mut self, game: Option<Game>) {
+ trace!("[Shard {:?}] Setting game: {:?}", self.shard_info, game);
+ trace!("[Shard {:?}] State11: {:?}", self.shard_info, self);
+
self.current_presence.0 = game;
}
#[inline]
pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) {
+ trace!("[Shard {:?}] Setting presence: {:?}", self.shard_info, game);
+ trace!("[Shard {:?}] State12: {:?}", self.shard_info, self);
+
self.set_game(game);
self.set_status(status);
}
@@ -297,6 +340,9 @@ impl Shard {
status = OnlineStatus::Invisible;
}
+ trace!("[Shard {:?}] Setting status: {:?}", self.shard_info, status);
+ trace!("[Shard {:?}] State13: {:?}", self.shard_info, self);
+
self.current_presence.1 = status;
}
@@ -330,10 +376,17 @@ impl Shard {
/// # #[cfg(not(feature = "model"))]
/// # fn main() {}
/// ```
- pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
+ pub fn shard_info(&self) -> [u64; 2] {
+ trace!("[Shard {:?}] Getting shard info", self.shard_info);
+ trace!("[Shard {:?}] State14: {:?}", self.shard_info, self);
+ self.shard_info
+ }
/// Returns the current connection stage of the shard.
pub fn stage(&self) -> ConnectionStage {
+ trace!("[Shard {:?}] Getting stage", self.shard_info);
+ trace!("[Shard {:?}] State15: {:?}", self.shard_info, self);
+
self.stage
}
@@ -364,6 +417,9 @@ impl Shard {
#[allow(cyclomatic_complexity)]
pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>)
-> Result<Option<ShardAction>> {
+ trace!("[Shard {:?}] Handling event: {:?}", self.shard_info, event);
+ trace!("[Shard {:?}] State16: {:?}", self.shard_info, self);
+
match *event {
Ok(GatewayEvent::Dispatch(seq, ref event)) => {
if seq > self.seq + 1 {
@@ -402,15 +458,21 @@ impl Shard {
s,
self.seq
);
+ trace!("[Shard {:?}] State17: {:?}", self.shard_info, self);
if self.stage == ConnectionStage::Handshake {
+ trace!(
+ "[Shard {:?}] Received heartbeat; now in identifying stage",
+ self.shard_info,
+ );
+
self.stage = ConnectionStage::Identifying;
return Ok(Some(ShardAction::Identify));
} else {
warn!(
"[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting",
- self.shard_info
+ self.shard_info,
);
return Ok(Some(ShardAction::Reconnect(self.reconnection_type())));
@@ -424,6 +486,7 @@ impl Shard {
self.last_heartbeat_acknowledged = true;
trace!("[Shard {:?}] Received heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State18: {:?}", self.shard_info, self);
Ok(None)
},
@@ -431,6 +494,7 @@ impl Shard {
debug!("[Shard {:?}] Received a Hello; interval: {}",
self.shard_info,
interval);
+ trace!("[Shard {:?}] State19: {:?}", self.shard_info, self);
if self.stage == ConnectionStage::Resuming {
return Ok(None);
@@ -454,6 +518,7 @@ impl Shard {
"[Shard {:?}] Received session invalidation",
self.shard_info,
);
+ trace!("[Shard {:?}] State20: {:?}", self.shard_info, self);
Ok(Some(if resumable {
ShardAction::Reconnect(ReconnectType::Resume)
@@ -462,9 +527,18 @@ impl Shard {
}))
},
Ok(GatewayEvent::Reconnect) => {
+ trace!("[Shard {:?}] Received reconnect", self.shard_info);
+ trace!("[Shard {:?}] State21: {:?}", self.shard_info, self);
+
Ok(Some(ShardAction::Reconnect(ReconnectType::Reidentify)))
},
Err(Error::Gateway(GatewayError::Closed(ref data))) => {
+ trace!("[Shard {:?}] Received close: {:?}",
+ self.shard_info,
+ data,
+ );
+ trace!("[Shard {:?}] State22: {:?}", self.shard_info, self);
+
let num = data.as_ref().map(|d| d.status_code);
let clean = num == Some(1000);
@@ -543,6 +617,9 @@ impl Shard {
}))
},
Err(Error::WebSocket(ref why)) => {
+ trace!("[Shard {:?}] ws err: {:?}", self.shard_info, why);
+ trace!("[Shard {:?}] State23: {:?}", self.shard_info, self);
+
if let WebSocketError::NoDataAvailable = *why {
if self.heartbeat_instants.1.is_none() {
return Ok(None);
@@ -575,10 +652,19 @@ impl Shard {
/// - a heartbeat acknowledgement was not received in time
/// - an error occurred while heartbeating
pub fn check_heartbeat(&mut self) -> bool {
+ trace!("[Shard {:?}] Checking heartbeat", self.shard_info);
+ trace!("[Shard {:?}] State24: {:?}", self.shard_info, self);
+
let wait = {
let heartbeat_interval = match self.heartbeat_interval {
Some(heartbeat_interval) => heartbeat_interval,
None => {
+ trace!(
+ "[Shard {:?}] Checking heartbeat; no interval; elapsed diff: {:?}",
+ self.shard_info,
+ self.started.elapsed() < StdDuration::from_secs(15),
+ );
+
return self.started.elapsed() < StdDuration::from_secs(15);
},
};
@@ -586,10 +672,24 @@ impl Shard {
StdDuration::from_secs(heartbeat_interval / 1000)
};
+ trace!(
+ "[Shard {:?}] Checking heartbeat; wait: {:?}",
+ self.shard_info,
+ wait,
+ );
+
// If a duration of time less than the heartbeat_interval has passed,
// then don't perform a keepalive or attempt to reconnect.
if let Some(last_sent) = self.heartbeat_instants.0 {
- if last_sent.elapsed() <= wait {
+ let elapsed = last_sent.elapsed();
+
+ if elapsed <= wait {
+ trace!(
+ "[Shard {:?}] Last sent elapsed: {:?}",
+ self.shard_info,
+ elapsed,
+ );
+
return true;
}
}
@@ -601,17 +701,22 @@ impl Shard {
"[Shard {:?}] Last heartbeat not acknowledged",
self.shard_info,
);
+ trace!("[Shard {:?}] State25: {:?}", self.shard_info, self);
return false;
+ } else {
+ trace!("[Shard {:?}] Last heartbeat acknowledged", self.shard_info);
}
// Otherwise, we're good to heartbeat.
if let Err(why) = self.heartbeat() {
warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
+ trace!("[Shard {:?}] State26: {:?}", self.shard_info, self);
false
} else {
trace!("[Shard {:?}] Heartbeated", self.shard_info);
+ trace!("[Shard {:?}] State27: {:?}", self.shard_info, self);
true
}
@@ -621,6 +726,9 @@ impl Shard {
// Shamelessly stolen from brayzure's commit in eris:
// <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
pub fn latency(&self) -> Option<StdDuration> {
+ trace!("[Shard {:?}] Getting latency", self.shard_info);
+ trace!("[Shard {:?}] State28: {:?}", self.shard_info, self);
+
if let (Some(sent), Some(received)) = self.heartbeat_instants {
if received > sent {
return Some(received - sent);
@@ -643,6 +751,9 @@ impl Shard {
/// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting
/// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id
pub fn should_reconnect(&mut self) -> Option<ReconnectType> {
+ trace!("[Shard {:?}] Should reconnect?", self.shard_info);
+ trace!("[Shard {:?}] State29: {:?}", self.shard_info, self);
+
if self.stage == ConnectionStage::Connecting {
return None;
}
@@ -651,6 +762,9 @@ impl Shard {
}
pub fn reconnection_type(&self) -> ReconnectType {
+ trace!("[Shard {:?}] Reconnection type", self.shard_info);
+ trace!("[Shard {:?}] State30: {:?}", self.shard_info, self);
+
if self.session_id().is_some() {
ReconnectType::Resume
} else {
@@ -744,6 +858,7 @@ impl Shard {
query: Option<&str>,
) -> Result<()> where It: IntoIterator<Item=GuildId> {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);
+ trace!("[Shard {:?}] State31: {:?}", self.shard_info, self);
self.client.send_chunk_guilds(
guild_ids,
@@ -758,11 +873,17 @@ impl Shard {
// - the time that the last heartbeat sent as being now
// - the `stage` to `Identifying`
pub fn identify(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Identifying", self.shard_info);
+ trace!("[Shard {:?}] State32: {:?}", self.shard_info, self);
+
self.client.send_identify(&self.shard_info, &self.token.lock())?;
self.heartbeat_instants.0 = Some(Instant::now());
self.stage = ConnectionStage::Identifying;
+ trace!("[Shard {:?}] Identified", self.shard_info);
+ trace!("[Shard {:?}] State33: {:?}", self.shard_info, self);
+
Ok(())
}
@@ -772,6 +893,7 @@ impl Shard {
/// the client.
pub fn initialize(&mut self) -> Result<WsClient> {
debug!("[Shard {:?}] Initializing", self.shard_info);
+ trace!("[Shard {:?}] State34: {:?}", self.shard_info, self);
// We need to do two, sort of three things here:
//
@@ -786,22 +908,38 @@ impl Shard {
let mut client = connect(&self.ws_url.lock())?;
self.stage = ConnectionStage::Handshake;
- let _ = set_client_timeout(&mut client);
+ if let Err(why) = set_client_timeout(&mut client) {
+ warn!(
+ "[Shard {:?}] Error setting timeouts when initializing: {:?}",
+ self.shard_info,
+ why,
+ );
+ }
+
+ trace!("[Shard {:?}] Initialized", self.shard_info);
+ trace!("[Shard {:?}] State35: {:?}", self.shard_info, self);
Ok(client)
}
pub fn reset(&mut self) {
+ trace!("[Shard {:?}] Resetting", self.shard_info);
+ trace!("[Shard {:?}] State36: {:?}", self.shard_info, self);
+
self.heartbeat_instants = (Some(Instant::now()), None);
self.heartbeat_interval = None;
self.last_heartbeat_acknowledged = true;
self.session_id = None;
self.stage = ConnectionStage::Disconnected;
self.seq = 0;
+
+ trace!("[Shard {:?}] Reset", self.shard_info);
+ trace!("[Shard {:?}] State37: {:?}", self.shard_info, self);
}
pub fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ trace!("[Shard {:?}] State38: {:?}", self.shard_info, self);
self.client = self.initialize()?;
self.stage = ConnectionStage::Resuming;
@@ -821,6 +959,7 @@ impl Shard {
pub fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info());
+ trace!("[Shard {:?}] State39: {:?}", self.shard_info, self);
self.reset();
self.client = self.initialize()?;
@@ -829,6 +968,8 @@ impl Shard {
}
pub fn update_presence(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Updating presence", self.shard_info);
+ trace!("[Shard {:?}] State40: {:?}", self.shard_info, self);
self.client.send_presence_update(
&self.shard_info,
&self.current_presence,
@@ -836,6 +977,26 @@ impl Shard {
}
}
+impl Debug for Shard {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ f.debug_struct("Shard")
+ .field("client", self.client.stream_ref())
+ .field("current_presence", &self.current_presence)
+ .field("heartbeat_instants", &self.heartbeat_instants)
+ .field("heartbeat_interval", &self.heartbeat_interval)
+ .field("last_heartbeat_acknowledged", &self.last_heartbeat_acknowledged)
+ .field("seq", &self.seq)
+ .field("session_id", &self.session_id)
+ .field("shard_info", &self.shard_info)
+ .field("shutdown", &self.shutdown)
+ .field("stage", &self.stage)
+ .field("started", &self.started)
+ .field("token", &self.token)
+ .field("ws_url", &self.ws_url)
+ .finish()
+ }
+}
+
fn connect(base_url: &str) -> Result<WsClient> {
let url = build_gateway_url(base_url)?;
let client = ClientBuilder::from_url(&url).connect_secure(None)?;