diff options
| author | Zeyla Hellyer <[email protected]> | 2017-06-10 16:06:37 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-06-10 16:06:37 -0700 |
| commit | 2f30f9ab38761aad62af977ab4440b8bfb43a897 (patch) | |
| tree | e403e3e3cd394c9ff6d05f49febe285868ae108b /src | |
| parent | Use an https connector in http::send_files (diff) | |
| download | serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.tar.xz serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.zip | |
Fix voice compilation
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/mod.rs | 82 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 21 | ||||
| -rw-r--r-- | src/voice/connection.rs | 50 | ||||
| -rw-r--r-- | src/voice/handler.rs | 10 | ||||
| -rw-r--r-- | src/voice/manager.rs | 6 |
5 files changed, 102 insertions, 67 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 91790fa..3b7a2ec 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1374,10 +1374,15 @@ fn handle_shard(info: &mut MonitorInfo) { let mut last_heartbeat_sent = UTC::now().timestamp(); loop { - let mut shard = info.shard.lock().unwrap(); - let in_secs = shard.heartbeat_interval() / 1000; + let in_secs = { + let shard = info.shard.lock().unwrap(); + + shard.heartbeat_interval() / 1000 + }; if UTC::now().timestamp() - last_heartbeat_sent > in_secs { + let mut shard = info.shard.lock().unwrap(); + // If the last heartbeat didn't receive an acknowledgement, then // shutdown and auto-reconnect. if !shard.last_heartbeat_acknowledged() { @@ -1401,46 +1406,57 @@ fn handle_shard(info: &mut MonitorInfo) { last_heartbeat_sent = UTC::now().timestamp(); } - let event = match shard.client.recv_json(GatewayEvent::decode) { - Ok(GatewayEvent::HeartbeatAck) => { - last_ack_time = UTC::now().timestamp(); + #[cfg(feature="voice")] + { + let mut shard = info.shard.lock().unwrap(); - Ok(GatewayEvent::HeartbeatAck) - }, - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time { - continue; - } + shard.cycle_voice_recv(); + } - debug!("Attempting to shutdown receiver/sender"); + let event = { + let mut shard = info.shard.lock().unwrap(); - match shard.resume() { - Ok(_) => { - debug!("Successfully resumed shard"); + let event = match shard.client.recv_json(GatewayEvent::decode) { + Ok(GatewayEvent::HeartbeatAck) => { + last_ack_time = UTC::now().timestamp(); + Ok(GatewayEvent::HeartbeatAck) + }, + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time { continue; - }, - Err(why) => { - warn!("Err resuming shard: {:?}", why); + } - return; - }, - } - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, - other => other, - }; + debug!("Attempting to shutdown receiver/sender"); - trace!("Received event on shard handler: {:?}", event); + match shard.resume() { + Ok(_) => { + debug!("Successfully resumed shard"); - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); + continue; + }, + Err(why) => { + warn!("Err resuming shard: {:?}", why); - continue; - }, + return; + }, + } + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, + other => other, + }; + + trace!("Received event on shard handler: {:?}", event); + + match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => continue, + Err(why) => { + error!("Shard handler received err: {:?}", why); + + continue; + }, + } }; feature_framework! {{ diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 69ff7b1..2dcdf38 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -1,4 +1,5 @@ use chrono::UTC; +use serde_json::Value; use std::io::Write; use std::net::Shutdown; use std::thread; @@ -17,6 +18,8 @@ use ::internal::ws_impl::{ReceiverExt, SenderExt}; use ::model::event::{Event, GatewayEvent, ReadyEvent}; use ::model::{Game, GuildId, OnlineStatus}; +#[cfg(feature="voice")] +use std::sync::mpsc::{self, Receiver as MpscReceiver}; #[cfg(feature="cache")] use ::client::CACHE; #[cfg(feature="voice")] @@ -89,6 +92,8 @@ pub struct Shard { /// update the voice connections' states. #[cfg(feature="voice")] pub manager: VoiceManager, + #[cfg(feature="voice")] + manager_rx: MpscReceiver<Value>, } impl Shard { @@ -139,6 +144,8 @@ impl Shard { let (ready, sequence) = prep::parse_ready(event, &mut client, &identification)?; Ok((feature_voice! {{ + let (tx, rx) = mpsc::channel(); + Shard { client: client, current_presence: (None, OnlineStatus::Online, false), @@ -151,6 +158,7 @@ impl Shard { shard_info: shard_info, ws_url: base_url.to_owned(), manager: VoiceManager::new(tx, ready.ready.user.id), + manager_rx: rx, } } else { Shard { @@ -667,6 +675,16 @@ impl Shard { } } + #[cfg(feature="voice")] + #[doc(hidden)] + pub fn cycle_voice_recv(&mut self) { + if let Ok(v) = self.manager_rx.try_recv() { + if let Err(why) = self.client.send_json(&v) { + warn!("Err sending voice msg: {:?}", why); + } + } + } + #[doc(hidden)] pub fn heartbeat(&mut self) -> Result<()> { let map = json!({ @@ -830,12 +848,11 @@ fn connect(base_url: &str) -> Result<WsClient> { let url = prep::build_gateway_url(base_url)?; let client = ClientBuilder::from_url(&url).connect_secure(None)?; - let timeout = StdDuration::from_secs(1); + let timeout = StdDuration::from_millis(250); { let stream = client.stream_ref().as_tcp(); stream.set_read_timeout(Some(timeout))?; - stream.set_read_timeout(Some(timeout))?; } Ok(client) diff --git a/src/voice/connection.rs b/src/voice/connection.rs index 698f469..5bc4f36 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -11,24 +11,24 @@ use std::collections::HashMap; use std::io::Write; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; +use std::sync::{Arc, Mutex}; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::{CRYPTO_MODE, VoiceError, payload}; -use websocket::client::request::Url as WebsocketUrl; -use websocket::client::{ - Client as WsClient, - Receiver as WsReceiver, - Sender as WsSender -}; -use websocket::stream::WebSocketStream; +use websocket::client::Url as WebsocketUrl; +use websocket::sync::client::ClientBuilder; +use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream}; +use websocket::sync::Client as WsClient; use ::internal::prelude::*; use ::internal::ws_impl::{ReceiverExt, SenderExt}; use ::internal::Timer; use ::model::event::VoiceEvent; use ::model::UserId; +type Client = WsClient<TlsStream<TcpStream>>; + enum ReceiverStatus { Udp(Vec<u8>), Websocket(VoiceEvent), @@ -46,13 +46,13 @@ struct ThreadItems { #[allow(dead_code)] pub struct Connection { audio_timer: Timer, + client: Arc<Mutex<Client>>, decoder_map: HashMap<(u32, Channels), OpusDecoder>, destination: SocketAddr, encoder: OpusEncoder, encoder_stereo: bool, keepalive_timer: Timer, key: Key, - sender: WsSender<WebSocketStream>, sequence: u16, silence_frames: u8, speaking: bool, @@ -67,17 +67,15 @@ impl Connection { pub fn new(mut info: ConnectionInfo) -> Result<Connection> { let url = generate_url(&mut info.endpoint)?; - let response = WsClient::connect(url)?.send()?; - response.validate()?; - let (mut sender, mut receiver) = response.begin().split(); + let mut client = ClientBuilder::from_url(&url).connect_secure(None)?; - sender.send_json(&payload::build_identify(&info))?; + client.send_json(&payload::build_identify(&info))?; let hello = { let hello; loop { - match receiver.recv_json(VoiceEvent::decode)? { + match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Hello(received_hello) => { hello = received_hello; @@ -134,12 +132,15 @@ impl Connection { let port_pos = len - 2; let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?; - sender.send_json(&payload::build_select_protocol(addr, port))?; + client.send_json(&payload::build_select_protocol(addr, port))?; } - let key = encryption_key(&mut receiver)?; + let key = encryption_key(&mut client)?; + + let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25))); - let thread_items = start_threads(receiver, &udp)?; + let mutexed_client = Arc::new(Mutex::new(client)); + let thread_items = start_threads(mutexed_client.clone(), &udp)?; info!("[Voice] Connected to: {}", info.endpoint); @@ -147,6 +148,7 @@ impl Connection { Ok(Connection { audio_timer: Timer::new(1000 * 60 * 4), + client: mutexed_client, decoder_map: HashMap::new(), destination: destination, encoder: encoder, @@ -154,7 +156,6 @@ impl Connection { key: key, keepalive_timer: Timer::new(hello.heartbeat_interval), udp: udp, - sender: sender, sequence: 0, silence_frames: 0, speaking: false, @@ -228,7 +229,7 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.sender.send_json(&payload::build_keepalive())?; + self.client.lock().unwrap().send_json(&payload::build_keepalive())?; } // Send UDP keepalive if it's time @@ -263,9 +264,10 @@ impl Connection { } self.set_speaking(true)?; - let index = self.prep_packet(&mut packet, buffer, nonce)?; + let index = self.prep_packet(&mut packet, buffer, nonce)?; audio_timer.await(); + self.udp.send_to(&packet[..index], self.destination)?; self.audio_timer.reset(); @@ -363,7 +365,7 @@ impl Connection { self.speaking = speaking; - self.sender.send_json(&payload::build_speaking(speaking)) + self.client.lock().unwrap().send_json(&payload::build_speaking(speaking)) } } @@ -388,10 +390,10 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { } #[inline] -fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>) +fn encryption_key(client: &mut Client) -> Result<Key> { loop { - match receiver.recv_json(VoiceEvent::decode)? { + match client.recv_json(VoiceEvent::decode)? { VoiceEvent::Ready(ready) => { if ready.mode != CRYPTO_MODE { return Err(Error::Voice(VoiceError::VoiceModeInvalid)); @@ -416,7 +418,7 @@ fn has_valid_mode(modes: &[String]) -> bool { } #[inline] -fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) +fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket) -> Result<ThreadItems> { let (udp_close_sender, udp_close_reader) = mpsc::channel(); let (ws_close_sender, ws_close_reader) = mpsc::channel(); @@ -453,7 +455,7 @@ fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) .name(format!("{} WS", thread_name)) .spawn(move || { loop { - while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) { + while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) { if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() { return; } diff --git a/src/voice/handler.rs b/src/voice/handler.rs index 22c0375..e40c6d9 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -1,9 +1,9 @@ +use serde_json::Value; use std::sync::mpsc::{self, Sender as MpscSender}; use super::{AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::Status as VoiceStatus; use ::constants::VoiceOpCode; -use ::gateway::GatewayStatus; use ::model::{ChannelId, GuildId, UserId, VoiceState}; use super::threading; @@ -99,7 +99,7 @@ pub struct Handler { /// /// When set via [`standalone`][`Handler::standalone`], it will not be /// present. - ws: Option<MpscSender<GatewayStatus>>, + ws: Option<MpscSender<Value>>, } impl Handler { @@ -113,7 +113,7 @@ impl Handler { /// [`Manager::join`]: struct.Manager.html#method.join #[doc(hidden)] #[inline] - pub fn new(guild_id: GuildId, ws: MpscSender<GatewayStatus>, user_id: UserId) -> Self { + pub fn new(guild_id: GuildId, ws: MpscSender<Value>, user_id: UserId) -> Self { Self::new_raw(guild_id, Some(ws), user_id) } @@ -357,7 +357,7 @@ impl Handler { } } - fn new_raw(guild_id: GuildId, ws: Option<MpscSender<GatewayStatus>>, user_id: UserId) -> Self { + fn new_raw(guild_id: GuildId, ws: Option<MpscSender<Value>>, user_id: UserId) -> Self { let (tx, rx) = mpsc::channel(); threading::start(guild_id, rx); @@ -418,7 +418,7 @@ impl Handler { } }); - let _ = ws.send(GatewayStatus::SendMessage(map)); + let _ = ws.send(map); } } } diff --git a/src/voice/manager.rs b/src/voice/manager.rs index fe1b489..67eb7b2 100644 --- a/src/voice/manager.rs +++ b/src/voice/manager.rs @@ -1,7 +1,7 @@ +use serde_json::Value; use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::Handler; -use ::gateway::GatewayStatus; use ::model::{ChannelId, GuildId, UserId}; /// A manager is a struct responsible for managing [`Handler`]s which belong to @@ -24,12 +24,12 @@ use ::model::{ChannelId, GuildId, UserId}; pub struct Manager { handlers: HashMap<GuildId, Handler>, user_id: UserId, - ws: MpscSender<GatewayStatus>, + ws: MpscSender<Value>, } impl Manager { #[doc(hidden)] - pub fn new(ws: MpscSender<GatewayStatus>, user_id: UserId) -> Manager { + pub fn new(ws: MpscSender<Value>, user_id: UserId) -> Manager { Manager { handlers: HashMap::new(), user_id: user_id, |