diff options
| author | Zeyla Hellyer <[email protected]> | 2017-06-10 16:06:37 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-06-10 16:06:37 -0700 |
| commit | 2f30f9ab38761aad62af977ab4440b8bfb43a897 (patch) | |
| tree | e403e3e3cd394c9ff6d05f49febe285868ae108b /src/voice | |
| parent | Use an https connector in http::send_files (diff) | |
| download | serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.tar.xz serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.zip | |
Fix voice compilation
Diffstat (limited to 'src/voice')
| -rw-r--r-- | src/voice/connection.rs | 50 | ||||
| -rw-r--r-- | src/voice/handler.rs | 10 | ||||
| -rw-r--r-- | src/voice/manager.rs | 6 |
3 files changed, 34 insertions, 32 deletions
diff --git a/src/voice/connection.rs b/src/voice/connection.rs index 698f469..5bc4f36 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -11,24 +11,24 @@ use std::collections::HashMap; use std::io::Write; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; +use std::sync::{Arc, Mutex}; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::{CRYPTO_MODE, VoiceError, payload}; -use websocket::client::request::Url as WebsocketUrl; -use websocket::client::{ - Client as WsClient, - Receiver as WsReceiver, - Sender as WsSender -}; -use websocket::stream::WebSocketStream; +use websocket::client::Url as WebsocketUrl; +use websocket::sync::client::ClientBuilder; +use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream}; +use websocket::sync::Client as WsClient; use ::internal::prelude::*; use ::internal::ws_impl::{ReceiverExt, SenderExt}; use ::internal::Timer; use ::model::event::VoiceEvent; use ::model::UserId; +type Client = WsClient<TlsStream<TcpStream>>; + enum ReceiverStatus { Udp(Vec<u8>), Websocket(VoiceEvent), @@ -46,13 +46,13 @@ struct ThreadItems { #[allow(dead_code)] pub struct Connection { audio_timer: Timer, + client: Arc<Mutex<Client>>, decoder_map: HashMap<(u32, Channels), OpusDecoder>, destination: SocketAddr, encoder: OpusEncoder, encoder_stereo: bool, keepalive_timer: Timer, key: Key, - sender: WsSender<WebSocketStream>, sequence: u16, silence_frames: u8, speaking: bool, @@ -67,17 +67,15 @@ impl Connection { pub fn new(mut info: ConnectionInfo) -> Result<Connection> { let url = generate_url(&mut info.endpoint)?; - let response = WsClient::connect(url)?.send()?; - response.validate()?; - let (mut sender, mut receiver) = response.begin().split(); + let mut client = ClientBuilder::from_url(&url).connect_secure(None)?; - sender.send_json(&payload::build_identify(&info))?; + client.send_json(&payload::build_identify(&info))?; let hello = { let hello; loop { - match receiver.recv_json(VoiceEvent::decode)? { + match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Hello(received_hello) => { hello = received_hello; @@ -134,12 +132,15 @@ impl Connection { let port_pos = len - 2; let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?; - sender.send_json(&payload::build_select_protocol(addr, port))?; + client.send_json(&payload::build_select_protocol(addr, port))?; } - let key = encryption_key(&mut receiver)?; + let key = encryption_key(&mut client)?; + + let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25))); - let thread_items = start_threads(receiver, &udp)?; + let mutexed_client = Arc::new(Mutex::new(client)); + let thread_items = start_threads(mutexed_client.clone(), &udp)?; info!("[Voice] Connected to: {}", info.endpoint); @@ -147,6 +148,7 @@ impl Connection { Ok(Connection { audio_timer: Timer::new(1000 * 60 * 4), + client: mutexed_client, decoder_map: HashMap::new(), destination: destination, encoder: encoder, @@ -154,7 +156,6 @@ impl Connection { key: key, keepalive_timer: Timer::new(hello.heartbeat_interval), udp: udp, - sender: sender, sequence: 0, silence_frames: 0, speaking: false, @@ -228,7 +229,7 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.sender.send_json(&payload::build_keepalive())?; + self.client.lock().unwrap().send_json(&payload::build_keepalive())?; } // Send UDP keepalive if it's time @@ -263,9 +264,10 @@ impl Connection { } self.set_speaking(true)?; - let index = self.prep_packet(&mut packet, buffer, nonce)?; + let index = self.prep_packet(&mut packet, buffer, nonce)?; audio_timer.await(); + self.udp.send_to(&packet[..index], self.destination)?; self.audio_timer.reset(); @@ -363,7 +365,7 @@ impl Connection { self.speaking = speaking; - self.sender.send_json(&payload::build_speaking(speaking)) + self.client.lock().unwrap().send_json(&payload::build_speaking(speaking)) } } @@ -388,10 +390,10 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { } #[inline] -fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>) +fn encryption_key(client: &mut Client) -> Result<Key> { loop { - match receiver.recv_json(VoiceEvent::decode)? { + match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Ready(ready) => { if ready.mode != CRYPTO_MODE { return Err(Error::Voice(VoiceError::VoiceModeInvalid)); @@ -416,7 +418,7 @@ fn has_valid_mode(modes: &[String]) -> bool { } #[inline] -fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) +fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> { let (udp_close_sender, udp_close_reader) = mpsc::channel(); let (ws_close_sender, ws_close_reader) = mpsc::channel(); @@ -453,7 +455,7 @@ fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) .name(format!("{} WS", thread_name)) .spawn(move || { loop { - while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) { + while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { return; } diff --git a/src/voice/handler.rs b/src/voice/handler.rs index 22c0375..e40c6d9 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -1,9 +1,9 @@ +use serde_json::Value; use std::sync::mpsc::{self, Sender as MpscSender}; use super::{AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::Status as VoiceStatus; use ::constants::VoiceOpCode; -use ::gateway::GatewayStatus; use ::model::{ChannelId, GuildId, UserId, VoiceState}; use super::threading; @@ -99,7 +99,7 @@ pub struct Handler { /// /// When set via [`standalone`][`Handler::standalone`], it will not be /// present. - ws: Option<MpscSender<GatewayStatus>>, + ws: Option<MpscSender<Value>>, } impl Handler { @@ -113,7 +113,7 @@ impl Handler { /// [`Manager::join`]: struct.Manager.html#method.join #[doc(hidden)] #[inline] - pub fn new(guild_id: GuildId, ws: MpscSender<GatewayStatus>, user_id: UserId) -> Self { + pub fn new(guild_id: GuildId, ws: MpscSender<Value>, user_id: UserId) -> Self { Self::new_raw(guild_id, Some(ws), user_id) } @@ -357,7 +357,7 @@ impl Handler { } } - fn new_raw(guild_id: GuildId, ws: Option<MpscSender<GatewayStatus>>, user_id: UserId) -> Self { + fn new_raw(guild_id: GuildId, ws: Option<MpscSender<Value>>, user_id: UserId) -> Self { let (tx, rx) = mpsc::channel(); threading::start(guild_id, rx); @@ -418,7 +418,7 @@ impl Handler { } }); - let _ = ws.send(GatewayStatus::SendMessage(map)); + let _ = ws.send(map); } } } diff --git a/src/voice/manager.rs b/src/voice/manager.rs index fe1b489..67eb7b2 100644 --- a/src/voice/manager.rs +++ b/src/voice/manager.rs @@ -1,7 +1,7 @@ +use serde_json::Value; use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::Handler; -use ::gateway::GatewayStatus; use ::model::{ChannelId, GuildId, UserId}; /// A manager is a struct responsible for managing [`Handler`]s which belong to @@ -24,12 +24,12 @@ use ::model::{ChannelId, GuildId, UserId}; pub struct Manager { handlers: HashMap<GuildId, Handler>, user_id: UserId, - ws: MpscSender<GatewayStatus>, + ws: MpscSender<Value>, } impl Manager { #[doc(hidden)] - pub fn new(ws: MpscSender<GatewayStatus>, user_id: UserId) -> Manager { + pub fn new(ws: MpscSender<Value>, user_id: UserId) -> Manager { Manager { handlers: HashMap::new(), user_id: user_id, |