diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-30 16:07:40 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-10-09 15:45:48 -0700 |
| commit | 45c1f27edbeedcb30aa3e9daa78eba44817f7260 (patch) | |
| tree | c8ca606c603fe78c7b9efdd3c9b6708838324763 /src/client | |
| parent | Improve shard and shard runner logging (diff) | |
| download | serenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.tar.xz serenity-45c1f27edbeedcb30aa3e9daa78eba44817f7260.zip | |
Improve shard logic
Improve shard logic by more cleanly differentiating when resuming, as
well as actually fixing resume logic.
For shard runners, better handling of dead clients is added, as well as
more use of the shard manager, in that the runner will now more
liberally request a restart when required (instead of sitting and doing
nothing infinitely).
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 158 |
1 files changed, 75 insertions, 83 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 530e367..909e511 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { // If the message is to shutdown, first verify the ID so we know // for certain this runner is to shutdown. if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == shard.shard_info()[0] { + if id.0 == self.shard_info[0] { let _ = shard.shutdown_clean(); return Ok(()); @@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); - let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); - let _ = self.manager_tx.send(msg); - - return Ok(()); + return self.request_restart(); } #[cfg(feature = "voice")] @@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> { } } - let events = self.recv_events(); + let (event, successful) = self.recv_events(); + + if let Some(event) = event { + dispatch( + event, + &self.shard, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + ); + } - for event in events { - feature_framework! {{ - dispatch(event, - &self.shard, - &self.framework, - &self.data, - &self.event_handler); - } else { - dispatch(event, - &self.shard, - &self.data, - &self.event_handler); - }} + if !successful && !self.shard.lock().stage().is_connecting() { + return self.request_restart(); } } } @@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> { self.runner_tx.clone() } - fn recv_events(&mut self) -> Vec<Event> { + /// Returns a received event, as well as whether reading the potentially + /// present event was successful. + fn recv_events(&mut self) -> (Option<Event>, bool) { let mut shard = self.shard.lock(); - let mut events = vec![]; + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; - loop { - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - break; - } - } else { - break; + if seconds_passed <= interval_in_secs * 2 { + return (None, true); } + } else { + return (None, true); + } - debug!("[ShardRunner {:?}] Attempting to auto-reconnect", - self.shard_info); + debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { - error!( - "[ShardRunner {:?}] Failed to auto-reconnect: {:?}", - self.shard_info, - why, - ); - } + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } - break; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - break; - }, - other => other, - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => break, - Err(why) => Err(why), - }; - - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - }; - - events.push(event); - - if events.len() > 5 { - break; - } + return (None, true); + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + // This is hit when the websocket client dies this will be + // hit every iteration. + return (None, false); + }, + other => other, }; - events + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => return (None, true), + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => return (None, true), + Err(why) => { + error!("Shard handler received err: {:?}", why); + + return (None, true); + }, + }; + + (Some(event), true) + } + + fn request_restart(&self) -> Result<()> { + debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); + let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + let _ = self.manager_tx.send(msg); + + Ok(()) } } |