aboutsummaryrefslogtreecommitdiff
path: root/src/voice
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-05-22 17:02:00 -0700
committerZeyla Hellyer <[email protected]>2017-05-22 17:02:00 -0700
commit9969be60cf320797c37b317da24d9a08fd5eafa5 (patch)
treef27bf7a57af95bbc11990b1edcea9cca99276964 /src/voice
parentReasonably derive Debug on items (diff)
downloadserenity-9969be60cf320797c37b317da24d9a08fd5eafa5.tar.xz
serenity-9969be60cf320797c37b317da24d9a08fd5eafa5.zip
Restructure modules
Modules are now separated into a fashion where the library can be used for most use cases, without needing to compile the rest. The core of serenity, with no features enabled, contains only the struct (model) definitions, constants, and prelude. Models do not have most functions compiled in, as that is separated into the `model` feature. The `client` module has been split into 3 modules: `client`, `gateway`, and `http`. `http` contains functions to interact with the REST API. `gateway` contains the Shard to interact with the gateway, requiring `http` for retrieving the gateway URL. `client` requires both of the other features and acts as an abstracted interface over both the gateway and REST APIs, handling the event loop. The `builder` module has been separated from `utils`, and can now be optionally compiled in. It and the `http` feature are required by the `model` feature due to a large number of methods requiring access to them. `utils` now contains a number of utilities, such as the Colour struct, the `MessageBuilder`, and mention parsing functions. Each of the original `ext` modules are still featured, with `cache` not requiring any feature to be enabled, `framework` requiring the `client`, `model`, and `utils`, and `voice` requiring `gateway`. In total the features and their requirements are: - `builder`: none - `cache`: none - `client`: `gateway`, `http` - `framework`: `client`, `model`, `utils` - `gateway`: `http` - `http`: none - `model`: `builder`, `http` - `utils`: none - `voice`: `gateway` The default features are `builder`, `cache`, `client`, `framework`, `gateway`, `model`, `http`, and `utils`. To help with forwards compatibility, modules have been re-exported from their original locations.
Diffstat (limited to 'src/voice')
-rw-r--r--src/voice/audio.rs16
-rw-r--r--src/voice/connection.rs477
-rw-r--r--src/voice/connection_info.rs10
-rw-r--r--src/voice/error.rs34
-rw-r--r--src/voice/handler.rs432
-rw-r--r--src/voice/manager.rs126
-rw-r--r--src/voice/mod.rs29
-rw-r--r--src/voice/payload.rs50
-rw-r--r--src/voice/streamer.rs152
-rw-r--r--src/voice/threading.rs94
10 files changed, 1420 insertions, 0 deletions
diff --git a/src/voice/audio.rs b/src/voice/audio.rs
new file mode 100644
index 0000000..ea8c87a
--- /dev/null
+++ b/src/voice/audio.rs
@@ -0,0 +1,16 @@
+pub const HEADER_LEN: usize = 12;
+pub const SAMPLE_RATE: u32 = 48000;
+
+/// A readable audio source.
+pub trait AudioSource: Send {
+ fn is_stereo(&mut self) -> bool;
+
+ fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>;
+}
+
+/// A receiver for incoming audio.
+pub trait AudioReceiver: Send {
+ fn speaking_update(&mut self, ssrc: u32, user_id: u64, speaking: bool);
+
+ fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]);
+}
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
new file mode 100644
index 0000000..698f469
--- /dev/null
+++ b/src/voice/connection.rs
@@ -0,0 +1,477 @@
+use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
+use opus::{
+ Channels,
+ CodingMode,
+ Decoder as OpusDecoder,
+ Encoder as OpusEncoder,
+ packet as opus_packet,
+};
+use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
+use std::collections::HashMap;
+use std::io::Write;
+use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
+use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
+use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
+use std::time::Duration;
+use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
+use super::connection_info::ConnectionInfo;
+use super::{CRYPTO_MODE, VoiceError, payload};
+use websocket::client::request::Url as WebsocketUrl;
+use websocket::client::{
+ Client as WsClient,
+ Receiver as WsReceiver,
+ Sender as WsSender
+};
+use websocket::stream::WebSocketStream;
+use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::internal::Timer;
+use ::model::event::VoiceEvent;
+use ::model::UserId;
+
+enum ReceiverStatus {
+ Udp(Vec<u8>),
+ Websocket(VoiceEvent),
+}
+
+#[allow(dead_code)]
+struct ThreadItems {
+ rx: MpscReceiver<ReceiverStatus>,
+ udp_close_sender: MpscSender<i32>,
+ udp_thread: JoinHandle<()>,
+ ws_close_sender: MpscSender<i32>,
+ ws_thread: JoinHandle<()>,
+}
+
+#[allow(dead_code)]
+pub struct Connection {
+ audio_timer: Timer,
+ decoder_map: HashMap<(u32, Channels), OpusDecoder>,
+ destination: SocketAddr,
+ encoder: OpusEncoder,
+ encoder_stereo: bool,
+ keepalive_timer: Timer,
+ key: Key,
+ sender: WsSender<WebSocketStream>,
+ sequence: u16,
+ silence_frames: u8,
+ speaking: bool,
+ ssrc: u32,
+ thread_items: ThreadItems,
+ timestamp: u32,
+ udp: UdpSocket,
+ user_id: UserId,
+}
+
+impl Connection {
+ pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
+ let url = generate_url(&mut info.endpoint)?;
+
+ let response = WsClient::connect(url)?.send()?;
+ response.validate()?;
+ let (mut sender, mut receiver) = response.begin().split();
+
+ sender.send_json(&payload::build_identify(&info))?;
+
+ let hello = {
+ let hello;
+
+ loop {
+ match receiver.recv_json(VoiceEvent::decode)? {
+ VoiceEvent::Hello(received_hello) => {
+ hello = received_hello;
+
+ break;
+ },
+ VoiceEvent::Heartbeat(_) => continue,
+ other => {
+ debug!("[Voice] Expected hello/heartbeat; got: {:?}",
+ other);
+
+ return Err(Error::Voice(VoiceError::ExpectedHandshake));
+ },
+ }
+ }
+
+ hello
+ };
+
+ if !has_valid_mode(&hello.modes) {
+ return Err(Error::Voice(VoiceError::VoiceModeUnavailable));
+ }
+
+ let destination = (&info.endpoint[..], hello.port)
+ .to_socket_addrs()?
+ .next()
+ .ok_or(Error::Voice(VoiceError::HostnameResolve))?;
+
+ // Important to note here: the length of the packet can be of either 4
+ // or 70 bytes. If it is 4 bytes, then we need to send a 70-byte packet
+ // to determine the IP.
+ //
+ // Past the initial 4 bytes, the packet _must_ be completely empty data.
+ //
+ // The returned packet will be a null-terminated string of the IP, and
+ // the port encoded in LE in the last two bytes of the packet.
+ let udp = UdpSocket::bind("0.0.0.0:0")?;
+
+ {
+ let mut bytes = [0; 70];
+
+ (&mut bytes[..]).write_u32::<BigEndian>(hello.ssrc)?;
+ udp.send_to(&bytes, destination)?;
+
+ let mut bytes = [0; 256];
+ let (len, _addr) = udp.recv_from(&mut bytes)?;
+
+ // Find the position in the bytes that contains the first byte of 0,
+ // indicating the "end of the address".
+ let index = bytes.iter().skip(4).position(|&x| x == 0)
+ .ok_or(Error::Voice(VoiceError::FindingByte))?;
+
+ let pos = 4 + index;
+ let addr = String::from_utf8_lossy(&bytes[4..pos]);
+ let port_pos = len - 2;
+ let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
+
+ sender.send_json(&payload::build_select_protocol(addr, port))?;
+ }
+
+ let key = encryption_key(&mut receiver)?;
+
+ let thread_items = start_threads(receiver, &udp)?;
+
+ info!("[Voice] Connected to: {}", info.endpoint);
+
+ let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?;
+
+ Ok(Connection {
+ audio_timer: Timer::new(1000 * 60 * 4),
+ decoder_map: HashMap::new(),
+ destination: destination,
+ encoder: encoder,
+ encoder_stereo: false,
+ key: key,
+ keepalive_timer: Timer::new(hello.heartbeat_interval),
+ udp: udp,
+ sender: sender,
+ sequence: 0,
+ silence_frames: 0,
+ speaking: false,
+ ssrc: hello.ssrc,
+ thread_items: thread_items,
+ timestamp: 0,
+ user_id: info.user_id,
+ })
+ }
+
+ #[allow(unused_variables)]
+ pub fn cycle(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ receiver: &mut Option<Box<AudioReceiver>>,
+ audio_timer: &mut Timer)
+ -> Result<()> {
+ let mut buffer = [0i16; 960 * 2];
+ let mut packet = [0u8; 512];
+ let mut nonce = secretbox::Nonce([0; 24]);
+
+ if let Some(receiver) = receiver.as_mut() {
+ while let Ok(status) = self.thread_items.rx.try_recv() {
+ match status {
+ ReceiverStatus::Udp(packet) => {
+ let mut handle = &packet[2..];
+ let seq = handle.read_u16::<BigEndian>()?;
+ let timestamp = handle.read_u32::<BigEndian>()?;
+ let ssrc = handle.read_u32::<BigEndian>()?;
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ let channels = opus_packet::get_nb_channels(&decrypted)?;
+
+ let entry = self.decoder_map.entry((ssrc, channels))
+ .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE,
+ channels)
+ .unwrap());
+
+ let len = entry.decode(&decrypted, &mut buffer, false)?;
+
+ let is_stereo = channels == Channels::Stereo;
+
+ let b = if is_stereo {
+ len * 2
+ } else {
+ len
+ };
+
+ receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
+ }
+ },
+ ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
+ receiver.speaking_update(ev.ssrc,
+ ev.user_id.0,
+ ev.speaking);
+ },
+ ReceiverStatus::Websocket(other) => {
+ info!("[Voice] Received other websocket data: {:?}",
+ other);
+ },
+ }
+ }
+ } else {
+ loop {
+ if self.thread_items.rx.try_recv().is_err() {
+ break;
+ }
+ }
+ }
+
+ // Send the voice websocket keepalive if it's time
+ if self.keepalive_timer.check() {
+ self.sender.send_json(&payload::build_keepalive())?;
+ }
+
+ // Send UDP keepalive if it's time
+ if self.audio_timer.check() {
+ let mut bytes = [0; 4];
+ (&mut bytes[..]).write_u32::<BigEndian>(self.ssrc)?;
+ self.udp.send_to(&bytes, self.destination)?;
+ }
+
+ let len = self.read(source, &mut buffer)?;
+
+ if len == 0 {
+ self.set_speaking(false)?;
+
+ if self.silence_frames > 0 {
+ self.silence_frames -= 1;
+
+ for value in &mut buffer[..] {
+ *value = 0;
+ }
+ } else {
+ audio_timer.await();
+
+ return Ok(());
+ }
+ } else {
+ self.silence_frames = 5;
+
+ for value in &mut buffer[len..] {
+ *value = 0;
+ }
+ }
+
+ self.set_speaking(true)?;
+ let index = self.prep_packet(&mut packet, buffer, nonce)?;
+
+ audio_timer.await();
+ self.udp.send_to(&packet[..index], self.destination)?;
+ self.audio_timer.reset();
+
+ Ok(())
+ }
+
+ fn prep_packet(&mut self,
+ packet: &mut [u8; 512],
+ buffer: [i16; 1920],
+ mut nonce: Nonce)
+ -> Result<usize> {
+ {
+ let mut cursor = &mut packet[..HEADER_LEN];
+ cursor.write_all(&[0x80, 0x78])?;
+ cursor.write_u16::<BigEndian>(self.sequence)?;
+ cursor.write_u32::<BigEndian>(self.timestamp)?;
+ cursor.write_u32::<BigEndian>(self.ssrc)?;
+ }
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ let sl_index = packet.len() - 16;
+ let buffer_len = if self.encoder_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ let len = self.encoder.encode(&buffer[..buffer_len],
+ &mut packet[HEADER_LEN..sl_index])?;
+ let crypted = {
+ let slice = &packet[HEADER_LEN..HEADER_LEN + len];
+ secretbox::seal(slice, &nonce, &self.key)
+ };
+ let index = HEADER_LEN + crypted.len();
+ packet[HEADER_LEN..index].clone_from_slice(&crypted);
+
+ self.sequence = self.sequence.wrapping_add(1);
+ self.timestamp = self.timestamp.wrapping_add(960);
+
+ Ok(HEADER_LEN + crypted.len())
+ }
+
+ fn read(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ buffer: &mut [i16; 1920])
+ -> Result<usize> {
+ let mut clear = false;
+
+ let len = match source.as_mut() {
+ Some(source) => {
+ let is_stereo = source.is_stereo();
+
+ if is_stereo != self.encoder_stereo {
+ let channels = if is_stereo {
+ Channels::Stereo
+ } else {
+ Channels::Mono
+ };
+ self.encoder = OpusEncoder::new(SAMPLE_RATE,
+ channels,
+ CodingMode::Audio)?;
+ self.encoder_stereo = is_stereo;
+ }
+
+ let buffer_len = if is_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ match source.read_frame(&mut buffer[..buffer_len]) {
+ Some(len) => len,
+ None => {
+ clear = true;
+
+ 0
+ },
+ }
+ },
+ None => 0,
+ };
+
+ if clear {
+ *source = None;
+ }
+
+ Ok(len)
+ }
+
+ fn set_speaking(&mut self, speaking: bool) -> Result<()> {
+ if self.speaking == speaking {
+ return Ok(());
+ }
+
+ self.speaking = speaking;
+
+ self.sender.send_json(&payload::build_speaking(speaking))
+ }
+}
+
+impl Drop for Connection {
+ fn drop(&mut self) {
+ let _ = self.thread_items.udp_close_sender.send(0);
+ let _ = self.thread_items.ws_close_sender.send(0);
+
+ info!("[Voice] Disconnected");
+ }
+}
+
+fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
+ if endpoint.ends_with(":80") {
+ let len = endpoint.len();
+
+ endpoint.truncate(len - 3);
+ }
+
+ WebsocketUrl::parse(&format!("wss://{}", endpoint))
+ .or(Err(Error::Voice(VoiceError::EndpointUrl)))
+}
+
+#[inline]
+fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+ -> Result<Key> {
+ loop {
+ match receiver.recv_json(VoiceEvent::decode)? {
+ VoiceEvent::Ready(ready) => {
+ if ready.mode != CRYPTO_MODE {
+ return Err(Error::Voice(VoiceError::VoiceModeInvalid));
+ }
+
+ return Key::from_slice(&ready.secret_key)
+ .ok_or(Error::Voice(VoiceError::KeyGen));
+ },
+ VoiceEvent::Unknown(op, value) => {
+ debug!("[Voice] Expected ready for key; got: op{}/v{:?}",
+ op.num(),
+ value);
+ },
+ _ => {},
+ }
+ }
+}
+
+#[inline]
+fn has_valid_mode(modes: &[String]) -> bool {
+ modes.iter().any(|s| s == CRYPTO_MODE)
+}
+
+#[inline]
+fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+ -> Result<ThreadItems> {
+ let (udp_close_sender, udp_close_reader) = mpsc::channel();
+ let (ws_close_sender, ws_close_reader) = mpsc::channel();
+
+ let current_thread = thread::current();
+ let thread_name = current_thread.name().unwrap_or("serenity voice");
+
+ let (tx, rx) = mpsc::channel();
+ let tx_clone = tx.clone();
+ let udp_clone = udp.try_clone()?;
+
+ let udp_thread = ThreadBuilder::new()
+ .name(format!("{} UDP", thread_name))
+ .spawn(move || {
+ let _ = udp_clone.set_read_timeout(Some(Duration::from_millis(250)));
+
+ let mut buffer = [0; 512];
+
+ loop {
+ if let Ok((len, _)) = udp_clone.recv_from(&mut buffer) {
+ let piece = buffer[..len].to_vec();
+ let send = tx.send(ReceiverStatus::Udp(piece));
+
+ if send.is_err() {
+ return;
+ }
+ } else if udp_close_reader.try_recv().is_ok() {
+ return;
+ }
+ }
+ })?;
+
+ let ws_thread = ThreadBuilder::new()
+ .name(format!("{} WS", thread_name))
+ .spawn(move || {
+ loop {
+ while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) {
+ if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
+ return;
+ }
+ }
+
+ if ws_close_reader.try_recv().is_ok() {
+ return;
+ }
+
+ thread::sleep(Duration::from_millis(25));
+ }
+ })?;
+
+ Ok(ThreadItems {
+ rx: rx,
+ udp_close_sender: udp_close_sender,
+ udp_thread: udp_thread,
+ ws_close_sender: ws_close_sender,
+ ws_thread: ws_thread,
+ })
+}
diff --git a/src/voice/connection_info.rs b/src/voice/connection_info.rs
new file mode 100644
index 0000000..d0364ce
--- /dev/null
+++ b/src/voice/connection_info.rs
@@ -0,0 +1,10 @@
+use ::model::{GuildId, UserId};
+
+#[derive(Clone, Debug)]
+pub struct ConnectionInfo {
+ pub endpoint: String,
+ pub guild_id: GuildId,
+ pub session_id: String,
+ pub token: String,
+ pub user_id: UserId,
+}
diff --git a/src/voice/error.rs b/src/voice/error.rs
new file mode 100644
index 0000000..55be1f6
--- /dev/null
+++ b/src/voice/error.rs
@@ -0,0 +1,34 @@
+use serde_json::Value;
+use std::process::Output;
+
+/// An error returned from the voice module.
+// Errors which are not visible to the end user are hidden.
+#[derive(Debug)]
+pub enum VoiceError {
+ /// An indicator that an endpoint URL was invalid.
+ EndpointUrl,
+ #[doc(hidden)]
+ ExpectedHandshake,
+ #[doc(hidden)]
+ FindingByte,
+ #[doc(hidden)]
+ HostnameResolve,
+ #[doc(hidden)]
+ KeyGen,
+ /// An error occurred while checking if a path is stereo.
+ Streams,
+ #[doc(hidden)]
+ VoiceModeInvalid,
+ #[doc(hidden)]
+ VoiceModeUnavailable,
+ /// An error occurred while running `youtube-dl`.
+ YouTubeDLRun(Output),
+ /// An error occurred while processing the JSON output from `youtube-dl`.
+ ///
+ /// The JSON output is given.
+ YouTubeDLProcessing(Value),
+ /// The `url` field of the `youtube-dl` JSON output was not present.
+ ///
+ /// The JSON output is given.
+ YouTubeDLUrl(Value),
+}
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
new file mode 100644
index 0000000..22c0375
--- /dev/null
+++ b/src/voice/handler.rs
@@ -0,0 +1,432 @@
+use std::sync::mpsc::{self, Sender as MpscSender};
+use super::{AudioReceiver, AudioSource};
+use super::connection_info::ConnectionInfo;
+use super::Status as VoiceStatus;
+use ::constants::VoiceOpCode;
+use ::gateway::GatewayStatus;
+use ::model::{ChannelId, GuildId, UserId, VoiceState};
+use super::threading;
+
+/// The handler is responsible for "handling" a single voice connection, acting
+/// as a clean API above the inner connection.
+///
+/// Look into the [`Manager`] for a slightly higher-level interface for managing
+/// the existence of handlers.
+///
+/// **Note**: You should _not_ manually mutate any struct fields. You should
+/// _only_ read them. Use methods to mutate them.
+///
+/// # Examples
+///
+/// Assuming that you already have a `Manager`, most likely retrieved via a
+/// [`Shard`], you can join a guild's voice channel and deafen yourself like so:
+///
+/// ```rust,ignore
+/// // assuming a `manager` has already been bound, hopefully retrieved through
+/// // a websocket's connection.
+/// use serenity::model::{ChannelId, GuildId};
+///
+/// let guild_id = GuildId(81384788765712384);
+/// let channel_id = ChannelId(85482585546833920);
+///
+/// let handler = manager.join(Some(guild_id), channel_id);
+/// handler.deafen(true);
+/// ```
+///
+/// [`Manager`]: struct.Manager.html
+/// [`Shard`]: ../gateway/struct.Shard.html
+#[derive(Clone, Debug)]
+pub struct Handler {
+ /// The ChannelId to be connected to, if any.
+ ///
+ /// Note that when connected to a voice channel, while the `ChannelId` will
+ /// not be `None`, the [`guild_id`] can, in the event of [`Group`] or
+ /// 1-on-1 [`Call`]s.
+ ///
+ /// **Note**: This _must not_ be manually mutated. Call [`switch_to`] to
+ /// mutate this value.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ /// [`guild`]: #structfield.guild
+ /// [`switch_to`]: #method.switch_to
+ pub channel_id: Option<ChannelId>,
+ /// The voice server endpoint.
+ pub endpoint: Option<String>,
+ /// The GuildId to be connected to, if any. Can be normally `None` in the
+ /// event of playing audio to a one-on-one [`Call`] or [`Group`].
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ pub guild_id: GuildId,
+ /// Whether the current handler is set to deafen voice connections.
+ ///
+ /// **Note**: This _must not_ be manually mutated. Call [`deafen`] to
+ /// mutate this value.
+ ///
+ /// [`deafen`]: #method.deafen
+ pub self_deaf: bool,
+ /// Whether the current handler is set to mute voice connections.
+ ///
+ /// **Note**: This _must not_ be manually mutated. Call [`mute`] to mutate
+ /// this value.
+ ///
+ /// [`mute`]: #method.mute
+ pub self_mute: bool,
+ /// The internal sender to the voice connection monitor thread.
+ sender: MpscSender<VoiceStatus>,
+ /// The session Id of the current voice connection, if any.
+ ///
+ /// **Note**: This _should_ be set through an [`update_state`] call.
+ ///
+ /// [`update_state`]: #method.update_state
+ pub session_id: Option<String>,
+ /// The token of the current voice connection, if any.
+ ///
+ /// **Note**: This _should_ be set through an [`update_server`] call.
+ ///
+ /// [`update_server`]: #method.update_server
+ pub token: Option<String>,
+ /// The Id of the current user.
+ ///
+ /// This is configured via [`new`] or [`standalone`].
+ ///
+ /// [`new`]: #method.new
+ /// [`standalone`]: #method.standalone
+ pub user_id: UserId,
+ /// Will be set when a `Handler` is made via the [`new`][`Handler::new`]
+ /// method.
+ ///
+ /// When set via [`standalone`][`Handler::standalone`], it will not be
+ /// present.
+ ws: Option<MpscSender<GatewayStatus>>,
+}
+
+impl Handler {
+ /// Creates a new Handler.
+ ///
+ /// **Note**: You should never call this yourself, and should instead use
+ /// [`Manager::join`].
+ ///
+ /// Like, really. Really do not use this. Please.
+ ///
+ /// [`Manager::join`]: struct.Manager.html#method.join
+ #[doc(hidden)]
+ #[inline]
+ pub fn new(guild_id: GuildId, ws: MpscSender<GatewayStatus>, user_id: UserId) -> Self {
+ Self::new_raw(guild_id, Some(ws), user_id)
+ }
+
+ /// Creates a new, standalone Handler which is not connected to the primary
+ /// WebSocket to the Gateway.
+ ///
+ /// Actions such as muting, deafening, and switching channels will not
+ /// function through this Handler and must be done through some other
+ /// method, as the values will only be internally updated.
+ ///
+ /// For most use cases you do not want this. Only use it if you are using
+ /// the voice component standalone from the rest of the library.
+ #[inline]
+ pub fn standalone(guild_id: GuildId, user_id: UserId) -> Self {
+ Self::new_raw(guild_id, None, user_id)
+ }
+
+ /// Connects to the voice channel if the following are present:
+ ///
+ /// - [`endpoint`]
+ /// - [`session_id`]
+ /// - [`token`]
+ ///
+ /// If they _are_ all present, then `true` is returned. Otherwise, `false`
+ /// is.
+ ///
+ /// This will automatically be called by [`update_server`] or
+ /// [`update_state`] when all three values become present.
+ ///
+ /// [`endpoint`]: #structfield.endpoint
+ /// [`session_id`]: #structfield.session_id
+ /// [`token`]: #structfield.token
+ /// [`update_server`]: #method.update_server
+ /// [`update_state`]: #method.update_state
+ pub fn connect(&mut self) -> bool {
+ if self.endpoint.is_none() || self.session_id.is_none() || self.token.is_none() {
+ return false;
+ }
+
+ let endpoint = self.endpoint.clone().unwrap();
+ let guild_id = self.guild_id;
+ let session_id = self.session_id.clone().unwrap();
+ let token = self.token.clone().unwrap();
+ let user_id = self.user_id;
+
+ // Safe as all of these being present was already checked.
+ self.send(VoiceStatus::Connect(ConnectionInfo {
+ endpoint: endpoint,
+ guild_id: guild_id,
+ session_id: session_id,
+ token: token,
+ user_id: user_id,
+ }));
+
+ true
+ }
+
+ /// Sets whether the current connection to be deafened.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ ///
+ /// **Note**: Unlike in the official client, you _can_ be deafened while
+ /// not being muted.
+ ///
+ /// **Note**: If the `Handler` was created via [`standalone`], then this
+ /// will _only_ update whether the connection is internally deafened.
+ ///
+ /// [`standalone`]: #method.standalone
+ pub fn deafen(&mut self, deaf: bool) {
+ self.self_deaf = deaf;
+
+ // Only send an update if there is currently a connected channel.
+ //
+ // Otherwise, this can be treated as a "settings" update for a
+ // connection.
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Connect - or switch - to the given voice channel by its Id.
+ ///
+ /// **Note**: This is not necessary for [`Group`] or direct [call][`Call`]s.
+ ///
+ /// [`Call`]: ../../model/struct.Call.html
+ /// [`Group`]: ../../model/struct.Group.html
+ pub fn join(&mut self, channel_id: ChannelId) {
+ self.channel_id = Some(channel_id);
+
+ self.send_join();
+ }
+
+ /// Leaves the current voice channel, disconnecting from it.
+ ///
+ /// This does _not_ forget settings, like whether to be self-deafened or
+ /// self-muted.
+ ///
+ /// **Note**: If the `Handler` was created via [`standalone`], then this
+ /// will _only_ update whether the connection is internally connected to a
+ /// voice channel.
+ ///
+ /// [`standalone`]: #method.standalone
+ pub fn leave(&mut self) {
+ // Only send an update if we were in a voice channel.
+ if self.channel_id.is_some() {
+ self.channel_id = None;
+
+ self.update();
+ }
+ }
+
+ /// Sets a receiver, i.e. a way to receive audio. Most use cases for bots do
+ /// not require this.
+ ///
+ /// The `receiver` argument can be thought of as an "optional Option". You
+ /// can pass in just a boxed receiver, and do not need to specify `Some`.
+ ///
+ /// Pass `None` to drop the current receiver, if one exists.
+ pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O) {
+ self.send(VoiceStatus::SetReceiver(receiver.into()))
+ }
+
+ /// Sets whether the current connection is to be muted.
+ ///
+ /// If there is no live voice connection, then this only acts as a settings
+ /// update for future connections.
+ ///
+ /// **Note**: If the `Handler` was created via [`standalone`], then this
+ /// will _only_ update whether the connection is internally muted.
+ ///
+ /// [`standalone`]: #method.standalone
+ pub fn mute(&mut self, mute: bool) {
+ self.self_mute = mute;
+
+ if self.channel_id.is_some() {
+ self.update();
+ }
+ }
+
+ /// Plays audio from a source. This can be a source created via
+ /// [`voice::ffmpeg`] or [`voice::ytdl`].
+ ///
+ /// [`voice::ffmpeg`]: fn.ffmpeg.html
+ /// [`voice::ytdl`]: fn.ytdl.html
+ pub fn play(&mut self, source: Box<AudioSource>) {
+ self.send(VoiceStatus::SetSender(Some(source)))
+ }
+
+ /// Stops playing audio from a source, if one is set.
+ pub fn stop(&mut self) {
+ self.send(VoiceStatus::SetSender(None))
+ }
+
+ /// Switches the current connected voice channel to the given `channel_id`.
+ ///
+ /// This has 3 separate behaviors:
+ ///
+ /// - if the given `channel_id` is equivalent to the current connected
+ /// `channel_id`, then do nothing;
+ /// - if the given `channel_id` is _not_ equivalent to the current connected
+ /// `channel_id`, then switch to the given `channel_id`;
+ /// - if not currently connected to a voice channel, connect to the given
+ /// one.
+ ///
+ /// If you are dealing with switching from one group to another, then open
+ /// another handler, and optionally drop this one via [`Manager::remove`].
+ ///
+ /// **Note**: The given `channel_id`, if in a guild, _must_ be in the
+ /// current handler's associated guild.
+ ///
+ /// **Note**: If the `Handler` was created via [`standalone`], then this
+ /// will _only_ update whether the connection is internally switched to a
+ /// different channel.
+ ///
+ /// [`Manager::remove`]: struct.Manager.html#method.remove
+ /// [`standalone`]: #method.standalone
+ pub fn switch_to(&mut self, channel_id: ChannelId) {
+ match self.channel_id {
+ Some(current_id) if current_id == channel_id => {
+ // If already connected to the given channel, do nothing.
+ return;
+ },
+ _ => {
+ self.channel_id = Some(channel_id);
+
+ self.update();
+ },
+ }
+ }
+
+ /// Updates the voice server data.
+ ///
+ /// You should only need to use this if you initialized the `Handler` via
+ /// [`standalone`].
+ ///
+ /// Refer to the documentation for [`connect`] for when this will
+ /// automatically connect to a voice channel.
+ ///
+ /// [`connect`]: #method.connect
+ /// [`standalone`]: #method.standalone
+ pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) {
+ self.token = Some(token.to_owned());
+
+ if let Some(endpoint) = endpoint.clone() {
+ self.endpoint = Some(endpoint);
+
+ if self.session_id.is_some() {
+ self.connect();
+ }
+ } else {
+ self.leave();
+ }
+ }
+
+ /// Updates the internal voice state of the current user.
+ ///
+ /// You should only need to use this if you initialized the `Handler` via
+ /// [`standalone`].
+ ///
+ /// refer to the documentation for [`connect`] for when this will
+ /// automatically connect to a voice channel.
+ ///
+ /// [`connect`]: #method.connect
+ /// [`standalone`]: #method.standalone
+ pub fn update_state(&mut self, voice_state: &VoiceState) {
+ if self.user_id != voice_state.user_id.0 {
+ return;
+ }
+
+ self.channel_id = voice_state.channel_id;
+
+ if voice_state.channel_id.is_some() {
+ self.session_id = Some(voice_state.session_id.clone());
+
+ if self.endpoint.is_some() && self.token.is_some() {
+ self.connect();
+ }
+ } else {
+ self.leave();
+ }
+ }
+
+ fn new_raw(guild_id: GuildId, ws: Option<MpscSender<GatewayStatus>>, user_id: UserId) -> Self {
+ let (tx, rx) = mpsc::channel();
+
+ threading::start(guild_id, rx);
+
+ Handler {
+ channel_id: None,
+ endpoint: None,
+ guild_id: guild_id,
+ self_deaf: false,
+ self_mute: false,
+ sender: tx,
+ session_id: None,
+ token: None,
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Sends a message to the thread.
+ fn send(&mut self, status: VoiceStatus) {
+ // Restart thread if it errored.
+ if let Err(mpsc::SendError(status)) = self.sender.send(status) {
+ let (tx, rx) = mpsc::channel();
+
+ self.sender = tx;
+ self.sender.send(status).unwrap();
+
+ threading::start(self.guild_id, rx);
+
+ self.update();
+ }
+ }
+
+ fn send_join(&self) {
+ // Do _not_ try connecting if there is not at least a channel. There
+ // does not _necessarily_ need to be a guild.
+ if self.channel_id.is_none() {
+ return;
+ }
+
+ self.update();
+ }
+
+ /// Send an update for the current session over WS.
+ ///
+ /// Does nothing if initialized via [`standalone`].
+ ///
+ /// [`standalone`]: #method.standalone
+ fn update(&self) {
+ if let Some(ref ws) = self.ws {
+ let map = json!({
+ "op": VoiceOpCode::SessionDescription.num(),
+ "d": {
+ "channel_id": self.channel_id.map(|c| c.0),
+ "guild_id": self.guild_id.0,
+ "self_deaf": self.self_deaf,
+ "self_mute": self.self_mute,
+ }
+ });
+
+ let _ = ws.send(GatewayStatus::SendMessage(map));
+ }
+ }
+}
+
+impl Drop for Handler {
+ /// Leaves the current connected voice channel, if connected to one, and
+ /// forgets all configurations relevant to this Handler.
+ fn drop(&mut self) {
+ self.leave();
+ }
+}
diff --git a/src/voice/manager.rs b/src/voice/manager.rs
new file mode 100644
index 0000000..fe1b489
--- /dev/null
+++ b/src/voice/manager.rs
@@ -0,0 +1,126 @@
+use std::collections::HashMap;
+use std::sync::mpsc::Sender as MpscSender;
+use super::Handler;
+use ::gateway::GatewayStatus;
+use ::model::{ChannelId, GuildId, UserId};
+
+/// A manager is a struct responsible for managing [`Handler`]s which belong to
+/// a single [`Shard`]. This is a fairly complex key-value store,
+/// with a bit of extra utility for easily joining a "target".
+///
+/// The "target" used by the Manager is determined based on the `guild_id` and
+/// `channel_id` provided. If a `guild_id` is _not_ provided to methods that
+/// optionally require it, then the target is a group or 1-on-1 call with a
+/// user. The `channel_id` is then used as the target.
+///
+/// If a `guild_id` is provided, then the target is the guild, as a user
+/// can not be connected to two channels within one guild simultaneously.
+///
+/// [`Group`]: ../../model/struct.Group.html
+/// [`Handler`]: struct.Handler.html
+/// [guild's channel]: ../../model/enum.ChannelType.html#variant.Voice
+/// [`Shard`]: ../gateway/struct.Shard.html
+#[derive(Clone, Debug)]
+pub struct Manager {
+ handlers: HashMap<GuildId, Handler>,
+ user_id: UserId,
+ ws: MpscSender<GatewayStatus>,
+}
+
+impl Manager {
+ #[doc(hidden)]
+ pub fn new(ws: MpscSender<GatewayStatus>, user_id: UserId) -> Manager {
+ Manager {
+ handlers: HashMap::new(),
+ user_id: user_id,
+ ws: ws,
+ }
+ }
+
+ /// Retrieves a mutable handler for the given target, if one exists.
+ pub fn get<G: Into<GuildId>>(&mut self, guild_id: G) -> Option<&mut Handler> {
+ self.handlers.get_mut(&guild_id.into())
+ }
+
+ /// Connects to a target by retrieving its relevant [`Handler`] and
+ /// connecting, or creating the handler if required.
+ ///
+ /// This can also switch to the given channel, if a handler already exists
+ /// for the target and the current connected channel is not equal to the
+ /// given channel.
+ ///
+ /// In the case of channel targets, the same channel is used to connect to.
+ ///
+ /// In the case of guilds, the provided channel is used to connect to. The
+ /// channel _must_ be in the provided guild. This is _not_ checked by the
+ /// library, and will result in an error. If there is already a connected
+ /// handler for the guild, _and_ the provided channel is different from the
+ /// channel that the connection is already connected to, then the handler
+ /// will switch the connection to the provided channel.
+ ///
+ /// If you _only_ need to retrieve the handler for a target, then use
+ /// [`get`].
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ #[allow(map_entry)]
+ pub fn join<C, G>(&mut self, guild_id: G, channel_id: C) -> &mut Handler
+ where C: Into<ChannelId>, G: Into<GuildId> {
+ let channel_id = channel_id.into();
+ let guild_id = guild_id.into();
+
+ {
+ let mut found = false;
+
+ if let Some(handler) = self.handlers.get_mut(&guild_id) {
+ handler.switch_to(channel_id);
+
+ found = true;
+ }
+
+ if found {
+ // Actually safe, as the key has already been found above.
+ return self.handlers.get_mut(&guild_id).unwrap();
+ }
+ }
+
+ let mut handler = Handler::new(guild_id, self.ws.clone(), self.user_id);
+ handler.join(channel_id);
+
+ self.handlers.insert(guild_id, handler);
+
+ // Actually safe, as the key would have been inserted above.
+ self.handlers.get_mut(&guild_id).unwrap()
+ }
+
+ /// Retrieves the [handler][`Handler`] for the given target and leaves the
+ /// associated voice channel, if connected.
+ ///
+ /// This will _not_ drop the handler, and will preserve it and its settings.
+ ///
+ /// This is a wrapper around [getting][`get`] a handler and calling
+ /// [`leave`] on it.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ /// [`get`]: #method.get
+ /// [`leave`]: struct.Handler.html#method.leave
+ pub fn leave<G: Into<GuildId>>(&mut self, guild_id: G) {
+ if let Some(handler) = self.handlers.get_mut(&guild_id.into()) {
+ handler.leave();
+ }
+ }
+
+ /// Retrieves the [`Handler`] for the given target and leaves the associated
+ /// voice channel, if connected.
+ ///
+ /// The handler is then dropped, removing settings for the target.
+ ///
+ /// [`Handler`]: struct.Handler.html
+ pub fn remove<G: Into<GuildId>>(&mut self, guild_id: G) {
+ let guild_id = guild_id.into();
+
+ self.leave(guild_id);
+
+ self.handlers.remove(&guild_id);
+ }
+}
diff --git a/src/voice/mod.rs b/src/voice/mod.rs
new file mode 100644
index 0000000..94e3b40
--- /dev/null
+++ b/src/voice/mod.rs
@@ -0,0 +1,29 @@
+//! A module for connecting to voice channels.
+
+mod audio;
+mod connection;
+mod connection_info;
+mod error;
+mod manager;
+mod handler;
+mod payload;
+mod streamer;
+mod threading;
+
+pub use self::audio::{AudioReceiver, AudioSource};
+pub use self::error::VoiceError;
+pub use self::handler::Handler;
+pub use self::manager::Manager;
+pub use self::streamer::{ffmpeg, pcm, ytdl};
+
+use self::connection_info::ConnectionInfo;
+
+const CRYPTO_MODE: &'static str = "xsalsa20_poly1305";
+
+#[doc(hidden)]
+pub enum Status {
+ Connect(ConnectionInfo),
+ Disconnect,
+ SetReceiver(Option<Box<AudioReceiver>>),
+ SetSender(Option<Box<AudioSource>>),
+}
diff --git a/src/voice/payload.rs b/src/voice/payload.rs
new file mode 100644
index 0000000..c2e7c0c
--- /dev/null
+++ b/src/voice/payload.rs
@@ -0,0 +1,50 @@
+use serde_json::Value;
+use super::connection_info::ConnectionInfo;
+use ::constants::VoiceOpCode;
+
+#[inline]
+pub fn build_identify(info: &ConnectionInfo) -> Value {
+ json!({
+ "op": VoiceOpCode::Identify.num(),
+ "d": {
+ "server_id": info.guild_id.0,
+ "session_id": &info.session_id,
+ "token": &info.token,
+ "user_id": info.user_id.0,
+ }
+ })
+}
+
+#[inline]
+pub fn build_keepalive() -> Value {
+ json!({
+ "op": VoiceOpCode::KeepAlive.num(),
+ "d": Value::Null,
+ })
+}
+
+#[inline]
+pub fn build_select_protocol(address: ::std::borrow::Cow<str>, port: u16) -> Value {
+ json!({
+ "op": VoiceOpCode::SelectProtocol.num(),
+ "d": {
+ "protocol": "udp",
+ "data": {
+ "address": address,
+ "mode": super::CRYPTO_MODE,
+ "port": port,
+ }
+ }
+ })
+}
+
+#[inline]
+pub fn build_speaking(speaking: bool) -> Value {
+ json!({
+ "op": VoiceOpCode::Speaking.num(),
+ "d": {
+ "delay": 0,
+ "speaking": speaking,
+ }
+ })
+}
diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs
new file mode 100644
index 0000000..4d3b9a9
--- /dev/null
+++ b/src/voice/streamer.rs
@@ -0,0 +1,152 @@
+use byteorder::{LittleEndian, ReadBytesExt};
+use serde_json;
+use std::ffi::OsStr;
+use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult};
+use std::process::{Child, Command, Stdio};
+use super::{AudioSource, VoiceError};
+use ::internal::prelude::*;
+
+struct ChildContainer(Child);
+
+impl Read for ChildContainer {
+ fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> {
+ self.0.stdout.as_mut().unwrap().read(buffer)
+ }
+}
+
+struct PcmSource<R: Read + Send + 'static>(bool, R);
+
+impl<R: Read + Send> AudioSource for PcmSource<R> {
+ fn is_stereo(&mut self) -> bool {
+ self.0
+ }
+
+ fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> {
+ for (i, v) in buffer.iter_mut().enumerate() {
+ *v = match self.1.read_i16::<LittleEndian>() {
+ Ok(v) => v,
+ Err(ref e) => return if e.kind() == IoErrorKind::UnexpectedEof {
+ Some(i)
+ } else {
+ None
+ },
+ }
+ }
+
+ Some(buffer.len())
+ }
+}
+
+/// Opens an audio file through `ffmpeg` and creates an audio source.
+pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
+ let path = path.as_ref();
+
+ /// Will fail if the path is not to a file on the fs. Likely a YouTube URI.
+ let is_stereo = is_stereo(path).unwrap_or(false);
+ let stereo_val = if is_stereo {
+ "2"
+ } else {
+ "1"
+ };
+
+ let args = [
+ "-f",
+ "s16le",
+ "-ac",
+ stereo_val,
+ "-ar",
+ "48000",
+ "-acodec",
+ "pcm_s16le",
+ "-",
+ ];
+
+ let command = Command::new("ffmpeg")
+ .arg("-i")
+ .arg(path)
+ .args(&args)
+ .stderr(Stdio::null())
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .spawn()?;
+
+ Ok(pcm(is_stereo, ChildContainer(command)))
+}
+
+/// Creates a PCM audio source.
+pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R)
+ -> Box<AudioSource> {
+ Box::new(PcmSource(is_stereo, reader))
+}
+
+/// Creates a streamed audio source with `youtube-dl` and `ffmpeg`.
+pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> {
+ let args = [
+ "-f",
+ "webm[abr>0]/bestaudio/best",
+ "--no-playlist",
+ "--print-json",
+ "--skip-download",
+ uri,
+ ];
+
+ let out = Command::new("youtube-dl")
+ .args(&args)
+ .stdin(Stdio::null())
+ .output()?;
+
+ if !out.status.success() {
+ return Err(Error::Voice(VoiceError::YouTubeDLRun(out)));
+ }
+
+ let value = serde_json::from_reader(&out.stdout[..])?;
+ let mut obj = match value {
+ Value::Object(obj) => obj,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLProcessing(other))),
+ };
+
+ let uri = match obj.remove("url") {
+ Some(v) => match v {
+ Value::String(uri) => uri,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ },
+ None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))),
+ };
+
+ ffmpeg(&uri)
+}
+
+fn is_stereo(path: &OsStr) -> Result<bool> {
+ let args = [
+ "-v",
+ "quiet",
+ "-of",
+ "json",
+ "-show-streams",
+ "-i",
+ ];
+
+ let out = Command::new("ffprobe")
+ .args(&args)
+ .arg(path)
+ .stdin(Stdio::null())
+ .output()?;
+
+ let value: Value = serde_json::from_reader(&out.stdout[..])?;
+
+ let streams = value.as_object()
+ .and_then(|m| m.get("streams"))
+ .and_then(|v| v.as_array())
+ .ok_or(Error::Voice(VoiceError::Streams))?;
+
+ let check = streams.iter()
+ .any(|stream| {
+ let channels = stream.as_object()
+ .and_then(|m| m.get("channels")
+ .and_then(|v| v.as_i64()));
+
+ channels == Some(2)
+ });
+
+ Ok(check)
+}
diff --git a/src/voice/threading.rs b/src/voice/threading.rs
new file mode 100644
index 0000000..4777231
--- /dev/null
+++ b/src/voice/threading.rs
@@ -0,0 +1,94 @@
+use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError};
+use std::thread::Builder as ThreadBuilder;
+use super::connection::Connection;
+use super::Status;
+use ::internal::Timer;
+use ::model::GuildId;
+
+pub fn start(guild_id: GuildId, rx: MpscReceiver<Status>) {
+ let name = format!("Serenity Voice (G{})", guild_id);
+
+ ThreadBuilder::new()
+ .name(name)
+ .spawn(move || runner(&rx))
+ .expect(&format!("[Voice] Error starting guild: {:?}", guild_id));
+}
+
+fn runner(rx: &MpscReceiver<Status>) {
+ let mut sender = None;
+ let mut receiver = None;
+ let mut connection = None;
+ let mut timer = Timer::new(20);
+
+ 'runner: loop {
+ loop {
+ match rx.try_recv() {
+ Ok(Status::Connect(info)) => {
+ connection = match Connection::new(info) {
+ Ok(connection) => {
+ Some(connection)
+ },
+ Err(why) => {
+ warn!("[Voice] Error connecting: {:?}", why);
+
+ None
+ },
+ };
+ },
+ Ok(Status::Disconnect) => {
+ connection = None;
+ },
+ Ok(Status::SetReceiver(r)) => {
+ receiver = r;
+ },
+ Ok(Status::SetSender(s)) => {
+ sender = s;
+ },
+ Err(TryRecvError::Empty) => {
+ // If we receieved nothing, then we can perform an update.
+ break;
+ },
+ Err(TryRecvError::Disconnected) => {
+ break 'runner;
+ },
+ }
+ }
+
+ // Overall here, check if there's an error.
+ //
+ // If there is a connection, try to send an update. This should not
+ // error. If there is though for some spurious reason, then set `error`
+ // to `true`.
+ //
+ // Otherwise, wait out the timer and do _not_ error and wait to receive
+ // another event.
+ let error = match connection.as_mut() {
+ Some(connection) => {
+ let cycle = connection.cycle(&mut sender,
+ &mut receiver,
+ &mut timer);
+
+ match cycle {
+ Ok(()) => false,
+ Err(why) => {
+ error!("(╯°□°)╯︵ ┻━┻ Error updating connection: {:?}",
+ why);
+
+ true
+ },
+ }
+ },
+ None => {
+ timer.await();
+
+ false
+ },
+ };
+
+ // If there was an error, then just reset the connection and try to get
+ // another.
+ if error {
+ connection = None;
+ }
+ }
+}