aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client/bridge/gateway/shard_runner.rs158
-rw-r--r--src/gateway/mod.rs53
-rw-r--r--src/gateway/shard.rs70
3 files changed, 168 insertions, 113 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index 530e367..909e511 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
// If the message is to shutdown, first verify the ID so we know
// for certain this runner is to shutdown.
if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
- if id.0 == shard.shard_info()[0] {
+ if id.0 == self.shard_info[0] {
let _ = shard.shutdown_clean();
return Ok(());
@@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
if let Err(why) = shard.check_heartbeat() {
error!("Failed to heartbeat and reconnect: {:?}", why);
- let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0]));
- let _ = self.manager_tx.send(msg);
-
- return Ok(());
+ return self.request_restart();
}
#[cfg(feature = "voice")]
@@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
}
}
- let events = self.recv_events();
+ let (event, successful) = self.recv_events();
+
+ if let Some(event) = event {
+ dispatch(
+ event,
+ &self.shard,
+ #[cfg(feature = "framework")]
+ &self.framework,
+ &self.data,
+ &self.event_handler,
+ );
+ }
- for event in events {
- feature_framework! {{
- dispatch(event,
- &self.shard,
- &self.framework,
- &self.data,
- &self.event_handler);
- } else {
- dispatch(event,
- &self.shard,
- &self.data,
- &self.event_handler);
- }}
+ if !successful && !self.shard.lock().stage().is_connecting() {
+ return self.request_restart();
}
}
}
@@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
self.runner_tx.clone()
}
- fn recv_events(&mut self) -> Vec<Event> {
+ /// Returns a received event, as well as whether reading the potentially
+ /// present event was successful.
+ fn recv_events(&mut self) -> (Option<Event>, bool) {
let mut shard = self.shard.lock();
- let mut events = vec![];
+ let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ // Check that an amount of time at least double the
+ // heartbeat_interval has passed.
+ //
+ // If not, continue on trying to receive messages.
+ //
+ // If it has, attempt to auto-reconnect.
+ let last = shard.last_heartbeat_ack();
+ let interval = shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
- loop {
- let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- // Check that an amount of time at least double the
- // heartbeat_interval has passed.
- //
- // If not, continue on trying to receive messages.
- //
- // If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- break;
- }
- } else {
- break;
+ if seconds_passed <= interval_in_secs * 2 {
+ return (None, true);
}
+ } else {
+ return (None, true);
+ }
- debug!("[ShardRunner {:?}] Attempting to auto-reconnect",
- self.shard_info);
+ debug!("Attempting to auto-reconnect");
- if let Err(why) = shard.autoreconnect() {
- error!(
- "[ShardRunner {:?}] Failed to auto-reconnect: {:?}",
- self.shard_info,
- why,
- );
- }
+ if let Err(why) = shard.autoreconnect() {
+ error!("Failed to auto-reconnect: {:?}", why);
+ }
- break;
- },
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
- // This is hit when the websocket client dies this will be
- // hit every iteration.
- break;
- },
- other => other,
- };
-
- let event = match gw_event {
- Ok(Some(event)) => Ok(event),
- Ok(None) => break,
- Err(why) => Err(why),
- };
-
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => continue,
- Err(why) => {
- error!("Shard handler received err: {:?}", why);
-
- continue;
- },
- };
-
- events.push(event);
-
- if events.len() > 5 {
- break;
- }
+ return (None, true);
+ },
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
+ // This is hit when the websocket client dies this will be
+ // hit every iteration.
+ return (None, false);
+ },
+ other => other,
};
- events
+ let event = match gw_event {
+ Ok(Some(event)) => Ok(event),
+ Ok(None) => return (None, true),
+ Err(why) => Err(why),
+ };
+
+ let event = match shard.handle_event(event) {
+ Ok(Some(event)) => event,
+ Ok(None) => return (None, true),
+ Err(why) => {
+ error!("Shard handler received err: {:?}", why);
+
+ return (None, true);
+ },
+ };
+
+ (Some(event), true)
+ }
+
+ fn request_restart(&self) -> Result<()> {
+ debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
+ let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
+ let _ = self.manager_tx.send(msg);
+
+ Ok(())
}
}
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index 6f839db..bd9c45b 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -60,7 +60,7 @@ pub use self::shard::Shard;
/// This can be useful for knowing which shards are currently "down"/"up".
///
/// [`Shard`]: struct.Shard.html
-#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
+#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ConnectionStage {
/// Indicator that the [`Shard`] is normally connected and is not in, e.g.,
/// a resume phase.
@@ -83,5 +83,56 @@ pub enum ConnectionStage {
Handshake,
/// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting
/// a READY packet.
+ ///
+ /// [`Shard`]: struct.Shard.html
Identifying,
+ /// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a
+ /// RESUMED packet.
+ ///
+ /// [`Shard`]: struct.Shard.html
+ Resuming,
+}
+
+impl ConnectionStage {
+ /// Whether the stage is a form of connecting.
+ ///
+ /// This will return `true` on:
+ ///
+ /// - [`Connecting`][`ConnectionStage::Connecting`]
+ /// - [`Handshake`][`ConnectionStage::Handshake`]
+ /// - [`Identifying`][`ConnectionStage::Identifying`]
+ /// - [`Resuming`][`ConnectionStage::Resuming`]
+ ///
+ /// All other variants will return `false`.
+ ///
+ /// # Examples
+ ///
+ /// Assert that [`ConnectionStage::Identifying`] is a connecting stage:
+ ///
+ /// ```rust
+ /// use serenity::gateway::ConnectionStage;
+ ///
+ /// assert!(ConnectionStage::Identifying.is_connecting());
+ /// ```
+ ///
+ /// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage:
+ ///
+ /// ```rust
+ /// use serenity::gateway::ConnectionStage;
+ ///
+ /// assert!(!ConnectionStage::Connected.is_connecting());
+ /// ```
+ ///
+ /// [`ConnectionStage::Connecting`]: #variant.Connecting
+ /// [`ConnectionStage::Handshake`]: #variant.Handshake
+ /// [`ConnectionStage::Identifying`]: #variant.Identifying
+ /// [`ConnectionStage::Resuming`]: #variant.Resuming
+ pub fn is_connecting(&self) -> bool {
+ use self::ConnectionStage::*;
+
+ *self == Connecting
+ || *self == Handshake
+ || *self == Identifying
+ || *self == Resuming
+ }
}
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index e617255..de5104e 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -136,7 +136,7 @@ impl Shard {
let stage = ConnectionStage::Handshake;
let session_id = None;
- let mut shard = feature_voice! {
+ Ok(feature_voice! {
{
let (tx, rx) = mpsc::channel();
@@ -172,11 +172,7 @@ impl Shard {
ws_url,
}
}
- };
-
- shard.identify()?;
-
- Ok(shard)
+ })
}
/// Retrieves a copy of the current shard information.
@@ -308,6 +304,11 @@ impl Shard {
self.update_presence();
}
+ /// Returns the current connection stage of the shard.
+ pub fn stage(&self) -> ConnectionStage {
+ self.stage.clone()
+ }
+
/// Handles an event from the gateway over the receiver, requiring the
/// receiver to be passed if a reconnect needs to occur.
///
@@ -414,14 +415,16 @@ impl Shard {
self.shard_info,
interval);
+ if self.stage == ConnectionStage::Resuming {
+ return Ok(None);
+ }
+
if interval > 0 {
self.heartbeat_interval = Some(interval);
}
if self.stage == ConnectionStage::Handshake {
- self.stage = ConnectionStage::Identifying;
-
- Ok(None)
+ self.identify().and(Ok(None))
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
@@ -438,9 +441,7 @@ impl Shard {
self.seq = 0;
self.session_id = None;
- self.identify()?;
-
- Ok(None)
+ self.identify().and(Ok(None))
},
Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
Err(Error::Gateway(GatewayError::Closed(data))) => {
@@ -448,18 +449,6 @@ impl Shard {
let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
- {
- let kind = if clean { "Cleanly" } else { "Uncleanly" };
-
- info!(
- "[Shard {:?}] {} closing with {:?}: {:?}",
- self.shard_info,
- kind,
- num,
- reason
- );
- }
-
match num {
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[Shard {:?}] Sent invalid opcode",
@@ -524,9 +513,9 @@ impl Shard {
}
let resume = num.map(|x| {
- x != 1000 && x != close_codes::AUTHENTICATION_FAILED &&
+ x != close_codes::AUTHENTICATION_FAILED &&
self.session_id.is_some()
- }).unwrap_or(false);
+ }).unwrap_or(true);
if resume {
self.resume().or_else(|_| self.reconnect()).and(Ok(None))
@@ -878,11 +867,15 @@ impl Shard {
/// Retrieves the `heartbeat_interval`.
#[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval }
+ pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
+ self.heartbeat_interval
+ }
/// Retrieves the value of when the last heartbeat ack was received.
#[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 }
+ pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
+ self.heartbeat_instants.1
+ }
fn reconnect(&mut self) -> Result<()> {
info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
@@ -902,6 +895,9 @@ impl Shard {
fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);
+ self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
+
self.send_resume().or_else(|why| {
warn!("[Shard {:?}] Err sending resume: {:?}",
self.shard_info,
@@ -931,11 +927,26 @@ impl Shard {
}))
}
+ /// Initializes a new WebSocket client.
+ ///
+ /// This will set the stage of the shard before and after instantiation of
+ /// the client.
fn initialize(&mut self) -> Result<()> {
+ debug!("[Shard {:?}] Initializing", self.shard_info);
+
+ // We need to do two, sort of three things here:
+ //
+ // - set the stage of the shard as opening the websocket connection
+ // - open the websocket connection
+ // - if successful, set the current stage as Handshaking
+ //
+ // This is used to accurately assess whether the state of the shard is
+ // accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
self.client = connect(&self.ws_url.lock().unwrap())?;
+ self.stage = ConnectionStage::Handshake;
- self.identify()
+ Ok(())
}
fn identify(&mut self) -> Result<()> {
@@ -956,6 +967,7 @@ impl Shard {
});
self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
debug!("[Shard {:?}] Identifying", self.shard_info);