diff options
Diffstat (limited to 'src/gateway/shard.rs')
| -rw-r--r-- | src/gateway/shard.rs | 171 |
1 files changed, 166 insertions, 5 deletions
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 13944e6..a42ca68 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -8,6 +8,7 @@ use model::{ }; use parking_lot::Mutex; use std::{ + fmt::{Debug, Formatter, Result as FmtResult}, sync::Arc, time::{Duration as StdDuration, Instant} }; @@ -138,7 +139,9 @@ impl Shard { ) -> Result<Shard> { let mut client = connect(&*ws_url.lock())?; - let _ = set_client_timeout(&mut client); + if let Err(why) = set_client_timeout(&mut client) { + warn!("[Shard {:?}] Error setting timeout: {:?}", shard_info, why); + } let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -168,8 +171,12 @@ impl Shard { /// Retrieves the current presence of the shard. #[inline] pub fn current_presence(&self) -> &CurrentPresence { + trace!("[Shard {:?}] Getting current presence", self.shard_info); + trace!("[Shard {:?}] State0: {:?}", self.shard_info, self); + &self.current_presence } + /// Whether the shard has permanently shutdown. /// /// This should normally happen due to manual calling of [`shutdown`] or @@ -179,6 +186,9 @@ impl Shard { /// [`shutdown_clean`]: #method.shutdown_clean #[inline] pub fn is_shutdown(&self) -> bool { + trace!("[Shard {:?}] Getting whether shutdown", self.shard_info); + trace!("[Shard {:?}] State01: {:?}", self.shard_info, self); + self.shutdown } @@ -188,18 +198,27 @@ impl Shard { /// acknowledgement was last received. #[inline] pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) { + trace!("[Shard {:?}] Getting heartbeat instants", self.shard_info); + trace!("[Shard {:?}] State02: {:?}", self.shard_info, self); + &self.heartbeat_instants } /// Retrieves the value of when the last heartbeat was sent. #[inline] pub fn last_heartbeat_sent(&self) -> Option<&Instant> { + trace!("[Shard {:?}] Getting last heartbeat sent", self.shard_info); + trace!("[Shard {:?}] State03: {:?}", self.shard_info, self); + self.heartbeat_instants.0.as_ref() } /// Retrieves the value of when the last heartbeat ack was received. #[inline] pub fn last_heartbeat_ack(&self) -> Option<&Instant> { + trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State04: {:?}", self.shard_info, self); + self.heartbeat_instants.1.as_ref() } @@ -215,11 +234,17 @@ impl Shard { /// /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed pub fn heartbeat(&mut self) -> Result<()> { + trace!("[Shard {:?}] Heartbeating", self.shard_info); + trace!("[Shard {:?}] State05: {:?}", self.shard_info, self); + match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) { Ok(()) => { self.heartbeat_instants.0 = Some(Instant::now()); self.last_heartbeat_acknowledged = false; + trace!("[Shard {:?}] Heartbeated", self.shard_info); + trace!("[Shard {:?}] State06: {:?}", self.shard_info, self); + Ok(()) }, Err(why) => { @@ -243,21 +268,33 @@ impl Shard { #[inline] pub fn heartbeat_interval(&self) -> Option<&u64> { + trace!("[Shard {:?}] Getting heartbeat interval", self.shard_info); + trace!("[Shard {:?}] State07: {:?}", self.shard_info, self); + self.heartbeat_interval.as_ref() } #[inline] pub fn last_heartbeat_acknowledged(&self) -> bool { + trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State08: {:?}", self.shard_info, self); + self.last_heartbeat_acknowledged } #[inline] pub fn seq(&self) -> u64 { + trace!("[Shard {:?}] Getting seq", self.shard_info); + trace!("[Shard {:?}] State09: {:?}", self.shard_info, self); + self.seq } #[inline] pub fn session_id(&self) -> Option<&String> { + trace!("[Shard {:?}] Getting session_id", self.shard_info); + trace!("[Shard {:?}] State10: {:?}", self.shard_info, self); + self.session_id.as_ref() } @@ -282,11 +319,17 @@ impl Shard { /// ``` #[inline] pub fn set_game(&mut self, game: Option<Game>) { + trace!("[Shard {:?}] Setting game: {:?}", self.shard_info, game); + trace!("[Shard {:?}] State11: {:?}", self.shard_info, self); + self.current_presence.0 = game; } #[inline] pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) { + trace!("[Shard {:?}] Setting presence: {:?}", self.shard_info, game); + trace!("[Shard {:?}] State12: {:?}", self.shard_info, self); + self.set_game(game); self.set_status(status); } @@ -297,6 +340,9 @@ impl Shard { status = OnlineStatus::Invisible; } + trace!("[Shard {:?}] Setting status: {:?}", self.shard_info, status); + trace!("[Shard {:?}] State13: {:?}", self.shard_info, self); + self.current_presence.1 = status; } @@ -330,10 +376,17 @@ impl Shard { /// # #[cfg(not(feature = "model"))] /// # fn main() {} /// ``` - pub fn shard_info(&self) -> [u64; 2] { self.shard_info } + pub fn shard_info(&self) -> [u64; 2] { + trace!("[Shard {:?}] Getting shard info", self.shard_info); + trace!("[Shard {:?}] State14: {:?}", self.shard_info, self); + self.shard_info + } /// Returns the current connection stage of the shard. pub fn stage(&self) -> ConnectionStage { + trace!("[Shard {:?}] Getting stage", self.shard_info); + trace!("[Shard {:?}] State15: {:?}", self.shard_info, self); + self.stage } @@ -364,6 +417,9 @@ impl Shard { #[allow(cyclomatic_complexity)] pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>) -> Result<Option<ShardAction>> { + trace!("[Shard {:?}] Handling event: {:?}", self.shard_info, event); + trace!("[Shard {:?}] State16: {:?}", self.shard_info, self); + match *event { Ok(GatewayEvent::Dispatch(seq, ref event)) => { if seq > self.seq + 1 { @@ -402,15 +458,21 @@ impl Shard { s, self.seq ); + trace!("[Shard {:?}] State17: {:?}", self.shard_info, self); if self.stage == ConnectionStage::Handshake { + trace!( + "[Shard {:?}] Received heartbeat; now in identifying stage", + self.shard_info, + ); + self.stage = ConnectionStage::Identifying; return Ok(Some(ShardAction::Identify)); } else { warn!( "[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting", - self.shard_info + self.shard_info, ); return Ok(Some(ShardAction::Reconnect(self.reconnection_type()))); @@ -424,6 +486,7 @@ impl Shard { self.last_heartbeat_acknowledged = true; trace!("[Shard {:?}] Received heartbeat ack", self.shard_info); + trace!("[Shard {:?}] State18: {:?}", self.shard_info, self); Ok(None) }, @@ -431,6 +494,7 @@ impl Shard { debug!("[Shard {:?}] Received a Hello; interval: {}", self.shard_info, interval); + trace!("[Shard {:?}] State19: {:?}", self.shard_info, self); if self.stage == ConnectionStage::Resuming { return Ok(None); @@ -454,6 +518,7 @@ impl Shard { "[Shard {:?}] Received session invalidation", self.shard_info, ); + trace!("[Shard {:?}] State20: {:?}", self.shard_info, self); Ok(Some(if resumable { ShardAction::Reconnect(ReconnectType::Resume) @@ -462,9 +527,18 @@ impl Shard { })) }, Ok(GatewayEvent::Reconnect) => { + trace!("[Shard {:?}] Received reconnect", self.shard_info); + trace!("[Shard {:?}] State21: {:?}", self.shard_info, self); + Ok(Some(ShardAction::Reconnect(ReconnectType::Reidentify))) }, Err(Error::Gateway(GatewayError::Closed(ref data))) => { + trace!("[Shard {:?}] Received close: {:?}", + self.shard_info, + data, + ); + trace!("[Shard {:?}] State22: {:?}", self.shard_info, self); + let num = data.as_ref().map(|d| d.status_code); let clean = num == Some(1000); @@ -543,6 +617,9 @@ impl Shard { })) }, Err(Error::WebSocket(ref why)) => { + trace!("[Shard {:?}] ws err: {:?}", self.shard_info, why); + trace!("[Shard {:?}] State23: {:?}", self.shard_info, self); + if let WebSocketError::NoDataAvailable = *why { if self.heartbeat_instants.1.is_none() { return Ok(None); @@ -575,10 +652,19 @@ impl Shard { /// - a heartbeat acknowledgement was not received in time /// - an error occurred while heartbeating pub fn check_heartbeat(&mut self) -> bool { + trace!("[Shard {:?}] Checking heartbeat", self.shard_info); + trace!("[Shard {:?}] State24: {:?}", self.shard_info, self); + let wait = { let heartbeat_interval = match self.heartbeat_interval { Some(heartbeat_interval) => heartbeat_interval, None => { + trace!( + "[Shard {:?}] Checking heartbeat; no interval; elapsed diff: {:?}", + self.shard_info, + self.started.elapsed() < StdDuration::from_secs(15), + ); + return self.started.elapsed() < StdDuration::from_secs(15); }, }; @@ -586,10 +672,24 @@ impl Shard { StdDuration::from_secs(heartbeat_interval / 1000) }; + trace!( + "[Shard {:?}] Checking heartbeat; wait: {:?}", + self.shard_info, + wait, + ); + // If a duration of time less than the heartbeat_interval has passed, // then don't perform a keepalive or attempt to reconnect. if let Some(last_sent) = self.heartbeat_instants.0 { - if last_sent.elapsed() <= wait { + let elapsed = last_sent.elapsed(); + + if elapsed <= wait { + trace!( + "[Shard {:?}] Last sent elapsed: {:?}", + self.shard_info, + elapsed, + ); + return true; } } @@ -601,17 +701,22 @@ impl Shard { "[Shard {:?}] Last heartbeat not acknowledged", self.shard_info, ); + trace!("[Shard {:?}] State25: {:?}", self.shard_info, self); return false; + } else { + trace!("[Shard {:?}] Last heartbeat acknowledged", self.shard_info); } // Otherwise, we're good to heartbeat. if let Err(why) = self.heartbeat() { warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); + trace!("[Shard {:?}] State26: {:?}", self.shard_info, self); false } else { trace!("[Shard {:?}] Heartbeated", self.shard_info); + trace!("[Shard {:?}] State27: {:?}", self.shard_info, self); true } @@ -621,6 +726,9 @@ impl Shard { // Shamelessly stolen from brayzure's commit in eris: // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6> pub fn latency(&self) -> Option<StdDuration> { + trace!("[Shard {:?}] Getting latency", self.shard_info); + trace!("[Shard {:?}] State28: {:?}", self.shard_info, self); + if let (Some(sent), Some(received)) = self.heartbeat_instants { if received > sent { return Some(received - sent); @@ -643,6 +751,9 @@ impl Shard { /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id pub fn should_reconnect(&mut self) -> Option<ReconnectType> { + trace!("[Shard {:?}] Should reconnect?", self.shard_info); + trace!("[Shard {:?}] State29: {:?}", self.shard_info, self); + if self.stage == ConnectionStage::Connecting { return None; } @@ -651,6 +762,9 @@ impl Shard { } pub fn reconnection_type(&self) -> ReconnectType { + trace!("[Shard {:?}] Reconnection type", self.shard_info); + trace!("[Shard {:?}] State30: {:?}", self.shard_info, self); + if self.session_id().is_some() { ReconnectType::Resume } else { @@ -744,6 +858,7 @@ impl Shard { query: Option<&str>, ) -> Result<()> where It: IntoIterator<Item=GuildId> { debug!("[Shard {:?}] Requesting member chunks", self.shard_info); + trace!("[Shard {:?}] State31: {:?}", self.shard_info, self); self.client.send_chunk_guilds( guild_ids, @@ -758,11 +873,17 @@ impl Shard { // - the time that the last heartbeat sent as being now // - the `stage` to `Identifying` pub fn identify(&mut self) -> Result<()> { + trace!("[Shard {:?}] Identifying", self.shard_info); + trace!("[Shard {:?}] State32: {:?}", self.shard_info, self); + self.client.send_identify(&self.shard_info, &self.token.lock())?; self.heartbeat_instants.0 = Some(Instant::now()); self.stage = ConnectionStage::Identifying; + trace!("[Shard {:?}] Identified", self.shard_info); + trace!("[Shard {:?}] State33: {:?}", self.shard_info, self); + Ok(()) } @@ -772,6 +893,7 @@ impl Shard { /// the client. pub fn initialize(&mut self) -> Result<WsClient> { debug!("[Shard {:?}] Initializing", self.shard_info); + trace!("[Shard {:?}] State34: {:?}", self.shard_info, self); // We need to do two, sort of three things here: // @@ -786,22 +908,38 @@ impl Shard { let mut client = connect(&self.ws_url.lock())?; self.stage = ConnectionStage::Handshake; - let _ = set_client_timeout(&mut client); + if let Err(why) = set_client_timeout(&mut client) { + warn!( + "[Shard {:?}] Error setting timeouts when initializing: {:?}", + self.shard_info, + why, + ); + } + + trace!("[Shard {:?}] Initialized", self.shard_info); + trace!("[Shard {:?}] State35: {:?}", self.shard_info, self); Ok(client) } pub fn reset(&mut self) { + trace!("[Shard {:?}] Resetting", self.shard_info); + trace!("[Shard {:?}] State36: {:?}", self.shard_info, self); + self.heartbeat_instants = (Some(Instant::now()), None); self.heartbeat_interval = None; self.last_heartbeat_acknowledged = true; self.session_id = None; self.stage = ConnectionStage::Disconnected; self.seq = 0; + + trace!("[Shard {:?}] Reset", self.shard_info); + trace!("[Shard {:?}] State37: {:?}", self.shard_info, self); } pub fn resume(&mut self) -> Result<()> { debug!("Shard {:?}] Attempting to resume", self.shard_info); + trace!("[Shard {:?}] State38: {:?}", self.shard_info, self); self.client = self.initialize()?; self.stage = ConnectionStage::Resuming; @@ -821,6 +959,7 @@ impl Shard { pub fn reconnect(&mut self) -> Result<()> { info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); + trace!("[Shard {:?}] State39: {:?}", self.shard_info, self); self.reset(); self.client = self.initialize()?; @@ -829,6 +968,8 @@ impl Shard { } pub fn update_presence(&mut self) -> Result<()> { + trace!("[Shard {:?}] Updating presence", self.shard_info); + trace!("[Shard {:?}] State40: {:?}", self.shard_info, self); self.client.send_presence_update( &self.shard_info, &self.current_presence, @@ -836,6 +977,26 @@ impl Shard { } } +impl Debug for Shard { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + f.debug_struct("Shard") + .field("client", self.client.stream_ref()) + .field("current_presence", &self.current_presence) + .field("heartbeat_instants", &self.heartbeat_instants) + .field("heartbeat_interval", &self.heartbeat_interval) + .field("last_heartbeat_acknowledged", &self.last_heartbeat_acknowledged) + .field("seq", &self.seq) + .field("session_id", &self.session_id) + .field("shard_info", &self.shard_info) + .field("shutdown", &self.shutdown) + .field("stage", &self.stage) + .field("started", &self.started) + .field("token", &self.token) + .field("ws_url", &self.ws_url) + .finish() + } +} + fn connect(base_url: &str) -> Result<WsClient> { let url = build_gateway_url(base_url)?; let client = ClientBuilder::from_url(&url).connect_secure(None)?; |