aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-01 22:27:18 -0700
committerZeyla Hellyer <[email protected]>2017-06-01 22:27:18 -0700
commitec9b1c79abeb2a4eff9f013ba8f0e430979dbc56 (patch)
treed86b92fb5fa30f96e1d4b8e7c85c7e00666b8f69 /src
parentDeprecate User::get (diff)
downloadserenity-ec9b1c79abeb2a4eff9f013ba8f0e430979dbc56.tar.xz
serenity-ec9b1c79abeb2a4eff9f013ba8f0e430979dbc56.zip
Check last heartbeat acknowledged in heartbeater
When heartbeating, first ensure that the previous heartbeat was acknowledged. If it wasn't, shutdown the sender and receiver so that an auto-reconnect can take place. When receiving a Heartbeat Acknowledgement, set the `last_heartbeat_acknowledged` to `true` to prevent the auto-reconnect process.
Diffstat (limited to 'src')
-rw-r--r--src/gateway/prep.rs10
-rw-r--r--src/gateway/shard.rs28
2 files changed, 34 insertions, 4 deletions
diff --git a/src/gateway/prep.rs b/src/gateway/prep.rs
index f432691..6a415f8 100644
--- a/src/gateway/prep.rs
+++ b/src/gateway/prep.rs
@@ -77,6 +77,7 @@ pub fn build_gateway_url(base: &str) -> Result<RequestUrl> {
pub fn keepalive(interval: u64,
heartbeat_sent: Arc<Mutex<Instant>>,
+ last_ack: Arc<Mutex<bool>>,
mut sender: Sender<WebSocketStream>,
channel: &MpscReceiver<GatewayStatus>) {
let mut base_interval = Duration::milliseconds(interval as i64);
@@ -110,6 +111,14 @@ pub fn keepalive(interval: u64,
}
if time::get_time() >= next_tick {
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // shutdown and auto-reconnect.
+ if !*last_ack.lock().unwrap() {
+ debug!("Last heartbeat not acknowledged; re-connecting");
+
+ break;
+ }
+
next_tick = next_tick + base_interval;
let map = json!({
@@ -124,6 +133,7 @@ pub fn keepalive(interval: u64,
let now = Instant::now();
*heartbeat_sent.lock().unwrap() = now;
+ *last_ack.lock().unwrap() = false;
},
Err(why) => {
match why {
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index 9c635ec..7790cc6 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -62,17 +62,24 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool);
/// [`receive`]: #method.receive
/// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding
/// [module docs]: index.html#sharding
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct Shard {
current_presence: CurrentPresence,
- /// A tuple of the last instant that a heartbeat was sent, and the last that
- /// an acknowledgement was received.
+ /// A tuple of:
+ ///
+ /// - the last instant that a heartbeat was sent
+ /// - the last instant that an acknowledgement was received
///
/// This can be used to calculate [`latency`].
///
/// [`latency`]: fn.latency.html
heartbeat_instants: (Arc<Mutex<Instant>>, Option<Instant>),
keepalive_channel: MpscSender<GatewayStatus>,
+ /// This is used by the keepalive thread to determine whether the last
+ /// heartbeat was sent without an acknowledgement, and whether to reconnect.
+ // This _must_ be set to `true` in `Shard::handle_event`'s
+ // `Ok(GatewayEvent::HeartbeatAck)` arm.
+ last_heartbeat_acknowledged: Arc<Mutex<bool>>,
seq: u64,
session_id: Option<String>,
shard_info: Option<[u64; 2]>,
@@ -142,10 +149,19 @@ impl Shard {
let heartbeat_sent = Arc::new(Mutex::new(Instant::now()));
let heartbeat_clone = heartbeat_sent.clone();
+ // Set this to true: when the keepalive thread sends a heartbeat, it
+ // will check if the value is `false`.
+ //
+ // If it is, it will reconnect. This enters the bot into a reconnect
+ // loop. Set this to `true` to give Discord the first heartbeat to
+ // acknowledge first.
+ let last_ack = Arc::new(Mutex::new(true));
+ let last_ack_clone = last_ack.clone();
+
ThreadBuilder::new()
.name(thread_name)
.spawn(move || {
- prep::keepalive(heartbeat_interval, heartbeat_clone, sender, &rx)
+ prep::keepalive(heartbeat_interval, heartbeat_clone, last_ack_clone, sender, &rx)
})?;
// Parse READY
@@ -155,10 +171,12 @@ impl Shard {
&mut receiver,
identification)?;
+
Ok((feature_voice! {{
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
+ last_heartbeat_acknowledged: last_ack,
keepalive_channel: tx.clone(),
seq: sequence,
token: token.to_owned(),
@@ -171,6 +189,7 @@ impl Shard {
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
+ last_heartbeat_acknowledged: last_ack,
keepalive_channel: tx.clone(),
seq: sequence,
token: token.to_owned(),
@@ -356,6 +375,7 @@ impl Shard {
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());
+ *self.last_heartbeat_acknowledged.lock().unwrap() = true;
Ok(None)
},