aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-11-13 19:28:13 -0800
committerAustin Hellyer <[email protected]>2016-11-14 18:32:10 -0800
commit7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch)
treeca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src/client
parentAdd internal module (diff)
downloadserenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz
serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip
Add voice connection support
Diffstat (limited to 'src/client')
-rw-r--r--src/client/connection.rs198
-rw-r--r--src/client/context.rs2
-rw-r--r--src/client/dispatch.rs2
-rw-r--r--src/client/http/mod.rs2
-rw-r--r--src/client/mod.rs10
5 files changed, 121 insertions, 93 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 93c967e..d1e37e5 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -1,4 +1,3 @@
-use flate2::read::ZlibDecoder;
use serde_json::builder::ObjectBuilder;
use serde_json;
use std::fmt::{self, Display};
@@ -18,15 +17,46 @@ use super::Client;
use time::{self, Duration};
use websocket::client::request::Url as RequestUrl;
use websocket::client::{Client as WsClient, Sender, Receiver};
-use websocket::message::{Message as WsMessage, Type as WsType};
+use websocket::message::Message as WsMessage;
use websocket::stream::WebSocketStream;
-use websocket::ws::receiver::Receiver as WsReceiver;
use websocket::ws::sender::Sender as WsSender;
use ::constants::{self, OpCode};
-use ::model::*;
use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::model::{
+ ChannelId,
+ Event,
+ Game,
+ GatewayEvent,
+ GuildId,
+ OnlineStatus,
+ ReadyEvent,
+};
+
+#[cfg(feature="voice")]
+use ::ext::voice::Manager as VoiceManager;
+
+#[cfg(feature="voice")]
+macro_rules! connection {
+ ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => {
+ Connection {
+ $($name1: $val1,)*
+ $($name2: $val2,)*
+ }
+ }
+}
-enum Status {
+#[cfg(not(feature="voice"))]
+macro_rules! connection {
+ ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => {
+ Connection {
+ $($name1: $val1,)*
+ }
+ }
+}
+
+#[doc(hidden)]
+pub enum Status {
SendMessage(Value),
Sequence(u64),
ChangeInterval(u64),
@@ -131,6 +161,8 @@ pub struct Connection {
shard_info: Option<[u8; 2]>,
token: String,
ws_url: String,
+ #[cfg(feature = "voice")]
+ pub manager: VoiceManager,
}
impl Connection {
@@ -181,8 +213,14 @@ impl Connection {
};
let (tx, rx) = mpsc::channel();
+ let thread_name = match shard_info {
+ Some(info) => format!("serenity keepalive [shard {}/{}]",
+ info[0],
+ info[1] - 1),
+ None => "serenity keepalive [unsharded]".to_owned(),
+ };
try!(ThreadBuilder::new()
- .name("serenity keepalive".into())
+ .name(thread_name)
.spawn(move || keepalive(heartbeat_interval, sender, rx)));
// Parse READY
@@ -192,16 +230,30 @@ impl Connection {
&mut receiver,
identification));
- Ok((Connection {
- keepalive_channel: tx,
- last_sequence: sequence,
- login_type: login_type,
- receiver: receiver,
- token: token.to_owned(),
- session_id: Some(ready.ready.session_id.clone()),
- shard_info: shard_info,
- ws_url: base_url.to_owned(),
- }, ready))
+ Ok((feature_voice! {{
+ Connection {
+ keepalive_channel: tx.clone(),
+ last_sequence: sequence,
+ login_type: login_type,
+ receiver: receiver,
+ token: token.to_owned(),
+ session_id: Some(ready.ready.session_id.clone()),
+ shard_info: shard_info,
+ ws_url: base_url.to_owned(),
+ manager: VoiceManager::new(tx, ready.ready.user.id.0),
+ }
+ } {
+ Connection {
+ keepalive_channel: tx.clone(),
+ last_sequence: sequence,
+ login_type: login_type,
+ receiver: receiver,
+ token: token.to_owned(),
+ session_id: Some(ready.ready.session_id.clone()),
+ shard_info: shard_info,
+ ws_url: base_url.to_owned(),
+ }
+ }}, ready))
}
pub fn shard_info(&self) -> Option<[u8; 2]> {
@@ -229,8 +281,8 @@ impl Connection {
match game {
Some(game) => {
- object.insert_object("game", move |o|
- o.insert("name", game.name))
+ object.insert_object("game", move |o| o
+ .insert("name", game.name))
},
None => object.insert("game", Value::Null),
}
@@ -243,15 +295,10 @@ impl Connection {
pub fn receive(&mut self) -> Result<Event> {
match self.receiver.recv_json(GatewayEvent::decode) {
Ok(GatewayEvent::Dispatch(sequence, event)) => {
- self.last_sequence = sequence;
-
- let _ = self.keepalive_channel.send(Status::Sequence(sequence));
-
- if let Event::Resumed(ref ev) = event {
- let _ = self.keepalive_channel.send(Status::ChangeInterval(ev.heartbeat_interval));
- }
+ let status = Status::Sequence(sequence);
+ let _ = self.keepalive_channel.send(status);
- Ok(event)
+ Ok(self.handle_dispatch(event))
},
Ok(GatewayEvent::Heartbeat(sequence)) => {
let map = ObjectBuilder::new()
@@ -273,8 +320,9 @@ impl Connection {
Ok(GatewayEvent::InvalidateSession) => {
self.session_id = None;
- let status = Status::SendMessage(identify(&self.token,
- self.shard_info));
+ let identification = identify(&self.token, self.shard_info);
+
+ let status = Status::SendMessage(identification);
let _ = self.keepalive_channel.send(status);
@@ -324,6 +372,34 @@ impl Connection {
}
}
+ fn handle_dispatch(&mut self, event: Event) -> Event {
+ if let Event::Resumed(ref ev) = event {
+ let status = Status::ChangeInterval(ev.heartbeat_interval);
+
+ let _ = self.keepalive_channel.send(status);
+ }
+
+ feature_voice_enabled! {{
+ if let Event::VoiceStateUpdate(ref update) = event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_state(&update.voice_state);
+ }
+ }
+ }
+
+ if let Event::VoiceServerUpdate(ref update) = event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_server(&update.endpoint, &update.token);
+ }
+ }
+ }
+ }}
+
+ event
+ }
+
fn reconnect(&mut self) -> Result<Event> {
debug!("Reconnecting");
@@ -480,57 +556,7 @@ impl Drop for Connection {
}
}
-trait ReceiverExt {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T>
- where F: FnOnce(Value) -> Result<T>;
-}
-
-trait SenderExt {
- fn send_json(&mut self, value: &Value) -> Result<()>;
-}
-
-impl ReceiverExt for Receiver<WebSocketStream> {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> {
- let message: WsMessage = try!(self.recv_message());
-
- if message.opcode == WsType::Close {
- let representation = String::from_utf8_lossy(&message.payload)
- .into_owned();
-
- Err(Error::Connection(ConnectionError::Closed(message.cd_status_code,
- representation)))
- } else if message.opcode == WsType::Binary || message.opcode == WsType::Text {
- let json: Value = if message.opcode == WsType::Binary {
- try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..])))
- } else {
- try!(serde_json::from_reader(&message.payload[..]))
- };
-
- decode(json).map_err(|err| {
- warn!("Error decoding: {}",
- String::from_utf8_lossy(&message.payload));
-
- err
- })
- } else {
- let representation = String::from_utf8_lossy(&message.payload)
- .into_owned();
-
- Err(Error::Connection(ConnectionError::Closed(None,
- representation)))
- }
- }
-}
-
-impl SenderExt for Sender<WebSocketStream> {
- fn send_json(&mut self, value: &Value) -> Result<()> {
- serde_json::to_string(value)
- .map(WsMessage::text)
- .map_err(Error::from)
- .and_then(|m| self.send_message(&m).map_err(Error::from))
- }
-}
-
+#[inline]
fn parse_ready(event: GatewayEvent,
tx: &MpscSender<Status>,
receiver: &mut Receiver<WebSocketStream>,
@@ -582,7 +608,7 @@ fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value {
if let Some(shard_info) = shard_info {
object = object
- .insert_array("shard", |array| array
+ .insert_array("shard", |a| a
.push(shard_info[0])
.push(shard_info[1]));
}
@@ -602,7 +628,7 @@ fn identify_compression(object: ObjectBuilder) -> ObjectBuilder {
object.insert("compression", false)
}
-fn build_gateway_url(base: &str) -> Result<::websocket::client::request::Url> {
+fn build_gateway_url(base: &str) -> Result<RequestUrl> {
RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION))
.map_err(|_| Error::Client(ClientError::Gateway))
}
@@ -627,9 +653,8 @@ fn keepalive(interval: u64,
sender = new_sender;
},
Ok(Status::SendMessage(val)) => {
- match sender.send_json(&val) {
- Ok(()) => {},
- Err(e) => warn!("Err sending message: {:?}", e),
+ if let Err(why) = sender.send_json(&val) {
+ warn!("Err sending message: {:?}", why);
}
},
Ok(Status::Sequence(seq)) => {
@@ -648,9 +673,8 @@ fn keepalive(interval: u64,
.insert("op", OpCode::Heartbeat.num())
.build();
- match sender.send_json(&map) {
- Ok(()) => {},
- Err(e) => warn!("Error sending gateway keeaplive: {:?}", e)
+ if let Err(why) = sender.send_json(&map) {
+ warn!("Err sending keepalive: {:?}", why);
}
}
}
diff --git a/src/client/context.rs b/src/client/context.rs
index d7465d1..1bee390 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -14,8 +14,8 @@ use ::utils::builder::{
EditRole,
GetMessages
};
-use ::model::*;
use ::internal::prelude::*;
+use ::model::*;
use ::utils;
#[derive(Clone)]
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index f0d9c91..7efee75 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -4,8 +4,8 @@ use super::event_store::EventStore;
use super::login_type::LoginType;
use super::{STATE, Connection, Context};
use ::ext::framework::Framework;
-use ::model::{ChannelId, Event, Message};
use ::internal::prelude::*;
+use ::model::{ChannelId, Event, Message};
macro_rules! handler {
($field:ident, $event_store:ident) => {
diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs
index b77e24f..9fba750 100644
--- a/src/client/http/mod.rs
+++ b/src/client/http/mod.rs
@@ -36,8 +36,8 @@ use std::default::Default;
use std::io::{ErrorKind as IoErrorKind, Read};
use std::sync::{Arc, Mutex};
use ::constants;
-use ::model::*;
use ::internal::prelude::*;
+use ::model::*;
use ::utils::decode_array;
lazy_static! {
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 580ae74..661d477 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -41,7 +41,11 @@ mod dispatch;
mod event_store;
mod login_type;
-pub use self::connection::{Connection, ConnectionError};
+pub use self::connection::{
+ Connection,
+ ConnectionError,
+ Status as ConnectionStatus
+};
pub use self::context::Context;
pub use self::login_type::LoginType;
@@ -53,10 +57,10 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
-use ::model::*;
-use ::internal::prelude::*;
use ::ext::framework::Framework;
use ::ext::state::State;
+use ::internal::prelude::*;
+use ::model::*;
lazy_static! {
/// The STATE is a mutable lazily-initialized static binding. It can be