aboutsummaryrefslogtreecommitdiff
path: root/src/gateway/shard.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/gateway/shard.rs')
-rw-r--r--src/gateway/shard.rs70
1 files changed, 41 insertions, 29 deletions
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index e617255..de5104e 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -136,7 +136,7 @@ impl Shard {
let stage = ConnectionStage::Handshake;
let session_id = None;
- let mut shard = feature_voice! {
+ Ok(feature_voice! {
{
let (tx, rx) = mpsc::channel();
@@ -172,11 +172,7 @@ impl Shard {
ws_url,
}
}
- };
-
- shard.identify()?;
-
- Ok(shard)
+ })
}
/// Retrieves a copy of the current shard information.
@@ -308,6 +304,11 @@ impl Shard {
self.update_presence();
}
+ /// Returns the current connection stage of the shard.
+ pub fn stage(&self) -> ConnectionStage {
+ self.stage.clone()
+ }
+
/// Handles an event from the gateway over the receiver, requiring the
/// receiver to be passed if a reconnect needs to occur.
///
@@ -414,14 +415,16 @@ impl Shard {
self.shard_info,
interval);
+ if self.stage == ConnectionStage::Resuming {
+ return Ok(None);
+ }
+
if interval > 0 {
self.heartbeat_interval = Some(interval);
}
if self.stage == ConnectionStage::Handshake {
- self.stage = ConnectionStage::Identifying;
-
- Ok(None)
+ self.identify().and(Ok(None))
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
@@ -438,9 +441,7 @@ impl Shard {
self.seq = 0;
self.session_id = None;
- self.identify()?;
-
- Ok(None)
+ self.identify().and(Ok(None))
},
Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
Err(Error::Gateway(GatewayError::Closed(data))) => {
@@ -448,18 +449,6 @@ impl Shard {
let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
- {
- let kind = if clean { "Cleanly" } else { "Uncleanly" };
-
- info!(
- "[Shard {:?}] {} closing with {:?}: {:?}",
- self.shard_info,
- kind,
- num,
- reason
- );
- }
-
match num {
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[Shard {:?}] Sent invalid opcode",
@@ -524,9 +513,9 @@ impl Shard {
}
let resume = num.map(|x| {
- x != 1000 && x != close_codes::AUTHENTICATION_FAILED &&
+ x != close_codes::AUTHENTICATION_FAILED &&
self.session_id.is_some()
- }).unwrap_or(false);
+ }).unwrap_or(true);
if resume {
self.resume().or_else(|_| self.reconnect()).and(Ok(None))
@@ -878,11 +867,15 @@ impl Shard {
/// Retrieves the `heartbeat_interval`.
#[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval }
+ pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
+ self.heartbeat_interval
+ }
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 }
+ pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
+ self.heartbeat_instants.1
+ }
fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
@@ -902,6 +895,9 @@ impl Shard {
fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
+
self.send_resume().or_else(|why| {
warn!("[Shard {:?}] Err sending resume: {:?}",
self.shard_info,
@@ -931,11 +927,26 @@ impl Shard {
}))
}
+ /// Initializes a new WebSocket client.
+ ///
+ /// This will set the stage of the shard before and after instantiation of
+ /// the client.
fn initialize(&mut self) -> Result<()> {
+ debug!("[Shard {:?}] Initializing", self.shard_info);
+
+ // We need to do two, sort of three things here:
+ //
+ // - set the stage of the shard as opening the websocket connection
+ // - open the websocket connection
+ // - if successful, set the current stage as Handshaking
+ //
+ // This is used to accurately assess whether the state of the shard is
+ // accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
self.client = connect(&self.ws_url.lock().unwrap())?;
+ self.stage = ConnectionStage::Handshake;
- self.identify()
+ Ok(())
}
fn identify(&mut self) -> Result<()> {
@@ -956,6 +967,7 @@ impl Shard {
});
self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
debug!("[Shard {:?}] Identifying", self.shard_info);