aboutsummaryrefslogtreecommitdiff
path: root/src/ext
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-11-13 19:28:13 -0800
committerAustin Hellyer <[email protected]>2016-11-14 18:32:10 -0800
commit7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch)
treeca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src/ext
parentAdd internal module (diff)
downloadserenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz
serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip
Add voice connection support
Diffstat (limited to 'src/ext')
-rw-r--r--src/ext/mod.rs1
-rw-r--r--src/ext/voice/audio.rs15
-rw-r--r--src/ext/voice/connection.rs310
-rw-r--r--src/ext/voice/connection_info.rs6
-rw-r--r--src/ext/voice/error.rs10
-rw-r--r--src/ext/voice/handler.rs331
-rw-r--r--src/ext/voice/manager.rs141
-rw-r--r--src/ext/voice/mod.rs55
-rw-r--r--src/ext/voice/threading.rs93
9 files changed, 962 insertions, 0 deletions
diff --git a/src/ext/mod.rs b/src/ext/mod.rs
index 92fda62..bb87911 100644
--- a/src/ext/mod.rs
+++ b/src/ext/mod.rs
@@ -10,4 +10,5 @@
pub mod framework;
pub mod state;
+#[cfg(feature="voice")]
pub mod voice;
diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs
new file mode 100644
index 0000000..e49bd2a
--- /dev/null
+++ b/src/ext/voice/audio.rs
@@ -0,0 +1,15 @@
+use ::model::UserId;
+
+/// A readable audio source.
+pub trait AudioSource: Send {
+ fn is_stereo(&mut self) -> bool;
+
+ fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>;
+}
+
+/// A receiver for incoming audio.
+pub trait AudioReceiver: Send {
+ fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool);
+
+ fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]);
+}
diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs
new file mode 100644
index 0000000..7dfc034
--- /dev/null
+++ b/src/ext/voice/connection.rs
@@ -0,0 +1,310 @@
+use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
+use serde_json::builder::ObjectBuilder;
+use sodiumoxide::crypto::secretbox::Key;
+use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket};
+use std::sync::mpsc::{self, Receiver as MpscReceiver};
+use std::thread::{self, Builder as ThreadBuilder};
+use super::audio::{AudioReceiver, AudioSource};
+use super::connection_info::ConnectionInfo;
+use super::{CRYPTO_MODE, VoiceError};
+use websocket::client::request::Url as WebsocketUrl;
+use websocket::client::{
+ Client as WsClient,
+ Receiver as WsReceiver,
+ Sender as WsSender
+};
+use websocket::stream::WebSocketStream;
+use ::client::STATE;
+use ::constants::VoiceOpCode;
+use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::internal::Timer;
+use ::model::VoiceEvent;
+
+pub enum ReceiverStatus {
+ Udp(Vec<u8>),
+ Websocket(VoiceEvent),
+}
+
+#[allow(dead_code)]
+pub struct Connection {
+ audio_timer: Timer,
+ destination: SocketAddr,
+ keepalive_timer: Timer,
+ key: Key,
+ receive_channel: MpscReceiver<ReceiverStatus>,
+ sender: WsSender<WebSocketStream>,
+ sequence: u64,
+ speaking: bool,
+ ssrc: u32,
+ timestamp: u32,
+ udp: UdpSocket,
+}
+
+impl Connection {
+ pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
+ let url = try!(generate_url(&mut info.endpoint));
+
+ let response = try!(try!(WsClient::connect(url)).send());
+ try!(response.validate());
+ let (mut sender, mut receiver) = response.begin().split();
+
+ try!(sender.send_json(&identify(&info)));
+
+ let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) {
+ VoiceEvent::Handshake(handshake) => handshake,
+ _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)),
+ };
+
+ if !has_valid_mode(handshake.modes) {
+ return Err(Error::Voice(VoiceError::VoiceModeUnavailable));
+ }
+
+ let destination = {
+ try!(try!((&info.endpoint[..], handshake.port)
+ .to_socket_addrs())
+ .next()
+ .ok_or(Error::Voice(VoiceError::HostnameResolve)))
+ };
+ let udp = try!(UdpSocket::bind("0.0.0.0:0"));
+
+ {
+ let mut bytes = [0; 70];
+ try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc));
+ try!(udp.send_to(&bytes, destination));
+ }
+
+ try!(send_acknowledgement(&mut sender, &udp));
+
+ let key = try!(get_encryption_key(&mut receiver));
+
+ let receive_channel = try!(start_threads(receiver, &udp));
+
+ info!("[Voice] Connected to: {}", info.endpoint);
+
+ Ok(Connection {
+ audio_timer: Timer::new(1000 * 60 * 4),
+ destination: destination,
+ key: key,
+ keepalive_timer: Timer::new(handshake.heartbeat_interval),
+ receive_channel: receive_channel,
+ udp: udp,
+ sender: sender,
+ sequence: 0,
+ speaking: false,
+ ssrc: handshake.ssrc,
+ timestamp: 0,
+ })
+ }
+
+ #[allow(unused_variables)]
+ pub fn update(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ receiver: &mut Option<Box<AudioReceiver>>,
+ audio_timer: &mut Timer)
+ -> Result<()> {
+ if let Some(receiver) = receiver.as_mut() {
+ while let Ok(status) = self.receive_channel.try_recv() {
+ match status {
+ ReceiverStatus::Udp(packet) => {
+ debug!("[Voice] Received UDP packet: {:?}", packet);
+ },
+ ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
+ receiver.speaking_update(ev.ssrc,
+ &ev.user_id,
+ ev.speaking);
+ },
+ ReceiverStatus::Websocket(other) => {
+ info!("[Voice] Received other websocket data: {:?}",
+ other);
+ },
+ }
+ }
+ } else {
+ while let Ok(_) = self.receive_channel.try_recv() {}
+ }
+
+ // Send the voice websocket keepalive if it's time
+ if self.keepalive_timer.check() {
+ try!(self.sender.send_json(&keepalive()));
+ }
+
+ // Send the UDP keepalive if it's time
+ if self.audio_timer.check() {
+ let mut bytes = [0; 4];
+ try!((&mut bytes[..]).write_u32::<BigEndian>(self.ssrc));
+ try!(self.udp.send_to(&bytes, self.destination));
+ }
+
+ try!(self.speaking(true));
+
+ self.sequence = self.sequence.wrapping_add(1);
+ self.timestamp = self.timestamp.wrapping_add(960);
+
+ audio_timer.await();
+ self.audio_timer.reset();
+ Ok(())
+ }
+
+ fn speaking(&mut self, speaking: bool) -> Result<()> {
+ if self.speaking == speaking {
+ return Ok(());
+ }
+
+ self.speaking = speaking;
+
+ let map = ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Speaking.num())
+ .insert_object("d", |object| object
+ .insert("delay", 0))
+ .insert("speaking", speaking)
+ .build();
+
+ self.sender.send_json(&map)
+ }
+}
+
+impl Drop for Connection {
+ fn drop(&mut self) {
+ let _ = self.sender.get_mut().shutdown(Shutdown::Both);
+
+ info!("Voice disconnected");
+ }
+}
+
+fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
+ if endpoint.ends_with(":80") {
+ let len = endpoint.len();
+
+ endpoint.truncate(len - 3);
+ }
+
+ WebsocketUrl::parse(&format!("wss://{}", endpoint))
+ .or(Err(Error::Voice(VoiceError::EndpointUrl)))
+}
+
+pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+ -> Result<Key> {
+ loop {
+ match try!(receiver.recv_json(VoiceEvent::decode)) {
+ VoiceEvent::Ready(ready) => {
+ if ready.mode != CRYPTO_MODE {
+ return Err(Error::Voice(VoiceError::VoiceModeInvalid));
+ }
+
+ return Key::from_slice(&ready.secret_key)
+ .ok_or(Error::Voice(VoiceError::KeyGen));
+ },
+ VoiceEvent::Unknown(op, value) => {
+ debug!("Unknown message type: {}/{:?}", op.num(), value);
+ },
+ _ => {},
+ }
+ }
+}
+
+fn identify(info: &ConnectionInfo) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Identify.num())
+ .insert_object("d", |o| o
+ .insert("server_id", info.server_id)
+ .insert("session_id", &info.session_id)
+ .insert("token", &info.token)
+ .insert("user_id", STATE.lock().unwrap().user.id.0))
+ .build()
+}
+
+#[inline(always)]
+fn has_valid_mode(modes: Vec<String>) -> bool {
+ modes.iter().any(|s| s == CRYPTO_MODE)
+}
+
+fn keepalive() -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::KeepAlive.num())
+ .insert("d", Value::Null)
+ .build()
+}
+
+#[inline]
+fn select_protocol(address: &[u8], port: u16) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::SelectProtocol.num())
+ .insert_object("d", |o| o
+ .insert("protocol", "udp")
+ .insert_object("data", |o| o
+ .insert("address", address)
+ .insert("mode", "xsalsa20_poly1305")))
+ .insert("port", port)
+ .build()
+}
+
+#[inline]
+fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket)
+ -> Result<()> {
+ let mut bytes = [0; 256];
+
+ let (len, _) = try!(udp.recv_from(&mut bytes));
+
+ let zero_index = bytes.iter()
+ .skip(4)
+ .position(|&x| x == 0)
+ .unwrap();
+
+ let address = &bytes[4..4 + zero_index];
+
+ let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>());
+
+ // send the acknowledgement websocket message
+ let map = select_protocol(address, port);
+ sender.send_json(&map).map(|_| ())
+}
+
+#[inline]
+fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+ -> Result<MpscReceiver<ReceiverStatus>> {
+ let thread = thread::current();
+ let thread_name = thread.name().unwrap_or("serenity.rs voice");
+
+ let (tx, rx) = mpsc::channel();
+ let tx_clone = tx.clone();
+ let udp_clone = try!(udp.try_clone());
+
+ try!(ThreadBuilder::new()
+ .name(format!("{} WS", thread_name))
+ .spawn(move || {
+ loop {
+ let msg = receiver.recv_json(VoiceEvent::decode);
+
+ if let Ok(msg) = msg {
+ let send = tx.send(ReceiverStatus::Websocket(msg));
+
+ if let Err(_why) = send {
+ return;
+ }
+ } else {
+ break;
+ }
+ }
+ }));
+
+ try!(ThreadBuilder::new()
+ .name(format!("{} UDP", thread_name))
+ .spawn(move || {
+ let mut buffer = [0; 512];
+
+ loop {
+ let (len, _) = udp_clone.recv_from(&mut buffer).unwrap();
+ let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len]
+ .iter()
+ .cloned()
+ .collect()));
+
+ if let Err(_why) = req {
+ return;
+ }
+ }
+ }));
+
+ Ok(rx)
+}
diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs
new file mode 100644
index 0000000..9115385
--- /dev/null
+++ b/src/ext/voice/connection_info.rs
@@ -0,0 +1,6 @@
+pub struct ConnectionInfo {
+ pub endpoint: String,
+ pub server_id: u64,
+ pub session_id: String,
+ pub token: String,
+}
diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs
new file mode 100644
index 0000000..b1f3251
--- /dev/null
+++ b/src/ext/voice/error.rs
@@ -0,0 +1,10 @@
+#[derive(Debug)]
+pub enum VoiceError {
+ // An indicator that an endpoint URL was invalid.
+ EndpointUrl,
+ ExpectedHandshake,
+ HostnameResolve,
+ KeyGen,
+ VoiceModeInvalid,
+ VoiceModeUnavailable,
+}
diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs
new file mode 100644
index 0000000..8dc0ab1
--- /dev/null
+++ b/src/ext/voice/handler.rs
@@ -0,0 +1,331 @@
+use serde_json::builder::ObjectBuilder;
+use std::sync::mpsc::{self, Sender};
+use super::connection_info::ConnectionInfo;
+use super::{Status as VoiceStatus, Target};
+use ::client::ConnectionStatus;
+use ::constants::VoiceOpCode;
+use ::model::{ChannelId, GuildId, VoiceState};
+use super::threading;
+
+/// The handler is responsible for "handling" a single voice connection, acting
+/// as a clean API above the inner connection.
+///
+/// # Examples
+///
+/// Assuming that you already have a [`Manager`], most likely retrieved via a
+/// [WebSocket connection], you can join a guild's voice channel and deafen
+/// yourself like so:
+///
+/// ```rust,ignore
+/// // assuming a `manager` has already been bound, hopefully retrieved through
+/// // a websocket's connection.
+/// use serenity::model::{ChannelId, GuildId};
+///
+/// let guild_id = GuildId(81384788765712384);
+/// let channel_id = ChannelId(85482585546833920);
+///
+/// let handler = manager.join(Some(guild_id), channel_id);
+/// handler.deafen(true);
+/// ```
+///
+/// [`Manager`]: struct.Manager.html
+/// [WebSocket connection]: ../../client/struct.Connection.html
+pub struct Handler {
+ channel_id: Option<ChannelId>,
+ endpoint_token: Option<(String, String)>,
+ guild_id: Option<GuildId>,
+ self_deaf: bool,
+ self_mute: bool,
+ sender: Sender<VoiceStatus>,
+ session_id: Option<String>,
+ user_id: u64,
+ ws: Sender<ConnectionStatus>,
+}
+
+impl Handler {
+ /// Creates a new Handler.
+ ///
+ /// **Note**: You should never call this yourself, and should instead use
+ /// [`Manager::join`].
+ ///
+ /// Like, really. Really do not use this. Please.
+ ///
+ /// [`Manager::join`]: struct.Manager.html#method.join
+ #[doc(hidden)]
+ pub fn new(target: Target, ws: Sender<ConnectionStatus>, user_id: u64)
+ -> Self {
+ let (tx, rx) = mpsc::channel();
+
+ let (channel_id, guild_id) = match target {
+ Target::Channel(channel_id) => (Some(channel_id), None),
+ Target::Guild(guild_id) => (None, Some(guild_id)),
+ };
+
+ threading::start(target, rx);
+
+ Handler {
+ channel_id: channel_id,
+ endpoint_token: None,
+ guild_id: guild_id,
+ self_deaf: false,
+ self_mute: false,
+ sender: tx,
+ session_id: None,
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Retrieves the current connected voice channel's `ChannelId`, if connected
+ /// to one.
+ ///
+ /// Note that when connected to a voice channel, while the `ChannelId` will
+ /// not be `None`, the [`GuildId`] retrieved via [`guild`] can, in the event
+ /// of [`Group`] or 1-on-1 [`Call`]s.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ /// [`GuildId`]: ../../model/struct.GuildId.html
+ /// [`guild`]: #method.guild
+ pub fn channel(&self) -> Option<ChannelId> {
+ self.channel_id
+ }
+
+ /// Sets whether the current connection to be deafened.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ ///
+ /// **Note**: Unlike in the official client, you _can_ be deafened while
+ /// not being muted.
+ pub fn deafen(&mut self, deaf: bool) {
+ self.self_deaf = deaf;
+
+ // Only send an update if there is currently a connected channel.
+ //
+ // Otherwise, this can be treated as a "settings" update for a
+ // connection.
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Retrieves the current connected voice channel's `GuildId`, if connected
+ /// to one.
+ ///
+ /// Note that the `GuildId` can be `None` in the event of [`Group`] or
+ /// 1-on-1 [`Call`]s, although when connected to a voice channel, the
+ /// [`ChannelId`] retrieved via [`channel`] will be `Some`.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`ChannelId`]: ../../model/struct.ChannelId.html
+ /// [`Group`]: ../../model/struct.Group.html
+ /// [`channel`]: #method.channel
+ pub fn guild(&self) -> Option<GuildId> {
+ self.guild_id
+ }
+
+ /// Whether the current handler is set to deafen voice connections.
+ ///
+ /// Use [`deafen`] to modify this configuration.
+ ///
+ /// [`deafen`]: #method.deafen
+ pub fn is_deafened(&self) -> bool {
+ self.self_deaf
+ }
+
+ /// Whether the current handler is set to mute voice connections.
+ ///
+ /// Use [`mute`] to modify this configuration.
+ ///
+ /// [`mute`]: #method.mute
+ pub fn is_muted(&self) -> bool {
+ self.self_mute
+ }
+
+ /// Connect - or switch - to the given voice channel by its Id.
+ ///
+ /// **Note**: This is not necessary for [`Group`] or direct [call][`Call`]s.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ pub fn join(&mut self, channel_id: ChannelId) {
+ self.channel_id = Some(channel_id);
+
+ self.connect();
+ }
+
+ /// Leaves the current voice channel, disconnecting from it.
+ ///
+ /// This does _not_ forget settings, like whether to be self-deafened or
+ /// self-muted.
+ pub fn leave(&mut self) {
+ match self.channel_id {
+ None => return,
+ Some(_channel_id) => {
+ self.channel_id = None;
+
+ self.update();
+ },
+ }
+ }
+
+ /// Sets whether the current connection is to be muted.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ pub fn mute(&mut self, mute: bool) {
+ self.self_mute = mute;
+
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Switches the current connected voice channel to the given `channel_id`.
+ ///
+ /// This has 3 separate behaviors:
+ ///
+ /// - if the given `channel_id` is equivilant to the current connected
+ /// `channel_id`, then do nothing;
+ /// - if the given `channel_id` is _not_ equivilant to the current connected
+ /// `channel_id`, then switch to the given `channel_id`;
+ /// - if not currently connected to a voice channel, connect to the given
+ /// one.
+ ///
+ /// **Note**: The given `channel_id`, if in a guild, _must_ be in the
+ /// current handler's associated guild.
+ ///
+ /// If you are dealing with switching from one group to another, then open
+ /// another handler, and optionally drop this one via [`Manager::remove`].
+ ///
+ /// [`Manager::remove`]: struct.Manager.html#method.remove
+ pub fn switch_to(&mut self, channel_id: ChannelId) {
+ match self.channel_id {
+ Some(current_id) if current_id == channel_id => {
+ // If already connected to the given channel, do nothing.
+ return;
+ },
+ Some(_current_id) => {
+ self.channel_id = Some(channel_id);
+
+ self.update();
+ },
+ None => {
+ self.channel_id = Some(channel_id);
+
+ self.connect();
+ },
+ }
+ }
+
+ fn connect(&self) {
+ // Do _not_ try connecting if there is not at least a channel. There
+ // does not _necessarily_ need to be a guild.
+ if self.channel_id.is_none() {
+ return;
+ }
+
+ self.update();
+ }
+
+ fn connect_with_data(&mut self, session_id: String, endpoint: String, token: String) {
+ let target_id = if let Some(guild_id) = self.guild_id {
+ guild_id.0
+ } else if let Some(channel_id) = self.channel_id {
+ channel_id.0
+ } else {
+ // Theoretically never happens? This needs to be researched more.
+ error!("[Voice] No guild/channel ID when connecting");
+
+ return;
+ };
+
+ self.send(VoiceStatus::Connect(ConnectionInfo {
+ endpoint: endpoint,
+ server_id: target_id,
+ session_id: session_id,
+ token: token,
+ }))
+ }
+
+ // Send an update for the current session.
+ fn update(&self) {
+ let map = ObjectBuilder::new()
+ .insert("op", VoiceOpCode::SessionDescription.num())
+ .insert_object("d", |o| o
+ .insert("channel_id", self.channel_id.map(|c| c.0))
+ .insert("guild_id", self.guild_id.map(|g| g.0))
+ .insert("self_deaf", self.self_deaf)
+ .insert("self_mute", self.self_mute))
+ .build();
+
+ let _ = self.ws.send(ConnectionStatus::SendMessage(map));
+ }
+
+ fn send(&mut self, status: VoiceStatus) {
+ let send = self.sender.send(status);
+
+ // Reconnect if it errored.
+ if let Err(mpsc::SendError(status)) = send {
+ let (tx, rx) = mpsc::channel();
+
+ self.sender = tx;
+ self.sender.send(status).unwrap();
+
+ threading::start(Target::Guild(self.guild_id.unwrap()), rx);
+
+ self.update();
+ }
+ }
+
+ /// You probably shouldn't use this if you're reading the source code.
+ #[doc(hidden)]
+ pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) {
+ if let &Some(ref endpoint) = endpoint {
+ let endpoint = endpoint.clone();
+ let token = token.to_owned();
+ let session_id = match self.session_id {
+ Some(ref session_id) => session_id.clone(),
+ None => return,
+ };
+
+ self.connect_with_data(session_id, endpoint, token);
+ } else {
+ self.leave();
+ }
+ }
+
+ /// You probably shouldn't use this if you're reading the source code.
+ #[doc(hidden)]
+ pub fn update_state(&mut self, voice_state: &VoiceState) {
+ if self.user_id != voice_state.user_id.0 {
+ return;
+ }
+
+ self.channel_id = voice_state.channel_id;
+
+ if voice_state.channel_id.is_some() {
+ let session_id = voice_state.session_id.clone();
+
+ match self.endpoint_token.take() {
+ Some((endpoint, token)) => {
+ self.connect_with_data(session_id, endpoint, token);
+ },
+ None => {
+ self.session_id = Some(session_id);
+ },
+ }
+ } else {
+ self.leave();
+ }
+ }
+}
+
+impl Drop for Handler {
+ /// Leaves the current connected voice channel, if connected to one, and
+ /// forgets all configurations relevant to this Handler.
+ fn drop(&mut self) {
+ self.leave();
+ }
+}
diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs
new file mode 100644
index 0000000..c6c7533
--- /dev/null
+++ b/src/ext/voice/manager.rs
@@ -0,0 +1,141 @@
+use std::collections::HashMap;
+use std::sync::mpsc::Sender as MpscSender;
+use super::{Handler, Target};
+use ::client::ConnectionStatus;
+use ::model::{ChannelId, GuildId};
+
+/// A manager is a struct responsible for managing [`Handler`]s which belong to
+/// a single [WebSocket connection]. This is a fairly complex key-value store,
+/// with a bit of extra utility for easily joining a "target".
+///
+/// The "target" used by the Manager is determined based on the `guild_id` and
+/// `channel_id` provided. If a `guild_id` is _not_ provided to methods that
+/// optionally require it, then the target is a group or 1-on-1 call with a
+/// user. The `channel_id` is then used as the target.
+///
+/// If a `guild_id` is provided, then the target is the guild, as a user
+/// can not be connected to two channels within one guild simultaneously.
+///
+/// [`Group`]: ../../model/struct.Group.html
+/// [`Handler`]: struct.Handler.html
+/// [guild's channel]: ../../model/enum.ChannelType.html#variant.Voice
+/// [WebSocket connection]: ../../client/struct.Connection.html
+pub struct Manager {
+ handlers: HashMap<Target, Handler>,
+ user_id: u64,
+ ws: MpscSender<ConnectionStatus>,
+}
+
+impl Manager {
+ #[doc(hidden)]
+ pub fn new(ws: MpscSender<ConnectionStatus>, user_id: u64) -> Manager {
+ Manager {
+ handlers: HashMap::new(),
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Retrieves a mutable handler for the given target, if one exists.
+ pub fn get<T: Into<Target>>(&mut self, target_id: T)
+ -> Option<&mut Handler> {
+ self.handlers.get_mut(&target_id.into())
+ }
+
+ /// Connects to a target by retrieving its relevant [`Handler`] and
+ /// connecting, or creating the handler if required.
+ ///
+ /// This can also switch to the given channel, if a handler already exists
+ /// for the target and the current connected channel is not equal to the
+ /// given channel.
+ ///
+ /// In the case of channel targets, the same channel is used to connect to.
+ ///
+ /// In the case of guilds, the provided channel is used to connect to. The
+ /// channel _must_ be in the provided guild. This is _not_ checked by the
+ /// library, and will result in an error. If there is already a connected
+ /// handler for the guild, _and_ the provided channel is different from the
+ /// channel that the connection is already connected to, then the handler
+ /// will switch the connection to the provided channel.
+ ///
+ /// If you _only_ need to retrieve the handler for a target, then use
+ /// [`get`].
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ #[allow(map_entry)]
+ pub fn join(&mut self, guild_id: Option<GuildId>, channel_id: ChannelId)
+ -> &mut Handler {
+ if let Some(guild_id) = guild_id {
+ let target = Target::Guild(guild_id);
+
+ {
+ let mut found = false;
+
+ if let Some(handler) = self.handlers.get_mut(&target) {
+ handler.switch_to(channel_id);
+
+ found = true;
+ }
+
+ if found {
+ // Actually safe, as the key has already been found above.
+ return self.handlers.get_mut(&target).unwrap();
+ }
+ }
+
+ let mut handler = Handler::new(target, self.ws.clone(), self.user_id);
+ handler.join(channel_id);
+
+ self.handlers.insert(target, handler);
+
+ // Actually safe, as the key would have been inserted above.
+ self.handlers.get_mut(&target).unwrap()
+ } else {
+ let target = Target::Channel(channel_id);
+
+ if !self.handlers.contains_key(&target) {
+ let mut handler = Handler::new(target, self.ws.clone(), self.user_id);
+ handler.join(channel_id);
+
+ self.handlers.insert(target, handler);
+ }
+
+ // Actually safe, as the key would have been inserted above.
+ self.handlers.get_mut(&target).unwrap()
+ }
+ }
+
+ /// Retrieves the [handler][`Handler`] for the given target and leaves the
+ /// associated voice channel, if connected.
+ ///
+ /// This will _not_ drop the handler, and will preserve it and its settings.
+ ///
+ /// This is a wrapper around [getting][`get`] a handler and calling
+ /// [`leave`] on it.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ /// [`leave`]: struct.Handler.html#method.leave
+ pub fn leave<T: Into<Target>>(&mut self, target_id: T) {
+ let target = target_id.into();
+
+ if let Some(handler) = self.handlers.get_mut(&target) {
+ handler.leave();
+ }
+ }
+
+ /// Retrieves the [`Handler`] for the given target and leaves the associated
+ /// voice channel, if connected.
+ ///
+ /// The handler is then dropped, removing settings for the target.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ pub fn remove<T: Into<Target>>(&mut self, target_id: T) {
+ let target = target_id.into();
+
+ self.leave(target);
+
+ self.handlers.remove(&target);
+ }
+}
diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs
index e69de29..107a473 100644
--- a/src/ext/voice/mod.rs
+++ b/src/ext/voice/mod.rs
@@ -0,0 +1,55 @@
+mod audio;
+mod connection;
+mod connection_info;
+mod error;
+mod manager;
+mod handler;
+mod threading;
+
+pub use self::error::VoiceError;
+pub use self::handler::Handler;
+pub use self::manager::Manager;
+
+use self::audio::{AudioReceiver, AudioSource};
+use self::connection_info::ConnectionInfo;
+use ::model::{ChannelId, GuildId};
+
+const CRYPTO_MODE: &'static str = "xsalsa20_poly1305";
+
+#[doc(hidden)]
+pub enum Status {
+ Connect(ConnectionInfo),
+ Disconnect,
+ SetReceiver(Option<Box<AudioReceiver>>),
+ SetSender(Option<Box<AudioSource>>),
+}
+
+/// Denotes the target to manage a connection for.
+///
+/// For most cases, targets should entirely be guilds, except for the one case
+/// where a user account can be in a 1-to-1 or group call.
+///
+/// It _may_ be possible in the future for bots to be in multiple groups. If
+/// this turns out to be the case, supporting that now rather than messily in
+/// the future is the best option. Thus, these types of calls are specified by
+/// the group's channel Id.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub enum Target {
+ /// Used for managing a voice handler for a 1-on-1 (user-to-user) or group
+ /// call.
+ Channel(ChannelId),
+ /// Used for managing a voice handler for a guild.
+ Guild(GuildId),
+}
+
+impl From<ChannelId> for Target {
+ fn from(channel_id: ChannelId) -> Target {
+ Target::Channel(channel_id)
+ }
+}
+
+impl From<GuildId> for Target {
+ fn from(guild_id: GuildId) -> Target {
+ Target::Guild(guild_id)
+ }
+}
diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs
new file mode 100644
index 0000000..4110c5f
--- /dev/null
+++ b/src/ext/voice/threading.rs
@@ -0,0 +1,93 @@
+use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError};
+use std::thread::Builder as ThreadBuilder;
+use super::connection::Connection;
+use super::{Status, Target};
+use ::internal::Timer;
+
+pub fn start(target_id: Target, rx: MpscReceiver<Status>) {
+ let name = match target_id {
+ Target::Channel(channel_id) => format!("Serenity Voice (C{})", channel_id),
+ Target::Guild(guild_id) => format!("Serenity Voice (G{})", guild_id),
+ };
+
+ ThreadBuilder::new()
+ .name(name)
+ .spawn(move || runner(rx))
+ .expect("Err starting voice");
+}
+
+fn runner(rx: MpscReceiver<Status>) {
+ let mut sender = None;
+ let mut receiver = None;
+ let mut connection = None;
+ let mut timer = Timer::new(20);
+
+ 'runner: loop {
+ loop {
+ match rx.try_recv() {
+ Ok(Status::Connect(info)) => {
+ connection = match Connection::new(info) {
+ Ok(connection) => Some(connection),
+ Err(why) => {
+ error!("Err connecting via voice: {:?}", why);
+
+ None
+ },
+ };
+ },
+ Ok(Status::Disconnect) => {
+ connection = None;
+ },
+ Ok(Status::SetReceiver(r)) => {
+ receiver = r;
+ },
+ Ok(Status::SetSender(s)) => {
+ sender = s;
+ },
+ Err(TryRecvError::Empty) => {
+ // If we receieved nothing, then we can perform an update.
+ break;
+ },
+ Err(TryRecvError::Disconnected) => {
+ break 'runner;
+ },
+ }
+ }
+
+ // Overall here, check if there's an error.
+ //
+ // If there is a connection, try to send an update. This should not
+ // error. If there is though for some spurious reason, then set `error`
+ // to `true`.
+ //
+ // Otherwise, wait out the timer and do _not_ error and wait to receive
+ // another event.
+ let error = match connection.as_mut() {
+ Some(connection) => {
+ let update = connection.update(&mut sender,
+ &mut receiver,
+ &mut timer);
+
+ match update {
+ Ok(()) => false,
+ Err(why) => {
+ error!("Err updating voice connection: {:?}", why);
+
+ true
+ },
+ }
+ },
+ None => {
+ timer.await();
+
+ false
+ },
+ };
+
+ // If there was an error, then just reset the connection and try to get
+ // another.
+ if error {
+ connection = None;
+ }
+ }
+}