diff options
| author | acdenisSK <[email protected]> | 2017-07-27 06:42:48 +0200 |
|---|---|---|
| committer | acdenisSK <[email protected]> | 2017-07-27 07:30:23 +0200 |
| commit | 550030264952f0e0043b63f4582bb817ef8bbf37 (patch) | |
| tree | b921e2f78fd603a5ca671623083a32806fd16090 /src/voice/connection.rs | |
| parent | Use a consistent indentation style (diff) | |
| download | serenity-550030264952f0e0043b63f4582bb817ef8bbf37.tar.xz serenity-550030264952f0e0043b63f4582bb817ef8bbf37.zip | |
rustfmt
Diffstat (limited to 'src/voice/connection.rs')
| -rw-r--r-- | src/voice/connection.rs | 171 |
1 files changed, 80 insertions, 91 deletions
diff --git a/src/voice/connection.rs b/src/voice/connection.rs index ced6a79..6f8343c 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -1,10 +1,10 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; use opus::{ - Channels, + packet as opus_packet, Application as CodingMode, + Channels, Decoder as OpusDecoder, Encoder as OpusEncoder, - packet as opus_packet, }; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::collections::HashMap; @@ -14,18 +14,18 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; -use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource}; +use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE}; use super::connection_info::ConnectionInfo; -use super::{CRYPTO_MODE, VoiceError, payload}; +use super::{payload, VoiceError, CRYPTO_MODE}; use websocket::client::Url as WebsocketUrl; use websocket::sync::client::ClientBuilder; use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream}; use websocket::sync::Client as WsClient; -use ::internal::prelude::*; -use ::internal::ws_impl::{ReceiverExt, SenderExt}; -use ::internal::Timer; -use ::model::event::VoiceEvent; -use ::model::UserId; +use internal::prelude::*; +use internal::ws_impl::{ReceiverExt, SenderExt}; +use internal::Timer; +use model::event::VoiceEvent; +use model::UserId; type Client = WsClient<TlsStream<TcpStream>>; @@ -78,8 +78,7 @@ impl Connection { }, VoiceEvent::Heartbeat(_) => continue, other => { - debug!("[Voice] Expected hello/heartbeat; got: {:?}", - other); + debug!("[Voice] Expected hello/heartbeat; got: {:?}", other); return Err(Error::Voice(VoiceError::ExpectedHandshake)); }, @@ -116,7 +115,10 @@ impl Connection { // Find the position in the bytes that contains the first byte of 0, // indicating the "end of the address". - let index = bytes.iter().skip(4).position(|&x| x == 0) + let index = bytes + .iter() + .skip(4) + .position(|&x| x == 0) .ok_or(Error::Voice(VoiceError::FindingByte))?; let pos = 4 + index; @@ -124,12 +126,16 @@ impl Connection { let port_pos = len - 2; let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?; - client.send_json(&payload::build_select_protocol(addr, port))?; + client + .send_json(&payload::build_select_protocol(addr, port))?; } let key = encryption_key(&mut client)?; - let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25))); + let _ = client + .stream_ref() + .as_tcp() + .set_read_timeout(Some(Duration::from_millis(25))); let mutexed_client = Arc::new(Mutex::new(client)); let thread_items = start_threads(mutexed_client.clone(), &udp)?; @@ -139,23 +145,23 @@ impl Connection { let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?; Ok(Connection { - audio_timer: Timer::new(1000 * 60 * 4), - client: mutexed_client, - decoder_map: HashMap::new(), - destination: destination, - encoder: encoder, - encoder_stereo: false, - key: key, - keepalive_timer: Timer::new(hello.heartbeat_interval), - udp: udp, - sequence: 0, - silence_frames: 0, - speaking: false, - ssrc: hello.ssrc, - thread_items: thread_items, - timestamp: 0, - user_id: info.user_id, - }) + audio_timer: Timer::new(1000 * 60 * 4), + client: mutexed_client, + decoder_map: HashMap::new(), + destination: destination, + encoder: encoder, + encoder_stereo: false, + key: key, + keepalive_timer: Timer::new(hello.heartbeat_interval), + udp: udp, + sequence: 0, + silence_frames: 0, + speaking: false, + ssrc: hello.ssrc, + thread_items: thread_items, + timestamp: 0, + user_id: info.user_id, + }) } #[allow(unused_variables)] @@ -179,35 +185,29 @@ impl Connection { nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); - if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { + if let Ok(decrypted) = + secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { let channels = opus_packet::get_nb_channels(&decrypted)?; - let entry = self.decoder_map.entry((ssrc, channels)) - .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE, - channels) - .unwrap()); + let entry = + self.decoder_map.entry((ssrc, channels)).or_insert_with( + || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(), + ); let len = entry.decode(&decrypted, &mut buffer, false)?; let is_stereo = channels == Channels::Stereo; - let b = if is_stereo { - len * 2 - } else { - len - }; + let b = if is_stereo { len * 2 } else { len }; receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]); } }, ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { - receiver.speaking_update(ev.ssrc, - ev.user_id.0, - ev.speaking); + receiver.speaking_update(ev.ssrc, ev.user_id.0, ev.speaking); }, ReceiverStatus::Websocket(other) => { - info!("[Voice] Received other websocket data: {:?}", - other); + info!("[Voice] Received other websocket data: {:?}", other); }, } } @@ -221,7 +221,10 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.client.lock().unwrap().send_json(&payload::build_keepalive())?; + self.client + .lock() + .unwrap() + .send_json(&payload::build_keepalive())?; } // Send UDP keepalive if it's time @@ -282,14 +285,10 @@ impl Connection { nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); let sl_index = packet.len() - 16; - let buffer_len = if self.encoder_stereo { - 960 * 2 - } else { - 960 - }; + let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 }; - let len = self.encoder.encode(&buffer[..buffer_len], - &mut packet[HEADER_LEN..sl_index])?; + let len = self.encoder + .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?; let crypted = { let slice = &packet[HEADER_LEN..HEADER_LEN + len]; secretbox::seal(slice, &nonce, &self.key) @@ -319,17 +318,11 @@ impl Connection { } else { Channels::Mono }; - self.encoder = OpusEncoder::new(SAMPLE_RATE, - channels, - CodingMode::Audio)?; + self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?; self.encoder_stereo = is_stereo; } - let buffer_len = if is_stereo { - 960 * 2 - } else { - 960 - }; + let buffer_len = if is_stereo { 960 * 2 } else { 960 }; match source.read_frame(&mut buffer[..buffer_len]) { Some(len) => len, @@ -357,7 +350,10 @@ impl Connection { self.speaking = speaking; - self.client.lock().unwrap().send_json(&payload::build_speaking(speaking)) + self.client + .lock() + .unwrap() + .send_json(&payload::build_speaking(speaking)) } } @@ -382,8 +378,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { } #[inline] -fn encryption_key(client: &mut Client) - -> Result<Key> { +fn encryption_key(client: &mut Client) -> Result<Key> { loop { match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Ready(ready) => { @@ -391,8 +386,7 @@ fn encryption_key(client: &mut Client) return Err(Error::Voice(VoiceError::VoiceModeInvalid)); } - return Key::from_slice(&ready.secret_key) - .ok_or(Error::Voice(VoiceError::KeyGen)); + return Key::from_slice(&ready.secret_key).ok_or(Error::Voice(VoiceError::KeyGen)); }, VoiceEvent::Unknown(op, value) => { debug!("[Voice] Expected ready for key; got: op{}/v{:?}", @@ -405,13 +399,10 @@ fn encryption_key(client: &mut Client) } #[inline] -fn has_valid_mode(modes: &[String]) -> bool { - modes.iter().any(|s| s == CRYPTO_MODE) -} +fn has_valid_mode(modes: &[String]) -> bool { modes.iter().any(|s| s == CRYPTO_MODE) } #[inline] -fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) - -> Result<ThreadItems> { +fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> { let (udp_close_sender, udp_close_reader) = mpsc::channel(); let (ws_close_sender, ws_close_reader) = mpsc::channel(); @@ -445,27 +436,25 @@ fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) let ws_thread = ThreadBuilder::new() .name(format!("{} WS", thread_name)) - .spawn(move || { - loop { - while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { - if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { - return; - } - } + .spawn(move || loop { + while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { + if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { + return; + } + } - if ws_close_reader.try_recv().is_ok() { - return; - } + if ws_close_reader.try_recv().is_ok() { + return; + } - thread::sleep(Duration::from_millis(25)); - } - })?; + thread::sleep(Duration::from_millis(25)); + })?; Ok(ThreadItems { - rx: rx, - udp_close_sender: udp_close_sender, - udp_thread: udp_thread, - ws_close_sender: ws_close_sender, - ws_thread: ws_thread, - }) + rx: rx, + udp_close_sender: udp_close_sender, + udp_thread: udp_thread, + ws_close_sender: ws_close_sender, + ws_thread: ws_thread, + }) } |