diff options
Diffstat (limited to 'src/client/bridge/gateway/shard_runner.rs')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 158 |
1 files changed, 75 insertions, 83 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 530e367..909e511 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { // If the message is to shutdown, first verify the ID so we know // for certain this runner is to shutdown. if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == shard.shard_info()[0] { + if id.0 == self.shard_info[0] { let _ = shard.shutdown_clean(); return Ok(()); @@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> { if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); - let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); - let _ = self.manager_tx.send(msg); - - return Ok(()); + return self.request_restart(); } #[cfg(feature = "voice")] @@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> { } } - let events = self.recv_events(); + let (event, successful) = self.recv_events(); + + if let Some(event) = event { + dispatch( + event, + &self.shard, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + ); + } - for event in events { - feature_framework! {{ - dispatch(event, - &self.shard, - &self.framework, - &self.data, - &self.event_handler); - } else { - dispatch(event, - &self.shard, - &self.data, - &self.event_handler); - }} + if !successful && !self.shard.lock().stage().is_connecting() { + return self.request_restart(); } } } @@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> { self.runner_tx.clone() } - fn recv_events(&mut self) -> Vec<Event> { + /// Returns a received event, as well as whether reading the potentially + /// present event was successful. + fn recv_events(&mut self) -> (Option<Event>, bool) { let mut shard = self.shard.lock(); - let mut events = vec![]; + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; - loop { - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - break; - } - } else { - break; + if seconds_passed <= interval_in_secs * 2 { + return (None, true); } + } else { + return (None, true); + } - debug!("[ShardRunner {:?}] Attempting to auto-reconnect", - self.shard_info); + debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { - error!( - "[ShardRunner {:?}] Failed to auto-reconnect: {:?}", - self.shard_info, - why, - ); - } + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } - break; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - break; - }, - other => other, - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => break, - Err(why) => Err(why), - }; - - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - }; - - events.push(event); - - if events.len() > 5 { - break; - } + return (None, true); + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + // This is hit when the websocket client dies this will be + // hit every iteration. + return (None, false); + }, + other => other, }; - events + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => return (None, true), + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => return (None, true), + Err(why) => { + error!("Shard handler received err: {:?}", why); + + return (None, true); + }, + }; + + (Some(event), true) + } + + fn request_restart(&self) -> Result<()> { + debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); + let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + let _ = self.manager_tx.send(msg); + + Ok(()) } } |