aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/client')
-rw-r--r--src/client/bridge/gateway/shard_runner.rs255
-rw-r--r--src/client/dispatch.rs1
2 files changed, 247 insertions, 9 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index ea1fad6..1075925 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -51,8 +51,15 @@ pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> {
impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// Creates a new runner for a Shard.
pub fn new(opt: ShardRunnerOptions<H>) -> Self {
+ trace!("[ShardRunner {:?}] Setting up", opt.shard.shard_info());
+
let (tx, rx) = mpsc::channel();
+ trace!(
+ "[ShardRunner {:?}] Setup channels",
+ opt.shard.shard_info(),
+ );
+
Self {
runner_rx: rx,
runner_tx: tx,
@@ -68,6 +75,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
}
+ #[inline]
+ fn shard_info(&self) -> [u64; 2] {
+ self.shard.shard_info()
+ }
+
/// Starts the runner's loop to receive events.
///
/// This runs a loop that performs the following in each iteration:
@@ -95,40 +107,78 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// [`ShardManager`]: struct.ShardManager.html
/// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html
pub fn run(&mut self) -> Result<()> {
- debug!("[ShardRunner {:?}] Running", self.shard.shard_info());
+ debug!("[ShardRunner {:?}] Running", self.shard_info());
loop {
+ trace!("[ShardRunner {:?}] Recv'ing...", self.shard_info());
if !self.recv()? {
+ trace!("[ShardRunner {:?}] Failed to recv", self.shard_info());
+
return Ok(());
}
// check heartbeat
+ trace!("[ShardRunner {:?}] Checking heartbeat", self.shard_info());
+
if !self.shard.check_heartbeat() {
warn!(
- "[ShardRunner {:?}] Error heartbeating",
+ "[ShardRunner {:?}] Error heartbeating; restarting",
self.shard.shard_info(),
);
return self.request_restart();
}
+ trace!("[ShardRunner {:?}] Checked heartbeat", self.shard_info());
+
let pre = self.shard.stage();
+ trace!(
+ "[ShardRunner {:?}] Receiving event; stage: {:?}",
+ self.shard_info(),
+ pre,
+ );
let (event, action, successful) = self.recv_event();
let post = self.shard.stage();
+ trace!(
+ "[ShardRunner {:?}] Received event; stage: {:?}",
+ self.shard_info(),
+ post,
+ );
if post != pre {
+ trace!("[ShardRunner {:?}] Stages changed", self.shard_info());
self.update_manager();
+ trace!(
+ "[ShardRunner {:?}] Updated shard manager",
+ self.shard_info(),
+ );
let e = ClientEvent::ShardStageUpdate(ShardStageUpdateEvent {
new: post,
old: pre,
shard_id: ShardId(self.shard.shard_info()[0]),
});
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ e,
+ );
self.dispatch(DispatchEvent::Client(e));
+ trace!("[ShardRunner {:?}] Dispatched", self.shard_info());
}
+ trace!(
+ "[ShardRunner {:?}] Action: {:?}",
+ self.shard_info(),
+ action,
+ );
+
match action {
Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => {
+ trace!(
+ "[ShardRunner {:?}] Requesting restart",
+ self.shard_info(),
+ );
return self.request_restart()
},
Some(other) => {
@@ -138,9 +188,23 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
if let Some(event) = event {
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ event,
+ );
+
self.dispatch(DispatchEvent::Model(event));
+ trace!("[ShardRunner {:?}] Dispatched", self.shard_info());
}
+ trace!(
+ "[ShardRunner {:?}] Successful: {:?}; connecting?: {:?}",
+ self.shard_info(),
+ successful,
+ self.shard.stage().is_connecting(),
+ );
+
if !successful && !self.shard.stage().is_connecting() {
return self.request_restart();
}
@@ -187,6 +251,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
if id.0 != self.shard.shard_info()[0] {
// Not meant for this runner for some reason, don't
// shutdown.
+ debug!(
+ "[ShardRunner {:?}] Received message meant for {:?}?",
+ self.shard_info(),
+ id.0,
+ );
+
return true;
}
@@ -199,6 +269,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
#[inline]
fn dispatch(&self, event: DispatchEvent) {
+ trace!(
+ "[ShardRunner {:?}] Dispatching: {:?}",
+ self.shard_info(),
+ event,
+ );
+
dispatch(
event,
#[cfg(feature = "framework")]
@@ -218,10 +294,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// This always returns true, except in the case that the shard manager asked
// the runner to shutdown.
fn handle_rx_value(&mut self, value: InterMessage) -> bool {
+ trace!(
+ "[ShardRunner {:?}] Handling rx value: {:?}",
+ self.shard_info(),
+ value,
+ );
+
match value {
InterMessage::Client(ShardClientMessage::Manager(x)) => match x {
ShardManagerMessage::Restart(id) |
ShardManagerMessage::Shutdown(id) => {
+ trace!(
+ "[ShardRunner {:?}] Received checked shutdown",
+ self.shard_info(),
+ );
+
self.checked_shutdown(id)
},
ShardManagerMessage::ShutdownAll => {
@@ -242,6 +329,14 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
},
InterMessage::Client(ShardClientMessage::Runner(x)) => match x {
ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => {
+ trace!(
+ "[ShardRunner {:?}] Received rx guild chunk: {:?}; {:?}; {:?}",
+ self.shard_info(),
+ guild_ids,
+ limit,
+ query,
+ );
+
self.shard.chunk_guilds(
guild_ids,
limit,
@@ -253,12 +348,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let data = CloseData::new(code, reason);
let msg = OwnedMessage::Close(Some(data));
+ trace!(
+ "[ShardRunner {:?}] Received rx close: {:?}",
+ self.shard_info(),
+ msg,
+ );
+
self.shard.client.send_message(&msg).is_ok()
},
ShardRunnerMessage::Message(msg) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx heartbeat: {:?}",
+ self.shard_info(),
+ msg,
+ );
+
self.shard.client.send_message(&msg).is_ok()
},
ShardRunnerMessage::SetGame(game) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set game: {:?}",
+ self.shard_info(),
+ game,
+ );
// To avoid a clone of `game`, we do a little bit of
// trickery here:
//
@@ -277,17 +389,35 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
self.shard.update_presence().is_ok()
},
ShardRunnerMessage::SetPresence(status, game) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set presence: {:?}; {:?}",
+ self.shard_info(),
+ status,
+ game,
+ );
+
self.shard.set_presence(status, game);
self.shard.update_presence().is_ok()
},
ShardRunnerMessage::SetStatus(status) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx set status: {:?}",
+ self.shard_info(),
+ status,
+ );
+
self.shard.set_status(status);
self.shard.update_presence().is_ok()
},
},
InterMessage::Json(value) => {
+ trace!(
+ "[ShardRunner {:?}] Received rx json: {}",
+ self.shard_info(),
+ value,
+ );
// Value must be forwarded over the websocket
self.shard.client.send_json(&value).is_ok()
},
@@ -296,6 +426,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
#[cfg(feature = "voice")]
fn handle_voice_event(&self, event: &Event) {
+ trace!(
+ "[ShardRunner {:?}] Handling voice event: {:?}",
+ self.shard_info(),
+ event,
+ );
+
match *event {
Event::Ready(_) => {
self.voice_manager.lock().set(
@@ -337,6 +473,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
// Returns whether the shard runner is in a state that can continue.
fn recv(&mut self) -> Result<bool> {
+ trace!(
+ "[ShardRunner {:?}] Starting recv",
+ self.shard_info(),
+ );
+
loop {
match self.runner_rx.try_recv() {
Ok(value) => {
@@ -352,13 +493,29 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let _ = self.request_restart();
+ trace!(
+ "[ShardRunner {:?}] Recv: requested restart",
+ self.shard_info(),
+ );
+
return Ok(false);
},
- Err(TryRecvError::Empty) => break,
+ Err(TryRecvError::Empty) => {
+ trace!(
+ "[ShardRunner {:?}] Recv empty",
+ self.shard_info(),
+ );
+
+ break;
+ },
}
}
// There are no longer any values available.
+ trace!(
+ "[ShardRunner {:?}] Recv: no values available",
+ self.shard_info(),
+ );
Ok(true)
}
@@ -366,12 +523,36 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) {
+ trace!(
+ "[ShardRunner {:?}] Recv event",
+ self.shard_info(),
+ );
+
let gw_event = match self.shard.client.recv_json() {
Ok(Some(value)) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: value: {:?}",
+ self.shard_info(),
+ value,
+ );
+
GatewayEvent::deserialize(value).map(Some).map_err(From::from)
},
- Ok(None) => Ok(None),
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ Ok(None) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: Ok(None)",
+ self.shard_info(),
+ );
+
+ Ok(None)
+ },
+ Err(Error::WebSocket(WebSocketError::IoError(why))) => {
+ trace!(
+ "[ShardRunner {:?}] recv event: ws io err: {:?}",
+ self.shard_info(),
+ why,
+ );
+
// Check that an amount of time at least double the
// heartbeat_interval has passed.
//
@@ -387,18 +568,46 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
let interval_in_secs = interval / 1000;
if seconds_passed <= interval_in_secs * 2 {
+ trace!(
+ "[ShardRunner {:?}] Recv event: seconds_passed <",
+ self.shard_info(),
+ );
+
return (None, None, true);
}
} else {
+ trace!(
+ "[ShardRunner {:?}] Recv event: no interval; last: {:?}; interval: {:?}",
+ self.shard_info(),
+ last,
+ interval,
+ );
+
return (None, None, true);
}
}
- debug!("Attempting to auto-reconnect");
+ debug!(
+ "[ShardRunner {:?}] Attempting to auto-reconnect: {:?}",
+ self.shard_info(),
+ self.shard.reconnection_type(),
+ );
match self.shard.reconnection_type() {
- ReconnectType::Reidentify => return (None, None, false),
+ ReconnectType::Reidentify => {
+ trace!(
+ "[ShardRunner {:?}] Reconnection type: reidentify",
+ self.shard_info(),
+ );
+
+ return (None, None, false);
+ },
ReconnectType::Resume => {
+ trace!(
+ "[ShardRunner {:?}] Reconnection type: resume",
+ self.shard_info(),
+ );
+
if let Err(why) = self.shard.resume() {
warn!("Failed to resume: {:?}", why);
@@ -410,6 +619,11 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
return (None, None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
+ trace!(
+ "[ShardRunner {:?}] Recv event: no data available",
+ self.shard_info(),
+ );
+
// This is hit when the websocket client dies this will be
// hit every iteration.
return (None, None, false);
@@ -423,6 +637,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
Err(why) => Err(why),
};
+ trace!(
+ "[ShardRunner {:?}] Recv event: done: {:?}",
+ self.shard_info(),
+ event,
+ );
+
let action = match self.shard.handle_event(&event) {
Ok(Some(action)) => Some(action),
Ok(None) => None,
@@ -433,7 +653,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
},
};
+ trace!(
+ "[ShardRunner {:?}] Recv event: handle action: {:?}",
+ self.shard_info(),
+ action,
+ );
+
if let Ok(GatewayEvent::HeartbeatAck) = event {
+ trace!(
+ "[ShardRunner {:?}] Recv event: heartbeatack",
+ self.shard_info(),
+ );
+
self.update_manager();
}
@@ -472,11 +703,17 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
fn update_manager(&self) {
- let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate {
+ if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShardUpdate {
id: ShardId(self.shard.shard_info()[0]),
latency: self.shard.latency(),
stage: self.shard.stage(),
- });
+ }) {
+ warn!(
+ "[ShardRunner {:?}] Error updating manager: {:?}",
+ self.shard_info(),
+ why,
+ );
+ }
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 663a283..18ff63e 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -42,6 +42,7 @@ fn context(
Context::new(Arc::clone(data), runner_tx.clone(), shard_id)
}
+#[derive(Debug)]
pub(crate) enum DispatchEvent {
Client(ClientEvent),
Model(Event),