diff options
| author | Austin Hellyer <[email protected]> | 2016-11-13 19:28:13 -0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-11-14 18:32:10 -0800 |
| commit | 7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch) | |
| tree | ca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src | |
| parent | Add internal module (diff) | |
| download | serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip | |
Add voice connection support
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/connection.rs | 198 | ||||
| -rw-r--r-- | src/client/context.rs | 2 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 2 | ||||
| -rw-r--r-- | src/client/http/mod.rs | 2 | ||||
| -rw-r--r-- | src/client/mod.rs | 10 | ||||
| -rw-r--r-- | src/constants.rs | 10 | ||||
| -rw-r--r-- | src/error.rs | 7 | ||||
| -rw-r--r-- | src/ext/mod.rs | 1 | ||||
| -rw-r--r-- | src/ext/voice/audio.rs | 15 | ||||
| -rw-r--r-- | src/ext/voice/connection.rs | 310 | ||||
| -rw-r--r-- | src/ext/voice/connection_info.rs | 6 | ||||
| -rw-r--r-- | src/ext/voice/error.rs | 10 | ||||
| -rw-r--r-- | src/ext/voice/handler.rs | 331 | ||||
| -rw-r--r-- | src/ext/voice/manager.rs | 141 | ||||
| -rw-r--r-- | src/ext/voice/mod.rs | 55 | ||||
| -rw-r--r-- | src/ext/voice/threading.rs | 93 | ||||
| -rw-r--r-- | src/internal/mod.rs | 7 | ||||
| -rw-r--r-- | src/internal/timer.rs | 45 | ||||
| -rw-r--r-- | src/internal/ws_impl.rs | 60 | ||||
| -rw-r--r-- | src/lib.rs | 5 | ||||
| -rw-r--r-- | src/model/gateway.rs | 61 | ||||
| -rw-r--r-- | src/model/utils.rs | 6 | ||||
| -rw-r--r-- | src/model/voice.rs | 97 | ||||
| -rw-r--r-- | src/utils/mod.rs | 51 |
24 files changed, 1366 insertions, 159 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs index 93c967e..d1e37e5 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,4 +1,3 @@ -use flate2::read::ZlibDecoder; use serde_json::builder::ObjectBuilder; use serde_json; use std::fmt::{self, Display}; @@ -18,15 +17,46 @@ use super::Client; use time::{self, Duration}; use websocket::client::request::Url as RequestUrl; use websocket::client::{Client as WsClient, Sender, Receiver}; -use websocket::message::{Message as WsMessage, Type as WsType}; +use websocket::message::Message as WsMessage; use websocket::stream::WebSocketStream; -use websocket::ws::receiver::Receiver as WsReceiver; use websocket::ws::sender::Sender as WsSender; use ::constants::{self, OpCode}; -use ::model::*; use ::internal::prelude::*; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::model::{ + ChannelId, + Event, + Game, + GatewayEvent, + GuildId, + OnlineStatus, + ReadyEvent, +}; + +#[cfg(feature="voice")] +use ::ext::voice::Manager as VoiceManager; + +#[cfg(feature="voice")] +macro_rules! connection { + ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => { + Connection { + $($name1: $val1,)* + $($name2: $val2,)* + } + } +} -enum Status { +#[cfg(not(feature="voice"))] +macro_rules! connection { + ($($name1:ident: $val1:expr),*; $($name2:ident: $val2:expr,)*) => { + Connection { + $($name1: $val1,)* + } + } +} + +#[doc(hidden)] +pub enum Status { SendMessage(Value), Sequence(u64), ChangeInterval(u64), @@ -131,6 +161,8 @@ pub struct Connection { shard_info: Option<[u8; 2]>, token: String, ws_url: String, + #[cfg(feature = "voice")] + pub manager: VoiceManager, } impl Connection { @@ -181,8 +213,14 @@ impl Connection { }; let (tx, rx) = mpsc::channel(); + let thread_name = match shard_info { + Some(info) => format!("serenity keepalive [shard {}/{}]", + info[0], + info[1] - 1), + None => "serenity keepalive [unsharded]".to_owned(), + }; try!(ThreadBuilder::new() - .name("serenity keepalive".into()) + .name(thread_name) .spawn(move || keepalive(heartbeat_interval, sender, rx))); // Parse READY @@ -192,16 +230,30 @@ impl Connection { &mut receiver, identification)); - Ok((Connection { - keepalive_channel: tx, - last_sequence: sequence, - login_type: login_type, - receiver: receiver, - token: token.to_owned(), - session_id: Some(ready.ready.session_id.clone()), - shard_info: shard_info, - ws_url: base_url.to_owned(), - }, ready)) + Ok((feature_voice! {{ + Connection { + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + receiver: receiver, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + manager: VoiceManager::new(tx, ready.ready.user.id.0), + } + } { + Connection { + keepalive_channel: tx.clone(), + last_sequence: sequence, + login_type: login_type, + receiver: receiver, + token: token.to_owned(), + session_id: Some(ready.ready.session_id.clone()), + shard_info: shard_info, + ws_url: base_url.to_owned(), + } + }}, ready)) } pub fn shard_info(&self) -> Option<[u8; 2]> { @@ -229,8 +281,8 @@ impl Connection { match game { Some(game) => { - object.insert_object("game", move |o| - o.insert("name", game.name)) + object.insert_object("game", move |o| o + .insert("name", game.name)) }, None => object.insert("game", Value::Null), } @@ -243,15 +295,10 @@ impl Connection { pub fn receive(&mut self) -> Result<Event> { match self.receiver.recv_json(GatewayEvent::decode) { Ok(GatewayEvent::Dispatch(sequence, event)) => { - self.last_sequence = sequence; - - let _ = self.keepalive_channel.send(Status::Sequence(sequence)); - - if let Event::Resumed(ref ev) = event { - let _ = self.keepalive_channel.send(Status::ChangeInterval(ev.heartbeat_interval)); - } + let status = Status::Sequence(sequence); + let _ = self.keepalive_channel.send(status); - Ok(event) + Ok(self.handle_dispatch(event)) }, Ok(GatewayEvent::Heartbeat(sequence)) => { let map = ObjectBuilder::new() @@ -273,8 +320,9 @@ impl Connection { Ok(GatewayEvent::InvalidateSession) => { self.session_id = None; - let status = Status::SendMessage(identify(&self.token, - self.shard_info)); + let identification = identify(&self.token, self.shard_info); + + let status = Status::SendMessage(identification); let _ = self.keepalive_channel.send(status); @@ -324,6 +372,34 @@ impl Connection { } } + fn handle_dispatch(&mut self, event: Event) -> Event { + if let Event::Resumed(ref ev) = event { + let status = Status::ChangeInterval(ev.heartbeat_interval); + + let _ = self.keepalive_channel.send(status); + } + + feature_voice_enabled! {{ + if let Event::VoiceStateUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_state(&update.voice_state); + } + } + } + + if let Event::VoiceServerUpdate(ref update) = event { + if let Some(guild_id) = update.guild_id { + if let Some(handler) = self.manager.get(guild_id) { + handler.update_server(&update.endpoint, &update.token); + } + } + } + }} + + event + } + fn reconnect(&mut self) -> Result<Event> { debug!("Reconnecting"); @@ -480,57 +556,7 @@ impl Drop for Connection { } } -trait ReceiverExt { - fn recv_json<F, T>(&mut self, decode: F) -> Result<T> - where F: FnOnce(Value) -> Result<T>; -} - -trait SenderExt { - fn send_json(&mut self, value: &Value) -> Result<()>; -} - -impl ReceiverExt for Receiver<WebSocketStream> { - fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> { - let message: WsMessage = try!(self.recv_message()); - - if message.opcode == WsType::Close { - let representation = String::from_utf8_lossy(&message.payload) - .into_owned(); - - Err(Error::Connection(ConnectionError::Closed(message.cd_status_code, - representation))) - } else if message.opcode == WsType::Binary || message.opcode == WsType::Text { - let json: Value = if message.opcode == WsType::Binary { - try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..]))) - } else { - try!(serde_json::from_reader(&message.payload[..])) - }; - - decode(json).map_err(|err| { - warn!("Error decoding: {}", - String::from_utf8_lossy(&message.payload)); - - err - }) - } else { - let representation = String::from_utf8_lossy(&message.payload) - .into_owned(); - - Err(Error::Connection(ConnectionError::Closed(None, - representation))) - } - } -} - -impl SenderExt for Sender<WebSocketStream> { - fn send_json(&mut self, value: &Value) -> Result<()> { - serde_json::to_string(value) - .map(WsMessage::text) - .map_err(Error::from) - .and_then(|m| self.send_message(&m).map_err(Error::from)) - } -} - +#[inline] fn parse_ready(event: GatewayEvent, tx: &MpscSender<Status>, receiver: &mut Receiver<WebSocketStream>, @@ -582,7 +608,7 @@ fn identify(token: &str, shard_info: Option<[u8; 2]>) -> serde_json::Value { if let Some(shard_info) = shard_info { object = object - .insert_array("shard", |array| array + .insert_array("shard", |a| a .push(shard_info[0]) .push(shard_info[1])); } @@ -602,7 +628,7 @@ fn identify_compression(object: ObjectBuilder) -> ObjectBuilder { object.insert("compression", false) } -fn build_gateway_url(base: &str) -> Result<::websocket::client::request::Url> { +fn build_gateway_url(base: &str) -> Result<RequestUrl> { RequestUrl::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)) .map_err(|_| Error::Client(ClientError::Gateway)) } @@ -627,9 +653,8 @@ fn keepalive(interval: u64, sender = new_sender; }, Ok(Status::SendMessage(val)) => { - match sender.send_json(&val) { - Ok(()) => {}, - Err(e) => warn!("Err sending message: {:?}", e), + if let Err(why) = sender.send_json(&val) { + warn!("Err sending message: {:?}", why); } }, Ok(Status::Sequence(seq)) => { @@ -648,9 +673,8 @@ fn keepalive(interval: u64, .insert("op", OpCode::Heartbeat.num()) .build(); - match sender.send_json(&map) { - Ok(()) => {}, - Err(e) => warn!("Error sending gateway keeaplive: {:?}", e) + if let Err(why) = sender.send_json(&map) { + warn!("Err sending keepalive: {:?}", why); } } } diff --git a/src/client/context.rs b/src/client/context.rs index d7465d1..1bee390 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -14,8 +14,8 @@ use ::utils::builder::{ EditRole, GetMessages }; -use ::model::*; use ::internal::prelude::*; +use ::model::*; use ::utils; #[derive(Clone)] diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index f0d9c91..7efee75 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -4,8 +4,8 @@ use super::event_store::EventStore; use super::login_type::LoginType; use super::{STATE, Connection, Context}; use ::ext::framework::Framework; -use ::model::{ChannelId, Event, Message}; use ::internal::prelude::*; +use ::model::{ChannelId, Event, Message}; macro_rules! handler { ($field:ident, $event_store:ident) => { diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index b77e24f..9fba750 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -36,8 +36,8 @@ use std::default::Default; use std::io::{ErrorKind as IoErrorKind, Read}; use std::sync::{Arc, Mutex}; use ::constants; -use ::model::*; use ::internal::prelude::*; +use ::model::*; use ::utils::decode_array; lazy_static! { diff --git a/src/client/mod.rs b/src/client/mod.rs index 580ae74..661d477 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -41,7 +41,11 @@ mod dispatch; mod event_store; mod login_type; -pub use self::connection::{Connection, ConnectionError}; +pub use self::connection::{ + Connection, + ConnectionError, + Status as ConnectionStatus +}; pub use self::context::Context; pub use self::login_type::LoginType; @@ -53,10 +57,10 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use ::model::*; -use ::internal::prelude::*; use ::ext::framework::Framework; use ::ext::state::State; +use ::internal::prelude::*; +use ::model::*; lazy_static! { /// The STATE is a mutable lazily-initialized static binding. It can be diff --git a/src/constants.rs b/src/constants.rs index 69d49ae..68f6524 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -8,7 +8,7 @@ pub const MESSAGE_CODE_LIMIT: u16 = 2000; /// The [UserAgent] sent along with every request. /// /// [UserAgent]: ../hyper/header/struct.UserAgent.html -pub const USER_AGENT: &'static str = concat!("DiscordBot (https://github.com/zeyla/serenity, ", env!("CARGO_PKG_VERSION"), ")"); +pub const USER_AGENT: &'static str = concat!("DiscordBot (https://github.com/zeyla/serenity.rs, ", env!("CARGO_PKG_VERSION"), ")"); #[allow(dead_code)] #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -136,9 +136,10 @@ map_nums! { OpCode; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum VoiceOpCode { Identify, - SelectProtocol, - Hello, Heartbeat, + Hello, + KeepAlive, + SelectProtocol, SessionDescription, Speaking, } @@ -147,7 +148,8 @@ map_nums! { VoiceOpCode; Identify 0, SelectProtocol 1, Hello 2, - Heartbeat 3, + KeepAlive 3, SessionDescription 4, Speaking 5, + Heartbeat 8, } diff --git a/src/error.rs b/src/error.rs index 2e3ea07..97524b0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,8 @@ use serde_json::Error as JsonError; use serde_json::Value; use websocket::result::WebSocketError; use ::client::{ClientError, ConnectionError}; +#[cfg(feature="voice")] +use ::ext::voice::VoiceError; /// The common result type between most library functions. pub type Result<T> = ::std::result::Result<T, Error>; @@ -40,6 +42,9 @@ pub enum Error { Other(&'static str), /// An error from the `url` crate. Url(String), + /// Indicating an error within the voice module. + #[cfg(feature="voice")] + Voice(VoiceError), /// An error from the `rust-websocket` crate. WebSocket(WebSocketError), } @@ -91,6 +96,8 @@ impl StdError for Error { Error::Json(ref inner) => inner.description(), Error::Url(ref inner) => inner, Error::WebSocket(ref inner) => inner.description(), + #[cfg(feature = "voice")] + Error::Voice(_) => "Voice error", } } diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 92fda62..bb87911 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -10,4 +10,5 @@ pub mod framework; pub mod state; +#[cfg(feature="voice")] pub mod voice; diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs new file mode 100644 index 0000000..e49bd2a --- /dev/null +++ b/src/ext/voice/audio.rs @@ -0,0 +1,15 @@ +use ::model::UserId; + +/// A readable audio source. +pub trait AudioSource: Send { + fn is_stereo(&mut self) -> bool; + + fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>; +} + +/// A receiver for incoming audio. +pub trait AudioReceiver: Send { + fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool); + + fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]); +} diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs new file mode 100644 index 0000000..7dfc034 --- /dev/null +++ b/src/ext/voice/connection.rs @@ -0,0 +1,310 @@ +use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use serde_json::builder::ObjectBuilder; +use sodiumoxide::crypto::secretbox::Key; +use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket}; +use std::sync::mpsc::{self, Receiver as MpscReceiver}; +use std::thread::{self, Builder as ThreadBuilder}; +use super::audio::{AudioReceiver, AudioSource}; +use super::connection_info::ConnectionInfo; +use super::{CRYPTO_MODE, VoiceError}; +use websocket::client::request::Url as WebsocketUrl; +use websocket::client::{ + Client as WsClient, + Receiver as WsReceiver, + Sender as WsSender +}; +use websocket::stream::WebSocketStream; +use ::client::STATE; +use ::constants::VoiceOpCode; +use ::internal::prelude::*; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::internal::Timer; +use ::model::VoiceEvent; + +pub enum ReceiverStatus { + Udp(Vec<u8>), + Websocket(VoiceEvent), +} + +#[allow(dead_code)] +pub struct Connection { + audio_timer: Timer, + destination: SocketAddr, + keepalive_timer: Timer, + key: Key, + receive_channel: MpscReceiver<ReceiverStatus>, + sender: WsSender<WebSocketStream>, + sequence: u64, + speaking: bool, + ssrc: u32, + timestamp: u32, + udp: UdpSocket, +} + +impl Connection { + pub fn new(mut info: ConnectionInfo) -> Result<Connection> { + let url = try!(generate_url(&mut info.endpoint)); + + let response = try!(try!(WsClient::connect(url)).send()); + try!(response.validate()); + let (mut sender, mut receiver) = response.begin().split(); + + try!(sender.send_json(&identify(&info))); + + let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) { + VoiceEvent::Handshake(handshake) => handshake, + _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)), + }; + + if !has_valid_mode(handshake.modes) { + return Err(Error::Voice(VoiceError::VoiceModeUnavailable)); + } + + let destination = { + try!(try!((&info.endpoint[..], handshake.port) + .to_socket_addrs()) + .next() + .ok_or(Error::Voice(VoiceError::HostnameResolve))) + }; + let udp = try!(UdpSocket::bind("0.0.0.0:0")); + + { + let mut bytes = [0; 70]; + try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc)); + try!(udp.send_to(&bytes, destination)); + } + + try!(send_acknowledgement(&mut sender, &udp)); + + let key = try!(get_encryption_key(&mut receiver)); + + let receive_channel = try!(start_threads(receiver, &udp)); + + info!("[Voice] Connected to: {}", info.endpoint); + + Ok(Connection { + audio_timer: Timer::new(1000 * 60 * 4), + destination: destination, + key: key, + keepalive_timer: Timer::new(handshake.heartbeat_interval), + receive_channel: receive_channel, + udp: udp, + sender: sender, + sequence: 0, + speaking: false, + ssrc: handshake.ssrc, + timestamp: 0, + }) + } + + #[allow(unused_variables)] + pub fn update(&mut self, + source: &mut Option<Box<AudioSource>>, + receiver: &mut Option<Box<AudioReceiver>>, + audio_timer: &mut Timer) + -> Result<()> { + if let Some(receiver) = receiver.as_mut() { + while let Ok(status) = self.receive_channel.try_recv() { + match status { + ReceiverStatus::Udp(packet) => { + debug!("[Voice] Received UDP packet: {:?}", packet); + }, + ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { + receiver.speaking_update(ev.ssrc, + &ev.user_id, + ev.speaking); + }, + ReceiverStatus::Websocket(other) => { + info!("[Voice] Received other websocket data: {:?}", + other); + }, + } + } + } else { + while let Ok(_) = self.receive_channel.try_recv() {} + } + + // Send the voice websocket keepalive if it's time + if self.keepalive_timer.check() { + try!(self.sender.send_json(&keepalive())); + } + + // Send the UDP keepalive if it's time + if self.audio_timer.check() { + let mut bytes = [0; 4]; + try!((&mut bytes[..]).write_u32::<BigEndian>(self.ssrc)); + try!(self.udp.send_to(&bytes, self.destination)); + } + + try!(self.speaking(true)); + + self.sequence = self.sequence.wrapping_add(1); + self.timestamp = self.timestamp.wrapping_add(960); + + audio_timer.await(); + self.audio_timer.reset(); + Ok(()) + } + + fn speaking(&mut self, speaking: bool) -> Result<()> { + if self.speaking == speaking { + return Ok(()); + } + + self.speaking = speaking; + + let map = ObjectBuilder::new() + .insert("op", VoiceOpCode::Speaking.num()) + .insert_object("d", |object| object + .insert("delay", 0)) + .insert("speaking", speaking) + .build(); + + self.sender.send_json(&map) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + let _ = self.sender.get_mut().shutdown(Shutdown::Both); + + info!("Voice disconnected"); + } +} + +fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { + if endpoint.ends_with(":80") { + let len = endpoint.len(); + + endpoint.truncate(len - 3); + } + + WebsocketUrl::parse(&format!("wss://{}", endpoint)) + .or(Err(Error::Voice(VoiceError::EndpointUrl))) +} + +pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>) + -> Result<Key> { + loop { + match try!(receiver.recv_json(VoiceEvent::decode)) { + VoiceEvent::Ready(ready) => { + if ready.mode != CRYPTO_MODE { + return Err(Error::Voice(VoiceError::VoiceModeInvalid)); + } + + return Key::from_slice(&ready.secret_key) + .ok_or(Error::Voice(VoiceError::KeyGen)); + }, + VoiceEvent::Unknown(op, value) => { + debug!("Unknown message type: {}/{:?}", op.num(), value); + }, + _ => {}, + } + } +} + +fn identify(info: &ConnectionInfo) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::Identify.num()) + .insert_object("d", |o| o + .insert("server_id", info.server_id) + .insert("session_id", &info.session_id) + .insert("token", &info.token) + .insert("user_id", STATE.lock().unwrap().user.id.0)) + .build() +} + +#[inline(always)] +fn has_valid_mode(modes: Vec<String>) -> bool { + modes.iter().any(|s| s == CRYPTO_MODE) +} + +fn keepalive() -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::KeepAlive.num()) + .insert("d", Value::Null) + .build() +} + +#[inline] +fn select_protocol(address: &[u8], port: u16) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::SelectProtocol.num()) + .insert_object("d", |o| o + .insert("protocol", "udp") + .insert_object("data", |o| o + .insert("address", address) + .insert("mode", "xsalsa20_poly1305"))) + .insert("port", port) + .build() +} + +#[inline] +fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket) + -> Result<()> { + let mut bytes = [0; 256]; + + let (len, _) = try!(udp.recv_from(&mut bytes)); + + let zero_index = bytes.iter() + .skip(4) + .position(|&x| x == 0) + .unwrap(); + + let address = &bytes[4..4 + zero_index]; + + let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>()); + + // send the acknowledgement websocket message + let map = select_protocol(address, port); + sender.send_json(&map).map(|_| ()) +} + +#[inline] +fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) + -> Result<MpscReceiver<ReceiverStatus>> { + let thread = thread::current(); + let thread_name = thread.name().unwrap_or("serenity.rs voice"); + + let (tx, rx) = mpsc::channel(); + let tx_clone = tx.clone(); + let udp_clone = try!(udp.try_clone()); + + try!(ThreadBuilder::new() + .name(format!("{} WS", thread_name)) + .spawn(move || { + loop { + let msg = receiver.recv_json(VoiceEvent::decode); + + if let Ok(msg) = msg { + let send = tx.send(ReceiverStatus::Websocket(msg)); + + if let Err(_why) = send { + return; + } + } else { + break; + } + } + })); + + try!(ThreadBuilder::new() + .name(format!("{} UDP", thread_name)) + .spawn(move || { + let mut buffer = [0; 512]; + + loop { + let (len, _) = udp_clone.recv_from(&mut buffer).unwrap(); + let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len] + .iter() + .cloned() + .collect())); + + if let Err(_why) = req { + return; + } + } + })); + + Ok(rx) +} diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs new file mode 100644 index 0000000..9115385 --- /dev/null +++ b/src/ext/voice/connection_info.rs @@ -0,0 +1,6 @@ +pub struct ConnectionInfo { + pub endpoint: String, + pub server_id: u64, + pub session_id: String, + pub token: String, +} diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs new file mode 100644 index 0000000..b1f3251 --- /dev/null +++ b/src/ext/voice/error.rs @@ -0,0 +1,10 @@ +#[derive(Debug)] +pub enum VoiceError { + // An indicator that an endpoint URL was invalid. + EndpointUrl, + ExpectedHandshake, + HostnameResolve, + KeyGen, + VoiceModeInvalid, + VoiceModeUnavailable, +} diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs new file mode 100644 index 0000000..8dc0ab1 --- /dev/null +++ b/src/ext/voice/handler.rs @@ -0,0 +1,331 @@ +use serde_json::builder::ObjectBuilder; +use std::sync::mpsc::{self, Sender}; +use super::connection_info::ConnectionInfo; +use super::{Status as VoiceStatus, Target}; +use ::client::ConnectionStatus; +use ::constants::VoiceOpCode; +use ::model::{ChannelId, GuildId, VoiceState}; +use super::threading; + +/// The handler is responsible for "handling" a single voice connection, acting +/// as a clean API above the inner connection. +/// +/// # Examples +/// +/// Assuming that you already have a [`Manager`], most likely retrieved via a +/// [WebSocket connection], you can join a guild's voice channel and deafen +/// yourself like so: +/// +/// ```rust,ignore +/// // assuming a `manager` has already been bound, hopefully retrieved through +/// // a websocket's connection. +/// use serenity::model::{ChannelId, GuildId}; +/// +/// let guild_id = GuildId(81384788765712384); +/// let channel_id = ChannelId(85482585546833920); +/// +/// let handler = manager.join(Some(guild_id), channel_id); +/// handler.deafen(true); +/// ``` +/// +/// [`Manager`]: struct.Manager.html +/// [WebSocket connection]: ../../client/struct.Connection.html +pub struct Handler { + channel_id: Option<ChannelId>, + endpoint_token: Option<(String, String)>, + guild_id: Option<GuildId>, + self_deaf: bool, + self_mute: bool, + sender: Sender<VoiceStatus>, + session_id: Option<String>, + user_id: u64, + ws: Sender<ConnectionStatus>, +} + +impl Handler { + /// Creates a new Handler. + /// + /// **Note**: You should never call this yourself, and should instead use + /// [`Manager::join`]. + /// + /// Like, really. Really do not use this. Please. + /// + /// [`Manager::join`]: struct.Manager.html#method.join + #[doc(hidden)] + pub fn new(target: Target, ws: Sender<ConnectionStatus>, user_id: u64) + -> Self { + let (tx, rx) = mpsc::channel(); + + let (channel_id, guild_id) = match target { + Target::Channel(channel_id) => (Some(channel_id), None), + Target::Guild(guild_id) => (None, Some(guild_id)), + }; + + threading::start(target, rx); + + Handler { + channel_id: channel_id, + endpoint_token: None, + guild_id: guild_id, + self_deaf: false, + self_mute: false, + sender: tx, + session_id: None, + user_id: user_id, + ws: ws, + } + } + + /// Retrieves the current connected voice channel's `ChannelId`, if connected + /// to one. + /// + /// Note that when connected to a voice channel, while the `ChannelId` will + /// not be `None`, the [`GuildId`] retrieved via [`guild`] can, in the event + /// of [`Group`] or 1-on-1 [`Call`]s. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`Group`]: ../../model/struct.Group.html + /// [`GuildId`]: ../../model/struct.GuildId.html + /// [`guild`]: #method.guild + pub fn channel(&self) -> Option<ChannelId> { + self.channel_id + } + + /// Sets whether the current connection to be deafened. + /// + /// If there is no live voice connection, then this only acts as a settings + /// update for future connections. + /// + /// **Note**: Unlike in the official client, you _can_ be deafened while + /// not being muted. + pub fn deafen(&mut self, deaf: bool) { + self.self_deaf = deaf; + + // Only send an update if there is currently a connected channel. + // + // Otherwise, this can be treated as a "settings" update for a + // connection. + if self.channel_id.is_some() { + self.update(); + } + } + + /// Retrieves the current connected voice channel's `GuildId`, if connected + /// to one. + /// + /// Note that the `GuildId` can be `None` in the event of [`Group`] or + /// 1-on-1 [`Call`]s, although when connected to a voice channel, the + /// [`ChannelId`] retrieved via [`channel`] will be `Some`. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`ChannelId`]: ../../model/struct.ChannelId.html + /// [`Group`]: ../../model/struct.Group.html + /// [`channel`]: #method.channel + pub fn guild(&self) -> Option<GuildId> { + self.guild_id + } + + /// Whether the current handler is set to deafen voice connections. + /// + /// Use [`deafen`] to modify this configuration. + /// + /// [`deafen`]: #method.deafen + pub fn is_deafened(&self) -> bool { + self.self_deaf + } + + /// Whether the current handler is set to mute voice connections. + /// + /// Use [`mute`] to modify this configuration. + /// + /// [`mute`]: #method.mute + pub fn is_muted(&self) -> bool { + self.self_mute + } + + /// Connect - or switch - to the given voice channel by its Id. + /// + /// **Note**: This is not necessary for [`Group`] or direct [call][`Call`]s. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`Group`]: ../../model/struct.Group.html + pub fn join(&mut self, channel_id: ChannelId) { + self.channel_id = Some(channel_id); + + self.connect(); + } + + /// Leaves the current voice channel, disconnecting from it. + /// + /// This does _not_ forget settings, like whether to be self-deafened or + /// self-muted. + pub fn leave(&mut self) { + match self.channel_id { + None => return, + Some(_channel_id) => { + self.channel_id = None; + + self.update(); + }, + } + } + + /// Sets whether the current connection is to be muted. + /// + /// If there is no live voice connection, then this only acts as a settings + /// update for future connections. + pub fn mute(&mut self, mute: bool) { + self.self_mute = mute; + + if self.channel_id.is_some() { + self.update(); + } + } + + /// Switches the current connected voice channel to the given `channel_id`. + /// + /// This has 3 separate behaviors: + /// + /// - if the given `channel_id` is equivilant to the current connected + /// `channel_id`, then do nothing; + /// - if the given `channel_id` is _not_ equivilant to the current connected + /// `channel_id`, then switch to the given `channel_id`; + /// - if not currently connected to a voice channel, connect to the given + /// one. + /// + /// **Note**: The given `channel_id`, if in a guild, _must_ be in the + /// current handler's associated guild. + /// + /// If you are dealing with switching from one group to another, then open + /// another handler, and optionally drop this one via [`Manager::remove`]. + /// + /// [`Manager::remove`]: struct.Manager.html#method.remove + pub fn switch_to(&mut self, channel_id: ChannelId) { + match self.channel_id { + Some(current_id) if current_id == channel_id => { + // If already connected to the given channel, do nothing. + return; + }, + Some(_current_id) => { + self.channel_id = Some(channel_id); + + self.update(); + }, + None => { + self.channel_id = Some(channel_id); + + self.connect(); + }, + } + } + + fn connect(&self) { + // Do _not_ try connecting if there is not at least a channel. There + // does not _necessarily_ need to be a guild. + if self.channel_id.is_none() { + return; + } + + self.update(); + } + + fn connect_with_data(&mut self, session_id: String, endpoint: String, token: String) { + let target_id = if let Some(guild_id) = self.guild_id { + guild_id.0 + } else if let Some(channel_id) = self.channel_id { + channel_id.0 + } else { + // Theoretically never happens? This needs to be researched more. + error!("[Voice] No guild/channel ID when connecting"); + + return; + }; + + self.send(VoiceStatus::Connect(ConnectionInfo { + endpoint: endpoint, + server_id: target_id, + session_id: session_id, + token: token, + })) + } + + // Send an update for the current session. + fn update(&self) { + let map = ObjectBuilder::new() + .insert("op", VoiceOpCode::SessionDescription.num()) + .insert_object("d", |o| o + .insert("channel_id", self.channel_id.map(|c| c.0)) + .insert("guild_id", self.guild_id.map(|g| g.0)) + .insert("self_deaf", self.self_deaf) + .insert("self_mute", self.self_mute)) + .build(); + + let _ = self.ws.send(ConnectionStatus::SendMessage(map)); + } + + fn send(&mut self, status: VoiceStatus) { + let send = self.sender.send(status); + + // Reconnect if it errored. + if let Err(mpsc::SendError(status)) = send { + let (tx, rx) = mpsc::channel(); + + self.sender = tx; + self.sender.send(status).unwrap(); + + threading::start(Target::Guild(self.guild_id.unwrap()), rx); + + self.update(); + } + } + + /// You probably shouldn't use this if you're reading the source code. + #[doc(hidden)] + pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) { + if let &Some(ref endpoint) = endpoint { + let endpoint = endpoint.clone(); + let token = token.to_owned(); + let session_id = match self.session_id { + Some(ref session_id) => session_id.clone(), + None => return, + }; + + self.connect_with_data(session_id, endpoint, token); + } else { + self.leave(); + } + } + + /// You probably shouldn't use this if you're reading the source code. + #[doc(hidden)] + pub fn update_state(&mut self, voice_state: &VoiceState) { + if self.user_id != voice_state.user_id.0 { + return; + } + + self.channel_id = voice_state.channel_id; + + if voice_state.channel_id.is_some() { + let session_id = voice_state.session_id.clone(); + + match self.endpoint_token.take() { + Some((endpoint, token)) => { + self.connect_with_data(session_id, endpoint, token); + }, + None => { + self.session_id = Some(session_id); + }, + } + } else { + self.leave(); + } + } +} + +impl Drop for Handler { + /// Leaves the current connected voice channel, if connected to one, and + /// forgets all configurations relevant to this Handler. + fn drop(&mut self) { + self.leave(); + } +} diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs new file mode 100644 index 0000000..c6c7533 --- /dev/null +++ b/src/ext/voice/manager.rs @@ -0,0 +1,141 @@ +use std::collections::HashMap; +use std::sync::mpsc::Sender as MpscSender; +use super::{Handler, Target}; +use ::client::ConnectionStatus; +use ::model::{ChannelId, GuildId}; + +/// A manager is a struct responsible for managing [`Handler`]s which belong to +/// a single [WebSocket connection]. This is a fairly complex key-value store, +/// with a bit of extra utility for easily joining a "target". +/// +/// The "target" used by the Manager is determined based on the `guild_id` and +/// `channel_id` provided. If a `guild_id` is _not_ provided to methods that +/// optionally require it, then the target is a group or 1-on-1 call with a +/// user. The `channel_id` is then used as the target. +/// +/// If a `guild_id` is provided, then the target is the guild, as a user +/// can not be connected to two channels within one guild simultaneously. +/// +/// [`Group`]: ../../model/struct.Group.html +/// [`Handler`]: struct.Handler.html +/// [guild's channel]: ../../model/enum.ChannelType.html#variant.Voice +/// [WebSocket connection]: ../../client/struct.Connection.html +pub struct Manager { + handlers: HashMap<Target, Handler>, + user_id: u64, + ws: MpscSender<ConnectionStatus>, +} + +impl Manager { + #[doc(hidden)] + pub fn new(ws: MpscSender<ConnectionStatus>, user_id: u64) -> Manager { + Manager { + handlers: HashMap::new(), + user_id: user_id, + ws: ws, + } + } + + /// Retrieves a mutable handler for the given target, if one exists. + pub fn get<T: Into<Target>>(&mut self, target_id: T) + -> Option<&mut Handler> { + self.handlers.get_mut(&target_id.into()) + } + + /// Connects to a target by retrieving its relevant [`Handler`] and + /// connecting, or creating the handler if required. + /// + /// This can also switch to the given channel, if a handler already exists + /// for the target and the current connected channel is not equal to the + /// given channel. + /// + /// In the case of channel targets, the same channel is used to connect to. + /// + /// In the case of guilds, the provided channel is used to connect to. The + /// channel _must_ be in the provided guild. This is _not_ checked by the + /// library, and will result in an error. If there is already a connected + /// handler for the guild, _and_ the provided channel is different from the + /// channel that the connection is already connected to, then the handler + /// will switch the connection to the provided channel. + /// + /// If you _only_ need to retrieve the handler for a target, then use + /// [`get`]. + /// + /// [`Handler`]: struct.Handler.html + /// [`get`]: #method.get + #[allow(map_entry)] + pub fn join(&mut self, guild_id: Option<GuildId>, channel_id: ChannelId) + -> &mut Handler { + if let Some(guild_id) = guild_id { + let target = Target::Guild(guild_id); + + { + let mut found = false; + + if let Some(handler) = self.handlers.get_mut(&target) { + handler.switch_to(channel_id); + + found = true; + } + + if found { + // Actually safe, as the key has already been found above. + return self.handlers.get_mut(&target).unwrap(); + } + } + + let mut handler = Handler::new(target, self.ws.clone(), self.user_id); + handler.join(channel_id); + + self.handlers.insert(target, handler); + + // Actually safe, as the key would have been inserted above. + self.handlers.get_mut(&target).unwrap() + } else { + let target = Target::Channel(channel_id); + + if !self.handlers.contains_key(&target) { + let mut handler = Handler::new(target, self.ws.clone(), self.user_id); + handler.join(channel_id); + + self.handlers.insert(target, handler); + } + + // Actually safe, as the key would have been inserted above. + self.handlers.get_mut(&target).unwrap() + } + } + + /// Retrieves the [handler][`Handler`] for the given target and leaves the + /// associated voice channel, if connected. + /// + /// This will _not_ drop the handler, and will preserve it and its settings. + /// + /// This is a wrapper around [getting][`get`] a handler and calling + /// [`leave`] on it. + /// + /// [`Handler`]: struct.Handler.html + /// [`get`]: #method.get + /// [`leave`]: struct.Handler.html#method.leave + pub fn leave<T: Into<Target>>(&mut self, target_id: T) { + let target = target_id.into(); + + if let Some(handler) = self.handlers.get_mut(&target) { + handler.leave(); + } + } + + /// Retrieves the [`Handler`] for the given target and leaves the associated + /// voice channel, if connected. + /// + /// The handler is then dropped, removing settings for the target. + /// + /// [`Handler`]: struct.Handler.html + pub fn remove<T: Into<Target>>(&mut self, target_id: T) { + let target = target_id.into(); + + self.leave(target); + + self.handlers.remove(&target); + } +} diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs index e69de29..107a473 100644 --- a/src/ext/voice/mod.rs +++ b/src/ext/voice/mod.rs @@ -0,0 +1,55 @@ +mod audio; +mod connection; +mod connection_info; +mod error; +mod manager; +mod handler; +mod threading; + +pub use self::error::VoiceError; +pub use self::handler::Handler; +pub use self::manager::Manager; + +use self::audio::{AudioReceiver, AudioSource}; +use self::connection_info::ConnectionInfo; +use ::model::{ChannelId, GuildId}; + +const CRYPTO_MODE: &'static str = "xsalsa20_poly1305"; + +#[doc(hidden)] +pub enum Status { + Connect(ConnectionInfo), + Disconnect, + SetReceiver(Option<Box<AudioReceiver>>), + SetSender(Option<Box<AudioSource>>), +} + +/// Denotes the target to manage a connection for. +/// +/// For most cases, targets should entirely be guilds, except for the one case +/// where a user account can be in a 1-to-1 or group call. +/// +/// It _may_ be possible in the future for bots to be in multiple groups. If +/// this turns out to be the case, supporting that now rather than messily in +/// the future is the best option. Thus, these types of calls are specified by +/// the group's channel Id. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum Target { + /// Used for managing a voice handler for a 1-on-1 (user-to-user) or group + /// call. + Channel(ChannelId), + /// Used for managing a voice handler for a guild. + Guild(GuildId), +} + +impl From<ChannelId> for Target { + fn from(channel_id: ChannelId) -> Target { + Target::Channel(channel_id) + } +} + +impl From<GuildId> for Target { + fn from(guild_id: GuildId) -> Target { + Target::Guild(guild_id) + } +} diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs new file mode 100644 index 0000000..4110c5f --- /dev/null +++ b/src/ext/voice/threading.rs @@ -0,0 +1,93 @@ +use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError}; +use std::thread::Builder as ThreadBuilder; +use super::connection::Connection; +use super::{Status, Target}; +use ::internal::Timer; + +pub fn start(target_id: Target, rx: MpscReceiver<Status>) { + let name = match target_id { + Target::Channel(channel_id) => format!("Serenity Voice (C{})", channel_id), + Target::Guild(guild_id) => format!("Serenity Voice (G{})", guild_id), + }; + + ThreadBuilder::new() + .name(name) + .spawn(move || runner(rx)) + .expect("Err starting voice"); +} + +fn runner(rx: MpscReceiver<Status>) { + let mut sender = None; + let mut receiver = None; + let mut connection = None; + let mut timer = Timer::new(20); + + 'runner: loop { + loop { + match rx.try_recv() { + Ok(Status::Connect(info)) => { + connection = match Connection::new(info) { + Ok(connection) => Some(connection), + Err(why) => { + error!("Err connecting via voice: {:?}", why); + + None + }, + }; + }, + Ok(Status::Disconnect) => { + connection = None; + }, + Ok(Status::SetReceiver(r)) => { + receiver = r; + }, + Ok(Status::SetSender(s)) => { + sender = s; + }, + Err(TryRecvError::Empty) => { + // If we receieved nothing, then we can perform an update. + break; + }, + Err(TryRecvError::Disconnected) => { + break 'runner; + }, + } + } + + // Overall here, check if there's an error. + // + // If there is a connection, try to send an update. This should not + // error. If there is though for some spurious reason, then set `error` + // to `true`. + // + // Otherwise, wait out the timer and do _not_ error and wait to receive + // another event. + let error = match connection.as_mut() { + Some(connection) => { + let update = connection.update(&mut sender, + &mut receiver, + &mut timer); + + match update { + Ok(()) => false, + Err(why) => { + error!("Err updating voice connection: {:?}", why); + + true + }, + } + }, + None => { + timer.await(); + + false + }, + }; + + // If there was an error, then just reset the connection and try to get + // another. + if error { + connection = None; + } + } +} diff --git a/src/internal/mod.rs b/src/internal/mod.rs index b9d7209..9dd4676 100644 --- a/src/internal/mod.rs +++ b/src/internal/mod.rs @@ -1 +1,8 @@ pub mod prelude; +pub mod ws_impl; + +#[cfg(feature = "voice")] +mod timer; + +#[cfg(feature = "voice")] +pub use self::timer::Timer; diff --git a/src/internal/timer.rs b/src/internal/timer.rs new file mode 100644 index 0000000..cc846b3 --- /dev/null +++ b/src/internal/timer.rs @@ -0,0 +1,45 @@ +use std::thread; +use std::time::Duration as StdDuration; +use time::{self, Duration, Timespec}; + +pub struct Timer { + due: Timespec, + duration: Duration, +} + +impl Timer { + pub fn new(duration_in_ms: u64) -> Timer { + let duration = Duration::milliseconds(duration_in_ms as i64); + + Timer { + due: time::get_time() + duration, + duration: duration, + } + } + + pub fn await(&mut self) { + let diff = self.due - time::get_time(); + + if diff > time::Duration::zero() { + let amount = diff.num_milliseconds() as u64; + + thread::sleep(StdDuration::from_millis(amount)); + } + + self.due = self.due + self.duration; + } + + pub fn check(&mut self) -> bool { + if time::get_time() >= self.due { + self.due = self.due + self.duration; + + true + } else { + false + } + } + + pub fn reset(&mut self) { + self.due = time::get_time() + self.duration; + } +} diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs new file mode 100644 index 0000000..ab91dae --- /dev/null +++ b/src/internal/ws_impl.rs @@ -0,0 +1,60 @@ +use flate2::read::ZlibDecoder; +use serde_json; +use websocket::client::{Receiver, Sender}; +use websocket::message::{Message as WsMessage, Type as WsType}; +use websocket::stream::WebSocketStream; +use websocket::ws::receiver::Receiver as WsReceiver; +use websocket::ws::sender::Sender as WsSender; +use ::client::ConnectionError; +use ::internal::prelude::*; + +pub trait ReceiverExt { + fn recv_json<F, T>(&mut self, decode: F) -> Result<T> + where F: FnOnce(Value) -> Result<T>; +} + +pub trait SenderExt { + fn send_json(&mut self, value: &Value) -> Result<()>; +} + +impl ReceiverExt for Receiver<WebSocketStream> { + fn recv_json<F, T>(&mut self, decode: F) -> Result<T> where F: FnOnce(Value) -> Result<T> { + let message: WsMessage = try!(self.recv_message()); + + if message.opcode == WsType::Close { + let representation = String::from_utf8_lossy(&message.payload) + .into_owned(); + + Err(Error::Connection(ConnectionError::Closed(message.cd_status_code, + representation))) + } else if message.opcode == WsType::Binary || message.opcode == WsType::Text { + let json: Value = if message.opcode == WsType::Binary { + try!(serde_json::from_reader(ZlibDecoder::new(&message.payload[..]))) + } else { + try!(serde_json::from_reader(&message.payload[..])) + }; + + decode(json).map_err(|err| { + warn!("Error decoding: {}", + String::from_utf8_lossy(&message.payload)); + + err + }) + } else { + let representation = String::from_utf8_lossy(&message.payload) + .into_owned(); + + Err(Error::Connection(ConnectionError::Closed(None, + representation))) + } + } +} + +impl SenderExt for Sender<WebSocketStream> { + fn send_json(&mut self, value: &Value) -> Result<()> { + serde_json::to_string(value) + .map(WsMessage::text) + .map_err(Error::from) + .and_then(|m| self.send_message(&m).map_err(Error::from)) + } +} @@ -93,6 +93,11 @@ extern crate serde_json; extern crate time; extern crate websocket; +#[cfg(feature="voice")] +extern crate opus; +#[cfg(feature="voice")] +extern crate sodiumoxide; + #[macro_use] pub mod utils; diff --git a/src/model/gateway.rs b/src/model/gateway.rs index ee62dd5..aa3d995 100644 --- a/src/model/gateway.rs +++ b/src/model/gateway.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashMap}; use super::utils::*; use super::*; -use ::constants::{OpCode, VoiceOpCode}; +use ::constants::OpCode; use ::internal::prelude::*; use ::utils::decode_array; @@ -355,65 +355,6 @@ impl GatewayEvent { } } -#[derive(Debug, Clone)] -pub enum VoiceEvent { - Handshake { - heartbeat_interval: u64, - port: u16, - ssrc: u32, - modes: Vec<String>, - }, - Ready { - mode: String, - secret_key: Vec<u8>, - }, - SpeakingUpdate { - user_id: UserId, - ssrc: u32, - speaking: bool, - }, - KeepAlive, - Unknown(u64, Value) -} - -impl VoiceEvent { - pub fn decode(value: Value) -> Result<VoiceEvent> { - let mut value = try!(into_map(value)); - - let op = req!(try!(remove(&mut value, "op")).as_u64()); - let op = try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode))); - - if op == VoiceOpCode::Heartbeat { - return Ok(VoiceEvent::KeepAlive) - } - - let mut value = try!(remove(&mut value, "d").and_then(into_map)); - if op == VoiceOpCode::Hello { - missing!(value, VoiceEvent::Handshake { - heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()), - modes: try!(decode_array(try!(remove(&mut value, "modes")), into_string)), - port: req!(try!(remove(&mut value, "port")).as_u64()) as u16, - ssrc: req!(try!(remove(&mut value, "ssrc")).as_u64()) as u32, - }) - } else if op == VoiceOpCode::SessionDescription { - missing!(value, VoiceEvent::Ready { - mode: try!(remove(&mut value, "mode").and_then(into_string)), - secret_key: try!(decode_array(try!(remove(&mut value, "secret_key")), - |v| Ok(req!(v.as_u64()) as u8) - )), - }) - } else if op == VoiceOpCode::Speaking { - missing!(value, VoiceEvent::SpeakingUpdate { - user_id: try!(remove(&mut value, "user_id").and_then(UserId::decode)), - ssrc: req!(try!(remove(&mut value, "ssrc")).as_u64()) as u32, - speaking: req!(try!(remove(&mut value, "speaking")).as_bool()), - }) - } else { - Ok(VoiceEvent::Unknown(op as u64, Value::Object(value))) - } - } -} - /// Event received over a websocket connection #[derive(Clone, Debug)] pub enum Event { diff --git a/src/model/utils.rs b/src/model/utils.rs index f0108e9..f85a30f 100644 --- a/src/model/utils.rs +++ b/src/model/utils.rs @@ -35,7 +35,11 @@ macro_rules! missing { #[macro_escape] macro_rules! req { ($opt:expr) => { - try!($opt.ok_or(Error::Decode(concat!("Type mismatch in model:", line!(), ": ", stringify!($opt)), Value::Null))) + try!($opt.ok_or(Error::Decode(concat!("Type mismatch in model:", + line!(), + ": ", + stringify!($opt)), + Value::Null))) } } diff --git a/src/model/voice.rs b/src/model/voice.rs index e69de29..a888029 100644 --- a/src/model/voice.rs +++ b/src/model/voice.rs @@ -0,0 +1,97 @@ +use super::utils::{into_map, into_string, remove, warn_field}; +use super::UserId; +use ::constants::VoiceOpCode; +use ::internal::prelude::*; +use ::utils::decode_array; + +#[derive(Clone, Debug)] +pub struct VoiceHandshake { + pub heartbeat_interval: u64, + pub ip: Option<String>, + pub modes: Vec<String>, + pub port: u16, + pub ssrc: u32, +} + +#[derive(Clone, Debug)] +pub struct VoiceHeartbeat { + pub heartbeat_interval: u64, +} + +#[derive(Clone, Debug)] +pub struct VoiceHello { + pub heartbeat_interval: u64, + pub modes: Vec<String>, + pub port: u16, + pub ssrc: u32, +} + +#[derive(Clone, Debug)] +pub struct VoiceReady { + pub mode: String, + pub secret_key: Vec<u8>, +} + +#[derive(Clone, Debug)] +pub struct VoiceSpeaking { + pub speaking: bool, + pub ssrc: u32, + pub user_id: UserId, +} + +#[derive(Clone, Debug)] +pub enum VoiceEvent { + Handshake(VoiceHandshake), + Heartbeat(VoiceHeartbeat), + Hello(VoiceHello), + Ready(VoiceReady), + Speaking(VoiceSpeaking), + KeepAlive, + Unknown(VoiceOpCode, Value) +} + +impl VoiceEvent { + pub fn decode(value: Value) -> Result<VoiceEvent> { + let mut value = try!(into_map(value)); + let op = req!(try!(remove(&mut value, "op")).as_u64()); + let mut map = try!(remove(&mut value, "d").and_then(into_map)); + + match try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode))) { + VoiceOpCode::Heartbeat => { + missing!(map, VoiceEvent::Heartbeat(VoiceHeartbeat { + heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()), + })) + }, + VoiceOpCode::Hello => { + missing!(map, VoiceEvent::Hello(VoiceHello { + heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval")) + .as_u64()), + modes: try!(decode_array(try!(remove(&mut map, "modes")), + into_string)), + port: req!(try!(remove(&mut map, "port")) + .as_u64()) as u16, + ssrc: req!(try!(remove(&mut map, "ssrc")) + .as_u64()) as u32, + })) + }, + VoiceOpCode::KeepAlive => Ok(VoiceEvent::KeepAlive), + VoiceOpCode::SessionDescription => { + missing!(map, VoiceEvent::Ready(VoiceReady { + mode: try!(remove(&mut map, "mode") + .and_then(into_string)), + secret_key: try!(decode_array(try!(remove(&mut map, "secret_key")), + |v| Ok(req!(v.as_u64()) as u8) + )), + })) + }, + VoiceOpCode::Speaking => { + missing!(map, VoiceEvent::Speaking(VoiceSpeaking { + speaking: req!(try!(remove(&mut map, "speaking")).as_bool()), + ssrc: req!(try!(remove(&mut map, "ssrc")).as_u64()) as u32, + user_id: try!(remove(&mut map, "user_id").and_then(UserId::decode)), + })) + } + other => Ok(VoiceEvent::Unknown(other, Value::Object(map))), + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index dbd1455..28d18f1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,4 +1,3 @@ - //! A set of utilities to help with common use cases that are not required to //! fully use the library. @@ -109,6 +108,56 @@ macro_rules! request { }}; } +// Enable/disable check for voice +macro_rules! feature_voice { + ($enabled:block) => { + { + feature_voice_enabled! {{ + $enabled + }} + } + }; + ($enabled:block $disabled:block) => { + { + feature_voice_enabled! {{ + $enabled + }} + + feature_voice_disabled! {{ + $disabled + }} + } + }; +} + +#[cfg(feature="voice")] +macro_rules! feature_voice_enabled { + ($enabled:block) => { + { + $enabled + } + } +} + +#[cfg(not(feature="voice"))] +macro_rules! feature_voice_enabled { + ($enabled:block) => {} +} + +#[cfg(feature="voice")] +macro_rules! feature_voice_disabled { + ($disabled:block) => {} +} + +#[cfg(not(feature="voice"))] +macro_rules! feature_voice_disabled { + ($disabled:block) => { + { + $disabled + } + } +} + /// Retrieves the "code" part of an [invite][`RichInvite`] out of a URL. /// /// # Examples |