diff options
| author | Austin Hellyer <[email protected]> | 2016-11-21 19:17:57 -0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-11-21 19:17:57 -0800 |
| commit | 6b1a83111d4d9cc2ef2f4eed1ee8f58d45525078 (patch) | |
| tree | a6f0303ebcf4a474f603aaa5c8fff67409d42a17 /src | |
| parent | Add support for creating embed images (diff) | |
| download | serenity-6b1a83111d4d9cc2ef2f4eed1ee8f58d45525078.tar.xz serenity-6b1a83111d4d9cc2ef2f4eed1ee8f58d45525078.zip | |
Re-organize the client module
Re-organize the client module, creating a `gateway` submodule, and
splitting the connection into separate files in it.
The connection was a conglomeration of a number of purposes, most of
which are actually used elsewhere in the library and/or exposed to the
user. Thus, it makes sense to separate each item in a gateway-specific
module.
By splitting the client module further, this is a re-organization for
preliminary RPC support WRT the Client.
Additionally, rename the Connection struct to a Shard. The Connection
itself was not the actual connection, and was a higher-level interface
to the real connection logic. A Shard is a more accurate representation
of what it actually is.
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/context.rs | 34 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 11 | ||||
| -rw-r--r-- | src/client/error.rs | 147 | ||||
| -rw-r--r-- | src/client/gateway/error.rs | 27 | ||||
| -rw-r--r-- | src/client/gateway/mod.rs | 60 | ||||
| -rw-r--r-- | src/client/gateway/prep.rs | 146 | ||||
| -rw-r--r-- | src/client/gateway/shard.rs (renamed from src/client/connection.rs) | 434 | ||||
| -rw-r--r-- | src/client/gateway/status.rs | 11 | ||||
| -rw-r--r-- | src/client/mod.rs | 296 | ||||
| -rw-r--r-- | src/error.rs | 19 | ||||
| -rw-r--r-- | src/ext/voice/handler.rs | 8 | ||||
| -rw-r--r-- | src/ext/voice/manager.rs | 6 | ||||
| -rw-r--r-- | src/internal/ws_impl.rs | 9 |
13 files changed, 627 insertions, 581 deletions
diff --git a/src/client/context.rs b/src/client/context.rs index b1cc57f..7008f9b 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -2,7 +2,7 @@ use serde_json::builder::ObjectBuilder; use std::collections::HashMap; use std::io::Read; use std::sync::{Arc, Mutex}; -use super::connection::Connection; +use super::gateway::Shard; use super::http; use super::login_type::LoginType; use ::utils::builder::{ @@ -26,7 +26,7 @@ use super::STATE; /// The context is a general utility struct provided on event dispatches, which /// helps with dealing with the current "context" of the event dispatch, /// and providing helper methods where possible. The context also acts as a -/// general high-level interface over the associated [`Connection`] which +/// general high-level interface over the associated [`Shard`] which /// received the event, or the low-level [`http`] module. /// /// For example, when the [`Client::on_message`] handler is dispatched to, the @@ -35,8 +35,8 @@ use super::STATE; /// post its given argument to the associated channel for you as a [`Message`]. /// /// Additionally, the context contains "shortcuts", like for interacting with -/// the connection. Methods like [`set_game`] will unlock the connection and -/// perform an update for you to save a bit of work. +/// the shard. Methods like [`set_game`] will unlock the shard and perform an +/// update for you to save a bit of work. /// /// A context will only live for the event it was dispatched for. After the /// event handler finished, it is destroyed and will not be re-used. @@ -65,10 +65,10 @@ use super::STATE; /// /// [`Channel`]: ../model/enum.Channel.html /// [`Client::on_message`]: struct.Client.html#method.on_message -/// [`Connection`]: struct.Connection.html /// [`LiveGuild`]: ../model/struct.LiveGuild.html /// [`Message`]: ../model/struct.Message.html /// [`PublicChannel`]: ../model/struct.PublicChannel.html +/// [`Shard`]: gateway/struct.Shard.html /// [`State`]: ../ext/state/struct.State.html /// [`get_channel`]: #method.get_channel /// [`http`]: http/index.html @@ -81,11 +81,11 @@ pub struct Context { /// /// [`on_message`]: struct.Client.html#method.on_message pub channel_id: Option<ChannelId>, - /// The associated connection which dispatched the event handler. + /// The associated shard which dispatched the event handler. /// /// Note that if you are sharding, in relevant terms, this is the shard /// which received the event being dispatched. - pub connection: Arc<Mutex<Connection>>, + pub shard: Arc<Mutex<Shard>>, login_type: LoginType, } @@ -99,11 +99,11 @@ impl Context { /// documentation. #[doc(hidden)] pub fn new(channel_id: Option<ChannelId>, - connection: Arc<Mutex<Connection>>, + shard: Arc<Mutex<Shard>>, login_type: LoginType) -> Context { Context { channel_id: channel_id, - connection: connection, + shard: shard, login_type: login_type, } } @@ -1227,7 +1227,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn online(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Online); + self.shard.lock().unwrap().set_status(OnlineStatus::Online); } /// Sets the current user as being [`Idle`]. This maintains the current @@ -1235,7 +1235,7 @@ impl Context { /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle pub fn idle(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Idle); + self.shard.lock().unwrap().set_status(OnlineStatus::Idle); } /// Sets the current user as being [`DoNotDisturb`]. This maintains the @@ -1243,7 +1243,7 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb pub fn dnd(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::DoNotDisturb); + self.shard.lock().unwrap().set_status(OnlineStatus::DoNotDisturb); } /// Sets the current user as being [`Invisible`]. This maintains the current @@ -1251,7 +1251,7 @@ impl Context { /// /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible pub fn invisible(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Invisible); + self.shard.lock().unwrap().set_status(OnlineStatus::Invisible); } /// "Resets" the current user's presence, by setting the game to `None`, @@ -1262,7 +1262,7 @@ impl Context { /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence pub fn reset_presence(&self) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(None, OnlineStatus::Online, false) } @@ -1284,7 +1284,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn set_game(&self, game: Game) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(Some(game), OnlineStatus::Online, false); } @@ -1309,7 +1309,7 @@ impl Context { url: None, }; - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(Some(game), OnlineStatus::Online, false); } @@ -1349,7 +1349,7 @@ impl Context { game: Option<Game>, status: OnlineStatus, afk: bool) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(game, status, afk) } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 8c10e75..96c8e69 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -2,7 +2,8 @@ use std::sync::{Arc, Mutex}; use std::thread; use super::event_store::EventStore; use super::login_type::LoginType; -use super::{Connection, Context}; +use super::Context; +use super::gateway::Shard; use ::model::{ChannelId, Event, Message}; #[cfg(feature="framework")] @@ -35,14 +36,14 @@ macro_rules! update { } fn context(channel_id: Option<ChannelId>, - conn: Arc<Mutex<Connection>>, + conn: Arc<Mutex<Shard>>, login_type: LoginType) -> Context { Context::new(channel_id, conn, login_type) } #[cfg(feature="framework")] pub fn dispatch(event: Event, - conn: Arc<Mutex<Connection>>, + conn: Arc<Mutex<Shard>>, framework: Arc<Mutex<Framework>>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>) { @@ -69,7 +70,7 @@ pub fn dispatch(event: Event, #[cfg(not(feature="framework"))] pub fn dispatch(event: Event, - conn: Arc<Mutex<Connection>>, + conn: Arc<Mutex<Shard>>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>) { match event { @@ -99,7 +100,7 @@ fn dispatch_message(context: Context, #[allow(cyclomatic_complexity)] fn handle_event(event: Event, - conn: Arc<Mutex<Connection>>, + conn: Arc<Mutex<Shard>>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>) { match event { diff --git a/src/client/error.rs b/src/client/error.rs new file mode 100644 index 0000000..9b542ec --- /dev/null +++ b/src/client/error.rs @@ -0,0 +1,147 @@ +use hyper::status::StatusCode; +use ::model::{ChannelType, Permissions}; + +/// An error returned from the [`Client`] or the [`Context`], or model instance. +/// +/// This is always wrapped within the library's generic [`Error::Client`] +/// variant. +/// +/// # Examples +/// +/// Matching an [`Error`] with this variant may look something like the +/// following for the [`Client::ban`] method, which in this example is used to +/// re-ban all members with an odd discriminator: +/// +/// ```rust,no_run +/// use serenity::client::{Client, ClientError}; +/// use serenity::Error; +/// use std::env; +/// +/// let token = env::var("DISCORD_BOT_TOKEN").unwrap(); +/// let mut client = Client::login_bot(&token); +/// +/// client.on_member_unban(|context, guild_id, user| { +/// let discriminator = match user.discriminator.parse::<u16>() { +/// Ok(discriminator) => discriminator, +/// Err(_why) => return, +/// }; +/// +/// // If the user has an even discriminator, don't re-ban them. +/// if discriminator % 2 == 0 { +/// return; +/// } +/// +/// match context.ban(guild_id, user, 8) { +/// Ok(()) => { +/// // Ban successful. +/// }, +/// Err(Error::Client(ClientError::DeleteMessageDaysAmount(amount))) => { +/// println!("Failed deleting {} days' worth of messages", amount); +/// }, +/// Err(why) => { +/// println!("Unexpected error: {:?}", why); +/// }, +/// } +/// }); +/// ``` +/// +/// [`Client`]: struct.Client.html +/// [`Context`]: struct.Context.html +/// [`Context::ban`]: struct.Context.html#method.ban +/// [`Error::Client`]: ../enum.Error.html#variant.Client +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub enum Error { + /// When attempting to delete below or above the minimum and maximum allowed + /// number of messages. + BulkDeleteAmount, + /// When attempting to delete a number of days' worth of messages that is + /// not allowed. + DeleteMessageDaysAmount(u8), + /// When there was an error retrieving the gateway URI from the REST API. + Gateway, + /// An indication that a [guild][`LiveGuild`] could not be found by + /// [Id][`GuildId`] in the [`State`]. + /// + /// [`GuildId`]: ../model/struct.GuildId.html + /// [`LiveGuild`]: ../model/struct.LiveGuild.html + /// [`State`]: ../ext/state/struct.State.html + GuildNotFound, + InvalidOpCode, + /// When attempting to perform an action which is only available to user + /// accounts. + InvalidOperationAsBot, + /// When attempting to perform an action which is only available to bot + /// accounts. + InvalidOperationAsUser, + /// Indicates that you do not have the required permissions to perform an + /// operation. + /// + /// The provided [`Permission`]s is the set of required permissions + /// required. + /// + /// [`Permission`]: ../model/permissions/struct.Permissions.html + InvalidPermissions(Permissions), + /// An indicator that the shard data received from the gateway is invalid. + InvalidShards, + /// When the token provided is invalid. This is returned when validating a + /// token through the [`validate_token`] function. + /// + /// [`validate_token`]: fn.validate_token.html + InvalidToken, + /// An indicator that the [current user] can not perform an action. + /// + /// [current user]: ../model/struct.CurrentUser.html + InvalidUser, + /// An indicator that an item is missing from the [`State`], and the action + /// can not be continued. + /// + /// [`State`]: ../ext/state/struct.State.html + ItemMissing, + /// Indicates that a [`Message`]s content was too long and will not + /// successfully send, as the length is over 2000 codepoints, or 4000 bytes. + /// + /// The number of bytes larger than the limit is provided. + /// + /// [`Message`]: ../model/struct.Message.html + MessageTooLong(u64), + /// When attempting to use a [`Context`] helper method which requires a + /// contextual [`ChannelId`], but the current context is not appropriate for + /// the action. + /// + /// [`ChannelId`]: ../model/struct.ChannelId.html + /// [`Context`]: struct.Context.html + NoChannelId, + /// When the decoding of a ratelimit header could not be properly decoded + /// into an `i64`. + RateLimitI64, + /// When the decoding of a ratelimit header could not be properly decoded + /// from UTF-8. + RateLimitUtf8, + /// When attempting to find a required record from the State could not be + /// found. This is required in methods such as [`Context::edit_role`]. + /// + /// [`Context::edit_role`]: struct.Context.html#method.edit_role + RecordNotFound, + /// When the shard being retrieved from within the Client could not be + /// found after being inserted into the Client's internal vector of + /// [`Shard`]s. + /// + /// This can be returned from one of the options for starting one or + /// multiple shards. + /// + /// **This should never be received.** + /// + /// [`Shard`]: gateway/struct.Shard.html + ShardUnknown, + /// When a function such as [`Context::edit_channel`] did not expect the + /// received [`ChannelType`]. + /// + /// [`ChannelType`]: ../model/enum.ChannelType.html + /// [`Context::edit_channel`]: struct.Context.html#method.edit_channel + UnexpectedChannelType(ChannelType), + /// When a status code was unexpectedly received for a request's status. + UnexpectedStatusCode(StatusCode), + /// When a status is received, but the verification to ensure the response + /// is valid does not recognize the status. + UnknownStatus(u16), +} diff --git a/src/client/gateway/error.rs b/src/client/gateway/error.rs new file mode 100644 index 0000000..fb44d3f --- /dev/null +++ b/src/client/gateway/error.rs @@ -0,0 +1,27 @@ +use std::fmt::{self, Display}; + +#[derive(Clone, Debug)] +pub enum Error { + /// The connection closed + Closed(Option<u16>, String), + /// Expected a Hello during a handshake + ExpectedHello, + /// Expected a Ready or an InvalidateSession + InvalidHandshake, +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Closed(s, ref v) => { + f.write_str(&format!("Connection closed {:?}: {:?}", s, v)) + }, + Error::ExpectedHello => { + f.write_str("Expected Hello during handshake") + }, + Error::InvalidHandshake => { + f.write_str("Expected Ready or InvalidateSession") + }, + } + } +} diff --git a/src/client/gateway/mod.rs b/src/client/gateway/mod.rs new file mode 100644 index 0000000..aa9722e --- /dev/null +++ b/src/client/gateway/mod.rs @@ -0,0 +1,60 @@ +//! The gateway module contains the pieces - primarily the [`Shard`] - +//! responsible for maintaing a WebSocket connection with Discord. +//! +//! A shard is an interface for the lower-level receiver and sender. It provides +//! what can otherwise be thought of as "sugar methods". A shard represents a +//! single connection to Discord. If acting as a [`Bot`] user, you can make +//! use of a method named "sharding" to have multiple shards, potentially +//! offloading some server load to another server(s). +//! +//! # Sharding +//! +//! Sharding is a method to split portions of bots into separate processes. This +//! is an enforced strategy by Discord once a bot reaches a certain number of +//! guilds (2500). Once this number is reached, a bot must be sharded in a way +//! that only 2500 guilds maximum may be allocated per shard. +//! +//! The "recommended" number of guilds per shard is _around_ 1000. Sharding can +//! be useful for splitting processes across separate servers. Often you may +//! want some or all shards to be in the same process, allowing for a shared +//! State. This is possible through this library. +//! +//! See [Discord's documentation][docs] for more information. +//! +//! If you are not using a bot account or do not require sharding - such as for +//! a small bot - then use [`Client::start`]. +//! +//! There are a few methods of sharding available: +//! +//! - [`Client::start_autosharded`]: retrieves the number of shards Discord +//! recommends using from the API, and then automatically starts that number of +//! shards. +//! - [`Client::start_shard`]: starts a single shard for use in the instance, +//! handled by the instance of the Client. Use this if you only want 1 shard +//! handled by this instance. +//! - [`Client::start_shards`]: starts all shards in this instance. This is best +//! for when you want a completely shared State. +//! - [`Client::start_shard_range`]: start a range of shards within this +//! instance. This should be used when you, for example, want to split 10 shards +//! across 3 instances. +//! +//! **Note**: User accounts can not shard. Use [`Client::start`]. +//! +//! [`Bot`]: ../enum.LoginType.html#variant.Bot +//! [`Client`]: ../struct.Client.html +//! [`Client::start`]: ../struct.Client.html#method.start +//! [`Client::start_autosharded`]: ../struct.Client.html#method.start_autosharded +//! [`Client::start_shard`]: ../struct.Client.html#method.start_shard +//! [`Client::start_shard_range`]: ../struct.Client.html#method.start_shard_range +//! [`Client::start_shards`]: ../struct.Client.html#method.start_shards +//! [`Shard`]: struct.Shard.html +//! [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding + +mod error; +mod prep; +mod shard; +mod status; + +pub use self::error::Error as GatewayError; +pub use self::shard::Shard; +pub use self::status::Status as GatewayStatus; diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs new file mode 100644 index 0000000..bf4e9b3 --- /dev/null +++ b/src/client/gateway/prep.rs @@ -0,0 +1,146 @@ +use serde_json::builder::ObjectBuilder; +use serde_json::Value; +use std::net::Shutdown; +use std::sync::mpsc::{ + TryRecvError, + Receiver as MpscReceiver, + Sender as MpscSender +}; +use std::time::Duration as StdDuration; +use std::{env, thread}; +use super::super::ClientError; +use super::{GatewayError, GatewayStatus}; +use time::{self, Duration}; +use websocket::client::request::Url as RequestUrl; +use websocket::client::{Receiver, Sender}; +use websocket::stream::WebSocketStream; +use ::constants::{self, OpCode}; +use ::error::{Error, Result}; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::model::{Event, GatewayEvent, ReadyEvent}; + +#[inline] +pub fn parse_ready(event: GatewayEvent, + tx: &MpscSender<GatewayStatus>, + receiver: &mut Receiver<WebSocketStream>, + identification: Value) + -> Result<(ReadyEvent, u64)> { + match event { + GatewayEvent::Dispatch(seq, Event::Ready(event)) => { + Ok((event, seq)) + }, + GatewayEvent::InvalidateSession => { + debug!("Session invalidation"); + + let _ = tx.send(GatewayStatus::SendMessage(identification)); + + match try!(receiver.recv_json(GatewayEvent::decode)) { + GatewayEvent::Dispatch(seq, Event::Ready(event)) => { + Ok((event, seq)) + }, + other => { + debug!("Unexpected event: {:?}", other); + + Err(Error::Gateway(GatewayError::InvalidHandshake)) + }, + } + }, + other => { + debug!("Unexpected event: {:?}", other); + + Err(Error::Gateway(GatewayError::InvalidHandshake)) + }, + } +} + +pub fn identify(token: &str, shard_info: Option<[u8; 2]>) -> Value { + ObjectBuilder::new() + .insert("op", OpCode::Identify.num()) + .insert_object("d", |mut object| { + object = identify_compression(object) + .insert("large_threshold", 250) // max value + .insert_object("properties", |object| object + .insert("$browser", "Feature-full and ergonomic discord rust library") + .insert("$device", "serenity") + .insert("$os", env::consts::OS) + .insert("$referrer", "") + .insert("$referring_domain", "") + ) + .insert("token", token) + .insert("v", constants::GATEWAY_VERSION); + + if let Some(shard_info) = shard_info { + object = object + .insert_array("shard", |a| a + .push(shard_info[0]) + .push(shard_info[1])); + } + + object + }) + .build() +} + +#[cfg(not(feature = "debug"))] +pub fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { + object.insert("compression", true) +} + +#[cfg(feature = "debug")] +pub fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { + object.insert("compression", false) +} + +pub fn build_gateway_url(base: &str) -> Result<RequestUrl> { + RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) + .map_err(|_| Error::Client(ClientError::Gateway)) +} + +pub fn keepalive(interval: u64, + mut sender: Sender<WebSocketStream>, + channel: MpscReceiver<GatewayStatus>) { + let mut base_interval = Duration::milliseconds(interval as i64); + let mut next_tick = time::get_time() + base_interval; + + let mut last_sequence = 0; + + 'outer: loop { + thread::sleep(StdDuration::from_millis(100)); + + loop { + match channel.try_recv() { + Ok(GatewayStatus::ChangeInterval(interval)) => { + base_interval = Duration::milliseconds(interval as i64); + }, + Ok(GatewayStatus::ChangeSender(new_sender)) => { + sender = new_sender; + }, + Ok(GatewayStatus::SendMessage(val)) => { + if let Err(why) = sender.send_json(&val) { + warn!("Err sending message: {:?}", why); + } + }, + Ok(GatewayStatus::Sequence(seq)) => { + last_sequence = seq; + }, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => break 'outer, + } + } + + if time::get_time() >= next_tick { + next_tick = next_tick + base_interval; + + let map = ObjectBuilder::new() + .insert("d", last_sequence) + .insert("op", OpCode::Heartbeat.num()) + .build(); + + if let Err(why) = sender.send_json(&map) { + warn!("Err sending keepalive: {:?}", why); + } + } + } + + let _ = sender.get_mut().shutdown(Shutdown::Both); +} diff --git a/src/client/connection.rs b/src/client/gateway/shard.rs index dfaefcc..d85e341 100644 --- a/src/client/connection.rs +++ b/src/client/gateway/shard.rs @@ -1,26 +1,18 @@ use serde_json::builder::ObjectBuilder; -use serde_json; -use std::fmt::{self, Display}; use std::io::Write; use std::net::Shutdown; -use std::sync::mpsc::{ - self, - Receiver as MpscReceiver, - Sender as MpscSender, - TryRecvError -}; +use std::sync::mpsc::{self, Sender as MpscSender}; use std::thread::{self, Builder as ThreadBuilder}; use std::time::Duration as StdDuration; -use std::{env, mem}; -use super::login_type::LoginType; -use super::Client; -use time::{self, Duration}; -use websocket::client::request::Url as RequestUrl; +use std::mem; +use super::super::login_type::LoginType; +use super::super::Client; +use super::{GatewayError, GatewayStatus, prep}; use websocket::client::{Client as WsClient, Sender, Receiver}; use websocket::message::Message as WsMessage; use websocket::stream::WebSocketStream; use websocket::ws::sender::Sender as WsSender; -use ::constants::{self, OpCode}; +use ::constants::OpCode; use ::internal::prelude::*; use ::internal::ws_impl::{ReceiverExt, SenderExt}; use ::model::{ @@ -36,87 +28,24 @@ use ::model::{ #[cfg(feature="voice")] use ::ext::voice::Manager as VoiceManager; -#[doc(hidden)] -pub enum Status { - SendMessage(Value), - Sequence(u64), - ChangeInterval(u64), - ChangeSender(Sender<WebSocketStream>), -} - -#[derive(Clone, Debug)] -pub enum ConnectionError { - /// The connection closed - Closed(Option<u16>, String), - /// Expected a Hello during a handshake - ExpectedHello, - /// Expected a Ready or an InvalidateSession - InvalidHandshake, -} - -impl Display for ConnectionError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - ConnectionError::Closed(s, ref v) => { - f.write_str(&format!("Connection closed {:?}: {:?}", s, v)) - }, - ConnectionError::ExpectedHello => { - f.write_str("Expected Hello during handshake") - }, - ConnectionError::InvalidHandshake => { - f.write_str("Expected Ready or InvalidateSession") - }, - } - } -} - type CurrentPresence = (Option<Game>, OnlineStatus, bool); -/// A connection is a handler for a websocket connection to Discord's gateway. -/// The connection allows for sending and receiving messages over the websocket, -/// such as setting the active game, reconnecting, syncing guilds, and more. -/// -/// # Sharding -/// -/// Sharding is a method to split portions of bots into separate processes. This -/// is an enforced strategy by Discord once a bot reaches a certain number of -/// guilds (2500). Once this number is reached, a bot must be sharded in a way -/// that only 2500 guilds maximum may be allocated per shard. -/// -/// The "recommended" number of guilds per shard is _around_ 1000. Sharding can -/// be useful for splitting processes across separate servers. Often you may -/// want some or all shards to be in the same process, allowing for a shared -/// State. This is possible through this library. -/// -/// See [Discord's documentation][docs] for more information. -/// -/// If you are not using a bot account or do not require sharding - such as for -/// a small bot - then use [`Client::start`]. +/// A Shard is a higher-level handler for a websocket connection to Discord's +/// gateway. The shard allows for sending and receiving messages over the +/// websocket, such as setting the active game, reconnecting, syncing guilds, +/// and more. /// -/// There are a few methods of sharding available: +/// Refer to the [module-level documentation][module docs] for information on +/// effectively using multiple shards, if you need to. /// -/// - [`Client::start_autosharded`]: retrieves the number of shards Discord -/// recommends using from the API, and then automatically starts that number of -/// shards. -/// - [`Client::start_shard`]: starts a single shard for use in the instance, -/// handled by the instance of the Client. Use this if you only want 1 shard -/// handled by this instance. -/// - [`Client::start_shards`]: starts all shards in this instance. This is best -/// for when you want a completely shared State. -/// - [`Client::start_shard_range`]: start a range of shards within this -/// instance. This should be used when you, for example, want to split 10 shards -/// across 3 instances. +/// # Stand-alone shards /// -/// **Note**: User accounts can not shard. Use [`Client::start`]. -/// -/// # Stand-alone connections -/// -/// You may instantiate a connection yourself if you need to, which is -/// completely decoupled from the client. For most use cases, you will not need -/// to do this, and you can leave the client to do it. +/// You may instantiate a shard yourself - decoupled from the [`Client`] - if +/// you need to. For most use cases, you will not need to do this, and you can +/// leave the client to do it. /// /// This can be done by passing in the required parameters to [`new`]. You can -/// then manually handle the connection yourself and receive events via +/// then manually handle the shard yourself and receive events via /// [`receive`]. /// /// **Note**: You _really_ do not need to do this. Just call one of the @@ -127,17 +56,13 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool); /// See the documentation for [`new`] on how to use this. /// /// [`Client`]: struct.Client.html -/// [`Client::start`]: struct.Client.html#method.start -/// [`Client::start_autosharded`]: struct.Client.html#method.start_autosharded -/// [`Client::start_shard`]: struct.Client.html#method.start_shard -/// [`Client::start_shard_range`]: struct.Client.html#method.start_shard_range -/// [`Client::start_shards`]: struct.Client.html#method.start_shards /// [`new`]: #method.new /// [`receive`]: #method.receive /// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding -pub struct Connection { +/// [module docs]: index.html#sharding +pub struct Shard { current_presence: CurrentPresence, - keepalive_channel: MpscSender<Status>, + keepalive_channel: MpscSender<GatewayStatus>, last_sequence: u64, login_type: LoginType, session_id: Option<String>, @@ -148,25 +73,26 @@ pub struct Connection { pub manager: VoiceManager, } -impl Connection { - /// Instantiates a new instance of a connection, bypassing the client. +impl Shard { + /// Instantiates a new instance of a Shard, bypassing the client. /// /// **Note**: You should likely never need to do this yourself. /// /// # Examples /// - /// Instantiating a new Connection manually for a bot with no shards, and + /// Instantiating a new Shard manually for a bot with no shards, and /// then listening for events: /// /// ```rust,ignore - /// use serenity::client::{Connection, LoginType, http}; + /// use serenity::client::gateway::Shard; + /// use serenity::client::{LoginType, http}; /// use std::env; /// /// let token = env::var("DISCORD_BOT_TOKEN").expect("Token in environment"); /// // retrieve the gateway response, which contains the URL to connect to /// let gateway = http::get_gateway().expect("Valid gateway response").url; - /// let connection = Connection::new(&gateway, &token, None, LoginType::Bot) - /// .expect("Working connection"); + /// let shard = Shard::new(&gateway, &token, None, LoginType::Bot) + /// .expect("Working shard"); /// /// // at this point, you can create a `loop`, and receive events and match /// // their variants @@ -175,15 +101,15 @@ impl Connection { token: &str, shard_info: Option<[u8; 2]>, login_type: LoginType) - -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> { - let url = try!(build_gateway_url(base_url)); + -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> { + let url = try!(prep::build_gateway_url(base_url)); let response = try!(try!(WsClient::connect(url)).send()); try!(response.validate()); let (mut sender, mut receiver) = response.begin().split(); - let identification = identify(token, shard_info); + let identification = prep::identify(token, shard_info); try!(sender.send_json(&identification)); let heartbeat_interval = match try!(receiver.recv_json(GatewayEvent::decode)) { @@ -191,7 +117,7 @@ impl Connection { other => { debug!("Unexpected event during connection start: {:?}", other); - return Err(Error::Connection(ConnectionError::ExpectedHello)); + return Err(Error::Gateway(GatewayError::ExpectedHello)); }, }; @@ -204,17 +130,17 @@ impl Connection { }; try!(ThreadBuilder::new() .name(thread_name) - .spawn(move || keepalive(heartbeat_interval, sender, rx))); + .spawn(move || prep::keepalive(heartbeat_interval, sender, rx))); // Parse READY let event = try!(receiver.recv_json(GatewayEvent::decode)); - let (ready, sequence) = try!(parse_ready(event, + let (ready, sequence) = try!(prep::parse_ready(event, &tx, &mut receiver, identification)); Ok((feature_voice! {{ - Connection { + Shard { current_presence: (None, OnlineStatus::Online, false), keepalive_channel: tx.clone(), last_sequence: sequence, @@ -226,7 +152,7 @@ impl Connection { manager: VoiceManager::new(tx, ready.ready.user.id.0), } } else { - Connection { + Shard { current_presence: (None, OnlineStatus::Online, false), keepalive_channel: tx.clone(), last_sequence: sequence, @@ -292,7 +218,7 @@ impl Connection { /// /// // assuming you are in a context /// - /// context.connection.lock() + /// context.shard.lock() /// .unwrap() /// .set_presence(Game::playing("Heroes of the Storm"), /// OnlineStatus::Online, @@ -312,36 +238,13 @@ impl Connection { self.update_presence(); } - fn update_presence(&self) { - let (ref game, status, afk) = self.current_presence; - - let msg = ObjectBuilder::new() - .insert("op", OpCode::StatusUpdate.num()) - .insert_object("d", move |mut object| { - object = object.insert("since", 0) - .insert("afk", afk) - .insert("status", status.name()); - - match game.as_ref() { - Some(ref game) => { - object.insert_object("game", move |o| o - .insert("name", &game.name)) - }, - None => object.insert("game", Value::Null), - } - }) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - pub fn handle_event(&mut self, event: Result<GatewayEvent>, mut receiver: &mut Receiver<WebSocketStream>) -> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> { match event { Ok(GatewayEvent::Dispatch(sequence, event)) => { - let status = Status::Sequence(sequence); + let status = GatewayStatus::Sequence(sequence); let _ = self.keepalive_channel.send(status); self.handle_dispatch(&event); @@ -353,7 +256,7 @@ impl Connection { .insert("d", sequence) .insert("op", OpCode::Heartbeat.num()) .build(); - let _ = self.keepalive_channel.send(Status::SendMessage(map)); + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(map)); Ok(None) }, @@ -361,16 +264,16 @@ impl Connection { Ok(None) }, Ok(GatewayEvent::Hello(interval)) => { - let _ = self.keepalive_channel.send(Status::ChangeInterval(interval)); + let _ = self.keepalive_channel.send(GatewayStatus::ChangeInterval(interval)); Ok(None) }, Ok(GatewayEvent::InvalidateSession) => { self.session_id = None; - let identification = identify(&self.token, self.shard_info); + let identification = prep::identify(&self.token, self.shard_info); - let status = Status::SendMessage(identification); + let status = GatewayStatus::SendMessage(identification); let _ = self.keepalive_channel.send(status); @@ -379,7 +282,7 @@ impl Connection { Ok(GatewayEvent::Reconnect) => { self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, - Err(Error::Connection(ConnectionError::Closed(num, message))) => { + Err(Error::Gateway(GatewayError::Closed(num, message))) => { warn!("Closing with {:?}: {:?}", num, message); // Attempt to resume if the following was not received: @@ -420,9 +323,48 @@ impl Connection { } } + pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>) + -> Result<()> { + let stream = receiver.get_mut().get_mut(); + + { + let mut sender = Sender::new(stream.by_ref(), true); + let message = WsMessage::close_because(1000, ""); + + try!(sender.send_message(&message)); + } + + try!(stream.flush()); + try!(stream.shutdown(Shutdown::Both)); + + Ok(()) + } + + pub fn sync_calls(&self, channels: &[ChannelId]) { + for &channel in channels { + let msg = ObjectBuilder::new() + .insert("op", OpCode::SyncCall.num()) + .insert_object("d", |obj| obj + .insert("channel_id", channel.0) + ) + .build(); + + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); + } + } + + pub fn sync_guilds(&self, guild_ids: &[GuildId]) { + let msg = ObjectBuilder::new() + .insert("op", OpCode::SyncGuild.num()) + .insert_array("d", |a| guild_ids.iter().fold(a, |a, s| a.push(s.0))) + .build(); + + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); + } + fn handle_dispatch(&mut self, event: &Event) { if let &Event::Resumed(ref ev) = event { - let status = Status::ChangeInterval(ev.heartbeat_interval); + let status = GatewayStatus::ChangeInterval(ev.heartbeat_interval); let _ = self.keepalive_channel.send(status); } @@ -452,13 +394,13 @@ impl Connection { // Take a few attempts at reconnecting; otherwise fall back to // re-instantiating the connection. for _ in 0..3 { - let connection = Connection::new(&self.ws_url, - &self.token, - self.shard_info, - self.login_type); + let shard = Shard::new(&self.ws_url, + &self.token, + self.shard_info, + self.login_type); - if let Ok((connection, ready, receiver_new)) = connection { - try!(mem::replace(self, connection).shutdown(&mut receiver)); + if let Ok((shard, ready, receiver_new)) = shard { + try!(mem::replace(self, shard).shutdown(&mut receiver)); self.session_id = Some(ready.ready.session_id.clone()); @@ -473,17 +415,17 @@ impl Connection { // A bit of complexity here: instantiate a temporary instance of a // Client. This client _does not_ replace the current client(s) that the // user has. This client will then connect to gateway. This new - // connection will be used to replace _this_ connection. - let (connection, ready, receiver_new) = { + // shard will be used to replace _this_ shard. + let (shard, ready, receiver_new) = { let mut client = Client::login_raw(&self.token.clone(), self.login_type); - try!(client.boot_connection(self.shard_info)) + try!(client.boot_shard(self.shard_info)) }; - // Replace this connection with a new one, and shutdown the now-old - // connection. - try!(mem::replace(self, connection).shutdown(&mut receiver)); + // Replace this shard with a new one, and shutdown the now-old + // shard. + try!(mem::replace(self, shard).shutdown(&mut receiver)); self.session_id = Some(ready.ready.session_id.clone()); @@ -493,7 +435,7 @@ impl Connection { fn resume(&mut self, session_id: String, receiver: &mut Receiver<WebSocketStream>) -> Result<(Event, Receiver<WebSocketStream>)> { try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both)); - let url = try!(build_gateway_url(&self.ws_url)); + let url = try!(prep::build_gateway_url(&self.ws_url)); let response = try!(try!(WsClient::connect(url)).send()); try!(response.validate()); @@ -524,183 +466,41 @@ impl Connection { break; }, GatewayEvent::InvalidateSession => { - try!(sender.send_json(&identify(&self.token, self.shard_info))); - } + try!(sender.send_json(&prep::identify(&self.token, self.shard_info))); + }, other => { debug!("Unexpected event: {:?}", other); - return Err(Error::Connection(ConnectionError::InvalidHandshake)); - } + return Err(Error::Gateway(GatewayError::InvalidHandshake)); + }, } } - let _ = self.keepalive_channel.send(Status::ChangeSender(sender)); + let _ = self.keepalive_channel.send(GatewayStatus::ChangeSender(sender)); Ok((first_event, receiver)) } - pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>) - -> Result<()> { - let stream = receiver.get_mut().get_mut(); - - { - let mut sender = Sender::new(stream.by_ref(), true); - let message = WsMessage::close_because(1000, ""); - - try!(sender.send_message(&message)); - } - - try!(stream.flush()); - try!(stream.shutdown(Shutdown::Both)); - - Ok(()) - } + fn update_presence(&self) { + let (ref game, status, afk) = self.current_presence; - pub fn sync_guilds(&self, guild_ids: &[GuildId]) { let msg = ObjectBuilder::new() - .insert("op", OpCode::SyncGuild.num()) - .insert_array("d", |a| guild_ids.iter().fold(a, |a, s| a.push(s.0))) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - - pub fn sync_calls(&self, channels: &[ChannelId]) { - for &channel in channels { - let msg = ObjectBuilder::new() - .insert("op", OpCode::SyncCall.num()) - .insert_object("d", |obj| obj - .insert("channel_id", channel.0) - ) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - } -} - -#[inline] -fn parse_ready(event: GatewayEvent, - tx: &MpscSender<Status>, - receiver: &mut Receiver<WebSocketStream>, - identification: Value) - -> Result<(ReadyEvent, u64)> { - match event { - GatewayEvent::Dispatch(seq, Event::Ready(event)) => { - Ok((event, seq)) - }, - GatewayEvent::InvalidateSession => { - debug!("Session invalidation"); - - let _ = tx.send(Status::SendMessage(identification)); - - match try!(receiver.recv_json(GatewayEvent::decode)) { - GatewayEvent::Dispatch(seq, Event::Ready(event)) => { - Ok((event, seq)) - }, - other => { - debug!("Unexpected event: {:?}", other); - - Err(Error::Connection(ConnectionError::InvalidHandshake)) - }, - } - }, - other => { - debug!("Unexpected event: {:?}", other); - - Err(Error::Connection(ConnectionError::InvalidHandshake)) - }, - } -} - -fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value { - ObjectBuilder::new() - .insert("op", OpCode::Identify.num()) - .insert_object("d", |mut object| { - object = identify_compression(object) - .insert("large_threshold", 250) // max value - .insert_object("properties", |object| object - .insert("$browser", "Feature-full and ergonomic discord rust library") - .insert("$device", "serenity") - .insert("$os", env::consts::OS) - .insert("$referrer", "") - .insert("$referring_domain", "") - ) - .insert("token", token) - .insert("v", constants::GATEWAY_VERSION); - - if let Some(shard_info) = shard_info { - object = object - .insert_array("shard", |a| a - .push(shard_info[0]) - .push(shard_info[1])); - } - - object - }) - .build() -} - -#[cfg(not(feature = "debug"))] -fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { - object.insert("compression", true) -} - -#[cfg(feature = "debug")] -fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { - object.insert("compression", false) -} - -fn build_gateway_url(base: &str) -> Result<RequestUrl> { - RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) - .map_err(|_| Error::Client(ClientError::Gateway)) -} - -fn keepalive(interval: u64, - mut sender: Sender<WebSocketStream>, - channel: MpscReceiver<Status>) { - let mut base_interval = Duration::milliseconds(interval as i64); - let mut next_tick = time::get_time() + base_interval; - - let mut last_sequence = 0; - - 'outer: loop { - thread::sleep(StdDuration::from_millis(100)); - - loop { - match channel.try_recv() { - Ok(Status::ChangeInterval(interval)) => { - base_interval = Duration::milliseconds(interval as i64); - }, - Ok(Status::ChangeSender(new_sender)) => { - sender = new_sender; - }, - Ok(Status::SendMessage(val)) => { - if let Err(why) = sender.send_json(&val) { - warn!("Err sending message: {:?}", why); - } - }, - Ok(Status::Sequence(seq)) => { - last_sequence = seq; - }, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => break 'outer, - } - } - - if time::get_time() >= next_tick { - next_tick = next_tick + base_interval; + .insert("op", OpCode::StatusUpdate.num()) + .insert_object("d", move |mut object| { + object = object.insert("since", 0) + .insert("afk", afk) + .insert("status", status.name()); - let map = ObjectBuilder::new() - .insert("d", last_sequence) - .insert("op", OpCode::Heartbeat.num()) - .build(); + match game.as_ref() { + Some(ref game) => { + object.insert_object("game", move |o| o + .insert("name", &game.name)) + }, + None => object.insert("game", Value::Null), + } + }) + .build(); - if let Err(why) = sender.send_json(&map) { - warn!("Err sending keepalive: {:?}", why); - } - } + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); } - - let _ = sender.get_mut().shutdown(Shutdown::Both); } diff --git a/src/client/gateway/status.rs b/src/client/gateway/status.rs new file mode 100644 index 0000000..4409e65 --- /dev/null +++ b/src/client/gateway/status.rs @@ -0,0 +1,11 @@ +use serde_json::Value; +use websocket::client::Sender; +use websocket::stream::WebSocketStream; + +#[doc(hidden)] +pub enum Status { + SendMessage(Value), + Sequence(u64), + ChangeInterval(u64), + ChangeSender(Sender<WebSocketStream>), +} diff --git a/src/client/mod.rs b/src/client/mod.rs index ee99c64..fad01eb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,8 @@ //! The Client contains information about a single bot or user's token, as well //! as event handlers. Dispatching events to configured handlers and starting -//! the connection are handled directly via the client. In addition, the -//! [`http`] module and [`State`] are also automatically handled by the Client -//! module for you. +//! the shards' connections are handled directly via the client. In addition, +//! the [`http`] module and [`State`] are also automatically handled by the +//! Client module for you. //! //! A [`Context`] is provided for every handler. The context is an ergonomic //! method of accessing the lower-level http functions. @@ -22,24 +22,21 @@ //! [Client examples]: struct.Client.html#examples pub mod http; +pub mod gateway; -mod connection; mod context; mod dispatch; +mod error; mod event_store; mod login_type; -pub use self::connection::{ - Connection, - ConnectionError, - Status as ConnectionStatus -}; pub use self::context::Context; +pub use self::error::Error as ClientError; pub use self::login_type::LoginType; -use hyper::status::StatusCode; use self::dispatch::dispatch; use self::event_store::EventStore; +use self::gateway::Shard; use serde_json::builder::ObjectBuilder; use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; @@ -47,7 +44,7 @@ use std::thread; use std::time::Duration; use websocket::client::Receiver; use websocket::stream::WebSocketStream; -use ::internal::prelude::*; +use ::internal::prelude::{Error, Result, Value}; use ::internal::ws_impl::ReceiverExt; use ::model::*; @@ -83,155 +80,10 @@ lazy_static! { pub static ref STATE: Arc<Mutex<State>> = Arc::new(Mutex::new(State::default())); } -/// An error returned from the [`Client`] or the [`Context`], or model instance. -/// -/// This is always wrapped within the library's generic [`Error::Client`] -/// variant. -/// -/// # Examples -/// -/// Matching an [`Error`] with this variant may look something like the -/// following for the [`Client::ban`] method, which in this example is used to -/// re-ban all members with an odd discriminator: -/// -/// ```rust,no_run -/// use serenity::client::{Client, ClientError}; -/// use serenity::Error; -/// use std::env; -/// -/// let token = env::var("DISCORD_BOT_TOKEN").unwrap(); -/// let mut client = Client::login_bot(&token); -/// -/// client.on_member_unban(|context, guild_id, user| { -/// let discriminator = match user.discriminator.parse::<u16>() { -/// Ok(discriminator) => discriminator, -/// Err(_why) => return, -/// }; -/// -/// // If the user has an even discriminator, don't re-ban them. -/// if discriminator % 2 == 0 { -/// return; -/// } -/// -/// match context.ban(guild_id, user, 8) { -/// Ok(()) => { -/// // Ban successful. -/// }, -/// Err(Error::Client(ClientError::DeleteMessageDaysAmount(amount))) => { -/// println!("Failed deleting {} days' worth of messages", amount); -/// }, -/// Err(why) => { -/// println!("Unexpected error: {:?}", why); -/// }, -/// } -/// }); -/// ``` -/// -/// [`Client`]: struct.Client.html -/// [`Context`]: struct.Context.html -/// [`Context::ban`]: struct.Context.html#method.ban -/// [`Error::Client`]: ../enum.Error.html#variant.Client -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub enum ClientError { - /// When attempting to delete below or above the minimum and maximum allowed - /// number of messages. - BulkDeleteAmount, - /// When the connection being retrieved from within the Client could not be - /// found after being inserted into the Client's internal vector of - /// [`Connection`]s. - /// - /// This can be returned from one of the options for starting one or - /// multiple connections. - /// - /// **This should never be received.** - /// - /// [`Connection`]: struct.Connection.html - ConnectionUnknown, - /// When attempting to delete a number of days' worth of messages that is - /// not allowed. - DeleteMessageDaysAmount(u8), - /// When there was an error retrieving the gateway URI from the REST API. - Gateway, - /// An indication that a [guild][`LiveGuild`] could not be found by - /// [Id][`GuildId`] in the [`State`]. - /// - /// [`GuildId`]: ../model/struct.GuildId.html - /// [`LiveGuild`]: ../model/struct.LiveGuild.html - /// [`State`]: ../ext/state/struct.State.html - GuildNotFound, - InvalidOpCode, - /// When attempting to perform an action which is only available to user - /// accounts. - InvalidOperationAsBot, - /// When attempting to perform an action which is only available to bot - /// accounts. - InvalidOperationAsUser, - /// Indicates that you do not have the required permissions to perform an - /// operation. - /// - /// The provided [`Permission`]s is the set of required permissions - /// required. - /// - /// [`Permission`]: ../model/permissions/struct.Permissions.html - InvalidPermissions(Permissions), - /// An indicator that the shard data received from the gateway is invalid. - InvalidShards, - /// When the token provided is invalid. This is returned when validating a - /// token through the [`validate_token`] function. - /// - /// [`validate_token`]: fn.validate_token.html - InvalidToken, - /// An indicator that the [current user] can not perform an action. - /// - /// [current user]: ../model/struct.CurrentUser.html - InvalidUser, - /// An indicator that an item is missing from the [`State`], and the action - /// can not be continued. - /// - /// [`State`]: ../ext/state/struct.State.html - ItemMissing, - /// Indicates that a [`Message`]s content was too long and will not - /// successfully send, as the length is over 2000 codepoints, or 4000 bytes. - /// - /// The number of bytes larger than the limit is provided. - /// - /// [`Message`]: ../model/struct.Message.html - MessageTooLong(u64), - /// When attempting to use a [`Context`] helper method which requires a - /// contextual [`ChannelId`], but the current context is not appropriate for - /// the action. - /// - /// [`ChannelId`]: ../model/struct.ChannelId.html - /// [`Context`]: struct.Context.html - NoChannelId, - /// When the decoding of a ratelimit header could not be properly decoded - /// into an `i64`. - RateLimitI64, - /// When the decoding of a ratelimit header could not be properly decoded - /// from UTF-8. - RateLimitUtf8, - /// When attempting to find a required record from the State could not be - /// found. This is required in methods such as [`Context::edit_role`]. - /// - /// [`Context::edit_role`]: struct.Context.html#method.edit_role - RecordNotFound, - /// When a function such as [`Context::edit_channel`] did not expect the - /// received [`ChannelType`]. - /// - /// [`ChannelType`]: ../model/enum.ChannelType.html - /// [`Context::edit_channel`]: struct.Context.html#method.edit_channel - UnexpectedChannelType(ChannelType), - /// When a status code was unexpectedly received for a request's status. - UnexpectedStatusCode(StatusCode), - /// When a status is received, but the verification to ensure the response - /// is valid does not recognize the status. - UnknownStatus(u16), -} - /// The Client is the way to "login" and be able to start sending authenticated -/// requests over the REST API, as well as initializing a WebSocket -/// [`Connection`]. Refer to `Connection`'s [information on using sharding] for -/// more information. +/// requests over the REST API, as well as initializing a WebSocket connection +/// through [`Shard`]s. Refer to the +/// [documentation on using sharding][sharding docs] for more information. /// /// # Event Handlers /// @@ -261,22 +113,22 @@ pub enum ClientError { /// client.start(); /// ``` /// -/// [`Connection`]: struct.Connection.html +/// [`Shard`]: gateway/struct.Shard.html /// [`on_message`]: #method.on_message /// [`Event::MessageCreate`]: ../model/enum.Event.html#variant.MessageCreate -/// [information on using sharding]: struct.Connection.html#sharding +/// [sharding docs]: gateway/index.html#sharding pub struct Client { - /// A vector of all active connections that have received their - /// [`Event::Ready`] payload, and have dispatched to [`on_ready`] if an - /// event handler was configured. + /// A vector of all active shards that have received their [`Event::Ready`] + /// payload, and have dispatched to [`on_ready`] if an event handler was + /// configured. /// /// [`Event::Ready`]: ../model/enum.Event.html#variant.Ready /// [`on_ready`]: #method.on_ready - pub connections: Vec<Arc<Mutex<Connection>>>, event_store: Arc<Mutex<EventStore>>, #[cfg(feature="framework")] framework: Arc<Mutex<Framework>>, login_type: LoginType, + pub shards: Vec<Arc<Mutex<Shard>>>, token: String, } @@ -355,10 +207,10 @@ impl Client { /// less than 2500 guilds. If you have a reason for sharding and/or are in /// more than 2500 guilds, use one of these depending on your use case: /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start(&mut self) -> Result<()> { self.start_connection(None) } @@ -372,10 +224,10 @@ impl Client { /// from the API - determined by Discord - and then open a number of shards /// equivilant to that amount. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start_autosharded(&mut self) -> Result<()> { let res = try!(http::get_bot_gateway()); @@ -391,10 +243,10 @@ impl Client { /// you will need to start other processes with the other shard IDs in some /// way. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start_shard(&mut self, shard: u8, shards: u8) -> Result<()> { self.start_connection(Some([shard, shard, shards])) } @@ -408,12 +260,12 @@ impl Client { /// you only need to start a single shard within the process, or a range of /// shards, use [`start_shard`] or [`start_shard_range`], respectively. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// /// [`start_shard`]: #method.start_shard /// [`start_shard_range`]: #method.start_shards - /// [connection docs]: struct.Connection.html#sharding + /// [Gateway docs]: gateway/index.html#sharding pub fn start_shards(&mut self, total_shards: u8) -> Result<()> { self.start_connection(Some([0, total_shards - 1, total_shards])) } @@ -428,7 +280,7 @@ impl Client { /// process, or all shards within the process, use [`start_shard`] or /// [`start_shards`], respectively. /// - /// Refer to the [module-level documentation][connection docs] for more + /// Refer to the [Gateway documentation][gateway docs] for more /// information on effectively using sharding. /// /// # Examples @@ -447,7 +299,7 @@ impl Client { /// /// [`start_shard`]: #method.start_shard /// [`start_shards`]: #method.start_shards - /// [connection docs]: struct.Connection.html#sharding + /// [Gateway docs]: gateway/index.html#sharding pub fn start_shard_range(&mut self, range: [u8; 2], total_shards: u8) -> Result<()> { self.start_connection(Some([range[0], range[1], total_shards])) @@ -729,7 +581,7 @@ impl Client { /// Register an event to be called whenever a Ready event is received. /// /// Registering a handler for the ready event is good for noting when your - /// bot has established a connection to the gateway. + /// bot has established a connection to the gateway through a [`Shard`]. /// /// **Note**: The Ready event is not guarenteed to be the first event you /// will receive by Discord. Do not actively rely on it. @@ -751,6 +603,7 @@ impl Client { /// ``` /// /// [`CurrentUser`]: ../model/struct.CurrentUser.html + /// [`Shard`]: gateway/struct.Shard.html pub fn on_ready<F>(&mut self, handler: F) where F: Fn(Context, Ready) + Send + Sync + 'static { self.event_store.lock() @@ -868,13 +721,13 @@ impl Client { let gateway_url = try!(http::get_gateway()).url; for i in 0..shard_data.map_or(1, |x| x[1] + 1) { - let connection = Connection::new(&gateway_url, - &self.token, - shard_data.map(|s| [i, s[2]]), - self.login_type); - match connection { - Ok((connection, ready, receiver)) => { - self.connections.push(Arc::new(Mutex::new(connection))); + let shard = Shard::new(&gateway_url, + &self.token, + shard_data.map(|s| [i, s[2]]), + self.login_type); + match shard { + Ok((shard, ready, receiver)) => { + self.shards.push(Arc::new(Mutex::new(shard))); feature_state_enabled! {{ STATE.lock() @@ -882,22 +735,22 @@ impl Client { .update_with_ready(&ready); }} - match self.connections.last() { - Some(connection) => { + match self.shards.last() { + Some(shard) => { feature_framework! {{ dispatch(Event::Ready(ready), - connection.clone(), + shard.clone(), self.framework.clone(), self.login_type, self.event_store.clone()); } else { dispatch(Event::Ready(ready), - connection.clone(), + shard.clone(), self.login_type, self.event_store.clone()); }} - let connection_clone = connection.clone(); + let shard_clone = shard.clone(); let event_store = self.event_store.clone(); let login_type = self.login_type; @@ -905,22 +758,22 @@ impl Client { let framework = self.framework.clone(); thread::spawn(move || { - handle_connection(connection_clone, - framework, - login_type, - event_store, - receiver) + handle_shard(shard_clone, + framework, + login_type, + event_store, + receiver) }); } else { thread::spawn(move || { - handle_connection(connection_clone, - login_type, - event_store, - receiver) + handle_shard(shard_clone, + login_type, + event_store, + receiver) }); }} }, - None => return Err(Error::Client(ClientError::ConnectionUnknown)), + None => return Err(Error::Client(ClientError::ShardUnknown)), } }, Err(why) => return Err(why), @@ -934,16 +787,17 @@ impl Client { } } - // Boot up a new connection. This is used primarily in the scenario of - // re-instantiating a connection in the reconnect logic in another - // Connection. + // Boot up a new shard. This is used primarily in the scenario of + // re-instantiating a shard in the reconnect logic in another [`Shard`]. + // + // [`Shard`]: gateway/struct.Shard.html #[doc(hidden)] - pub fn boot_connection(&mut self, - shard_info: Option<[u8; 2]>) - -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> { + pub fn boot_shard(&mut self, + shard_info: Option<[u8; 2]>) + -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> { let gateway_url = try!(http::get_gateway()).url; - Connection::new(&gateway_url, &self.token, shard_info, self.login_type) + Shard::new(&gateway_url, &self.token, shard_info, self.login_type) } } @@ -1252,15 +1106,15 @@ impl Client { } #[cfg(feature="framework")] -fn handle_connection(connection: Arc<Mutex<Connection>>, - framework: Arc<Mutex<Framework>>, - login_type: LoginType, - event_store: Arc<Mutex<EventStore>>, - mut receiver: Receiver<WebSocketStream>) { +fn handle_shard(shard: Arc<Mutex<Shard>>, + framework: Arc<Mutex<Framework>>, + login_type: LoginType, + event_store: Arc<Mutex<EventStore>>, + mut receiver: Receiver<WebSocketStream>) { loop { let event = receiver.recv_json(GatewayEvent::decode); - let event = match connection.lock().unwrap().handle_event(event, &mut receiver) { + let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { Ok(Some(x)) => match x { (event, Some(new_receiver)) => { receiver = new_receiver; @@ -1273,7 +1127,7 @@ fn handle_connection(connection: Arc<Mutex<Connection>>, }; dispatch(event, - connection.clone(), + shard.clone(), framework.clone(), login_type, event_store.clone()); @@ -1281,14 +1135,14 @@ fn handle_connection(connection: Arc<Mutex<Connection>>, } #[cfg(not(feature="framework"))] -fn handle_connection(connection: Arc<Mutex<Connection>>, +fn handle_shard(shard: Arc<Mutex<Shard>>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>, mut receiver: Receiver<WebSocketStream>) { loop { let event = receiver.recv_json(GatewayEvent::decode); - let event = match connection.lock().unwrap().handle_event(event, &mut receiver) { + let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { Ok(Some(x)) => match x { (event, Some(new_receiver)) => { receiver = new_receiver; @@ -1301,7 +1155,7 @@ fn handle_connection(connection: Arc<Mutex<Connection>>, }; dispatch(event, - connection.clone(), + shard.clone(), login_type, event_store.clone()); } @@ -1314,7 +1168,7 @@ fn login(token: &str, login_type: LoginType) -> Client { feature_framework! {{ Client { - connections: Vec::default(), + shards: Vec::default(), event_store: Arc::new(Mutex::new(EventStore::default())), framework: Arc::new(Mutex::new(Framework::default())), login_type: login_type, @@ -1322,7 +1176,7 @@ fn login(token: &str, login_type: LoginType) -> Client { } } else { Client { - connections: Vec::default(), + shards: Vec::default(), event_store: Arc::new(Mutex::new(EventStore::default())), login_type: login_type, token: token.to_owned(), diff --git a/src/error.rs b/src/error.rs index 84f4e04..8c442ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,7 +5,8 @@ use hyper::Error as HyperError; use serde_json::Error as JsonError; use serde_json::Value; use websocket::result::WebSocketError; -use ::client::{ClientError, ConnectionError}; +use ::client::gateway::GatewayError; +use ::client::ClientError; #[cfg(feature="voice")] use ::ext::voice::VoiceError; @@ -22,14 +23,14 @@ pub type Result<T> = ::std::result::Result<T, Error>; /// A common error enum returned by most of the library's functionality within a /// custom [`Result`]. /// -/// The most common error types, the [`ClientError`] and [`ConnectionError`] +/// The most common error types, the [`ClientError`] and [`GatewayError`] /// enums, are both wrapped around this in the form of the [`Client`] and -/// [`Connection`] variants. +/// [`Gateway`] variants. /// /// [`Client`]: #variant.Client /// [`ClientError`]: client/enum.ClientError.html -/// [`Connection`]: #variant.Connection -/// [`ConnectionError`]: client/enum.ConnectionError.html +/// [`Gateway`]: #variant.Gateway +/// [`GatewayError`]: client/enum.GatewayError.html /// [`Result`]: type.Result.html #[derive(Debug)] pub enum Error { @@ -38,10 +39,10 @@ pub enum Error { /// [client]: client/index.html /// [http]: client/http/index.html Client(ClientError), - /// An error with the WebSocket [`Connection`]. + /// An error with the WebSocket [`Gateway`]. /// - /// [`Connection`]: client/struct.Connection.html - Connection(ConnectionError), + /// [`Gateway`]: client/gateway/index.html + Gateway(GatewayError), /// An error while decoding a payload. Decode(&'static str, Value), /// An error from the `hyper` crate. @@ -107,7 +108,7 @@ impl StdError for Error { fn description(&self) -> &str { match *self { Error::Client(_) => "Client refused a request", - Error::Connection(ref _inner) => "Connection error", + Error::Gateway(ref _inner) => "Gateway error", Error::Decode(msg, _) | Error::Other(msg) => msg, Error::Hyper(ref inner) => inner.description(), Error::Io(ref inner) => inner.description(), diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs index 8dc0ab1..1f2a5d2 100644 --- a/src/ext/voice/handler.rs +++ b/src/ext/voice/handler.rs @@ -2,7 +2,7 @@ use serde_json::builder::ObjectBuilder; use std::sync::mpsc::{self, Sender}; use super::connection_info::ConnectionInfo; use super::{Status as VoiceStatus, Target}; -use ::client::ConnectionStatus; +use ::client::gateway::GatewayStatus; use ::constants::VoiceOpCode; use ::model::{ChannelId, GuildId, VoiceState}; use super::threading; @@ -39,7 +39,7 @@ pub struct Handler { sender: Sender<VoiceStatus>, session_id: Option<String>, user_id: u64, - ws: Sender<ConnectionStatus>, + ws: Sender<GatewayStatus>, } impl Handler { @@ -52,7 +52,7 @@ impl Handler { /// /// [`Manager::join`]: struct.Manager.html#method.join #[doc(hidden)] - pub fn new(target: Target, ws: Sender<ConnectionStatus>, user_id: u64) + pub fn new(target: Target, ws: Sender<GatewayStatus>, user_id: u64) -> Self { let (tx, rx) = mpsc::channel(); @@ -260,7 +260,7 @@ impl Handler { .insert("self_mute", self.self_mute)) .build(); - let _ = self.ws.send(ConnectionStatus::SendMessage(map)); + let _ = self.ws.send(GatewayStatus::SendMessage(map)); } fn send(&mut self, status: VoiceStatus) { diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs index c6c7533..b4f3501 100644 --- a/src/ext/voice/manager.rs +++ b/src/ext/voice/manager.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::{Handler, Target}; -use ::client::ConnectionStatus; +use ::client::gateway::GatewayStatus; use ::model::{ChannelId, GuildId}; /// A manager is a struct responsible for managing [`Handler`]s which belong to @@ -23,12 +23,12 @@ use ::model::{ChannelId, GuildId}; pub struct Manager { handlers: HashMap<Target, Handler>, user_id: u64, - ws: MpscSender<ConnectionStatus>, + ws: MpscSender<GatewayStatus>, } impl Manager { #[doc(hidden)] - pub fn new(ws: MpscSender<ConnectionStatus>, user_id: u64) -> Manager { + pub fn new(ws: MpscSender<GatewayStatus>, user_id: u64) -> Manager { Manager { handlers: HashMap::new(), user_id: user_id, diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs index ab91dae..ea327fd 100644 --- a/src/internal/ws_impl.rs +++ b/src/internal/ws_impl.rs @@ -5,7 +5,7 @@ use websocket::message::{Message as WsMessage, Type as WsType}; use websocket::stream::WebSocketStream; use websocket::ws::receiver::Receiver as WsReceiver; use websocket::ws::sender::Sender as WsSender; -use ::client::ConnectionError; +use ::client::gateway::GatewayError; use ::internal::prelude::*; pub trait ReceiverExt { @@ -25,8 +25,8 @@ impl ReceiverExt for Receiver<WebSocketStream> { let representation = String::from_utf8_lossy(&message.payload) .into_owned(); - Err(Error::Connection(ConnectionError::Closed(message.cd_status_code, - representation))) + Err(Error::Gateway(GatewayError::Closed(message.cd_status_code, + representation))) } else if message.opcode == WsType::Binary || message.opcode == WsType::Text { let json: Value = if message.opcode == WsType::Binary { try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..]))) @@ -44,8 +44,7 @@ impl ReceiverExt for Receiver<WebSocketStream> { let representation = String::from_utf8_lossy(&message.payload) .into_owned(); - Err(Error::Connection(ConnectionError::Closed(None, - representation))) + Err(Error::Gateway(GatewayError::Closed(None, representation))) } } } |