aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-07 15:01:47 -0700
committerZeyla Hellyer <[email protected]>2017-06-07 15:01:47 -0700
commit8f8a05996c5b47ec9401aabb517d96ed2af5c36b (patch)
treeab48c3b558c396f4f6d12c98a466074f97f17acf /src/client
parentWs read/write timeout after 90s (diff)
downloadserenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.tar.xz
serenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.zip
Upgrade rust-websocket, rust-openssl, and hyper
Upgrade `rust-websocket` to v0.20, maintaining use of its sync client. This indirectly switches from `rust-openssl` v0.7 - which required openssl-1.0 on all platforms - to `native-tls`, which allows for use of schannel on Windows, Secure Transport on OSX, and openssl-1.1 on other platforms. Additionally, since hyper is no longer even a dependency of rust-websocket, we can safely and easily upgrade to `hyper` v0.10 and `multipart` v0.12. This commit is fairly experimental as it has not been tested on a long-running bot.
Diffstat (limited to 'src/client')
-rw-r--r--src/client/context.rs32
-rw-r--r--src/client/mod.rs81
2 files changed, 69 insertions, 44 deletions
diff --git a/src/client/context.rs b/src/client/context.rs
index 5d36647..ec072ca 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -134,7 +134,8 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn online(&self) {
- self.shard.lock().unwrap().set_status(OnlineStatus::Online);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_status(OnlineStatus::Online);
}
/// Sets the current user as being [`Idle`]. This maintains the current
@@ -157,7 +158,8 @@ impl Context {
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
pub fn idle(&self) {
- self.shard.lock().unwrap().set_status(OnlineStatus::Idle);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_status(OnlineStatus::Idle);
}
/// Sets the current user as being [`DoNotDisturb`]. This maintains the
@@ -180,7 +182,8 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
pub fn dnd(&self) {
- self.shard.lock().unwrap().set_status(OnlineStatus::DoNotDisturb);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_status(OnlineStatus::DoNotDisturb);
}
/// Sets the current user as being [`Invisible`]. This maintains the current
@@ -203,7 +206,8 @@ impl Context {
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
pub fn invisible(&self) {
- self.shard.lock().unwrap().set_status(OnlineStatus::Invisible);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_status(OnlineStatus::Invisible);
}
/// "Resets" the current user's presence, by setting the game to `None` and
@@ -228,9 +232,8 @@ impl Context {
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
pub fn reset_presence(&self) {
- self.shard.lock()
- .unwrap()
- .set_presence(None, OnlineStatus::Online, false)
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_presence(None, OnlineStatus::Online, false)
}
/// Sets the current game, defaulting to an online status of [`Online`].
@@ -260,9 +263,8 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn set_game(&self, game: Game) {
- self.shard.lock()
- .unwrap()
- .set_presence(Some(game), OnlineStatus::Online, false);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_presence(Some(game), OnlineStatus::Online, false);
}
/// Sets the current game, passing in only its name. This will automatically
@@ -302,9 +304,8 @@ impl Context {
url: None,
};
- self.shard.lock()
- .unwrap()
- .set_presence(Some(game), OnlineStatus::Online, false);
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_presence(Some(game), OnlineStatus::Online, false);
}
/// Sets the current user's presence, providing all fields to be passed.
@@ -351,8 +352,7 @@ impl Context {
game: Option<Game>,
status: OnlineStatus,
afk: bool) {
- self.shard.lock()
- .unwrap()
- .set_presence(game, status, afk)
+ let mut shard = self.shard.lock().unwrap();
+ shard.set_presence(game, status, afk)
}
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index f728792..91790fa 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -35,6 +35,7 @@ pub use ::http as rest;
#[cfg(feature="cache")]
pub use ::CACHE;
+use chrono::UTC;
use self::dispatch::dispatch;
use self::event_store::EventStore;
use std::collections::HashMap;
@@ -43,9 +44,7 @@ use std::time::Duration;
use std::{mem, thread};
use super::gateway::Shard;
use typemap::ShareMap;
-use websocket::client::Receiver;
use websocket::result::WebSocketError;
-use websocket::stream::WebSocketStream;
use ::http;
use ::internal::prelude::*;
use ::internal::ws_impl::ReceiverExt;
@@ -982,7 +981,7 @@ impl Client {
});
match boot {
- Ok((shard, ready, receiver)) => {
+ Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
@@ -1011,7 +1010,6 @@ impl Client {
event_store: self.event_store.clone(),
framework: self.framework.clone(),
gateway_url: gateway_url.clone(),
- receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
@@ -1021,7 +1019,6 @@ impl Client {
data: self.data.clone(),
event_store: self.event_store.clone(),
gateway_url: gateway_url.clone(),
- receiver: receiver,
shard: shard,
shard_info: shard_info,
token: self.token.clone(),
@@ -1254,7 +1251,6 @@ struct MonitorInfo {
event_store: Arc<RwLock<EventStore>>,
framework: Arc<Mutex<Framework>>,
gateway_url: Arc<Mutex<String>>,
- receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
@@ -1265,13 +1261,12 @@ struct MonitorInfo {
data: Arc<Mutex<ShareMap>>,
event_store: Arc<RwLock<EventStore>>,
gateway_url: Arc<Mutex<String>>,
- receiver: Receiver<WebSocketStream>,
shard: Arc<Mutex<Shard>>,
shard_info: Option<[u64; 2]>,
token: String,
}
-fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> {
+fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
// Make ten attempts to boot the shard, exponentially backing off; if it
// still doesn't boot after that, accept it as a failure.
//
@@ -1298,7 +1293,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS
info.shard_info);
match attempt {
- Ok((shard, ready, receiver)) => {
+ Ok((shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write()
@@ -1308,7 +1303,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS
info!("Successfully booted shard: {:?}", info.shard_info);
- return Ok((shard, ready, receiver));
+ return Ok((shard, ready));
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
}
@@ -1332,14 +1327,13 @@ fn monitor_shard(mut info: MonitorInfo) {
});
match boot {
- Ok((new_shard, ready, new_receiver)) => {
+ Ok((new_shard, ready)) => {
#[cfg(feature="cache")]
{
CACHE.write().unwrap().update_with_ready(&ready);
}
*info.shard.lock().unwrap() = new_shard;
- info.receiver = new_receiver;
boot_successful = true;
@@ -1375,16 +1369,54 @@ fn monitor_shard(mut info: MonitorInfo) {
}
fn handle_shard(info: &mut MonitorInfo) {
+ // This is currently all ducktape. Redo this.
+ let mut last_ack_time = UTC::now().timestamp();
+ let mut last_heartbeat_sent = UTC::now().timestamp();
+
loop {
- let event = match info.receiver.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
- debug!("Attempting to shutdown receiver/sender");
+ let mut shard = info.shard.lock().unwrap();
+ let in_secs = shard.heartbeat_interval() / 1000;
- match info.shard.lock().unwrap().resume(&mut info.receiver) {
- Ok((_, receiver)) => {
+ if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // shutdown and auto-reconnect.
+ if !shard.last_heartbeat_acknowledged() {
+ debug!("Last heartbeat not acknowledged; re-connecting");
+
+ match shard.resume() {
+ Ok(_) => {
debug!("Successfully resumed shard");
- info.receiver = receiver;
+ continue;
+ },
+ Err(why) => {
+ warn!("Err resuming shard: {:?}", why);
+
+ return;
+ },
+ }
+ }
+
+ let _ = shard.heartbeat();
+ last_heartbeat_sent = UTC::now().timestamp();
+ }
+
+ let event = match shard.client.recv_json(GatewayEvent::decode) {
+ Ok(GatewayEvent::HeartbeatAck) => {
+ last_ack_time = UTC::now().timestamp();
+
+ Ok(GatewayEvent::HeartbeatAck)
+ },
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
+ continue;
+ }
+
+ debug!("Attempting to shutdown receiver/sender");
+
+ match shard.resume() {
+ Ok(_) => {
+ debug!("Successfully resumed shard");
continue;
},
@@ -1395,21 +1427,14 @@ fn handle_shard(info: &mut MonitorInfo) {
},
}
},
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
other => other,
};
trace!("Received event on shard handler: {:?}", event);
- // This will only lock when _updating_ the shard, resuming, etc. Most
- // of the time, this won't be locked (i.e. when receiving an event over
- // the receiver, separate from the shard itself).
- let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) {
- Ok(Some((event, Some(new_receiver)))) => {
- info.receiver = new_receiver;
-
- event
- },
- Ok(Some((event, None))) => event,
+ let event = match shard.handle_event(event) {
+ Ok(Some(event)) => event,
Ok(None) => continue,
Err(why) => {
error!("Shard handler received err: {:?}", why);