diff options
| author | Zeyla Hellyer <[email protected]> | 2017-06-07 15:01:47 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-06-07 15:01:47 -0700 |
| commit | 8f8a05996c5b47ec9401aabb517d96ed2af5c36b (patch) | |
| tree | ab48c3b558c396f4f6d12c98a466074f97f17acf /src/client | |
| parent | Ws read/write timeout after 90s (diff) | |
| download | serenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.tar.xz serenity-8f8a05996c5b47ec9401aabb517d96ed2af5c36b.zip | |
Upgrade rust-websocket, rust-openssl, and hyper
Upgrade `rust-websocket` to v0.20, maintaining use of its sync client.
This indirectly switches from `rust-openssl` v0.7 - which required
openssl-1.0 on all platforms - to `native-tls`, which allows for use of
schannel on Windows, Secure Transport on OSX, and openssl-1.1 on other
platforms.
Additionally, since hyper is no longer even a dependency of
rust-websocket, we can safely and easily upgrade to `hyper` v0.10 and
`multipart` v0.12.
This commit is fairly experimental as it has not been tested on a
long-running bot.
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/context.rs | 32 | ||||
| -rw-r--r-- | src/client/mod.rs | 81 |
2 files changed, 69 insertions, 44 deletions
diff --git a/src/client/context.rs b/src/client/context.rs index 5d36647..ec072ca 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -134,7 +134,8 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn online(&self) { - self.shard.lock().unwrap().set_status(OnlineStatus::Online); + let mut shard = self.shard.lock().unwrap(); + shard.set_status(OnlineStatus::Online); } /// Sets the current user as being [`Idle`]. This maintains the current @@ -157,7 +158,8 @@ impl Context { /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle pub fn idle(&self) { - self.shard.lock().unwrap().set_status(OnlineStatus::Idle); + let mut shard = self.shard.lock().unwrap(); + shard.set_status(OnlineStatus::Idle); } /// Sets the current user as being [`DoNotDisturb`]. This maintains the @@ -180,7 +182,8 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb pub fn dnd(&self) { - self.shard.lock().unwrap().set_status(OnlineStatus::DoNotDisturb); + let mut shard = self.shard.lock().unwrap(); + shard.set_status(OnlineStatus::DoNotDisturb); } /// Sets the current user as being [`Invisible`]. This maintains the current @@ -203,7 +206,8 @@ impl Context { /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible pub fn invisible(&self) { - self.shard.lock().unwrap().set_status(OnlineStatus::Invisible); + let mut shard = self.shard.lock().unwrap(); + shard.set_status(OnlineStatus::Invisible); } /// "Resets" the current user's presence, by setting the game to `None` and @@ -228,9 +232,8 @@ impl Context { /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence pub fn reset_presence(&self) { - self.shard.lock() - .unwrap() - .set_presence(None, OnlineStatus::Online, false) + let mut shard = self.shard.lock().unwrap(); + shard.set_presence(None, OnlineStatus::Online, false) } /// Sets the current game, defaulting to an online status of [`Online`]. @@ -260,9 +263,8 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn set_game(&self, game: Game) { - self.shard.lock() - .unwrap() - .set_presence(Some(game), OnlineStatus::Online, false); + let mut shard = self.shard.lock().unwrap(); + shard.set_presence(Some(game), OnlineStatus::Online, false); } /// Sets the current game, passing in only its name. This will automatically @@ -302,9 +304,8 @@ impl Context { url: None, }; - self.shard.lock() - .unwrap() - .set_presence(Some(game), OnlineStatus::Online, false); + let mut shard = self.shard.lock().unwrap(); + shard.set_presence(Some(game), OnlineStatus::Online, false); } /// Sets the current user's presence, providing all fields to be passed. @@ -351,8 +352,7 @@ impl Context { game: Option<Game>, status: OnlineStatus, afk: bool) { - self.shard.lock() - .unwrap() - .set_presence(game, status, afk) + let mut shard = self.shard.lock().unwrap(); + shard.set_presence(game, status, afk) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index f728792..91790fa 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -35,6 +35,7 @@ pub use ::http as rest; #[cfg(feature="cache")] pub use ::CACHE; +use chrono::UTC; use self::dispatch::dispatch; use self::event_store::EventStore; use std::collections::HashMap; @@ -43,9 +44,7 @@ use std::time::Duration; use std::{mem, thread}; use super::gateway::Shard; use typemap::ShareMap; -use websocket::client::Receiver; use websocket::result::WebSocketError; -use websocket::stream::WebSocketStream; use ::http; use ::internal::prelude::*; use ::internal::ws_impl::ReceiverExt; @@ -982,7 +981,7 @@ impl Client { }); match boot { - Ok((shard, ready, receiver)) => { + Ok((shard, ready)) => { #[cfg(feature="cache")] { CACHE.write() @@ -1011,7 +1010,6 @@ impl Client { event_store: self.event_store.clone(), framework: self.framework.clone(), gateway_url: gateway_url.clone(), - receiver: receiver, shard: shard, shard_info: shard_info, token: self.token.clone(), @@ -1021,7 +1019,6 @@ impl Client { data: self.data.clone(), event_store: self.event_store.clone(), gateway_url: gateway_url.clone(), - receiver: receiver, shard: shard, shard_info: shard_info, token: self.token.clone(), @@ -1254,7 +1251,6 @@ struct MonitorInfo { event_store: Arc<RwLock<EventStore>>, framework: Arc<Mutex<Framework>>, gateway_url: Arc<Mutex<String>>, - receiver: Receiver<WebSocketStream>, shard: Arc<Mutex<Shard>>, shard_info: Option<[u64; 2]>, token: String, @@ -1265,13 +1261,12 @@ struct MonitorInfo { data: Arc<Mutex<ShareMap>>, event_store: Arc<RwLock<EventStore>>, gateway_url: Arc<Mutex<String>>, - receiver: Receiver<WebSocketStream>, shard: Arc<Mutex<Shard>>, shard_info: Option<[u64; 2]>, token: String, } -fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> { +fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> { // Make ten attempts to boot the shard, exponentially backing off; if it // still doesn't boot after that, accept it as a failure. // @@ -1298,7 +1293,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS info.shard_info); match attempt { - Ok((shard, ready, receiver)) => { + Ok((shard, ready)) => { #[cfg(feature="cache")] { CACHE.write() @@ -1308,7 +1303,7 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketS info!("Successfully booted shard: {:?}", info.shard_info); - return Ok((shard, ready, receiver)); + return Ok((shard, ready)); }, Err(why) => warn!("Failed to boot shard: {:?}", why), } @@ -1332,14 +1327,13 @@ fn monitor_shard(mut info: MonitorInfo) { }); match boot { - Ok((new_shard, ready, new_receiver)) => { + Ok((new_shard, ready)) => { #[cfg(feature="cache")] { CACHE.write().unwrap().update_with_ready(&ready); } *info.shard.lock().unwrap() = new_shard; - info.receiver = new_receiver; boot_successful = true; @@ -1375,16 +1369,54 @@ fn monitor_shard(mut info: MonitorInfo) { } fn handle_shard(info: &mut MonitorInfo) { + // This is currently all ducktape. Redo this. + let mut last_ack_time = UTC::now().timestamp(); + let mut last_heartbeat_sent = UTC::now().timestamp(); + loop { - let event = match info.receiver.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - debug!("Attempting to shutdown receiver/sender"); + let mut shard = info.shard.lock().unwrap(); + let in_secs = shard.heartbeat_interval() / 1000; - match info.shard.lock().unwrap().resume(&mut info.receiver) { - Ok((_, receiver)) => { + if UTC::now().timestamp() - last_heartbeat_sent > in_secs { + // If the last heartbeat didn't receive an acknowledgement, then + // shutdown and auto-reconnect. + if !shard.last_heartbeat_acknowledged() { + debug!("Last heartbeat not acknowledged; re-connecting"); + + match shard.resume() { + Ok(_) => { debug!("Successfully resumed shard"); - info.receiver = receiver; + continue; + }, + Err(why) => { + warn!("Err resuming shard: {:?}", why); + + return; + }, + } + } + + let _ = shard.heartbeat(); + last_heartbeat_sent = UTC::now().timestamp(); + } + + let event = match shard.client.recv_json(GatewayEvent::decode) { + Ok(GatewayEvent::HeartbeatAck) => { + last_ack_time = UTC::now().timestamp(); + + Ok(GatewayEvent::HeartbeatAck) + }, + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time { + continue; + } + + debug!("Attempting to shutdown receiver/sender"); + + match shard.resume() { + Ok(_) => { + debug!("Successfully resumed shard"); continue; }, @@ -1395,21 +1427,14 @@ fn handle_shard(info: &mut MonitorInfo) { }, } }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, other => other, }; trace!("Received event on shard handler: {:?}", event); - // This will only lock when _updating_ the shard, resuming, etc. Most - // of the time, this won't be locked (i.e. when receiving an event over - // the receiver, separate from the shard itself). - let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) { - Ok(Some((event, Some(new_receiver)))) => { - info.receiver = new_receiver; - - event - }, - Ok(Some((event, None))) => event, + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, Ok(None) => continue, Err(why) => { error!("Shard handler received err: {:?}", why); |