From 6b1a83111d4d9cc2ef2f4eed1ee8f58d45525078 Mon Sep 17 00:00:00 2001 From: Austin Hellyer Date: Mon, 21 Nov 2016 19:17:57 -0800 Subject: Re-organize the client module Re-organize the client module, creating a `gateway` submodule, and splitting the connection into separate files in it. The connection was a conglomeration of a number of purposes, most of which are actually used elsewhere in the library and/or exposed to the user. Thus, it makes sense to separate each item in a gateway-specific module. By splitting the client module further, this is a re-organization for preliminary RPC support WRT the Client. Additionally, rename the Connection struct to a Shard. The Connection itself was not the actual connection, and was a higher-level interface to the real connection logic. A Shard is a more accurate representation of what it actually is. --- src/client/connection.rs | 706 ------------------------------------------- src/client/context.rs | 34 +-- src/client/dispatch.rs | 11 +- src/client/error.rs | 147 +++++++++ src/client/gateway/error.rs | 27 ++ src/client/gateway/mod.rs | 60 ++++ src/client/gateway/prep.rs | 146 +++++++++ src/client/gateway/shard.rs | 506 +++++++++++++++++++++++++++++++ src/client/gateway/status.rs | 11 + src/client/mod.rs | 296 +++++------------- src/error.rs | 19 +- src/ext/voice/handler.rs | 8 +- src/ext/voice/manager.rs | 6 +- src/internal/ws_impl.rs | 9 +- 14 files changed, 1016 insertions(+), 970 deletions(-) delete mode 100644 src/client/connection.rs create mode 100644 src/client/error.rs create mode 100644 src/client/gateway/error.rs create mode 100644 src/client/gateway/mod.rs create mode 100644 src/client/gateway/prep.rs create mode 100644 src/client/gateway/shard.rs create mode 100644 src/client/gateway/status.rs (limited to 'src') diff --git a/src/client/connection.rs b/src/client/connection.rs deleted file mode 100644 index dfaefcc..0000000 --- a/src/client/connection.rs +++ /dev/null @@ -1,706 +0,0 @@ -use serde_json::builder::ObjectBuilder; -use serde_json; -use std::fmt::{self, Display}; -use std::io::Write; -use std::net::Shutdown; -use std::sync::mpsc::{ - self, - Receiver as MpscReceiver, - Sender as MpscSender, - TryRecvError -}; -use std::thread::{self, Builder as ThreadBuilder}; -use std::time::Duration as StdDuration; -use std::{env, mem}; -use super::login_type::LoginType; -use super::Client; -use time::{self, Duration}; -use websocket::client::request::Url as RequestUrl; -use websocket::client::{Client as WsClient, Sender, Receiver}; -use websocket::message::Message as WsMessage; -use websocket::stream::WebSocketStream; -use websocket::ws::sender::Sender as WsSender; -use ::constants::{self, OpCode}; -use ::internal::prelude::*; -use ::internal::ws_impl::{ReceiverExt, SenderExt}; -use ::model::{ - ChannelId, - Event, - Game, - GatewayEvent, - GuildId, - OnlineStatus, - ReadyEvent, -}; - -#[cfg(feature="voice")] -use ::ext::voice::Manager as VoiceManager; - -#[doc(hidden)] -pub enum Status { - SendMessage(Value), - Sequence(u64), - ChangeInterval(u64), - ChangeSender(Sender), -} - -#[derive(Clone, Debug)] -pub enum ConnectionError { - /// The connection closed - Closed(Option, String), - /// Expected a Hello during a handshake - ExpectedHello, - /// Expected a Ready or an InvalidateSession - InvalidHandshake, -} - -impl Display for ConnectionError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - ConnectionError::Closed(s, ref v) => { - f.write_str(&format!("Connection closed {:?}: {:?}", s, v)) - }, - ConnectionError::ExpectedHello => { - f.write_str("Expected Hello during handshake") - }, - ConnectionError::InvalidHandshake => { - f.write_str("Expected Ready or InvalidateSession") - }, - } - } -} - -type CurrentPresence = (Option, OnlineStatus, bool); - -/// A connection is a handler for a websocket connection to Discord's gateway. -/// The connection allows for sending and receiving messages over the websocket, -/// such as setting the active game, reconnecting, syncing guilds, and more. -/// -/// # Sharding -/// -/// Sharding is a method to split portions of bots into separate processes. This -/// is an enforced strategy by Discord once a bot reaches a certain number of -/// guilds (2500). Once this number is reached, a bot must be sharded in a way -/// that only 2500 guilds maximum may be allocated per shard. -/// -/// The "recommended" number of guilds per shard is _around_ 1000. Sharding can -/// be useful for splitting processes across separate servers. Often you may -/// want some or all shards to be in the same process, allowing for a shared -/// State. This is possible through this library. -/// -/// See [Discord's documentation][docs] for more information. -/// -/// If you are not using a bot account or do not require sharding - such as for -/// a small bot - then use [`Client::start`]. -/// -/// There are a few methods of sharding available: -/// -/// - [`Client::start_autosharded`]: retrieves the number of shards Discord -/// recommends using from the API, and then automatically starts that number of -/// shards. -/// - [`Client::start_shard`]: starts a single shard for use in the instance, -/// handled by the instance of the Client. Use this if you only want 1 shard -/// handled by this instance. -/// - [`Client::start_shards`]: starts all shards in this instance. This is best -/// for when you want a completely shared State. -/// - [`Client::start_shard_range`]: start a range of shards within this -/// instance. This should be used when you, for example, want to split 10 shards -/// across 3 instances. -/// -/// **Note**: User accounts can not shard. Use [`Client::start`]. -/// -/// # Stand-alone connections -/// -/// You may instantiate a connection yourself if you need to, which is -/// completely decoupled from the client. For most use cases, you will not need -/// to do this, and you can leave the client to do it. -/// -/// This can be done by passing in the required parameters to [`new`]. You can -/// then manually handle the connection yourself and receive events via -/// [`receive`]. -/// -/// **Note**: You _really_ do not need to do this. Just call one of the -/// appropriate methods on the [`Client`]. -/// -/// # Examples -/// -/// See the documentation for [`new`] on how to use this. -/// -/// [`Client`]: struct.Client.html -/// [`Client::start`]: struct.Client.html#method.start -/// [`Client::start_autosharded`]: struct.Client.html#method.start_autosharded -/// [`Client::start_shard`]: struct.Client.html#method.start_shard -/// [`Client::start_shard_range`]: struct.Client.html#method.start_shard_range -/// [`Client::start_shards`]: struct.Client.html#method.start_shards -/// [`new`]: #method.new -/// [`receive`]: #method.receive -/// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding -pub struct Connection { - current_presence: CurrentPresence, - keepalive_channel: MpscSender, - last_sequence: u64, - login_type: LoginType, - session_id: Option, - shard_info: Option<[u8; 2]>, - token: String, - ws_url: String, - #[cfg(feature = "voice")] - pub manager: VoiceManager, -} - -impl Connection { - /// Instantiates a new instance of a connection, bypassing the client. - /// - /// **Note**: You should likely never need to do this yourself. - /// - /// # Examples - /// - /// Instantiating a new Connection manually for a bot with no shards, and - /// then listening for events: - /// - /// ```rust,ignore - /// use serenity::client::{Connection, LoginType, http}; - /// use std::env; - /// - /// let token = env::var("DISCORD_BOT_TOKEN").expect("Token in environment"); - /// // retrieve the gateway response, which contains the URL to connect to - /// let gateway = http::get_gateway().expect("Valid gateway response").url; - /// let connection = Connection::new(&gateway, &token, None, LoginType::Bot) - /// .expect("Working connection"); - /// - /// // at this point, you can create a `loop`, and receive events and match - /// // their variants - /// ``` - pub fn new(base_url: &str, - token: &str, - shard_info: Option<[u8; 2]>, - login_type: LoginType) - -> Result<(Connection, ReadyEvent, Receiver)> { - let url = try!(build_gateway_url(base_url)); - - let response = try!(try!(WsClient::connect(url)).send()); - try!(response.validate()); - - let (mut sender, mut receiver) = response.begin().split(); - - let identification = identify(token, shard_info); - try!(sender.send_json(&identification)); - - let heartbeat_interval = match try!(receiver.recv_json(GatewayEvent::decode)) { - GatewayEvent::Hello(interval) => interval, - other => { - debug!("Unexpected event during connection start: {:?}", other); - - return Err(Error::Connection(ConnectionError::ExpectedHello)); - }, - }; - - let (tx, rx) = mpsc::channel(); - let thread_name = match shard_info { - Some(info) => format!("serenity keepalive [shard {}/{}]", - info[0], - info[1] - 1), - None => "serenity keepalive [unsharded]".to_owned(), - }; - try!(ThreadBuilder::new() - .name(thread_name) - .spawn(move || keepalive(heartbeat_interval, sender, rx))); - - // Parse READY - let event = try!(receiver.recv_json(GatewayEvent::decode)); - let (ready, sequence) = try!(parse_ready(event, - &tx, - &mut receiver, - identification)); - - Ok((feature_voice! {{ - Connection { - current_presence: (None, OnlineStatus::Online, false), - keepalive_channel: tx.clone(), - last_sequence: sequence, - login_type: login_type, - token: token.to_owned(), - session_id: Some(ready.ready.session_id.clone()), - shard_info: shard_info, - ws_url: base_url.to_owned(), - manager: VoiceManager::new(tx, ready.ready.user.id.0), - } - } else { - Connection { - current_presence: (None, OnlineStatus::Online, false), - keepalive_channel: tx.clone(), - last_sequence: sequence, - login_type: login_type, - token: token.to_owned(), - session_id: Some(ready.ready.session_id.clone()), - shard_info: shard_info, - ws_url: base_url.to_owned(), - } - }}, ready, receiver)) - } - - pub fn shard_info(&self) -> Option<[u8; 2]> { - self.shard_info - } - - /// Sets whether the current user is afk. This helps Discord determine where - /// to send notifications. - /// - /// Other presence settings are maintained. - pub fn set_afk(&mut self, afk: bool) { - self.current_presence.2 = afk; - - self.update_presence(); - } - - /// Sets the user's current game, if any. - /// - /// Other presence settings are maintained. - pub fn set_game(&mut self, game: Option) { - self.current_presence.0 = game; - - self.update_presence(); - } - - /// Sets the user's current online status. - /// - /// Note that [`Offline`] is not a valid presence, so it is automatically - /// converted to [`Invisible`]. - /// - /// Other presence settings are maintained. - pub fn set_status(&mut self, online_status: OnlineStatus) { - self.current_presence.1 = match online_status { - OnlineStatus::Offline => OnlineStatus::Invisible, - other => other, - }; - - self.update_presence(); - } - - /// Sets the user's full presence information. - /// - /// Consider using the individual setters if you only need to modify one of - /// these. - /// - /// # Examples - /// - /// Set the current user as playing `"Heroes of the Storm"`, being online, - /// and not being afk: - /// - /// ```rust,ignore - /// use serenity::model::{Game, OnlineStatus}; - /// - /// // assuming you are in a context - /// - /// context.connection.lock() - /// .unwrap() - /// .set_presence(Game::playing("Heroes of the Storm"), - /// OnlineStatus::Online, - /// false); - /// ``` - pub fn set_presence(&mut self, - game: Option, - status: OnlineStatus, - afk: bool) { - let status = match status { - OnlineStatus::Offline => OnlineStatus::Invisible, - other => other, - }; - - self.current_presence = (game, status, afk); - - self.update_presence(); - } - - fn update_presence(&self) { - let (ref game, status, afk) = self.current_presence; - - let msg = ObjectBuilder::new() - .insert("op", OpCode::StatusUpdate.num()) - .insert_object("d", move |mut object| { - object = object.insert("since", 0) - .insert("afk", afk) - .insert("status", status.name()); - - match game.as_ref() { - Some(ref game) => { - object.insert_object("game", move |o| o - .insert("name", &game.name)) - }, - None => object.insert("game", Value::Null), - } - }) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - - pub fn handle_event(&mut self, - event: Result, - mut receiver: &mut Receiver) - -> Result>)>> { - match event { - Ok(GatewayEvent::Dispatch(sequence, event)) => { - let status = Status::Sequence(sequence); - let _ = self.keepalive_channel.send(status); - - self.handle_dispatch(&event); - - Ok(Some((event, None))) - }, - Ok(GatewayEvent::Heartbeat(sequence)) => { - let map = ObjectBuilder::new() - .insert("d", sequence) - .insert("op", OpCode::Heartbeat.num()) - .build(); - let _ = self.keepalive_channel.send(Status::SendMessage(map)); - - Ok(None) - }, - Ok(GatewayEvent::HeartbeatAck) => { - Ok(None) - }, - Ok(GatewayEvent::Hello(interval)) => { - let _ = self.keepalive_channel.send(Status::ChangeInterval(interval)); - - Ok(None) - }, - Ok(GatewayEvent::InvalidateSession) => { - self.session_id = None; - - let identification = identify(&self.token, self.shard_info); - - let status = Status::SendMessage(identification); - - let _ = self.keepalive_channel.send(status); - - Ok(None) - }, - Ok(GatewayEvent::Reconnect) => { - self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) - }, - Err(Error::Connection(ConnectionError::Closed(num, message))) => { - warn!("Closing with {:?}: {:?}", num, message); - - // Attempt to resume if the following was not received: - // - // - 1000: Close. - // - // Otherwise, fallback to reconnecting. - if num != Some(1000) { - if let Some(session_id) = self.session_id.clone() { - match self.resume(session_id, receiver) { - Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), - Err(why) => debug!("Err resuming: {:?}", why), - } - } - } - - self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) - }, - Err(Error::WebSocket(why)) => { - warn!("Websocket error: {:?}", why); - info!("Reconnecting"); - - // Attempt to resume if the following was not received: - // - // - InvalidateSession. - // - // Otherwise, fallback to reconnecting. - if let Some(session_id) = self.session_id.clone() { - match self.resume(session_id, &mut receiver) { - Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), - Err(why) => debug!("Err resuming: {:?}", why), - } - } - - self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) - }, - Err(error) => Err(error), - } - } - - fn handle_dispatch(&mut self, event: &Event) { - if let &Event::Resumed(ref ev) = event { - let status = Status::ChangeInterval(ev.heartbeat_interval); - - let _ = self.keepalive_channel.send(status); - } - - feature_voice_enabled! {{ - if let &Event::VoiceStateUpdate(ref update) = event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_state(&update.voice_state); - } - } - } - - if let &Event::VoiceServerUpdate(ref update) = event { - if let Some(guild_id) = update.guild_id { - if let Some(handler) = self.manager.get(guild_id) { - handler.update_server(&update.endpoint, &update.token); - } - } - } - }} - } - - fn reconnect(&mut self, mut receiver: &mut Receiver) -> Result<(Event, Receiver)> { - debug!("Reconnecting"); - - // Take a few attempts at reconnecting; otherwise fall back to - // re-instantiating the connection. - for _ in 0..3 { - let connection = Connection::new(&self.ws_url, - &self.token, - self.shard_info, - self.login_type); - - if let Ok((connection, ready, receiver_new)) = connection { - try!(mem::replace(self, connection).shutdown(&mut receiver)); - - self.session_id = Some(ready.ready.session_id.clone()); - - return Ok((Event::Ready(ready), receiver_new)); - } - - thread::sleep(StdDuration::from_secs(1)); - } - - // If all else fails: get a new endpoint. - // - // A bit of complexity here: instantiate a temporary instance of a - // Client. This client _does not_ replace the current client(s) that the - // user has. This client will then connect to gateway. This new - // connection will be used to replace _this_ connection. - let (connection, ready, receiver_new) = { - let mut client = Client::login_raw(&self.token.clone(), - self.login_type); - - try!(client.boot_connection(self.shard_info)) - }; - - // Replace this connection with a new one, and shutdown the now-old - // connection. - try!(mem::replace(self, connection).shutdown(&mut receiver)); - - self.session_id = Some(ready.ready.session_id.clone()); - - Ok((Event::Ready(ready), receiver_new)) - } - - fn resume(&mut self, session_id: String, receiver: &mut Receiver) - -> Result<(Event, Receiver)> { - try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both)); - let url = try!(build_gateway_url(&self.ws_url)); - - let response = try!(try!(WsClient::connect(url)).send()); - try!(response.validate()); - - let (mut sender, mut receiver) = response.begin().split(); - - try!(sender.send_json(&ObjectBuilder::new() - .insert_object("d", |o| o - .insert("session_id", session_id) - .insert("seq", self.last_sequence) - .insert("token", &self.token) - ) - .insert("op", OpCode::Resume.num()) - .build())); - - let first_event; - - loop { - match try!(receiver.recv_json(GatewayEvent::decode)) { - GatewayEvent::Dispatch(seq, event) => { - if let Event::Ready(ref event) = event { - self.session_id = Some(event.ready.session_id.clone()); - } - - self.last_sequence = seq; - first_event = event; - - break; - }, - GatewayEvent::InvalidateSession => { - try!(sender.send_json(&identify(&self.token, self.shard_info))); - } - other => { - debug!("Unexpected event: {:?}", other); - - return Err(Error::Connection(ConnectionError::InvalidHandshake)); - } - } - } - - let _ = self.keepalive_channel.send(Status::ChangeSender(sender)); - - Ok((first_event, receiver)) - } - - pub fn shutdown(&mut self, receiver: &mut Receiver) - -> Result<()> { - let stream = receiver.get_mut().get_mut(); - - { - let mut sender = Sender::new(stream.by_ref(), true); - let message = WsMessage::close_because(1000, ""); - - try!(sender.send_message(&message)); - } - - try!(stream.flush()); - try!(stream.shutdown(Shutdown::Both)); - - Ok(()) - } - - pub fn sync_guilds(&self, guild_ids: &[GuildId]) { - let msg = ObjectBuilder::new() - .insert("op", OpCode::SyncGuild.num()) - .insert_array("d", |a| guild_ids.iter().fold(a, |a, s| a.push(s.0))) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - - pub fn sync_calls(&self, channels: &[ChannelId]) { - for &channel in channels { - let msg = ObjectBuilder::new() - .insert("op", OpCode::SyncCall.num()) - .insert_object("d", |obj| obj - .insert("channel_id", channel.0) - ) - .build(); - - let _ = self.keepalive_channel.send(Status::SendMessage(msg)); - } - } -} - -#[inline] -fn parse_ready(event: GatewayEvent, - tx: &MpscSender, - receiver: &mut Receiver, - identification: Value) - -> Result<(ReadyEvent, u64)> { - match event { - GatewayEvent::Dispatch(seq, Event::Ready(event)) => { - Ok((event, seq)) - }, - GatewayEvent::InvalidateSession => { - debug!("Session invalidation"); - - let _ = tx.send(Status::SendMessage(identification)); - - match try!(receiver.recv_json(GatewayEvent::decode)) { - GatewayEvent::Dispatch(seq, Event::Ready(event)) => { - Ok((event, seq)) - }, - other => { - debug!("Unexpected event: {:?}", other); - - Err(Error::Connection(ConnectionError::InvalidHandshake)) - }, - } - }, - other => { - debug!("Unexpected event: {:?}", other); - - Err(Error::Connection(ConnectionError::InvalidHandshake)) - }, - } -} - -fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value { - ObjectBuilder::new() - .insert("op", OpCode::Identify.num()) - .insert_object("d", |mut object| { - object = identify_compression(object) - .insert("large_threshold", 250) // max value - .insert_object("properties", |object| object - .insert("$browser", "Feature-full and ergonomic discord rust library") - .insert("$device", "serenity") - .insert("$os", env::consts::OS) - .insert("$referrer", "") - .insert("$referring_domain", "") - ) - .insert("token", token) - .insert("v", constants::GATEWAY_VERSION); - - if let Some(shard_info) = shard_info { - object = object - .insert_array("shard", |a| a - .push(shard_info[0]) - .push(shard_info[1])); - } - - object - }) - .build() -} - -#[cfg(not(feature = "debug"))] -fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { - object.insert("compression", true) -} - -#[cfg(feature = "debug")] -fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { - object.insert("compression", false) -} - -fn build_gateway_url(base: &str) -> Result { - RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) - .map_err(|_| Error::Client(ClientError::Gateway)) -} - -fn keepalive(interval: u64, - mut sender: Sender, - channel: MpscReceiver) { - let mut base_interval = Duration::milliseconds(interval as i64); - let mut next_tick = time::get_time() + base_interval; - - let mut last_sequence = 0; - - 'outer: loop { - thread::sleep(StdDuration::from_millis(100)); - - loop { - match channel.try_recv() { - Ok(Status::ChangeInterval(interval)) => { - base_interval = Duration::milliseconds(interval as i64); - }, - Ok(Status::ChangeSender(new_sender)) => { - sender = new_sender; - }, - Ok(Status::SendMessage(val)) => { - if let Err(why) = sender.send_json(&val) { - warn!("Err sending message: {:?}", why); - } - }, - Ok(Status::Sequence(seq)) => { - last_sequence = seq; - }, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => break 'outer, - } - } - - if time::get_time() >= next_tick { - next_tick = next_tick + base_interval; - - let map = ObjectBuilder::new() - .insert("d", last_sequence) - .insert("op", OpCode::Heartbeat.num()) - .build(); - - if let Err(why) = sender.send_json(&map) { - warn!("Err sending keepalive: {:?}", why); - } - } - } - - let _ = sender.get_mut().shutdown(Shutdown::Both); -} diff --git a/src/client/context.rs b/src/client/context.rs index b1cc57f..7008f9b 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -2,7 +2,7 @@ use serde_json::builder::ObjectBuilder; use std::collections::HashMap; use std::io::Read; use std::sync::{Arc, Mutex}; -use super::connection::Connection; +use super::gateway::Shard; use super::http; use super::login_type::LoginType; use ::utils::builder::{ @@ -26,7 +26,7 @@ use super::STATE; /// The context is a general utility struct provided on event dispatches, which /// helps with dealing with the current "context" of the event dispatch, /// and providing helper methods where possible. The context also acts as a -/// general high-level interface over the associated [`Connection`] which +/// general high-level interface over the associated [`Shard`] which /// received the event, or the low-level [`http`] module. /// /// For example, when the [`Client::on_message`] handler is dispatched to, the @@ -35,8 +35,8 @@ use super::STATE; /// post its given argument to the associated channel for you as a [`Message`]. /// /// Additionally, the context contains "shortcuts", like for interacting with -/// the connection. Methods like [`set_game`] will unlock the connection and -/// perform an update for you to save a bit of work. +/// the shard. Methods like [`set_game`] will unlock the shard and perform an +/// update for you to save a bit of work. /// /// A context will only live for the event it was dispatched for. After the /// event handler finished, it is destroyed and will not be re-used. @@ -65,10 +65,10 @@ use super::STATE; /// /// [`Channel`]: ../model/enum.Channel.html /// [`Client::on_message`]: struct.Client.html#method.on_message -/// [`Connection`]: struct.Connection.html /// [`LiveGuild`]: ../model/struct.LiveGuild.html /// [`Message`]: ../model/struct.Message.html /// [`PublicChannel`]: ../model/struct.PublicChannel.html +/// [`Shard`]: gateway/struct.Shard.html /// [`State`]: ../ext/state/struct.State.html /// [`get_channel`]: #method.get_channel /// [`http`]: http/index.html @@ -81,11 +81,11 @@ pub struct Context { /// /// [`on_message`]: struct.Client.html#method.on_message pub channel_id: Option, - /// The associated connection which dispatched the event handler. + /// The associated shard which dispatched the event handler. /// /// Note that if you are sharding, in relevant terms, this is the shard /// which received the event being dispatched. - pub connection: Arc>, + pub shard: Arc>, login_type: LoginType, } @@ -99,11 +99,11 @@ impl Context { /// documentation. #[doc(hidden)] pub fn new(channel_id: Option, - connection: Arc>, + shard: Arc>, login_type: LoginType) -> Context { Context { channel_id: channel_id, - connection: connection, + shard: shard, login_type: login_type, } } @@ -1227,7 +1227,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn online(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Online); + self.shard.lock().unwrap().set_status(OnlineStatus::Online); } /// Sets the current user as being [`Idle`]. This maintains the current @@ -1235,7 +1235,7 @@ impl Context { /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle pub fn idle(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Idle); + self.shard.lock().unwrap().set_status(OnlineStatus::Idle); } /// Sets the current user as being [`DoNotDisturb`]. This maintains the @@ -1243,7 +1243,7 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb pub fn dnd(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::DoNotDisturb); + self.shard.lock().unwrap().set_status(OnlineStatus::DoNotDisturb); } /// Sets the current user as being [`Invisible`]. This maintains the current @@ -1251,7 +1251,7 @@ impl Context { /// /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible pub fn invisible(&self) { - self.connection.lock().unwrap().set_status(OnlineStatus::Invisible); + self.shard.lock().unwrap().set_status(OnlineStatus::Invisible); } /// "Resets" the current user's presence, by setting the game to `None`, @@ -1262,7 +1262,7 @@ impl Context { /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence pub fn reset_presence(&self) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(None, OnlineStatus::Online, false) } @@ -1284,7 +1284,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn set_game(&self, game: Game) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(Some(game), OnlineStatus::Online, false); } @@ -1309,7 +1309,7 @@ impl Context { url: None, }; - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(Some(game), OnlineStatus::Online, false); } @@ -1349,7 +1349,7 @@ impl Context { game: Option, status: OnlineStatus, afk: bool) { - self.connection.lock() + self.shard.lock() .unwrap() .set_presence(game, status, afk) } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 8c10e75..96c8e69 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -2,7 +2,8 @@ use std::sync::{Arc, Mutex}; use std::thread; use super::event_store::EventStore; use super::login_type::LoginType; -use super::{Connection, Context}; +use super::Context; +use super::gateway::Shard; use ::model::{ChannelId, Event, Message}; #[cfg(feature="framework")] @@ -35,14 +36,14 @@ macro_rules! update { } fn context(channel_id: Option, - conn: Arc>, + conn: Arc>, login_type: LoginType) -> Context { Context::new(channel_id, conn, login_type) } #[cfg(feature="framework")] pub fn dispatch(event: Event, - conn: Arc>, + conn: Arc>, framework: Arc>, login_type: LoginType, event_store: Arc>) { @@ -69,7 +70,7 @@ pub fn dispatch(event: Event, #[cfg(not(feature="framework"))] pub fn dispatch(event: Event, - conn: Arc>, + conn: Arc>, login_type: LoginType, event_store: Arc>) { match event { @@ -99,7 +100,7 @@ fn dispatch_message(context: Context, #[allow(cyclomatic_complexity)] fn handle_event(event: Event, - conn: Arc>, + conn: Arc>, login_type: LoginType, event_store: Arc>) { match event { diff --git a/src/client/error.rs b/src/client/error.rs new file mode 100644 index 0000000..9b542ec --- /dev/null +++ b/src/client/error.rs @@ -0,0 +1,147 @@ +use hyper::status::StatusCode; +use ::model::{ChannelType, Permissions}; + +/// An error returned from the [`Client`] or the [`Context`], or model instance. +/// +/// This is always wrapped within the library's generic [`Error::Client`] +/// variant. +/// +/// # Examples +/// +/// Matching an [`Error`] with this variant may look something like the +/// following for the [`Client::ban`] method, which in this example is used to +/// re-ban all members with an odd discriminator: +/// +/// ```rust,no_run +/// use serenity::client::{Client, ClientError}; +/// use serenity::Error; +/// use std::env; +/// +/// let token = env::var("DISCORD_BOT_TOKEN").unwrap(); +/// let mut client = Client::login_bot(&token); +/// +/// client.on_member_unban(|context, guild_id, user| { +/// let discriminator = match user.discriminator.parse::() { +/// Ok(discriminator) => discriminator, +/// Err(_why) => return, +/// }; +/// +/// // If the user has an even discriminator, don't re-ban them. +/// if discriminator % 2 == 0 { +/// return; +/// } +/// +/// match context.ban(guild_id, user, 8) { +/// Ok(()) => { +/// // Ban successful. +/// }, +/// Err(Error::Client(ClientError::DeleteMessageDaysAmount(amount))) => { +/// println!("Failed deleting {} days' worth of messages", amount); +/// }, +/// Err(why) => { +/// println!("Unexpected error: {:?}", why); +/// }, +/// } +/// }); +/// ``` +/// +/// [`Client`]: struct.Client.html +/// [`Context`]: struct.Context.html +/// [`Context::ban`]: struct.Context.html#method.ban +/// [`Error::Client`]: ../enum.Error.html#variant.Client +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub enum Error { + /// When attempting to delete below or above the minimum and maximum allowed + /// number of messages. + BulkDeleteAmount, + /// When attempting to delete a number of days' worth of messages that is + /// not allowed. + DeleteMessageDaysAmount(u8), + /// When there was an error retrieving the gateway URI from the REST API. + Gateway, + /// An indication that a [guild][`LiveGuild`] could not be found by + /// [Id][`GuildId`] in the [`State`]. + /// + /// [`GuildId`]: ../model/struct.GuildId.html + /// [`LiveGuild`]: ../model/struct.LiveGuild.html + /// [`State`]: ../ext/state/struct.State.html + GuildNotFound, + InvalidOpCode, + /// When attempting to perform an action which is only available to user + /// accounts. + InvalidOperationAsBot, + /// When attempting to perform an action which is only available to bot + /// accounts. + InvalidOperationAsUser, + /// Indicates that you do not have the required permissions to perform an + /// operation. + /// + /// The provided [`Permission`]s is the set of required permissions + /// required. + /// + /// [`Permission`]: ../model/permissions/struct.Permissions.html + InvalidPermissions(Permissions), + /// An indicator that the shard data received from the gateway is invalid. + InvalidShards, + /// When the token provided is invalid. This is returned when validating a + /// token through the [`validate_token`] function. + /// + /// [`validate_token`]: fn.validate_token.html + InvalidToken, + /// An indicator that the [current user] can not perform an action. + /// + /// [current user]: ../model/struct.CurrentUser.html + InvalidUser, + /// An indicator that an item is missing from the [`State`], and the action + /// can not be continued. + /// + /// [`State`]: ../ext/state/struct.State.html + ItemMissing, + /// Indicates that a [`Message`]s content was too long and will not + /// successfully send, as the length is over 2000 codepoints, or 4000 bytes. + /// + /// The number of bytes larger than the limit is provided. + /// + /// [`Message`]: ../model/struct.Message.html + MessageTooLong(u64), + /// When attempting to use a [`Context`] helper method which requires a + /// contextual [`ChannelId`], but the current context is not appropriate for + /// the action. + /// + /// [`ChannelId`]: ../model/struct.ChannelId.html + /// [`Context`]: struct.Context.html + NoChannelId, + /// When the decoding of a ratelimit header could not be properly decoded + /// into an `i64`. + RateLimitI64, + /// When the decoding of a ratelimit header could not be properly decoded + /// from UTF-8. + RateLimitUtf8, + /// When attempting to find a required record from the State could not be + /// found. This is required in methods such as [`Context::edit_role`]. + /// + /// [`Context::edit_role`]: struct.Context.html#method.edit_role + RecordNotFound, + /// When the shard being retrieved from within the Client could not be + /// found after being inserted into the Client's internal vector of + /// [`Shard`]s. + /// + /// This can be returned from one of the options for starting one or + /// multiple shards. + /// + /// **This should never be received.** + /// + /// [`Shard`]: gateway/struct.Shard.html + ShardUnknown, + /// When a function such as [`Context::edit_channel`] did not expect the + /// received [`ChannelType`]. + /// + /// [`ChannelType`]: ../model/enum.ChannelType.html + /// [`Context::edit_channel`]: struct.Context.html#method.edit_channel + UnexpectedChannelType(ChannelType), + /// When a status code was unexpectedly received for a request's status. + UnexpectedStatusCode(StatusCode), + /// When a status is received, but the verification to ensure the response + /// is valid does not recognize the status. + UnknownStatus(u16), +} diff --git a/src/client/gateway/error.rs b/src/client/gateway/error.rs new file mode 100644 index 0000000..fb44d3f --- /dev/null +++ b/src/client/gateway/error.rs @@ -0,0 +1,27 @@ +use std::fmt::{self, Display}; + +#[derive(Clone, Debug)] +pub enum Error { + /// The connection closed + Closed(Option, String), + /// Expected a Hello during a handshake + ExpectedHello, + /// Expected a Ready or an InvalidateSession + InvalidHandshake, +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Closed(s, ref v) => { + f.write_str(&format!("Connection closed {:?}: {:?}", s, v)) + }, + Error::ExpectedHello => { + f.write_str("Expected Hello during handshake") + }, + Error::InvalidHandshake => { + f.write_str("Expected Ready or InvalidateSession") + }, + } + } +} diff --git a/src/client/gateway/mod.rs b/src/client/gateway/mod.rs new file mode 100644 index 0000000..aa9722e --- /dev/null +++ b/src/client/gateway/mod.rs @@ -0,0 +1,60 @@ +//! The gateway module contains the pieces - primarily the [`Shard`] - +//! responsible for maintaing a WebSocket connection with Discord. +//! +//! A shard is an interface for the lower-level receiver and sender. It provides +//! what can otherwise be thought of as "sugar methods". A shard represents a +//! single connection to Discord. If acting as a [`Bot`] user, you can make +//! use of a method named "sharding" to have multiple shards, potentially +//! offloading some server load to another server(s). +//! +//! # Sharding +//! +//! Sharding is a method to split portions of bots into separate processes. This +//! is an enforced strategy by Discord once a bot reaches a certain number of +//! guilds (2500). Once this number is reached, a bot must be sharded in a way +//! that only 2500 guilds maximum may be allocated per shard. +//! +//! The "recommended" number of guilds per shard is _around_ 1000. Sharding can +//! be useful for splitting processes across separate servers. Often you may +//! want some or all shards to be in the same process, allowing for a shared +//! State. This is possible through this library. +//! +//! See [Discord's documentation][docs] for more information. +//! +//! If you are not using a bot account or do not require sharding - such as for +//! a small bot - then use [`Client::start`]. +//! +//! There are a few methods of sharding available: +//! +//! - [`Client::start_autosharded`]: retrieves the number of shards Discord +//! recommends using from the API, and then automatically starts that number of +//! shards. +//! - [`Client::start_shard`]: starts a single shard for use in the instance, +//! handled by the instance of the Client. Use this if you only want 1 shard +//! handled by this instance. +//! - [`Client::start_shards`]: starts all shards in this instance. This is best +//! for when you want a completely shared State. +//! - [`Client::start_shard_range`]: start a range of shards within this +//! instance. This should be used when you, for example, want to split 10 shards +//! across 3 instances. +//! +//! **Note**: User accounts can not shard. Use [`Client::start`]. +//! +//! [`Bot`]: ../enum.LoginType.html#variant.Bot +//! [`Client`]: ../struct.Client.html +//! [`Client::start`]: ../struct.Client.html#method.start +//! [`Client::start_autosharded`]: ../struct.Client.html#method.start_autosharded +//! [`Client::start_shard`]: ../struct.Client.html#method.start_shard +//! [`Client::start_shard_range`]: ../struct.Client.html#method.start_shard_range +//! [`Client::start_shards`]: ../struct.Client.html#method.start_shards +//! [`Shard`]: struct.Shard.html +//! [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding + +mod error; +mod prep; +mod shard; +mod status; + +pub use self::error::Error as GatewayError; +pub use self::shard::Shard; +pub use self::status::Status as GatewayStatus; diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs new file mode 100644 index 0000000..bf4e9b3 --- /dev/null +++ b/src/client/gateway/prep.rs @@ -0,0 +1,146 @@ +use serde_json::builder::ObjectBuilder; +use serde_json::Value; +use std::net::Shutdown; +use std::sync::mpsc::{ + TryRecvError, + Receiver as MpscReceiver, + Sender as MpscSender +}; +use std::time::Duration as StdDuration; +use std::{env, thread}; +use super::super::ClientError; +use super::{GatewayError, GatewayStatus}; +use time::{self, Duration}; +use websocket::client::request::Url as RequestUrl; +use websocket::client::{Receiver, Sender}; +use websocket::stream::WebSocketStream; +use ::constants::{self, OpCode}; +use ::error::{Error, Result}; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::model::{Event, GatewayEvent, ReadyEvent}; + +#[inline] +pub fn parse_ready(event: GatewayEvent, + tx: &MpscSender, + receiver: &mut Receiver, + identification: Value) + -> Result<(ReadyEvent, u64)> { + match event { + GatewayEvent::Dispatch(seq, Event::Ready(event)) => { + Ok((event, seq)) + }, + GatewayEvent::InvalidateSession => { + debug!("Session invalidation"); + + let _ = tx.send(GatewayStatus::SendMessage(identification)); + + match try!(receiver.recv_json(GatewayEvent::decode)) { + GatewayEvent::Dispatch(seq, Event::Ready(event)) => { + Ok((event, seq)) + }, + other => { + debug!("Unexpected event: {:?}", other); + + Err(Error::Gateway(GatewayError::InvalidHandshake)) + }, + } + }, + other => { + debug!("Unexpected event: {:?}", other); + + Err(Error::Gateway(GatewayError::InvalidHandshake)) + }, + } +} + +pub fn identify(token: &str, shard_info: Option<[u8; 2]>) -> Value { + ObjectBuilder::new() + .insert("op", OpCode::Identify.num()) + .insert_object("d", |mut object| { + object = identify_compression(object) + .insert("large_threshold", 250) // max value + .insert_object("properties", |object| object + .insert("$browser", "Feature-full and ergonomic discord rust library") + .insert("$device", "serenity") + .insert("$os", env::consts::OS) + .insert("$referrer", "") + .insert("$referring_domain", "") + ) + .insert("token", token) + .insert("v", constants::GATEWAY_VERSION); + + if let Some(shard_info) = shard_info { + object = object + .insert_array("shard", |a| a + .push(shard_info[0]) + .push(shard_info[1])); + } + + object + }) + .build() +} + +#[cfg(not(feature = "debug"))] +pub fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { + object.insert("compression", true) +} + +#[cfg(feature = "debug")] +pub fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { + object.insert("compression", false) +} + +pub fn build_gateway_url(base: &str) -> Result { + RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) + .map_err(|_| Error::Client(ClientError::Gateway)) +} + +pub fn keepalive(interval: u64, + mut sender: Sender, + channel: MpscReceiver) { + let mut base_interval = Duration::milliseconds(interval as i64); + let mut next_tick = time::get_time() + base_interval; + + let mut last_sequence = 0; + + 'outer: loop { + thread::sleep(StdDuration::from_millis(100)); + + loop { + match channel.try_recv() { + Ok(GatewayStatus::ChangeInterval(interval)) => { + base_interval = Duration::milliseconds(interval as i64); + }, + Ok(GatewayStatus::ChangeSender(new_sender)) => { + sender = new_sender; + }, + Ok(GatewayStatus::SendMessage(val)) => { + if let Err(why) = sender.send_json(&val) { + warn!("Err sending message: {:?}", why); + } + }, + Ok(GatewayStatus::Sequence(seq)) => { + last_sequence = seq; + }, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => break 'outer, + } + } + + if time::get_time() >= next_tick { + next_tick = next_tick + base_interval; + + let map = ObjectBuilder::new() + .insert("d", last_sequence) + .insert("op", OpCode::Heartbeat.num()) + .build(); + + if let Err(why) = sender.send_json(&map) { + warn!("Err sending keepalive: {:?}", why); + } + } + } + + let _ = sender.get_mut().shutdown(Shutdown::Both); +} diff --git a/src/client/gateway/shard.rs b/src/client/gateway/shard.rs new file mode 100644 index 0000000..d85e341 --- /dev/null +++ b/src/client/gateway/shard.rs @@ -0,0 +1,506 @@ +use serde_json::builder::ObjectBuilder; +use std::io::Write; +use std::net::Shutdown; +use std::sync::mpsc::{self, Sender as MpscSender}; +use std::thread::{self, Builder as ThreadBuilder}; +use std::time::Duration as StdDuration; +use std::mem; +use super::super::login_type::LoginType; +use super::super::Client; +use super::{GatewayError, GatewayStatus, prep}; +use websocket::client::{Client as WsClient, Sender, Receiver}; +use websocket::message::Message as WsMessage; +use websocket::stream::WebSocketStream; +use websocket::ws::sender::Sender as WsSender; +use ::constants::OpCode; +use ::internal::prelude::*; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::model::{ + ChannelId, + Event, + Game, + GatewayEvent, + GuildId, + OnlineStatus, + ReadyEvent, +}; + +#[cfg(feature="voice")] +use ::ext::voice::Manager as VoiceManager; + +type CurrentPresence = (Option, OnlineStatus, bool); + +/// A Shard is a higher-level handler for a websocket connection to Discord's +/// gateway. The shard allows for sending and receiving messages over the +/// websocket, such as setting the active game, reconnecting, syncing guilds, +/// and more. +/// +/// Refer to the [module-level documentation][module docs] for information on +/// effectively using multiple shards, if you need to. +/// +/// # Stand-alone shards +/// +/// You may instantiate a shard yourself - decoupled from the [`Client`] - if +/// you need to. For most use cases, you will not need to do this, and you can +/// leave the client to do it. +/// +/// This can be done by passing in the required parameters to [`new`]. You can +/// then manually handle the shard yourself and receive events via +/// [`receive`]. +/// +/// **Note**: You _really_ do not need to do this. Just call one of the +/// appropriate methods on the [`Client`]. +/// +/// # Examples +/// +/// See the documentation for [`new`] on how to use this. +/// +/// [`Client`]: struct.Client.html +/// [`new`]: #method.new +/// [`receive`]: #method.receive +/// [docs]: https://discordapp.com/developers/docs/topics/gateway#sharding +/// [module docs]: index.html#sharding +pub struct Shard { + current_presence: CurrentPresence, + keepalive_channel: MpscSender, + last_sequence: u64, + login_type: LoginType, + session_id: Option, + shard_info: Option<[u8; 2]>, + token: String, + ws_url: String, + #[cfg(feature = "voice")] + pub manager: VoiceManager, +} + +impl Shard { + /// Instantiates a new instance of a Shard, bypassing the client. + /// + /// **Note**: You should likely never need to do this yourself. + /// + /// # Examples + /// + /// Instantiating a new Shard manually for a bot with no shards, and + /// then listening for events: + /// + /// ```rust,ignore + /// use serenity::client::gateway::Shard; + /// use serenity::client::{LoginType, http}; + /// use std::env; + /// + /// let token = env::var("DISCORD_BOT_TOKEN").expect("Token in environment"); + /// // retrieve the gateway response, which contains the URL to connect to + /// let gateway = http::get_gateway().expect("Valid gateway response").url; + /// let shard = Shard::new(&gateway, &token, None, LoginType::Bot) + /// .expect("Working shard"); + /// + /// // at this point, you can create a `loop`, and receive events and match + /// // their variants + /// ``` + pub fn new(base_url: &str, + token: &str, + shard_info: Option<[u8; 2]>, + login_type: LoginType) + -> Result<(Shard, ReadyEvent, Receiver)> { + let url = try!(prep::build_gateway_url(base_url)); + + let response = try!(try!(WsClient::connect(url)).send()); + try!(response.validate()); + + let (mut sender, mut receiver) = response.begin().split(); + + let identification = prep::identify(token, shard_info); + try!(sender.send_json(&identification)); + + let heartbeat_interval = match try!(receiver.recv_json(GatewayEvent::decode)) { + GatewayEvent::Hello(interval) => interval, + other => { + debug!("Unexpected event during connection start: {:?}", other); + + return Err(Error::Gateway(GatewayError::ExpectedHello)); + }, + }; + + let (tx, rx) = mpsc::channel(); + let thread_name = match shard_info { + Some(info) => format!("serenity keepalive [shard {}/{}]", + info[0], + info[1] - 1), + None => "serenity keepalive [unsharded]".to_owned(), + }; + try!(ThreadBuilder::new() + .name(thread_name) + .spawn(move || prep::keepalive(heartbeat_interval, sender, rx))); + + // Parse READY + let event = try!(receiver.recv_json(GatewayEvent::decode)); + let (ready, sequence) = try!(prep::parse_ready(event, + &tx, + &mut receiver, + identification)); + + Ok((feature_voice! {{ + Shard { + current_presence: (None, OnlineStatus::Online, false), + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + manager: VoiceManager::new(tx, ready.ready.user.id.0), + } + } else { + Shard { + current_presence: (None, OnlineStatus::Online, false), + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + } + }}, ready, receiver)) + } + + pub fn shard_info(&self) -> Option<[u8; 2]> { + self.shard_info + } + + /// Sets whether the current user is afk. This helps Discord determine where + /// to send notifications. + /// + /// Other presence settings are maintained. + pub fn set_afk(&mut self, afk: bool) { + self.current_presence.2 = afk; + + self.update_presence(); + } + + /// Sets the user's current game, if any. + /// + /// Other presence settings are maintained. + pub fn set_game(&mut self, game: Option) { + self.current_presence.0 = game; + + self.update_presence(); + } + + /// Sets the user's current online status. + /// + /// Note that [`Offline`] is not a valid presence, so it is automatically + /// converted to [`Invisible`]. + /// + /// Other presence settings are maintained. + pub fn set_status(&mut self, online_status: OnlineStatus) { + self.current_presence.1 = match online_status { + OnlineStatus::Offline => OnlineStatus::Invisible, + other => other, + }; + + self.update_presence(); + } + + /// Sets the user's full presence information. + /// + /// Consider using the individual setters if you only need to modify one of + /// these. + /// + /// # Examples + /// + /// Set the current user as playing `"Heroes of the Storm"`, being online, + /// and not being afk: + /// + /// ```rust,ignore + /// use serenity::model::{Game, OnlineStatus}; + /// + /// // assuming you are in a context + /// + /// context.shard.lock() + /// .unwrap() + /// .set_presence(Game::playing("Heroes of the Storm"), + /// OnlineStatus::Online, + /// false); + /// ``` + pub fn set_presence(&mut self, + game: Option, + status: OnlineStatus, + afk: bool) { + let status = match status { + OnlineStatus::Offline => OnlineStatus::Invisible, + other => other, + }; + + self.current_presence = (game, status, afk); + + self.update_presence(); + } + + pub fn handle_event(&mut self, + event: Result, + mut receiver: &mut Receiver) + -> Result>)>> { + match event { + Ok(GatewayEvent::Dispatch(sequence, event)) => { + let status = GatewayStatus::Sequence(sequence); + let _ = self.keepalive_channel.send(status); + + self.handle_dispatch(&event); + + Ok(Some((event, None))) + }, + Ok(GatewayEvent::Heartbeat(sequence)) => { + let map = ObjectBuilder::new() + .insert("d", sequence) + .insert("op", OpCode::Heartbeat.num()) + .build(); + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(map)); + + Ok(None) + }, + Ok(GatewayEvent::HeartbeatAck) => { + Ok(None) + }, + Ok(GatewayEvent::Hello(interval)) => { + let _ = self.keepalive_channel.send(GatewayStatus::ChangeInterval(interval)); + + Ok(None) + }, + Ok(GatewayEvent::InvalidateSession) => { + self.session_id = None; + + let identification = prep::identify(&self.token, self.shard_info); + + let status = GatewayStatus::SendMessage(identification); + + let _ = self.keepalive_channel.send(status); + + Ok(None) + }, + Ok(GatewayEvent::Reconnect) => { + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) + }, + Err(Error::Gateway(GatewayError::Closed(num, message))) => { + warn!("Closing with {:?}: {:?}", num, message); + + // Attempt to resume if the following was not received: + // + // - 1000: Close. + // + // Otherwise, fallback to reconnecting. + if num != Some(1000) { + if let Some(session_id) = self.session_id.clone() { + match self.resume(session_id, receiver) { + Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), + Err(why) => debug!("Err resuming: {:?}", why), + } + } + } + + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) + }, + Err(Error::WebSocket(why)) => { + warn!("Websocket error: {:?}", why); + info!("Reconnecting"); + + // Attempt to resume if the following was not received: + // + // - InvalidateSession. + // + // Otherwise, fallback to reconnecting. + if let Some(session_id) = self.session_id.clone() { + match self.resume(session_id, &mut receiver) { + Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), + Err(why) => debug!("Err resuming: {:?}", why), + } + } + + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) + }, + Err(error) => Err(error), + } + } + + pub fn shutdown(&mut self, receiver: &mut Receiver) + -> Result<()> { + let stream = receiver.get_mut().get_mut(); + + { + let mut sender = Sender::new(stream.by_ref(), true); + let message = WsMessage::close_because(1000, ""); + + try!(sender.send_message(&message)); + } + + try!(stream.flush()); + try!(stream.shutdown(Shutdown::Both)); + + Ok(()) + } + + pub fn sync_calls(&self, channels: &[ChannelId]) { + for &channel in channels { + let msg = ObjectBuilder::new() + .insert("op", OpCode::SyncCall.num()) + .insert_object("d", |obj| obj + .insert("channel_id", channel.0) + ) + .build(); + + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); + } + } + + pub fn sync_guilds(&self, guild_ids: &[GuildId]) { + let msg = ObjectBuilder::new() + .insert("op", OpCode::SyncGuild.num()) + .insert_array("d", |a| guild_ids.iter().fold(a, |a, s| a.push(s.0))) + .build(); + + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); + } + + fn handle_dispatch(&mut self, event: &Event) { + if let &Event::Resumed(ref ev) = event { + let status = GatewayStatus::ChangeInterval(ev.heartbeat_interval); + + let _ = self.keepalive_channel.send(status); + } + + feature_voice_enabled! {{ + if let &Event::VoiceStateUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } + } + + if let &Event::VoiceServerUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + }} + } + + fn reconnect(&mut self, mut receiver: &mut Receiver) -> Result<(Event, Receiver)> { + debug!("Reconnecting"); + + // Take a few attempts at reconnecting; otherwise fall back to + // re-instantiating the connection. + for _ in 0..3 { + let shard = Shard::new(&self.ws_url, + &self.token, + self.shard_info, + self.login_type); + + if let Ok((shard, ready, receiver_new)) = shard { + try!(mem::replace(self, shard).shutdown(&mut receiver)); + + self.session_id = Some(ready.ready.session_id.clone()); + + return Ok((Event::Ready(ready), receiver_new)); + } + + thread::sleep(StdDuration::from_secs(1)); + } + + // If all else fails: get a new endpoint. + // + // A bit of complexity here: instantiate a temporary instance of a + // Client. This client _does not_ replace the current client(s) that the + // user has. This client will then connect to gateway. This new + // shard will be used to replace _this_ shard. + let (shard, ready, receiver_new) = { + let mut client = Client::login_raw(&self.token.clone(), + self.login_type); + + try!(client.boot_shard(self.shard_info)) + }; + + // Replace this shard with a new one, and shutdown the now-old + // shard. + try!(mem::replace(self, shard).shutdown(&mut receiver)); + + self.session_id = Some(ready.ready.session_id.clone()); + + Ok((Event::Ready(ready), receiver_new)) + } + + fn resume(&mut self, session_id: String, receiver: &mut Receiver) + -> Result<(Event, Receiver)> { + try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both)); + let url = try!(prep::build_gateway_url(&self.ws_url)); + + let response = try!(try!(WsClient::connect(url)).send()); + try!(response.validate()); + + let (mut sender, mut receiver) = response.begin().split(); + + try!(sender.send_json(&ObjectBuilder::new() + .insert_object("d", |o| o + .insert("session_id", session_id) + .insert("seq", self.last_sequence) + .insert("token", &self.token) + ) + .insert("op", OpCode::Resume.num()) + .build())); + + let first_event; + + loop { + match try!(receiver.recv_json(GatewayEvent::decode)) { + GatewayEvent::Dispatch(seq, event) => { + if let Event::Ready(ref event) = event { + self.session_id = Some(event.ready.session_id.clone()); + } + + self.last_sequence = seq; + first_event = event; + + break; + }, + GatewayEvent::InvalidateSession => { + try!(sender.send_json(&prep::identify(&self.token, self.shard_info))); + }, + other => { + debug!("Unexpected event: {:?}", other); + + return Err(Error::Gateway(GatewayError::InvalidHandshake)); + }, + } + } + + let _ = self.keepalive_channel.send(GatewayStatus::ChangeSender(sender)); + + Ok((first_event, receiver)) + } + + fn update_presence(&self) { + let (ref game, status, afk) = self.current_presence; + + let msg = ObjectBuilder::new() + .insert("op", OpCode::StatusUpdate.num()) + .insert_object("d", move |mut object| { + object = object.insert("since", 0) + .insert("afk", afk) + .insert("status", status.name()); + + match game.as_ref() { + Some(ref game) => { + object.insert_object("game", move |o| o + .insert("name", &game.name)) + }, + None => object.insert("game", Value::Null), + } + }) + .build(); + + let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(msg)); + } +} diff --git a/src/client/gateway/status.rs b/src/client/gateway/status.rs new file mode 100644 index 0000000..4409e65 --- /dev/null +++ b/src/client/gateway/status.rs @@ -0,0 +1,11 @@ +use serde_json::Value; +use websocket::client::Sender; +use websocket::stream::WebSocketStream; + +#[doc(hidden)] +pub enum Status { + SendMessage(Value), + Sequence(u64), + ChangeInterval(u64), + ChangeSender(Sender), +} diff --git a/src/client/mod.rs b/src/client/mod.rs index ee99c64..fad01eb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,8 +1,8 @@ //! The Client contains information about a single bot or user's token, as well //! as event handlers. Dispatching events to configured handlers and starting -//! the connection are handled directly via the client. In addition, the -//! [`http`] module and [`State`] are also automatically handled by the Client -//! module for you. +//! the shards' connections are handled directly via the client. In addition, +//! the [`http`] module and [`State`] are also automatically handled by the +//! Client module for you. //! //! A [`Context`] is provided for every handler. The context is an ergonomic //! method of accessing the lower-level http functions. @@ -22,24 +22,21 @@ //! [Client examples]: struct.Client.html#examples pub mod http; +pub mod gateway; -mod connection; mod context; mod dispatch; +mod error; mod event_store; mod login_type; -pub use self::connection::{ - Connection, - ConnectionError, - Status as ConnectionStatus -}; pub use self::context::Context; +pub use self::error::Error as ClientError; pub use self::login_type::LoginType; -use hyper::status::StatusCode; use self::dispatch::dispatch; use self::event_store::EventStore; +use self::gateway::Shard; use serde_json::builder::ObjectBuilder; use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; @@ -47,7 +44,7 @@ use std::thread; use std::time::Duration; use websocket::client::Receiver; use websocket::stream::WebSocketStream; -use ::internal::prelude::*; +use ::internal::prelude::{Error, Result, Value}; use ::internal::ws_impl::ReceiverExt; use ::model::*; @@ -83,155 +80,10 @@ lazy_static! { pub static ref STATE: Arc> = Arc::new(Mutex::new(State::default())); } -/// An error returned from the [`Client`] or the [`Context`], or model instance. -/// -/// This is always wrapped within the library's generic [`Error::Client`] -/// variant. -/// -/// # Examples -/// -/// Matching an [`Error`] with this variant may look something like the -/// following for the [`Client::ban`] method, which in this example is used to -/// re-ban all members with an odd discriminator: -/// -/// ```rust,no_run -/// use serenity::client::{Client, ClientError}; -/// use serenity::Error; -/// use std::env; -/// -/// let token = env::var("DISCORD_BOT_TOKEN").unwrap(); -/// let mut client = Client::login_bot(&token); -/// -/// client.on_member_unban(|context, guild_id, user| { -/// let discriminator = match user.discriminator.parse::() { -/// Ok(discriminator) => discriminator, -/// Err(_why) => return, -/// }; -/// -/// // If the user has an even discriminator, don't re-ban them. -/// if discriminator % 2 == 0 { -/// return; -/// } -/// -/// match context.ban(guild_id, user, 8) { -/// Ok(()) => { -/// // Ban successful. -/// }, -/// Err(Error::Client(ClientError::DeleteMessageDaysAmount(amount))) => { -/// println!("Failed deleting {} days' worth of messages", amount); -/// }, -/// Err(why) => { -/// println!("Unexpected error: {:?}", why); -/// }, -/// } -/// }); -/// ``` -/// -/// [`Client`]: struct.Client.html -/// [`Context`]: struct.Context.html -/// [`Context::ban`]: struct.Context.html#method.ban -/// [`Error::Client`]: ../enum.Error.html#variant.Client -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub enum ClientError { - /// When attempting to delete below or above the minimum and maximum allowed - /// number of messages. - BulkDeleteAmount, - /// When the connection being retrieved from within the Client could not be - /// found after being inserted into the Client's internal vector of - /// [`Connection`]s. - /// - /// This can be returned from one of the options for starting one or - /// multiple connections. - /// - /// **This should never be received.** - /// - /// [`Connection`]: struct.Connection.html - ConnectionUnknown, - /// When attempting to delete a number of days' worth of messages that is - /// not allowed. - DeleteMessageDaysAmount(u8), - /// When there was an error retrieving the gateway URI from the REST API. - Gateway, - /// An indication that a [guild][`LiveGuild`] could not be found by - /// [Id][`GuildId`] in the [`State`]. - /// - /// [`GuildId`]: ../model/struct.GuildId.html - /// [`LiveGuild`]: ../model/struct.LiveGuild.html - /// [`State`]: ../ext/state/struct.State.html - GuildNotFound, - InvalidOpCode, - /// When attempting to perform an action which is only available to user - /// accounts. - InvalidOperationAsBot, - /// When attempting to perform an action which is only available to bot - /// accounts. - InvalidOperationAsUser, - /// Indicates that you do not have the required permissions to perform an - /// operation. - /// - /// The provided [`Permission`]s is the set of required permissions - /// required. - /// - /// [`Permission`]: ../model/permissions/struct.Permissions.html - InvalidPermissions(Permissions), - /// An indicator that the shard data received from the gateway is invalid. - InvalidShards, - /// When the token provided is invalid. This is returned when validating a - /// token through the [`validate_token`] function. - /// - /// [`validate_token`]: fn.validate_token.html - InvalidToken, - /// An indicator that the [current user] can not perform an action. - /// - /// [current user]: ../model/struct.CurrentUser.html - InvalidUser, - /// An indicator that an item is missing from the [`State`], and the action - /// can not be continued. - /// - /// [`State`]: ../ext/state/struct.State.html - ItemMissing, - /// Indicates that a [`Message`]s content was too long and will not - /// successfully send, as the length is over 2000 codepoints, or 4000 bytes. - /// - /// The number of bytes larger than the limit is provided. - /// - /// [`Message`]: ../model/struct.Message.html - MessageTooLong(u64), - /// When attempting to use a [`Context`] helper method which requires a - /// contextual [`ChannelId`], but the current context is not appropriate for - /// the action. - /// - /// [`ChannelId`]: ../model/struct.ChannelId.html - /// [`Context`]: struct.Context.html - NoChannelId, - /// When the decoding of a ratelimit header could not be properly decoded - /// into an `i64`. - RateLimitI64, - /// When the decoding of a ratelimit header could not be properly decoded - /// from UTF-8. - RateLimitUtf8, - /// When attempting to find a required record from the State could not be - /// found. This is required in methods such as [`Context::edit_role`]. - /// - /// [`Context::edit_role`]: struct.Context.html#method.edit_role - RecordNotFound, - /// When a function such as [`Context::edit_channel`] did not expect the - /// received [`ChannelType`]. - /// - /// [`ChannelType`]: ../model/enum.ChannelType.html - /// [`Context::edit_channel`]: struct.Context.html#method.edit_channel - UnexpectedChannelType(ChannelType), - /// When a status code was unexpectedly received for a request's status. - UnexpectedStatusCode(StatusCode), - /// When a status is received, but the verification to ensure the response - /// is valid does not recognize the status. - UnknownStatus(u16), -} - /// The Client is the way to "login" and be able to start sending authenticated -/// requests over the REST API, as well as initializing a WebSocket -/// [`Connection`]. Refer to `Connection`'s [information on using sharding] for -/// more information. +/// requests over the REST API, as well as initializing a WebSocket connection +/// through [`Shard`]s. Refer to the +/// [documentation on using sharding][sharding docs] for more information. /// /// # Event Handlers /// @@ -261,22 +113,22 @@ pub enum ClientError { /// client.start(); /// ``` /// -/// [`Connection`]: struct.Connection.html +/// [`Shard`]: gateway/struct.Shard.html /// [`on_message`]: #method.on_message /// [`Event::MessageCreate`]: ../model/enum.Event.html#variant.MessageCreate -/// [information on using sharding]: struct.Connection.html#sharding +/// [sharding docs]: gateway/index.html#sharding pub struct Client { - /// A vector of all active connections that have received their - /// [`Event::Ready`] payload, and have dispatched to [`on_ready`] if an - /// event handler was configured. + /// A vector of all active shards that have received their [`Event::Ready`] + /// payload, and have dispatched to [`on_ready`] if an event handler was + /// configured. /// /// [`Event::Ready`]: ../model/enum.Event.html#variant.Ready /// [`on_ready`]: #method.on_ready - pub connections: Vec>>, event_store: Arc>, #[cfg(feature="framework")] framework: Arc>, login_type: LoginType, + pub shards: Vec>>, token: String, } @@ -355,10 +207,10 @@ impl Client { /// less than 2500 guilds. If you have a reason for sharding and/or are in /// more than 2500 guilds, use one of these depending on your use case: /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start(&mut self) -> Result<()> { self.start_connection(None) } @@ -372,10 +224,10 @@ impl Client { /// from the API - determined by Discord - and then open a number of shards /// equivilant to that amount. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start_autosharded(&mut self) -> Result<()> { let res = try!(http::get_bot_gateway()); @@ -391,10 +243,10 @@ impl Client { /// you will need to start other processes with the other shard IDs in some /// way. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// - /// [connection docs]: struct.Connection.html#sharding + /// [gateway docs]: gateway/index.html#sharding pub fn start_shard(&mut self, shard: u8, shards: u8) -> Result<()> { self.start_connection(Some([shard, shard, shards])) } @@ -408,12 +260,12 @@ impl Client { /// you only need to start a single shard within the process, or a range of /// shards, use [`start_shard`] or [`start_shard_range`], respectively. /// - /// Refer to the [module-level documentation][connection docs] for more - /// information on effectively using sharding. + /// Refer to the [Gateway documentation][gateway docs] for more information + /// on effectively using sharding. /// /// [`start_shard`]: #method.start_shard /// [`start_shard_range`]: #method.start_shards - /// [connection docs]: struct.Connection.html#sharding + /// [Gateway docs]: gateway/index.html#sharding pub fn start_shards(&mut self, total_shards: u8) -> Result<()> { self.start_connection(Some([0, total_shards - 1, total_shards])) } @@ -428,7 +280,7 @@ impl Client { /// process, or all shards within the process, use [`start_shard`] or /// [`start_shards`], respectively. /// - /// Refer to the [module-level documentation][connection docs] for more + /// Refer to the [Gateway documentation][gateway docs] for more /// information on effectively using sharding. /// /// # Examples @@ -447,7 +299,7 @@ impl Client { /// /// [`start_shard`]: #method.start_shard /// [`start_shards`]: #method.start_shards - /// [connection docs]: struct.Connection.html#sharding + /// [Gateway docs]: gateway/index.html#sharding pub fn start_shard_range(&mut self, range: [u8; 2], total_shards: u8) -> Result<()> { self.start_connection(Some([range[0], range[1], total_shards])) @@ -729,7 +581,7 @@ impl Client { /// Register an event to be called whenever a Ready event is received. /// /// Registering a handler for the ready event is good for noting when your - /// bot has established a connection to the gateway. + /// bot has established a connection to the gateway through a [`Shard`]. /// /// **Note**: The Ready event is not guarenteed to be the first event you /// will receive by Discord. Do not actively rely on it. @@ -751,6 +603,7 @@ impl Client { /// ``` /// /// [`CurrentUser`]: ../model/struct.CurrentUser.html + /// [`Shard`]: gateway/struct.Shard.html pub fn on_ready(&mut self, handler: F) where F: Fn(Context, Ready) + Send + Sync + 'static { self.event_store.lock() @@ -868,13 +721,13 @@ impl Client { let gateway_url = try!(http::get_gateway()).url; for i in 0..shard_data.map_or(1, |x| x[1] + 1) { - let connection = Connection::new(&gateway_url, - &self.token, - shard_data.map(|s| [i, s[2]]), - self.login_type); - match connection { - Ok((connection, ready, receiver)) => { - self.connections.push(Arc::new(Mutex::new(connection))); + let shard = Shard::new(&gateway_url, + &self.token, + shard_data.map(|s| [i, s[2]]), + self.login_type); + match shard { + Ok((shard, ready, receiver)) => { + self.shards.push(Arc::new(Mutex::new(shard))); feature_state_enabled! {{ STATE.lock() @@ -882,22 +735,22 @@ impl Client { .update_with_ready(&ready); }} - match self.connections.last() { - Some(connection) => { + match self.shards.last() { + Some(shard) => { feature_framework! {{ dispatch(Event::Ready(ready), - connection.clone(), + shard.clone(), self.framework.clone(), self.login_type, self.event_store.clone()); } else { dispatch(Event::Ready(ready), - connection.clone(), + shard.clone(), self.login_type, self.event_store.clone()); }} - let connection_clone = connection.clone(); + let shard_clone = shard.clone(); let event_store = self.event_store.clone(); let login_type = self.login_type; @@ -905,22 +758,22 @@ impl Client { let framework = self.framework.clone(); thread::spawn(move || { - handle_connection(connection_clone, - framework, - login_type, - event_store, - receiver) + handle_shard(shard_clone, + framework, + login_type, + event_store, + receiver) }); } else { thread::spawn(move || { - handle_connection(connection_clone, - login_type, - event_store, - receiver) + handle_shard(shard_clone, + login_type, + event_store, + receiver) }); }} }, - None => return Err(Error::Client(ClientError::ConnectionUnknown)), + None => return Err(Error::Client(ClientError::ShardUnknown)), } }, Err(why) => return Err(why), @@ -934,16 +787,17 @@ impl Client { } } - // Boot up a new connection. This is used primarily in the scenario of - // re-instantiating a connection in the reconnect logic in another - // Connection. + // Boot up a new shard. This is used primarily in the scenario of + // re-instantiating a shard in the reconnect logic in another [`Shard`]. + // + // [`Shard`]: gateway/struct.Shard.html #[doc(hidden)] - pub fn boot_connection(&mut self, - shard_info: Option<[u8; 2]>) - -> Result<(Connection, ReadyEvent, Receiver)> { + pub fn boot_shard(&mut self, + shard_info: Option<[u8; 2]>) + -> Result<(Shard, ReadyEvent, Receiver)> { let gateway_url = try!(http::get_gateway()).url; - Connection::new(&gateway_url, &self.token, shard_info, self.login_type) + Shard::new(&gateway_url, &self.token, shard_info, self.login_type) } } @@ -1252,15 +1106,15 @@ impl Client { } #[cfg(feature="framework")] -fn handle_connection(connection: Arc>, - framework: Arc>, - login_type: LoginType, - event_store: Arc>, - mut receiver: Receiver) { +fn handle_shard(shard: Arc>, + framework: Arc>, + login_type: LoginType, + event_store: Arc>, + mut receiver: Receiver) { loop { let event = receiver.recv_json(GatewayEvent::decode); - let event = match connection.lock().unwrap().handle_event(event, &mut receiver) { + let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { Ok(Some(x)) => match x { (event, Some(new_receiver)) => { receiver = new_receiver; @@ -1273,7 +1127,7 @@ fn handle_connection(connection: Arc>, }; dispatch(event, - connection.clone(), + shard.clone(), framework.clone(), login_type, event_store.clone()); @@ -1281,14 +1135,14 @@ fn handle_connection(connection: Arc>, } #[cfg(not(feature="framework"))] -fn handle_connection(connection: Arc>, +fn handle_shard(shard: Arc>, login_type: LoginType, event_store: Arc>, mut receiver: Receiver) { loop { let event = receiver.recv_json(GatewayEvent::decode); - let event = match connection.lock().unwrap().handle_event(event, &mut receiver) { + let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { Ok(Some(x)) => match x { (event, Some(new_receiver)) => { receiver = new_receiver; @@ -1301,7 +1155,7 @@ fn handle_connection(connection: Arc>, }; dispatch(event, - connection.clone(), + shard.clone(), login_type, event_store.clone()); } @@ -1314,7 +1168,7 @@ fn login(token: &str, login_type: LoginType) -> Client { feature_framework! {{ Client { - connections: Vec::default(), + shards: Vec::default(), event_store: Arc::new(Mutex::new(EventStore::default())), framework: Arc::new(Mutex::new(Framework::default())), login_type: login_type, @@ -1322,7 +1176,7 @@ fn login(token: &str, login_type: LoginType) -> Client { } } else { Client { - connections: Vec::default(), + shards: Vec::default(), event_store: Arc::new(Mutex::new(EventStore::default())), login_type: login_type, token: token.to_owned(), diff --git a/src/error.rs b/src/error.rs index 84f4e04..8c442ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,7 +5,8 @@ use hyper::Error as HyperError; use serde_json::Error as JsonError; use serde_json::Value; use websocket::result::WebSocketError; -use ::client::{ClientError, ConnectionError}; +use ::client::gateway::GatewayError; +use ::client::ClientError; #[cfg(feature="voice")] use ::ext::voice::VoiceError; @@ -22,14 +23,14 @@ pub type Result = ::std::result::Result; /// A common error enum returned by most of the library's functionality within a /// custom [`Result`]. /// -/// The most common error types, the [`ClientError`] and [`ConnectionError`] +/// The most common error types, the [`ClientError`] and [`GatewayError`] /// enums, are both wrapped around this in the form of the [`Client`] and -/// [`Connection`] variants. +/// [`Gateway`] variants. /// /// [`Client`]: #variant.Client /// [`ClientError`]: client/enum.ClientError.html -/// [`Connection`]: #variant.Connection -/// [`ConnectionError`]: client/enum.ConnectionError.html +/// [`Gateway`]: #variant.Gateway +/// [`GatewayError`]: client/enum.GatewayError.html /// [`Result`]: type.Result.html #[derive(Debug)] pub enum Error { @@ -38,10 +39,10 @@ pub enum Error { /// [client]: client/index.html /// [http]: client/http/index.html Client(ClientError), - /// An error with the WebSocket [`Connection`]. + /// An error with the WebSocket [`Gateway`]. /// - /// [`Connection`]: client/struct.Connection.html - Connection(ConnectionError), + /// [`Gateway`]: client/gateway/index.html + Gateway(GatewayError), /// An error while decoding a payload. Decode(&'static str, Value), /// An error from the `hyper` crate. @@ -107,7 +108,7 @@ impl StdError for Error { fn description(&self) -> &str { match *self { Error::Client(_) => "Client refused a request", - Error::Connection(ref _inner) => "Connection error", + Error::Gateway(ref _inner) => "Gateway error", Error::Decode(msg, _) | Error::Other(msg) => msg, Error::Hyper(ref inner) => inner.description(), Error::Io(ref inner) => inner.description(), diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs index 8dc0ab1..1f2a5d2 100644 --- a/src/ext/voice/handler.rs +++ b/src/ext/voice/handler.rs @@ -2,7 +2,7 @@ use serde_json::builder::ObjectBuilder; use std::sync::mpsc::{self, Sender}; use super::connection_info::ConnectionInfo; use super::{Status as VoiceStatus, Target}; -use ::client::ConnectionStatus; +use ::client::gateway::GatewayStatus; use ::constants::VoiceOpCode; use ::model::{ChannelId, GuildId, VoiceState}; use super::threading; @@ -39,7 +39,7 @@ pub struct Handler { sender: Sender, session_id: Option, user_id: u64, - ws: Sender, + ws: Sender, } impl Handler { @@ -52,7 +52,7 @@ impl Handler { /// /// [`Manager::join`]: struct.Manager.html#method.join #[doc(hidden)] - pub fn new(target: Target, ws: Sender, user_id: u64) + pub fn new(target: Target, ws: Sender, user_id: u64) -> Self { let (tx, rx) = mpsc::channel(); @@ -260,7 +260,7 @@ impl Handler { .insert("self_mute", self.self_mute)) .build(); - let _ = self.ws.send(ConnectionStatus::SendMessage(map)); + let _ = self.ws.send(GatewayStatus::SendMessage(map)); } fn send(&mut self, status: VoiceStatus) { diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs index c6c7533..b4f3501 100644 --- a/src/ext/voice/manager.rs +++ b/src/ext/voice/manager.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::{Handler, Target}; -use ::client::ConnectionStatus; +use ::client::gateway::GatewayStatus; use ::model::{ChannelId, GuildId}; /// A manager is a struct responsible for managing [`Handler`]s which belong to @@ -23,12 +23,12 @@ use ::model::{ChannelId, GuildId}; pub struct Manager { handlers: HashMap, user_id: u64, - ws: MpscSender, + ws: MpscSender, } impl Manager { #[doc(hidden)] - pub fn new(ws: MpscSender, user_id: u64) -> Manager { + pub fn new(ws: MpscSender, user_id: u64) -> Manager { Manager { handlers: HashMap::new(), user_id: user_id, diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs index ab91dae..ea327fd 100644 --- a/src/internal/ws_impl.rs +++ b/src/internal/ws_impl.rs @@ -5,7 +5,7 @@ use websocket::message::{Message as WsMessage, Type as WsType}; use websocket::stream::WebSocketStream; use websocket::ws::receiver::Receiver as WsReceiver; use websocket::ws::sender::Sender as WsSender; -use ::client::ConnectionError; +use ::client::gateway::GatewayError; use ::internal::prelude::*; pub trait ReceiverExt { @@ -25,8 +25,8 @@ impl ReceiverExt for Receiver { let representation = String::from_utf8_lossy(&message.payload) .into_owned(); - Err(Error::Connection(ConnectionError::Closed(message.cd_status_code, - representation))) + Err(Error::Gateway(GatewayError::Closed(message.cd_status_code, + representation))) } else if message.opcode == WsType::Binary || message.opcode == WsType::Text { let json: Value = if message.opcode == WsType::Binary { try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..]))) @@ -44,8 +44,7 @@ impl ReceiverExt for Receiver { let representation = String::from_utf8_lossy(&message.payload) .into_owned(); - Err(Error::Connection(ConnectionError::Closed(None, - representation))) + Err(Error::Gateway(GatewayError::Closed(None, representation))) } } } -- cgit v1.2.3