diff options
| author | Austin Hellyer <[email protected]> | 2016-11-13 19:28:13 -0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-11-14 18:32:10 -0800 |
| commit | 7d22fb2a9c70e5e517b359875a0157f72e352e43 (patch) | |
| tree | ca3bcb3a76f68960563d3c38d45e21f493ce32f8 /src/ext | |
| parent | Add internal module (diff) | |
| download | serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.tar.xz serenity-7d22fb2a9c70e5e517b359875a0157f72e352e43.zip | |
Add voice connection support
Diffstat (limited to 'src/ext')
| -rw-r--r-- | src/ext/mod.rs | 1 | ||||
| -rw-r--r-- | src/ext/voice/audio.rs | 15 | ||||
| -rw-r--r-- | src/ext/voice/connection.rs | 310 | ||||
| -rw-r--r-- | src/ext/voice/connection_info.rs | 6 | ||||
| -rw-r--r-- | src/ext/voice/error.rs | 10 | ||||
| -rw-r--r-- | src/ext/voice/handler.rs | 331 | ||||
| -rw-r--r-- | src/ext/voice/manager.rs | 141 | ||||
| -rw-r--r-- | src/ext/voice/mod.rs | 55 | ||||
| -rw-r--r-- | src/ext/voice/threading.rs | 93 |
9 files changed, 962 insertions, 0 deletions
diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 92fda62..bb87911 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -10,4 +10,5 @@ pub mod framework; pub mod state; +#[cfg(feature="voice")] pub mod voice; diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs new file mode 100644 index 0000000..e49bd2a --- /dev/null +++ b/src/ext/voice/audio.rs @@ -0,0 +1,15 @@ +use ::model::UserId; + +/// A readable audio source. +pub trait AudioSource: Send { + fn is_stereo(&mut self) -> bool; + + fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>; +} + +/// A receiver for incoming audio. +pub trait AudioReceiver: Send { + fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool); + + fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]); +} diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs new file mode 100644 index 0000000..7dfc034 --- /dev/null +++ b/src/ext/voice/connection.rs @@ -0,0 +1,310 @@ +use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; +use serde_json::builder::ObjectBuilder; +use sodiumoxide::crypto::secretbox::Key; +use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket}; +use std::sync::mpsc::{self, Receiver as MpscReceiver}; +use std::thread::{self, Builder as ThreadBuilder}; +use super::audio::{AudioReceiver, AudioSource}; +use super::connection_info::ConnectionInfo; +use super::{CRYPTO_MODE, VoiceError}; +use websocket::client::request::Url as WebsocketUrl; +use websocket::client::{ + Client as WsClient, + Receiver as WsReceiver, + Sender as WsSender +}; +use websocket::stream::WebSocketStream; +use ::client::STATE; +use ::constants::VoiceOpCode; +use ::internal::prelude::*; +use ::internal::ws_impl::{ReceiverExt, SenderExt}; +use ::internal::Timer; +use ::model::VoiceEvent; + +pub enum ReceiverStatus { + Udp(Vec<u8>), + Websocket(VoiceEvent), +} + +#[allow(dead_code)] +pub struct Connection { + audio_timer: Timer, + destination: SocketAddr, + keepalive_timer: Timer, + key: Key, + receive_channel: MpscReceiver<ReceiverStatus>, + sender: WsSender<WebSocketStream>, + sequence: u64, + speaking: bool, + ssrc: u32, + timestamp: u32, + udp: UdpSocket, +} + +impl Connection { + pub fn new(mut info: ConnectionInfo) -> Result<Connection> { + let url = try!(generate_url(&mut info.endpoint)); + + let response = try!(try!(WsClient::connect(url)).send()); + try!(response.validate()); + let (mut sender, mut receiver) = response.begin().split(); + + try!(sender.send_json(&identify(&info))); + + let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) { + VoiceEvent::Handshake(handshake) => handshake, + _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)), + }; + + if !has_valid_mode(handshake.modes) { + return Err(Error::Voice(VoiceError::VoiceModeUnavailable)); + } + + let destination = { + try!(try!((&info.endpoint[..], handshake.port) + .to_socket_addrs()) + .next() + .ok_or(Error::Voice(VoiceError::HostnameResolve))) + }; + let udp = try!(UdpSocket::bind("0.0.0.0:0")); + + { + let mut bytes = [0; 70]; + try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc)); + try!(udp.send_to(&bytes, destination)); + } + + try!(send_acknowledgement(&mut sender, &udp)); + + let key = try!(get_encryption_key(&mut receiver)); + + let receive_channel = try!(start_threads(receiver, &udp)); + + info!("[Voice] Connected to: {}", info.endpoint); + + Ok(Connection { + audio_timer: Timer::new(1000 * 60 * 4), + destination: destination, + key: key, + keepalive_timer: Timer::new(handshake.heartbeat_interval), + receive_channel: receive_channel, + udp: udp, + sender: sender, + sequence: 0, + speaking: false, + ssrc: handshake.ssrc, + timestamp: 0, + }) + } + + #[allow(unused_variables)] + pub fn update(&mut self, + source: &mut Option<Box<AudioSource>>, + receiver: &mut Option<Box<AudioReceiver>>, + audio_timer: &mut Timer) + -> Result<()> { + if let Some(receiver) = receiver.as_mut() { + while let Ok(status) = self.receive_channel.try_recv() { + match status { + ReceiverStatus::Udp(packet) => { + debug!("[Voice] Received UDP packet: {:?}", packet); + }, + ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { + receiver.speaking_update(ev.ssrc, + &ev.user_id, + ev.speaking); + }, + ReceiverStatus::Websocket(other) => { + info!("[Voice] Received other websocket data: {:?}", + other); + }, + } + } + } else { + while let Ok(_) = self.receive_channel.try_recv() {} + } + + // Send the voice websocket keepalive if it's time + if self.keepalive_timer.check() { + try!(self.sender.send_json(&keepalive())); + } + + // Send the UDP keepalive if it's time + if self.audio_timer.check() { + let mut bytes = [0; 4]; + try!((&mut bytes[..]).write_u32::<BigEndian>(self.ssrc)); + try!(self.udp.send_to(&bytes, self.destination)); + } + + try!(self.speaking(true)); + + self.sequence = self.sequence.wrapping_add(1); + self.timestamp = self.timestamp.wrapping_add(960); + + audio_timer.await(); + self.audio_timer.reset(); + Ok(()) + } + + fn speaking(&mut self, speaking: bool) -> Result<()> { + if self.speaking == speaking { + return Ok(()); + } + + self.speaking = speaking; + + let map = ObjectBuilder::new() + .insert("op", VoiceOpCode::Speaking.num()) + .insert_object("d", |object| object + .insert("delay", 0)) + .insert("speaking", speaking) + .build(); + + self.sender.send_json(&map) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + let _ = self.sender.get_mut().shutdown(Shutdown::Both); + + info!("Voice disconnected"); + } +} + +fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { + if endpoint.ends_with(":80") { + let len = endpoint.len(); + + endpoint.truncate(len - 3); + } + + WebsocketUrl::parse(&format!("wss://{}", endpoint)) + .or(Err(Error::Voice(VoiceError::EndpointUrl))) +} + +pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>) + -> Result<Key> { + loop { + match try!(receiver.recv_json(VoiceEvent::decode)) { + VoiceEvent::Ready(ready) => { + if ready.mode != CRYPTO_MODE { + return Err(Error::Voice(VoiceError::VoiceModeInvalid)); + } + + return Key::from_slice(&ready.secret_key) + .ok_or(Error::Voice(VoiceError::KeyGen)); + }, + VoiceEvent::Unknown(op, value) => { + debug!("Unknown message type: {}/{:?}", op.num(), value); + }, + _ => {}, + } + } +} + +fn identify(info: &ConnectionInfo) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::Identify.num()) + .insert_object("d", |o| o + .insert("server_id", info.server_id) + .insert("session_id", &info.session_id) + .insert("token", &info.token) + .insert("user_id", STATE.lock().unwrap().user.id.0)) + .build() +} + +#[inline(always)] +fn has_valid_mode(modes: Vec<String>) -> bool { + modes.iter().any(|s| s == CRYPTO_MODE) +} + +fn keepalive() -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::KeepAlive.num()) + .insert("d", Value::Null) + .build() +} + +#[inline] +fn select_protocol(address: &[u8], port: u16) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::SelectProtocol.num()) + .insert_object("d", |o| o + .insert("protocol", "udp") + .insert_object("data", |o| o + .insert("address", address) + .insert("mode", "xsalsa20_poly1305"))) + .insert("port", port) + .build() +} + +#[inline] +fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket) + -> Result<()> { + let mut bytes = [0; 256]; + + let (len, _) = try!(udp.recv_from(&mut bytes)); + + let zero_index = bytes.iter() + .skip(4) + .position(|&x| x == 0) + .unwrap(); + + let address = &bytes[4..4 + zero_index]; + + let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>()); + + // send the acknowledgement websocket message + let map = select_protocol(address, port); + sender.send_json(&map).map(|_| ()) +} + +#[inline] +fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) + -> Result<MpscReceiver<ReceiverStatus>> { + let thread = thread::current(); + let thread_name = thread.name().unwrap_or("serenity.rs voice"); + + let (tx, rx) = mpsc::channel(); + let tx_clone = tx.clone(); + let udp_clone = try!(udp.try_clone()); + + try!(ThreadBuilder::new() + .name(format!("{} WS", thread_name)) + .spawn(move || { + loop { + let msg = receiver.recv_json(VoiceEvent::decode); + + if let Ok(msg) = msg { + let send = tx.send(ReceiverStatus::Websocket(msg)); + + if let Err(_why) = send { + return; + } + } else { + break; + } + } + })); + + try!(ThreadBuilder::new() + .name(format!("{} UDP", thread_name)) + .spawn(move || { + let mut buffer = [0; 512]; + + loop { + let (len, _) = udp_clone.recv_from(&mut buffer).unwrap(); + let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len] + .iter() + .cloned() + .collect())); + + if let Err(_why) = req { + return; + } + } + })); + + Ok(rx) +} diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs new file mode 100644 index 0000000..9115385 --- /dev/null +++ b/src/ext/voice/connection_info.rs @@ -0,0 +1,6 @@ +pub struct ConnectionInfo { + pub endpoint: String, + pub server_id: u64, + pub session_id: String, + pub token: String, +} diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs new file mode 100644 index 0000000..b1f3251 --- /dev/null +++ b/src/ext/voice/error.rs @@ -0,0 +1,10 @@ +#[derive(Debug)] +pub enum VoiceError { + // An indicator that an endpoint URL was invalid. + EndpointUrl, + ExpectedHandshake, + HostnameResolve, + KeyGen, + VoiceModeInvalid, + VoiceModeUnavailable, +} diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs new file mode 100644 index 0000000..8dc0ab1 --- /dev/null +++ b/src/ext/voice/handler.rs @@ -0,0 +1,331 @@ +use serde_json::builder::ObjectBuilder; +use std::sync::mpsc::{self, Sender}; +use super::connection_info::ConnectionInfo; +use super::{Status as VoiceStatus, Target}; +use ::client::ConnectionStatus; +use ::constants::VoiceOpCode; +use ::model::{ChannelId, GuildId, VoiceState}; +use super::threading; + +/// The handler is responsible for "handling" a single voice connection, acting +/// as a clean API above the inner connection. +/// +/// # Examples +/// +/// Assuming that you already have a [`Manager`], most likely retrieved via a +/// [WebSocket connection], you can join a guild's voice channel and deafen +/// yourself like so: +/// +/// ```rust,ignore +/// // assuming a `manager` has already been bound, hopefully retrieved through +/// // a websocket's connection. +/// use serenity::model::{ChannelId, GuildId}; +/// +/// let guild_id = GuildId(81384788765712384); +/// let channel_id = ChannelId(85482585546833920); +/// +/// let handler = manager.join(Some(guild_id), channel_id); +/// handler.deafen(true); +/// ``` +/// +/// [`Manager`]: struct.Manager.html +/// [WebSocket connection]: ../../client/struct.Connection.html +pub struct Handler { + channel_id: Option<ChannelId>, + endpoint_token: Option<(String, String)>, + guild_id: Option<GuildId>, + self_deaf: bool, + self_mute: bool, + sender: Sender<VoiceStatus>, + session_id: Option<String>, + user_id: u64, + ws: Sender<ConnectionStatus>, +} + +impl Handler { + /// Creates a new Handler. + /// + /// **Note**: You should never call this yourself, and should instead use + /// [`Manager::join`]. + /// + /// Like, really. Really do not use this. Please. + /// + /// [`Manager::join`]: struct.Manager.html#method.join + #[doc(hidden)] + pub fn new(target: Target, ws: Sender<ConnectionStatus>, user_id: u64) + -> Self { + let (tx, rx) = mpsc::channel(); + + let (channel_id, guild_id) = match target { + Target::Channel(channel_id) => (Some(channel_id), None), + Target::Guild(guild_id) => (None, Some(guild_id)), + }; + + threading::start(target, rx); + + Handler { + channel_id: channel_id, + endpoint_token: None, + guild_id: guild_id, + self_deaf: false, + self_mute: false, + sender: tx, + session_id: None, + user_id: user_id, + ws: ws, + } + } + + /// Retrieves the current connected voice channel's `ChannelId`, if connected + /// to one. + /// + /// Note that when connected to a voice channel, while the `ChannelId` will + /// not be `None`, the [`GuildId`] retrieved via [`guild`] can, in the event + /// of [`Group`] or 1-on-1 [`Call`]s. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`Group`]: ../../model/struct.Group.html + /// [`GuildId`]: ../../model/struct.GuildId.html + /// [`guild`]: #method.guild + pub fn channel(&self) -> Option<ChannelId> { + self.channel_id + } + + /// Sets whether the current connection to be deafened. + /// + /// If there is no live voice connection, then this only acts as a settings + /// update for future connections. + /// + /// **Note**: Unlike in the official client, you _can_ be deafened while + /// not being muted. + pub fn deafen(&mut self, deaf: bool) { + self.self_deaf = deaf; + + // Only send an update if there is currently a connected channel. + // + // Otherwise, this can be treated as a "settings" update for a + // connection. + if self.channel_id.is_some() { + self.update(); + } + } + + /// Retrieves the current connected voice channel's `GuildId`, if connected + /// to one. + /// + /// Note that the `GuildId` can be `None` in the event of [`Group`] or + /// 1-on-1 [`Call`]s, although when connected to a voice channel, the + /// [`ChannelId`] retrieved via [`channel`] will be `Some`. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`ChannelId`]: ../../model/struct.ChannelId.html + /// [`Group`]: ../../model/struct.Group.html + /// [`channel`]: #method.channel + pub fn guild(&self) -> Option<GuildId> { + self.guild_id + } + + /// Whether the current handler is set to deafen voice connections. + /// + /// Use [`deafen`] to modify this configuration. + /// + /// [`deafen`]: #method.deafen + pub fn is_deafened(&self) -> bool { + self.self_deaf + } + + /// Whether the current handler is set to mute voice connections. + /// + /// Use [`mute`] to modify this configuration. + /// + /// [`mute`]: #method.mute + pub fn is_muted(&self) -> bool { + self.self_mute + } + + /// Connect - or switch - to the given voice channel by its Id. + /// + /// **Note**: This is not necessary for [`Group`] or direct [call][`Call`]s. + /// + /// [`Call`]: ../../model/struct.Call.html + /// [`Group`]: ../../model/struct.Group.html + pub fn join(&mut self, channel_id: ChannelId) { + self.channel_id = Some(channel_id); + + self.connect(); + } + + /// Leaves the current voice channel, disconnecting from it. + /// + /// This does _not_ forget settings, like whether to be self-deafened or + /// self-muted. + pub fn leave(&mut self) { + match self.channel_id { + None => return, + Some(_channel_id) => { + self.channel_id = None; + + self.update(); + }, + } + } + + /// Sets whether the current connection is to be muted. + /// + /// If there is no live voice connection, then this only acts as a settings + /// update for future connections. + pub fn mute(&mut self, mute: bool) { + self.self_mute = mute; + + if self.channel_id.is_some() { + self.update(); + } + } + + /// Switches the current connected voice channel to the given `channel_id`. + /// + /// This has 3 separate behaviors: + /// + /// - if the given `channel_id` is equivilant to the current connected + /// `channel_id`, then do nothing; + /// - if the given `channel_id` is _not_ equivilant to the current connected + /// `channel_id`, then switch to the given `channel_id`; + /// - if not currently connected to a voice channel, connect to the given + /// one. + /// + /// **Note**: The given `channel_id`, if in a guild, _must_ be in the + /// current handler's associated guild. + /// + /// If you are dealing with switching from one group to another, then open + /// another handler, and optionally drop this one via [`Manager::remove`]. + /// + /// [`Manager::remove`]: struct.Manager.html#method.remove + pub fn switch_to(&mut self, channel_id: ChannelId) { + match self.channel_id { + Some(current_id) if current_id == channel_id => { + // If already connected to the given channel, do nothing. + return; + }, + Some(_current_id) => { + self.channel_id = Some(channel_id); + + self.update(); + }, + None => { + self.channel_id = Some(channel_id); + + self.connect(); + }, + } + } + + fn connect(&self) { + // Do _not_ try connecting if there is not at least a channel. There + // does not _necessarily_ need to be a guild. + if self.channel_id.is_none() { + return; + } + + self.update(); + } + + fn connect_with_data(&mut self, session_id: String, endpoint: String, token: String) { + let target_id = if let Some(guild_id) = self.guild_id { + guild_id.0 + } else if let Some(channel_id) = self.channel_id { + channel_id.0 + } else { + // Theoretically never happens? This needs to be researched more. + error!("[Voice] No guild/channel ID when connecting"); + + return; + }; + + self.send(VoiceStatus::Connect(ConnectionInfo { + endpoint: endpoint, + server_id: target_id, + session_id: session_id, + token: token, + })) + } + + // Send an update for the current session. + fn update(&self) { + let map = ObjectBuilder::new() + .insert("op", VoiceOpCode::SessionDescription.num()) + .insert_object("d", |o| o + .insert("channel_id", self.channel_id.map(|c| c.0)) + .insert("guild_id", self.guild_id.map(|g| g.0)) + .insert("self_deaf", self.self_deaf) + .insert("self_mute", self.self_mute)) + .build(); + + let _ = self.ws.send(ConnectionStatus::SendMessage(map)); + } + + fn send(&mut self, status: VoiceStatus) { + let send = self.sender.send(status); + + // Reconnect if it errored. + if let Err(mpsc::SendError(status)) = send { + let (tx, rx) = mpsc::channel(); + + self.sender = tx; + self.sender.send(status).unwrap(); + + threading::start(Target::Guild(self.guild_id.unwrap()), rx); + + self.update(); + } + } + + /// You probably shouldn't use this if you're reading the source code. + #[doc(hidden)] + pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) { + if let &Some(ref endpoint) = endpoint { + let endpoint = endpoint.clone(); + let token = token.to_owned(); + let session_id = match self.session_id { + Some(ref session_id) => session_id.clone(), + None => return, + }; + + self.connect_with_data(session_id, endpoint, token); + } else { + self.leave(); + } + } + + /// You probably shouldn't use this if you're reading the source code. + #[doc(hidden)] + pub fn update_state(&mut self, voice_state: &VoiceState) { + if self.user_id != voice_state.user_id.0 { + return; + } + + self.channel_id = voice_state.channel_id; + + if voice_state.channel_id.is_some() { + let session_id = voice_state.session_id.clone(); + + match self.endpoint_token.take() { + Some((endpoint, token)) => { + self.connect_with_data(session_id, endpoint, token); + }, + None => { + self.session_id = Some(session_id); + }, + } + } else { + self.leave(); + } + } +} + +impl Drop for Handler { + /// Leaves the current connected voice channel, if connected to one, and + /// forgets all configurations relevant to this Handler. + fn drop(&mut self) { + self.leave(); + } +} diff --git a/src/ext/voice/manager.rs b/src/ext/voice/manager.rs new file mode 100644 index 0000000..c6c7533 --- /dev/null +++ b/src/ext/voice/manager.rs @@ -0,0 +1,141 @@ +use std::collections::HashMap; +use std::sync::mpsc::Sender as MpscSender; +use super::{Handler, Target}; +use ::client::ConnectionStatus; +use ::model::{ChannelId, GuildId}; + +/// A manager is a struct responsible for managing [`Handler`]s which belong to +/// a single [WebSocket connection]. This is a fairly complex key-value store, +/// with a bit of extra utility for easily joining a "target". +/// +/// The "target" used by the Manager is determined based on the `guild_id` and +/// `channel_id` provided. If a `guild_id` is _not_ provided to methods that +/// optionally require it, then the target is a group or 1-on-1 call with a +/// user. The `channel_id` is then used as the target. +/// +/// If a `guild_id` is provided, then the target is the guild, as a user +/// can not be connected to two channels within one guild simultaneously. +/// +/// [`Group`]: ../../model/struct.Group.html +/// [`Handler`]: struct.Handler.html +/// [guild's channel]: ../../model/enum.ChannelType.html#variant.Voice +/// [WebSocket connection]: ../../client/struct.Connection.html +pub struct Manager { + handlers: HashMap<Target, Handler>, + user_id: u64, + ws: MpscSender<ConnectionStatus>, +} + +impl Manager { + #[doc(hidden)] + pub fn new(ws: MpscSender<ConnectionStatus>, user_id: u64) -> Manager { + Manager { + handlers: HashMap::new(), + user_id: user_id, + ws: ws, + } + } + + /// Retrieves a mutable handler for the given target, if one exists. + pub fn get<T: Into<Target>>(&mut self, target_id: T) + -> Option<&mut Handler> { + self.handlers.get_mut(&target_id.into()) + } + + /// Connects to a target by retrieving its relevant [`Handler`] and + /// connecting, or creating the handler if required. + /// + /// This can also switch to the given channel, if a handler already exists + /// for the target and the current connected channel is not equal to the + /// given channel. + /// + /// In the case of channel targets, the same channel is used to connect to. + /// + /// In the case of guilds, the provided channel is used to connect to. The + /// channel _must_ be in the provided guild. This is _not_ checked by the + /// library, and will result in an error. If there is already a connected + /// handler for the guild, _and_ the provided channel is different from the + /// channel that the connection is already connected to, then the handler + /// will switch the connection to the provided channel. + /// + /// If you _only_ need to retrieve the handler for a target, then use + /// [`get`]. + /// + /// [`Handler`]: struct.Handler.html + /// [`get`]: #method.get + #[allow(map_entry)] + pub fn join(&mut self, guild_id: Option<GuildId>, channel_id: ChannelId) + -> &mut Handler { + if let Some(guild_id) = guild_id { + let target = Target::Guild(guild_id); + + { + let mut found = false; + + if let Some(handler) = self.handlers.get_mut(&target) { + handler.switch_to(channel_id); + + found = true; + } + + if found { + // Actually safe, as the key has already been found above. + return self.handlers.get_mut(&target).unwrap(); + } + } + + let mut handler = Handler::new(target, self.ws.clone(), self.user_id); + handler.join(channel_id); + + self.handlers.insert(target, handler); + + // Actually safe, as the key would have been inserted above. + self.handlers.get_mut(&target).unwrap() + } else { + let target = Target::Channel(channel_id); + + if !self.handlers.contains_key(&target) { + let mut handler = Handler::new(target, self.ws.clone(), self.user_id); + handler.join(channel_id); + + self.handlers.insert(target, handler); + } + + // Actually safe, as the key would have been inserted above. + self.handlers.get_mut(&target).unwrap() + } + } + + /// Retrieves the [handler][`Handler`] for the given target and leaves the + /// associated voice channel, if connected. + /// + /// This will _not_ drop the handler, and will preserve it and its settings. + /// + /// This is a wrapper around [getting][`get`] a handler and calling + /// [`leave`] on it. + /// + /// [`Handler`]: struct.Handler.html + /// [`get`]: #method.get + /// [`leave`]: struct.Handler.html#method.leave + pub fn leave<T: Into<Target>>(&mut self, target_id: T) { + let target = target_id.into(); + + if let Some(handler) = self.handlers.get_mut(&target) { + handler.leave(); + } + } + + /// Retrieves the [`Handler`] for the given target and leaves the associated + /// voice channel, if connected. + /// + /// The handler is then dropped, removing settings for the target. + /// + /// [`Handler`]: struct.Handler.html + pub fn remove<T: Into<Target>>(&mut self, target_id: T) { + let target = target_id.into(); + + self.leave(target); + + self.handlers.remove(&target); + } +} diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs index e69de29..107a473 100644 --- a/src/ext/voice/mod.rs +++ b/src/ext/voice/mod.rs @@ -0,0 +1,55 @@ +mod audio; +mod connection; +mod connection_info; +mod error; +mod manager; +mod handler; +mod threading; + +pub use self::error::VoiceError; +pub use self::handler::Handler; +pub use self::manager::Manager; + +use self::audio::{AudioReceiver, AudioSource}; +use self::connection_info::ConnectionInfo; +use ::model::{ChannelId, GuildId}; + +const CRYPTO_MODE: &'static str = "xsalsa20_poly1305"; + +#[doc(hidden)] +pub enum Status { + Connect(ConnectionInfo), + Disconnect, + SetReceiver(Option<Box<AudioReceiver>>), + SetSender(Option<Box<AudioSource>>), +} + +/// Denotes the target to manage a connection for. +/// +/// For most cases, targets should entirely be guilds, except for the one case +/// where a user account can be in a 1-to-1 or group call. +/// +/// It _may_ be possible in the future for bots to be in multiple groups. If +/// this turns out to be the case, supporting that now rather than messily in +/// the future is the best option. Thus, these types of calls are specified by +/// the group's channel Id. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum Target { + /// Used for managing a voice handler for a 1-on-1 (user-to-user) or group + /// call. + Channel(ChannelId), + /// Used for managing a voice handler for a guild. + Guild(GuildId), +} + +impl From<ChannelId> for Target { + fn from(channel_id: ChannelId) -> Target { + Target::Channel(channel_id) + } +} + +impl From<GuildId> for Target { + fn from(guild_id: GuildId) -> Target { + Target::Guild(guild_id) + } +} diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs new file mode 100644 index 0000000..4110c5f --- /dev/null +++ b/src/ext/voice/threading.rs @@ -0,0 +1,93 @@ +use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError}; +use std::thread::Builder as ThreadBuilder; +use super::connection::Connection; +use super::{Status, Target}; +use ::internal::Timer; + +pub fn start(target_id: Target, rx: MpscReceiver<Status>) { + let name = match target_id { + Target::Channel(channel_id) => format!("Serenity Voice (C{})", channel_id), + Target::Guild(guild_id) => format!("Serenity Voice (G{})", guild_id), + }; + + ThreadBuilder::new() + .name(name) + .spawn(move || runner(rx)) + .expect("Err starting voice"); +} + +fn runner(rx: MpscReceiver<Status>) { + let mut sender = None; + let mut receiver = None; + let mut connection = None; + let mut timer = Timer::new(20); + + 'runner: loop { + loop { + match rx.try_recv() { + Ok(Status::Connect(info)) => { + connection = match Connection::new(info) { + Ok(connection) => Some(connection), + Err(why) => { + error!("Err connecting via voice: {:?}", why); + + None + }, + }; + }, + Ok(Status::Disconnect) => { + connection = None; + }, + Ok(Status::SetReceiver(r)) => { + receiver = r; + }, + Ok(Status::SetSender(s)) => { + sender = s; + }, + Err(TryRecvError::Empty) => { + // If we receieved nothing, then we can perform an update. + break; + }, + Err(TryRecvError::Disconnected) => { + break 'runner; + }, + } + } + + // Overall here, check if there's an error. + // + // If there is a connection, try to send an update. This should not + // error. If there is though for some spurious reason, then set `error` + // to `true`. + // + // Otherwise, wait out the timer and do _not_ error and wait to receive + // another event. + let error = match connection.as_mut() { + Some(connection) => { + let update = connection.update(&mut sender, + &mut receiver, + &mut timer); + + match update { + Ok(()) => false, + Err(why) => { + error!("Err updating voice connection: {:?}", why); + + true + }, + } + }, + None => { + timer.await(); + + false + }, + }; + + // If there was an error, then just reset the connection and try to get + // another. + if error { + connection = None; + } + } +} |