aboutsummaryrefslogtreecommitdiff
path: root/src/gateway
diff options
context:
space:
mode:
Diffstat (limited to 'src/gateway')
-rw-r--r--src/gateway/mod.rs17
-rw-r--r--src/gateway/shard.rs889
-rw-r--r--src/gateway/ws_client_ext.rs133
3 files changed, 461 insertions, 578 deletions
diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs
index b593f5c..147808e 100644
--- a/src/gateway/mod.rs
+++ b/src/gateway/mod.rs
@@ -51,9 +51,18 @@
mod error;
mod shard;
+mod ws_client_ext;
pub use self::error::Error as GatewayError;
pub use self::shard::Shard;
+pub use self::ws_client_ext::WebSocketGatewayClientExt;
+
+use model::{Game, OnlineStatus};
+use websocket::sync::client::Client;
+use websocket::sync::stream::{TcpStream, TlsStream};
+
+pub type CurrentPresence = (Option<Game>, OnlineStatus);
+pub type WsClient = Client<TlsStream<TcpStream>>;
/// Indicates the current connection stage of a [`Shard`].
///
@@ -136,3 +145,11 @@ impl ConnectionStage {
}
}
}
+
+pub enum ShardAction {
+ Autoreconnect,
+ Heartbeat,
+ Identify,
+ Reconnect,
+ Resume,
+}
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index c644eac..ef05cf4 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -1,39 +1,31 @@
-use chrono::Utc;
use parking_lot::Mutex;
-use serde_json::Value;
-use std::env::consts;
-use std::io::Write;
-use std::net::Shutdown;
use std::sync::Arc;
use std::time::{Duration as StdDuration, Instant};
-use std::thread;
-use super::{ConnectionStage, GatewayError};
+use super::{
+ ConnectionStage,
+ CurrentPresence,
+ ShardAction,
+ GatewayError,
+ WsClient,
+ WebSocketGatewayClientExt,
+};
use websocket::client::Url;
-use websocket::message::{CloseData, OwnedMessage};
use websocket::stream::sync::AsTcpStream;
-use websocket::sync::client::{Client, ClientBuilder};
-use websocket::sync::stream::{TcpStream, TlsStream};
+use websocket::sync::client::ClientBuilder;
use websocket::WebSocketError;
-use constants::{self, close_codes, OpCode};
+use constants::{self, close_codes};
use internal::prelude::*;
-use internal::ws_impl::SenderExt;
use model::event::{Event, GatewayEvent};
use model::{Game, GuildId, OnlineStatus};
#[cfg(feature = "voice")]
+use serde_json::Value;
+#[cfg(feature = "voice")]
use std::sync::mpsc::{self, Receiver as MpscReceiver};
-#[cfg(feature = "cache")]
-use client::CACHE;
#[cfg(feature = "voice")]
use voice::Manager as VoiceManager;
#[cfg(feature = "voice")]
use http;
-#[cfg(feature = "cache")]
-use utils;
-
-pub type WsClient = Client<TlsStream<TcpStream>>;
-
-type CurrentPresence = (Option<Game>, OnlineStatus);
/// A Shard is a higher-level handler for a websocket connection to Discord's
/// gateway. The shard allows for sending and receiving messages over the
@@ -91,7 +83,7 @@ pub struct Shard {
session_id: Option<String>,
shard_info: [u64; 2],
stage: ConnectionStage,
- token: Arc<Mutex<String>>,
+ pub token: Arc<Mutex<String>>,
ws_url: Arc<Mutex<String>>,
/// The voice connections that this Shard is responsible for. The Shard will
/// update the voice connections' states.
@@ -138,11 +130,14 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn new(ws_url: Arc<Mutex<String>>,
- token: Arc<Mutex<String>>,
- shard_info: [u64; 2])
- -> Result<Shard> {
- let client = connecting(&*ws_url.lock());
+ pub fn new(
+ ws_url: Arc<Mutex<String>>,
+ token: Arc<Mutex<String>>,
+ shard_info: [u64; 2],
+ ) -> Result<Shard> {
+ let mut client = connect(&*ws_url.lock())?;
+
+ let _ = set_client_timeout(&mut client);
let current_presence = (None, OnlineStatus::Online);
let heartbeat_instants = (None, None);
@@ -159,6 +154,8 @@ impl Shard {
let user = http::get_current_user()?;
Shard {
+ manager: VoiceManager::new(tx, user.id),
+ manager_rx: rx,
client,
current_presence,
heartbeat_instants,
@@ -170,8 +167,6 @@ impl Shard {
shard_info,
session_id,
ws_url,
- manager: VoiceManager::new(tx, user.id),
- manager_rx: rx,
}
} else {
Shard {
@@ -191,137 +186,122 @@ impl Shard {
})
}
- /// Retrieves a copy of the current shard information.
- ///
- /// The first element is the _current_ shard - 0-indexed - while the second
- /// element is the _total number_ of shards -- 1-indexed.
- ///
- /// For example, if using 3 shards in total, and if this is shard 1, then it
- /// can be read as "the second of three shards".
- ///
- /// # Examples
- ///
- /// Retrieving the shard info for the second shard, out of two shards total:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
- /// #
- /// assert_eq!(shard.shard_info(), [1, 2]);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
+ /// Retrieves the current presence of the shard.
+ #[inline]
+ pub fn current_presence(&self) -> &CurrentPresence {
+ &self.current_presence
+ }
- /// Sets the user's current game, if any.
+ /// Retrieves the heartbeat instants of the shard.
///
- /// Other presence settings are maintained.
+ /// This is the time of when a heartbeat was sent and when an
+ /// acknowledgement was last received.
+ #[inline]
+ pub fn heartbeat_instants(&self) -> &(Option<Instant>, Option<Instant>) {
+ &self.heartbeat_instants
+ }
+
+ /// Retrieves the value of when the last heartbeat was sent.
+ #[inline]
+ pub fn last_heartbeat_sent(&self) -> Option<&Instant> {
+ self.heartbeat_instants.0.as_ref()
+ }
+
+ /// Retrieves the value of when the last heartbeat ack was received.
+ #[inline]
+ pub fn last_heartbeat_ack(&self) -> Option<&Instant> {
+ self.heartbeat_instants.1.as_ref()
+ }
+
+ /// Sends a heartbeat to the gateway with the current sequence.
///
- /// # Examples
+ /// This sets the last heartbeat time to now, and
+ /// `last_heartbeat_acknowledged` to `false`.
///
- /// Setting the current game to playing `"Heroes of the Storm"`:
+ /// # Errors
///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::Game;
+ /// Returns [`GatewayError::HeartbeatFailed`] if there was an error sending
+ /// a heartbeat.
///
- /// shard.set_game(Some(Game::playing("Heroes of the Storm")));
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
+ /// [`GatewayError::HeartbeatFailed`]: enum.GatewayError.html#variant.HeartbeatFailed
+ pub fn heartbeat(&mut self) -> Result<()> {
+ match self.client.send_heartbeat(&self.shard_info, Some(self.seq)) {
+ Ok(()) => {
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.last_heartbeat_acknowledged = false;
+
+ Ok(())
+ },
+ Err(why) => {
+ match why {
+ Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
+ debug!("[Shard {:?}] Err heartbeating: {:?}",
+ self.shard_info,
+ err);
+ },
+ other => {
+ warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
+ self.shard_info,
+ other);
+ },
+ }
+
+ Err(Error::Gateway(GatewayError::HeartbeatFailed))
+ }
+ }
+ }
+
+ #[inline]
+ pub fn heartbeat_interval(&self) -> Option<&u64> {
+ self.heartbeat_interval.as_ref()
+ }
+
+ #[inline]
+ pub fn last_heartbeat_acknowledged(&self) -> bool {
+ self.last_heartbeat_acknowledged
+ }
+
+ #[inline]
+ pub fn seq(&self) -> u64 {
+ self.seq
+ }
+
+ #[inline]
+ pub fn session_id(&self) -> Option<&String> {
+ self.session_id.as_ref()
+ }
+
+ #[inline]
pub fn set_game(&mut self, game: Option<Game>) {
self.current_presence.0 = game;
+ }
- self.update_presence();
+ #[inline]
+ pub fn set_presence(&mut self, status: OnlineStatus, game: Option<Game>) {
+ self.set_game(game);
+ self.set_status(status);
}
- /// Sets the user's current online status.
- ///
- /// Note that [`Offline`] is not a valid online status, so it is
- /// automatically converted to [`Invisible`].
- ///
- /// Other presence settings are maintained.
- ///
- /// # Examples
- ///
- /// Setting the current online status for the shard to [`DoNotDisturb`].
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("".to_string()));
- /// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// use serenity::model::OnlineStatus;
- ///
- /// shard.set_status(OnlineStatus::DoNotDisturb);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`DoNotDisturb`]: ../../model/enum.OnlineStatus.html#variant.DoNotDisturb
- /// [`Invisible`]: ../../model/enum.OnlineStatus.html#variant.Invisible
- /// [`Offline`]: ../../model/enum.OnlineStatus.html#variant.Offline
- pub fn set_status(&mut self, online_status: OnlineStatus) {
- self.current_presence.1 = match online_status {
- OnlineStatus::Offline => OnlineStatus::Invisible,
- other => other,
- };
+ #[inline]
+ pub fn set_status(&mut self, mut status: OnlineStatus) {
+ if status == OnlineStatus::Offline {
+ status = OnlineStatus::Invisible;
+ }
- self.update_presence();
+ self.current_presence.1 = status;
}
- /// Sets the user's full presence information.
+ /// Retrieves a copy of the current shard information.
///
- /// Consider using the individual setters if you only need to modify one of
- /// these.
+ /// The first element is the _current_ shard - 0-indexed - while the second
+ /// element is the _total number_ of shards -- 1-indexed.
+ ///
+ /// For example, if using 3 shards in total, and if this is shard 1, then it
+ /// can be read as "the second of three shards".
///
/// # Examples
///
- /// Set the current user as playing `"Heroes of the Storm"` and being
- /// online:
+ /// Retrieving the shard info for the second shard, out of two shards total:
///
/// ```rust,no_run
/// # extern crate parking_lot;
@@ -335,11 +315,9 @@ impl Shard {
/// # fn try_main() -> Result<(), Box<Error>> {
/// # let mutex = Arc::new(Mutex::new("".to_string()));
/// #
- /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
+ /// # let shard = Shard::new(mutex.clone(), mutex, [1, 2]).unwrap();
/// #
- /// use serenity::model::{Game, OnlineStatus};
- ///
- /// shard.set_presence(Some(Game::playing("Heroes of the Storm")), OnlineStatus::Online);
+ /// assert_eq!(shard.shard_info(), [1, 2]);
/// # Ok(())
/// # }
/// #
@@ -347,15 +325,7 @@ impl Shard {
/// # try_main().unwrap();
/// # }
/// ```
- pub fn set_presence(&mut self, game: Option<Game>, mut status: OnlineStatus) {
- if status == OnlineStatus::Offline {
- status = OnlineStatus::Invisible;
- }
-
- self.current_presence = (game, status);
-
- self.update_presence();
- }
+ pub fn shard_info(&self) -> [u64; 2] { self.shard_info }
/// Returns the current connection stage of the shard.
pub fn stage(&self) -> ConnectionStage {
@@ -387,21 +357,24 @@ impl Shard {
/// Returns a `GatewayError::OverloadedShard` if the shard would have too
/// many guilds assigned to it.
#[allow(cyclomatic_complexity)]
- pub(crate) fn handle_event(&mut self, event: Result<GatewayEvent>) -> Result<Option<Event>> {
- match event {
- Ok(GatewayEvent::Dispatch(seq, event)) => {
+ pub(crate) fn handle_event(&mut self, event: &Result<GatewayEvent>)
+ -> Result<Option<ShardAction>> {
+ match *event {
+ Ok(GatewayEvent::Dispatch(seq, ref event)) => {
if seq > self.seq + 1 {
warn!("[Shard {:?}] Heartbeat off; them: {}, us: {}", self.shard_info, seq, self.seq);
}
- match event {
+ match *event {
Event::Ready(ref ready) => {
debug!("[Shard {:?}] Received Ready", self.shard_info);
self.session_id = Some(ready.ready.session_id.clone());
self.stage = ConnectionStage::Connected;
+ /*
set_client_timeout(&mut self.client)?;
+ */
},
Event::Resumed(_) => {
info!("[Shard {:?}] Resumed", self.shard_info);
@@ -421,7 +394,7 @@ impl Shard {
self.seq = seq;
- Ok(Some(event))
+ Ok(None)
},
Ok(GatewayEvent::Heartbeat(s)) => {
info!("[Shard {:?}] Received shard heartbeat", self.shard_info);
@@ -438,24 +411,18 @@ impl Shard {
if self.stage == ConnectionStage::Handshake {
self.stage = ConnectionStage::Identifying;
- self.identify()?;
+ return Ok(Some(ShardAction::Identify));
} else {
warn!(
"[Shard {:?}] Heartbeat during non-Handshake; auto-reconnecting",
self.shard_info
);
- return self.autoreconnect().and(Ok(None));
+ return Ok(Some(ShardAction::Autoreconnect));
}
}
- let map = json!({
- "d": Value::Null,
- "op": OpCode::Heartbeat.num(),
- });
- self.client.send_json(&map)?;
-
- Ok(None)
+ Ok(Some(ShardAction::Heartbeat))
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());
@@ -478,14 +445,14 @@ impl Shard {
self.heartbeat_interval = Some(interval);
}
- if self.stage == ConnectionStage::Handshake {
- self.identify().and(Ok(None))
+ Ok(Some(if self.stage == ConnectionStage::Handshake {
+ ShardAction::Identify
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);
- self.autoreconnect().and(Ok(None))
- }
+ ShardAction::Autoreconnect
+ }))
},
Ok(GatewayEvent::InvalidateSession(resumable)) => {
info!(
@@ -493,16 +460,15 @@ impl Shard {
self.shard_info,
);
- if resumable {
- self.resume().and(Ok(None))
+ Ok(Some(if resumable {
+ ShardAction::Resume
} else {
- self.identify().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)),
- Err(Error::Gateway(GatewayError::Closed(data))) => {
+ Ok(GatewayEvent::Reconnect) => Ok(Some(ShardAction::Reconnect)),
+ Err(Error::Gateway(GatewayError::Closed(ref data))) => {
let num = data.as_ref().map(|d| d.status_code);
- let reason = data.map(|d| d.reason);
let clean = num == Some(1000);
match num {
@@ -562,7 +528,7 @@ impl Shard {
"[Shard {:?}] Unknown unclean close {}: {:?}",
self.shard_info,
other,
- reason
+ data.as_ref().map(|d| &d.reason),
);
},
_ => {},
@@ -573,14 +539,14 @@ impl Shard {
self.session_id.is_some()
}).unwrap_or(true);
- if resume {
- self.resume().or_else(|_| self.reconnect()).and(Ok(None))
+ Ok(Some(if resume {
+ ShardAction::Resume
} else {
- self.reconnect().and(Ok(None))
- }
+ ShardAction::Reconnect
+ }))
},
- Err(Error::WebSocket(why)) => {
- if let WebSocketError::NoDataAvailable = why {
+ Err(Error::WebSocket(ref why)) => {
+ if let WebSocketError::NoDataAvailable = *why {
if self.heartbeat_instants.1.is_none() {
return Ok(None);
}
@@ -592,44 +558,62 @@ impl Shard {
info!("[Shard {:?}] Will attempt to auto-reconnect",
self.shard_info);
- self.autoreconnect().and(Ok(None))
+ Ok(Some(ShardAction::Autoreconnect))
},
- Err(error) => Err(error),
+ _ => Ok(None),
+ }
+ }
+
+ pub fn check_heartbeat(&mut self) -> Result<()> {
+ let wait = {
+ let heartbeat_interval = match self.heartbeat_interval {
+ Some(heartbeat_interval) => heartbeat_interval,
+ None => return Ok(()),
+ };
+
+ StdDuration::from_secs(heartbeat_interval / 1000)
+ };
+
+ // If a duration of time less than the heartbeat_interval has passed,
+ // then don't perform a keepalive or attempt to reconnect.
+ if let Some(last_sent) = self.heartbeat_instants.0 {
+ if last_sent.elapsed() <= wait {
+ return Ok(());
+ }
+ }
+
+ // If the last heartbeat didn't receive an acknowledgement, then
+ // auto-reconnect.
+ if !self.last_heartbeat_acknowledged {
+ debug!(
+ "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
+ self.shard_info,
+ );
+
+ return self.reconnect().map_err(|why| {
+ warn!(
+ "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
+ self.shard_info,
+ why,
+ );
+
+ why
+ });
+ }
+
+ // Otherwise, we're good to heartbeat.
+ if let Err(why) = self.heartbeat() {
+ warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
+
+ self.reconnect()
+ } else {
+ trace!("[Shard {:?}] Heartbeated", self.shard_info);
+
+ Ok(())
}
}
/// Calculates the heartbeat latency between the shard and the gateway.
- ///
- /// # Examples
- ///
- /// When using the [`Client`], output the latency in response to a `"~ping"`
- /// message handled through [`Client::on_message`].
- ///
- /// ```rust,no_run
- /// # use serenity::prelude::*;
- /// # use serenity::model::*;
- /// struct Handler;
- ///
- /// impl EventHandler for Handler {
- /// fn message(&self, ctx: Context, msg: Message) {
- /// if msg.content == "~ping" {
- /// if let Some(latency) = ctx.shard.lock().latency() {
- /// let s = format!("{}.{}s", latency.as_secs(), latency.subsec_nanos());
- ///
- /// let _ = msg.channel_id.say(&s);
- /// } else {
- /// let _ = msg.channel_id.say("N/A");
- /// }
- /// }
- /// }
- /// }
- /// let mut client = Client::new("token", Handler).unwrap();
- ///
- /// client.start().unwrap();
- /// ```
- ///
- /// [`Client`]: ../struct.Client.html
- /// [`EventHandler::on_message`]: ../event_handler/trait.EventHandler.html#method.on_message
// Shamelessly stolen from brayzure's commit in eris:
// <https://github.com/abalabahaha/eris/commit/0ce296ae9a542bcec0edf1c999ee2d9986bed5a6>
pub fn latency(&self) -> Option<StdDuration> {
@@ -640,38 +624,68 @@ impl Shard {
}
}
- /// Shuts down the receiver by attempting to cleanly close the
- /// connection.
- pub fn shutdown_clean(&mut self) -> Result<()> {
- {
- let data = CloseData {
- status_code: 1000,
- reason: String::new(),
- };
-
- let message = OwnedMessage::Close(Some(data));
-
- self.client.send_message(&message)?;
+ #[cfg(feature = "voice")]
+ fn voice_dispatch(&mut self, event: &Event) {
+ if let Event::VoiceStateUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_state(&update.voice_state);
+ }
+ }
}
- let mut stream = self.client.stream_ref().as_tcp();
+ if let Event::VoiceServerUpdate(ref update) = *event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_server(&update.endpoint, &update.token);
+ }
+ }
+ }
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ #[cfg(feature = "voice")]
+ pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> {
+ let mut messages = vec![];
- debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info);
+ while let Ok(v) = self.manager_rx.try_recv() {
+ messages.push(v);
+ }
- Ok(())
+ messages
}
- /// Uncleanly shuts down the receiver by not sending a close code.
- pub fn shutdown(&mut self) -> Result<()> {
- let mut stream = self.client.stream_ref().as_tcp();
+ /// Performs a deterministic reconnect.
+ ///
+ /// The type of reconnect is deterministic on whether a [`session_id`].
+ ///
+ /// If the `session_id` still exists, then a RESUME is sent. If not, then
+ /// an IDENTIFY is sent.
+ ///
+ /// Note that, if the shard is already in a stage of
+ /// [`ConnectionStage::Connecting`], then no action will be performed.
+ ///
+ /// [`ConnectionStage::Connecting`]: ../../../gateway/enum.ConnectionStage.html#variant.Connecting
+ /// [`session_id`]: ../../../gateway/struct.Shard.html#method.session_id
+ pub fn autoreconnect(&mut self) -> Result<()> {
+ if self.stage == ConnectionStage::Connecting {
+ return Ok(());
+ }
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ if self.session_id().is_some() {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to resume",
+ self.shard_info,
+ );
- Ok(())
+ self.resume()
+ } else {
+ debug!(
+ "[Shard {:?}] Autoreconnector choosing to reconnect",
+ self.shard_info,
+ );
+
+ self.reconnect()
+ }
}
/// Requests that one or multiple [`Guild`]s be chunked.
@@ -710,7 +724,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(2000), None);
+ /// shard.chunk_guilds(guild_ids, Some(2000), None);
/// # Ok(())
/// # }
/// #
@@ -740,7 +754,7 @@ impl Shard {
///
/// let guild_ids = vec![GuildId(81384788765712384)];
///
- /// shard.chunk_guilds(&guild_ids, Some(20), Some("do"));
+ /// shard.chunk_guilds(guild_ids, Some(20), Some("do"));
/// # Ok(())
/// # }
/// #
@@ -753,280 +767,40 @@ impl Shard {
/// ../../model/event/enum.Event.html#variant.GuildMembersChunk
/// [`Guild`]: ../../model/struct.Guild.html
/// [`Member`]: ../../model/struct.Member.html
- pub fn chunk_guilds<T: AsRef<GuildId>, It>(&mut self, guild_ids: It, limit: Option<u16>, query: Option<&str>)
- where It: IntoIterator<Item=T> {
+ pub fn chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);
- let msg = json!({
- "op": OpCode::GetGuildMembers.num(),
- "d": {
- "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
- "limit": limit.unwrap_or(0),
- "query": query.unwrap_or(""),
- },
- });
-
- let _ = self.client.send_json(&msg);
- }
-
- /// Calculates the number of guilds that the shard is responsible for.
- ///
- /// If sharding is not being used (i.e. 1 shard), then the total number of
- /// [`Guild`] in the [`Cache`] will be used.
- ///
- /// **Note**: Requires the `cache` feature be enabled.
- ///
- /// # Examples
- ///
- /// Retrieve the number of guilds a shard is responsible for:
- ///
- /// ```rust,no_run
- /// # extern crate parking_lot;
- /// # extern crate serenity;
- /// #
- /// # use parking_lot::Mutex;
- /// # use serenity::client::gateway::Shard;
- /// # use std::error::Error;
- /// # use std::sync::Arc;
- /// #
- /// # fn try_main() -> Result<(), Box<Error>> {
- /// # let mutex = Arc::new(Mutex::new("will anyone read this".to_string()));
- /// #
- /// # let shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap();
- /// #
- /// let info = shard.shard_info();
- /// let guilds = shard.guilds_handled();
- ///
- /// println!("Shard {:?} is responsible for {} guilds", info, guilds);
- /// # Ok(())
- /// # }
- /// #
- /// # fn main() {
- /// # try_main().unwrap();
- /// # }
- /// ```
- ///
- /// [`Cache`]: ../ext/cache/struct.Cache.html
- /// [`Guild`]: ../model/struct.Guild.html
- #[cfg(feature = "cache")]
- pub fn guilds_handled(&self) -> u16 {
- let cache = CACHE.read();
-
- let (shard_id, shard_count) = (self.shard_info[0], self.shard_info[1]);
-
- cache
- .guilds
- .keys()
- .filter(|guild_id| {
- utils::shard_id(guild_id.0, shard_count) == shard_id
- })
- .count() as u16
- }
-
- #[cfg(feature = "voice")]
- fn voice_dispatch(&mut self, event: &Event) {
- if let Event::VoiceStateUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_state(&update.voice_state);
- }
- }
- }
-
- if let Event::VoiceServerUpdate(ref update) = *event {
- if let Some(guild_id) = update.guild_id {
- if let Some(handler) = self.manager.get(guild_id) {
- handler.update_server(&update.endpoint, &update.token);
- }
- }
- }
- }
-
- #[cfg(feature = "voice")]
- pub(crate) fn cycle_voice_recv(&mut self) {
- if let Ok(v) = self.manager_rx.try_recv() {
- if let Err(why) = self.client.send_json(&v) {
- warn!("[Shard {:?}] Err sending voice msg: {:?}",
- self.shard_info,
- why);
- }
- }
+ self.client.send_chunk_guilds(
+ guild_ids,
+ &self.shard_info,
+ limit,
+ query,
+ )
}
- pub(crate) fn heartbeat(&mut self) -> Result<()> {
- let map = json!({
- "d": self.seq,
- "op": OpCode::Heartbeat.num(),
- });
-
- trace!("[Shard {:?}] Sending heartbeat d: {}",
- self.shard_info,
- self.seq);
-
- match self.client.send_json(&map) {
- Ok(_) => {
- self.heartbeat_instants.0 = Some(Instant::now());
- self.last_heartbeat_acknowledged = false;
-
- trace!("[Shard {:?}] Successfully heartbeated",
- self.shard_info);
-
- Ok(())
- },
- Err(why) => {
- match why {
- Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
- debug!("[Shard {:?}] Err heartbeating: {:?}",
- self.shard_info,
- err);
- },
- other => {
- warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
- self.shard_info,
- other);
- },
- }
-
- Err(Error::Gateway(GatewayError::HeartbeatFailed))
- },
- }
- }
-
- pub(crate) fn check_heartbeat(&mut self) -> Result<()> {
- let heartbeat_interval = match self.heartbeat_interval {
- Some(heartbeat_interval) => heartbeat_interval,
- None => return Ok(()),
- };
-
- let wait = StdDuration::from_secs(heartbeat_interval / 1000);
-
- // If a duration of time less than the heartbeat_interval has passed,
- // then don't perform a keepalive or attempt to reconnect.
- if let Some(last_sent) = self.heartbeat_instants.0 {
- if last_sent.elapsed() <= wait {
- return Ok(());
- }
- }
-
- // If the last heartbeat didn't receive an acknowledgement, then
- // auto-reconnect.
- if !self.last_heartbeat_acknowledged {
- debug!(
- "[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
- self.shard_info,
- );
-
- return self.reconnect().map_err(|why| {
- warn!(
- "[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
- self.shard_info,
- why,
- );
-
- why
- });
- }
-
- // Otherwise, we're good to heartbeat.
- trace!("[Shard {:?}] Heartbeating", self.shard_info);
-
- if let Err(why) = self.heartbeat() {
- warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);
-
- self.reconnect()
- } else {
- trace!("[Shard {:?}] Heartbeated", self.shard_info);
- self.heartbeat_instants.0 = Some(Instant::now());
-
- Ok(())
- }
- }
-
- pub(crate) fn autoreconnect(&mut self) -> Result<()> {
- if self.stage == ConnectionStage::Connecting {
- return Ok(());
- }
-
- if self.session_id.is_some() {
- debug!("[Shard {:?}] Autoreconnector choosing to resume",
- self.shard_info);
-
- self.resume()
- } else {
- debug!("[Shard {:?}] Autoreconnector choosing to reconnect",
- self.shard_info);
-
- self.reconnect()
- }
- }
-
- /// Retrieves the `heartbeat_interval`.
- #[inline]
- pub(crate) fn heartbeat_interval(&self) -> Option<u64> {
- self.heartbeat_interval
- }
-
- /// Retrieves the value of when the last heartbeat ack was received.
- #[inline]
- pub(crate) fn last_heartbeat_ack(&self) -> Option<Instant> {
- self.heartbeat_instants.1
- }
-
- fn reconnect(&mut self) -> Result<()> {
- info!("[Shard {:?}] Attempting to reconnect", self.shard_info);
- self.reset();
-
- self.initialize()
- }
-
- // Attempts to send a RESUME message.
- //
- // # Examples
- //
- // Returns a `GatewayError::NoSessionId` is there is no `session_id`,
- // indicating that the shard should instead [`reconnect`].
+ // Sets the shard as going into identifying stage, which sets:
//
- // [`reconnect`]: #method.reconnect
- fn resume(&mut self) -> Result<()> {
- debug!("Shard {:?}] Attempting to resume", self.shard_info);
-
- self.initialize()?;
- self.stage = ConnectionStage::Resuming;
-
- self.send_resume().or_else(|why| {
- warn!("[Shard {:?}] Err sending resume: {:?}",
- self.shard_info,
- why);
-
- self.reconnect()
- })
- }
-
- fn send_resume(&mut self) -> Result<()> {
- let session_id = match self.session_id.clone() {
- Some(session_id) => session_id,
- None => return Err(Error::Gateway(GatewayError::NoSessionId)),
- };
+ // - the time that the last heartbeat sent as being now
+ // - the `stage` to `Identifying`
+ pub fn identify(&mut self) -> Result<()> {
+ self.client.send_identify(&self.shard_info, &self.token.lock())?;
- debug!("[Shard {:?}] Sending resume; seq: {}",
- self.shard_info,
- self.seq);
+ self.heartbeat_instants.0 = Some(Instant::now());
+ self.stage = ConnectionStage::Identifying;
- self.client.send_json(&json!({
- "op": OpCode::Resume.num(),
- "d": {
- "session_id": session_id,
- "seq": self.seq,
- "token": &*self.token.lock(),
- },
- }))
+ Ok(())
}
/// Initializes a new WebSocket client.
///
/// This will set the stage of the shard before and after instantiation of
/// the client.
- fn initialize(&mut self) -> Result<()> {
+ pub fn initialize(&mut self) -> Result<WsClient> {
debug!("[Shard {:?}] Initializing", self.shard_info);
// We need to do two, sort of three things here:
@@ -1038,38 +812,15 @@ impl Shard {
// This is used to accurately assess whether the state of the shard is
// accurate when a Hello is received.
self.stage = ConnectionStage::Connecting;
- self.client = connect(&self.ws_url.lock())?;
+ let mut client = connect(&self.ws_url.lock())?;
self.stage = ConnectionStage::Handshake;
- Ok(())
- }
-
- fn identify(&mut self) -> Result<()> {
- let identification = json!({
- "op": OpCode::Identify.num(),
- "d": {
- "compression": true,
- "large_threshold": constants::LARGE_THRESHOLD,
- "shard": self.shard_info,
- "token": &*self.token.lock(),
- "v": constants::GATEWAY_VERSION,
- "properties": {
- "$browser": "serenity",
- "$device": "serenity",
- "$os": consts::OS,
- },
- },
- });
-
- self.heartbeat_instants.0 = Some(Instant::now());
- self.stage = ConnectionStage::Identifying;
+ let _ = set_client_timeout(&mut client);
- debug!("[Shard {:?}] Identifying", self.shard_info);
-
- self.client.send_json(&identification)
+ Ok(client)
}
- fn reset(&mut self) {
+ pub fn reset(&mut self) {
self.heartbeat_instants = (Some(Instant::now()), None);
self.heartbeat_interval = None;
self.last_heartbeat_acknowledged = true;
@@ -1078,42 +829,39 @@ impl Shard {
self.seq = 0;
}
- fn update_presence(&mut self) {
- let (ref game, status) = self.current_presence;
- let now = Utc::now().timestamp() as u64;
-
- let msg = json!({
- "op": OpCode::StatusUpdate.num(),
- "d": {
- "afk": false,
- "since": now,
- "status": status.name(),
- "game": game.as_ref().map(|x| json!({
- "name": x.name,
- "type": x.kind,
- "url": x.url,
- })),
- },
- });
+ pub fn resume(&mut self) -> Result<()> {
+ debug!("Shard {:?}] Attempting to resume", self.shard_info);
- debug!("[Shard {:?}] Sending presence update", self.shard_info);
+ self.client = self.initialize()?;
+ self.stage = ConnectionStage::Resuming;
- if let Err(why) = self.client.send_json(&msg) {
- warn!("[Shard {:?}] Err sending presence update: {:?}",
- self.shard_info,
- why);
+ match self.session_id.as_ref() {
+ Some(session_id) => {
+ self.client.send_resume(
+ &self.shard_info,
+ session_id,
+ &self.seq,
+ &self.token.lock(),
+ )
+ },
+ None => Err(Error::Gateway(GatewayError::NoSessionId)),
}
+ }
- #[cfg(feature = "cache")]
- {
- let mut cache = CACHE.write();
- let current_user_id = cache.user.id;
+ pub fn reconnect(&mut self) -> Result<()> {
+ info!("[Shard {:?}] Attempting to reconnect", self.shard_info());
- cache.presences.get_mut(&current_user_id).map(|presence| {
- presence.game = game.clone();
- presence.last_modified = Some(now);
- });
- }
+ self.reset();
+ self.client = self.initialize()?;
+
+ Ok(())
+ }
+
+ pub fn update_presence(&mut self) -> Result<()> {
+ self.client.send_presence_update(
+ &self.shard_info,
+ &self.current_presence,
+ )
}
}
@@ -1140,18 +888,3 @@ fn build_gateway_url(base: &str) -> Result<Url> {
Error::Gateway(GatewayError::BuildingUrl)
})
}
-
-/// Tries to connect and upon failure, retries.
-fn connecting(uri: &str) -> WsClient {
- let waiting_time = 30;
-
- loop {
- match connect(&uri) {
- Ok(client) => return client,
- Err(why) => {
- warn!("Connecting failed: {:?}\n Will retry in {} seconds.", why, waiting_time);
- thread::sleep(StdDuration::from_secs(waiting_time));
- },
- };
- }
-}
diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs
new file mode 100644
index 0000000..873d114
--- /dev/null
+++ b/src/gateway/ws_client_ext.rs
@@ -0,0 +1,133 @@
+use chrono::Utc;
+use constants::{self, OpCode};
+use gateway::{CurrentPresence, WsClient};
+use internal::prelude::*;
+use internal::ws_impl::SenderExt;
+use model::GuildId;
+use std::env::consts;
+
+pub trait WebSocketGatewayClientExt {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId>;
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()>;
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()>;
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()>;
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()>;
+}
+
+impl WebSocketGatewayClientExt for WsClient {
+ fn send_chunk_guilds<It>(
+ &mut self,
+ guild_ids: It,
+ shard_info: &[u64; 2],
+ limit: Option<u16>,
+ query: Option<&str>,
+ ) -> Result<()> where It: IntoIterator<Item=GuildId> {
+ debug!("[Shard {:?}] Requesting member chunks", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::GetGuildMembers.num(),
+ "d": {
+ "guild_id": guild_ids.into_iter().map(|x| x.as_ref().0).collect::<Vec<u64>>(),
+ "limit": limit.unwrap_or(0),
+ "query": query.unwrap_or(""),
+ },
+ })).map_err(From::from)
+ }
+
+ fn send_heartbeat(&mut self, shard_info: &[u64; 2], seq: Option<u64>)
+ -> Result<()> {
+ trace!("[Shard {:?}] Sending heartbeat d: {:?}", shard_info, seq);
+
+ self.send_json(&json!({
+ "d": seq,
+ "op": OpCode::Heartbeat.num(),
+ })).map_err(From::from)
+ }
+
+ fn send_identify(&mut self, shard_info: &[u64; 2], token: &str)
+ -> Result<()> {
+ debug!("[Shard {:?}] Identifying", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::Identify.num(),
+ "d": {
+ "compression": true,
+ "large_threshold": constants::LARGE_THRESHOLD,
+ "shard": shard_info,
+ "token": token,
+ "v": constants::GATEWAY_VERSION,
+ "properties": {
+ "$browser": "serenity",
+ "$device": "serenity",
+ "$os": consts::OS,
+ },
+ },
+ }))
+ }
+
+ fn send_presence_update(
+ &mut self,
+ shard_info: &[u64; 2],
+ current_presence: &CurrentPresence,
+ ) -> Result<()> {
+ let &(ref game, ref status) = current_presence;
+ let now = Utc::now().timestamp() as u64;
+
+ debug!("[Shard {:?}] Sending presence update", shard_info);
+
+ self.send_json(&json!({
+ "op": OpCode::StatusUpdate.num(),
+ "d": {
+ "afk": false,
+ "since": now,
+ "status": status.name(),
+ "game": game.as_ref().map(|x| json!({
+ "name": x.name,
+ "type": x.kind,
+ "url": x.url,
+ })),
+ },
+ }))
+ }
+
+ fn send_resume(
+ &mut self,
+ shard_info: &[u64; 2],
+ session_id: &str,
+ seq: &u64,
+ token: &str,
+ ) -> Result<()> {
+ debug!("[Shard {:?}] Sending resume; seq: {}", shard_info, seq);
+
+ self.send_json(&json!({
+ "op": OpCode::Resume.num(),
+ "d": {
+ "session_id": session_id,
+ "seq": seq,
+ "token": token,
+ },
+ })).map_err(From::from)
+ }
+}