diff options
| author | Zeyla Hellyer <[email protected]> | 2017-11-03 07:13:24 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-11-03 07:13:38 -0700 |
| commit | b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5 (patch) | |
| tree | 17eb07c8218f1e145d5eb3fba353fd1486b3874d /src/gateway | |
| parent | Make the Client return a Result (diff) | |
| download | serenity-b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5.tar.xz serenity-b8efeaf5e920cbfc775cdee70f23aa41ab7b9dd5.zip | |
Redo client internals + gateway
This commit is a rewrite of the client module's internals and the
gateway.
The main benefit of this is that there is either 0 or 1 lock retrievals
per event received, and the ability to utilize the ShardManager both
internally and in userland code has been improved.
The primary rework is in the `serenity::client` module, which now
includes a few more structures, some changes to existing ones, and more
functionality (such as to the `ShardManager`).
The two notable additions to the client-gateway bridge are the
`ShardMessenger` and `ShardManagerMonitor`.
The `ShardMessenger` is a simple-to-use interface for users to use to
interact with shards. The user is given one of these in the
`serenity::client::Context` in dispatches to the
`serenity::client::EventHandler`. This can be used for updating the
presence of a shard, sending a guild chunk message, or sending a user's
defined WebSocket message.
The `ShardManagerMonitor` is a loop run in its own thread, potentially
the main thread, that is responsible for receiving messages over an mpsc
channel on what to do with shards via the `ShardManager`. For example,
it will receive a message to shutdown a single shard, restart a single
shard, or shutdown the entire thing.
Users, in most applications, will not interact with the
`ShardManagerMonitor`. Users using the `serenity::client::Client`
interact with only the `ShardMessenger`.
The `ShardManager` is now usable by the user and is available to them,
and contains public functions for shutdowns, initializations, restarts,
and complete shutdowns of shards. It contains utility functions like
determining whether the `ShardManager` is responsible for a shard of a
given ID and the IDs of shards currently active (having an associated
`ShardRunner`). It can be found on
`serenity::client::Client::shard_manager`.
Speaking of the `ShardRunner`, it no longer owns a clone of an Arc to
its assigned `serenity::gateway::Shard`. It now completely owns the
Shard. This means that in order to open the shard, a `ShardRunner` no
longer has to repeatedly retrieve a lock to it. This reduces the number
of lock retrievals per event dispatching cycle from 3 or 4 depending on
event type to 0 or 1 depending on whether it's a message create _and_ if
the framework is in use. To interact with the Shard, one must now go
through the previously mentioned `ShardMessenger`, which the
`ShardRunner` will check for messages from on a loop.
`serenity::client::Context` is now slightly different. Instead of the
`shard` field being `Arc<Mutex<Shard>>`, it is an instance of a
`ShardMessenger`. The interface is the same (minus losing some
Shard-specific methods like `latency`), and `Context`'s shortcuts still
exist (like `Context::online` or `Context::set_game`). It now
additionally includes a `Context::shard_id` field which is a u64
containing the ID of the shard that the event was dispatched from.
`serenity::client::Client` has one changed field name, one field that is
now public, and a new field. `Client::shard_runners` is now
`Client::shard_manager` of type `Arc<Mutex<ShardManager>>`. The
`Client::token` field is now public. This can, for example, be mutated
on token resets if you know what you're doing. `Client::ws_uri` is new
and contains the URI for shards to use when connecting to the gateway.
Otherwise, the Client's usage is unchanged.
`serenity::gateway::Shard` has a couple of minor changes and many more
public methods and fields. The `autoreconnect`, `check_heartbeat`,
`handle_event`, `heartbeat`, `identify`, `initialize`, `reset`,
`resume`, `reconnect`, and `update_presence` methods are now public. The
`token` structfield is now public. There are new getters for various
structfields, such as `heartbeat_instants` and `last_heartbeat_ack`.
The breaking change on the `Shard` is that `Shard::handle_event` now
takes an event by reference and, instead of returning
`Result<Option<Event>>`, it now returns `Result<Option<ShardAction>>`.
`serenity::gateway::ShardAction` is a light enum determining an action
that someone _should_/_must_ perform on the shard, e.g. reconnecting or
identifying. This is determined by `Shard::handle_event`.
In total, there aren't too many breaking changes that most of userland
use cases has to deal with -- at most, changing some usage of `Context`.
Retrieving information like a Shard's latency is currently not possible
anymore but work will be done to make this functionality available
again.
Diffstat (limited to 'src/gateway')
| -rw-r--r-- | src/gateway/mod.rs | 17 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 889 | ||||
| -rw-r--r-- | src/gateway/ws_client_ext.rs | 133 |
3 files changed, 461 insertions, 578 deletions
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b593f5c..147808e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -51,9 +51,18 @@ mod error; mod shard; +mod ws_client_ext; pub use self::error::Error as GatewayError; pub use self::shard::Shard; +pub use self::ws_client_ext::WebSocketGatewayClientExt; + +use model::{Game, OnlineStatus}; +use websocket::sync::client::Client; +use websocket::sync::stream::{TcpStream, TlsStream}; + +pub type CurrentPresence = (Option<Game>, OnlineStatus); +pub type WsClient = Client<TlsStream<TcpStream>>; /// Indicates the current connection stage of a [`Shard`]. /// @@ -136,3 +145,11 @@ impl ConnectionStage { } } } + +pub enum ShardAction { + Autoreconnect, + Heartbeat, + Identify, + Reconnect, + Resume, +} diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index c644eac..ef05cf4 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -1,39 +1,31 @@ -use chrono::Utc; use parking_lot::Mutex; -use serde_json::Value; -use std::env::consts; -use std::io::Write; -use std::net::Shutdown; use std::sync::Arc; use std::time::{Duration as StdDuration, Instant}; -use std::thread; -use super::{ConnectionStage, GatewayError}; +use super::{ + ConnectionStage, + CurrentPresence, + ShardAction, + GatewayError, + WsClient, + WebSocketGatewayClientExt, +}; use websocket::client::Url; -use websocket::message::{CloseData, OwnedMessage}; use websocket::stream::sync::AsTcpStream; -use websocket::sync::client::{Client, ClientBuilder}; -use websocket::sync::stream::{TcpStream, TlsStream}; +use websocket::sync::client::ClientBuilder; use websocket::WebSocketError; -use constants::{self, close_codes, OpCode}; +use constants::{self, close_codes}; use internal::prelude::*; -use internal::ws_impl::SenderExt; use model::event::{Event, GatewayEvent}; use model::{Game, GuildId, OnlineStatus}; #[cfg(feature = "voice")] +use serde_json::Value; +#[cfg(feature = "voice")] use std::sync::mpsc::{self, Receiver as MpscReceiver}; -#[cfg(feature = "cache")] -use client::CACHE; #[cfg(feature = "voice")] use voice::Manager as VoiceManager; #[cfg(feature = "voice")] use http; -#[cfg(feature = "cache")] -use utils; - -pub type WsClient = Client<TlsStream<TcpStream>>; - -type CurrentPresence = (Option<Game>, OnlineStatus); /// A Shard is a higher-level handler for a websocket connection to Discord's /// gateway. The shard allows for sending and receiving messages over the @@ -91,7 +83,7 @@ pub struct Shard { session_id: Option<String>, shard_info: [u64; 2], stage: ConnectionStage, - token: Arc<Mutex<String>>, + pub token: Arc<Mutex<String>>, ws_url: Arc<Mutex<String>>, /// The voice connections that this Shard is responsible for. The Shard will /// update the voice connections' states. @@ -138,11 +130,14 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn new(ws_url: Arc<Mutex<String>>, - token: Arc<Mutex<String>>, - shard_info: [u64; 2]) - -> Result<Shard> { - let client = connecting(&*ws_url.lock()); + pub fn new( + ws_url: Arc<Mutex<String>>, + token: Arc<Mutex<String>>, + shard_info: [u64; 2], + ) -> Result<Shard> { + let mut client = connect(&*ws_url.lock())?; + + let _ = set_client_timeout(&mut client); let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -159,6 +154,8 @@ impl Shard { let user = http::get_current_user()?; Shard { + manager: VoiceManager::new(tx, user.id), + manager_rx: rx, client, current_presence, heartbeat_instants, @@ -170,8 +167,6 @@ impl Shard { shard_info, session_id, ws_url, - manager: VoiceManager::new(tx, user.id), - manager_rx: rx, } } else { Shard { @@ -191,137 +186,122 @@ impl Shard { }) } - /// Retrieves a copy of the current shard information. - /// - /// The first element is the _current_ shard - 0-indexed - while the second - /// element is the _total number_ of shards -- 1-indexed. - /// - /// For example, if using 3 shards in total, and if this is shard 1, then it - /// can be read as "the second of three shards". - /// - /// # Examples - /// - /// Retrieving the shard info for the second shard, out of two shards total: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); - /// # - /// assert_eq!(shard.shard_info(), [1, 2]); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn shard_info(&self) -> [u64; 2] { self.shard_info } + /// Retrieves the current presence of the shard. + #[inline] + pub fn current_presence(&self) -> &CurrentPresence { + &self.current_presence + } - /// Sets the user's current game, if any. + /// Retrieves the heartbeat instants of the shard. /// - /// Other presence settings are maintained. + /// This is the time of when a heartbeat was sent and when an + /// acknowledgement was last received. + #[inline] + pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) { + &self.heartbeat_instants + } + + /// Retrieves the value of when the last heartbeat was sent. + #[inline] + pub fn last_heartbeat_sent(&self) -> Option<&Instant> { + self.heartbeat_instants.0.as_ref() + } + + /// Retrieves the value of when the last heartbeat ack was received. + #[inline] + pub fn last_heartbeat_ack(&self) -> Option<&Instant> { + self.heartbeat_instants.1.as_ref() + } + + /// Sends a heartbeat to the gateway with the current sequence. /// - /// # Examples + /// This sets the last heartbeat time to now, and + /// `last_heartbeat_acknowledged` to `false`. /// - /// Setting the current game to playing `"Heroes of the Storm"`: + /// # Errors /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::Game; + /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending + /// a heartbeat. /// - /// shard.set_game(Some(Game::playing("Heroes of the Storm"))); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` + /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed + pub fn heartbeat(&mut self) -> Result<()> { + match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) { + Ok(()) => { + self.heartbeat_instants.0 = Some(Instant::now()); + self.last_heartbeat_acknowledged = false; + + Ok(()) + }, + Err(why) => { + match why { + Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { + debug!("[Shard {:?}] Err heartbeating: {:?}", + self.shard_info, + err); + }, + other => { + warn!("[Shard {:?}] Other err w/ keepalive: {:?}", + self.shard_info, + other); + }, + } + + Err(Error::Gateway(GatewayError::HeartbeatFailed)) + } + } + } + + #[inline] + pub fn heartbeat_interval(&self) -> Option<&u64> { + self.heartbeat_interval.as_ref() + } + + #[inline] + pub fn last_heartbeat_acknowledged(&self) -> bool { + self.last_heartbeat_acknowledged + } + + #[inline] + pub fn seq(&self) -> u64 { + self.seq + } + + #[inline] + pub fn session_id(&self) -> Option<&String> { + self.session_id.as_ref() + } + + #[inline] pub fn set_game(&mut self, game: Option<Game>) { self.current_presence.0 = game; + } - self.update_presence(); + #[inline] + pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) { + self.set_game(game); + self.set_status(status); } - /// Sets the user's current online status. - /// - /// Note that [`Offline`] is not a valid online status, so it is - /// automatically converted to [`Invisible`]. - /// - /// Other presence settings are maintained. - /// - /// # Examples - /// - /// Setting the current online status for the shard to [`DoNotDisturb`]. - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::OnlineStatus; - /// - /// shard.set_status(OnlineStatus::DoNotDisturb); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb - /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible - /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline - pub fn set_status(&mut self, online_status: OnlineStatus) { - self.current_presence.1 = match online_status { - OnlineStatus::Offline => OnlineStatus::Invisible, - other => other, - }; + #[inline] + pub fn set_status(&mut self, mut status: OnlineStatus) { + if status == OnlineStatus::Offline { + status = OnlineStatus::Invisible; + } - self.update_presence(); + self.current_presence.1 = status; } - /// Sets the user's full presence information. + /// Retrieves a copy of the current shard information. /// - /// Consider using the individual setters if you only need to modify one of - /// these. + /// The first element is the _current_ shard - 0-indexed - while the second + /// element is the _total number_ of shards -- 1-indexed. + /// + /// For example, if using 3 shards in total, and if this is shard 1, then it + /// can be read as "the second of three shards". /// /// # Examples /// - /// Set the current user as playing `"Heroes of the Storm"` and being - /// online: + /// Retrieving the shard info for the second shard, out of two shards total: /// /// ```rust,no_run /// # extern crate parking_lot; @@ -335,11 +315,9 @@ impl Shard { /// # fn try_main() -> Result<(), Box<Error>> { /// # let mutex = Arc::new(Mutex::new("".to_string())); /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); /// # - /// use serenity::model::{Game, OnlineStatus}; - /// - /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online); + /// assert_eq!(shard.shard_info(), [1, 2]); /// # Ok(()) /// # } /// # @@ -347,15 +325,7 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) { - if status == OnlineStatus::Offline { - status = OnlineStatus::Invisible; - } - - self.current_presence = (game, status); - - self.update_presence(); - } + pub fn shard_info(&self) -> [u64; 2] { self.shard_info } /// Returns the current connection stage of the shard. pub fn stage(&self) -> ConnectionStage { @@ -387,21 +357,24 @@ impl Shard { /// Returns a `GatewayError::OverloadedShard` if the shard would have too /// many guilds assigned to it. #[allow(cyclomatic_complexity)] - pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> { - match event { - Ok(GatewayEvent::Dispatch(seq, event)) => { + pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>) + -> Result<Option<ShardAction>> { + match *event { + Ok(GatewayEvent::Dispatch(seq, ref event)) => { if seq > self.seq + 1 { warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq); } - match event { + match *event { Event::Ready(ref ready) => { debug!("[Shard {:?}] Received Ready", self.shard_info); self.session_id = Some(ready.ready.session_id.clone()); self.stage = ConnectionStage::Connected; + /* set_client_timeout(&mut self.client)?; + */ }, Event::Resumed(_) => { info!("[Shard {:?}] Resumed", self.shard_info); @@ -421,7 +394,7 @@ impl Shard { self.seq = seq; - Ok(Some(event)) + Ok(None) }, Ok(GatewayEvent::Heartbeat(s)) => { info!("[Shard {:?}] Received shard heartbeat", self.shard_info); @@ -438,24 +411,18 @@ impl Shard { if self.stage == ConnectionStage::Handshake { self.stage = ConnectionStage::Identifying; - self.identify()?; + return Ok(Some(ShardAction::Identify)); } else { warn!( "[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info ); - return self.autoreconnect().and(Ok(None)); + return Ok(Some(ShardAction::Autoreconnect)); } } - let map = json!({ - "d": Value::Null, - "op": OpCode::Heartbeat.num(), - }); - self.client.send_json(&map)?; - - Ok(None) + Ok(Some(ShardAction::Heartbeat)) }, Ok(GatewayEvent::HeartbeatAck) => { self.heartbeat_instants.1 = Some(Instant::now()); @@ -478,14 +445,14 @@ impl Shard { self.heartbeat_interval = Some(interval); } - if self.stage == ConnectionStage::Handshake { - self.identify().and(Ok(None)) + Ok(Some(if self.stage == ConnectionStage::Handshake { + ShardAction::Identify } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); - self.autoreconnect().and(Ok(None)) - } + ShardAction::Autoreconnect + })) }, Ok(GatewayEvent::InvalidateSession(resumable)) => { info!( @@ -493,16 +460,15 @@ impl Shard { self.shard_info, ); - if resumable { - self.resume().and(Ok(None)) + Ok(Some(if resumable { + ShardAction::Resume } else { - self.identify().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), - Err(Error::Gateway(GatewayError::Closed(data))) => { + Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)), + Err(Error::Gateway(GatewayError::Closed(ref data))) => { let num = data.as_ref().map(|d| d.status_code); - let reason = data.map(|d| d.reason); let clean = num == Some(1000); match num { @@ -562,7 +528,7 @@ impl Shard { "[Shard {:?}] Unknown unclean close {}: {:?}", self.shard_info, other, - reason + data.as_ref().map(|d| &d.reason), ); }, _ => {}, @@ -573,14 +539,14 @@ impl Shard { self.session_id.is_some() }).unwrap_or(true); - if resume { - self.resume().or_else(|_| self.reconnect()).and(Ok(None)) + Ok(Some(if resume { + ShardAction::Resume } else { - self.reconnect().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Err(Error::WebSocket(why)) => { - if let WebSocketError::NoDataAvailable = why { + Err(Error::WebSocket(ref why)) => { + if let WebSocketError::NoDataAvailable = *why { if self.heartbeat_instants.1.is_none() { return Ok(None); } @@ -592,44 +558,62 @@ impl Shard { info!("[Shard {:?}] Will attempt to auto-reconnect", self.shard_info); - self.autoreconnect().and(Ok(None)) + Ok(Some(ShardAction::Autoreconnect)) }, - Err(error) => Err(error), + _ => Ok(None), + } + } + + pub fn check_heartbeat(&mut self) -> Result<()> { + let wait = { + let heartbeat_interval = match self.heartbeat_interval { + Some(heartbeat_interval) => heartbeat_interval, + None => return Ok(()), + }; + + StdDuration::from_secs(heartbeat_interval / 1000) + }; + + // If a duration of time less than the heartbeat_interval has passed, + // then don't perform a keepalive or attempt to reconnect. + if let Some(last_sent) = self.heartbeat_instants.0 { + if last_sent.elapsed() <= wait { + return Ok(()); + } + } + + // If the last heartbeat didn't receive an acknowledgement, then + // auto-reconnect. + if !self.last_heartbeat_acknowledged { + debug!( + "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", + self.shard_info, + ); + + return self.reconnect().map_err(|why| { + warn!( + "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", + self.shard_info, + why, + ); + + why + }); + } + + // Otherwise, we're good to heartbeat. + if let Err(why) = self.heartbeat() { + warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); + + self.reconnect() + } else { + trace!("[Shard {:?}] Heartbeated", self.shard_info); + + Ok(()) } } /// Calculates the heartbeat latency between the shard and the gateway. - /// - /// # Examples - /// - /// When using the [`Client`], output the latency in response to a `"~ping"` - /// message handled through [`Client::on_message`]. - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::*; - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "~ping" { - /// if let Some(latency) = ctx.shard.lock().latency() { - /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos()); - /// - /// let _ = msg.channel_id.say(&s); - /// } else { - /// let _ = msg.channel_id.say("N/A"); - /// } - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Client`]: ../struct.Client.html - /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message // Shamelessly stolen from brayzure's commit in eris: // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6> pub fn latency(&self) -> Option<StdDuration> { @@ -640,38 +624,68 @@ impl Shard { } } - /// Shuts down the receiver by attempting to cleanly close the - /// connection. - pub fn shutdown_clean(&mut self) -> Result<()> { - { - let data = CloseData { - status_code: 1000, - reason: String::new(), - }; - - let message = OwnedMessage::Close(Some(data)); - - self.client.send_message(&message)?; + #[cfg(feature = "voice")] + fn voice_dispatch(&mut self, event: &Event) { + if let Event::VoiceStateUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } } - let mut stream = self.client.stream_ref().as_tcp(); + if let Event::VoiceServerUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + #[cfg(feature = "voice")] + pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> { + let mut messages = vec![]; - debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info); + while let Ok(v) = self.manager_rx.try_recv() { + messages.push(v); + } - Ok(()) + messages } - /// Uncleanly shuts down the receiver by not sending a close code. - pub fn shutdown(&mut self) -> Result<()> { - let mut stream = self.client.stream_ref().as_tcp(); + /// Performs a deterministic reconnect. + /// + /// The type of reconnect is deterministic on whether a [`session_id`]. + /// + /// If the `session_id` still exists, then a RESUME is sent. If not, then + /// an IDENTIFY is sent. + /// + /// Note that, if the shard is already in a stage of + /// [`ConnectionStage::Connecting`], then no action will be performed. + /// + /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting + /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id + pub fn autoreconnect(&mut self) -> Result<()> { + if self.stage == ConnectionStage::Connecting { + return Ok(()); + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + if self.session_id().is_some() { + debug!( + "[Shard {:?}] Autoreconnector choosing to resume", + self.shard_info, + ); - Ok(()) + self.resume() + } else { + debug!( + "[Shard {:?}] Autoreconnector choosing to reconnect", + self.shard_info, + ); + + self.reconnect() + } } /// Requests that one or multiple [`Guild`]s be chunked. @@ -710,7 +724,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(2000), None); + /// shard.chunk_guilds(guild_ids, Some(2000), None); /// # Ok(()) /// # } /// # @@ -740,7 +754,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(20), Some("do")); + /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); /// # Ok(()) /// # } /// # @@ -753,280 +767,40 @@ impl Shard { /// ../../model/event/enum.Event.html#variant.GuildMembersChunk /// [`Guild`]: ../../model/struct.Guild.html /// [`Member`]: ../../model/struct.Member.html - pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>) - where It: IntoIterator<Item=T> { + pub fn chunk_guilds<It>( + &mut self, + guild_ids: It, + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { debug!("[Shard {:?}] Requesting member chunks", self.shard_info); - let msg = json!({ - "op": OpCode::GetGuildMembers.num(), - "d": { - "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), - "limit": limit.unwrap_or(0), - "query": query.unwrap_or(""), - }, - }); - - let _ = self.client.send_json(&msg); - } - - /// Calculates the number of guilds that the shard is responsible for. - /// - /// If sharding is not being used (i.e. 1 shard), then the total number of - /// [`Guild`] in the [`Cache`] will be used. - /// - /// **Note**: Requires the `cache` feature be enabled. - /// - /// # Examples - /// - /// Retrieve the number of guilds a shard is responsible for: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// let info = shard.shard_info(); - /// let guilds = shard.guilds_handled(); - /// - /// println!("Shard {:?} is responsible for {} guilds", info, guilds); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`Cache`]: ../ext/cache/struct.Cache.html - /// [`Guild`]: ../model/struct.Guild.html - #[cfg(feature = "cache")] - pub fn guilds_handled(&self) -> u16 { - let cache = CACHE.read(); - - let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]); - - cache - .guilds - .keys() - .filter(|guild_id| { - utils::shard_id(guild_id.0, shard_count) == shard_id - }) - .count() as u16 - } - - #[cfg(feature = "voice")] - fn voice_dispatch(&mut self, event: &Event) { - if let Event::VoiceStateUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_state(&update.voice_state); - } - } - } - - if let Event::VoiceServerUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_server(&update.endpoint, &update.token); - } - } - } - } - - #[cfg(feature = "voice")] - pub(crate) fn cycle_voice_recv(&mut self) { - if let Ok(v) = self.manager_rx.try_recv() { - if let Err(why) = self.client.send_json(&v) { - warn!("[Shard {:?}] Err sending voice msg: {:?}", - self.shard_info, - why); - } - } + self.client.send_chunk_guilds( + guild_ids, + &self.shard_info, + limit, + query, + ) } - pub(crate) fn heartbeat(&mut self) -> Result<()> { - let map = json!({ - "d": self.seq, - "op": OpCode::Heartbeat.num(), - }); - - trace!("[Shard {:?}] Sending heartbeat d: {}", - self.shard_info, - self.seq); - - match self.client.send_json(&map) { - Ok(_) => { - self.heartbeat_instants.0 = Some(Instant::now()); - self.last_heartbeat_acknowledged = false; - - trace!("[Shard {:?}] Successfully heartbeated", - self.shard_info); - - Ok(()) - }, - Err(why) => { - match why { - Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { - debug!("[Shard {:?}] Err heartbeating: {:?}", - self.shard_info, - err); - }, - other => { - warn!("[Shard {:?}] Other err w/ keepalive: {:?}", - self.shard_info, - other); - }, - } - - Err(Error::Gateway(GatewayError::HeartbeatFailed)) - }, - } - } - - pub(crate) fn check_heartbeat(&mut self) -> Result<()> { - let heartbeat_interval = match self.heartbeat_interval { - Some(heartbeat_interval) => heartbeat_interval, - None => return Ok(()), - }; - - let wait = StdDuration::from_secs(heartbeat_interval / 1000); - - // If a duration of time less than the heartbeat_interval has passed, - // then don't perform a keepalive or attempt to reconnect. - if let Some(last_sent) = self.heartbeat_instants.0 { - if last_sent.elapsed() <= wait { - return Ok(()); - } - } - - // If the last heartbeat didn't receive an acknowledgement, then - // auto-reconnect. - if !self.last_heartbeat_acknowledged { - debug!( - "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", - self.shard_info, - ); - - return self.reconnect().map_err(|why| { - warn!( - "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", - self.shard_info, - why, - ); - - why - }); - } - - // Otherwise, we're good to heartbeat. - trace!("[Shard {:?}] Heartbeating", self.shard_info); - - if let Err(why) = self.heartbeat() { - warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); - - self.reconnect() - } else { - trace!("[Shard {:?}] Heartbeated", self.shard_info); - self.heartbeat_instants.0 = Some(Instant::now()); - - Ok(()) - } - } - - pub(crate) fn autoreconnect(&mut self) -> Result<()> { - if self.stage == ConnectionStage::Connecting { - return Ok(()); - } - - if self.session_id.is_some() { - debug!("[Shard {:?}] Autoreconnector choosing to resume", - self.shard_info); - - self.resume() - } else { - debug!("[Shard {:?}] Autoreconnector choosing to reconnect", - self.shard_info); - - self.reconnect() - } - } - - /// Retrieves the `heartbeat_interval`. - #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option<u64> { - self.heartbeat_interval - } - - /// Retrieves the value of when the last heartbeat ack was received. - #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { - self.heartbeat_instants.1 - } - - fn reconnect(&mut self) -> Result<()> { - info!("[Shard {:?}] Attempting to reconnect", self.shard_info); - self.reset(); - - self.initialize() - } - - // Attempts to send a RESUME message. - // - // # Examples - // - // Returns a `GatewayError::NoSessionId` is there is no `session_id`, - // indicating that the shard should instead [`reconnect`]. + // Sets the shard as going into identifying stage, which sets: // - // [`reconnect`]: #method.reconnect - fn resume(&mut self) -> Result<()> { - debug!("Shard {:?}] Attempting to resume", self.shard_info); - - self.initialize()?; - self.stage = ConnectionStage::Resuming; - - self.send_resume().or_else(|why| { - warn!("[Shard {:?}] Err sending resume: {:?}", - self.shard_info, - why); - - self.reconnect() - }) - } - - fn send_resume(&mut self) -> Result<()> { - let session_id = match self.session_id.clone() { - Some(session_id) => session_id, - None => return Err(Error::Gateway(GatewayError::NoSessionId)), - }; + // - the time that the last heartbeat sent as being now + // - the `stage` to `Identifying` + pub fn identify(&mut self) -> Result<()> { + self.client.send_identify(&self.shard_info, &self.token.lock())?; - debug!("[Shard {:?}] Sending resume; seq: {}", - self.shard_info, - self.seq); + self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; - self.client.send_json(&json!({ - "op": OpCode::Resume.num(), - "d": { - "session_id": session_id, - "seq": self.seq, - "token": &*self.token.lock(), - }, - })) + Ok(()) } /// Initializes a new WebSocket client. /// /// This will set the stage of the shard before and after instantiation of /// the client. - fn initialize(&mut self) -> Result<()> { + pub fn initialize(&mut self) -> Result<WsClient> { debug!("[Shard {:?}] Initializing", self.shard_info); // We need to do two, sort of three things here: @@ -1038,38 +812,15 @@ impl Shard { // This is used to accurately assess whether the state of the shard is // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; - self.client = connect(&self.ws_url.lock())?; + let mut client = connect(&self.ws_url.lock())?; self.stage = ConnectionStage::Handshake; - Ok(()) - } - - fn identify(&mut self) -> Result<()> { - let identification = json!({ - "op": OpCode::Identify.num(), - "d": { - "compression": true, - "large_threshold": constants::LARGE_THRESHOLD, - "shard": self.shard_info, - "token": &*self.token.lock(), - "v": constants::GATEWAY_VERSION, - "properties": { - "$browser": "serenity", - "$device": "serenity", - "$os": consts::OS, - }, - }, - }); - - self.heartbeat_instants.0 = Some(Instant::now()); - self.stage = ConnectionStage::Identifying; + let _ = set_client_timeout(&mut client); - debug!("[Shard {:?}] Identifying", self.shard_info); - - self.client.send_json(&identification) + Ok(client) } - fn reset(&mut self) { + pub fn reset(&mut self) { self.heartbeat_instants = (Some(Instant::now()), None); self.heartbeat_interval = None; self.last_heartbeat_acknowledged = true; @@ -1078,42 +829,39 @@ impl Shard { self.seq = 0; } - fn update_presence(&mut self) { - let (ref game, status) = self.current_presence; - let now = Utc::now().timestamp() as u64; - - let msg = json!({ - "op": OpCode::StatusUpdate.num(), - "d": { - "afk": false, - "since": now, - "status": status.name(), - "game": game.as_ref().map(|x| json!({ - "name": x.name, - "type": x.kind, - "url": x.url, - })), - }, - }); + pub fn resume(&mut self) -> Result<()> { + debug!("Shard {:?}] Attempting to resume", self.shard_info); - debug!("[Shard {:?}] Sending presence update", self.shard_info); + self.client = self.initialize()?; + self.stage = ConnectionStage::Resuming; - if let Err(why) = self.client.send_json(&msg) { - warn!("[Shard {:?}] Err sending presence update: {:?}", - self.shard_info, - why); + match self.session_id.as_ref() { + Some(session_id) => { + self.client.send_resume( + &self.shard_info, + session_id, + &self.seq, + &self.token.lock(), + ) + }, + None => Err(Error::Gateway(GatewayError::NoSessionId)), } + } - #[cfg(feature = "cache")] - { - let mut cache = CACHE.write(); - let current_user_id = cache.user.id; + pub fn reconnect(&mut self) -> Result<()> { + info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); - cache.presences.get_mut(¤t_user_id).map(|presence| { - presence.game = game.clone(); - presence.last_modified = Some(now); - }); - } + self.reset(); + self.client = self.initialize()?; + + Ok(()) + } + + pub fn update_presence(&mut self) -> Result<()> { + self.client.send_presence_update( + &self.shard_info, + &self.current_presence, + ) } } @@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> { Error::Gateway(GatewayError::BuildingUrl) }) } - -/// Tries to connect and upon failure, retries. -fn connecting(uri: &str) -> WsClient { - let waiting_time = 30; - - loop { - match connect(&uri) { - Ok(client) => return client, - Err(why) => { - warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time); - thread::sleep(StdDuration::from_secs(waiting_time)); - }, - }; - } -} diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs new file mode 100644 index 0000000..873d114 --- /dev/null +++ b/src/gateway/ws_client_ext.rs @@ -0,0 +1,133 @@ +use chrono::Utc; +use constants::{self, OpCode}; +use gateway::{CurrentPresence, WsClient}; +use internal::prelude::*; +use internal::ws_impl::SenderExt; +use model::GuildId; +use std::env::consts; + +pub trait WebSocketGatewayClientExt { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId>; + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()>; + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()>; + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()>; + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()>; +} + +impl WebSocketGatewayClientExt for WsClient { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { + debug!("[Shard {:?}] Requesting member chunks", shard_info); + + self.send_json(&json!({ + "op": OpCode::GetGuildMembers.num(), + "d": { + "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), + "limit": limit.unwrap_or(0), + "query": query.unwrap_or(""), + }, + })).map_err(From::from) + } + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()> { + trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq); + + self.send_json(&json!({ + "d": seq, + "op": OpCode::Heartbeat.num(), + })).map_err(From::from) + } + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()> { + debug!("[Shard {:?}] Identifying", shard_info); + + self.send_json(&json!({ + "op": OpCode::Identify.num(), + "d": { + "compression": true, + "large_threshold": constants::LARGE_THRESHOLD, + "shard": shard_info, + "token": token, + "v": constants::GATEWAY_VERSION, + "properties": { + "$browser": "serenity", + "$device": "serenity", + "$os": consts::OS, + }, + }, + })) + } + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()> { + let &(ref game, ref status) = current_presence; + let now = Utc::now().timestamp() as u64; + + debug!("[Shard {:?}] Sending presence update", shard_info); + + self.send_json(&json!({ + "op": OpCode::StatusUpdate.num(), + "d": { + "afk": false, + "since": now, + "status": status.name(), + "game": game.as_ref().map(|x| json!({ + "name": x.name, + "type": x.kind, + "url": x.url, + })), + }, + })) + } + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()> { + debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq); + + self.send_json(&json!({ + "op": OpCode::Resume.num(), + "d": { + "session_id": session_id, + "seq": seq, + "token": token, + }, + })).map_err(From::from) + } +} |