diff options
| author | Austin Hellyer <[email protected]> | 2016-11-13 19:28:13 -0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-11-14 18:32:10 -0800 |
| commit | 7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch) | |
| tree | ca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src/client | |
| parent | Add internal module (diff) | |
| download | serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip | |
Add voice connection support
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/connection.rs | 198 | ||||
| -rw-r--r-- | src/client/context.rs | 2 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 2 | ||||
| -rw-r--r-- | src/client/http/mod.rs | 2 | ||||
| -rw-r--r-- | src/client/mod.rs | 10 |
5 files changed, 121 insertions, 93 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs index 93c967e..d1e37e5 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,4 +1,3 @@ -use flate2::read::ZlibDecoder; use serde_json::builder::ObjectBuilder; use serde_json; use std::fmt::{self, Display}; @@ -18,15 +17,46 @@ use super::Client; use time::{self, Duration}; use websocket::client::request::Url as RequestUrl; use websocket::client::{Client as WsClient, Sender, Receiver}; -use websocket::message::{Message as WsMessage, Type as WsType}; +use websocket::message::Message as WsMessage; use websocket::stream::WebSocketStream; -use websocket::ws::receiver::Receiver as WsReceiver; use websocket::ws::sender::Sender as WsSender; use ::constants::{self, OpCode}; -use ::model::*; use ::internal::prelude::*; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::model::{ + ChannelId, + Event, + Game, + GatewayEvent, + GuildId, + OnlineStatus, + ReadyEvent, +}; + +#[cfg(feature="voice")] +use ::ext::voice::Manager as VoiceManager; + +#[cfg(feature="voice")] +macro_rules! connection { + ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => { + Connection { + $($name1: $val1,)* + $($name2: $val2,)* + } + } +} -enum Status { +#[cfg(not(feature="voice"))] +macro_rules! connection { + ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => { + Connection { + $($name1: $val1,)* + } + } +} + +#[doc(hidden)] +pub enum Status { SendMessage(Value), Sequence(u64), ChangeInterval(u64), @@ -131,6 +161,8 @@ pub struct Connection { shard_info: Option<[u8; 2]>, token: String, ws_url: String, + #[cfg(feature = "voice")] + pub manager: VoiceManager, } impl Connection { @@ -181,8 +213,14 @@ impl Connection { }; let (tx, rx) = mpsc::channel(); + let thread_name = match shard_info { + Some(info) => format!("serenity keepalive [shard {}/{}]", + info[0], + info[1] - 1), + None => "serenity keepalive [unsharded]".to_owned(), + }; try!(ThreadBuilder::new() - .name("serenity keepalive".into()) + .name(thread_name) .spawn(move || keepalive(heartbeat_interval, sender, rx))); // Parse READY @@ -192,16 +230,30 @@ impl Connection { &mut receiver, identification)); - Ok((Connection { - keepalive_channel: tx, - last_sequence: sequence, - login_type: login_type, - receiver: receiver, - token: token.to_owned(), - session_id: Some(ready.ready.session_id.clone()), - shard_info: shard_info, - ws_url: base_url.to_owned(), - }, ready)) + Ok((feature_voice! {{ + Connection { + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + receiver: receiver, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + manager: VoiceManager::new(tx, ready.ready.user.id.0), + } + } { + Connection { + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + receiver: receiver, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + } + }}, ready)) } pub fn shard_info(&self) -> Option<[u8; 2]> { @@ -229,8 +281,8 @@ impl Connection { match game { Some(game) => { - object.insert_object("game", move |o| - o.insert("name", game.name)) + object.insert_object("game", move |o| o + .insert("name", game.name)) }, None => object.insert("game", Value::Null), } @@ -243,15 +295,10 @@ impl Connection { pub fn receive(&mut self) -> Result<Event> { match self.receiver.recv_json(GatewayEvent::decode) { Ok(GatewayEvent::Dispatch(sequence, event)) => { - self.last_sequence = sequence; - - let _ = self.keepalive_channel.send(Status::Sequence(sequence)); - - if let Event::Resumed(ref ev) = event { - let _ = self.keepalive_channel.send(Status::ChangeInterval(ev.heartbeat_interval)); - } + let status = Status::Sequence(sequence); + let _ = self.keepalive_channel.send(status); - Ok(event) + Ok(self.handle_dispatch(event)) }, Ok(GatewayEvent::Heartbeat(sequence)) => { let map = ObjectBuilder::new() @@ -273,8 +320,9 @@ impl Connection { Ok(GatewayEvent::InvalidateSession) => { self.session_id = None; - let status = Status::SendMessage(identify(&self.token, - self.shard_info)); + let identification = identify(&self.token, self.shard_info); + + let status = Status::SendMessage(identification); let _ = self.keepalive_channel.send(status); @@ -324,6 +372,34 @@ impl Connection { } } + fn handle_dispatch(&mut self, event: Event) -> Event { + if let Event::Resumed(ref ev) = event { + let status = Status::ChangeInterval(ev.heartbeat_interval); + + let _ = self.keepalive_channel.send(status); + } + + feature_voice_enabled! {{ + if let Event::VoiceStateUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } + } + + if let Event::VoiceServerUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + }} + + event + } + fn reconnect(&mut self) -> Result<Event> { debug!("Reconnecting"); @@ -480,57 +556,7 @@ impl Drop for Connection { } } -trait ReceiverExt { - fn recv_json<F, T>(&mut self, decode: F) -> Result<T> - where F: FnOnce(Value) -> Result<T>; -} - -trait SenderExt { - fn send_json(&mut self, value: &Value) -> Result<()>; -} - -impl ReceiverExt for Receiver<WebSocketStream> { - fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> { - let message: WsMessage = try!(self.recv_message()); - - if message.opcode == WsType::Close { - let representation = String::from_utf8_lossy(&message.payload) - .into_owned(); - - Err(Error::Connection(ConnectionError::Closed(message.cd_status_code, - representation))) - } else if message.opcode == WsType::Binary || message.opcode == WsType::Text { - let json: Value = if message.opcode == WsType::Binary { - try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..]))) - } else { - try!(serde_json::from_reader(&message.payload[..])) - }; - - decode(json).map_err(|err| { - warn!("Error decoding: {}", - String::from_utf8_lossy(&message.payload)); - - err - }) - } else { - let representation = String::from_utf8_lossy(&message.payload) - .into_owned(); - - Err(Error::Connection(ConnectionError::Closed(None, - representation))) - } - } -} - -impl SenderExt for Sender<WebSocketStream> { - fn send_json(&mut self, value: &Value) -> Result<()> { - serde_json::to_string(value) - .map(WsMessage::text) - .map_err(Error::from) - .and_then(|m| self.send_message(&m).map_err(Error::from)) - } -} - +#[inline] fn parse_ready(event: GatewayEvent, tx: &MpscSender<Status>, receiver: &mut Receiver<WebSocketStream>, @@ -582,7 +608,7 @@ fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value { if let Some(shard_info) = shard_info { object = object - .insert_array("shard", |array| array + .insert_array("shard", |a| a .push(shard_info[0]) .push(shard_info[1])); } @@ -602,7 +628,7 @@ fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { object.insert("compression", false) } -fn build_gateway_url(base: &str) -> Result<::websocket::client::request::Url> { +fn build_gateway_url(base: &str) -> Result<RequestUrl> { RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) .map_err(|_| Error::Client(ClientError::Gateway)) } @@ -627,9 +653,8 @@ fn keepalive(interval: u64, sender = new_sender; }, Ok(Status::SendMessage(val)) => { - match sender.send_json(&val) { - Ok(()) => {}, - Err(e) => warn!("Err sending message: {:?}", e), + if let Err(why) = sender.send_json(&val) { + warn!("Err sending message: {:?}", why); } }, Ok(Status::Sequence(seq)) => { @@ -648,9 +673,8 @@ fn keepalive(interval: u64, .insert("op", OpCode::Heartbeat.num()) .build(); - match sender.send_json(&map) { - Ok(()) => {}, - Err(e) => warn!("Error sending gateway keeaplive: {:?}", e) + if let Err(why) = sender.send_json(&map) { + warn!("Err sending keepalive: {:?}", why); } } } diff --git a/src/client/context.rs b/src/client/context.rs index d7465d1..1bee390 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -14,8 +14,8 @@ use ::utils::builder::{ EditRole, GetMessages }; -use ::model::*; use ::internal::prelude::*; +use ::model::*; use ::utils; #[derive(Clone)] diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index f0d9c91..7efee75 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -4,8 +4,8 @@ use super::event_store::EventStore; use super::login_type::LoginType; use super::{STATE, Connection, Context}; use ::ext::framework::Framework; -use ::model::{ChannelId, Event, Message}; use ::internal::prelude::*; +use ::model::{ChannelId, Event, Message}; macro_rules! handler { ($field:ident, $event_store:ident) => { diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index b77e24f..9fba750 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -36,8 +36,8 @@ use std::default::Default; use std::io::{ErrorKind as IoErrorKind, Read}; use std::sync::{Arc, Mutex}; use ::constants; -use ::model::*; use ::internal::prelude::*; +use ::model::*; use ::utils::decode_array; lazy_static! { diff --git a/src/client/mod.rs b/src/client/mod.rs index 580ae74..661d477 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -41,7 +41,11 @@ mod dispatch; mod event_store; mod login_type; -pub use self::connection::{Connection, ConnectionError}; +pub use self::connection::{ + Connection, + ConnectionError, + Status as ConnectionStatus +}; pub use self::context::Context; pub use self::login_type::LoginType; @@ -53,10 +57,10 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use ::model::*; -use ::internal::prelude::*; use ::ext::framework::Framework; use ::ext::state::State; +use ::internal::prelude::*; +use ::model::*; lazy_static! { /// The STATE is a mutable lazily-initialized static binding. It can be |