aboutsummaryrefslogtreecommitdiff
path: root/src/voice/connection.rs
diff options
context:
space:
mode:
authoracdenisSK <[email protected]>2017-07-27 06:42:48 +0200
committeracdenisSK <[email protected]>2017-07-27 07:30:23 +0200
commit550030264952f0e0043b63f4582bb817ef8bbf37 (patch)
treeb921e2f78fd603a5ca671623083a32806fd16090 /src/voice/connection.rs
parentUse a consistent indentation style (diff)
downloadserenity-550030264952f0e0043b63f4582bb817ef8bbf37.tar.xz
serenity-550030264952f0e0043b63f4582bb817ef8bbf37.zip
rustfmt
Diffstat (limited to 'src/voice/connection.rs')
-rw-r--r--src/voice/connection.rs171
1 files changed, 80 insertions, 91 deletions
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index ced6a79..6f8343c 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -1,10 +1,10 @@
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
use opus::{
- Channels,
+ packet as opus_packet,
Application as CodingMode,
+ Channels,
Decoder as OpusDecoder,
Encoder as OpusEncoder,
- packet as opus_packet,
};
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::collections::HashMap;
@@ -14,18 +14,18 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
-use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
+use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE};
use super::connection_info::ConnectionInfo;
-use super::{CRYPTO_MODE, VoiceError, payload};
+use super::{payload, VoiceError, CRYPTO_MODE};
use websocket::client::Url as WebsocketUrl;
use websocket::sync::client::ClientBuilder;
use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream};
use websocket::sync::Client as WsClient;
-use ::internal::prelude::*;
-use ::internal::ws_impl::{ReceiverExt, SenderExt};
-use ::internal::Timer;
-use ::model::event::VoiceEvent;
-use ::model::UserId;
+use internal::prelude::*;
+use internal::ws_impl::{ReceiverExt, SenderExt};
+use internal::Timer;
+use model::event::VoiceEvent;
+use model::UserId;
type Client = WsClient<TlsStream<TcpStream>>;
@@ -78,8 +78,7 @@ impl Connection {
},
VoiceEvent::Heartbeat(_) => continue,
other => {
- debug!("[Voice] Expected hello/heartbeat; got: {:?}",
- other);
+ debug!("[Voice] Expected hello/heartbeat; got: {:?}", other);
return Err(Error::Voice(VoiceError::ExpectedHandshake));
},
@@ -116,7 +115,10 @@ impl Connection {
// Find the position in the bytes that contains the first byte of 0,
// indicating the "end of the address".
- let index = bytes.iter().skip(4).position(|&x| x == 0)
+ let index = bytes
+ .iter()
+ .skip(4)
+ .position(|&x| x == 0)
.ok_or(Error::Voice(VoiceError::FindingByte))?;
let pos = 4 + index;
@@ -124,12 +126,16 @@ impl Connection {
let port_pos = len - 2;
let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
- client.send_json(&payload::build_select_protocol(addr, port))?;
+ client
+ .send_json(&payload::build_select_protocol(addr, port))?;
}
let key = encryption_key(&mut client)?;
- let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25)));
+ let _ = client
+ .stream_ref()
+ .as_tcp()
+ .set_read_timeout(Some(Duration::from_millis(25)));
let mutexed_client = Arc::new(Mutex::new(client));
let thread_items = start_threads(mutexed_client.clone(), &udp)?;
@@ -139,23 +145,23 @@ impl Connection {
let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?;
Ok(Connection {
- audio_timer: Timer::new(1000 * 60 * 4),
- client: mutexed_client,
- decoder_map: HashMap::new(),
- destination: destination,
- encoder: encoder,
- encoder_stereo: false,
- key: key,
- keepalive_timer: Timer::new(hello.heartbeat_interval),
- udp: udp,
- sequence: 0,
- silence_frames: 0,
- speaking: false,
- ssrc: hello.ssrc,
- thread_items: thread_items,
- timestamp: 0,
- user_id: info.user_id,
- })
+ audio_timer: Timer::new(1000 * 60 * 4),
+ client: mutexed_client,
+ decoder_map: HashMap::new(),
+ destination: destination,
+ encoder: encoder,
+ encoder_stereo: false,
+ key: key,
+ keepalive_timer: Timer::new(hello.heartbeat_interval),
+ udp: udp,
+ sequence: 0,
+ silence_frames: 0,
+ speaking: false,
+ ssrc: hello.ssrc,
+ thread_items: thread_items,
+ timestamp: 0,
+ user_id: info.user_id,
+ })
}
#[allow(unused_variables)]
@@ -179,35 +185,29 @@ impl Connection {
nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
- if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ if let Ok(decrypted) =
+ secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
let channels = opus_packet::get_nb_channels(&decrypted)?;
- let entry = self.decoder_map.entry((ssrc, channels))
- .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE,
- channels)
- .unwrap());
+ let entry =
+ self.decoder_map.entry((ssrc, channels)).or_insert_with(
+ || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(),
+ );
let len = entry.decode(&decrypted, &mut buffer, false)?;
let is_stereo = channels == Channels::Stereo;
- let b = if is_stereo {
- len * 2
- } else {
- len
- };
+ let b = if is_stereo { len * 2 } else { len };
receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
}
},
ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
- receiver.speaking_update(ev.ssrc,
- ev.user_id.0,
- ev.speaking);
+ receiver.speaking_update(ev.ssrc, ev.user_id.0, ev.speaking);
},
ReceiverStatus::Websocket(other) => {
- info!("[Voice] Received other websocket data: {:?}",
- other);
+ info!("[Voice] Received other websocket data: {:?}", other);
},
}
}
@@ -221,7 +221,10 @@ impl Connection {
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- self.client.lock().unwrap().send_json(&payload::build_keepalive())?;
+ self.client
+ .lock()
+ .unwrap()
+ .send_json(&payload::build_keepalive())?;
}
// Send UDP keepalive if it's time
@@ -282,14 +285,10 @@ impl Connection {
nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
let sl_index = packet.len() - 16;
- let buffer_len = if self.encoder_stereo {
- 960 * 2
- } else {
- 960
- };
+ let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 };
- let len = self.encoder.encode(&buffer[..buffer_len],
- &mut packet[HEADER_LEN..sl_index])?;
+ let len = self.encoder
+ .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?;
let crypted = {
let slice = &packet[HEADER_LEN..HEADER_LEN + len];
secretbox::seal(slice, &nonce, &self.key)
@@ -319,17 +318,11 @@ impl Connection {
} else {
Channels::Mono
};
- self.encoder = OpusEncoder::new(SAMPLE_RATE,
- channels,
- CodingMode::Audio)?;
+ self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?;
self.encoder_stereo = is_stereo;
}
- let buffer_len = if is_stereo {
- 960 * 2
- } else {
- 960
- };
+ let buffer_len = if is_stereo { 960 * 2 } else { 960 };
match source.read_frame(&mut buffer[..buffer_len]) {
Some(len) => len,
@@ -357,7 +350,10 @@ impl Connection {
self.speaking = speaking;
- self.client.lock().unwrap().send_json(&payload::build_speaking(speaking))
+ self.client
+ .lock()
+ .unwrap()
+ .send_json(&payload::build_speaking(speaking))
}
}
@@ -382,8 +378,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
}
#[inline]
-fn encryption_key(client: &mut Client)
- -> Result<Key> {
+fn encryption_key(client: &mut Client) -> Result<Key> {
loop {
match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Ready(ready) => {
@@ -391,8 +386,7 @@ fn encryption_key(client: &mut Client)
return Err(Error::Voice(VoiceError::VoiceModeInvalid));
}
- return Key::from_slice(&ready.secret_key)
- .ok_or(Error::Voice(VoiceError::KeyGen));
+ return Key::from_slice(&ready.secret_key).ok_or(Error::Voice(VoiceError::KeyGen));
},
VoiceEvent::Unknown(op, value) => {
debug!("[Voice] Expected ready for key; got: op{}/v{:?}",
@@ -405,13 +399,10 @@ fn encryption_key(client: &mut Client)
}
#[inline]
-fn has_valid_mode(modes: &[String]) -> bool {
- modes.iter().any(|s| s == CRYPTO_MODE)
-}
+fn has_valid_mode(modes: &[String]) -> bool { modes.iter().any(|s| s == CRYPTO_MODE) }
#[inline]
-fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
- -> Result<ThreadItems> {
+fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> {
let (udp_close_sender, udp_close_reader) = mpsc::channel();
let (ws_close_sender, ws_close_reader) = mpsc::channel();
@@ -445,27 +436,25 @@ fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
let ws_thread = ThreadBuilder::new()
.name(format!("{} WS", thread_name))
- .spawn(move || {
- loop {
- while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
- if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
- return;
- }
- }
+ .spawn(move || loop {
+ while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
+ if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
+ return;
+ }
+ }
- if ws_close_reader.try_recv().is_ok() {
- return;
- }
+ if ws_close_reader.try_recv().is_ok() {
+ return;
+ }
- thread::sleep(Duration::from_millis(25));
- }
- })?;
+ thread::sleep(Duration::from_millis(25));
+ })?;
Ok(ThreadItems {
- rx: rx,
- udp_close_sender: udp_close_sender,
- udp_thread: udp_thread,
- ws_close_sender: ws_close_sender,
- ws_thread: ws_thread,
- })
+ rx: rx,
+ udp_close_sender: udp_close_sender,
+ udp_thread: udp_thread,
+ ws_close_sender: ws_close_sender,
+ ws_thread: ws_thread,
+ })
}