aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_runner.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/bridge/gateway/shard_runner.rs')
-rw-r--r--src/client/bridge/gateway/shard_runner.rs158
1 files changed, 75 insertions, 83 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index 530e367..909e511 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
// If the message is to shutdown, first verify the ID so we know
// for certain this runner is to shutdown.
if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
- if id.0 == shard.shard_info()[0] {
+ if id.0 == self.shard_info[0] {
let _ = shard.shutdown_clean();
return Ok(());
@@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
if let Err(why) = shard.check_heartbeat() {
error!("Failed to heartbeat and reconnect: {:?}", why);
- let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0]));
- let _ = self.manager_tx.send(msg);
-
- return Ok(());
+ return self.request_restart();
}
#[cfg(feature = "voice")]
@@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
}
}
- let events = self.recv_events();
+ let (event, successful) = self.recv_events();
+
+ if let Some(event) = event {
+ dispatch(
+ event,
+ &self.shard,
+ #[cfg(feature = "framework")]
+ &self.framework,
+ &self.data,
+ &self.event_handler,
+ );
+ }
- for event in events {
- feature_framework! {{
- dispatch(event,
- &self.shard,
- &self.framework,
- &self.data,
- &self.event_handler);
- } else {
- dispatch(event,
- &self.shard,
- &self.data,
- &self.event_handler);
- }}
+ if !successful && !self.shard.lock().stage().is_connecting() {
+ return self.request_restart();
}
}
}
@@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
self.runner_tx.clone()
}
- fn recv_events(&mut self) -> Vec<Event> {
+ /// Returns a received event, as well as whether reading the potentially
+ /// present event was successful.
+ fn recv_events(&mut self) -> (Option<Event>, bool) {
let mut shard = self.shard.lock();
- let mut events = vec![];
+ let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ // Check that an amount of time at least double the
+ // heartbeat_interval has passed.
+ //
+ // If not, continue on trying to receive messages.
+ //
+ // If it has, attempt to auto-reconnect.
+ let last = shard.last_heartbeat_ack();
+ let interval = shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
- loop {
- let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- // Check that an amount of time at least double the
- // heartbeat_interval has passed.
- //
- // If not, continue on trying to receive messages.
- //
- // If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- break;
- }
- } else {
- break;
+ if seconds_passed <= interval_in_secs * 2 {
+ return (None, true);
}
+ } else {
+ return (None, true);
+ }
- debug!("[ShardRunner {:?}] Attempting to auto-reconnect",
- self.shard_info);
+ debug!("Attempting to auto-reconnect");
- if let Err(why) = shard.autoreconnect() {
- error!(
- "[ShardRunner {:?}] Failed to auto-reconnect: {:?}",
- self.shard_info,
- why,
- );
- }
+ if let Err(why) = shard.autoreconnect() {
+ error!("Failed to auto-reconnect: {:?}", why);
+ }
- break;
- },
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
- // This is hit when the websocket client dies this will be
- // hit every iteration.
- break;
- },
- other => other,
- };
-
- let event = match gw_event {
- Ok(Some(event)) => Ok(event),
- Ok(None) => break,
- Err(why) => Err(why),
- };
-
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => continue,
- Err(why) => {
- error!("Shard handler received err: {:?}", why);
-
- continue;
- },
- };
-
- events.push(event);
-
- if events.len() > 5 {
- break;
- }
+ return (None, true);
+ },
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
+ // This is hit when the websocket client dies this will be
+ // hit every iteration.
+ return (None, false);
+ },
+ other => other,
};
- events
+ let event = match gw_event {
+ Ok(Some(event)) => Ok(event),
+ Ok(None) => return (None, true),
+ Err(why) => Err(why),
+ };
+
+ let event = match shard.handle_event(event) {
+ Ok(Some(event)) => event,
+ Ok(None) => return (None, true),
+ Err(why) => {
+ error!("Shard handler received err: {:?}", why);
+
+ return (None, true);
+ },
+ };
+
+ (Some(event), true)
+ }
+
+ fn request_restart(&self) -> Result<()> {
+ debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
+ let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
+ let _ = self.manager_tx.send(msg);
+
+ Ok(())
}
}