aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2018-01-01 09:45:52 -0800
committerZeyla Hellyer <[email protected]>2018-01-01 09:45:52 -0800
commitf5a60f387d2278aa3bddac5c0a0ec110fbf5377e (patch)
treea5a73e305c50a3e33ef7e99dab6818fae3bb4a15 /src
parentDerive Hash, impl Display on ConnectionStage (diff)
downloadserenity-f5a60f387d2278aa3bddac5c0a0ec110fbf5377e.tar.xz
serenity-f5a60f387d2278aa3bddac5c0a0ec110fbf5377e.zip
Expose a method of retrieving shard status/latency
This exposes a method of retrieving a shard's status and latency through the Shard Manager, as part of the existing Shard Runner Info struct. The Shard Runner will update this whenever it notices a change.
Diffstat (limited to 'src')
-rw-r--r--src/client/bridge/gateway/mod.rs13
-rw-r--r--src/client/bridge/gateway/shard_manager_monitor.rs9
-rw-r--r--src/client/bridge/gateway/shard_queuer.rs3
-rw-r--r--src/client/bridge/gateway/shard_runner.rs28
4 files changed, 52 insertions, 1 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs
index c5410e6..5c9ec69 100644
--- a/src/client/bridge/gateway/mod.rs
+++ b/src/client/bridge/gateway/mod.rs
@@ -63,6 +63,8 @@ pub use self::shard_runner_message::ShardRunnerMessage;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::mpsc::Sender;
+use std::time::Duration as StdDuration;
+use ::gateway::ConnectionStage;
/// A message either for a [`ShardManager`] or a [`ShardRunner`].
///
@@ -88,6 +90,12 @@ pub enum ShardManagerMessage {
///
/// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Restart(ShardId),
+ /// An update from a shard runner,
+ ShardUpdate {
+ id: ShardId,
+ latency: Option<StdDuration>,
+ stage: ConnectionStage,
+ },
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard
/// without bringing it back up.
///
@@ -140,7 +148,12 @@ impl Display for ShardId {
/// [`ShardRunner`]: struct.ShardRunner.html
#[derive(Debug)]
pub struct ShardRunnerInfo {
+ /// The latency between when a heartbeat was sent and when the
+ /// acknowledgement was received.
+ pub latency: Option<StdDuration>,
/// The channel used to communicate with the shard runner, telling it
/// what to do with regards to its status.
pub runner_tx: Sender<ShardClientMessage>,
+ /// The current connection stage of the shard.
+ pub stage: ConnectionStage,
}
diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs
index d98827d..9f9c04d 100644
--- a/src/client/bridge/gateway/shard_manager_monitor.rs
+++ b/src/client/bridge/gateway/shard_manager_monitor.rs
@@ -40,6 +40,15 @@ impl ShardManagerMonitor {
ShardManagerMessage::Restart(shard_id) => {
self.manager.lock().restart(shard_id);
},
+ ShardManagerMessage::ShardUpdate { id, latency, stage } => {
+ let manager = self.manager.lock();
+ let mut runners = manager.runners.lock();
+
+ runners.get_mut(&id).map(|runner| {
+ runner.latency = latency;
+ runner.stage = stage;
+ });
+ }
ShardManagerMessage::Shutdown(shard_id) => {
self.manager.lock().shutdown(shard_id);
},
diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs
index 30a0906..461fdd1 100644
--- a/src/client/bridge/gateway/shard_queuer.rs
+++ b/src/client/bridge/gateway/shard_queuer.rs
@@ -16,6 +16,7 @@ use super::{
};
use threadpool::ThreadPool;
use typemap::ShareMap;
+use ::gateway::ConnectionStage;
#[cfg(feature = "framework")]
use framework::Framework;
@@ -184,7 +185,9 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
}};
let runner_info = ShardRunnerInfo {
+ latency: None,
runner_tx: runner.runner_tx(),
+ stage: ConnectionStage::Disconnected,
};
thread::spawn(move || {
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index cff8d7e..7c6cf83 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -141,8 +141,16 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
}
+ let pre = self.shard.stage();
+
let (event, action, successful) = self.recv_event();
+ let post = self.shard.stage();
+
+ if post != pre {
+ self.update_manager();
+ }
+
match action {
Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => {
return self.request_restart()
@@ -249,7 +257,8 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
true
},
- ShardManagerMessage::ShutdownInitiated => {
+ ShardManagerMessage::ShardUpdate { .. }
+ | ShardManagerMessage::ShutdownInitiated => {
// nb: not sent here
true
@@ -411,6 +420,13 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
},
};
+ match event {
+ Ok(GatewayEvent::HeartbeatAck) => {
+ self.update_manager();
+ },
+ _ => {},
+ }
+
let event = match event {
Ok(GatewayEvent::Dispatch(_, event)) => Some(event),
_ => None,
@@ -420,6 +436,8 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
}
fn request_restart(&self) -> Result<()> {
+ self.update_manager();
+
debug!(
"[ShardRunner {:?}] Requesting restart",
self.shard.shard_info(),
@@ -430,4 +448,12 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
Ok(())
}
+
+ fn update_manager(&self) {
+ let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate {
+ id: ShardId(self.shard.shard_info()[0]),
+ latency: self.shard.latency(),
+ stage: self.shard.stage(),
+ });
+ }
}