diff options
| author | Zeyla Hellyer <[email protected]> | 2017-06-01 22:27:18 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-06-01 22:27:18 -0700 |
| commit | ec9b1c79abeb2a4eff9f013ba8f0e430979dbc56 (patch) | |
| tree | d86b92fb5fa30f96e1d4b8e7c85c7e00666b8f69 /src | |
| parent | Deprecate User::get (diff) | |
| download | serenity-ec9b1c79abeb2a4eff9f013ba8f0e430979dbc56.tar.xz serenity-ec9b1c79abeb2a4eff9f013ba8f0e430979dbc56.zip | |
Check last heartbeat acknowledged in heartbeater
When heartbeating, first ensure that the previous heartbeat was
acknowledged. If it wasn't, shutdown the sender and receiver so that
an auto-reconnect can take place.
When receiving a Heartbeat Acknowledgement, set the
`last_heartbeat_acknowledged` to `true` to prevent the auto-reconnect
process.
Diffstat (limited to 'src')
| -rw-r--r-- | src/gateway/prep.rs | 10 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 28 |
2 files changed, 34 insertions, 4 deletions
diff --git a/src/gateway/prep.rs b/src/gateway/prep.rs index f432691..6a415f8 100644 --- a/src/gateway/prep.rs +++ b/src/gateway/prep.rs @@ -77,6 +77,7 @@ pub fn build_gateway_url(base: &str) -> Result<RequestUrl> { pub fn keepalive(interval: u64, heartbeat_sent: Arc<Mutex<Instant>>, + last_ack: Arc<Mutex<bool>>, mut sender: Sender<WebSocketStream>, channel: &MpscReceiver<GatewayStatus>) { let mut base_interval = Duration::milliseconds(interval as i64); @@ -110,6 +111,14 @@ pub fn keepalive(interval: u64, } if time::get_time() >= next_tick { + // If the last heartbeat didn't receive an acknowledgement, then + // shutdown and auto-reconnect. + if !*last_ack.lock().unwrap() { + debug!("Last heartbeat not acknowledged; re-connecting"); + + break; + } + next_tick = next_tick + base_interval; let map = json!({ @@ -124,6 +133,7 @@ pub fn keepalive(interval: u64, let now = Instant::now(); *heartbeat_sent.lock().unwrap() = now; + *last_ack.lock().unwrap() = false; }, Err(why) => { match why { diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 9c635ec..7790cc6 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -62,17 +62,24 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool); /// [`receive`]: #method.receive /// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding /// [module docs]: index.html#sharding -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Shard { current_presence: CurrentPresence, - /// A tuple of the last instant that a heartbeat was sent, and the last that - /// an acknowledgement was received. + /// A tuple of: + /// + /// - the last instant that a heartbeat was sent + /// - the last instant that an acknowledgement was received /// /// This can be used to calculate [`latency`]. /// /// [`latency`]: fn.latency.html heartbeat_instants: (Arc<Mutex<Instant>>, Option<Instant>), keepalive_channel: MpscSender<GatewayStatus>, + /// This is used by the keepalive thread to determine whether the last + /// heartbeat was sent without an acknowledgement, and whether to reconnect. + // This _must_ be set to `true` in `Shard::handle_event`'s + // `Ok(GatewayEvent::HeartbeatAck)` arm. + last_heartbeat_acknowledged: Arc<Mutex<bool>>, seq: u64, session_id: Option<String>, shard_info: Option<[u64; 2]>, @@ -142,10 +149,19 @@ impl Shard { let heartbeat_sent = Arc::new(Mutex::new(Instant::now())); let heartbeat_clone = heartbeat_sent.clone(); + // Set this to true: when the keepalive thread sends a heartbeat, it + // will check if the value is `false`. + // + // If it is, it will reconnect. This enters the bot into a reconnect + // loop. Set this to `true` to give Discord the first heartbeat to + // acknowledge first. + let last_ack = Arc::new(Mutex::new(true)); + let last_ack_clone = last_ack.clone(); + ThreadBuilder::new() .name(thread_name) .spawn(move || { - prep::keepalive(heartbeat_interval, heartbeat_clone, sender, &rx) + prep::keepalive(heartbeat_interval, heartbeat_clone, last_ack_clone, sender, &rx) })?; // Parse READY @@ -155,10 +171,12 @@ impl Shard { &mut receiver, identification)?; + Ok((feature_voice! {{ Shard { current_presence: (None, OnlineStatus::Online, false), heartbeat_instants: (heartbeat_sent, None), + last_heartbeat_acknowledged: last_ack, keepalive_channel: tx.clone(), seq: sequence, token: token.to_owned(), @@ -171,6 +189,7 @@ impl Shard { Shard { current_presence: (None, OnlineStatus::Online, false), heartbeat_instants: (heartbeat_sent, None), + last_heartbeat_acknowledged: last_ack, keepalive_channel: tx.clone(), seq: sequence, token: token.to_owned(), @@ -356,6 +375,7 @@ impl Shard { }, Ok(GatewayEvent::HeartbeatAck) => { self.heartbeat_instants.1 = Some(Instant::now()); + *self.last_heartbeat_acknowledged.lock().unwrap() = true; Ok(None) }, |