aboutsummaryrefslogtreecommitdiff
path: root/src/voice
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-10 16:06:37 -0700
committerZeyla Hellyer <[email protected]>2017-06-10 16:06:37 -0700
commit2f30f9ab38761aad62af977ab4440b8bfb43a897 (patch)
treee403e3e3cd394c9ff6d05f49febe285868ae108b /src/voice
parentUse an https connector in http::send_files (diff)
downloadserenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.tar.xz
serenity-2f30f9ab38761aad62af977ab4440b8bfb43a897.zip
Fix voice compilation
Diffstat (limited to 'src/voice')
-rw-r--r--src/voice/connection.rs50
-rw-r--r--src/voice/handler.rs10
-rw-r--r--src/voice/manager.rs6
3 files changed, 34 insertions, 32 deletions
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index 698f469..5bc4f36 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -11,24 +11,24 @@ use std::collections::HashMap;
use std::io::Write;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
+use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::{CRYPTO_MODE, VoiceError, payload};
-use websocket::client::request::Url as WebsocketUrl;
-use websocket::client::{
- Client as WsClient,
- Receiver as WsReceiver,
- Sender as WsSender
-};
-use websocket::stream::WebSocketStream;
+use websocket::client::Url as WebsocketUrl;
+use websocket::sync::client::ClientBuilder;
+use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream};
+use websocket::sync::Client as WsClient;
use ::internal::prelude::*;
use ::internal::ws_impl::{ReceiverExt, SenderExt};
use ::internal::Timer;
use ::model::event::VoiceEvent;
use ::model::UserId;
+type Client = WsClient<TlsStream<TcpStream>>;
+
enum ReceiverStatus {
Udp(Vec<u8>),
Websocket(VoiceEvent),
@@ -46,13 +46,13 @@ struct ThreadItems {
#[allow(dead_code)]
pub struct Connection {
audio_timer: Timer,
+ client: Arc<Mutex<Client>>,
decoder_map: HashMap<(u32, Channels), OpusDecoder>,
destination: SocketAddr,
encoder: OpusEncoder,
encoder_stereo: bool,
keepalive_timer: Timer,
key: Key,
- sender: WsSender<WebSocketStream>,
sequence: u16,
silence_frames: u8,
speaking: bool,
@@ -67,17 +67,15 @@ impl Connection {
pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
let url = generate_url(&mut info.endpoint)?;
- let response = WsClient::connect(url)?.send()?;
- response.validate()?;
- let (mut sender, mut receiver) = response.begin().split();
+ let mut client = ClientBuilder::from_url(&url).connect_secure(None)?;
- sender.send_json(&payload::build_identify(&info))?;
+ client.send_json(&payload::build_identify(&info))?;
let hello = {
let hello;
loop {
- match receiver.recv_json(VoiceEvent::decode)? {
+ match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Hello(received_hello) => {
hello = received_hello;
@@ -134,12 +132,15 @@ impl Connection {
let port_pos = len - 2;
let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
- sender.send_json(&payload::build_select_protocol(addr, port))?;
+ client.send_json(&payload::build_select_protocol(addr, port))?;
}
- let key = encryption_key(&mut receiver)?;
+ let key = encryption_key(&mut client)?;
+
+ let _ = client.stream_ref().as_tcp().set_read_timeout(Some(Duration::from_millis(25)));
- let thread_items = start_threads(receiver, &udp)?;
+ let mutexed_client = Arc::new(Mutex::new(client));
+ let thread_items = start_threads(mutexed_client.clone(), &udp)?;
info!("[Voice] Connected to: {}", info.endpoint);
@@ -147,6 +148,7 @@ impl Connection {
Ok(Connection {
audio_timer: Timer::new(1000 * 60 * 4),
+ client: mutexed_client,
decoder_map: HashMap::new(),
destination: destination,
encoder: encoder,
@@ -154,7 +156,6 @@ impl Connection {
key: key,
keepalive_timer: Timer::new(hello.heartbeat_interval),
udp: udp,
- sender: sender,
sequence: 0,
silence_frames: 0,
speaking: false,
@@ -228,7 +229,7 @@ impl Connection {
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- self.sender.send_json(&payload::build_keepalive())?;
+ self.client.lock().unwrap().send_json(&payload::build_keepalive())?;
}
// Send UDP keepalive if it's time
@@ -263,9 +264,10 @@ impl Connection {
}
self.set_speaking(true)?;
- let index = self.prep_packet(&mut packet, buffer, nonce)?;
+ let index = self.prep_packet(&mut packet, buffer, nonce)?;
audio_timer.await();
+
self.udp.send_to(&packet[..index], self.destination)?;
self.audio_timer.reset();
@@ -363,7 +365,7 @@ impl Connection {
self.speaking = speaking;
- self.sender.send_json(&payload::build_speaking(speaking))
+ self.client.lock().unwrap().send_json(&payload::build_speaking(speaking))
}
}
@@ -388,10 +390,10 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
}
#[inline]
-fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+fn encryption_key(client: &mut Client)
-> Result<Key> {
loop {
- match receiver.recv_json(VoiceEvent::decode)? {
+ match client.recv_json(VoiceEvent::decode)? {
VoiceEvent::Ready(ready) => {
if ready.mode != CRYPTO_MODE {
return Err(Error::Voice(VoiceError::VoiceModeInvalid));
@@ -416,7 +418,7 @@ fn has_valid_mode(modes: &[String]) -> bool {
}
#[inline]
-fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+fn start_threads(client: Arc<Mutex<Client>>, udp: &UdpSocket)
-> Result<ThreadItems> {
let (udp_close_sender, udp_close_reader) = mpsc::channel();
let (ws_close_sender, ws_close_reader) = mpsc::channel();
@@ -453,7 +455,7 @@ fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
.name(format!("{} WS", thread_name))
.spawn(move || {
loop {
- while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) {
+ while let Ok(msg) = client.lock().unwrap().recv_json(VoiceEvent::decode) {
if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
return;
}
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
index 22c0375..e40c6d9 100644
--- a/src/voice/handler.rs
+++ b/src/voice/handler.rs
@@ -1,9 +1,9 @@
+use serde_json::Value;
use std::sync::mpsc::{self, Sender as MpscSender};
use super::{AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::Status as VoiceStatus;
use ::constants::VoiceOpCode;
-use ::gateway::GatewayStatus;
use ::model::{ChannelId, GuildId, UserId, VoiceState};
use super::threading;
@@ -99,7 +99,7 @@ pub struct Handler {
///
/// When set via [`standalone`][`Handler::standalone`], it will not be
/// present.
- ws: Option<MpscSender<GatewayStatus>>,
+ ws: Option<MpscSender<Value>>,
}
impl Handler {
@@ -113,7 +113,7 @@ impl Handler {
/// [`Manager::join`]: struct.Manager.html#method.join
#[doc(hidden)]
#[inline]
- pub fn new(guild_id: GuildId, ws: MpscSender<GatewayStatus>, user_id: UserId) -> Self {
+ pub fn new(guild_id: GuildId, ws: MpscSender<Value>, user_id: UserId) -> Self {
Self::new_raw(guild_id, Some(ws), user_id)
}
@@ -357,7 +357,7 @@ impl Handler {
}
}
- fn new_raw(guild_id: GuildId, ws: Option<MpscSender<GatewayStatus>>, user_id: UserId) -> Self {
+ fn new_raw(guild_id: GuildId, ws: Option<MpscSender<Value>>, user_id: UserId) -> Self {
let (tx, rx) = mpsc::channel();
threading::start(guild_id, rx);
@@ -418,7 +418,7 @@ impl Handler {
}
});
- let _ = ws.send(GatewayStatus::SendMessage(map));
+ let _ = ws.send(map);
}
}
}
diff --git a/src/voice/manager.rs b/src/voice/manager.rs
index fe1b489..67eb7b2 100644
--- a/src/voice/manager.rs
+++ b/src/voice/manager.rs
@@ -1,7 +1,7 @@
+use serde_json::Value;
use std::collections::HashMap;
use std::sync::mpsc::Sender as MpscSender;
use super::Handler;
-use ::gateway::GatewayStatus;
use ::model::{ChannelId, GuildId, UserId};
/// A manager is a struct responsible for managing [`Handler`]s which belong to
@@ -24,12 +24,12 @@ use ::model::{ChannelId, GuildId, UserId};
pub struct Manager {
handlers: HashMap<GuildId, Handler>,
user_id: UserId,
- ws: MpscSender<GatewayStatus>,
+ ws: MpscSender<Value>,
}
impl Manager {
#[doc(hidden)]
- pub fn new(ws: MpscSender<GatewayStatus>, user_id: UserId) -> Manager {
+ pub fn new(ws: MpscSender<Value>, user_id: UserId) -> Manager {
Manager {
handlers: HashMap::new(),
user_id: user_id,