aboutsummaryrefslogtreecommitdiff
path: root/src/gateway/shard.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/gateway/shard.rs')
-rw-r--r--src/gateway/shard.rs171
1 files changed, 166 insertions, 5 deletions
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index 13944e6..a42ca68 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -8,6 +8,7 @@ use model::{
};
use parking_lot::Mutex;
use std::{
+ fmt::{Debug, Formatter, Result as FmtResult},
sync::Arc,
time::{Duration as StdDuration, Instant}
};
@@ -138,7 +139,9 @@ impl Shard {
) -> Result<Shard> {
let mut client = connect(&*ws_url.lock())?;
- let _ = set_client_timeout(&mut client);
+ if let Err(why) = set_client_timeout(&mut client) {
+ warn!("[Shard {:?}] Error setting timeout: {:?}", shard_info, why);
+ }
let current_presence = (None, OnlineStatus::Online);
let heartbeat_instants = (None, None);
@@ -168,8 +171,12 @@ impl Shard {
/// Retrieves the current presence of the shard.
#[inline]
pub fn current_presence(&self) -> &CurrentPresence {
+ trace!("[Shard {:?}] Getting current presence", self.shard_info);
+ trace!("[Shard {:?}] State0: {:?}", self.shard_info, self);
+
&self.current_presence
}
+
/// Whether the shard has permanently shutdown.
///
/// This should normally happen due to manual calling of [`shutdown`] or
@@ -179,6 +186,9 @@ impl Shard {
/// [`shutdown_clean`]: #method.shutdown_clean
#[inline]
pub fn is_shutdown(&self) -> bool {
+ trace!("[Shard {:?}] Getting whether shutdown", self.shard_info);
+ trace!("[Shard {:?}] State01: {:?}", self.shard_info, self);
+
self.shutdown
}
@@ -188,18 +198,27 @@ impl Shard {
/// acknowledgement was last received.
#[inline]
pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) {
+ trace!("[Shard {:?}] Getting heartbeat instants", self.shard_info);
+ trace!("[Shard {:?}] State02: {:?}", self.shard_info, self);
+
&self.heartbeat_instants
}
/// Retrieves the value of when the last heartbeat was sent.
#[inline]
pub fn last_heartbeat_sent(&self) -> Option<&Instant> {
+ trace!("[Shard {:?}] Getting last heartbeat sent", self.shard_info);
+ trace!("[Shard {:?}] State03: {:?}", self.shard_info, self);
+
self.heartbeat_instants.0.as_ref()
}
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
pub fn last_heartbeat_ack(&self) -> Option<&Instant> {
+ trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State04: {:?}", self.shard_info, self);
+
self.heartbeat_instants.1.as_ref()
}
@@ -215,11 +234,17 @@ impl Shard {
///
/// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed
pub fn heartbeat(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Heartbeating", self.shard_info);
+ trace!("[Shard {:?}] State05: {:?}", self.shard_info, self);
+
match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) {
Ok(()) => {
self.heartbeat_instants.0 = Some(Instant::now());
self.last_heartbeat_acknowledged = false;
+ trace!("[Shard {:?}] Heartbeated", self.shard_info);
+ trace!("[Shard {:?}] State06: {:?}", self.shard_info, self);
+
Ok(())
},
Err(why) => {
@@ -243,21 +268,33 @@ impl Shard {
#[inline]
pub fn heartbeat_interval(&self) -> Option<&u64> {
+ trace!("[Shard {:?}] Getting heartbeat interval", self.shard_info);
+ trace!("[Shard {:?}] State07: {:?}", self.shard_info, self);
+
self.heartbeat_interval.as_ref()
}
#[inline]
pub fn last_heartbeat_acknowledged(&self) -> bool {
+ trace!("[Shard {:?}] Getting last heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State08: {:?}", self.shard_info, self);
+
self.last_heartbeat_acknowledged
}
#[inline]
pub fn seq(&self) -> u64 {
+ trace!("[Shard {:?}] Getting seq", self.shard_info);
+ trace!("[Shard {:?}] State09: {:?}", self.shard_info, self);
+
self.seq
}
#[inline]
pub fn session_id(&self) -> Option<&String> {
+ trace!("[Shard {:?}] Getting session_id", self.shard_info);
+ trace!("[Shard {:?}] State10: {:?}", self.shard_info, self);
+
self.session_id.as_ref()
}
@@ -282,11 +319,17 @@ impl Shard {
/// ```
#[inline]
pub fn set_game(&mut self, game: Option<Game>) {
+ trace!("[Shard {:?}] Setting game: {:?}", self.shard_info, game);
+ trace!("[Shard {:?}] State11: {:?}", self.shard_info, self);
+
self.current_presence.0 = game;
}
#[inline]
pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) {
+ trace!("[Shard {:?}] Setting presence: {:?}", self.shard_info, game);
+ trace!("[Shard {:?}] State12: {:?}", self.shard_info, self);
+
self.set_game(game);
self.set_status(status);
}
@@ -297,6 +340,9 @@ impl Shard {
status = OnlineStatus::Invisible;
}
+ trace!("[Shard {:?}] Setting status: {:?}", self.shard_info, status);
+ trace!("[Shard {:?}] State13: {:?}", self.shard_info, self);
+
self.current_presence.1 = status;
}
@@ -330,10 +376,17 @@ impl Shard {
/// # #[cfg(not(feature = "model"))]
/// # fn main() {}
/// ```
- pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
+ pub fn shard_info(&self) -> [u64; 2] {
+ trace!("[Shard {:?}] Getting shard info", self.shard_info);
+ trace!("[Shard {:?}] State14: {:?}", self.shard_info, self);
+ self.shard_info
+ }
/// Returns the current connection stage of the shard.
pub fn stage(&self) -> ConnectionStage {
+ trace!("[Shard {:?}] Getting stage", self.shard_info);
+ trace!("[Shard {:?}] State15: {:?}", self.shard_info, self);
+
self.stage
}
@@ -364,6 +417,9 @@ impl Shard {
#[allow(cyclomatic_complexity)]
pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>)
-> Result<Option<ShardAction>> {
+ trace!("[Shard {:?}] Handling event: {:?}", self.shard_info, event);
+ trace!("[Shard {:?}] State16: {:?}", self.shard_info, self);
+
match *event {
Ok(GatewayEvent::Dispatch(seq, ref event)) => {
if seq > self.seq + 1 {
@@ -402,15 +458,21 @@ impl Shard {
s,
self.seq
);
+ trace!("[Shard {:?}] State17: {:?}", self.shard_info, self);
if self.stage == ConnectionStage::Handshake {
+ trace!(
+ "[Shard {:?}] Received heartbeat; now in identifying stage",
+ self.shard_info,
+ );
+
self.stage = ConnectionStage::Identifying;
return Ok(Some(ShardAction::Identify));
} else {
warn!(
"[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting",
- self.shard_info
+ self.shard_info,
);
return Ok(Some(ShardAction::Reconnect(self.reconnection_type())));
@@ -424,6 +486,7 @@ impl Shard {
self.last_heartbeat_acknowledged = true;
trace!("[Shard {:?}] Received heartbeat ack", self.shard_info);
+ trace!("[Shard {:?}] State18: {:?}", self.shard_info, self);
Ok(None)
},
@@ -431,6 +494,7 @@ impl Shard {
debug!("[Shard {:?}] Received a Hello; interval: {}",
self.shard_info,
interval);
+ trace!("[Shard {:?}] State19: {:?}", self.shard_info, self);
if self.stage == ConnectionStage::Resuming {
return Ok(None);
@@ -454,6 +518,7 @@ impl Shard {
"[Shard {:?}] Received session invalidation",
self.shard_info,
);
+ trace!("[Shard {:?}] State20: {:?}", self.shard_info, self);
Ok(Some(if resumable {
ShardAction::Reconnect(ReconnectType::Resume)
@@ -462,9 +527,18 @@ impl Shard {
}))
},
Ok(GatewayEvent::Reconnect) => {
+ trace!("[Shard {:?}] Received reconnect", self.shard_info);
+ trace!("[Shard {:?}] State21: {:?}", self.shard_info, self);
+
Ok(Some(ShardAction::Reconnect(ReconnectType::Reidentify)))
},
Err(Error::Gateway(GatewayError::Closed(ref data))) => {
+ trace!("[Shard {:?}] Received close: {:?}",
+ self.shard_info,
+ data,
+ );
+ trace!("[Shard {:?}] State22: {:?}", self.shard_info, self);
+
let num = data.as_ref().map(|d| d.status_code);
let clean = num == Some(1000);
@@ -543,6 +617,9 @@ impl Shard {
}))
},
Err(Error::WebSocket(ref why)) => {
+ trace!("[Shard {:?}] ws err: {:?}", self.shard_info, why);
+ trace!("[Shard {:?}] State23: {:?}", self.shard_info, self);
+
if let WebSocketError::NoDataAvailable = *why {
if self.heartbeat_instants.1.is_none() {
return Ok(None);
@@ -575,10 +652,19 @@ impl Shard {
/// - a heartbeat acknowledgement was not received in time
/// - an error occurred while heartbeating
pub fn check_heartbeat(&mut self) -> bool {
+ trace!("[Shard {:?}] Checking heartbeat", self.shard_info);
+ trace!("[Shard {:?}] State24: {:?}", self.shard_info, self);
+
let wait = {
let heartbeat_interval = match self.heartbeat_interval {
Some(heartbeat_interval) => heartbeat_interval,
None => {
+ trace!(
+ "[Shard {:?}] Checking heartbeat; no interval; elapsed diff: {:?}",
+ self.shard_info,
+ self.started.elapsed() < StdDuration::from_secs(15),
+ );
+
return self.started.elapsed() < StdDuration::from_secs(15);
},
};
@@ -586,10 +672,24 @@ impl Shard {
StdDuration::from_secs(heartbeat_interval / 1000)
};
+ trace!(
+ "[Shard {:?}] Checking heartbeat; wait: {:?}",
+ self.shard_info,
+ wait,
+ );
+
// If a duration of time less than the heartbeat_interval has passed,
// then don't perform a keepalive or attempt to reconnect.
if let Some(last_sent) = self.heartbeat_instants.0 {
- if last_sent.elapsed() <= wait {
+ let elapsed = last_sent.elapsed();
+
+ if elapsed <= wait {
+ trace!(
+ "[Shard {:?}] Last sent elapsed: {:?}",
+ self.shard_info,
+ elapsed,
+ );
+
return true;
}
}
@@ -601,17 +701,22 @@ impl Shard {
"[Shard {:?}] Last heartbeat not acknowledged",
self.shard_info,
);
+ trace!("[Shard {:?}] State25: {:?}", self.shard_info, self);
return false;
+ } else {
+ trace!("[Shard {:?}] Last heartbeat acknowledged", self.shard_info);
}
// Otherwise, we're good to heartbeat.
if let Err(why) = self.heartbeat() {
warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
+ trace!("[Shard {:?}] State26: {:?}", self.shard_info, self);
false
} else {
trace!("[Shard {:?}] Heartbeated", self.shard_info);
+ trace!("[Shard {:?}] State27: {:?}", self.shard_info, self);
true
}
@@ -621,6 +726,9 @@ impl Shard {
// Shamelessly stolen from brayzure's commit in eris:
// <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
pub fn latency(&self) -> Option<StdDuration> {
+ trace!("[Shard {:?}] Getting latency", self.shard_info);
+ trace!("[Shard {:?}] State28: {:?}", self.shard_info, self);
+
if let (Some(sent), Some(received)) = self.heartbeat_instants {
if received > sent {
return Some(received - sent);
@@ -643,6 +751,9 @@ impl Shard {
/// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting
/// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id
pub fn should_reconnect(&mut self) -> Option<ReconnectType> {
+ trace!("[Shard {:?}] Should reconnect?", self.shard_info);
+ trace!("[Shard {:?}] State29: {:?}", self.shard_info, self);
+
if self.stage == ConnectionStage::Connecting {
return None;
}
@@ -651,6 +762,9 @@ impl Shard {
}
pub fn reconnection_type(&self) -> ReconnectType {
+ trace!("[Shard {:?}] Reconnection type", self.shard_info);
+ trace!("[Shard {:?}] State30: {:?}", self.shard_info, self);
+
if self.session_id().is_some() {
ReconnectType::Resume
} else {
@@ -744,6 +858,7 @@ impl Shard {
query: Option<&str>,
) -> Result<()> where It: IntoIterator<Item=GuildId> {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);
+ trace!("[Shard {:?}] State31: {:?}", self.shard_info, self);
self.client.send_chunk_guilds(
guild_ids,
@@ -758,11 +873,17 @@ impl Shard {
// - the time that the last heartbeat sent as being now
// - the `stage` to `Identifying`
pub fn identify(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Identifying", self.shard_info);
+ trace!("[Shard {:?}] State32: {:?}", self.shard_info, self);
+
self.client.send_identify(&self.shard_info, &self.token.lock())?;
self.heartbeat_instants.0 = Some(Instant::now());
self.stage = ConnectionStage::Identifying;
+ trace!("[Shard {:?}] Identified", self.shard_info);
+ trace!("[Shard {:?}] State33: {:?}", self.shard_info, self);
+
Ok(())
}
@@ -772,6 +893,7 @@ impl Shard {
/// the client.
pub fn initialize(&mut self) -> Result<WsClient> {
debug!("[Shard {:?}] Initializing", self.shard_info);
+ trace!("[Shard {:?}] State34: {:?}", self.shard_info, self);
// We need to do two, sort of three things here:
//
@@ -786,22 +908,38 @@ impl Shard {
let mut client = connect(&self.ws_url.lock())?;
self.stage = ConnectionStage::Handshake;
- let _ = set_client_timeout(&mut client);
+ if let Err(why) = set_client_timeout(&mut client) {
+ warn!(
+ "[Shard {:?}] Error setting timeouts when initializing: {:?}",
+ self.shard_info,
+ why,
+ );
+ }
+
+ trace!("[Shard {:?}] Initialized", self.shard_info);
+ trace!("[Shard {:?}] State35: {:?}", self.shard_info, self);
Ok(client)
}
pub fn reset(&mut self) {
+ trace!("[Shard {:?}] Resetting", self.shard_info);
+ trace!("[Shard {:?}] State36: {:?}", self.shard_info, self);
+
self.heartbeat_instants = (Some(Instant::now()), None);
self.heartbeat_interval = None;
self.last_heartbeat_acknowledged = true;
self.session_id = None;
self.stage = ConnectionStage::Disconnected;
self.seq = 0;
+
+ trace!("[Shard {:?}] Reset", self.shard_info);
+ trace!("[Shard {:?}] State37: {:?}", self.shard_info, self);
}
pub fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ trace!("[Shard {:?}] State38: {:?}", self.shard_info, self);
self.client = self.initialize()?;
self.stage = ConnectionStage::Resuming;
@@ -821,6 +959,7 @@ impl Shard {
pub fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info());
+ trace!("[Shard {:?}] State39: {:?}", self.shard_info, self);
self.reset();
self.client = self.initialize()?;
@@ -829,6 +968,8 @@ impl Shard {
}
pub fn update_presence(&mut self) -> Result<()> {
+ trace!("[Shard {:?}] Updating presence", self.shard_info);
+ trace!("[Shard {:?}] State40: {:?}", self.shard_info, self);
self.client.send_presence_update(
&self.shard_info,
&self.current_presence,
@@ -836,6 +977,26 @@ impl Shard {
}
}
+impl Debug for Shard {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ f.debug_struct("Shard")
+ .field("client", self.client.stream_ref())
+ .field("current_presence", &self.current_presence)
+ .field("heartbeat_instants", &self.heartbeat_instants)
+ .field("heartbeat_interval", &self.heartbeat_interval)
+ .field("last_heartbeat_acknowledged", &self.last_heartbeat_acknowledged)
+ .field("seq", &self.seq)
+ .field("session_id", &self.session_id)
+ .field("shard_info", &self.shard_info)
+ .field("shutdown", &self.shutdown)
+ .field("stage", &self.stage)
+ .field("started", &self.started)
+ .field("token", &self.token)
+ .field("ws_url", &self.ws_url)
+ .finish()
+ }
+}
+
fn connect(base_url: &str) -> Result<WsClient> {
let url = build_gateway_url(base_url)?;
let client = ClientBuilder::from_url(&url).connect_secure(None)?;