diff options
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 255 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 1 |
2 files changed, 247 insertions, 9 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index ea1fad6..1075925 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -51,8 +51,15 @@ pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// Creates a new runner for a Shard. pub fn new(opt: ShardRunnerOptions<H>) -> Self { + trace!("[ShardRunner {:?}] Setting up", opt.shard.shard_info()); + let (tx, rx) = mpsc::channel(); + trace!( + "[ShardRunner {:?}] Setup channels", + opt.shard.shard_info(), + ); + Self { runner_rx: rx, runner_tx: tx, @@ -68,6 +75,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } } + #[inline] + fn shard_info(&self) -> [u64; 2] { + self.shard.shard_info() + } + /// Starts the runner's loop to receive events. /// /// This runs a loop that performs the following in each iteration: @@ -95,40 +107,78 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// [`ShardManager`]: struct.ShardManager.html /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html pub fn run(&mut self) -> Result<()> { - debug!("[ShardRunner {:?}] Running", self.shard.shard_info()); + debug!("[ShardRunner {:?}] Running", self.shard_info()); loop { + trace!("[ShardRunner {:?}] Recv'ing...", self.shard_info()); if !self.recv()? { + trace!("[ShardRunner {:?}] Failed to recv", self.shard_info()); + return Ok(()); } // check heartbeat + trace!("[ShardRunner {:?}] Checking heartbeat", self.shard_info()); + if !self.shard.check_heartbeat() { warn!( - "[ShardRunner {:?}] Error heartbeating", + "[ShardRunner {:?}] Error heartbeating; restarting", self.shard.shard_info(), ); return self.request_restart(); } + trace!("[ShardRunner {:?}] Checked heartbeat", self.shard_info()); + let pre = self.shard.stage(); + trace!( + "[ShardRunner {:?}] Receiving event; stage: {:?}", + self.shard_info(), + pre, + ); let (event, action, successful) = self.recv_event(); let post = self.shard.stage(); + trace!( + "[ShardRunner {:?}] Received event; stage: {:?}", + self.shard_info(), + post, + ); if post != pre { + trace!("[ShardRunner {:?}] Stages changed", self.shard_info()); self.update_manager(); + trace!( + "[ShardRunner {:?}] Updated shard manager", + self.shard_info(), + ); let e = ClientEvent::ShardStageUpdate(ShardStageUpdateEvent { new: post, old: pre, shard_id: ShardId(self.shard.shard_info()[0]), }); + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + e, + ); self.dispatch(DispatchEvent::Client(e)); + trace!("[ShardRunner {:?}] Dispatched", self.shard_info()); } + trace!( + "[ShardRunner {:?}] Action: {:?}", + self.shard_info(), + action, + ); + match action { Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => { + trace!( + "[ShardRunner {:?}] Requesting restart", + self.shard_info(), + ); return self.request_restart() }, Some(other) => { @@ -138,9 +188,23 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } if let Some(event) = event { + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + event, + ); + self.dispatch(DispatchEvent::Model(event)); + trace!("[ShardRunner {:?}] Dispatched", self.shard_info()); } + trace!( + "[ShardRunner {:?}] Successful: {:?}; connecting?: {:?}", + self.shard_info(), + successful, + self.shard.stage().is_connecting(), + ); + if !successful && !self.shard.stage().is_connecting() { return self.request_restart(); } @@ -187,6 +251,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { if id.0 != self.shard.shard_info()[0] { // Not meant for this runner for some reason, don't // shutdown. + debug!( + "[ShardRunner {:?}] Received message meant for {:?}?", + self.shard_info(), + id.0, + ); + return true; } @@ -199,6 +269,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { #[inline] fn dispatch(&self, event: DispatchEvent) { + trace!( + "[ShardRunner {:?}] Dispatching: {:?}", + self.shard_info(), + event, + ); + dispatch( event, #[cfg(feature = "framework")] @@ -218,10 +294,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // This always returns true, except in the case that the shard manager asked // the runner to shutdown. fn handle_rx_value(&mut self, value: InterMessage) -> bool { + trace!( + "[ShardRunner {:?}] Handling rx value: {:?}", + self.shard_info(), + value, + ); + match value { InterMessage::Client(ShardClientMessage::Manager(x)) => match x { ShardManagerMessage::Restart(id) | ShardManagerMessage::Shutdown(id) => { + trace!( + "[ShardRunner {:?}] Received checked shutdown", + self.shard_info(), + ); + self.checked_shutdown(id) }, ShardManagerMessage::ShutdownAll => { @@ -242,6 +329,14 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { }, InterMessage::Client(ShardClientMessage::Runner(x)) => match x { ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { + trace!( + "[ShardRunner {:?}] Received rx guild chunk: {:?}; {:?}; {:?}", + self.shard_info(), + guild_ids, + limit, + query, + ); + self.shard.chunk_guilds( guild_ids, limit, @@ -253,12 +348,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let data = CloseData::new(code, reason); let msg = OwnedMessage::Close(Some(data)); + trace!( + "[ShardRunner {:?}] Received rx close: {:?}", + self.shard_info(), + msg, + ); + self.shard.client.send_message(&msg).is_ok() }, ShardRunnerMessage::Message(msg) => { + trace!( + "[ShardRunner {:?}] Received rx heartbeat: {:?}", + self.shard_info(), + msg, + ); + self.shard.client.send_message(&msg).is_ok() }, ShardRunnerMessage::SetGame(game) => { + trace!( + "[ShardRunner {:?}] Received rx set game: {:?}", + self.shard_info(), + game, + ); // To avoid a clone of `game`, we do a little bit of // trickery here: // @@ -277,17 +389,35 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { self.shard.update_presence().is_ok() }, ShardRunnerMessage::SetPresence(status, game) => { + trace!( + "[ShardRunner {:?}] Received rx set presence: {:?}; {:?}", + self.shard_info(), + status, + game, + ); + self.shard.set_presence(status, game); self.shard.update_presence().is_ok() }, ShardRunnerMessage::SetStatus(status) => { + trace!( + "[ShardRunner {:?}] Received rx set status: {:?}", + self.shard_info(), + status, + ); + self.shard.set_status(status); self.shard.update_presence().is_ok() }, }, InterMessage::Json(value) => { + trace!( + "[ShardRunner {:?}] Received rx json: {}", + self.shard_info(), + value, + ); // Value must be forwarded over the websocket self.shard.client.send_json(&value).is_ok() }, @@ -296,6 +426,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { #[cfg(feature = "voice")] fn handle_voice_event(&self, event: &Event) { + trace!( + "[ShardRunner {:?}] Handling voice event: {:?}", + self.shard_info(), + event, + ); + match *event { Event::Ready(_) => { self.voice_manager.lock().set( @@ -337,6 +473,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // Returns whether the shard runner is in a state that can continue. fn recv(&mut self) -> Result<bool> { + trace!( + "[ShardRunner {:?}] Starting recv", + self.shard_info(), + ); + loop { match self.runner_rx.try_recv() { Ok(value) => { @@ -352,13 +493,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let _ = self.request_restart(); + trace!( + "[ShardRunner {:?}] Recv: requested restart", + self.shard_info(), + ); + return Ok(false); }, - Err(TryRecvError::Empty) => break, + Err(TryRecvError::Empty) => { + trace!( + "[ShardRunner {:?}] Recv empty", + self.shard_info(), + ); + + break; + }, } } // There are no longer any values available. + trace!( + "[ShardRunner {:?}] Recv: no values available", + self.shard_info(), + ); Ok(true) } @@ -366,12 +523,36 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { /// Returns a received event, as well as whether reading the potentially /// present event was successful. fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) { + trace!( + "[ShardRunner {:?}] Recv event", + self.shard_info(), + ); + let gw_event = match self.shard.client.recv_json() { Ok(Some(value)) => { + trace!( + "[ShardRunner {:?}] Recv event: value: {:?}", + self.shard_info(), + value, + ); + GatewayEvent::deserialize(value).map(Some).map_err(From::from) }, - Ok(None) => Ok(None), - Err(Error::WebSocket(WebSocketError::IoError(_))) => { + Ok(None) => { + trace!( + "[ShardRunner {:?}] Recv event: Ok(None)", + self.shard_info(), + ); + + Ok(None) + }, + Err(Error::WebSocket(WebSocketError::IoError(why))) => { + trace!( + "[ShardRunner {:?}] recv event: ws io err: {:?}", + self.shard_info(), + why, + ); + // Check that an amount of time at least double the // heartbeat_interval has passed. // @@ -387,18 +568,46 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { let interval_in_secs = interval / 1000; if seconds_passed <= interval_in_secs * 2 { + trace!( + "[ShardRunner {:?}] Recv event: seconds_passed <", + self.shard_info(), + ); + return (None, None, true); } } else { + trace!( + "[ShardRunner {:?}] Recv event: no interval; last: {:?}; interval: {:?}", + self.shard_info(), + last, + interval, + ); + return (None, None, true); } } - debug!("Attempting to auto-reconnect"); + debug!( + "[ShardRunner {:?}] Attempting to auto-reconnect: {:?}", + self.shard_info(), + self.shard.reconnection_type(), + ); match self.shard.reconnection_type() { - ReconnectType::Reidentify => return (None, None, false), + ReconnectType::Reidentify => { + trace!( + "[ShardRunner {:?}] Reconnection type: reidentify", + self.shard_info(), + ); + + return (None, None, false); + }, ReconnectType::Resume => { + trace!( + "[ShardRunner {:?}] Reconnection type: resume", + self.shard_info(), + ); + if let Err(why) = self.shard.resume() { warn!("Failed to resume: {:?}", why); @@ -410,6 +619,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { return (None, None, true); }, Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + trace!( + "[ShardRunner {:?}] Recv event: no data available", + self.shard_info(), + ); + // This is hit when the websocket client dies this will be // hit every iteration. return (None, None, false); @@ -423,6 +637,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { Err(why) => Err(why), }; + trace!( + "[ShardRunner {:?}] Recv event: done: {:?}", + self.shard_info(), + event, + ); + let action = match self.shard.handle_event(&event) { Ok(Some(action)) => Some(action), Ok(None) => None, @@ -433,7 +653,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { }, }; + trace!( + "[ShardRunner {:?}] Recv event: handle action: {:?}", + self.shard_info(), + action, + ); + if let Ok(GatewayEvent::HeartbeatAck) = event { + trace!( + "[ShardRunner {:?}] Recv event: heartbeatack", + self.shard_info(), + ); + self.update_manager(); } @@ -472,11 +703,17 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } fn update_manager(&self) { - let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate { + if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShardUpdate { id: ShardId(self.shard.shard_info()[0]), latency: self.shard.latency(), stage: self.shard.stage(), - }); + }) { + warn!( + "[ShardRunner {:?}] Error updating manager: {:?}", + self.shard_info(), + why, + ); + } } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 663a283..18ff63e 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -42,6 +42,7 @@ fn context( Context::new(Arc::clone(data), runner_tx.clone(), shard_id) } +#[derive(Debug)] pub(crate) enum DispatchEvent { Client(ClientEvent), Model(Event), |