diff options
| author | acdenisSK <[email protected]> | 2017-07-27 06:42:48 +0200 |
|---|---|---|
| committer | acdenisSK <[email protected]> | 2017-07-27 07:30:23 +0200 |
| commit | 550030264952f0e0043b63f4582bb817ef8bbf37 (patch) | |
| tree | b921e2f78fd603a5ca671623083a32806fd16090 /src/voice | |
| parent | Use a consistent indentation style (diff) | |
| download | serenity-550030264952f0e0043b63f4582bb817ef8bbf37.tar.xz serenity-550030264952f0e0043b63f4582bb817ef8bbf37.zip | |
rustfmt
Diffstat (limited to 'src/voice')
| -rw-r--r-- | src/voice/audio.rs | 7 | ||||
| -rw-r--r-- | src/voice/connection.rs | 171 | ||||
| -rw-r--r-- | src/voice/connection_info.rs | 2 | ||||
| -rw-r--r-- | src/voice/handler.rs | 24 | ||||
| -rw-r--r-- | src/voice/manager.rs | 6 | ||||
| -rw-r--r-- | src/voice/payload.rs | 2 | ||||
| -rw-r--r-- | src/voice/streamer.rs | 58 | ||||
| -rw-r--r-- | src/voice/threading.rs | 12 |
8 files changed, 130 insertions, 152 deletions
diff --git a/src/voice/audio.rs b/src/voice/audio.rs index ea8c87a..14b9ccd 100644 --- a/src/voice/audio.rs +++ b/src/voice/audio.rs @@ -12,5 +12,10 @@ pub trait AudioSource: Send { pub trait AudioReceiver: Send { fn speaking_update(&mut self, ssrc: u32, user_id: u64, speaking: bool); - fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]); + fn voice_packet(&mut self, + ssrc: u32, + sequence: u16, + timestamp: u32, + stereo: bool, + data: &[i16]); } diff --git a/src/voice/connection.rs b/src/voice/connection.rs index ced6a79..6f8343c 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -1,10 +1,10 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; use opus::{ - Channels, + packet as opus_packet, Application as CodingMode, + Channels, Decoder as OpusDecoder, Encoder as OpusEncoder, - packet as opus_packet, }; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::collections::HashMap; @@ -14,18 +14,18 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; -use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource}; +use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE}; use super::connection_info::ConnectionInfo; -use super::{CRYPTO_MODE, VoiceError, payload}; +use super::{payload, VoiceError, CRYPTO_MODE}; use websocket::client::Url as WebsocketUrl; use websocket::sync::client::ClientBuilder; use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream}; use websocket::sync::Client as WsClient; -use ::internal::prelude::*; -use ::internal::ws_impl::{ReceiverExt, SenderExt}; -use ::internal::Timer; -use ::model::event::VoiceEvent; -use ::model::UserId; +use internal::prelude::*; +use internal::ws_impl::{ReceiverExt, SenderExt}; +use internal::Timer; +use model::event::VoiceEvent; +use model::UserId; type Client = WsClient<TlsStream<TcpStream>>; @@ -78,8 +78,7 @@ impl Connection { }, VoiceEvent::Heartbeat(_) => continue, other => { - debug!("[Voice] Expected hello/heartbeat; got: {:?}", - other); + debug!("[Voice] Expected hello/heartbeat; got: {:?}", other); return Err(Error::Voice(VoiceError::ExpectedHandshake)); }, @@ -116,7 +115,10 @@ impl Connection { // Find the position in the bytes that contains the first byte of 0, // indicating the "end of the address". - let index = bytes.iter().skip(4).position(|&x| x == 0) + let index = bytes + .iter() + .skip(4) + .position(|&x| x == 0) .ok_or(Error::Voice(VoiceError::FindingByte))?; let pos = 4 + index; @@ -124,12 +126,16 @@ impl Connection { let port_pos = len - 2; let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?; - client.send_json(&payload::build_select_protocol(addr, port))?; + client + .send_json(&payload::build_select_protocol(addr, port))?; } let key = encryption_key(&mut client)?; - let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25))); + let _ = client + .stream_ref() + .as_tcp() + .set_read_timeout(Some(Duration::from_millis(25))); let mutexed_client = Arc::new(Mutex::new(client)); let thread_items = start_threads(mutexed_client.clone(), &udp)?; @@ -139,23 +145,23 @@ impl Connection { let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?; Ok(Connection { - audio_timer: Timer::new(1000 * 60 * 4), - client: mutexed_client, - decoder_map: HashMap::new(), - destination: destination, - encoder: encoder, - encoder_stereo: false, - key: key, - keepalive_timer: Timer::new(hello.heartbeat_interval), - udp: udp, - sequence: 0, - silence_frames: 0, - speaking: false, - ssrc: hello.ssrc, - thread_items: thread_items, - timestamp: 0, - user_id: info.user_id, - }) + audio_timer: Timer::new(1000 * 60 * 4), + client: mutexed_client, + decoder_map: HashMap::new(), + destination: destination, + encoder: encoder, + encoder_stereo: false, + key: key, + keepalive_timer: Timer::new(hello.heartbeat_interval), + udp: udp, + sequence: 0, + silence_frames: 0, + speaking: false, + ssrc: hello.ssrc, + thread_items: thread_items, + timestamp: 0, + user_id: info.user_id, + }) } #[allow(unused_variables)] @@ -179,35 +185,29 @@ impl Connection { nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); - if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { + if let Ok(decrypted) = + secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { let channels = opus_packet::get_nb_channels(&decrypted)?; - let entry = self.decoder_map.entry((ssrc, channels)) - .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE, - channels) - .unwrap()); + let entry = + self.decoder_map.entry((ssrc, channels)).or_insert_with( + || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(), + ); let len = entry.decode(&decrypted, &mut buffer, false)?; let is_stereo = channels == Channels::Stereo; - let b = if is_stereo { - len * 2 - } else { - len - }; + let b = if is_stereo { len * 2 } else { len }; receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]); } }, ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { - receiver.speaking_update(ev.ssrc, - ev.user_id.0, - ev.speaking); + receiver.speaking_update(ev.ssrc, ev.user_id.0, ev.speaking); }, ReceiverStatus::Websocket(other) => { - info!("[Voice] Received other websocket data: {:?}", - other); + info!("[Voice] Received other websocket data: {:?}", other); }, } } @@ -221,7 +221,10 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.client.lock().unwrap().send_json(&payload::build_keepalive())?; + self.client + .lock() + .unwrap() + .send_json(&payload::build_keepalive())?; } // Send UDP keepalive if it's time @@ -282,14 +285,10 @@ impl Connection { nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); let sl_index = packet.len() - 16; - let buffer_len = if self.encoder_stereo { - 960 * 2 - } else { - 960 - }; + let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 }; - let len = self.encoder.encode(&buffer[..buffer_len], - &mut packet[HEADER_LEN..sl_index])?; + let len = self.encoder + .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?; let crypted = { let slice = &packet[HEADER_LEN..HEADER_LEN + len]; secretbox::seal(slice, &nonce, &self.key) @@ -319,17 +318,11 @@ impl Connection { } else { Channels::Mono }; - self.encoder = OpusEncoder::new(SAMPLE_RATE, - channels, - CodingMode::Audio)?; + self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?; self.encoder_stereo = is_stereo; } - let buffer_len = if is_stereo { - 960 * 2 - } else { - 960 - }; + let buffer_len = if is_stereo { 960 * 2 } else { 960 }; match source.read_frame(&mut buffer[..buffer_len]) { Some(len) => len, @@ -357,7 +350,10 @@ impl Connection { self.speaking = speaking; - self.client.lock().unwrap().send_json(&payload::build_speaking(speaking)) + self.client + .lock() + .unwrap() + .send_json(&payload::build_speaking(speaking)) } } @@ -382,8 +378,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { } #[inline] -fn encryption_key(client: &mut Client) - -> Result<Key> { +fn encryption_key(client: &mut Client) -> Result<Key> { loop { match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Ready(ready) => { @@ -391,8 +386,7 @@ fn encryption_key(client: &mut Client) return Err(Error::Voice(VoiceError::VoiceModeInvalid)); } - return Key::from_slice(&ready.secret_key) - .ok_or(Error::Voice(VoiceError::KeyGen)); + return Key::from_slice(&ready.secret_key).ok_or(Error::Voice(VoiceError::KeyGen)); }, VoiceEvent::Unknown(op, value) => { debug!("[Voice] Expected ready for key; got: op{}/v{:?}", @@ -405,13 +399,10 @@ fn encryption_key(client: &mut Client) } #[inline] -fn has_valid_mode(modes: &[String]) -> bool { - modes.iter().any(|s| s == CRYPTO_MODE) -} +fn has_valid_mode(modes: &[String]) -> bool { modes.iter().any(|s| s == CRYPTO_MODE) } #[inline] -fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) - -> Result<ThreadItems> { +fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> { let (udp_close_sender, udp_close_reader) = mpsc::channel(); let (ws_close_sender, ws_close_reader) = mpsc::channel(); @@ -445,27 +436,25 @@ fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) let ws_thread = ThreadBuilder::new() .name(format!("{} WS", thread_name)) - .spawn(move || { - loop { - while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { - if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { - return; - } - } + .spawn(move || loop { + while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { + if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { + return; + } + } - if ws_close_reader.try_recv().is_ok() { - return; - } + if ws_close_reader.try_recv().is_ok() { + return; + } - thread::sleep(Duration::from_millis(25)); - } - })?; + thread::sleep(Duration::from_millis(25)); + })?; Ok(ThreadItems { - rx: rx, - udp_close_sender: udp_close_sender, - udp_thread: udp_thread, - ws_close_sender: ws_close_sender, - ws_thread: ws_thread, - }) + rx: rx, + udp_close_sender: udp_close_sender, + udp_thread: udp_thread, + ws_close_sender: ws_close_sender, + ws_thread: ws_thread, + }) } diff --git a/src/voice/connection_info.rs b/src/voice/connection_info.rs index d0364ce..91bee80 100644 --- a/src/voice/connection_info.rs +++ b/src/voice/connection_info.rs @@ -1,4 +1,4 @@ -use ::model::{GuildId, UserId}; +use model::{GuildId, UserId}; #[derive(Clone, Debug)] pub struct ConnectionInfo { diff --git a/src/voice/handler.rs b/src/voice/handler.rs index fb157e6..fb1bf28 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -3,8 +3,8 @@ use std::sync::mpsc::{self, Sender as MpscSender}; use super::{AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::Status as VoiceStatus; -use ::constants::VoiceOpCode; -use ::model::{ChannelId, GuildId, UserId, VoiceState}; +use constants::VoiceOpCode; +use model::{ChannelId, GuildId, UserId, VoiceState}; use super::threading; /// The handler is responsible for "handling" a single voice connection, acting @@ -153,12 +153,12 @@ impl Handler { // Safe as all of these being present was already checked. self.send(VoiceStatus::Connect(ConnectionInfo { - endpoint: endpoint, - guild_id: guild_id, - session_id: session_id, - token: token, - user_id: user_id, - })); + endpoint: endpoint, + guild_id: guild_id, + session_id: session_id, + token: token, + user_id: user_id, + })); true } @@ -256,9 +256,7 @@ impl Handler { } /// Stops playing audio from a source, if one is set. - pub fn stop(&mut self) { - self.send(VoiceStatus::SetSender(None)) - } + pub fn stop(&mut self) { self.send(VoiceStatus::SetSender(None)) } /// Switches the current connected voice channel to the given `channel_id`. /// @@ -418,7 +416,5 @@ impl Handler { impl Drop for Handler { /// Leaves the current connected voice channel, if connected to one, and /// forgets all configurations relevant to this Handler. - fn drop(&mut self) { - self.leave(); - } + fn drop(&mut self) { self.leave(); } } diff --git a/src/voice/manager.rs b/src/voice/manager.rs index 528efe7..785aef8 100644 --- a/src/voice/manager.rs +++ b/src/voice/manager.rs @@ -2,7 +2,7 @@ use serde_json::Value; use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::Handler; -use ::model::{ChannelId, GuildId, UserId}; +use model::{ChannelId, GuildId, UserId}; /// A manager is a struct responsible for managing [`Handler`]s which belong to /// a single [`Shard`]. This is a fairly complex key-value store, @@ -64,7 +64,9 @@ impl Manager { /// [`get`]: #method.get #[allow(map_entry)] pub fn join<C, G>(&mut self, guild_id: G, channel_id: C) -> &mut Handler - where C: Into<ChannelId>, G: Into<GuildId> { + where + C: Into<ChannelId>, + G: Into<GuildId>, { let channel_id = channel_id.into(); let guild_id = guild_id.into(); diff --git a/src/voice/payload.rs b/src/voice/payload.rs index c2e7c0c..0096ebe 100644 --- a/src/voice/payload.rs +++ b/src/voice/payload.rs @@ -1,6 +1,6 @@ use serde_json::Value; use super::connection_info::ConnectionInfo; -use ::constants::VoiceOpCode; +use constants::VoiceOpCode; #[inline] pub fn build_identify(info: &ConnectionInfo) -> Value { diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs index 4d3b9a9..c755da2 100644 --- a/src/voice/streamer.rs +++ b/src/voice/streamer.rs @@ -4,7 +4,7 @@ use std::ffi::OsStr; use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult}; use std::process::{Child, Command, Stdio}; use super::{AudioSource, VoiceError}; -use ::internal::prelude::*; +use internal::prelude::*; struct ChildContainer(Child); @@ -17,18 +17,18 @@ impl Read for ChildContainer { struct PcmSource<R: Read + Send + 'static>(bool, R); impl<R: Read + Send> AudioSource for PcmSource<R> { - fn is_stereo(&mut self) -> bool { - self.0 - } + fn is_stereo(&mut self) -> bool { self.0 } fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> { for (i, v) in buffer.iter_mut().enumerate() { *v = match self.1.read_i16::<LittleEndian>() { Ok(v) => v, - Err(ref e) => return if e.kind() == IoErrorKind::UnexpectedEof { - Some(i) - } else { - None + Err(ref e) => { + return if e.kind() == IoErrorKind::UnexpectedEof { + Some(i) + } else { + None + } }, } } @@ -43,11 +43,7 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> { /// Will fail if the path is not to a file on the fs. Likely a YouTube URI. let is_stereo = is_stereo(path).unwrap_or(false); - let stereo_val = if is_stereo { - "2" - } else { - "1" - }; + let stereo_val = if is_stereo { "2" } else { "1" }; let args = [ "-f", @@ -74,8 +70,7 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> { } /// Creates a PCM audio source. -pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) - -> Box<AudioSource> { +pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> { Box::new(PcmSource(is_stereo, reader)) } @@ -106,9 +101,11 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> { }; let uri = match obj.remove("url") { - Some(v) => match v { - Value::String(uri) => uri, - other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))), + Some(v) => { + match v { + Value::String(uri) => uri, + other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))), + } }, None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))), }; @@ -117,14 +114,7 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> { } fn is_stereo(path: &OsStr) -> Result<bool> { - let args = [ - "-v", - "quiet", - "-of", - "json", - "-show-streams", - "-i", - ]; + let args = ["-v", "quiet", "-of", "json", "-show-streams", "-i"]; let out = Command::new("ffprobe") .args(&args) @@ -134,19 +124,19 @@ fn is_stereo(path: &OsStr) -> Result<bool> { let value: Value = serde_json::from_reader(&out.stdout[..])?; - let streams = value.as_object() + let streams = value + .as_object() .and_then(|m| m.get("streams")) .and_then(|v| v.as_array()) .ok_or(Error::Voice(VoiceError::Streams))?; - let check = streams.iter() - .any(|stream| { - let channels = stream.as_object() - .and_then(|m| m.get("channels") - .and_then(|v| v.as_i64())); + let check = streams.iter().any(|stream| { + let channels = stream + .as_object() + .and_then(|m| m.get("channels").and_then(|v| v.as_i64())); - channels == Some(2) - }); + channels == Some(2) + }); Ok(check) } diff --git a/src/voice/threading.rs b/src/voice/threading.rs index fe0aebc..f272282 100644 --- a/src/voice/threading.rs +++ b/src/voice/threading.rs @@ -2,8 +2,8 @@ use std::sync::mpsc::{Receiver as MpscReceiver, TryRecvError}; use std::thread::Builder as ThreadBuilder; use super::connection::Connection; use super::Status; -use ::internal::Timer; -use ::model::GuildId; +use internal::Timer; +use model::GuildId; pub(crate) fn start(guild_id: GuildId, rx: MpscReceiver<Status>) { let name = format!("Serenity Voice (G{})", guild_id); @@ -25,9 +25,7 @@ fn runner(rx: &MpscReceiver<Status>) { match rx.try_recv() { Ok(Status::Connect(info)) => { connection = match Connection::new(info) { - Ok(connection) => { - Some(connection) - }, + Ok(connection) => Some(connection), Err(why) => { warn!("[Voice] Error connecting: {:?}", why); @@ -64,9 +62,7 @@ fn runner(rx: &MpscReceiver<Status>) { // another event. let error = match connection.as_mut() { Some(connection) => { - let cycle = connection.cycle(&mut sender, - &mut receiver, - &mut timer); + let cycle = connection.cycle(&mut sender, &mut receiver, &mut timer); match cycle { Ok(()) => false, |