diff options
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 255 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 1 | ||||
| -rw-r--r-- | src/gateway/mod.rs | 2 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 171 |
4 files changed, 415 insertions, 14 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index ea1fad6..1075925 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -51,8 +51,15 @@ pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// Creates a new runner for a Shard. pub fn new(opt: ShardRunnerOptions<H>) -> Self { + trace!("[ShardRunner {:?}] Setting up", opt.shard.shard_info()); + let (tx, rx) = mpsc::channel(); + trace!( + "[ShardRunner {:?}] Setup channels", + opt.shard.shard_info(), + ); + Self { runner_rx: rx, runner_tx: tx, @@ -68,6 +75,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } } + #[inline] + fn shard_info(&self) -> [u64; 2] { + self.shard.shard_info() + } + /// Starts the runner's loop to receive events. /// /// This runs a loop that performs the following in each iteration: @@ -95,40 +107,78 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// [`ShardManager`]: struct.ShardManager.html /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html pub fn run(&mut self) -> Result<()> { - debug!("[ShardRunner {:?}] Running", self.shard.shard_info()); + debug!("[ShardRunner {:?}] Running", self.shard_info()); loop { + trace!("[ShardRunner {:?}] Recv'ing...", self.shard_info()); if !self.recv()? { + trace!("[ShardRunner {:?}] Failed to recv", self.shard_info()); + return Ok(()); } // check heartbeat + trace!("[ShardRunner {:?}] Checking heartbeat", self.shard_info()); + if !self.shard.check_heartbeat() { warn!( - "[ShardRunner {:?}] Error heartbeating", + "[ShardRunner {:?}] Error heartbeating; restarting", self.shard.shard_info(), ); return self.request_restart(); } + trace!("[ShardRunner {:?}] Checked heartbeat", self.shard_info()); + let pre = self.shard.stage(); + trace!( + "[ShardRunner {:?}] Receiving event; stage: {:?}", + self.shard_info(), + pre, + ); let (event, action, successful) = self.recv_event(); let post = self.shard.stage(); + trace!( + "[ShardRunner {:?}] Received event; stage: {:?}", + self.shard_info(), + post, + ); if post != pre { + trace!("[ShardRunner {:?}] Stages changed", self.shard_info()); self.update_manager(); + trace!( + "[ShardRunner {:?}] Updated shard manager", + self.shard_info(), + ); let e = ClientEvent::ShardStageUpdate(ShardStageUpdateEvent { new: post, old: pre, shard_id: ShardId(self.shard.shard_info()[0]), }); + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + e, + ); self.dispatch(DispatchEvent::Client(e)); + trace!("[ShardRunner {:?}] Dispatched", self.shard_info()); } + trace!( + "[ShardRunner {:?}] Action: {:?}", + self.shard_info(), + action, + ); + match action { Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => { + trace!( + "[ShardRunner {:?}] Requesting restart", + self.shard_info(), + ); return self.request_restart() }, Some(other) => { @@ -138,9 +188,23 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } if let Some(event) = event { + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + event, + ); + self.dispatch(DispatchEvent::Model(event)); + trace!("[ShardRunner {:?}] Dispatched", self.shard_info()); } + trace!( + "[ShardRunner {:?}] Successful: {:?}; connecting?: {:?}", + self.shard_info(), + successful, + self.shard.stage().is_connecting(), + ); + if !successful && !self.shard.stage().is_connecting() { return self.request_restart(); } @@ -187,6 +251,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { if id.0 != self.shard.shard_info()[0] { // Not meant for this runner for some reason, don't // shutdown. + debug!( + "[ShardRunner {:?}] Received message meant for {:?}?", + self.shard_info(), + id.0, + ); + return true; } @@ -199,6 +269,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { #[inline] fn dispatch(&self, event: DispatchEvent) { + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + event, + ); + dispatch( event, #[cfg(feature = "framework")] @@ -218,10 +294,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // This always returns true, except in the case that the shard manager asked // the runner to shutdown. fn handle_rx_value(&mut self, value: InterMessage) -> bool { + trace!( + "[ShardRunner {:?}] Handling rx value: {:?}", + self.shard_info(), + value, + ); + match value { InterMessage::Client(ShardClientMessage::Manager(x)) => match x { ShardManagerMessage::Restart(id) | ShardManagerMessage::Shutdown(id) => { + trace!( + "[ShardRunner {:?}] Received checked shutdown", + self.shard_info(), + ); + self.checked_shutdown(id) }, ShardManagerMessage::ShutdownAll => { @@ -242,6 +329,14 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { }, InterMessage::Client(ShardClientMessage::Runner(x)) => match x { ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { + trace!( + "[ShardRunner {:?}] Received rx guild chunk: {:?}; {:?}; {:?}", + self.shard_info(), + guild_ids, + limit, + query, + ); + self.shard.chunk_guilds( guild_ids, limit, @@ -253,12 +348,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let data = CloseData::new(code, reason); let msg = OwnedMessage::Close(Some(data)); + trace!( + "[ShardRunner {:?}] Received rx close: {:?}", + self.shard_info(), + msg, + ); + self.shard.client.send_message(&msg).is_ok() }, ShardRunnerMessage::Message(msg) => { + trace!( + "[ShardRunner {:?}] Received rx heartbeat: {:?}", + self.shard_info(), + msg, + ); + self.shard.client.send_message(&msg).is_ok() }, ShardRunnerMessage::SetGame(game) => { + trace!( + "[ShardRunner {:?}] Received rx set game: {:?}", + self.shard_info(), + game, + ); // To avoid a clone of `game`, we do a little bit of // trickery here: // @@ -277,17 +389,35 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { self.shard.update_presence().is_ok() }, ShardRunnerMessage::SetPresence(status, game) => { + trace!( + "[ShardRunner {:?}] Received rx set presence: {:?}; {:?}", + self.shard_info(), + status, + game, + ); + self.shard.set_presence(status, game); self.shard.update_presence().is_ok() }, ShardRunnerMessage::SetStatus(status) => { + trace!( + "[ShardRunner {:?}] Received rx set status: {:?}", + self.shard_info(), + status, + ); + self.shard.set_status(status); self.shard.update_presence().is_ok() }, }, InterMessage::Json(value) => { + trace!( + "[ShardRunner {:?}] Received rx json: {}", + self.shard_info(), + value, + ); // Value must be forwarded over the websocket self.shard.client.send_json(&value).is_ok() }, @@ -296,6 +426,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { #[cfg(feature = "voice")] fn handle_voice_event(&self, event: &Event) { + trace!( + "[ShardRunner {:?}] Handling voice event: {:?}", + self.shard_info(), + event, + ); + match *event { Event::Ready(_) => { self.voice_manager.lock().set( @@ -337,6 +473,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // Returns whether the shard runner is in a state that can continue. fn recv(&mut self) -> Result<bool> { + trace!( + "[ShardRunner {:?}] Starting recv", + self.shard_info(), + ); + loop { match self.runner_rx.try_recv() { Ok(value) => { @@ -352,13 +493,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let _ = self.request_restart(); + trace!( + "[ShardRunner {:?}] Recv: requested restart", + self.shard_info(), + ); + return Ok(false); }, - Err(TryRecvError::Empty) => break, + Err(TryRecvError::Empty) => { + trace!( + "[ShardRunner {:?}] Recv empty", + self.shard_info(), + ); + + break; + }, } } // There are no longer any values available. + trace!( + "[ShardRunner {:?}] Recv: no values available", + self.shard_info(), + ); Ok(true) } @@ -366,12 +523,36 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// Returns a received event, as well as whether reading the potentially /// present event was successful. fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) { + trace!( + "[ShardRunner {:?}] Recv event", + self.shard_info(), + ); + let gw_event = match self.shard.client.recv_json() { Ok(Some(value)) => { + trace!( + "[ShardRunner {:?}] Recv event: value: {:?}", + self.shard_info(), + value, + ); + GatewayEvent::deserialize(value).map(Some).map_err(From::from) }, - Ok(None) => Ok(None), - Err(Error::WebSocket(WebSocketError::IoError(_))) => { + Ok(None) => { + trace!( + "[ShardRunner {:?}] Recv event: Ok(None)", + self.shard_info(), + ); + + Ok(None) + }, + Err(Error::WebSocket(WebSocketError::IoError(why))) => { + trace!( + "[ShardRunner {:?}] recv event: ws io err: {:?}", + self.shard_info(), + why, + ); + // Check that an amount of time at least double the // heartbeat_interval has passed. // @@ -387,18 +568,46 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let interval_in_secs = interval / 1000; if seconds_passed <= interval_in_secs * 2 { + trace!( + "[ShardRunner {:?}] Recv event: seconds_passed <", + self.shard_info(), + ); + return (None, None, true); } } else { + trace!( + "[ShardRunner {:?}] Recv event: no interval; last: {:?}; interval: {:?}", + self.shard_info(), + last, + interval, + ); + return (None, None, true); } } - debug!("Attempting to auto-reconnect"); + debug!( + "[ShardRunner {:?}] Attempting to auto-reconnect: {:?}", + self.shard_info(), + self.shard.reconnection_type(), + ); match self.shard.reconnection_type() { - ReconnectType::Reidentify => return (None, None, false), + ReconnectType::Reidentify => { + trace!( + "[ShardRunner {:?}] Reconnection type: reidentify", + self.shard_info(), + ); + + return (None, None, false); + }, ReconnectType::Resume => { + trace!( + "[ShardRunner {:?}] Reconnection type: resume", + self.shard_info(), + ); + if let Err(why) = self.shard.resume() { warn!("Failed to resume: {:?}", why); @@ -410,6 +619,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { return (None, None, true); }, Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + trace!( + "[ShardRunner {:?}] Recv event: no data available", + self.shard_info(), + ); + // This is hit when the websocket client dies this will be // hit every iteration. return (None, None, false); @@ -423,6 +637,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { Err(why) => Err(why), }; + trace!( + "[ShardRunner {:?}] Recv event: done: {:?}", + self.shard_info(), + event, + ); + let action = match self.shard.handle_event(&event) { Ok(Some(action)) => Some(action), Ok(None) => None, @@ -433,7 +653,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { }, }; + trace!( + "[ShardRunner {:?}] Recv event: handle action: {:?}", + self.shard_info(), + action, + ); + if let Ok(GatewayEvent::HeartbeatAck) = event { + trace!( + "[ShardRunner {:?}] Recv event: heartbeatack", + self.shard_info(), + ); + self.update_manager(); } @@ -472,11 +703,17 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } fn update_manager(&self) { - let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate { + if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShardUpdate { id: ShardId(self.shard.shard_info()[0]), latency: self.shard.latency(), stage: self.shard.stage(), - }); + }) { + warn!( + "[ShardRunner {:?}] Error updating manager: {:?}", + self.shard_info(), + why, + ); + } } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 663a283..18ff63e 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -42,6 +42,7 @@ fn context( Context::new(Arc::clone(data), runner_tx.clone(), shard_id) } +#[derive(Debug)] pub(crate) enum DispatchEvent { Client(ClientEvent), Model(Event), diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index ffacba7..f20d7cd 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -185,6 +185,7 @@ pub enum InterMessage { Json(Value), } +#[derive(Clone, Debug)] pub enum ShardAction { Heartbeat, Identify, @@ -192,6 +193,7 @@ pub enum ShardAction { } /// The type of reconnection that should be performed. +#[derive(Clone, Debug)] pub enum ReconnectType { /// Indicator that a new connection should be made by sending an IDENTIFY. Reidentify, diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 13944e6..a42ca68 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -8,6 +8,7 @@ use model::{ }; use parking_lot::Mutex; use std::{ + fmt::{Debug, Formatter, Result as FmtResult}, sync::Arc, time::{Duration as StdDuration, Instant} }; @@ -138,7 +139,9 @@ impl Shard { ) -> Result<Shard> { let mut client = connect(&*ws_url.lock())?; - let _ = set_client_timeout(&mut client); + if let Err(why) = set_client_timeout(&mut client) { + warn!("[Shard {:?}] Error setting timeout: {:?}", shard_info, why); + } let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -168,8 +171,12 @@ impl Shard { /// Retrieves the current presence of the shard. #[inline] pub fn current_presence(&self) -> &CurrentPresence { + trace!("[Shard {:?}] Getting current presence", self.shard_info); + trace!("[Shard {:?}] State0: {:?}", self.shard_info, self); + &self.current_presence } + /// Whether the shard has permanently shutdown. /// /// This should normally happen due to manual calling of [`shutdown`] or @@ -179,6 +186,9 @@ impl Shard { /// [`shutdown_clean`]: #method.shutdown_clean #[inline] pub fn is_shutdown(&self) -> bool { + trace!("[Shard {:?}] Getting whether shutdown", self.shard_info); + trace!("[Shard {:?}] State01: {:?}", self.shard_info, self); + self.shutdown } @@ -188,18 +198,27 @@ impl Shard { /// acknowledgement was last received. #[inline] pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) { + trace!("[Shard {:?}] Getting heartbeat instants", self.shard_info); + trace!("[Shard {:?}] State02: {:?}", self.shard_info, self); + &self.heartbeat_instants } /// Retrieves the value of when the last heartbeat was sent. #[inline] pub fn last_heartbeat_sent(&self) -> Option<&Instant> { + trace!("[Shard {:?}] Getting last heartbeat sent", self.shard_info); + trace!("[Shard {:?}] State03: {:?}", self.shard_info, self); + self.heartbeat_instants.0.as_ref() } /// Retrieves the value of when the last heartbeat ack was received. #[inline] pub fn last_heartbeat_ack(&self) -> Option<&Instant> { + trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State04: {:?}", self.shard_info, self); + self.heartbeat_instants.1.as_ref() } @@ -215,11 +234,17 @@ impl Shard { /// /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed pub fn heartbeat(&mut self) -> Result<()> { + trace!("[Shard {:?}] Heartbeating", self.shard_info); + trace!("[Shard {:?}] State05: {:?}", self.shard_info, self); + match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) { Ok(()) => { self.heartbeat_instants.0 = Some(Instant::now()); self.last_heartbeat_acknowledged = false; + trace!("[Shard {:?}] Heartbeated", self.shard_info); + trace!("[Shard {:?}] State06: {:?}", self.shard_info, self); + Ok(()) }, Err(why) => { @@ -243,21 +268,33 @@ impl Shard { #[inline] pub fn heartbeat_interval(&self) -> Option<&u64> { + trace!("[Shard {:?}] Getting heartbeat interval", self.shard_info); + trace!("[Shard {:?}] State07: {:?}", self.shard_info, self); + self.heartbeat_interval.as_ref() } #[inline] pub fn last_heartbeat_acknowledged(&self) -> bool { + trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State08: {:?}", self.shard_info, self); + self.last_heartbeat_acknowledged } #[inline] pub fn seq(&self) -> u64 { + trace!("[Shard {:?}] Getting seq", self.shard_info); + trace!("[Shard {:?}] State09: {:?}", self.shard_info, self); + self.seq } #[inline] pub fn session_id(&self) -> Option<&String> { + trace!("[Shard {:?}] Getting session_id", self.shard_info); + trace!("[Shard {:?}] State10: {:?}", self.shard_info, self); + self.session_id.as_ref() } @@ -282,11 +319,17 @@ impl Shard { /// ``` #[inline] pub fn set_game(&mut self, game: Option<Game>) { + trace!("[Shard {:?}] Setting game: {:?}", self.shard_info, game); + trace!("[Shard {:?}] State11: {:?}", self.shard_info, self); + self.current_presence.0 = game; } #[inline] pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) { + trace!("[Shard {:?}] Setting presence: {:?}", self.shard_info, game); + trace!("[Shard {:?}] State12: {:?}", self.shard_info, self); + self.set_game(game); self.set_status(status); } @@ -297,6 +340,9 @@ impl Shard { status = OnlineStatus::Invisible; } + trace!("[Shard {:?}] Setting status: {:?}", self.shard_info, status); + trace!("[Shard {:?}] State13: {:?}", self.shard_info, self); + self.current_presence.1 = status; } @@ -330,10 +376,17 @@ impl Shard { /// # #[cfg(not(feature = "model"))] /// # fn main() {} /// ``` - pub fn shard_info(&self) -> [u64; 2] { self.shard_info } + pub fn shard_info(&self) -> [u64; 2] { + trace!("[Shard {:?}] Getting shard info", self.shard_info); + trace!("[Shard {:?}] State14: {:?}", self.shard_info, self); + self.shard_info + } /// Returns the current connection stage of the shard. pub fn stage(&self) -> ConnectionStage { + trace!("[Shard {:?}] Getting stage", self.shard_info); + trace!("[Shard {:?}] State15: {:?}", self.shard_info, self); + self.stage } @@ -364,6 +417,9 @@ impl Shard { #[allow(cyclomatic_complexity)] pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>) -> Result<Option<ShardAction>> { + trace!("[Shard {:?}] Handling event: {:?}", self.shard_info, event); + trace!("[Shard {:?}] State16: {:?}", self.shard_info, self); + match *event { Ok(GatewayEvent::Dispatch(seq, ref event)) => { if seq > self.seq + 1 { @@ -402,15 +458,21 @@ impl Shard { s, self.seq ); + trace!("[Shard {:?}] State17: {:?}", self.shard_info, self); if self.stage == ConnectionStage::Handshake { + trace!( + "[Shard {:?}] Received heartbeat; now in identifying stage", + self.shard_info, + ); + self.stage = ConnectionStage::Identifying; return Ok(Some(ShardAction::Identify)); } else { warn!( "[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting", - self.shard_info + self.shard_info, ); return Ok(Some(ShardAction::Reconnect(self.reconnection_type()))); @@ -424,6 +486,7 @@ impl Shard { self.last_heartbeat_acknowledged = true; trace!("[Shard {:?}] Received heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State18: {:?}", self.shard_info, self); Ok(None) }, @@ -431,6 +494,7 @@ impl Shard { debug!("[Shard {:?}] Received a Hello; interval: {}", self.shard_info, interval); + trace!("[Shard {:?}] State19: {:?}", self.shard_info, self); if self.stage == ConnectionStage::Resuming { return Ok(None); @@ -454,6 +518,7 @@ impl Shard { "[Shard {:?}] Received session invalidation", self.shard_info, ); + trace!("[Shard {:?}] State20: {:?}", self.shard_info, self); Ok(Some(if resumable { ShardAction::Reconnect(ReconnectType::Resume) @@ -462,9 +527,18 @@ impl Shard { })) }, Ok(GatewayEvent::Reconnect) => { + trace!("[Shard {:?}] Received reconnect", self.shard_info); + trace!("[Shard {:?}] State21: {:?}", self.shard_info, self); + Ok(Some(ShardAction::Reconnect(ReconnectType::Reidentify))) }, Err(Error::Gateway(GatewayError::Closed(ref data))) => { + trace!("[Shard {:?}] Received close: {:?}", + self.shard_info, + data, + ); + trace!("[Shard {:?}] State22: {:?}", self.shard_info, self); + let num = data.as_ref().map(|d| d.status_code); let clean = num == Some(1000); @@ -543,6 +617,9 @@ impl Shard { })) }, Err(Error::WebSocket(ref why)) => { + trace!("[Shard {:?}] ws err: {:?}", self.shard_info, why); + trace!("[Shard {:?}] State23: {:?}", self.shard_info, self); + if let WebSocketError::NoDataAvailable = *why { if self.heartbeat_instants.1.is_none() { return Ok(None); @@ -575,10 +652,19 @@ impl Shard { /// - a heartbeat acknowledgement was not received in time /// - an error occurred while heartbeating pub fn check_heartbeat(&mut self) -> bool { + trace!("[Shard {:?}] Checking heartbeat", self.shard_info); + trace!("[Shard {:?}] State24: {:?}", self.shard_info, self); + let wait = { let heartbeat_interval = match self.heartbeat_interval { Some(heartbeat_interval) => heartbeat_interval, None => { + trace!( + "[Shard {:?}] Checking heartbeat; no interval; elapsed diff: {:?}", + self.shard_info, + self.started.elapsed() < StdDuration::from_secs(15), + ); + return self.started.elapsed() < StdDuration::from_secs(15); }, }; @@ -586,10 +672,24 @@ impl Shard { StdDuration::from_secs(heartbeat_interval / 1000) }; + trace!( + "[Shard {:?}] Checking heartbeat; wait: {:?}", + self.shard_info, + wait, + ); + // If a duration of time less than the heartbeat_interval has passed, // then don't perform a keepalive or attempt to reconnect. if let Some(last_sent) = self.heartbeat_instants.0 { - if last_sent.elapsed() <= wait { + let elapsed = last_sent.elapsed(); + + if elapsed <= wait { + trace!( + "[Shard {:?}] Last sent elapsed: {:?}", + self.shard_info, + elapsed, + ); + return true; } } @@ -601,17 +701,22 @@ impl Shard { "[Shard {:?}] Last heartbeat not acknowledged", self.shard_info, ); + trace!("[Shard {:?}] State25: {:?}", self.shard_info, self); return false; + } else { + trace!("[Shard {:?}] Last heartbeat acknowledged", self.shard_info); } // Otherwise, we're good to heartbeat. if let Err(why) = self.heartbeat() { warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); + trace!("[Shard {:?}] State26: {:?}", self.shard_info, self); false } else { trace!("[Shard {:?}] Heartbeated", self.shard_info); + trace!("[Shard {:?}] State27: {:?}", self.shard_info, self); true } @@ -621,6 +726,9 @@ impl Shard { // Shamelessly stolen from brayzure's commit in eris: // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6> pub fn latency(&self) -> Option<StdDuration> { + trace!("[Shard {:?}] Getting latency", self.shard_info); + trace!("[Shard {:?}] State28: {:?}", self.shard_info, self); + if let (Some(sent), Some(received)) = self.heartbeat_instants { if received > sent { return Some(received - sent); @@ -643,6 +751,9 @@ impl Shard { /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id pub fn should_reconnect(&mut self) -> Option<ReconnectType> { + trace!("[Shard {:?}] Should reconnect?", self.shard_info); + trace!("[Shard {:?}] State29: {:?}", self.shard_info, self); + if self.stage == ConnectionStage::Connecting { return None; } @@ -651,6 +762,9 @@ impl Shard { } pub fn reconnection_type(&self) -> ReconnectType { + trace!("[Shard {:?}] Reconnection type", self.shard_info); + trace!("[Shard {:?}] State30: {:?}", self.shard_info, self); + if self.session_id().is_some() { ReconnectType::Resume } else { @@ -744,6 +858,7 @@ impl Shard { query: Option<&str>, ) -> Result<()> where It: IntoIterator<Item=GuildId> { debug!("[Shard {:?}] Requesting member chunks", self.shard_info); + trace!("[Shard {:?}] State31: {:?}", self.shard_info, self); self.client.send_chunk_guilds( guild_ids, @@ -758,11 +873,17 @@ impl Shard { // - the time that the last heartbeat sent as being now // - the `stage` to `Identifying` pub fn identify(&mut self) -> Result<()> { + trace!("[Shard {:?}] Identifying", self.shard_info); + trace!("[Shard {:?}] State32: {:?}", self.shard_info, self); + self.client.send_identify(&self.shard_info, &self.token.lock())?; self.heartbeat_instants.0 = Some(Instant::now()); self.stage = ConnectionStage::Identifying; + trace!("[Shard {:?}] Identified", self.shard_info); + trace!("[Shard {:?}] State33: {:?}", self.shard_info, self); + Ok(()) } @@ -772,6 +893,7 @@ impl Shard { /// the client. pub fn initialize(&mut self) -> Result<WsClient> { debug!("[Shard {:?}] Initializing", self.shard_info); + trace!("[Shard {:?}] State34: {:?}", self.shard_info, self); // We need to do two, sort of three things here: // @@ -786,22 +908,38 @@ impl Shard { let mut client = connect(&self.ws_url.lock())?; self.stage = ConnectionStage::Handshake; - let _ = set_client_timeout(&mut client); + if let Err(why) = set_client_timeout(&mut client) { + warn!( + "[Shard {:?}] Error setting timeouts when initializing: {:?}", + self.shard_info, + why, + ); + } + + trace!("[Shard {:?}] Initialized", self.shard_info); + trace!("[Shard {:?}] State35: {:?}", self.shard_info, self); Ok(client) } pub fn reset(&mut self) { + trace!("[Shard {:?}] Resetting", self.shard_info); + trace!("[Shard {:?}] State36: {:?}", self.shard_info, self); + self.heartbeat_instants = (Some(Instant::now()), None); self.heartbeat_interval = None; self.last_heartbeat_acknowledged = true; self.session_id = None; self.stage = ConnectionStage::Disconnected; self.seq = 0; + + trace!("[Shard {:?}] Reset", self.shard_info); + trace!("[Shard {:?}] State37: {:?}", self.shard_info, self); } pub fn resume(&mut self) -> Result<()> { debug!("Shard {:?}] Attempting to resume", self.shard_info); + trace!("[Shard {:?}] State38: {:?}", self.shard_info, self); self.client = self.initialize()?; self.stage = ConnectionStage::Resuming; @@ -821,6 +959,7 @@ impl Shard { pub fn reconnect(&mut self) -> Result<()> { info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); + trace!("[Shard {:?}] State39: {:?}", self.shard_info, self); self.reset(); self.client = self.initialize()?; @@ -829,6 +968,8 @@ impl Shard { } pub fn update_presence(&mut self) -> Result<()> { + trace!("[Shard {:?}] Updating presence", self.shard_info); + trace!("[Shard {:?}] State40: {:?}", self.shard_info, self); self.client.send_presence_update( &self.shard_info, &self.current_presence, @@ -836,6 +977,26 @@ impl Shard { } } +impl Debug for Shard { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + f.debug_struct("Shard") + .field("client", self.client.stream_ref()) + .field("current_presence", &self.current_presence) + .field("heartbeat_instants", &self.heartbeat_instants) + .field("heartbeat_interval", &self.heartbeat_interval) + .field("last_heartbeat_acknowledged", &self.last_heartbeat_acknowledged) + .field("seq", &self.seq) + .field("session_id", &self.session_id) + .field("shard_info", &self.shard_info) + .field("shutdown", &self.shutdown) + .field("stage", &self.stage) + .field("started", &self.started) + .field("token", &self.token) + .field("ws_url", &self.ws_url) + .finish() + } +} + fn connect(base_url: &str) -> Result<WsClient> { let url = build_gateway_url(base_url)?; let client = ClientBuilder::from_url(&url).connect_secure(None)?; |