aboutsummaryrefslogtreecommitdiff
path: root/src/client/mod.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-07 15:01:47 -0700
committerZeyla Hellyer <[email protected]>2017-06-07 15:01:47 -0700
commit8f8a05996c5b47ec9401aabb517d96ed2af5c36b (patch)
treeab48c3b558c396f4f6d12c98a466074f97f17acf /src/client/mod.rs
parentWs read/write timeout after 90s (diff)
downloadserenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.tar.xz
serenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.zip
Upgrade rust-websocket, rust-openssl, and hyper
Upgrade `rust-websocket` to v0.20, maintaining use of its sync client. This indirectly switches from `rust-openssl` v0.7 - which required openssl-1.0 on all platforms - to `native-tls`, which allows for use of schannel on Windows, Secure Transport on OSX, and openssl-1.1 on other platforms. Additionally, since hyper is no longer even a dependency of rust-websocket, we can safely and easily upgrade to `hyper` v0.10 and `multipart` v0.12. This commit is fairly experimental as it has not been tested on a long-running bot.
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r--src/client/mod.rs81
1 files changed, 53 insertions, 28 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index f728792..91790fa 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -35,6 +35,7 @@ pub use ::http as rest;
#[cfg(feature="cache")]
pub use ::CACHE;
+use chrono::UTC;
use self::dispatch::dispatch;
use self::event_store::EventStore;
use std::collections::HashMap;
@@ -43,9 +44,7 @@ use std::time::Duration;
use std::{mem, thread};
use super::gateway::Shard;
use typemap::ShareMap;
-use websocket::client::Receiver;
use websocket::result::WebSocketError;
-use websocket::stream::WebSocketStream;
use ::http;
use ::internal::prelude::*;
use ::internal::ws_impl::ReceiverExt;
@@ -982,7 +981,7 @@ impl Client {
});
match boot {
- Ok((shard, ready, receiver)) => {
+ Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
@@ -1011,7 +1010,6 @@ impl Client {
event_store: self.event_store.clone(),
framework: self.framework.clone(),
gateway_url: gateway_url.clone(),
- receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
@@ -1021,7 +1019,6 @@ impl Client {
data: self.data.clone(),
event_store: self.event_store.clone(),
gateway_url: gateway_url.clone(),
- receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
@@ -1254,7 +1251,6 @@ struct MonitorInfo {
event_store: Arc<RwLock<EventStore>>,
framework: Arc<Mutex<Framework>>,
gateway_url: Arc<Mutex<String>>,
- receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
@@ -1265,13 +1261,12 @@ struct MonitorInfo {
data: Arc<Mutex<ShareMap>>,
event_store: Arc<RwLock<EventStore>>,
gateway_url: Arc<Mutex<String>>,
- receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
}
-fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> {
+fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
// Make ten attempts to boot the shard, exponentially backing off; if it
// still doesn't boot after that, accept it as a failure.
//
@@ -1298,7 +1293,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS
info.shard_info);
match attempt {
- Ok((shard, ready, receiver)) => {
+ Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
@@ -1308,7 +1303,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS
info!("Successfully booted shard: {:?}", info.shard_info);
- return Ok((shard, ready, receiver));
+ return Ok((shard, ready));
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
}
@@ -1332,14 +1327,13 @@ fn monitor_shard(mut info: MonitorInfo) {
});
match boot {
- Ok((new_shard, ready, new_receiver)) => {
+ Ok((new_shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write().unwrap().update_with_ready(&ready);
}
*info.shard.lock().unwrap() = new_shard;
- info.receiver = new_receiver;
boot_successful = true;
@@ -1375,16 +1369,54 @@ fn monitor_shard(mut info: MonitorInfo) {
}
fn handle_shard(info: &mut MonitorInfo) {
+ // This is currently all ducktape. Redo this.
+ let mut last_ack_time = UTC::now().timestamp();
+ let mut last_heartbeat_sent = UTC::now().timestamp();
+
loop {
- let event = match info.receiver.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
- debug!("Attempting to shutdown receiver/sender");
+ let mut shard = info.shard.lock().unwrap();
+ let in_secs = shard.heartbeat_interval() / 1000;
- match info.shard.lock().unwrap().resume(&mut info.receiver) {
- Ok((_, receiver)) => {
+ if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // shutdown and auto-reconnect.
+ if !shard.last_heartbeat_acknowledged() {
+ debug!("Last heartbeat not acknowledged; re-connecting");
+
+ match shard.resume() {
+ Ok(_) => {
debug!("Successfully resumed shard");
- info.receiver = receiver;
+ continue;
+ },
+ Err(why) => {
+ warn!("Err resuming shard: {:?}", why);
+
+ return;
+ },
+ }
+ }
+
+ let _ = shard.heartbeat();
+ last_heartbeat_sent = UTC::now().timestamp();
+ }
+
+ let event = match shard.client.recv_json(GatewayEvent::decode) {
+ Ok(GatewayEvent::HeartbeatAck) => {
+ last_ack_time = UTC::now().timestamp();
+
+ Ok(GatewayEvent::HeartbeatAck)
+ },
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
+ continue;
+ }
+
+ debug!("Attempting to shutdown receiver/sender");
+
+ match shard.resume() {
+ Ok(_) => {
+ debug!("Successfully resumed shard");
continue;
},
@@ -1395,21 +1427,14 @@ fn handle_shard(info: &mut MonitorInfo) {
},
}
},
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
other => other,
};
trace!("Received event on shard handler: {:?}", event);
- // This will only lock when _updating_ the shard, resuming, etc. Most
- // of the time, this won't be locked (i.e. when receiving an event over
- // the receiver, separate from the shard itself).
- let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) {
- Ok(Some((event, Some(new_receiver)))) => {
- info.receiver = new_receiver;
-
- event
- },
- Ok(Some((event, None))) => event,
+ let event = match shard.handle_event(event) {
+ Ok(Some(event)) => event,
Ok(None) => continue,
Err(why) => {
error!("Shard handler received err: {:?}", why);