diff options
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 158 | ||||
| -rw-r--r-- | src/gateway/mod.rs | 53 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 70 |
3 files changed, 168 insertions, 113 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 530e367..909e511 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { // If the message is to shutdown, first verify the ID so we know // for certain this runner is to shutdown. if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == shard.shard_info()[0] { + if id.0 == self.shard_info[0] { let _ = shard.shutdown_clean(); return Ok(()); @@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); - let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); - let _ = self.manager_tx.send(msg); - - return Ok(()); + return self.request_restart(); } #[cfg(feature = "voice")] @@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> { } } - let events = self.recv_events(); + let (event, successful) = self.recv_events(); + + if let Some(event) = event { + dispatch( + event, + &self.shard, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + ); + } - for event in events { - feature_framework! {{ - dispatch(event, - &self.shard, - &self.framework, - &self.data, - &self.event_handler); - } else { - dispatch(event, - &self.shard, - &self.data, - &self.event_handler); - }} + if !successful && !self.shard.lock().stage().is_connecting() { + return self.request_restart(); } } } @@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> { self.runner_tx.clone() } - fn recv_events(&mut self) -> Vec<Event> { + /// Returns a received event, as well as whether reading the potentially + /// present event was successful. + fn recv_events(&mut self) -> (Option<Event>, bool) { let mut shard = self.shard.lock(); - let mut events = vec![]; + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; - loop { - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - break; - } - } else { - break; + if seconds_passed <= interval_in_secs * 2 { + return (None, true); } + } else { + return (None, true); + } - debug!("[ShardRunner {:?}] Attempting to auto-reconnect", - self.shard_info); + debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { - error!( - "[ShardRunner {:?}] Failed to auto-reconnect: {:?}", - self.shard_info, - why, - ); - } + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } - break; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - break; - }, - other => other, - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => break, - Err(why) => Err(why), - }; - - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - }; - - events.push(event); - - if events.len() > 5 { - break; - } + return (None, true); + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + // This is hit when the websocket client dies this will be + // hit every iteration. + return (None, false); + }, + other => other, }; - events + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => return (None, true), + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => return (None, true), + Err(why) => { + error!("Shard handler received err: {:?}", why); + + return (None, true); + }, + }; + + (Some(event), true) + } + + fn request_restart(&self) -> Result<()> { + debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); + let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + let _ = self.manager_tx.send(msg); + + Ok(()) } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 6f839db..bd9c45b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -60,7 +60,7 @@ pub use self::shard::Shard; /// This can be useful for knowing which shards are currently "down"/"up". /// /// [`Shard`]: struct.Shard.html -#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)] +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] pub enum ConnectionStage { /// Indicator that the [`Shard`] is normally connected and is not in, e.g., /// a resume phase. @@ -83,5 +83,56 @@ pub enum ConnectionStage { Handshake, /// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting /// a READY packet. + /// + /// [`Shard`]: struct.Shard.html Identifying, + /// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a + /// RESUMED packet. + /// + /// [`Shard`]: struct.Shard.html + Resuming, +} + +impl ConnectionStage { + /// Whether the stage is a form of connecting. + /// + /// This will return `true` on: + /// + /// - [`Connecting`][`ConnectionStage::Connecting`] + /// - [`Handshake`][`ConnectionStage::Handshake`] + /// - [`Identifying`][`ConnectionStage::Identifying`] + /// - [`Resuming`][`ConnectionStage::Resuming`] + /// + /// All other variants will return `false`. + /// + /// # Examples + /// + /// Assert that [`ConnectionStage::Identifying`] is a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(ConnectionStage::Identifying.is_connecting()); + /// ``` + /// + /// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(!ConnectionStage::Connected.is_connecting()); + /// ``` + /// + /// [`ConnectionStage::Connecting`]: #variant.Connecting + /// [`ConnectionStage::Handshake`]: #variant.Handshake + /// [`ConnectionStage::Identifying`]: #variant.Identifying + /// [`ConnectionStage::Resuming`]: #variant.Resuming + pub fn is_connecting(&self) -> bool { + use self::ConnectionStage::*; + + *self == Connecting + || *self == Handshake + || *self == Identifying + || *self == Resuming + } } diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index e617255..de5104e 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -136,7 +136,7 @@ impl Shard { let stage = ConnectionStage::Handshake; let session_id = None; - let mut shard = feature_voice! { + Ok(feature_voice! { { let (tx, rx) = mpsc::channel(); @@ -172,11 +172,7 @@ impl Shard { ws_url, } } - }; - - shard.identify()?; - - Ok(shard) + }) } /// Retrieves a copy of the current shard information. @@ -308,6 +304,11 @@ impl Shard { self.update_presence(); } + /// Returns the current connection stage of the shard. + pub fn stage(&self) -> ConnectionStage { + self.stage.clone() + } + /// Handles an event from the gateway over the receiver, requiring the /// receiver to be passed if a reconnect needs to occur. /// @@ -414,14 +415,16 @@ impl Shard { self.shard_info, interval); + if self.stage == ConnectionStage::Resuming { + return Ok(None); + } + if interval > 0 { self.heartbeat_interval = Some(interval); } if self.stage == ConnectionStage::Handshake { - self.stage = ConnectionStage::Identifying; - - Ok(None) + self.identify().and(Ok(None)) } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); @@ -438,9 +441,7 @@ impl Shard { self.seq = 0; self.session_id = None; - self.identify()?; - - Ok(None) + self.identify().and(Ok(None)) }, Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), Err(Error::Gateway(GatewayError::Closed(data))) => { @@ -448,18 +449,6 @@ impl Shard { let reason = data.map(|d| d.reason); let clean = num == Some(1000); - { - let kind = if clean { "Cleanly" } else { "Uncleanly" }; - - info!( - "[Shard {:?}] {} closing with {:?}: {:?}", - self.shard_info, - kind, - num, - reason - ); - } - match num { Some(close_codes::UNKNOWN_OPCODE) => { warn!("[Shard {:?}] Sent invalid opcode", @@ -524,9 +513,9 @@ impl Shard { } let resume = num.map(|x| { - x != 1000 && x != close_codes::AUTHENTICATION_FAILED && + x != close_codes::AUTHENTICATION_FAILED && self.session_id.is_some() - }).unwrap_or(false); + }).unwrap_or(true); if resume { self.resume().or_else(|_| self.reconnect()).and(Ok(None)) @@ -878,11 +867,15 @@ impl Shard { /// Retrieves the `heartbeat_interval`. #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval } + pub(crate) fn heartbeat_interval(&self) -> Option<u64> { + self.heartbeat_interval + } /// Retrieves the value of when the last heartbeat ack was received. #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 } + pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { + self.heartbeat_instants.1 + } fn reconnect(&mut self) -> Result<()> { info!("[Shard {:?}] Attempting to reconnect", self.shard_info); @@ -902,6 +895,9 @@ impl Shard { fn resume(&mut self) -> Result<()> { debug!("Shard {:?}] Attempting to resume", self.shard_info); + self.initialize()?; + self.stage = ConnectionStage::Resuming; + self.send_resume().or_else(|why| { warn!("[Shard {:?}] Err sending resume: {:?}", self.shard_info, @@ -931,11 +927,26 @@ impl Shard { })) } + /// Initializes a new WebSocket client. + /// + /// This will set the stage of the shard before and after instantiation of + /// the client. fn initialize(&mut self) -> Result<()> { + debug!("[Shard {:?}] Initializing", self.shard_info); + + // We need to do two, sort of three things here: + // + // - set the stage of the shard as opening the websocket connection + // - open the websocket connection + // - if successful, set the current stage as Handshaking + // + // This is used to accurately assess whether the state of the shard is + // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; self.client = connect(&self.ws_url.lock().unwrap())?; + self.stage = ConnectionStage::Handshake; - self.identify() + Ok(()) } fn identify(&mut self) -> Result<()> { @@ -956,6 +967,7 @@ impl Shard { }); self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; debug!("[Shard {:?}] Identifying", self.shard_info); |