diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-30 16:07:40 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-10-09 15:45:48 -0700 |
| commit | 45c1f27edbeedcb30aa3e9daa78eba44817f7260 (patch) | |
| tree | c8ca606c603fe78c7b9efdd3c9b6708838324763 /src | |
| parent | Improve shard and shard runner logging (diff) | |
| download | serenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.tar.xz serenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.zip | |
Improve shard logic
Improve shard logic by more cleanly differentiating when resuming, as
well as actually fixing resume logic.
For shard runners, better handling of dead clients is added, as well as
more use of the shard manager, in that the runner will now more
liberally request a restart when required (instead of sitting and doing
nothing infinitely).
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 158 | ||||
| -rw-r--r-- | src/gateway/mod.rs | 53 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 70 |
3 files changed, 168 insertions, 113 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 530e367..909e511 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { // If the message is to shutdown, first verify the ID so we know // for certain this runner is to shutdown. if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == shard.shard_info()[0] { + if id.0 == self.shard_info[0] { let _ = shard.shutdown_clean(); return Ok(()); @@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); - let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); - let _ = self.manager_tx.send(msg); - - return Ok(()); + return self.request_restart(); } #[cfg(feature = "voice")] @@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> { } } - let events = self.recv_events(); + let (event, successful) = self.recv_events(); + + if let Some(event) = event { + dispatch( + event, + &self.shard, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + ); + } - for event in events { - feature_framework! {{ - dispatch(event, - &self.shard, - &self.framework, - &self.data, - &self.event_handler); - } else { - dispatch(event, - &self.shard, - &self.data, - &self.event_handler); - }} + if !successful && !self.shard.lock().stage().is_connecting() { + return self.request_restart(); } } } @@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> { self.runner_tx.clone() } - fn recv_events(&mut self) -> Vec<Event> { + /// Returns a received event, as well as whether reading the potentially + /// present event was successful. + fn recv_events(&mut self) -> (Option<Event>, bool) { let mut shard = self.shard.lock(); - let mut events = vec![]; + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; - loop { - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - break; - } - } else { - break; + if seconds_passed <= interval_in_secs * 2 { + return (None, true); } + } else { + return (None, true); + } - debug!("[ShardRunner {:?}] Attempting to auto-reconnect", - self.shard_info); + debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { - error!( - "[ShardRunner {:?}] Failed to auto-reconnect: {:?}", - self.shard_info, - why, - ); - } + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } - break; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - break; - }, - other => other, - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => break, - Err(why) => Err(why), - }; - - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - }; - - events.push(event); - - if events.len() > 5 { - break; - } + return (None, true); + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + // This is hit when the websocket client dies this will be + // hit every iteration. + return (None, false); + }, + other => other, }; - events + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => return (None, true), + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => return (None, true), + Err(why) => { + error!("Shard handler received err: {:?}", why); + + return (None, true); + }, + }; + + (Some(event), true) + } + + fn request_restart(&self) -> Result<()> { + debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); + let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + let _ = self.manager_tx.send(msg); + + Ok(()) } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 6f839db..bd9c45b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -60,7 +60,7 @@ pub use self::shard::Shard; /// This can be useful for knowing which shards are currently "down"/"up". /// /// [`Shard`]: struct.Shard.html -#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)] +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] pub enum ConnectionStage { /// Indicator that the [`Shard`] is normally connected and is not in, e.g., /// a resume phase. @@ -83,5 +83,56 @@ pub enum ConnectionStage { Handshake, /// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting /// a READY packet. + /// + /// [`Shard`]: struct.Shard.html Identifying, + /// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a + /// RESUMED packet. + /// + /// [`Shard`]: struct.Shard.html + Resuming, +} + +impl ConnectionStage { + /// Whether the stage is a form of connecting. + /// + /// This will return `true` on: + /// + /// - [`Connecting`][`ConnectionStage::Connecting`] + /// - [`Handshake`][`ConnectionStage::Handshake`] + /// - [`Identifying`][`ConnectionStage::Identifying`] + /// - [`Resuming`][`ConnectionStage::Resuming`] + /// + /// All other variants will return `false`. + /// + /// # Examples + /// + /// Assert that [`ConnectionStage::Identifying`] is a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(ConnectionStage::Identifying.is_connecting()); + /// ``` + /// + /// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(!ConnectionStage::Connected.is_connecting()); + /// ``` + /// + /// [`ConnectionStage::Connecting`]: #variant.Connecting + /// [`ConnectionStage::Handshake`]: #variant.Handshake + /// [`ConnectionStage::Identifying`]: #variant.Identifying + /// [`ConnectionStage::Resuming`]: #variant.Resuming + pub fn is_connecting(&self) -> bool { + use self::ConnectionStage::*; + + *self == Connecting + || *self == Handshake + || *self == Identifying + || *self == Resuming + } } diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index e617255..de5104e 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -136,7 +136,7 @@ impl Shard { let stage = ConnectionStage::Handshake; let session_id = None; - let mut shard = feature_voice! { + Ok(feature_voice! { { let (tx, rx) = mpsc::channel(); @@ -172,11 +172,7 @@ impl Shard { ws_url, } } - }; - - shard.identify()?; - - Ok(shard) + }) } /// Retrieves a copy of the current shard information. @@ -308,6 +304,11 @@ impl Shard { self.update_presence(); } + /// Returns the current connection stage of the shard. + pub fn stage(&self) -> ConnectionStage { + self.stage.clone() + } + /// Handles an event from the gateway over the receiver, requiring the /// receiver to be passed if a reconnect needs to occur. /// @@ -414,14 +415,16 @@ impl Shard { self.shard_info, interval); + if self.stage == ConnectionStage::Resuming { + return Ok(None); + } + if interval > 0 { self.heartbeat_interval = Some(interval); } if self.stage == ConnectionStage::Handshake { - self.stage = ConnectionStage::Identifying; - - Ok(None) + self.identify().and(Ok(None)) } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); @@ -438,9 +441,7 @@ impl Shard { self.seq = 0; self.session_id = None; - self.identify()?; - - Ok(None) + self.identify().and(Ok(None)) }, Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), Err(Error::Gateway(GatewayError::Closed(data))) => { @@ -448,18 +449,6 @@ impl Shard { let reason = data.map(|d| d.reason); let clean = num == Some(1000); - { - let kind = if clean { "Cleanly" } else { "Uncleanly" }; - - info!( - "[Shard {:?}] {} closing with {:?}: {:?}", - self.shard_info, - kind, - num, - reason - ); - } - match num { Some(close_codes::UNKNOWN_OPCODE) => { warn!("[Shard {:?}] Sent invalid opcode", @@ -524,9 +513,9 @@ impl Shard { } let resume = num.map(|x| { - x != 1000 && x != close_codes::AUTHENTICATION_FAILED && + x != close_codes::AUTHENTICATION_FAILED && self.session_id.is_some() - }).unwrap_or(false); + }).unwrap_or(true); if resume { self.resume().or_else(|_| self.reconnect()).and(Ok(None)) @@ -878,11 +867,15 @@ impl Shard { /// Retrieves the `heartbeat_interval`. #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option<u64> { self.heartbeat_interval } + pub(crate) fn heartbeat_interval(&self) -> Option<u64> { + self.heartbeat_interval + } /// Retrieves the value of when the last heartbeat ack was received. #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { self.heartbeat_instants.1 } + pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { + self.heartbeat_instants.1 + } fn reconnect(&mut self) -> Result<()> { info!("[Shard {:?}] Attempting to reconnect", self.shard_info); @@ -902,6 +895,9 @@ impl Shard { fn resume(&mut self) -> Result<()> { debug!("Shard {:?}] Attempting to resume", self.shard_info); + self.initialize()?; + self.stage = ConnectionStage::Resuming; + self.send_resume().or_else(|why| { warn!("[Shard {:?}] Err sending resume: {:?}", self.shard_info, @@ -931,11 +927,26 @@ impl Shard { })) } + /// Initializes a new WebSocket client. + /// + /// This will set the stage of the shard before and after instantiation of + /// the client. fn initialize(&mut self) -> Result<()> { + debug!("[Shard {:?}] Initializing", self.shard_info); + + // We need to do two, sort of three things here: + // + // - set the stage of the shard as opening the websocket connection + // - open the websocket connection + // - if successful, set the current stage as Handshaking + // + // This is used to accurately assess whether the state of the shard is + // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; self.client = connect(&self.ws_url.lock().unwrap())?; + self.stage = ConnectionStage::Handshake; - self.identify() + Ok(()) } fn identify(&mut self) -> Result<()> { @@ -956,6 +967,7 @@ impl Shard { }); self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; debug!("[Shard {:?}] Identifying", self.shard_info); |