From f5a60f387d2278aa3bddac5c0a0ec110fbf5377e Mon Sep 17 00:00:00 2001 From: Zeyla Hellyer Date: Mon, 1 Jan 2018 09:45:52 -0800 Subject: Expose a method of retrieving shard status/latency This exposes a method of retrieving a shard's status and latency through the Shard Manager, as part of the existing Shard Runner Info struct. The Shard Runner will update this whenever it notices a change. --- src/client/bridge/gateway/mod.rs | 13 ++++++++++ src/client/bridge/gateway/shard_manager_monitor.rs | 9 +++++++ src/client/bridge/gateway/shard_queuer.rs | 3 +++ src/client/bridge/gateway/shard_runner.rs | 28 +++++++++++++++++++++- 4 files changed, 52 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs index c5410e6..5c9ec69 100644 --- a/src/client/bridge/gateway/mod.rs +++ b/src/client/bridge/gateway/mod.rs @@ -63,6 +63,8 @@ pub use self::shard_runner_message::ShardRunnerMessage; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::mpsc::Sender; +use std::time::Duration as StdDuration; +use ::gateway::ConnectionStage; /// A message either for a [`ShardManager`] or a [`ShardRunner`]. /// @@ -88,6 +90,12 @@ pub enum ShardManagerMessage { /// /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html Restart(ShardId), + /// An update from a shard runner, + ShardUpdate { + id: ShardId, + latency: Option, + stage: ConnectionStage, + }, /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard /// without bringing it back up. /// @@ -140,7 +148,12 @@ impl Display for ShardId { /// [`ShardRunner`]: struct.ShardRunner.html #[derive(Debug)] pub struct ShardRunnerInfo { + /// The latency between when a heartbeat was sent and when the + /// acknowledgement was received. + pub latency: Option, /// The channel used to communicate with the shard runner, telling it /// what to do with regards to its status. pub runner_tx: Sender, + /// The current connection stage of the shard. + pub stage: ConnectionStage, } diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs index d98827d..9f9c04d 100644 --- a/src/client/bridge/gateway/shard_manager_monitor.rs +++ b/src/client/bridge/gateway/shard_manager_monitor.rs @@ -40,6 +40,15 @@ impl ShardManagerMonitor { ShardManagerMessage::Restart(shard_id) => { self.manager.lock().restart(shard_id); }, + ShardManagerMessage::ShardUpdate { id, latency, stage } => { + let manager = self.manager.lock(); + let mut runners = manager.runners.lock(); + + runners.get_mut(&id).map(|runner| { + runner.latency = latency; + runner.stage = stage; + }); + } ShardManagerMessage::Shutdown(shard_id) => { self.manager.lock().shutdown(shard_id); }, diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs index 30a0906..461fdd1 100644 --- a/src/client/bridge/gateway/shard_queuer.rs +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -16,6 +16,7 @@ use super::{ }; use threadpool::ThreadPool; use typemap::ShareMap; +use ::gateway::ConnectionStage; #[cfg(feature = "framework")] use framework::Framework; @@ -184,7 +185,9 @@ impl ShardQueuer { }}; let runner_info = ShardRunnerInfo { + latency: None, runner_tx: runner.runner_tx(), + stage: ConnectionStage::Disconnected, }; thread::spawn(move || { diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index cff8d7e..7c6cf83 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -141,8 +141,16 @@ impl ShardRunner { } } + let pre = self.shard.stage(); + let (event, action, successful) = self.recv_event(); + let post = self.shard.stage(); + + if post != pre { + self.update_manager(); + } + match action { Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => { return self.request_restart() @@ -249,7 +257,8 @@ impl ShardRunner { true }, - ShardManagerMessage::ShutdownInitiated => { + ShardManagerMessage::ShardUpdate { .. } + | ShardManagerMessage::ShutdownInitiated => { // nb: not sent here true @@ -411,6 +420,13 @@ impl ShardRunner { }, }; + match event { + Ok(GatewayEvent::HeartbeatAck) => { + self.update_manager(); + }, + _ => {}, + } + let event = match event { Ok(GatewayEvent::Dispatch(_, event)) => Some(event), _ => None, @@ -420,6 +436,8 @@ impl ShardRunner { } fn request_restart(&self) -> Result<()> { + self.update_manager(); + debug!( "[ShardRunner {:?}] Requesting restart", self.shard.shard_info(), @@ -430,4 +448,12 @@ impl ShardRunner { Ok(()) } + + fn update_manager(&self) { + let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate { + id: ShardId(self.shard.shard_info()[0]), + latency: self.shard.latency(), + stage: self.shard.stage(), + }); + } } -- cgit v1.2.3