aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-11-13 19:28:13 -0800
committerAustin Hellyer <[email protected]>2016-11-14 18:32:10 -0800
commit7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch)
treeca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src
parentAdd internal module (diff)
downloadserenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz
serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip
Add voice connection support
Diffstat (limited to 'src')
-rw-r--r--src/client/connection.rs198
-rw-r--r--src/client/context.rs2
-rw-r--r--src/client/dispatch.rs2
-rw-r--r--src/client/http/mod.rs2
-rw-r--r--src/client/mod.rs10
-rw-r--r--src/constants.rs10
-rw-r--r--src/error.rs7
-rw-r--r--src/ext/mod.rs1
-rw-r--r--src/ext/voice/audio.rs15
-rw-r--r--src/ext/voice/connection.rs310
-rw-r--r--src/ext/voice/connection_info.rs6
-rw-r--r--src/ext/voice/error.rs10
-rw-r--r--src/ext/voice/handler.rs331
-rw-r--r--src/ext/voice/manager.rs141
-rw-r--r--src/ext/voice/mod.rs55
-rw-r--r--src/ext/voice/threading.rs93
-rw-r--r--src/internal/mod.rs7
-rw-r--r--src/internal/timer.rs45
-rw-r--r--src/internal/ws_impl.rs60
-rw-r--r--src/lib.rs5
-rw-r--r--src/model/gateway.rs61
-rw-r--r--src/model/utils.rs6
-rw-r--r--src/model/voice.rs97
-rw-r--r--src/utils/mod.rs51
24 files changed, 1366 insertions, 159 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 93c967e..d1e37e5 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -1,4 +1,3 @@
-use flate2::read::ZlibDecoder;
use serde_json::builder::ObjectBuilder;
use serde_json;
use std::fmt::{self, Display};
@@ -18,15 +17,46 @@ use super::Client;
use time::{self, Duration};
use websocket::client::request::Url as RequestUrl;
use websocket::client::{Client as WsClient, Sender, Receiver};
-use websocket::message::{Message as WsMessage, Type as WsType};
+use websocket::message::Message as WsMessage;
use websocket::stream::WebSocketStream;
-use websocket::ws::receiver::Receiver as WsReceiver;
use websocket::ws::sender::Sender as WsSender;
use ::constants::{self, OpCode};
-use ::model::*;
use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::model::{
+ ChannelId,
+ Event,
+ Game,
+ GatewayEvent,
+ GuildId,
+ OnlineStatus,
+ ReadyEvent,
+};
+
+#[cfg(feature="voice")]
+use ::ext::voice::Manager as VoiceManager;
+
+#[cfg(feature="voice")]
+macro_rules! connection {
+ ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => {
+ Connection {
+ $($name1: $val1,)*
+ $($name2: $val2,)*
+ }
+ }
+}
-enum Status {
+#[cfg(not(feature="voice"))]
+macro_rules! connection {
+ ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => {
+ Connection {
+ $($name1: $val1,)*
+ }
+ }
+}
+
+#[doc(hidden)]
+pub enum Status {
SendMessage(Value),
Sequence(u64),
ChangeInterval(u64),
@@ -131,6 +161,8 @@ pub struct Connection {
shard_info: Option<[u8; 2]>,
token: String,
ws_url: String,
+ #[cfg(feature = "voice")]
+ pub manager: VoiceManager,
}
impl Connection {
@@ -181,8 +213,14 @@ impl Connection {
};
let (tx, rx) = mpsc::channel();
+ let thread_name = match shard_info {
+ Some(info) => format!("serenity keepalive [shard {}/{}]",
+ info[0],
+ info[1] - 1),
+ None => "serenity keepalive [unsharded]".to_owned(),
+ };
try!(ThreadBuilder::new()
- .name("serenity keepalive".into())
+ .name(thread_name)
.spawn(move || keepalive(heartbeat_interval, sender, rx)));
// Parse READY
@@ -192,16 +230,30 @@ impl Connection {
&mut receiver,
identification));
- Ok((Connection {
- keepalive_channel: tx,
- last_sequence: sequence,
- login_type: login_type,
- receiver: receiver,
- token: token.to_owned(),
- session_id: Some(ready.ready.session_id.clone()),
- shard_info: shard_info,
- ws_url: base_url.to_owned(),
- }, ready))
+ Ok((feature_voice! {{
+ Connection {
+ keepalive_channel: tx.clone(),
+ last_sequence: sequence,
+ login_type: login_type,
+ receiver: receiver,
+ token: token.to_owned(),
+ session_id: Some(ready.ready.session_id.clone()),
+ shard_info: shard_info,
+ ws_url: base_url.to_owned(),
+ manager: VoiceManager::new(tx, ready.ready.user.id.0),
+ }
+ } {
+ Connection {
+ keepalive_channel: tx.clone(),
+ last_sequence: sequence,
+ login_type: login_type,
+ receiver: receiver,
+ token: token.to_owned(),
+ session_id: Some(ready.ready.session_id.clone()),
+ shard_info: shard_info,
+ ws_url: base_url.to_owned(),
+ }
+ }}, ready))
}
pub fn shard_info(&self) -> Option<[u8; 2]> {
@@ -229,8 +281,8 @@ impl Connection {
match game {
Some(game) => {
- object.insert_object("game", move |o|
- o.insert("name", game.name))
+ object.insert_object("game", move |o| o
+ .insert("name", game.name))
},
None => object.insert("game", Value::Null),
}
@@ -243,15 +295,10 @@ impl Connection {
pub fn receive(&mut self) -> Result<Event> {
match self.receiver.recv_json(GatewayEvent::decode) {
Ok(GatewayEvent::Dispatch(sequence, event)) => {
- self.last_sequence = sequence;
-
- let _ = self.keepalive_channel.send(Status::Sequence(sequence));
-
- if let Event::Resumed(ref ev) = event {
- let _ = self.keepalive_channel.send(Status::ChangeInterval(ev.heartbeat_interval));
- }
+ let status = Status::Sequence(sequence);
+ let _ = self.keepalive_channel.send(status);
- Ok(event)
+ Ok(self.handle_dispatch(event))
},
Ok(GatewayEvent::Heartbeat(sequence)) => {
let map = ObjectBuilder::new()
@@ -273,8 +320,9 @@ impl Connection {
Ok(GatewayEvent::InvalidateSession) => {
self.session_id = None;
- let status = Status::SendMessage(identify(&self.token,
- self.shard_info));
+ let identification = identify(&self.token, self.shard_info);
+
+ let status = Status::SendMessage(identification);
let _ = self.keepalive_channel.send(status);
@@ -324,6 +372,34 @@ impl Connection {
}
}
+ fn handle_dispatch(&mut self, event: Event) -> Event {
+ if let Event::Resumed(ref ev) = event {
+ let status = Status::ChangeInterval(ev.heartbeat_interval);
+
+ let _ = self.keepalive_channel.send(status);
+ }
+
+ feature_voice_enabled! {{
+ if let Event::VoiceStateUpdate(ref update) = event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_state(&update.voice_state);
+ }
+ }
+ }
+
+ if let Event::VoiceServerUpdate(ref update) = event {
+ if let Some(guild_id) = update.guild_id {
+ if let Some(handler) = self.manager.get(guild_id) {
+ handler.update_server(&update.endpoint, &update.token);
+ }
+ }
+ }
+ }}
+
+ event
+ }
+
fn reconnect(&mut self) -> Result<Event> {
debug!("Reconnecting");
@@ -480,57 +556,7 @@ impl Drop for Connection {
}
}
-trait ReceiverExt {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T>
- where F: FnOnce(Value) -> Result<T>;
-}
-
-trait SenderExt {
- fn send_json(&mut self, value: &Value) -> Result<()>;
-}
-
-impl ReceiverExt for Receiver<WebSocketStream> {
- fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> {
- let message: WsMessage = try!(self.recv_message());
-
- if message.opcode == WsType::Close {
- let representation = String::from_utf8_lossy(&message.payload)
- .into_owned();
-
- Err(Error::Connection(ConnectionError::Closed(message.cd_status_code,
- representation)))
- } else if message.opcode == WsType::Binary || message.opcode == WsType::Text {
- let json: Value = if message.opcode == WsType::Binary {
- try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..])))
- } else {
- try!(serde_json::from_reader(&message.payload[..]))
- };
-
- decode(json).map_err(|err| {
- warn!("Error decoding: {}",
- String::from_utf8_lossy(&message.payload));
-
- err
- })
- } else {
- let representation = String::from_utf8_lossy(&message.payload)
- .into_owned();
-
- Err(Error::Connection(ConnectionError::Closed(None,
- representation)))
- }
- }
-}
-
-impl SenderExt for Sender<WebSocketStream> {
- fn send_json(&mut self, value: &Value) -> Result<()> {
- serde_json::to_string(value)
- .map(WsMessage::text)
- .map_err(Error::from)
- .and_then(|m| self.send_message(&m).map_err(Error::from))
- }
-}
-
+#[inline]
fn parse_ready(event: GatewayEvent,
tx: &MpscSender<Status>,
receiver: &mut Receiver<WebSocketStream>,
@@ -582,7 +608,7 @@ fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value {
if let Some(shard_info) = shard_info {
object = object
- .insert_array("shard", |array| array
+ .insert_array("shard", |a| a
.push(shard_info[0])
.push(shard_info[1]));
}
@@ -602,7 +628,7 @@ fn identify_compression(object: ObjectBuilder) -> ObjectBuilder {
object.insert("compression", false)
}
-fn build_gateway_url(base: &str) -> Result<::websocket::client::request::Url> {
+fn build_gateway_url(base: &str) -> Result<RequestUrl> {
RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION))
.map_err(|_| Error::Client(ClientError::Gateway))
}
@@ -627,9 +653,8 @@ fn keepalive(interval: u64,
sender = new_sender;
},
Ok(Status::SendMessage(val)) => {
- match sender.send_json(&val) {
- Ok(()) => {},
- Err(e) => warn!("Err sending message: {:?}", e),
+ if let Err(why) = sender.send_json(&val) {
+ warn!("Err sending message: {:?}", why);
}
},
Ok(Status::Sequence(seq)) => {
@@ -648,9 +673,8 @@ fn keepalive(interval: u64,
.insert("op", OpCode::Heartbeat.num())
.build();
- match sender.send_json(&map) {
- Ok(()) => {},
- Err(e) => warn!("Error sending gateway keeaplive: {:?}", e)
+ if let Err(why) = sender.send_json(&map) {
+ warn!("Err sending keepalive: {:?}", why);
}
}
}
diff --git a/src/client/context.rs b/src/client/context.rs
index d7465d1..1bee390 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -14,8 +14,8 @@ use ::utils::builder::{
EditRole,
GetMessages
};
-use ::model::*;
use ::internal::prelude::*;
+use ::model::*;
use ::utils;
#[derive(Clone)]
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index f0d9c91..7efee75 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -4,8 +4,8 @@ use super::event_store::EventStore;
use super::login_type::LoginType;
use super::{STATE, Connection, Context};
use ::ext::framework::Framework;
-use ::model::{ChannelId, Event, Message};
use ::internal::prelude::*;
+use ::model::{ChannelId, Event, Message};
macro_rules! handler {
($field:ident, $event_store:ident) => {
diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs
index b77e24f..9fba750 100644
--- a/src/client/http/mod.rs
+++ b/src/client/http/mod.rs
@@ -36,8 +36,8 @@ use std::default::Default;
use std::io::{ErrorKind as IoErrorKind, Read};
use std::sync::{Arc, Mutex};
use ::constants;
-use ::model::*;
use ::internal::prelude::*;
+use ::model::*;
use ::utils::decode_array;
lazy_static! {
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 580ae74..661d477 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -41,7 +41,11 @@ mod dispatch;
mod event_store;
mod login_type;
-pub use self::connection::{Connection, ConnectionError};
+pub use self::connection::{
+ Connection,
+ ConnectionError,
+ Status as ConnectionStatus
+};
pub use self::context::Context;
pub use self::login_type::LoginType;
@@ -53,10 +57,10 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
-use ::model::*;
-use ::internal::prelude::*;
use ::ext::framework::Framework;
use ::ext::state::State;
+use ::internal::prelude::*;
+use ::model::*;
lazy_static! {
/// The STATE is a mutable lazily-initialized static binding. It can be
diff --git a/src/constants.rs b/src/constants.rs
index 69d49ae..68f6524 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -8,7 +8,7 @@ pub const MESSAGE_CODE_LIMIT: u16 = 2000;
/// The [UserAgent] sent along with every request.
///
/// [UserAgent]: ../hyper/header/struct.UserAgent.html
-pub const USER_AGENT: &'static str = concat!("DiscordBot (https://github.com/zeyla/serenity, ", env!("CARGO_PKG_VERSION"), ")");
+pub const USER_AGENT: &'static str = concat!("DiscordBot (https://github.com/zeyla/serenity.rs, ", env!("CARGO_PKG_VERSION"), ")");
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -136,9 +136,10 @@ map_nums! { OpCode;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VoiceOpCode {
Identify,
- SelectProtocol,
- Hello,
Heartbeat,
+ Hello,
+ KeepAlive,
+ SelectProtocol,
SessionDescription,
Speaking,
}
@@ -147,7 +148,8 @@ map_nums! { VoiceOpCode;
Identify 0,
SelectProtocol 1,
Hello 2,
- Heartbeat 3,
+ KeepAlive 3,
SessionDescription 4,
Speaking 5,
+ Heartbeat 8,
}
diff --git a/src/error.rs b/src/error.rs
index 2e3ea07..97524b0 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -6,6 +6,8 @@ use serde_json::Error as JsonError;
use serde_json::Value;
use websocket::result::WebSocketError;
use ::client::{ClientError, ConnectionError};
+#[cfg(feature="voice")]
+use ::ext::voice::VoiceError;
/// The common result type between most library functions.
pub type Result<T> = ::std::result::Result<T, Error>;
@@ -40,6 +42,9 @@ pub enum Error {
Other(&'static str),
/// An error from the `url` crate.
Url(String),
+ /// Indicating an error within the voice module.
+ #[cfg(feature="voice")]
+ Voice(VoiceError),
/// An error from the `rust-websocket` crate.
WebSocket(WebSocketError),
}
@@ -91,6 +96,8 @@ impl StdError for Error {
Error::Json(ref inner) => inner.description(),
Error::Url(ref inner) => inner,
Error::WebSocket(ref inner) => inner.description(),
+ #[cfg(feature = "voice")]
+ Error::Voice(_) => "Voice error",
}
}
diff --git a/src/ext/mod.rs b/src/ext/mod.rs
index 92fda62..bb87911 100644
--- a/src/ext/mod.rs
+++ b/src/ext/mod.rs
@@ -10,4 +10,5 @@
pub mod framework;
pub mod state;
+#[cfg(feature="voice")]
pub mod voice;
diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs
new file mode 100644
index 0000000..e49bd2a
--- /dev/null
+++ b/src/ext/voice/audio.rs
@@ -0,0 +1,15 @@
+use ::model::UserId;
+
+/// A readable audio source.
+pub trait AudioSource: Send {
+ fn is_stereo(&mut self) -> bool;
+
+ fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>;
+}
+
+/// A receiver for incoming audio.
+pub trait AudioReceiver: Send {
+ fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool);
+
+ fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]);
+}
diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs
new file mode 100644
index 0000000..7dfc034
--- /dev/null
+++ b/src/ext/voice/connection.rs
@@ -0,0 +1,310 @@
+use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
+use serde_json::builder::ObjectBuilder;
+use sodiumoxide::crypto::secretbox::Key;
+use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket};
+use std::sync::mpsc::{self, Receiver as MpscReceiver};
+use std::thread::{self, Builder as ThreadBuilder};
+use super::audio::{AudioReceiver, AudioSource};
+use super::connection_info::ConnectionInfo;
+use super::{CRYPTO_MODE, VoiceError};
+use websocket::client::request::Url as WebsocketUrl;
+use websocket::client::{
+ Client as WsClient,
+ Receiver as WsReceiver,
+ Sender as WsSender
+};
+use websocket::stream::WebSocketStream;
+use ::client::STATE;
+use ::constants::VoiceOpCode;
+use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::internal::Timer;
+use ::model::VoiceEvent;
+
+pub enum ReceiverStatus {
+ Udp(Vec<u8>),
+ Websocket(VoiceEvent),
+}
+
+#[allow(dead_code)]
+pub struct Connection {
+ audio_timer: Timer,
+ destination: SocketAddr,
+ keepalive_timer: Timer,
+ key: Key,
+ receive_channel: MpscReceiver<ReceiverStatus>,
+ sender: WsSender<WebSocketStream>,
+ sequence: u64,
+ speaking: bool,
+ ssrc: u32,
+ timestamp: u32,
+ udp: UdpSocket,
+}
+
+impl Connection {
+ pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
+ let url = try!(generate_url(&mut info.endpoint));
+
+ let response = try!(try!(WsClient::connect(url)).send());
+ try!(response.validate());
+ let (mut sender, mut receiver) = response.begin().split();
+
+ try!(sender.send_json(&identify(&info)));
+
+ let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) {
+ VoiceEvent::Handshake(handshake) => handshake,
+ _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)),
+ };
+
+ if !has_valid_mode(handshake.modes) {
+ return Err(Error::Voice(VoiceError::VoiceModeUnavailable));
+ }
+
+ let destination = {
+ try!(try!((&info.endpoint[..], handshake.port)
+ .to_socket_addrs())
+ .next()
+ .ok_or(Error::Voice(VoiceError::HostnameResolve)))
+ };
+ let udp = try!(UdpSocket::bind("0.0.0.0:0"));
+
+ {
+ let mut bytes = [0; 70];
+ try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc));
+ try!(udp.send_to(&bytes, destination));
+ }
+
+ try!(send_acknowledgement(&mut sender, &udp));
+
+ let key = try!(get_encryption_key(&mut receiver));
+
+ let receive_channel = try!(start_threads(receiver, &udp));
+
+ info!("[Voice] Connected to: {}", info.endpoint);
+
+ Ok(Connection {
+ audio_timer: Timer::new(1000 * 60 * 4),
+ destination: destination,
+ key: key,
+ keepalive_timer: Timer::new(handshake.heartbeat_interval),
+ receive_channel: receive_channel,
+ udp: udp,
+ sender: sender,
+ sequence: 0,
+ speaking: false,
+ ssrc: handshake.ssrc,
+ timestamp: 0,
+ })
+ }
+
+ #[allow(unused_variables)]
+ pub fn update(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ receiver: &mut Option<Box<AudioReceiver>>,
+ audio_timer: &mut Timer)
+ -> Result<()> {
+ if let Some(receiver) = receiver.as_mut() {
+ while let Ok(status) = self.receive_channel.try_recv() {
+ match status {
+ ReceiverStatus::Udp(packet) => {
+ debug!("[Voice] Received UDP packet: {:?}", packet);
+ },
+ ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
+ receiver.speaking_update(ev.ssrc,
+ &ev.user_id,
+ ev.speaking);
+ },
+ ReceiverStatus::Websocket(other) => {
+ info!("[Voice] Received other websocket data: {:?}",
+ other);
+ },
+ }
+ }
+ } else {
+ while let Ok(_) = self.receive_channel.try_recv() {}
+ }
+
+ // Send the voice websocket keepalive if it's time
+ if self.keepalive_timer.check() {
+ try!(self.sender.send_json(&keepalive()));
+ }
+
+ // Send the UDP keepalive if it's time
+ if self.audio_timer.check() {
+ let mut bytes = [0; 4];
+ try!((&mut bytes[..]).write_u32::<BigEndian>(self.ssrc));
+ try!(self.udp.send_to(&bytes, self.destination));
+ }
+
+ try!(self.speaking(true));
+
+ self.sequence = self.sequence.wrapping_add(1);
+ self.timestamp = self.timestamp.wrapping_add(960);
+
+ audio_timer.await();
+ self.audio_timer.reset();
+ Ok(())
+ }
+
+ fn speaking(&mut self, speaking: bool) -> Result<()> {
+ if self.speaking == speaking {
+ return Ok(());
+ }
+
+ self.speaking = speaking;
+
+ let map = ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Speaking.num())
+ .insert_object("d", |object| object
+ .insert("delay", 0))
+ .insert("speaking", speaking)
+ .build();
+
+ self.sender.send_json(&map)
+ }
+}
+
+impl Drop for Connection {
+ fn drop(&mut self) {
+ let _ = self.sender.get_mut().shutdown(Shutdown::Both);
+
+ info!("Voice disconnected");
+ }
+}
+
+fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
+ if endpoint.ends_with(":80") {
+ let len = endpoint.len();
+
+ endpoint.truncate(len - 3);
+ }
+
+ WebsocketUrl::parse(&format!("wss://{}", endpoint))
+ .or(Err(Error::Voice(VoiceError::EndpointUrl)))
+}
+
+pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+ -> Result<Key> {
+ loop {
+ match try!(receiver.recv_json(VoiceEvent::decode)) {
+ VoiceEvent::Ready(ready) => {
+ if ready.mode != CRYPTO_MODE {
+ return Err(Error::Voice(VoiceError::VoiceModeInvalid));
+ }
+
+ return Key::from_slice(&ready.secret_key)
+ .ok_or(Error::Voice(VoiceError::KeyGen));
+ },
+ VoiceEvent::Unknown(op, value) => {
+ debug!("Unknown message type: {}/{:?}", op.num(), value);
+ },
+ _ => {},
+ }
+ }
+}
+
+fn identify(info: &ConnectionInfo) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Identify.num())
+ .insert_object("d", |o| o
+ .insert("server_id", info.server_id)
+ .insert("session_id", &info.session_id)
+ .insert("token", &info.token)
+ .insert("user_id", STATE.lock().unwrap().user.id.0))
+ .build()
+}
+
+#[inline(always)]
+fn has_valid_mode(modes: Vec<String>) -> bool {
+ modes.iter().any(|s| s == CRYPTO_MODE)
+}
+
+fn keepalive() -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::KeepAlive.num())
+ .insert("d", Value::Null)
+ .build()
+}
+
+#[inline]
+fn select_protocol(address: &[u8], port: u16) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::SelectProtocol.num())
+ .insert_object("d", |o| o
+ .insert("protocol", "udp")
+ .insert_object("data", |o| o
+ .insert("address", address)
+ .insert("mode", "xsalsa20_poly1305")))
+ .insert("port", port)
+ .build()
+}
+
+#[inline]
+fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket)
+ -> Result<()> {
+ let mut bytes = [0; 256];
+
+ let (len, _) = try!(udp.recv_from(&mut bytes));
+
+ let zero_index = bytes.iter()
+ .skip(4)
+ .position(|&x| x == 0)
+ .unwrap();
+
+ let address = &bytes[4..4 + zero_index];
+
+ let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>());
+
+ // send the acknowledgement websocket message
+ let map = select_protocol(address, port);
+ sender.send_json(&map).map(|_| ())
+}
+
+#[inline]
+fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+ -> Result<MpscReceiver<ReceiverStatus>> {
+ let thread = thread::current();
+ let thread_name = thread.name().unwrap_or("serenity.rs voice");
+
+ let (tx, rx) = mpsc::channel();
+ let tx_clone = tx.clone();
+ let udp_clone = try!(udp.try_clone());
+
+ try!(ThreadBuilder::new()
+ .name(format!("{} WS", thread_name))
+ .spawn(move || {
+ loop {
+ let msg = receiver.recv_json(VoiceEvent::decode);
+
+ if let Ok(msg) = msg {
+ let send = tx.send(ReceiverStatus::Websocket(msg));
+
+ if let Err(_why) = send {
+ return;
+ }
+ } else {
+ break;
+ }
+ }
+ }));
+
+ try!(ThreadBuilder::new()
+ .name(format!("{} UDP", thread_name))
+ .spawn(move || {
+ let mut buffer = [0; 512];
+
+ loop {
+ let (len, _) = udp_clone.recv_from(&mut buffer).unwrap();
+ let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len]
+ .iter()
+ .cloned()
+ .collect()));
+
+ if let Err(_why) = req {
+ return;
+ }
+ }
+ }));
+
+ Ok(rx)
+}
diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs
new file mode 100644
index 0000000..9115385
--- /dev/null
+++ b/src/ext/voice/connection_info.rs
@@ -0,0 +1,6 @@
+pub struct ConnectionInfo {
+ pub endpoint: String,
+ pub server_id: u64,
+ pub session_id: String,
+ pub token: String,
+}
diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs
new file mode 100644
index 0000000..b1f3251
--- /dev/null
+++ b/src/ext/voice/error.rs
@@ -0,0 +1,10 @@
+#[derive(Debug)]
+pub enum VoiceError {
+ // An indicator that an endpoint URL was invalid.
+ EndpointUrl,
+ ExpectedHandshake,
+ HostnameResolve,
+ KeyGen,
+ VoiceModeInvalid,
+ VoiceModeUnavailable,
+}
diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs
new file mode 100644
index 0000000..8dc0ab1
--- /dev/null
+++ b/src/ext/voice/handler.rs
@@ -0,0 +1,331 @@
+use serde_json::builder::ObjectBuilder;
+use std::sync::mpsc::{self, Sender};
+use super::connection_info::ConnectionInfo;
+use super::{Status as VoiceStatus, Target};
+use ::client::ConnectionStatus;
+use ::constants::VoiceOpCode;
+use ::model::{ChannelId, GuildId, VoiceState};
+use super::threading;
+
+/// The handler is responsible for "handling" a single voice connection, acting
+/// as a clean API above the inner connection.
+///
+/// # Examples
+///
+/// Assuming that you already have a [`Manager`], most likely retrieved via a
+/// [WebSocket connection], you can join a guild's voice channel and deafen
+/// yourself like so:
+///
+/// ```rust,ignore
+/// // assuming a `manager` has already been bound, hopefully retrieved through
+/// // a websocket's connection.
+/// use serenity::model::{ChannelId, GuildId};
+///
+/// let guild_id = GuildId(81384788765712384);
+/// let channel_id = ChannelId(85482585546833920);
+///
+/// let handler = manager.join(Some(guild_id), channel_id);
+/// handler.deafen(true);
+/// ```
+///
+/// [`Manager`]: struct.Manager.html
+/// [WebSocket connection]: ../../client/struct.Connection.html
+pub struct Handler {
+ channel_id: Option<ChannelId>,
+ endpoint_token: Option<(String, String)>,
+ guild_id: Option<GuildId>,
+ self_deaf: bool,
+ self_mute: bool,
+ sender: Sender<VoiceStatus>,
+ session_id: Option<String>,
+ user_id: u64,
+ ws: Sender<ConnectionStatus>,
+}
+
+impl Handler {
+ /// Creates a new Handler.
+ ///
+ /// **Note**: You should never call this yourself, and should instead use
+ /// [`Manager::join`].
+ ///
+ /// Like, really. Really do not use this. Please.
+ ///
+ /// [`Manager::join`]: struct.Manager.html#method.join
+ #[doc(hidden)]
+ pub fn new(target: Target, ws: Sender<ConnectionStatus>, user_id: u64)
+ -> Self {
+ let (tx, rx) = mpsc::channel();
+
+ let (channel_id, guild_id) = match target {
+ Target::Channel(channel_id) => (Some(channel_id), None),
+ Target::Guild(guild_id) => (None, Some(guild_id)),
+ };
+
+ threading::start(target, rx);
+
+ Handler {
+ channel_id: channel_id,
+ endpoint_token: None,
+ guild_id: guild_id,
+ self_deaf: false,
+ self_mute: false,
+ sender: tx,
+ session_id: None,
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Retrieves the current connected voice channel's `ChannelId`, if connected
+ /// to one.
+ ///
+ /// Note that when connected to a voice channel, while the `ChannelId` will
+ /// not be `None`, the [`GuildId`] retrieved via [`guild`] can, in the event
+ /// of [`Group`] or 1-on-1 [`Call`]s.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ /// [`GuildId`]: ../../model/struct.GuildId.html
+ /// [`guild`]: #method.guild
+ pub fn channel(&self) -> Option<ChannelId> {
+ self.channel_id
+ }
+
+ /// Sets whether the current connection to be deafened.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ ///
+ /// **Note**: Unlike in the official client, you _can_ be deafened while
+ /// not being muted.
+ pub fn deafen(&mut self, deaf: bool) {
+ self.self_deaf = deaf;
+
+ // Only send an update if there is currently a connected channel.
+ //
+ // Otherwise, this can be treated as a "settings" update for a
+ // connection.
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Retrieves the current connected voice channel's `GuildId`, if connected
+ /// to one.
+ ///
+ /// Note that the `GuildId` can be `None` in the event of [`Group`] or
+ /// 1-on-1 [`Call`]s, although when connected to a voice channel, the
+ /// [`ChannelId`] retrieved via [`channel`] will be `Some`.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`ChannelId`]: ../../model/struct.ChannelId.html
+ /// [`Group`]: ../../model/struct.Group.html
+ /// [`channel`]: #method.channel
+ pub fn guild(&self) -> Option<GuildId> {
+ self.guild_id
+ }
+
+ /// Whether the current handler is set to deafen voice connections.
+ ///
+ /// Use [`deafen`] to modify this configuration.
+ ///
+ /// [`deafen`]: #method.deafen
+ pub fn is_deafened(&self) -> bool {
+ self.self_deaf
+ }
+
+ /// Whether the current handler is set to mute voice connections.
+ ///
+ /// Use [`mute`] to modify this configuration.
+ ///
+ /// [`mute`]: #method.mute
+ pub fn is_muted(&self) -> bool {
+ self.self_mute
+ }
+
+ /// Connect - or switch - to the given voice channel by its Id.
+ ///
+ /// **Note**: This is not necessary for [`Group`] or direct [call][`Call`]s.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ pub fn join(&mut self, channel_id: ChannelId) {
+ self.channel_id = Some(channel_id);
+
+ self.connect();
+ }
+
+ /// Leaves the current voice channel, disconnecting from it.
+ ///
+ /// This does _not_ forget settings, like whether to be self-deafened or
+ /// self-muted.
+ pub fn leave(&mut self) {
+ match self.channel_id {
+ None => return,
+ Some(_channel_id) => {
+ self.channel_id = None;
+
+ self.update();
+ },
+ }
+ }
+
+ /// Sets whether the current connection is to be muted.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ pub fn mute(&mut self, mute: bool) {
+ self.self_mute = mute;
+
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Switches the current connected voice channel to the given `channel_id`.
+ ///
+ /// This has 3 separate behaviors:
+ ///
+ /// - if the given `channel_id` is equivilant to the current connected
+ /// `channel_id`, then do nothing;
+ /// - if the given `channel_id` is _not_ equivilant to the current connected
+ /// `channel_id`, then switch to the given `channel_id`;
+ /// - if not currently connected to a voice channel, connect to the given
+ /// one.
+ ///
+ /// **Note**: The given `channel_id`, if in a guild, _must_ be in the
+ /// current handler's associated guild.
+ ///
+ /// If you are dealing with switching from one group to another, then open
+ /// another handler, and optionally drop this one via [`Manager::remove`].
+ ///
+ /// [`Manager::remove`]: struct.Manager.html#method.remove
+ pub fn switch_to(&mut self, channel_id: ChannelId) {
+ match self.channel_id {
+ Some(current_id) if current_id == channel_id => {
+ // If already connected to the given channel, do nothing.
+ return;
+ },
+ Some(_current_id) => {
+ self.channel_id = Some(channel_id);
+
+ self.update();
+ },
+ None => {
+ self.channel_id = Some(channel_id);
+
+ self.connect();
+ },
+ }
+ }
+
+ fn connect(&self) {
+ // Do _not_ try connecting if there is not at least a channel. There
+ // does not _necessarily_ need to be a guild.
+ if self.channel_id.is_none() {
+ return;
+ }
+
+ self.update();
+ }
+
+ fn connect_with_data(&mut self, session_id: String, endpoint: String, token: String) {
+ let target_id = if let Some(guild_id) = self.guild_id {
+ guild_id.0
+ } else if let Some(channel_id) = self.channel_id {
+ channel_id.0
+ } else {
+ // Theoretically never happens? This needs to be researched more.
+ error!("[Voice] No guild/channel ID when connecting");
+
+ return;
+ };
+
+ self.send(VoiceStatus::Connect(ConnectionInfo {
+ endpoint: endpoint,
+ server_id: target_id,
+ session_id: session_id,
+ token: token,
+ }))
+ }
+
+ // Send an update for the current session.
+ fn update(&self) {
+ let map = ObjectBuilder::new()
+ .insert("op", VoiceOpCode::SessionDescription.num())
+ .insert_object("d", |o| o
+ .insert("channel_id", self.channel_id.map(|c| c.0))
+ .insert("guild_id", self.guild_id.map(|g| g.0))
+ .insert("self_deaf", self.self_deaf)
+ .insert("self_mute", self.self_mute))
+ .build();
+
+ let _ = self.ws.send(ConnectionStatus::SendMessage(map));
+ }
+
+ fn send(&mut self, status: VoiceStatus) {
+ let send = self.sender.send(status);
+
+ // Reconnect if it errored.
+ if let Err(mpsc::SendError(status)) = send {
+ let (tx, rx) = mpsc::channel();
+
+ self.sender = tx;
+ self.sender.send(status).unwrap();
+
+ threading::start(Target::Guild(self.guild_id.unwrap()), rx);
+
+ self.update();
+ }
+ }
+
+ /// You probably shouldn't use this if you're reading the source code.
+ #[doc(hidden)]
+ pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) {
+ if let &Some(ref endpoint) = endpoint {
+ let endpoint = endpoint.clone();
+ let token = token.to_owned();
+ let session_id = match self.session_id {
+ Some(ref session_id) => session_id.clone(),
+ None => return,
+ };
+
+ self.connect_with_data(session_id, endpoint, token);
+ } else {
+ self.leave();
+ }
+ }
+
+ /// You probably shouldn't use this if you're reading the source code.
+ #[doc(hidden)]
+ pub fn update_state(&mut self, voice_state: &VoiceState) {
+ if self.user_id != voice_state.user_id.0 {
+ return;
+ }
+
+ self.channel_id = voice_state.channel_id;
+
+ if voice_state.channel_id.is_some() {
+ let session_id = voice_state.session_id.clone();
+
+ match self.endpoint_token.take() {
+ Some((endpoint, token)) => {
+ self.connect_with_data(session_id, endpoint, token);
+ },
+ None => {
+ self.session_id = Some(session_id);
+ },
+ }
+ } else {
+ self.leave();
+ }
+ }
+}
+
+impl Drop for Handler {
+ /// Leaves the current connected voice channel, if connected to one, and
+ /// forgets all configurations relevant to this Handler.
+ fn drop(&mut self) {
+ self.leave();
+ }
+}
diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs
new file mode 100644
index 0000000..c6c7533
--- /dev/null
+++ b/src/ext/voice/manager.rs
@@ -0,0 +1,141 @@
+use std::collections::HashMap;
+use std::sync::mpsc::Sender as MpscSender;
+use super::{Handler, Target};
+use ::client::ConnectionStatus;
+use ::model::{ChannelId, GuildId};
+
+/// A manager is a struct responsible for managing [`Handler`]s which belong to
+/// a single [WebSocket connection]. This is a fairly complex key-value store,
+/// with a bit of extra utility for easily joining a "target".
+///
+/// The "target" used by the Manager is determined based on the `guild_id` and
+/// `channel_id` provided. If a `guild_id` is _not_ provided to methods that
+/// optionally require it, then the target is a group or 1-on-1 call with a
+/// user. The `channel_id` is then used as the target.
+///
+/// If a `guild_id` is provided, then the target is the guild, as a user
+/// can not be connected to two channels within one guild simultaneously.
+///
+/// [`Group`]: ../../model/struct.Group.html
+/// [`Handler`]: struct.Handler.html
+/// [guild's channel]: ../../model/enum.ChannelType.html#variant.Voice
+/// [WebSocket connection]: ../../client/struct.Connection.html
+pub struct Manager {
+ handlers: HashMap<Target, Handler>,
+ user_id: u64,
+ ws: MpscSender<ConnectionStatus>,
+}
+
+impl Manager {
+ #[doc(hidden)]
+ pub fn new(ws: MpscSender<ConnectionStatus>, user_id: u64) -> Manager {
+ Manager {
+ handlers: HashMap::new(),
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Retrieves a mutable handler for the given target, if one exists.
+ pub fn get<T: Into<Target>>(&mut self, target_id: T)
+ -> Option<&mut Handler> {
+ self.handlers.get_mut(&target_id.into())
+ }
+
+ /// Connects to a target by retrieving its relevant [`Handler`] and
+ /// connecting, or creating the handler if required.
+ ///
+ /// This can also switch to the given channel, if a handler already exists
+ /// for the target and the current connected channel is not equal to the
+ /// given channel.
+ ///
+ /// In the case of channel targets, the same channel is used to connect to.
+ ///
+ /// In the case of guilds, the provided channel is used to connect to. The
+ /// channel _must_ be in the provided guild. This is _not_ checked by the
+ /// library, and will result in an error. If there is already a connected
+ /// handler for the guild, _and_ the provided channel is different from the
+ /// channel that the connection is already connected to, then the handler
+ /// will switch the connection to the provided channel.
+ ///
+ /// If you _only_ need to retrieve the handler for a target, then use
+ /// [`get`].
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ #[allow(map_entry)]
+ pub fn join(&mut self, guild_id: Option<GuildId>, channel_id: ChannelId)
+ -> &mut Handler {
+ if let Some(guild_id) = guild_id {
+ let target = Target::Guild(guild_id);
+
+ {
+ let mut found = false;
+
+ if let Some(handler) = self.handlers.get_mut(&target) {
+ handler.switch_to(channel_id);
+
+ found = true;
+ }
+
+ if found {
+ // Actually safe, as the key has already been found above.
+ return self.handlers.get_mut(&target).unwrap();
+ }
+ }
+
+ let mut handler = Handler::new(target, self.ws.clone(), self.user_id);
+ handler.join(channel_id);
+
+ self.handlers.insert(target, handler);
+
+ // Actually safe, as the key would have been inserted above.
+ self.handlers.get_mut(&target).unwrap()
+ } else {
+ let target = Target::Channel(channel_id);
+
+ if !self.handlers.contains_key(&target) {
+ let mut handler = Handler::new(target, self.ws.clone(), self.user_id);
+ handler.join(channel_id);
+
+ self.handlers.insert(target, handler);
+ }
+
+ // Actually safe, as the key would have been inserted above.
+ self.handlers.get_mut(&target).unwrap()
+ }
+ }
+
+ /// Retrieves the [handler][`Handler`] for the given target and leaves the
+ /// associated voice channel, if connected.
+ ///
+ /// This will _not_ drop the handler, and will preserve it and its settings.
+ ///
+ /// This is a wrapper around [getting][`get`] a handler and calling
+ /// [`leave`] on it.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ /// [`leave`]: struct.Handler.html#method.leave
+ pub fn leave<T: Into<Target>>(&mut self, target_id: T) {
+ let target = target_id.into();
+
+ if let Some(handler) = self.handlers.get_mut(&target) {
+ handler.leave();
+ }
+ }
+
+ /// Retrieves the [`Handler`] for the given target and leaves the associated
+ /// voice channel, if connected.
+ ///
+ /// The handler is then dropped, removing settings for the target.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ pub fn remove<T: Into<Target>>(&mut self, target_id: T) {
+ let target = target_id.into();
+
+ self.leave(target);
+
+ self.handlers.remove(&target);
+ }
+}
diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs
index e69de29..107a473 100644
--- a/src/ext/voice/mod.rs
+++ b/src/ext/voice/mod.rs
@@ -0,0 +1,55 @@
+mod audio;
+mod connection;
+mod connection_info;
+mod error;
+mod manager;
+mod handler;
+mod threading;
+
+pub use self::error::VoiceError;
+pub use self::handler::Handler;
+pub use self::manager::Manager;
+
+use self::audio::{AudioReceiver, AudioSource};
+use self::connection_info::ConnectionInfo;
+use ::model::{ChannelId, GuildId};
+
+const CRYPTO_MODE: &'static str = "xsalsa20_poly1305";
+
+#[doc(hidden)]
+pub enum Status {
+ Connect(ConnectionInfo),
+ Disconnect,
+ SetReceiver(Option<Box<AudioReceiver>>),
+ SetSender(Option<Box<AudioSource>>),
+}
+
+/// Denotes the target to manage a connection for.
+///
+/// For most cases, targets should entirely be guilds, except for the one case
+/// where a user account can be in a 1-to-1 or group call.
+///
+/// It _may_ be possible in the future for bots to be in multiple groups. If
+/// this turns out to be the case, supporting that now rather than messily in
+/// the future is the best option. Thus, these types of calls are specified by
+/// the group's channel Id.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub enum Target {
+ /// Used for managing a voice handler for a 1-on-1 (user-to-user) or group
+ /// call.
+ Channel(ChannelId),
+ /// Used for managing a voice handler for a guild.
+ Guild(GuildId),
+}
+
+impl From<ChannelId> for Target {
+ fn from(channel_id: ChannelId) -> Target {
+ Target::Channel(channel_id)
+ }
+}
+
+impl From<GuildId> for Target {
+ fn from(guild_id: GuildId) -> Target {
+ Target::Guild(guild_id)
+ }
+}
diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs
new file mode 100644
index 0000000..4110c5f
--- /dev/null
+++ b/src/ext/voice/threading.rs
@@ -0,0 +1,93 @@
+use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError};
+use std::thread::Builder as ThreadBuilder;
+use super::connection::Connection;
+use super::{Status, Target};
+use ::internal::Timer;
+
+pub fn start(target_id: Target, rx: MpscReceiver<Status>) {
+ let name = match target_id {
+ Target::Channel(channel_id) => format!("Serenity Voice (C{})", channel_id),
+ Target::Guild(guild_id) => format!("Serenity Voice (G{})", guild_id),
+ };
+
+ ThreadBuilder::new()
+ .name(name)
+ .spawn(move || runner(rx))
+ .expect("Err starting voice");
+}
+
+fn runner(rx: MpscReceiver<Status>) {
+ let mut sender = None;
+ let mut receiver = None;
+ let mut connection = None;
+ let mut timer = Timer::new(20);
+
+ 'runner: loop {
+ loop {
+ match rx.try_recv() {
+ Ok(Status::Connect(info)) => {
+ connection = match Connection::new(info) {
+ Ok(connection) => Some(connection),
+ Err(why) => {
+ error!("Err connecting via voice: {:?}", why);
+
+ None
+ },
+ };
+ },
+ Ok(Status::Disconnect) => {
+ connection = None;
+ },
+ Ok(Status::SetReceiver(r)) => {
+ receiver = r;
+ },
+ Ok(Status::SetSender(s)) => {
+ sender = s;
+ },
+ Err(TryRecvError::Empty) => {
+ // If we receieved nothing, then we can perform an update.
+ break;
+ },
+ Err(TryRecvError::Disconnected) => {
+ break 'runner;
+ },
+ }
+ }
+
+ // Overall here, check if there's an error.
+ //
+ // If there is a connection, try to send an update. This should not
+ // error. If there is though for some spurious reason, then set `error`
+ // to `true`.
+ //
+ // Otherwise, wait out the timer and do _not_ error and wait to receive
+ // another event.
+ let error = match connection.as_mut() {
+ Some(connection) => {
+ let update = connection.update(&mut sender,
+ &mut receiver,
+ &mut timer);
+
+ match update {
+ Ok(()) => false,
+ Err(why) => {
+ error!("Err updating voice connection: {:?}", why);
+
+ true
+ },
+ }
+ },
+ None => {
+ timer.await();
+
+ false
+ },
+ };
+
+ // If there was an error, then just reset the connection and try to get
+ // another.
+ if error {
+ connection = None;
+ }
+ }
+}
diff --git a/src/internal/mod.rs b/src/internal/mod.rs
index b9d7209..9dd4676 100644
--- a/src/internal/mod.rs
+++ b/src/internal/mod.rs
@@ -1 +1,8 @@
pub mod prelude;
+pub mod ws_impl;
+
+#[cfg(feature = "voice")]
+mod timer;
+
+#[cfg(feature = "voice")]
+pub use self::timer::Timer;
diff --git a/src/internal/timer.rs b/src/internal/timer.rs
new file mode 100644
index 0000000..cc846b3
--- /dev/null
+++ b/src/internal/timer.rs
@@ -0,0 +1,45 @@
+use std::thread;
+use std::time::Duration as StdDuration;
+use time::{self, Duration, Timespec};
+
+pub struct Timer {
+ due: Timespec,
+ duration: Duration,
+}
+
+impl Timer {
+ pub fn new(duration_in_ms: u64) -> Timer {
+ let duration = Duration::milliseconds(duration_in_ms as i64);
+
+ Timer {
+ due: time::get_time() + duration,
+ duration: duration,
+ }
+ }
+
+ pub fn await(&mut self) {
+ let diff = self.due - time::get_time();
+
+ if diff > time::Duration::zero() {
+ let amount = diff.num_milliseconds() as u64;
+
+ thread::sleep(StdDuration::from_millis(amount));
+ }
+
+ self.due = self.due + self.duration;
+ }
+
+ pub fn check(&mut self) -> bool {
+ if time::get_time() >= self.due {
+ self.due = self.due + self.duration;
+
+ true
+ } else {
+ false
+ }
+ }
+
+ pub fn reset(&mut self) {
+ self.due = time::get_time() + self.duration;
+ }
+}
diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs
new file mode 100644
index 0000000..ab91dae
--- /dev/null
+++ b/src/internal/ws_impl.rs
@@ -0,0 +1,60 @@
+use flate2::read::ZlibDecoder;
+use serde_json;
+use websocket::client::{Receiver, Sender};
+use websocket::message::{Message as WsMessage, Type as WsType};
+use websocket::stream::WebSocketStream;
+use websocket::ws::receiver::Receiver as WsReceiver;
+use websocket::ws::sender::Sender as WsSender;
+use ::client::ConnectionError;
+use ::internal::prelude::*;
+
+pub trait ReceiverExt {
+ fn recv_json<F, T>(&mut self, decode: F) -> Result<T>
+ where F: FnOnce(Value) -> Result<T>;
+}
+
+pub trait SenderExt {
+ fn send_json(&mut self, value: &Value) -> Result<()>;
+}
+
+impl ReceiverExt for Receiver<WebSocketStream> {
+ fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> {
+ let message: WsMessage = try!(self.recv_message());
+
+ if message.opcode == WsType::Close {
+ let representation = String::from_utf8_lossy(&message.payload)
+ .into_owned();
+
+ Err(Error::Connection(ConnectionError::Closed(message.cd_status_code,
+ representation)))
+ } else if message.opcode == WsType::Binary || message.opcode == WsType::Text {
+ let json: Value = if message.opcode == WsType::Binary {
+ try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..])))
+ } else {
+ try!(serde_json::from_reader(&message.payload[..]))
+ };
+
+ decode(json).map_err(|err| {
+ warn!("Error decoding: {}",
+ String::from_utf8_lossy(&message.payload));
+
+ err
+ })
+ } else {
+ let representation = String::from_utf8_lossy(&message.payload)
+ .into_owned();
+
+ Err(Error::Connection(ConnectionError::Closed(None,
+ representation)))
+ }
+ }
+}
+
+impl SenderExt for Sender<WebSocketStream> {
+ fn send_json(&mut self, value: &Value) -> Result<()> {
+ serde_json::to_string(value)
+ .map(WsMessage::text)
+ .map_err(Error::from)
+ .and_then(|m| self.send_message(&m).map_err(Error::from))
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 8bc8533..7a25e27 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -93,6 +93,11 @@ extern crate serde_json;
extern crate time;
extern crate websocket;
+#[cfg(feature="voice")]
+extern crate opus;
+#[cfg(feature="voice")]
+extern crate sodiumoxide;
+
#[macro_use]
pub mod utils;
diff --git a/src/model/gateway.rs b/src/model/gateway.rs
index ee62dd5..aa3d995 100644
--- a/src/model/gateway.rs
+++ b/src/model/gateway.rs
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashMap};
use super::utils::*;
use super::*;
-use ::constants::{OpCode, VoiceOpCode};
+use ::constants::OpCode;
use ::internal::prelude::*;
use ::utils::decode_array;
@@ -355,65 +355,6 @@ impl GatewayEvent {
}
}
-#[derive(Debug, Clone)]
-pub enum VoiceEvent {
- Handshake {
- heartbeat_interval: u64,
- port: u16,
- ssrc: u32,
- modes: Vec<String>,
- },
- Ready {
- mode: String,
- secret_key: Vec<u8>,
- },
- SpeakingUpdate {
- user_id: UserId,
- ssrc: u32,
- speaking: bool,
- },
- KeepAlive,
- Unknown(u64, Value)
-}
-
-impl VoiceEvent {
- pub fn decode(value: Value) -> Result<VoiceEvent> {
- let mut value = try!(into_map(value));
-
- let op = req!(try!(remove(&mut value, "op")).as_u64());
- let op = try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode)));
-
- if op == VoiceOpCode::Heartbeat {
- return Ok(VoiceEvent::KeepAlive)
- }
-
- let mut value = try!(remove(&mut value, "d").and_then(into_map));
- if op == VoiceOpCode::Hello {
- missing!(value, VoiceEvent::Handshake {
- heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()),
- modes: try!(decode_array(try!(remove(&mut value, "modes")), into_string)),
- port: req!(try!(remove(&mut value, "port")).as_u64()) as u16,
- ssrc: req!(try!(remove(&mut value, "ssrc")).as_u64()) as u32,
- })
- } else if op == VoiceOpCode::SessionDescription {
- missing!(value, VoiceEvent::Ready {
- mode: try!(remove(&mut value, "mode").and_then(into_string)),
- secret_key: try!(decode_array(try!(remove(&mut value, "secret_key")),
- |v| Ok(req!(v.as_u64()) as u8)
- )),
- })
- } else if op == VoiceOpCode::Speaking {
- missing!(value, VoiceEvent::SpeakingUpdate {
- user_id: try!(remove(&mut value, "user_id").and_then(UserId::decode)),
- ssrc: req!(try!(remove(&mut value, "ssrc")).as_u64()) as u32,
- speaking: req!(try!(remove(&mut value, "speaking")).as_bool()),
- })
- } else {
- Ok(VoiceEvent::Unknown(op as u64, Value::Object(value)))
- }
- }
-}
-
/// Event received over a websocket connection
#[derive(Clone, Debug)]
pub enum Event {
diff --git a/src/model/utils.rs b/src/model/utils.rs
index f0108e9..f85a30f 100644
--- a/src/model/utils.rs
+++ b/src/model/utils.rs
@@ -35,7 +35,11 @@ macro_rules! missing {
#[macro_escape]
macro_rules! req {
($opt:expr) => {
- try!($opt.ok_or(Error::Decode(concat!("Type mismatch in model:", line!(), ": ", stringify!($opt)), Value::Null)))
+ try!($opt.ok_or(Error::Decode(concat!("Type mismatch in model:",
+ line!(),
+ ": ",
+ stringify!($opt)),
+ Value::Null)))
}
}
diff --git a/src/model/voice.rs b/src/model/voice.rs
index e69de29..a888029 100644
--- a/src/model/voice.rs
+++ b/src/model/voice.rs
@@ -0,0 +1,97 @@
+use super::utils::{into_map, into_string, remove, warn_field};
+use super::UserId;
+use ::constants::VoiceOpCode;
+use ::internal::prelude::*;
+use ::utils::decode_array;
+
+#[derive(Clone, Debug)]
+pub struct VoiceHandshake {
+ pub heartbeat_interval: u64,
+ pub ip: Option<String>,
+ pub modes: Vec<String>,
+ pub port: u16,
+ pub ssrc: u32,
+}
+
+#[derive(Clone, Debug)]
+pub struct VoiceHeartbeat {
+ pub heartbeat_interval: u64,
+}
+
+#[derive(Clone, Debug)]
+pub struct VoiceHello {
+ pub heartbeat_interval: u64,
+ pub modes: Vec<String>,
+ pub port: u16,
+ pub ssrc: u32,
+}
+
+#[derive(Clone, Debug)]
+pub struct VoiceReady {
+ pub mode: String,
+ pub secret_key: Vec<u8>,
+}
+
+#[derive(Clone, Debug)]
+pub struct VoiceSpeaking {
+ pub speaking: bool,
+ pub ssrc: u32,
+ pub user_id: UserId,
+}
+
+#[derive(Clone, Debug)]
+pub enum VoiceEvent {
+ Handshake(VoiceHandshake),
+ Heartbeat(VoiceHeartbeat),
+ Hello(VoiceHello),
+ Ready(VoiceReady),
+ Speaking(VoiceSpeaking),
+ KeepAlive,
+ Unknown(VoiceOpCode, Value)
+}
+
+impl VoiceEvent {
+ pub fn decode(value: Value) -> Result<VoiceEvent> {
+ let mut value = try!(into_map(value));
+ let op = req!(try!(remove(&mut value, "op")).as_u64());
+ let mut map = try!(remove(&mut value, "d").and_then(into_map));
+
+ match try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode))) {
+ VoiceOpCode::Heartbeat => {
+ missing!(map, VoiceEvent::Heartbeat(VoiceHeartbeat {
+ heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()),
+ }))
+ },
+ VoiceOpCode::Hello => {
+ missing!(map, VoiceEvent::Hello(VoiceHello {
+ heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval"))
+ .as_u64()),
+ modes: try!(decode_array(try!(remove(&mut map, "modes")),
+ into_string)),
+ port: req!(try!(remove(&mut map, "port"))
+ .as_u64()) as u16,
+ ssrc: req!(try!(remove(&mut map, "ssrc"))
+ .as_u64()) as u32,
+ }))
+ },
+ VoiceOpCode::KeepAlive => Ok(VoiceEvent::KeepAlive),
+ VoiceOpCode::SessionDescription => {
+ missing!(map, VoiceEvent::Ready(VoiceReady {
+ mode: try!(remove(&mut map, "mode")
+ .and_then(into_string)),
+ secret_key: try!(decode_array(try!(remove(&mut map, "secret_key")),
+ |v| Ok(req!(v.as_u64()) as u8)
+ )),
+ }))
+ },
+ VoiceOpCode::Speaking => {
+ missing!(map, VoiceEvent::Speaking(VoiceSpeaking {
+ speaking: req!(try!(remove(&mut map, "speaking")).as_bool()),
+ ssrc: req!(try!(remove(&mut map, "ssrc")).as_u64()) as u32,
+ user_id: try!(remove(&mut map, "user_id").and_then(UserId::decode)),
+ }))
+ }
+ other => Ok(VoiceEvent::Unknown(other, Value::Object(map))),
+ }
+ }
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index dbd1455..28d18f1 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,4 +1,3 @@
-
//! A set of utilities to help with common use cases that are not required to
//! fully use the library.
@@ -109,6 +108,56 @@ macro_rules! request {
}};
}
+// Enable/disable check for voice
+macro_rules! feature_voice {
+ ($enabled:block) => {
+ {
+ feature_voice_enabled! {{
+ $enabled
+ }}
+ }
+ };
+ ($enabled:block $disabled:block) => {
+ {
+ feature_voice_enabled! {{
+ $enabled
+ }}
+
+ feature_voice_disabled! {{
+ $disabled
+ }}
+ }
+ };
+}
+
+#[cfg(feature="voice")]
+macro_rules! feature_voice_enabled {
+ ($enabled:block) => {
+ {
+ $enabled
+ }
+ }
+}
+
+#[cfg(not(feature="voice"))]
+macro_rules! feature_voice_enabled {
+ ($enabled:block) => {}
+}
+
+#[cfg(feature="voice")]
+macro_rules! feature_voice_disabled {
+ ($disabled:block) => {}
+}
+
+#[cfg(not(feature="voice"))]
+macro_rules! feature_voice_disabled {
+ ($disabled:block) => {
+ {
+ $disabled
+ }
+ }
+}
+
/// Retrieves the "code" part of an [invite][`RichInvite`] out of a URL.
///
/// # Examples