aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-10 16:06:37 -0700
committerZeyla Hellyer <[email protected]>2017-06-10 16:06:37 -0700
commit2f30f9ab38761aad62af977ab4440b8bfb43a897 (patch)
treee403e3e3cd394c9ff6d05f49febe285868ae108b /src
parentUse an https connector in http::send_files (diff)
downloadserenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.tar.xz
serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.zip
Fix voice compilation
Diffstat (limited to 'src')
-rw-r--r--src/client/mod.rs82
-rw-r--r--src/gateway/shard.rs21
-rw-r--r--src/voice/connection.rs50
-rw-r--r--src/voice/handler.rs10
-rw-r--r--src/voice/manager.rs6
5 files changed, 102 insertions, 67 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 91790fa..3b7a2ec 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -1374,10 +1374,15 @@ fn handle_shard(info: &mut MonitorInfo) {
let mut last_heartbeat_sent = UTC::now().timestamp();
loop {
- let mut shard = info.shard.lock().unwrap();
- let in_secs = shard.heartbeat_interval() / 1000;
+ let in_secs = {
+ let shard = info.shard.lock().unwrap();
+
+ shard.heartbeat_interval() / 1000
+ };
if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
+ let mut shard = info.shard.lock().unwrap();
+
// If the last heartbeat didn't receive an acknowledgement, then
// shutdown and auto-reconnect.
if !shard.last_heartbeat_acknowledged() {
@@ -1401,46 +1406,57 @@ fn handle_shard(info: &mut MonitorInfo) {
last_heartbeat_sent = UTC::now().timestamp();
}
- let event = match shard.client.recv_json(GatewayEvent::decode) {
- Ok(GatewayEvent::HeartbeatAck) => {
- last_ack_time = UTC::now().timestamp();
+ #[cfg(feature="voice")]
+ {
+ let mut shard = info.shard.lock().unwrap();
- Ok(GatewayEvent::HeartbeatAck)
- },
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
- continue;
- }
+ shard.cycle_voice_recv();
+ }
- debug!("Attempting to shutdown receiver/sender");
+ let event = {
+ let mut shard = info.shard.lock().unwrap();
- match shard.resume() {
- Ok(_) => {
- debug!("Successfully resumed shard");
+ let event = match shard.client.recv_json(GatewayEvent::decode) {
+ Ok(GatewayEvent::HeartbeatAck) => {
+ last_ack_time = UTC::now().timestamp();
+ Ok(GatewayEvent::HeartbeatAck)
+ },
+ Err(Error::WebSocket(WebSocketError::IoError(_))) => {
+ if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
continue;
- },
- Err(why) => {
- warn!("Err resuming shard: {:?}", why);
+ }
- return;
- },
- }
- },
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
- other => other,
- };
+ debug!("Attempting to shutdown receiver/sender");
- trace!("Received event on shard handler: {:?}", event);
+ match shard.resume() {
+ Ok(_) => {
+ debug!("Successfully resumed shard");
- let event = match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => continue,
- Err(why) => {
- error!("Shard handler received err: {:?}", why);
+ continue;
+ },
+ Err(why) => {
+ warn!("Err resuming shard: {:?}", why);
- continue;
- },
+ return;
+ },
+ }
+ },
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
+ other => other,
+ };
+
+ trace!("Received event on shard handler: {:?}", event);
+
+ match shard.handle_event(event) {
+ Ok(Some(event)) => event,
+ Ok(None) => continue,
+ Err(why) => {
+ error!("Shard handler received err: {:?}", why);
+
+ continue;
+ },
+ }
};
feature_framework! {{
diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs
index 69ff7b1..2dcdf38 100644
--- a/src/gateway/shard.rs
+++ b/src/gateway/shard.rs
@@ -1,4 +1,5 @@
use chrono::UTC;
+use serde_json::Value;
use std::io::Write;
use std::net::Shutdown;
use std::thread;
@@ -17,6 +18,8 @@ use ::internal::ws_impl::{ReceiverExt, SenderExt};
use ::model::event::{Event, GatewayEvent, ReadyEvent};
use ::model::{Game, GuildId, OnlineStatus};
+#[cfg(feature="voice")]
+use std::sync::mpsc::{self, Receiver as MpscReceiver};
#[cfg(feature="cache")]
use ::client::CACHE;
#[cfg(feature="voice")]
@@ -89,6 +92,8 @@ pub struct Shard {
/// update the voice connections' states.
#[cfg(feature="voice")]
pub manager: VoiceManager,
+ #[cfg(feature="voice")]
+ manager_rx: MpscReceiver<Value>,
}
impl Shard {
@@ -139,6 +144,8 @@ impl Shard {
let (ready, sequence) = prep::parse_ready(event, &mut client, &identification)?;
Ok((feature_voice! {{
+ let (tx, rx) = mpsc::channel();
+
Shard {
client: client,
current_presence: (None, OnlineStatus::Online, false),
@@ -151,6 +158,7 @@ impl Shard {
shard_info: shard_info,
ws_url: base_url.to_owned(),
manager: VoiceManager::new(tx, ready.ready.user.id),
+ manager_rx: rx,
}
} else {
Shard {
@@ -667,6 +675,16 @@ impl Shard {
}
}
+ #[cfg(feature="voice")]
+ #[doc(hidden)]
+ pub fn cycle_voice_recv(&mut self) {
+ if let Ok(v) = self.manager_rx.try_recv() {
+ if let Err(why) = self.client.send_json(&v) {
+ warn!("Err sending voice msg: {:?}", why);
+ }
+ }
+ }
+
#[doc(hidden)]
pub fn heartbeat(&mut self) -> Result<()> {
let map = json!({
@@ -830,12 +848,11 @@ fn connect(base_url: &str) -> Result<WsClient> {
let url = prep::build_gateway_url(base_url)?;
let client = ClientBuilder::from_url(&url).connect_secure(None)?;
- let timeout = StdDuration::from_secs(1);
+ let timeout = StdDuration::from_millis(250);
{
let stream = client.stream_ref().as_tcp();
stream.set_read_timeout(Some(timeout))?;
- stream.set_read_timeout(Some(timeout))?;
}
Ok(client)
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index 698f469..5bc4f36 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -11,24 +11,24 @@ use std::collections::HashMap;
use std::io::Write;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
+use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::{CRYPTO_MODE, VoiceError, payload};
-use websocket::client::request::Url as WebsocketUrl;
-use websocket::client::{
- Client as WsClient,
- Receiver as WsReceiver,
- Sender as WsSender
-};
-use websocket::stream::WebSocketStream;
+use websocket::client::Url as WebsocketUrl;
+use websocket::sync::client::ClientBuilder;
+use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream};
+use websocket::sync::Client as WsClient;
use ::internal::prelude::*;
use ::internal::ws_impl::{ReceiverExt, SenderExt};
use ::internal::Timer;
use ::model::event::VoiceEvent;
use ::model::UserId;
+type Client = WsClient<TlsStream<TcpStream>>;
+
enum ReceiverStatus {
Udp(Vec<u8>),
Websocket(VoiceEvent),
@@ -46,13 +46,13 @@ struct ThreadItems {
#[allow(dead_code)]
pub struct Connection {
audio_timer: Timer,
+ client: Arc<Mutex<Client>>,
decoder_map: HashMap<(u32, Channels), OpusDecoder>,
destination: SocketAddr,
encoder: OpusEncoder,
encoder_stereo: bool,
keepalive_timer: Timer,
key: Key,
- sender: WsSender<WebSocketStream>,
sequence: u16,
silence_frames: u8,
speaking: bool,
@@ -67,17 +67,15 @@ impl Connection {
pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
let url = generate_url(&mut info.endpoint)?;
- let response = WsClient::connect(url)?.send()?;
- response.validate()?;
- let (mut sender, mut receiver) = response.begin().split();
+ let mut client = ClientBuilder::from_url(&url).connect_secure(None)?;
- sender.send_json(&payload::build_identify(&info))?;
+ client.send_json(&payload::build_identify(&info))?;
let hello = {
let hello;
loop {
- match receiver.recv_json(VoiceEvent::decode)? {
+ match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Hello(received_hello) => {
hello = received_hello;
@@ -134,12 +132,15 @@ impl Connection {
let port_pos = len - 2;
let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
- sender.send_json(&payload::build_select_protocol(addr, port))?;
+ client.send_json(&payload::build_select_protocol(addr, port))?;
}
- let key = encryption_key(&mut receiver)?;
+ let key = encryption_key(&mut client)?;
+
+ let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25)));
- let thread_items = start_threads(receiver, &udp)?;
+ let mutexed_client = Arc::new(Mutex::new(client));
+ let thread_items = start_threads(mutexed_client.clone(), &udp)?;
info!("[Voice] Connected to: {}", info.endpoint);
@@ -147,6 +148,7 @@ impl Connection {
Ok(Connection {
audio_timer: Timer::new(1000 * 60 * 4),
+ client: mutexed_client,
decoder_map: HashMap::new(),
destination: destination,
encoder: encoder,
@@ -154,7 +156,6 @@ impl Connection {
key: key,
keepalive_timer: Timer::new(hello.heartbeat_interval),
udp: udp,
- sender: sender,
sequence: 0,
silence_frames: 0,
speaking: false,
@@ -228,7 +229,7 @@ impl Connection {
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- self.sender.send_json(&payload::build_keepalive())?;
+ self.client.lock().unwrap().send_json(&payload::build_keepalive())?;
}
// Send UDP keepalive if it's time
@@ -263,9 +264,10 @@ impl Connection {
}
self.set_speaking(true)?;
- let index = self.prep_packet(&mut packet, buffer, nonce)?;
+ let index = self.prep_packet(&mut packet, buffer, nonce)?;
audio_timer.await();
+
self.udp.send_to(&packet[..index], self.destination)?;
self.audio_timer.reset();
@@ -363,7 +365,7 @@ impl Connection {
self.speaking = speaking;
- self.sender.send_json(&payload::build_speaking(speaking))
+ self.client.lock().unwrap().send_json(&payload::build_speaking(speaking))
}
}
@@ -388,10 +390,10 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
}
#[inline]
-fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+fn encryption_key(client: &mut Client)
-> Result<Key> {
loop {
- match receiver.recv_json(VoiceEvent::decode)? {
+ match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Ready(ready) => {
if ready.mode != CRYPTO_MODE {
return Err(Error::Voice(VoiceError::VoiceModeInvalid));
@@ -416,7 +418,7 @@ fn has_valid_mode(modes: &[String]) -> bool {
}
#[inline]
-fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
-> Result<ThreadItems> {
let (udp_close_sender, udp_close_reader) = mpsc::channel();
let (ws_close_sender, ws_close_reader) = mpsc::channel();
@@ -453,7 +455,7 @@ fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
.name(format!("{} WS", thread_name))
.spawn(move || {
loop {
- while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) {
+ while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
return;
}
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
index 22c0375..e40c6d9 100644
--- a/src/voice/handler.rs
+++ b/src/voice/handler.rs
@@ -1,9 +1,9 @@
+use serde_json::Value;
use std::sync::mpsc::{self, Sender as MpscSender};
use super::{AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::Status as VoiceStatus;
use ::constants::VoiceOpCode;
-use ::gateway::GatewayStatus;
use ::model::{ChannelId, GuildId, UserId, VoiceState};
use super::threading;
@@ -99,7 +99,7 @@ pub struct Handler {
///
/// When set via [`standalone`][`Handler::standalone`], it will not be
/// present.
- ws: Option<MpscSender<GatewayStatus>>,
+ ws: Option<MpscSender<Value>>,
}
impl Handler {
@@ -113,7 +113,7 @@ impl Handler {
/// [`Manager::join`]: struct.Manager.html#method.join
#[doc(hidden)]
#[inline]
- pub fn new(guild_id: GuildId, ws: MpscSender<GatewayStatus>, user_id: UserId) -> Self {
+ pub fn new(guild_id: GuildId, ws: MpscSender<Value>, user_id: UserId) -> Self {
Self::new_raw(guild_id, Some(ws), user_id)
}
@@ -357,7 +357,7 @@ impl Handler {
}
}
- fn new_raw(guild_id: GuildId, ws: Option<MpscSender<GatewayStatus>>, user_id: UserId) -> Self {
+ fn new_raw(guild_id: GuildId, ws: Option<MpscSender<Value>>, user_id: UserId) -> Self {
let (tx, rx) = mpsc::channel();
threading::start(guild_id, rx);
@@ -418,7 +418,7 @@ impl Handler {
}
});
- let _ = ws.send(GatewayStatus::SendMessage(map));
+ let _ = ws.send(map);
}
}
}
diff --git a/src/voice/manager.rs b/src/voice/manager.rs
index fe1b489..67eb7b2 100644
--- a/src/voice/manager.rs
+++ b/src/voice/manager.rs
@@ -1,7 +1,7 @@
+use serde_json::Value;
use std::collections::HashMap;
use std::sync::mpsc::Sender as MpscSender;
use super::Handler;
-use ::gateway::GatewayStatus;
use ::model::{ChannelId, GuildId, UserId};
/// A manager is a struct responsible for managing [`Handler`]s which belong to
@@ -24,12 +24,12 @@ use ::model::{ChannelId, GuildId, UserId};
pub struct Manager {
handlers: HashMap<GuildId, Handler>,
user_id: UserId,
- ws: MpscSender<GatewayStatus>,
+ ws: MpscSender<Value>,
}
impl Manager {
#[doc(hidden)]
- pub fn new(ws: MpscSender<GatewayStatus>, user_id: UserId) -> Manager {
+ pub fn new(ws: MpscSender<Value>, user_id: UserId) -> Manager {
Manager {
handlers: HashMap::new(),
user_id: user_id,