aboutsummaryrefslogtreecommitdiff
path: root/src/voice
diff options
context:
space:
mode:
authoracdenisSK <[email protected]>2017-07-27 06:42:48 +0200
committeracdenisSK <[email protected]>2017-07-27 07:30:23 +0200
commit550030264952f0e0043b63f4582bb817ef8bbf37 (patch)
treeb921e2f78fd603a5ca671623083a32806fd16090 /src/voice
parentUse a consistent indentation style (diff)
downloadserenity-550030264952f0e0043b63f4582bb817ef8bbf37.tar.xz
serenity-550030264952f0e0043b63f4582bb817ef8bbf37.zip
rustfmt
Diffstat (limited to 'src/voice')
-rw-r--r--src/voice/audio.rs7
-rw-r--r--src/voice/connection.rs171
-rw-r--r--src/voice/connection_info.rs2
-rw-r--r--src/voice/handler.rs24
-rw-r--r--src/voice/manager.rs6
-rw-r--r--src/voice/payload.rs2
-rw-r--r--src/voice/streamer.rs58
-rw-r--r--src/voice/threading.rs12
8 files changed, 130 insertions, 152 deletions
diff --git a/src/voice/audio.rs b/src/voice/audio.rs
index ea8c87a..14b9ccd 100644
--- a/src/voice/audio.rs
+++ b/src/voice/audio.rs
@@ -12,5 +12,10 @@ pub trait AudioSource: Send {
pub trait AudioReceiver: Send {
fn speaking_update(&mut self, ssrc: u32, user_id: u64, speaking: bool);
- fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]);
+ fn voice_packet(&mut self,
+ ssrc: u32,
+ sequence: u16,
+ timestamp: u32,
+ stereo: bool,
+ data: &[i16]);
}
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index ced6a79..6f8343c 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -1,10 +1,10 @@
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
use opus::{
- Channels,
+ packet as opus_packet,
Application as CodingMode,
+ Channels,
Decoder as OpusDecoder,
Encoder as OpusEncoder,
- packet as opus_packet,
};
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::collections::HashMap;
@@ -14,18 +14,18 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
-use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
+use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE};
use super::connection_info::ConnectionInfo;
-use super::{CRYPTO_MODE, VoiceError, payload};
+use super::{payload, VoiceError, CRYPTO_MODE};
use websocket::client::Url as WebsocketUrl;
use websocket::sync::client::ClientBuilder;
use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream};
use websocket::sync::Client as WsClient;
-use ::internal::prelude::*;
-use ::internal::ws_impl::{ReceiverExt, SenderExt};
-use ::internal::Timer;
-use ::model::event::VoiceEvent;
-use ::model::UserId;
+use internal::prelude::*;
+use internal::ws_impl::{ReceiverExt, SenderExt};
+use internal::Timer;
+use model::event::VoiceEvent;
+use model::UserId;
type Client = WsClient<TlsStream<TcpStream>>;
@@ -78,8 +78,7 @@ impl Connection {
},
VoiceEvent::Heartbeat(_) => continue,
other => {
- debug!("[Voice] Expected hello/heartbeat; got: {:?}",
- other);
+ debug!("[Voice] Expected hello/heartbeat; got: {:?}", other);
return Err(Error::Voice(VoiceError::ExpectedHandshake));
},
@@ -116,7 +115,10 @@ impl Connection {
// Find the position in the bytes that contains the first byte of 0,
// indicating the "end of the address".
- let index = bytes.iter().skip(4).position(|&x| x == 0)
+ let index = bytes
+ .iter()
+ .skip(4)
+ .position(|&x| x == 0)
.ok_or(Error::Voice(VoiceError::FindingByte))?;
let pos = 4 + index;
@@ -124,12 +126,16 @@ impl Connection {
let port_pos = len - 2;
let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
- client.send_json(&payload::build_select_protocol(addr, port))?;
+ client
+ .send_json(&payload::build_select_protocol(addr, port))?;
}
let key = encryption_key(&mut client)?;
- let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25)));
+ let _ = client
+ .stream_ref()
+ .as_tcp()
+ .set_read_timeout(Some(Duration::from_millis(25)));
let mutexed_client = Arc::new(Mutex::new(client));
let thread_items = start_threads(mutexed_client.clone(), &udp)?;
@@ -139,23 +145,23 @@ impl Connection {
let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?;
Ok(Connection {
- audio_timer: Timer::new(1000 * 60 * 4),
- client: mutexed_client,
- decoder_map: HashMap::new(),
- destination: destination,
- encoder: encoder,
- encoder_stereo: false,
- key: key,
- keepalive_timer: Timer::new(hello.heartbeat_interval),
- udp: udp,
- sequence: 0,
- silence_frames: 0,
- speaking: false,
- ssrc: hello.ssrc,
- thread_items: thread_items,
- timestamp: 0,
- user_id: info.user_id,
- })
+ audio_timer: Timer::new(1000 * 60 * 4),
+ client: mutexed_client,
+ decoder_map: HashMap::new(),
+ destination: destination,
+ encoder: encoder,
+ encoder_stereo: false,
+ key: key,
+ keepalive_timer: Timer::new(hello.heartbeat_interval),
+ udp: udp,
+ sequence: 0,
+ silence_frames: 0,
+ speaking: false,
+ ssrc: hello.ssrc,
+ thread_items: thread_items,
+ timestamp: 0,
+ user_id: info.user_id,
+ })
}
#[allow(unused_variables)]
@@ -179,35 +185,29 @@ impl Connection {
nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
- if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ if let Ok(decrypted) =
+ secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
let channels = opus_packet::get_nb_channels(&decrypted)?;
- let entry = self.decoder_map.entry((ssrc, channels))
- .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE,
- channels)
- .unwrap());
+ let entry =
+ self.decoder_map.entry((ssrc, channels)).or_insert_with(
+ || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(),
+ );
let len = entry.decode(&decrypted, &mut buffer, false)?;
let is_stereo = channels == Channels::Stereo;
- let b = if is_stereo {
- len * 2
- } else {
- len
- };
+ let b = if is_stereo { len * 2 } else { len };
receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
}
},
ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
- receiver.speaking_update(ev.ssrc,
- ev.user_id.0,
- ev.speaking);
+ receiver.speaking_update(ev.ssrc, ev.user_id.0, ev.speaking);
},
ReceiverStatus::Websocket(other) => {
- info!("[Voice] Received other websocket data: {:?}",
- other);
+ info!("[Voice] Received other websocket data: {:?}", other);
},
}
}
@@ -221,7 +221,10 @@ impl Connection {
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- self.client.lock().unwrap().send_json(&payload::build_keepalive())?;
+ self.client
+ .lock()
+ .unwrap()
+ .send_json(&payload::build_keepalive())?;
}
// Send UDP keepalive if it's time
@@ -282,14 +285,10 @@ impl Connection {
nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
let sl_index = packet.len() - 16;
- let buffer_len = if self.encoder_stereo {
- 960 * 2
- } else {
- 960
- };
+ let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 };
- let len = self.encoder.encode(&buffer[..buffer_len],
- &mut packet[HEADER_LEN..sl_index])?;
+ let len = self.encoder
+ .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?;
let crypted = {
let slice = &packet[HEADER_LEN..HEADER_LEN + len];
secretbox::seal(slice, &nonce, &self.key)
@@ -319,17 +318,11 @@ impl Connection {
} else {
Channels::Mono
};
- self.encoder = OpusEncoder::new(SAMPLE_RATE,
- channels,
- CodingMode::Audio)?;
+ self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?;
self.encoder_stereo = is_stereo;
}
- let buffer_len = if is_stereo {
- 960 * 2
- } else {
- 960
- };
+ let buffer_len = if is_stereo { 960 * 2 } else { 960 };
match source.read_frame(&mut buffer[..buffer_len]) {
Some(len) => len,
@@ -357,7 +350,10 @@ impl Connection {
self.speaking = speaking;
- self.client.lock().unwrap().send_json(&payload::build_speaking(speaking))
+ self.client
+ .lock()
+ .unwrap()
+ .send_json(&payload::build_speaking(speaking))
}
}
@@ -382,8 +378,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
}
#[inline]
-fn encryption_key(client: &mut Client)
- -> Result<Key> {
+fn encryption_key(client: &mut Client) -> Result<Key> {
loop {
match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Ready(ready) => {
@@ -391,8 +386,7 @@ fn encryption_key(client: &mut Client)
return Err(Error::Voice(VoiceError::VoiceModeInvalid));
}
- return Key::from_slice(&ready.secret_key)
- .ok_or(Error::Voice(VoiceError::KeyGen));
+ return Key::from_slice(&ready.secret_key).ok_or(Error::Voice(VoiceError::KeyGen));
},
VoiceEvent::Unknown(op, value) => {
debug!("[Voice] Expected ready for key; got: op{}/v{:?}",
@@ -405,13 +399,10 @@ fn encryption_key(client: &mut Client)
}
#[inline]
-fn has_valid_mode(modes: &[String]) -> bool {
- modes.iter().any(|s| s == CRYPTO_MODE)
-}
+fn has_valid_mode(modes: &[String]) -> bool { modes.iter().any(|s| s == CRYPTO_MODE) }
#[inline]
-fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
- -> Result<ThreadItems> {
+fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> {
let (udp_close_sender, udp_close_reader) = mpsc::channel();
let (ws_close_sender, ws_close_reader) = mpsc::channel();
@@ -445,27 +436,25 @@ fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
let ws_thread = ThreadBuilder::new()
.name(format!("{} WS", thread_name))
- .spawn(move || {
- loop {
- while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
- if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
- return;
- }
- }
+ .spawn(move || loop {
+ while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
+ if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
+ return;
+ }
+ }
- if ws_close_reader.try_recv().is_ok() {
- return;
- }
+ if ws_close_reader.try_recv().is_ok() {
+ return;
+ }
- thread::sleep(Duration::from_millis(25));
- }
- })?;
+ thread::sleep(Duration::from_millis(25));
+ })?;
Ok(ThreadItems {
- rx: rx,
- udp_close_sender: udp_close_sender,
- udp_thread: udp_thread,
- ws_close_sender: ws_close_sender,
- ws_thread: ws_thread,
- })
+ rx: rx,
+ udp_close_sender: udp_close_sender,
+ udp_thread: udp_thread,
+ ws_close_sender: ws_close_sender,
+ ws_thread: ws_thread,
+ })
}
diff --git a/src/voice/connection_info.rs b/src/voice/connection_info.rs
index d0364ce..91bee80 100644
--- a/src/voice/connection_info.rs
+++ b/src/voice/connection_info.rs
@@ -1,4 +1,4 @@
-use ::model::{GuildId, UserId};
+use model::{GuildId, UserId};
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
index fb157e6..fb1bf28 100644
--- a/src/voice/handler.rs
+++ b/src/voice/handler.rs
@@ -3,8 +3,8 @@ use std::sync::mpsc::{self, Sender as MpscSender};
use super::{AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::Status as VoiceStatus;
-use ::constants::VoiceOpCode;
-use ::model::{ChannelId, GuildId, UserId, VoiceState};
+use constants::VoiceOpCode;
+use model::{ChannelId, GuildId, UserId, VoiceState};
use super::threading;
/// The handler is responsible for "handling" a single voice connection, acting
@@ -153,12 +153,12 @@ impl Handler {
// Safe as all of these being present was already checked.
self.send(VoiceStatus::Connect(ConnectionInfo {
- endpoint: endpoint,
- guild_id: guild_id,
- session_id: session_id,
- token: token,
- user_id: user_id,
- }));
+ endpoint: endpoint,
+ guild_id: guild_id,
+ session_id: session_id,
+ token: token,
+ user_id: user_id,
+ }));
true
}
@@ -256,9 +256,7 @@ impl Handler {
}
/// Stops playing audio from a source, if one is set.
- pub fn stop(&mut self) {
- self.send(VoiceStatus::SetSender(None))
- }
+ pub fn stop(&mut self) { self.send(VoiceStatus::SetSender(None)) }
/// Switches the current connected voice channel to the given `channel_id`.
///
@@ -418,7 +416,5 @@ impl Handler {
impl Drop for Handler {
/// Leaves the current connected voice channel, if connected to one, and
/// forgets all configurations relevant to this Handler.
- fn drop(&mut self) {
- self.leave();
- }
+ fn drop(&mut self) { self.leave(); }
}
diff --git a/src/voice/manager.rs b/src/voice/manager.rs
index 528efe7..785aef8 100644
--- a/src/voice/manager.rs
+++ b/src/voice/manager.rs
@@ -2,7 +2,7 @@ use serde_json::Value;
use std::collections::HashMap;
use std::sync::mpsc::Sender as MpscSender;
use super::Handler;
-use ::model::{ChannelId, GuildId, UserId};
+use model::{ChannelId, GuildId, UserId};
/// A manager is a struct responsible for managing [`Handler`]s which belong to
/// a single [`Shard`]. This is a fairly complex key-value store,
@@ -64,7 +64,9 @@ impl Manager {
/// [`get`]: #method.get
#[allow(map_entry)]
pub fn join<C, G>(&mut self, guild_id: G, channel_id: C) -> &mut Handler
- where C: Into<ChannelId>, G: Into<GuildId> {
+ where
+ C: Into<ChannelId>,
+ G: Into<GuildId>, {
let channel_id = channel_id.into();
let guild_id = guild_id.into();
diff --git a/src/voice/payload.rs b/src/voice/payload.rs
index c2e7c0c..0096ebe 100644
--- a/src/voice/payload.rs
+++ b/src/voice/payload.rs
@@ -1,6 +1,6 @@
use serde_json::Value;
use super::connection_info::ConnectionInfo;
-use ::constants::VoiceOpCode;
+use constants::VoiceOpCode;
#[inline]
pub fn build_identify(info: &ConnectionInfo) -> Value {
diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs
index 4d3b9a9..c755da2 100644
--- a/src/voice/streamer.rs
+++ b/src/voice/streamer.rs
@@ -4,7 +4,7 @@ use std::ffi::OsStr;
use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult};
use std::process::{Child, Command, Stdio};
use super::{AudioSource, VoiceError};
-use ::internal::prelude::*;
+use internal::prelude::*;
struct ChildContainer(Child);
@@ -17,18 +17,18 @@ impl Read for ChildContainer {
struct PcmSource<R: Read + Send + 'static>(bool, R);
impl<R: Read + Send> AudioSource for PcmSource<R> {
- fn is_stereo(&mut self) -> bool {
- self.0
- }
+ fn is_stereo(&mut self) -> bool { self.0 }
fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> {
for (i, v) in buffer.iter_mut().enumerate() {
*v = match self.1.read_i16::<LittleEndian>() {
Ok(v) => v,
- Err(ref e) => return if e.kind() == IoErrorKind::UnexpectedEof {
- Some(i)
- } else {
- None
+ Err(ref e) => {
+ return if e.kind() == IoErrorKind::UnexpectedEof {
+ Some(i)
+ } else {
+ None
+ }
},
}
}
@@ -43,11 +43,7 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
/// Will fail if the path is not to a file on the fs. Likely a YouTube URI.
let is_stereo = is_stereo(path).unwrap_or(false);
- let stereo_val = if is_stereo {
- "2"
- } else {
- "1"
- };
+ let stereo_val = if is_stereo { "2" } else { "1" };
let args = [
"-f",
@@ -74,8 +70,7 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
}
/// Creates a PCM audio source.
-pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R)
- -> Box<AudioSource> {
+pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> {
Box::new(PcmSource(is_stereo, reader))
}
@@ -106,9 +101,11 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> {
};
let uri = match obj.remove("url") {
- Some(v) => match v {
- Value::String(uri) => uri,
- other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ Some(v) => {
+ match v {
+ Value::String(uri) => uri,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ }
},
None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))),
};
@@ -117,14 +114,7 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> {
}
fn is_stereo(path: &OsStr) -> Result<bool> {
- let args = [
- "-v",
- "quiet",
- "-of",
- "json",
- "-show-streams",
- "-i",
- ];
+ let args = ["-v", "quiet", "-of", "json", "-show-streams", "-i"];
let out = Command::new("ffprobe")
.args(&args)
@@ -134,19 +124,19 @@ fn is_stereo(path: &OsStr) -> Result<bool> {
let value: Value = serde_json::from_reader(&out.stdout[..])?;
- let streams = value.as_object()
+ let streams = value
+ .as_object()
.and_then(|m| m.get("streams"))
.and_then(|v| v.as_array())
.ok_or(Error::Voice(VoiceError::Streams))?;
- let check = streams.iter()
- .any(|stream| {
- let channels = stream.as_object()
- .and_then(|m| m.get("channels")
- .and_then(|v| v.as_i64()));
+ let check = streams.iter().any(|stream| {
+ let channels = stream
+ .as_object()
+ .and_then(|m| m.get("channels").and_then(|v| v.as_i64()));
- channels == Some(2)
- });
+ channels == Some(2)
+ });
Ok(check)
}
diff --git a/src/voice/threading.rs b/src/voice/threading.rs
index fe0aebc..f272282 100644
--- a/src/voice/threading.rs
+++ b/src/voice/threading.rs
@@ -2,8 +2,8 @@ use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError};
use std::thread::Builder as ThreadBuilder;
use super::connection::Connection;
use super::Status;
-use ::internal::Timer;
-use ::model::GuildId;
+use internal::Timer;
+use model::GuildId;
pub(crate) fn start(guild_id: GuildId, rx: MpscReceiver<Status>) {
let name = format!("Serenity Voice (G{})", guild_id);
@@ -25,9 +25,7 @@ fn runner(rx: &MpscReceiver<Status>) {
match rx.try_recv() {
Ok(Status::Connect(info)) => {
connection = match Connection::new(info) {
- Ok(connection) => {
- Some(connection)
- },
+ Ok(connection) => Some(connection),
Err(why) => {
warn!("[Voice] Error connecting: {:?}", why);
@@ -64,9 +62,7 @@ fn runner(rx: &MpscReceiver<Status>) {
// another event.
let error = match connection.as_mut() {
Some(connection) => {
- let cycle = connection.cycle(&mut sender,
- &mut receiver,
- &mut timer);
+ let cycle = connection.cycle(&mut sender, &mut receiver, &mut timer);
match cycle {
Ok(()) => false,