aboutsummaryrefslogtreecommitdiff
path: root/src/gateway
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-09-30 16:07:40 -0700
committerZeyla Hellyer <[email protected]>2017-09-30 16:08:12 -0700
commit683691f762bbf58e3abf3bc67381e18112f5c8ad (patch)
treec29424f0f264912ab99ee31ae718a93fadd30143 /src/gateway
parentImprove shard and shard runner logging (diff)
downloadserenity-683691f762bbf58e3abf3bc67381e18112f5c8ad.tar.xz
serenity-683691f762bbf58e3abf3bc67381e18112f5c8ad.zip
Improve shard logic
Improve shard logic by more cleanly differentiating when resuming, as well as actually fixing resume logic. For shard runners, better handling of dead clients is added, as well as more use of the shard manager, in that the runner will now more liberally request a restart when required (instead of sitting and doing nothing infinitely).
Diffstat (limited to 'src/gateway')
-rw-r--r--src/gateway/mod.rs53
-rw-r--r--src/gateway/shard.rs70
2 files changed, 93 insertions, 30 deletions
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index 6f839db..bd9c45b 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -60,7 +60,7 @@ pub use self::shard::Shard;
/// This can be useful for knowing which shards are currently "down"/"up".
///
/// [`Shard`]: struct.Shard.html
-#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
+#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ConnectionStage {
/// Indicator that the [`Shard`] is normally connected and is not in, e.g.,
/// a resume phase.
@@ -83,5 +83,56 @@ pub enum ConnectionStage {
Handshake,
/// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting
/// a READY packet.
+ ///
+ /// [`Shard`]: struct.Shard.html
Identifying,
+ /// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a
+ /// RESUMED packet.
+ ///
+ /// [`Shard`]: struct.Shard.html
+ Resuming,
+}
+
+impl ConnectionStage {
+ /// Whether the stage is a form of connecting.
+ ///
+ /// This will return `true` on:
+ ///
+ /// - [`Connecting`][`ConnectionStage::Connecting`]
+ /// - [`Handshake`][`ConnectionStage::Handshake`]
+ /// - [`Identifying`][`ConnectionStage::Identifying`]
+ /// - [`Resuming`][`ConnectionStage::Resuming`]
+ ///
+ /// All other variants will return `false`.
+ ///
+ /// # Examples
+ ///
+ /// Assert that [`ConnectionStage::Identifying`] is a connecting stage:
+ ///
+ /// ```rust
+ /// use serenity::gateway::ConnectionStage;
+ ///
+ /// assert!(ConnectionStage::Identifying.is_connecting());
+ /// ```
+ ///
+ /// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage:
+ ///
+ /// ```rust
+ /// use serenity::gateway::ConnectionStage;
+ ///
+ /// assert!(!ConnectionStage::Connected.is_connecting());
+ /// ```
+ ///
+ /// [`ConnectionStage::Connecting`]: #variant.Connecting
+ /// [`ConnectionStage::Handshake`]: #variant.Handshake
+ /// [`ConnectionStage::Identifying`]: #variant.Identifying
+ /// [`ConnectionStage::Resuming`]: #variant.Resuming
+ pub fn is_connecting(&self) -> bool {
+ use self::ConnectionStage::*;
+
+ *self == Connecting
+ || *self == Handshake
+ || *self == Identifying
+ || *self == Resuming
+ }
}
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index e617255..de5104e 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -136,7 +136,7 @@ impl Shard {
let stage = ConnectionStage::Handshake;
let session_id = None;
- let mut shard = feature_voice! {
+ Ok(feature_voice! {
{
let (tx, rx) = mpsc::channel();
@@ -172,11 +172,7 @@ impl Shard {
ws_url,
}
}
- };
-
- shard.identify()?;
-
- Ok(shard)
+ })
}
/// Retrieves a copy of the current shard information.
@@ -308,6 +304,11 @@ impl Shard {
self.update_presence();
}
+ /// Returns the current connection stage of the shard.
+ pub fn stage(&self) -> ConnectionStage {
+ self.stage.clone()
+ }
+
/// Handles an event from the gateway over the receiver, requiring the
/// receiver to be passed if a reconnect needs to occur.
///
@@ -414,14 +415,16 @@ impl Shard {
self.shard_info,
interval);
+ if self.stage == ConnectionStage::Resuming {
+ return Ok(None);
+ }
+
if interval > 0 {
self.heartbeat_interval = Some(interval);
}
if self.stage == ConnectionStage::Handshake {
- self.stage = ConnectionStage::Identifying;
-
- Ok(None)
+ self.identify().and(Ok(None))
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
@@ -438,9 +441,7 @@ impl Shard {
self.seq = 0;
self.session_id = None;
- self.identify()?;
-
- Ok(None)
+ self.identify().and(Ok(None))
},
Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
Err(Error::Gateway(GatewayError::Closed(data))) => {
@@ -448,18 +449,6 @@ impl Shard {
let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
- {
- let kind = if clean { "Cleanly" } else { "Uncleanly" };
-
- info!(
- "[Shard {:?}] {} closing with {:?}: {:?}",
- self.shard_info,
- kind,
- num,
- reason
- );
- }
-
match num {
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[Shard {:?}] Sent invalid opcode",
@@ -524,9 +513,9 @@ impl Shard {
}
let resume = num.map(|x| {
- x != 1000 && x != close_codes::AUTHENTICATION_FAILED &&
+ x != close_codes::AUTHENTICATION_FAILED &&
self.session_id.is_some()
- }).unwrap_or(false);
+ }).unwrap_or(true);
if resume {
self.resume().or_else(|_| self.reconnect()).and(Ok(None))
@@ -878,11 +867,15 @@ impl Shard {
/// Retrieves the `heartbeat_interval`.
#[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval }
+ pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
+ self.heartbeat_interval
+ }
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 }
+ pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
+ self.heartbeat_instants.1
+ }
fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
@@ -902,6 +895,9 @@ impl Shard {
fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
+
self.send_resume().or_else(|why| {
warn!("[Shard {:?}] Err sending resume: {:?}",
self.shard_info,
@@ -931,11 +927,26 @@ impl Shard {
}))
}
+ /// Initializes a new WebSocket client.
+ ///
+ /// This will set the stage of the shard before and after instantiation of
+ /// the client.
fn initialize(&mut self) -> Result<()> {
+ debug!("[Shard {:?}] Initializing", self.shard_info);
+
+ // We need to do two, sort of three things here:
+ //
+ // - set the stage of the shard as opening the websocket connection
+ // - open the websocket connection
+ // - if successful, set the current stage as Handshaking
+ //
+ // This is used to accurately assess whether the state of the shard is
+ // accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
self.client = connect(&self.ws_url.lock().unwrap())?;
+ self.stage = ConnectionStage::Handshake;
- self.identify()
+ Ok(())
}
fn identify(&mut self) -> Result<()> {
@@ -956,6 +967,7 @@ impl Shard {
});
self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
debug!("[Shard {:?}] Identifying", self.shard_info);