diff options
Diffstat (limited to 'src/gateway')
| -rw-r--r-- | src/gateway/mod.rs | 17 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 889 | ||||
| -rw-r--r-- | src/gateway/ws_client_ext.rs | 133 |
3 files changed, 461 insertions, 578 deletions
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b593f5c..147808e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -51,9 +51,18 @@ mod error; mod shard; +mod ws_client_ext; pub use self::error::Error as GatewayError; pub use self::shard::Shard; +pub use self::ws_client_ext::WebSocketGatewayClientExt; + +use model::{Game, OnlineStatus}; +use websocket::sync::client::Client; +use websocket::sync::stream::{TcpStream, TlsStream}; + +pub type CurrentPresence = (Option<Game>, OnlineStatus); +pub type WsClient = Client<TlsStream<TcpStream>>; /// Indicates the current connection stage of a [`Shard`]. /// @@ -136,3 +145,11 @@ impl ConnectionStage { } } } + +pub enum ShardAction { + Autoreconnect, + Heartbeat, + Identify, + Reconnect, + Resume, +} diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index c644eac..ef05cf4 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -1,39 +1,31 @@ -use chrono::Utc; use parking_lot::Mutex; -use serde_json::Value; -use std::env::consts; -use std::io::Write; -use std::net::Shutdown; use std::sync::Arc; use std::time::{Duration as StdDuration, Instant}; -use std::thread; -use super::{ConnectionStage, GatewayError}; +use super::{ + ConnectionStage, + CurrentPresence, + ShardAction, + GatewayError, + WsClient, + WebSocketGatewayClientExt, +}; use websocket::client::Url; -use websocket::message::{CloseData, OwnedMessage}; use websocket::stream::sync::AsTcpStream; -use websocket::sync::client::{Client, ClientBuilder}; -use websocket::sync::stream::{TcpStream, TlsStream}; +use websocket::sync::client::ClientBuilder; use websocket::WebSocketError; -use constants::{self, close_codes, OpCode}; +use constants::{self, close_codes}; use internal::prelude::*; -use internal::ws_impl::SenderExt; use model::event::{Event, GatewayEvent}; use model::{Game, GuildId, OnlineStatus}; #[cfg(feature = "voice")] +use serde_json::Value; +#[cfg(feature = "voice")] use std::sync::mpsc::{self, Receiver as MpscReceiver}; -#[cfg(feature = "cache")] -use client::CACHE; #[cfg(feature = "voice")] use voice::Manager as VoiceManager; #[cfg(feature = "voice")] use http; -#[cfg(feature = "cache")] -use utils; - -pub type WsClient = Client<TlsStream<TcpStream>>; - -type CurrentPresence = (Option<Game>, OnlineStatus); /// A Shard is a higher-level handler for a websocket connection to Discord's /// gateway. The shard allows for sending and receiving messages over the @@ -91,7 +83,7 @@ pub struct Shard { session_id: Option<String>, shard_info: [u64; 2], stage: ConnectionStage, - token: Arc<Mutex<String>>, + pub token: Arc<Mutex<String>>, ws_url: Arc<Mutex<String>>, /// The voice connections that this Shard is responsible for. The Shard will /// update the voice connections' states. @@ -138,11 +130,14 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn new(ws_url: Arc<Mutex<String>>, - token: Arc<Mutex<String>>, - shard_info: [u64; 2]) - -> Result<Shard> { - let client = connecting(&*ws_url.lock()); + pub fn new( + ws_url: Arc<Mutex<String>>, + token: Arc<Mutex<String>>, + shard_info: [u64; 2], + ) -> Result<Shard> { + let mut client = connect(&*ws_url.lock())?; + + let _ = set_client_timeout(&mut client); let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -159,6 +154,8 @@ impl Shard { let user = http::get_current_user()?; Shard { + manager: VoiceManager::new(tx, user.id), + manager_rx: rx, client, current_presence, heartbeat_instants, @@ -170,8 +167,6 @@ impl Shard { shard_info, session_id, ws_url, - manager: VoiceManager::new(tx, user.id), - manager_rx: rx, } } else { Shard { @@ -191,137 +186,122 @@ impl Shard { }) } - /// Retrieves a copy of the current shard information. - /// - /// The first element is the _current_ shard - 0-indexed - while the second - /// element is the _total number_ of shards -- 1-indexed. - /// - /// For example, if using 3 shards in total, and if this is shard 1, then it - /// can be read as "the second of three shards". - /// - /// # Examples - /// - /// Retrieving the shard info for the second shard, out of two shards total: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); - /// # - /// assert_eq!(shard.shard_info(), [1, 2]); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn shard_info(&self) -> [u64; 2] { self.shard_info } + /// Retrieves the current presence of the shard. + #[inline] + pub fn current_presence(&self) -> &CurrentPresence { + &self.current_presence + } - /// Sets the user's current game, if any. + /// Retrieves the heartbeat instants of the shard. /// - /// Other presence settings are maintained. + /// This is the time of when a heartbeat was sent and when an + /// acknowledgement was last received. + #[inline] + pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) { + &self.heartbeat_instants + } + + /// Retrieves the value of when the last heartbeat was sent. + #[inline] + pub fn last_heartbeat_sent(&self) -> Option<&Instant> { + self.heartbeat_instants.0.as_ref() + } + + /// Retrieves the value of when the last heartbeat ack was received. + #[inline] + pub fn last_heartbeat_ack(&self) -> Option<&Instant> { + self.heartbeat_instants.1.as_ref() + } + + /// Sends a heartbeat to the gateway with the current sequence. /// - /// # Examples + /// This sets the last heartbeat time to now, and + /// `last_heartbeat_acknowledged` to `false`. /// - /// Setting the current game to playing `"Heroes of the Storm"`: + /// # Errors /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::Game; + /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending + /// a heartbeat. /// - /// shard.set_game(Some(Game::playing("Heroes of the Storm"))); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` + /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed + pub fn heartbeat(&mut self) -> Result<()> { + match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) { + Ok(()) => { + self.heartbeat_instants.0 = Some(Instant::now()); + self.last_heartbeat_acknowledged = false; + + Ok(()) + }, + Err(why) => { + match why { + Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { + debug!("[Shard {:?}] Err heartbeating: {:?}", + self.shard_info, + err); + }, + other => { + warn!("[Shard {:?}] Other err w/ keepalive: {:?}", + self.shard_info, + other); + }, + } + + Err(Error::Gateway(GatewayError::HeartbeatFailed)) + } + } + } + + #[inline] + pub fn heartbeat_interval(&self) -> Option<&u64> { + self.heartbeat_interval.as_ref() + } + + #[inline] + pub fn last_heartbeat_acknowledged(&self) -> bool { + self.last_heartbeat_acknowledged + } + + #[inline] + pub fn seq(&self) -> u64 { + self.seq + } + + #[inline] + pub fn session_id(&self) -> Option<&String> { + self.session_id.as_ref() + } + + #[inline] pub fn set_game(&mut self, game: Option<Game>) { self.current_presence.0 = game; + } - self.update_presence(); + #[inline] + pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) { + self.set_game(game); + self.set_status(status); } - /// Sets the user's current online status. - /// - /// Note that [`Offline`] is not a valid online status, so it is - /// automatically converted to [`Invisible`]. - /// - /// Other presence settings are maintained. - /// - /// # Examples - /// - /// Setting the current online status for the shard to [`DoNotDisturb`]. - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::OnlineStatus; - /// - /// shard.set_status(OnlineStatus::DoNotDisturb); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb - /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible - /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline - pub fn set_status(&mut self, online_status: OnlineStatus) { - self.current_presence.1 = match online_status { - OnlineStatus::Offline => OnlineStatus::Invisible, - other => other, - }; + #[inline] + pub fn set_status(&mut self, mut status: OnlineStatus) { + if status == OnlineStatus::Offline { + status = OnlineStatus::Invisible; + } - self.update_presence(); + self.current_presence.1 = status; } - /// Sets the user's full presence information. + /// Retrieves a copy of the current shard information. /// - /// Consider using the individual setters if you only need to modify one of - /// these. + /// The first element is the _current_ shard - 0-indexed - while the second + /// element is the _total number_ of shards -- 1-indexed. + /// + /// For example, if using 3 shards in total, and if this is shard 1, then it + /// can be read as "the second of three shards". /// /// # Examples /// - /// Set the current user as playing `"Heroes of the Storm"` and being - /// online: + /// Retrieving the shard info for the second shard, out of two shards total: /// /// ```rust,no_run /// # extern crate parking_lot; @@ -335,11 +315,9 @@ impl Shard { /// # fn try_main() -> Result<(), Box<Error>> { /// # let mutex = Arc::new(Mutex::new("".to_string())); /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); + /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap(); /// # - /// use serenity::model::{Game, OnlineStatus}; - /// - /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online); + /// assert_eq!(shard.shard_info(), [1, 2]); /// # Ok(()) /// # } /// # @@ -347,15 +325,7 @@ impl Shard { /// # try_main().unwrap(); /// # } /// ``` - pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) { - if status == OnlineStatus::Offline { - status = OnlineStatus::Invisible; - } - - self.current_presence = (game, status); - - self.update_presence(); - } + pub fn shard_info(&self) -> [u64; 2] { self.shard_info } /// Returns the current connection stage of the shard. pub fn stage(&self) -> ConnectionStage { @@ -387,21 +357,24 @@ impl Shard { /// Returns a `GatewayError::OverloadedShard` if the shard would have too /// many guilds assigned to it. #[allow(cyclomatic_complexity)] - pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> { - match event { - Ok(GatewayEvent::Dispatch(seq, event)) => { + pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>) + -> Result<Option<ShardAction>> { + match *event { + Ok(GatewayEvent::Dispatch(seq, ref event)) => { if seq > self.seq + 1 { warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq); } - match event { + match *event { Event::Ready(ref ready) => { debug!("[Shard {:?}] Received Ready", self.shard_info); self.session_id = Some(ready.ready.session_id.clone()); self.stage = ConnectionStage::Connected; + /* set_client_timeout(&mut self.client)?; + */ }, Event::Resumed(_) => { info!("[Shard {:?}] Resumed", self.shard_info); @@ -421,7 +394,7 @@ impl Shard { self.seq = seq; - Ok(Some(event)) + Ok(None) }, Ok(GatewayEvent::Heartbeat(s)) => { info!("[Shard {:?}] Received shard heartbeat", self.shard_info); @@ -438,24 +411,18 @@ impl Shard { if self.stage == ConnectionStage::Handshake { self.stage = ConnectionStage::Identifying; - self.identify()?; + return Ok(Some(ShardAction::Identify)); } else { warn!( "[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting", self.shard_info ); - return self.autoreconnect().and(Ok(None)); + return Ok(Some(ShardAction::Autoreconnect)); } } - let map = json!({ - "d": Value::Null, - "op": OpCode::Heartbeat.num(), - }); - self.client.send_json(&map)?; - - Ok(None) + Ok(Some(ShardAction::Heartbeat)) }, Ok(GatewayEvent::HeartbeatAck) => { self.heartbeat_instants.1 = Some(Instant::now()); @@ -478,14 +445,14 @@ impl Shard { self.heartbeat_interval = Some(interval); } - if self.stage == ConnectionStage::Handshake { - self.identify().and(Ok(None)) + Ok(Some(if self.stage == ConnectionStage::Handshake { + ShardAction::Identify } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); - self.autoreconnect().and(Ok(None)) - } + ShardAction::Autoreconnect + })) }, Ok(GatewayEvent::InvalidateSession(resumable)) => { info!( @@ -493,16 +460,15 @@ impl Shard { self.shard_info, ); - if resumable { - self.resume().and(Ok(None)) + Ok(Some(if resumable { + ShardAction::Resume } else { - self.identify().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), - Err(Error::Gateway(GatewayError::Closed(data))) => { + Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)), + Err(Error::Gateway(GatewayError::Closed(ref data))) => { let num = data.as_ref().map(|d| d.status_code); - let reason = data.map(|d| d.reason); let clean = num == Some(1000); match num { @@ -562,7 +528,7 @@ impl Shard { "[Shard {:?}] Unknown unclean close {}: {:?}", self.shard_info, other, - reason + data.as_ref().map(|d| &d.reason), ); }, _ => {}, @@ -573,14 +539,14 @@ impl Shard { self.session_id.is_some() }).unwrap_or(true); - if resume { - self.resume().or_else(|_| self.reconnect()).and(Ok(None)) + Ok(Some(if resume { + ShardAction::Resume } else { - self.reconnect().and(Ok(None)) - } + ShardAction::Reconnect + })) }, - Err(Error::WebSocket(why)) => { - if let WebSocketError::NoDataAvailable = why { + Err(Error::WebSocket(ref why)) => { + if let WebSocketError::NoDataAvailable = *why { if self.heartbeat_instants.1.is_none() { return Ok(None); } @@ -592,44 +558,62 @@ impl Shard { info!("[Shard {:?}] Will attempt to auto-reconnect", self.shard_info); - self.autoreconnect().and(Ok(None)) + Ok(Some(ShardAction::Autoreconnect)) }, - Err(error) => Err(error), + _ => Ok(None), + } + } + + pub fn check_heartbeat(&mut self) -> Result<()> { + let wait = { + let heartbeat_interval = match self.heartbeat_interval { + Some(heartbeat_interval) => heartbeat_interval, + None => return Ok(()), + }; + + StdDuration::from_secs(heartbeat_interval / 1000) + }; + + // If a duration of time less than the heartbeat_interval has passed, + // then don't perform a keepalive or attempt to reconnect. + if let Some(last_sent) = self.heartbeat_instants.0 { + if last_sent.elapsed() <= wait { + return Ok(()); + } + } + + // If the last heartbeat didn't receive an acknowledgement, then + // auto-reconnect. + if !self.last_heartbeat_acknowledged { + debug!( + "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", + self.shard_info, + ); + + return self.reconnect().map_err(|why| { + warn!( + "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", + self.shard_info, + why, + ); + + why + }); + } + + // Otherwise, we're good to heartbeat. + if let Err(why) = self.heartbeat() { + warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); + + self.reconnect() + } else { + trace!("[Shard {:?}] Heartbeated", self.shard_info); + + Ok(()) } } /// Calculates the heartbeat latency between the shard and the gateway. - /// - /// # Examples - /// - /// When using the [`Client`], output the latency in response to a `"~ping"` - /// message handled through [`Client::on_message`]. - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::*; - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "~ping" { - /// if let Some(latency) = ctx.shard.lock().latency() { - /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos()); - /// - /// let _ = msg.channel_id.say(&s); - /// } else { - /// let _ = msg.channel_id.say("N/A"); - /// } - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Client`]: ../struct.Client.html - /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message // Shamelessly stolen from brayzure's commit in eris: // <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6> pub fn latency(&self) -> Option<StdDuration> { @@ -640,38 +624,68 @@ impl Shard { } } - /// Shuts down the receiver by attempting to cleanly close the - /// connection. - pub fn shutdown_clean(&mut self) -> Result<()> { - { - let data = CloseData { - status_code: 1000, - reason: String::new(), - }; - - let message = OwnedMessage::Close(Some(data)); - - self.client.send_message(&message)?; + #[cfg(feature = "voice")] + fn voice_dispatch(&mut self, event: &Event) { + if let Event::VoiceStateUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } } - let mut stream = self.client.stream_ref().as_tcp(); + if let Event::VoiceServerUpdate(ref update) = *event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + #[cfg(feature = "voice")] + pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> { + let mut messages = vec![]; - debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info); + while let Ok(v) = self.manager_rx.try_recv() { + messages.push(v); + } - Ok(()) + messages } - /// Uncleanly shuts down the receiver by not sending a close code. - pub fn shutdown(&mut self) -> Result<()> { - let mut stream = self.client.stream_ref().as_tcp(); + /// Performs a deterministic reconnect. + /// + /// The type of reconnect is deterministic on whether a [`session_id`]. + /// + /// If the `session_id` still exists, then a RESUME is sent. If not, then + /// an IDENTIFY is sent. + /// + /// Note that, if the shard is already in a stage of + /// [`ConnectionStage::Connecting`], then no action will be performed. + /// + /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting + /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id + pub fn autoreconnect(&mut self) -> Result<()> { + if self.stage == ConnectionStage::Connecting { + return Ok(()); + } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + if self.session_id().is_some() { + debug!( + "[Shard {:?}] Autoreconnector choosing to resume", + self.shard_info, + ); - Ok(()) + self.resume() + } else { + debug!( + "[Shard {:?}] Autoreconnector choosing to reconnect", + self.shard_info, + ); + + self.reconnect() + } } /// Requests that one or multiple [`Guild`]s be chunked. @@ -710,7 +724,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(2000), None); + /// shard.chunk_guilds(guild_ids, Some(2000), None); /// # Ok(()) /// # } /// # @@ -740,7 +754,7 @@ impl Shard { /// /// let guild_ids = vec![GuildId(81384788765712384)]; /// - /// shard.chunk_guilds(&guild_ids, Some(20), Some("do")); + /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); /// # Ok(()) /// # } /// # @@ -753,280 +767,40 @@ impl Shard { /// ../../model/event/enum.Event.html#variant.GuildMembersChunk /// [`Guild`]: ../../model/struct.Guild.html /// [`Member`]: ../../model/struct.Member.html - pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>) - where It: IntoIterator<Item=T> { + pub fn chunk_guilds<It>( + &mut self, + guild_ids: It, + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { debug!("[Shard {:?}] Requesting member chunks", self.shard_info); - let msg = json!({ - "op": OpCode::GetGuildMembers.num(), - "d": { - "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), - "limit": limit.unwrap_or(0), - "query": query.unwrap_or(""), - }, - }); - - let _ = self.client.send_json(&msg); - } - - /// Calculates the number of guilds that the shard is responsible for. - /// - /// If sharding is not being used (i.e. 1 shard), then the total number of - /// [`Guild`] in the [`Cache`] will be used. - /// - /// **Note**: Requires the `cache` feature be enabled. - /// - /// # Examples - /// - /// Retrieve the number of guilds a shard is responsible for: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string())); - /// # - /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// let info = shard.shard_info(); - /// let guilds = shard.guilds_handled(); - /// - /// println!("Shard {:?} is responsible for {} guilds", info, guilds); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`Cache`]: ../ext/cache/struct.Cache.html - /// [`Guild`]: ../model/struct.Guild.html - #[cfg(feature = "cache")] - pub fn guilds_handled(&self) -> u16 { - let cache = CACHE.read(); - - let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]); - - cache - .guilds - .keys() - .filter(|guild_id| { - utils::shard_id(guild_id.0, shard_count) == shard_id - }) - .count() as u16 - } - - #[cfg(feature = "voice")] - fn voice_dispatch(&mut self, event: &Event) { - if let Event::VoiceStateUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_state(&update.voice_state); - } - } - } - - if let Event::VoiceServerUpdate(ref update) = *event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_server(&update.endpoint, &update.token); - } - } - } - } - - #[cfg(feature = "voice")] - pub(crate) fn cycle_voice_recv(&mut self) { - if let Ok(v) = self.manager_rx.try_recv() { - if let Err(why) = self.client.send_json(&v) { - warn!("[Shard {:?}] Err sending voice msg: {:?}", - self.shard_info, - why); - } - } + self.client.send_chunk_guilds( + guild_ids, + &self.shard_info, + limit, + query, + ) } - pub(crate) fn heartbeat(&mut self) -> Result<()> { - let map = json!({ - "d": self.seq, - "op": OpCode::Heartbeat.num(), - }); - - trace!("[Shard {:?}] Sending heartbeat d: {}", - self.shard_info, - self.seq); - - match self.client.send_json(&map) { - Ok(_) => { - self.heartbeat_instants.0 = Some(Instant::now()); - self.last_heartbeat_acknowledged = false; - - trace!("[Shard {:?}] Successfully heartbeated", - self.shard_info); - - Ok(()) - }, - Err(why) => { - match why { - Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) { - debug!("[Shard {:?}] Err heartbeating: {:?}", - self.shard_info, - err); - }, - other => { - warn!("[Shard {:?}] Other err w/ keepalive: {:?}", - self.shard_info, - other); - }, - } - - Err(Error::Gateway(GatewayError::HeartbeatFailed)) - }, - } - } - - pub(crate) fn check_heartbeat(&mut self) -> Result<()> { - let heartbeat_interval = match self.heartbeat_interval { - Some(heartbeat_interval) => heartbeat_interval, - None => return Ok(()), - }; - - let wait = StdDuration::from_secs(heartbeat_interval / 1000); - - // If a duration of time less than the heartbeat_interval has passed, - // then don't perform a keepalive or attempt to reconnect. - if let Some(last_sent) = self.heartbeat_instants.0 { - if last_sent.elapsed() <= wait { - return Ok(()); - } - } - - // If the last heartbeat didn't receive an acknowledgement, then - // auto-reconnect. - if !self.last_heartbeat_acknowledged { - debug!( - "[Shard {:?}] Last heartbeat not acknowledged; re-connecting", - self.shard_info, - ); - - return self.reconnect().map_err(|why| { - warn!( - "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}", - self.shard_info, - why, - ); - - why - }); - } - - // Otherwise, we're good to heartbeat. - trace!("[Shard {:?}] Heartbeating", self.shard_info); - - if let Err(why) = self.heartbeat() { - warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why); - - self.reconnect() - } else { - trace!("[Shard {:?}] Heartbeated", self.shard_info); - self.heartbeat_instants.0 = Some(Instant::now()); - - Ok(()) - } - } - - pub(crate) fn autoreconnect(&mut self) -> Result<()> { - if self.stage == ConnectionStage::Connecting { - return Ok(()); - } - - if self.session_id.is_some() { - debug!("[Shard {:?}] Autoreconnector choosing to resume", - self.shard_info); - - self.resume() - } else { - debug!("[Shard {:?}] Autoreconnector choosing to reconnect", - self.shard_info); - - self.reconnect() - } - } - - /// Retrieves the `heartbeat_interval`. - #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option<u64> { - self.heartbeat_interval - } - - /// Retrieves the value of when the last heartbeat ack was received. - #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> { - self.heartbeat_instants.1 - } - - fn reconnect(&mut self) -> Result<()> { - info!("[Shard {:?}] Attempting to reconnect", self.shard_info); - self.reset(); - - self.initialize() - } - - // Attempts to send a RESUME message. - // - // # Examples - // - // Returns a `GatewayError::NoSessionId` is there is no `session_id`, - // indicating that the shard should instead [`reconnect`]. + // Sets the shard as going into identifying stage, which sets: // - // [`reconnect`]: #method.reconnect - fn resume(&mut self) -> Result<()> { - debug!("Shard {:?}] Attempting to resume", self.shard_info); - - self.initialize()?; - self.stage = ConnectionStage::Resuming; - - self.send_resume().or_else(|why| { - warn!("[Shard {:?}] Err sending resume: {:?}", - self.shard_info, - why); - - self.reconnect() - }) - } - - fn send_resume(&mut self) -> Result<()> { - let session_id = match self.session_id.clone() { - Some(session_id) => session_id, - None => return Err(Error::Gateway(GatewayError::NoSessionId)), - }; + // - the time that the last heartbeat sent as being now + // - the `stage` to `Identifying` + pub fn identify(&mut self) -> Result<()> { + self.client.send_identify(&self.shard_info, &self.token.lock())?; - debug!("[Shard {:?}] Sending resume; seq: {}", - self.shard_info, - self.seq); + self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; - self.client.send_json(&json!({ - "op": OpCode::Resume.num(), - "d": { - "session_id": session_id, - "seq": self.seq, - "token": &*self.token.lock(), - }, - })) + Ok(()) } /// Initializes a new WebSocket client. /// /// This will set the stage of the shard before and after instantiation of /// the client. - fn initialize(&mut self) -> Result<()> { + pub fn initialize(&mut self) -> Result<WsClient> { debug!("[Shard {:?}] Initializing", self.shard_info); // We need to do two, sort of three things here: @@ -1038,38 +812,15 @@ impl Shard { // This is used to accurately assess whether the state of the shard is // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; - self.client = connect(&self.ws_url.lock())?; + let mut client = connect(&self.ws_url.lock())?; self.stage = ConnectionStage::Handshake; - Ok(()) - } - - fn identify(&mut self) -> Result<()> { - let identification = json!({ - "op": OpCode::Identify.num(), - "d": { - "compression": true, - "large_threshold": constants::LARGE_THRESHOLD, - "shard": self.shard_info, - "token": &*self.token.lock(), - "v": constants::GATEWAY_VERSION, - "properties": { - "$browser": "serenity", - "$device": "serenity", - "$os": consts::OS, - }, - }, - }); - - self.heartbeat_instants.0 = Some(Instant::now()); - self.stage = ConnectionStage::Identifying; + let _ = set_client_timeout(&mut client); - debug!("[Shard {:?}] Identifying", self.shard_info); - - self.client.send_json(&identification) + Ok(client) } - fn reset(&mut self) { + pub fn reset(&mut self) { self.heartbeat_instants = (Some(Instant::now()), None); self.heartbeat_interval = None; self.last_heartbeat_acknowledged = true; @@ -1078,42 +829,39 @@ impl Shard { self.seq = 0; } - fn update_presence(&mut self) { - let (ref game, status) = self.current_presence; - let now = Utc::now().timestamp() as u64; - - let msg = json!({ - "op": OpCode::StatusUpdate.num(), - "d": { - "afk": false, - "since": now, - "status": status.name(), - "game": game.as_ref().map(|x| json!({ - "name": x.name, - "type": x.kind, - "url": x.url, - })), - }, - }); + pub fn resume(&mut self) -> Result<()> { + debug!("Shard {:?}] Attempting to resume", self.shard_info); - debug!("[Shard {:?}] Sending presence update", self.shard_info); + self.client = self.initialize()?; + self.stage = ConnectionStage::Resuming; - if let Err(why) = self.client.send_json(&msg) { - warn!("[Shard {:?}] Err sending presence update: {:?}", - self.shard_info, - why); + match self.session_id.as_ref() { + Some(session_id) => { + self.client.send_resume( + &self.shard_info, + session_id, + &self.seq, + &self.token.lock(), + ) + }, + None => Err(Error::Gateway(GatewayError::NoSessionId)), } + } - #[cfg(feature = "cache")] - { - let mut cache = CACHE.write(); - let current_user_id = cache.user.id; + pub fn reconnect(&mut self) -> Result<()> { + info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); - cache.presences.get_mut(¤t_user_id).map(|presence| { - presence.game = game.clone(); - presence.last_modified = Some(now); - }); - } + self.reset(); + self.client = self.initialize()?; + + Ok(()) + } + + pub fn update_presence(&mut self) -> Result<()> { + self.client.send_presence_update( + &self.shard_info, + &self.current_presence, + ) } } @@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> { Error::Gateway(GatewayError::BuildingUrl) }) } - -/// Tries to connect and upon failure, retries. -fn connecting(uri: &str) -> WsClient { - let waiting_time = 30; - - loop { - match connect(&uri) { - Ok(client) => return client, - Err(why) => { - warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time); - thread::sleep(StdDuration::from_secs(waiting_time)); - }, - }; - } -} diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs new file mode 100644 index 0000000..873d114 --- /dev/null +++ b/src/gateway/ws_client_ext.rs @@ -0,0 +1,133 @@ +use chrono::Utc; +use constants::{self, OpCode}; +use gateway::{CurrentPresence, WsClient}; +use internal::prelude::*; +use internal::ws_impl::SenderExt; +use model::GuildId; +use std::env::consts; + +pub trait WebSocketGatewayClientExt { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId>; + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()>; + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()>; + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()>; + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()>; +} + +impl WebSocketGatewayClientExt for WsClient { + fn send_chunk_guilds<It>( + &mut self, + guild_ids: It, + shard_info: &[u64; 2], + limit: Option<u16>, + query: Option<&str>, + ) -> Result<()> where It: IntoIterator<Item=GuildId> { + debug!("[Shard {:?}] Requesting member chunks", shard_info); + + self.send_json(&json!({ + "op": OpCode::GetGuildMembers.num(), + "d": { + "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(), + "limit": limit.unwrap_or(0), + "query": query.unwrap_or(""), + }, + })).map_err(From::from) + } + + fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>) + -> Result<()> { + trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq); + + self.send_json(&json!({ + "d": seq, + "op": OpCode::Heartbeat.num(), + })).map_err(From::from) + } + + fn send_identify(&mut self, shard_info: &[u64; 2], token: &str) + -> Result<()> { + debug!("[Shard {:?}] Identifying", shard_info); + + self.send_json(&json!({ + "op": OpCode::Identify.num(), + "d": { + "compression": true, + "large_threshold": constants::LARGE_THRESHOLD, + "shard": shard_info, + "token": token, + "v": constants::GATEWAY_VERSION, + "properties": { + "$browser": "serenity", + "$device": "serenity", + "$os": consts::OS, + }, + }, + })) + } + + fn send_presence_update( + &mut self, + shard_info: &[u64; 2], + current_presence: &CurrentPresence, + ) -> Result<()> { + let &(ref game, ref status) = current_presence; + let now = Utc::now().timestamp() as u64; + + debug!("[Shard {:?}] Sending presence update", shard_info); + + self.send_json(&json!({ + "op": OpCode::StatusUpdate.num(), + "d": { + "afk": false, + "since": now, + "status": status.name(), + "game": game.as_ref().map(|x| json!({ + "name": x.name, + "type": x.kind, + "url": x.url, + })), + }, + })) + } + + fn send_resume( + &mut self, + shard_info: &[u64; 2], + session_id: &str, + seq: &u64, + token: &str, + ) -> Result<()> { + debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq); + + self.send_json(&json!({ + "op": OpCode::Resume.num(), + "d": { + "session_id": session_id, + "seq": seq, + "token": token, + }, + })).map_err(From::from) + } +} |