aboutsummaryrefslogtreecommitdiff
path: root/src/gateway/shard.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-09-30 16:07:40 -0700
committerZeyla Hellyer <[email protected]>2017-10-09 15:45:48 -0700
commit45c1f27edbeedcb30aa3e9daa78eba44817f7260 (patch)
treec8ca606c603fe78c7b9efdd3c9b6708838324763 /src/gateway/shard.rs
parentImprove shard and shard runner logging (diff)
downloadserenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.tar.xz
serenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.zip
Improve shard logic
Improve shard logic by more cleanly differentiating when resuming, as well as actually fixing resume logic. For shard runners, better handling of dead clients is added, as well as more use of the shard manager, in that the runner will now more liberally request a restart when required (instead of sitting and doing nothing infinitely).
Diffstat (limited to 'src/gateway/shard.rs')
-rw-r--r--src/gateway/shard.rs70
1 files changed, 41 insertions, 29 deletions
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index e617255..de5104e 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -136,7 +136,7 @@ impl Shard {
let stage = ConnectionStage::Handshake;
let session_id = None;
- let mut shard = feature_voice! {
+ Ok(feature_voice! {
{
let (tx, rx) = mpsc::channel();
@@ -172,11 +172,7 @@ impl Shard {
ws_url,
}
}
- };
-
- shard.identify()?;
-
- Ok(shard)
+ })
}
/// Retrieves a copy of the current shard information.
@@ -308,6 +304,11 @@ impl Shard {
self.update_presence();
}
+ /// Returns the current connection stage of the shard.
+ pub fn stage(&self) -> ConnectionStage {
+ self.stage.clone()
+ }
+
/// Handles an event from the gateway over the receiver, requiring the
/// receiver to be passed if a reconnect needs to occur.
///
@@ -414,14 +415,16 @@ impl Shard {
self.shard_info,
interval);
+ if self.stage == ConnectionStage::Resuming {
+ return Ok(None);
+ }
+
if interval > 0 {
self.heartbeat_interval = Some(interval);
}
if self.stage == ConnectionStage::Handshake {
- self.stage = ConnectionStage::Identifying;
-
- Ok(None)
+ self.identify().and(Ok(None))
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
@@ -438,9 +441,7 @@ impl Shard {
self.seq = 0;
self.session_id = None;
- self.identify()?;
-
- Ok(None)
+ self.identify().and(Ok(None))
},
Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
Err(Error::Gateway(GatewayError::Closed(data))) => {
@@ -448,18 +449,6 @@ impl Shard {
let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
- {
- let kind = if clean { "Cleanly" } else { "Uncleanly" };
-
- info!(
- "[Shard {:?}] {} closing with {:?}: {:?}",
- self.shard_info,
- kind,
- num,
- reason
- );
- }
-
match num {
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[Shard {:?}] Sent invalid opcode",
@@ -524,9 +513,9 @@ impl Shard {
}
let resume = num.map(|x| {
- x != 1000 && x != close_codes::AUTHENTICATION_FAILED &&
+ x != close_codes::AUTHENTICATION_FAILED &&
self.session_id.is_some()
- }).unwrap_or(false);
+ }).unwrap_or(true);
if resume {
self.resume().or_else(|_| self.reconnect()).and(Ok(None))
@@ -878,11 +867,15 @@ impl Shard {
/// Retrieves the `heartbeat_interval`.
#[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval }
+ pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
+ self.heartbeat_interval
+ }
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 }
+ pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
+ self.heartbeat_instants.1
+ }
fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
@@ -902,6 +895,9 @@ impl Shard {
fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
+
self.send_resume().or_else(|why| {
warn!("[Shard {:?}] Err sending resume: {:?}",
self.shard_info,
@@ -931,11 +927,26 @@ impl Shard {
}))
}
+ /// Initializes a new WebSocket client.
+ ///
+ /// This will set the stage of the shard before and after instantiation of
+ /// the client.
fn initialize(&mut self) -> Result<()> {
+ debug!("[Shard {:?}] Initializing", self.shard_info);
+
+ // We need to do two, sort of three things here:
+ //
+ // - set the stage of the shard as opening the websocket connection
+ // - open the websocket connection
+ // - if successful, set the current stage as Handshaking
+ //
+ // This is used to accurately assess whether the state of the shard is
+ // accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
self.client = connect(&self.ws_url.lock().unwrap())?;
+ self.stage = ConnectionStage::Handshake;
- self.identify()
+ Ok(())
}
fn identify(&mut self) -> Result<()> {
@@ -956,6 +967,7 @@ impl Shard {
});
self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
debug!("[Shard {:?}] Identifying", self.shard_info);