aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-11-29 11:22:33 -0800
committerAustin Hellyer <[email protected]>2016-11-29 11:22:09 -0800
commitb7f70c6252125a1739066b531ea9e5dff07592a1 (patch)
tree9845d27a7c978d5488bf5bd8ffc04cfbcd184448
parentRemove duplicated gateway logic (diff)
downloadserenity-b7f70c6252125a1739066b531ea9e5dff07592a1.tar.xz
serenity-b7f70c6252125a1739066b531ea9e5dff07592a1.zip
Add initial audio support
Audio can be played with support by passing one of the following into the `Handler::play` method: `serenity::ext::voice::{ffmpeg, pcm, ytdl}` functions, where - `ffmpeg` accepts a path (such as a `File`); - `pcm` accepts a raw reader source; - `ytdl` accepts a URI, which works with everything youtube-dl supports: <https://github.com/rg3/youtube-dl/blob/master/docs/supportedsites.md> The source can be stopped via [`Handler::stop`]. Receive is supported through [`Handler::listen`], which accepts a `serenity::ext::voice::AudioReceiver` implementation. An example is provided in the form of the file located at `./examples/07_voice.rs`, which can be run by cloning the repo and performing the command `cargo run --example 07_voice`. Prior to running the command, set a bot token as the value of the env variable `DISCORD_TOKEN`. The example supports: - `deafen`: deafens the bot; - `join`: joins a voice channel by ID. The example is a primitive implementation, and requires the ID of the channel to be passed to the bot as a command of `~join 131937933270712320`; - `leave`: leaves the current voice channel, if in one; - `mute`: mutes the bot and will continue to play source audio; - `play`: plays source audio from a URI, through a command like `~play https://www.youtube.com/watch?v=5KJjBRm0ElA`; - `ping`: responds with "Pong!" to ensure the bot is working properly; - `undeafen`: undeafens the bot, if that's actually a word; - `unmute`: unmutes the bot. Documentation for audio can be found at: <https://serenity.zey.moe/serenity/ext/voice/index.html>
-rw-r--r--examples/07_voice.rs156
-rw-r--r--src/client/gateway/shard.rs2
-rw-r--r--src/client/mod.rs1
-rw-r--r--src/error.rs40
-rw-r--r--src/ext/voice/audio.rs5
-rw-r--r--src/ext/voice/connection.rs413
-rw-r--r--src/ext/voice/connection_info.rs3
-rw-r--r--src/ext/voice/error.rs8
-rw-r--r--src/ext/voice/handler.rs54
-rw-r--r--src/ext/voice/mod.rs5
-rw-r--r--src/ext/voice/payload.rs48
-rw-r--r--src/ext/voice/streamer.rs149
-rw-r--r--src/ext/voice/threading.rs10
-rw-r--r--src/model/voice.rs23
14 files changed, 713 insertions, 204 deletions
diff --git a/examples/07_voice.rs b/examples/07_voice.rs
index 1f9a999..e685f37 100644
--- a/examples/07_voice.rs
+++ b/examples/07_voice.rs
@@ -5,8 +5,12 @@ extern crate serenity;
#[cfg(feature = "voice")]
use serenity::client::{CACHE, Client, Context};
#[cfg(feature = "voice")]
+use serenity::ext::voice;
+#[cfg(feature = "voice")]
use serenity::model::{ChannelId, Message};
#[cfg(feature = "voice")]
+use serenity::Result as SerenityResult;
+#[cfg(feature = "voice")]
use std::env;
#[cfg(not(feature = "voice"))]
@@ -21,19 +25,24 @@ fn main() {
.expect("Expected a token in the environment");
let mut client = Client::login_bot(&token);
- client.with_framework(|f|
- f.configure(|c| c.prefix("~"))
+ client.with_framework(|f| f
+ .configure(|c| c
+ .prefix("~")
+ .on_mention(true))
.on("deafen", deafen)
.on("join", join)
.on("leave", leave)
.on("mute", mute)
- .on("ping", ping));
+ .on("play", play)
+ .on("ping", ping)
+ .on("undeafen", undeafen)
+ .on("unmute", unmute));
client.on_ready(|_context, ready| {
println!("{} is connected!", ready.user.name);
});
- let _ = client.start();
+ let _ = client.start().map_err(|why| println!("Client ended: {:?}", why));
}
#[cfg(feature = "voice")]
@@ -41,7 +50,7 @@ fn deafen(context: &Context, message: &Message, _args: Vec<String>) {
let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
Some(channel) => channel.guild_id,
None => {
- let _ = context.say("Groups and DMs not supported");
+ check_msg(context.say("Groups and DMs not supported"));
return;
},
@@ -52,18 +61,18 @@ fn deafen(context: &Context, message: &Message, _args: Vec<String>) {
let handler = match shard.manager.get(guild_id) {
Some(handler) => handler,
None => {
- let _ = message.reply("Not in a voice channel");
+ check_msg(message.reply("Not in a voice channel"));
return;
},
};
if handler.is_deafened() {
- let _ = context.say("Already deafened");
+ check_msg(context.say("Already deafened"));
} else {
handler.deafen(true);
- let _ = context.say("Deafened");
+ check_msg(context.say("Deafened"));
}
}
@@ -73,13 +82,13 @@ fn join(context: &Context, message: &Message, args: Vec<String>) {
Some(arg) => match arg.parse::<u64>() {
Ok(id) => ChannelId(id),
Err(_why) => {
- let _ = message.reply("Invalid voice channel ID given");
+ check_msg(message.reply("Invalid voice channel ID given"));
return;
},
},
None => {
- let _ = message.reply("Requires a voice channel ID be given");
+ check_msg(message.reply("Requires a voice channel ID be given"));
return;
},
@@ -88,18 +97,16 @@ fn join(context: &Context, message: &Message, args: Vec<String>) {
let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
Some(channel) => channel.guild_id,
None => {
- let _ = context.say("Groups and DMs not supported");
+ check_msg(context.say("Groups and DMs not supported"));
return;
},
};
let mut shard = context.shard.lock().unwrap();
- let mut manager = &mut shard.manager;
+ shard.manager.join(Some(guild_id), connect_to);
- let _handler = manager.join(Some(guild_id), connect_to);
-
- let _ = context.say(&format!("Joined {}", connect_to.mention()));
+ check_msg(context.say(&format!("Joined {}", connect_to.mention())));
}
#[cfg(feature = "voice")]
@@ -107,23 +114,21 @@ fn leave(context: &Context, message: &Message, _args: Vec<String>) {
let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
Some(channel) => channel.guild_id,
None => {
- let _ = context.say("Groups and DMs not supported");
+ check_msg(context.say("Groups and DMs not supported"));
return;
},
};
- let is_connected = match context.shard.lock().unwrap().manager.get(guild_id) {
- Some(handler) => handler.channel().is_some(),
- None => false,
- };
+ let mut shard = context.shard.lock().unwrap();
+ let has_handler = shard.manager.get(guild_id).is_some();
- if is_connected {
- context.shard.lock().unwrap().manager.remove(guild_id);
+ if has_handler {
+ shard.manager.remove(guild_id);
- let _ = context.say("Left voice channel");
+ check_msg(context.say("Left voice channel"));
} else {
- let _ = message.reply("Not in a voice channel");
+ check_msg(message.reply("Not in a voice channel"));
}
}
@@ -132,7 +137,7 @@ fn mute(context: &Context, message: &Message, _args: Vec<String>) {
let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
Some(channel) => channel.guild_id,
None => {
- let _ = context.say("Groups and DMs not supported");
+ check_msg(context.say("Groups and DMs not supported"));
return;
},
@@ -143,22 +148,115 @@ fn mute(context: &Context, message: &Message, _args: Vec<String>) {
let handler = match shard.manager.get(guild_id) {
Some(handler) => handler,
None => {
- let _ = message.reply("Not in a voice channel");
+ check_msg(message.reply("Not in a voice channel"));
return;
},
};
if handler.is_muted() {
- let _ = context.say("Already muted");
+ check_msg(context.say("Already muted"));
} else {
handler.mute(true);
- let _ = context.say("Now muted");
+ check_msg(context.say("Now muted"));
}
}
#[cfg(feature = "voice")]
fn ping(context: &Context, _message: &Message, _args: Vec<String>) {
- let _ = context.say("Pong!");
+ check_msg(context.say("Pong!"));
+}
+
+#[cfg(feature = "voice")]
+fn play(context: &Context, message: &Message, args: Vec<String>) {
+ let url = match args.get(0) {
+ Some(url) => url,
+ None => {
+ check_msg(context.say("Must provide a URL to a video or audio"));
+
+ return;
+ },
+ };
+
+ if !url.starts_with("http") {
+ check_msg(context.say("Must provide a valid URL"));
+
+ return;
+ }
+
+ let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
+ Some(channel) => channel.guild_id,
+ None => {
+ check_msg(context.say("Error finding channel info"));
+
+ return;
+ },
+ };
+
+ if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) {
+ let source = match voice::ytdl(url) {
+ Ok(source) => source,
+ Err(why) => {
+ println!("Err starting source: {:?}", why);
+
+ check_msg(context.say("Error sourcing ffmpeg"));
+
+ return;
+ },
+ };
+
+ handler.play(source);
+
+ check_msg(context.say("Playing song"));
+ } else {
+ check_msg(context.say("Not in a voice channel to play in"));
+ }
+}
+
+#[cfg(feature = "voice")]
+fn undeafen(context: &Context, message: &Message, _args: Vec<String>) {
+ let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
+ Some(channel) => channel.guild_id,
+ None => {
+ check_msg(context.say("Error finding channel info"));
+
+ return;
+ },
+ };
+
+ if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) {
+ handler.deafen(false);
+
+ check_msg(context.say("Undeafened"));
+ } else {
+ check_msg(context.say("Not in a voice channel to undeafen in"));
+ }
+}
+
+#[cfg(feature = "voice")]
+fn unmute(context: &Context, message: &Message, _args: Vec<String>) {
+ let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) {
+ Some(channel) => channel.guild_id,
+ None => {
+ check_msg(context.say("Error finding channel info"));
+
+ return;
+ },
+ };
+
+ if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) {
+ handler.mute(false);
+
+ check_msg(context.say("Unmuted"));
+ } else {
+ check_msg(context.say("Not in a voice channel to undeafen in"));
+ }
+}
+
+/// Checks that a message successfully sent; if not, then logs why to stdout.
+fn check_msg(result: SerenityResult<Message>) {
+ if let Err(why) = result {
+ println!("Error sending message: {:?}", why);
+ }
}
diff --git a/src/client/gateway/shard.rs b/src/client/gateway/shard.rs
index ead7fd0..13a8c06 100644
--- a/src/client/gateway/shard.rs
+++ b/src/client/gateway/shard.rs
@@ -6,7 +6,7 @@ use std::thread::{self, Builder as ThreadBuilder};
use std::time::Duration as StdDuration;
use std::mem;
use super::super::login_type::LoginType;
-use super::super::{Client, rest};
+use super::super::rest;
use super::{GatewayError, GatewayStatus, prep};
use websocket::client::{Client as WsClient, Sender, Receiver};
use websocket::message::Message as WsMessage;
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 7248405..41e982a 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -54,7 +54,6 @@ use ::model::event::{
GuildSyncEvent,
MessageUpdateEvent,
PresenceUpdateEvent,
- ReadyEvent,
ResumedEvent,
TypingStartEvent,
VoiceServerUpdateEvent,
diff --git a/src/error.rs b/src/error.rs
index 6c534fb..94958fd 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -7,6 +7,8 @@ use serde_json::Value;
use websocket::result::WebSocketError;
use ::client::gateway::GatewayError;
use ::client::ClientError;
+#[cfg(feature = "opus")]
+use opus::Error as OpusError;
#[cfg(feature="voice")]
use ::ext::voice::VoiceError;
@@ -59,36 +61,46 @@ pub enum Error {
Other(&'static str),
/// An error from the `url` crate.
Url(String),
+ /// An error from the `rust-websocket` crate.
+ WebSocket(WebSocketError),
+ /// An error from the `opus` crate.
+ #[cfg(feature = "voice")]
+ Opus(OpusError),
/// Indicating an error within the [voice module].
///
/// [voice module]: ext/voice/index.html
- #[cfg(feature="voice")]
+ #[cfg(feature = "voice")]
Voice(VoiceError),
- /// An error from the `rust-websocket` crate.
- WebSocket(WebSocketError),
}
impl From<IoError> for Error {
- fn from(err: IoError) -> Error {
- Error::Io(err)
+ fn from(e: IoError) -> Error {
+ Error::Io(e)
}
}
impl From<HyperError> for Error {
- fn from(err: HyperError) -> Error {
- Error::Hyper(err)
+ fn from(e: HyperError) -> Error {
+ Error::Hyper(e)
}
}
impl From<JsonError> for Error {
- fn from(err: JsonError) -> Error {
- Error::Json(err)
+ fn from(e: JsonError) -> Error {
+ Error::Json(e)
+ }
+}
+
+#[cfg(feature = "voice")]
+impl From<OpusError> for Error {
+ fn from(e: OpusError) -> Error {
+ Error::Opus(e)
}
}
impl From<WebSocketError> for Error {
- fn from(err: WebSocketError) -> Error {
- Error::WebSocket(err)
+ fn from(e: WebSocketError) -> Error {
+ Error::WebSocket(e)
}
}
@@ -96,9 +108,11 @@ impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Hyper(ref inner) => inner.fmt(f),
+ Error::Io(ref inner) => inner.fmt(f),
Error::Json(ref inner) => inner.fmt(f),
Error::WebSocket(ref inner) => inner.fmt(f),
- Error::Io(ref inner) => inner.fmt(f),
+ #[cfg(feature = "voice")]
+ Error::Opus(ref inner) => inner.fmt(f),
_ => f.write_str(self.description()),
}
}
@@ -116,6 +130,8 @@ impl StdError for Error {
Error::Url(ref inner) => inner,
Error::WebSocket(ref inner) => inner.description(),
#[cfg(feature = "voice")]
+ Error::Opus(ref inner) => inner.description(),
+ #[cfg(feature = "voice")]
Error::Voice(_) => "Voice error",
}
}
diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs
index e49bd2a..814cd69 100644
--- a/src/ext/voice/audio.rs
+++ b/src/ext/voice/audio.rs
@@ -1,5 +1,8 @@
use ::model::UserId;
+pub const HEADER_LEN: usize = 12;
+pub const SAMPLE_RATE: u32 = 48000;
+
/// A readable audio source.
pub trait AudioSource: Send {
fn is_stereo(&mut self) -> bool;
@@ -9,7 +12,7 @@ pub trait AudioSource: Send {
/// A receiver for incoming audio.
pub trait AudioReceiver: Send {
- fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool);
+ fn speaking_update(&mut self, ssrc: u32, user_id: u64, speaking: bool);
fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]);
}
diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs
index 1cbc5bf..8f1821d 100644
--- a/src/ext/voice/connection.rs
+++ b/src/ext/voice/connection.rs
@@ -1,12 +1,21 @@
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
-use serde_json::builder::ObjectBuilder;
-use sodiumoxide::crypto::secretbox::Key;
+use opus::{
+ Channels,
+ CodingMode,
+ Decoder as OpusDecoder,
+ Encoder as OpusEncoder,
+ packet as opus_packet,
+};
+use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
+use std::collections::HashMap;
+use std::io::Write;
use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket};
-use std::sync::mpsc::{self, Receiver as MpscReceiver};
-use std::thread::{self, Builder as ThreadBuilder};
-use super::audio::{AudioReceiver, AudioSource};
+use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
+use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
+use std::time::Duration;
+use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
-use super::{CRYPTO_MODE, VoiceError};
+use super::{CRYPTO_MODE, VoiceError, payload};
use websocket::client::request::Url as WebsocketUrl;
use websocket::client::{
Client as WsClient,
@@ -14,29 +23,40 @@ use websocket::client::{
Sender as WsSender
};
use websocket::stream::WebSocketStream;
-use ::client::CACHE;
-use ::constants::VoiceOpCode;
use ::internal::prelude::*;
use ::internal::ws_impl::{ReceiverExt, SenderExt};
use ::internal::Timer;
use ::model::VoiceEvent;
-pub enum ReceiverStatus {
+enum ReceiverStatus {
Udp(Vec<u8>),
Websocket(VoiceEvent),
}
#[allow(dead_code)]
+struct ThreadItems {
+ rx: MpscReceiver<ReceiverStatus>,
+ udp_close_sender: MpscSender<i32>,
+ udp_thread: JoinHandle<()>,
+ ws_close_sender: MpscSender<i32>,
+ ws_thread: JoinHandle<()>,
+}
+
+#[allow(dead_code)]
pub struct Connection {
audio_timer: Timer,
+ decoder_map: HashMap<(u32, Channels), OpusDecoder>,
destination: SocketAddr,
+ encoder: OpusEncoder,
+ encoder_stereo: bool,
keepalive_timer: Timer,
key: Key,
- receive_channel: MpscReceiver<ReceiverStatus>,
sender: WsSender<WebSocketStream>,
- sequence: u64,
+ sequence: u16,
+ silence_frames: u8,
speaking: bool,
ssrc: u32,
+ thread_items: ThreadItems,
timestamp: u32,
udp: UdpSocket,
}
@@ -49,50 +69,97 @@ impl Connection {
try!(response.validate());
let (mut sender, mut receiver) = response.begin().split();
- try!(sender.send_json(&identify(&info)));
+ try!(sender.send_json(&payload::build_identify(&info)));
+
+ let hello = {
+ let hello;
+
+ loop {
+ let k = receiver.recv_json(VoiceEvent::decode);
+
+ match try!(k) {
+ VoiceEvent::Hello(received_hello) => {
+ hello = received_hello;
+
+ break;
+ },
+ VoiceEvent::Heartbeat(_heartbeat) => continue,
+ other => {
+ debug!("[Voice] Expected hello/heartbeat; got: {:?}",
+ other);
+
+ return Err(Error::Voice(VoiceError::ExpectedHandshake));
+ },
+ }
+ }
- let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) {
- VoiceEvent::Handshake(handshake) => handshake,
- _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)),
+ hello
};
- if !has_valid_mode(handshake.modes) {
+ if !has_valid_mode(hello.modes) {
return Err(Error::Voice(VoiceError::VoiceModeUnavailable));
}
- let destination = {
- try!(try!((&info.endpoint[..], handshake.port)
- .to_socket_addrs())
- .next()
- .ok_or(Error::Voice(VoiceError::HostnameResolve)))
- };
+ let destination = try!(try!((&info.endpoint[..], hello.port)
+ .to_socket_addrs())
+ .next()
+ .ok_or(Error::Voice(VoiceError::HostnameResolve)));
+
+ // Important to note here: the length of the packet can be of either 4
+ // or 70 bytes. If it is 4 bytes, then we need to send a 70-byte packet
+ // to determine the IP.
+ //
+ // Past the initial 4 bytes, the packet _must_ be completely empty data.
+ //
+ // The returned packet will be a null-terminated string of the IP, and
+ // the port encoded in LE in the last two bytes of the packet.
let udp = try!(UdpSocket::bind("0.0.0.0:0"));
{
let mut bytes = [0; 70];
- try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc));
+
+ try!((&mut bytes[..]).write_u32::<BigEndian>(hello.ssrc));
try!(udp.send_to(&bytes, destination));
- }
- try!(send_acknowledgement(&mut sender, &udp));
+ let mut bytes = [0; 256];
+ let (len, _addr) = try!(udp.recv_from(&mut bytes));
+
+ // Find the position in the bytes that contains the first byte of 0,
+ // indicating the "end of the address".
+ let index = try!(bytes.iter().skip(4).position(|&x| x == 0)
+ .ok_or(Error::Voice(VoiceError::FindingByte)));
+
+ let pos = 4 + index;
+ let addr = String::from_utf8_lossy(&bytes[4..pos]);
+ let port_pos = len - 2;
+ let port = try!((&bytes[port_pos..]).read_u16::<LittleEndian>());
+
+ try!(sender.send_json(&payload::build_select_protocol(addr, port)));
+ }
let key = try!(get_encryption_key(&mut receiver));
- let receive_channel = try!(start_threads(receiver, &udp));
+ let thread_items = try!(start_threads(receiver, &udp));
info!("[Voice] Connected to: {}", info.endpoint);
+ let encoder = try!(OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio));
+
Ok(Connection {
audio_timer: Timer::new(1000 * 60 * 4),
+ decoder_map: HashMap::new(),
destination: destination,
+ encoder: encoder,
+ encoder_stereo: false,
key: key,
- keepalive_timer: Timer::new(handshake.heartbeat_interval),
- receive_channel: receive_channel,
+ keepalive_timer: Timer::new(hello.heartbeat_interval),
udp: udp,
sender: sender,
sequence: 0,
+ silence_frames: 0,
speaking: false,
- ssrc: handshake.ssrc,
+ ssrc: hello.ssrc,
+ thread_items: thread_items,
timestamp: 0,
})
}
@@ -103,15 +170,45 @@ impl Connection {
receiver: &mut Option<Box<AudioReceiver>>,
audio_timer: &mut Timer)
-> Result<()> {
+ let mut buffer = [0i16; 960 * 2];
+ let mut packet = [0u8; 512];
+ let mut nonce = secretbox::Nonce([0; 24]);
+
if let Some(receiver) = receiver.as_mut() {
- while let Ok(status) = self.receive_channel.try_recv() {
+ while let Ok(status) = self.thread_items.rx.try_recv() {
match status {
ReceiverStatus::Udp(packet) => {
- debug!("[Voice] Received UDP packet: {:?}", packet);
+ let mut handle = &packet[2..];
+ let seq = try!(handle.read_u16::<BigEndian>());
+ let timestamp = try!(handle.read_u32::<BigEndian>());
+ let ssrc = try!(handle.read_u32::<BigEndian>());
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ let channels = try!(opus_packet::get_nb_channels(&decrypted));
+
+ let entry = self.decoder_map.entry((ssrc, channels))
+ .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE,
+ channels)
+ .unwrap());
+
+ let len = try!(entry.decode(&decrypted, &mut buffer, false));
+
+ let is_stereo = channels == Channels::Stereo;
+
+ let b = if is_stereo {
+ len * 2
+ } else {
+ len
+ };
+
+ receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
+ }
},
ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
receiver.speaking_update(ev.ssrc,
- &ev.user_id,
+ ev.user_id.0,
ev.speaking);
},
ReceiverStatus::Websocket(other) => {
@@ -121,12 +218,16 @@ impl Connection {
}
}
} else {
- while let Ok(_) = self.receive_channel.try_recv() {}
+ loop {
+ if let Err(_why) = self.thread_items.rx.try_recv() {
+ break;
+ }
+ }
}
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- try!(self.sender.send_json(&keepalive()));
+ try!(self.sender.send_json(&payload::build_keepalive()));
}
// Send the UDP keepalive if it's time
@@ -136,31 +237,132 @@ impl Connection {
try!(self.udp.send_to(&bytes, self.destination));
}
- try!(self.speaking(true));
+ let len = try!(self.read(source, &mut buffer));
- self.sequence = self.sequence.wrapping_add(1);
- self.timestamp = self.timestamp.wrapping_add(960);
+ if len == 0 {
+ try!(self.set_speaking(false));
+
+ if self.silence_frames > 0 {
+ self.silence_frames -= 1;
+
+ for value in &mut buffer[..] {
+ *value = 0;
+ }
+ } else {
+ audio_timer.await();
+
+ return Ok(());
+ }
+ } else {
+ self.silence_frames = 5;
+
+ for value in &mut buffer[len..] {
+ *value = 0;
+ }
+ }
+
+ try!(self.set_speaking(true));
+ let index = try!(self.prep_packet(&mut packet, buffer, nonce));
audio_timer.await();
+ try!(self.udp.send_to(&packet[..index], self.destination));
self.audio_timer.reset();
+
Ok(())
}
- fn speaking(&mut self, speaking: bool) -> Result<()> {
+ fn prep_packet(&mut self,
+ packet: &mut [u8; 512],
+ buffer: [i16; 1920],
+ mut nonce: Nonce)
+ -> Result<usize> {
+ {
+ let mut cursor = &mut packet[..HEADER_LEN];
+ try!(cursor.write_all(&[0x80, 0x78]));
+ try!(cursor.write_u16::<BigEndian>(self.sequence));
+ try!(cursor.write_u32::<BigEndian>(self.timestamp));
+ try!(cursor.write_u32::<BigEndian>(self.ssrc));
+ }
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ let extent = packet.len() - 16;
+ let buffer_len = if self.encoder_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ let len = try!(self.encoder.encode(&buffer[..buffer_len],
+ &mut packet[HEADER_LEN..extent]));
+ let crypted = {
+ let slice = &packet[HEADER_LEN..HEADER_LEN + len];
+ secretbox::seal(slice, &nonce, &self.key)
+ };
+ let index = HEADER_LEN + crypted.len();
+ packet[HEADER_LEN..index].clone_from_slice(&crypted);
+
+ self.sequence = self.sequence.wrapping_add(1);
+ self.timestamp = self.timestamp.wrapping_add(960);
+
+ Ok(HEADER_LEN + crypted.len())
+ }
+
+ fn read(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ buffer: &mut [i16; 1920])
+ -> Result<usize> {
+ let mut clear = false;
+
+ let len = match source.as_mut() {
+ Some(source) => {
+ let is_stereo = source.is_stereo();
+
+ if is_stereo != self.encoder_stereo {
+ let channels = if is_stereo {
+ Channels::Stereo
+ } else {
+ Channels::Mono
+ };
+ self.encoder = try!(OpusEncoder::new(SAMPLE_RATE,
+ channels,
+ CodingMode::Audio));
+ self.encoder_stereo = is_stereo;
+ }
+
+ let buffer_len = if is_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ match source.read_frame(&mut buffer[..buffer_len]) {
+ Some(len) => len,
+ None => {
+ clear = true;
+
+ 0
+ },
+ }
+ },
+ None => 0,
+ };
+
+ if clear {
+ *source = None;
+ }
+
+ Ok(len)
+ }
+
+ fn set_speaking(&mut self, speaking: bool) -> Result<()> {
if self.speaking == speaking {
return Ok(());
}
self.speaking = speaking;
- let map = ObjectBuilder::new()
- .insert("op", VoiceOpCode::Speaking.num())
- .insert_object("d", |object| object
- .insert("delay", 0))
- .insert("speaking", speaking)
- .build();
-
- self.sender.send_json(&map)
+ self.sender.send_json(&payload::build_speaking(speaking))
}
}
@@ -168,7 +370,7 @@ impl Drop for Connection {
fn drop(&mut self) {
let _ = self.sender.get_mut().shutdown(Shutdown::Both);
- info!("Voice disconnected");
+ info!("[Voice] Disconnected");
}
}
@@ -183,7 +385,8 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
.or(Err(Error::Voice(VoiceError::EndpointUrl)))
}
-pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+#[inline]
+fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
-> Result<Key> {
loop {
match try!(receiver.recv_json(VoiceEvent::decode)) {
@@ -196,115 +399,77 @@ pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
.ok_or(Error::Voice(VoiceError::KeyGen));
},
VoiceEvent::Unknown(op, value) => {
- debug!("Unknown message type: {}/{:?}", op.num(), value);
+ debug!("[Voice] Expected ready for key; got: op{}/v{:?}",
+ op.num(),
+ value);
},
_ => {},
}
}
}
-fn identify(info: &ConnectionInfo) -> Value {
- ObjectBuilder::new()
- .insert("op", VoiceOpCode::Identify.num())
- .insert_object("d", |o| o
- .insert("server_id", info.server_id)
- .insert("session_id", &info.session_id)
- .insert("token", &info.token)
- .insert("user_id", CACHE.read().unwrap().user.id.0))
- .build()
-}
-
-#[inline(always)]
+#[inline]
fn has_valid_mode(modes: Vec<String>) -> bool {
modes.iter().any(|s| s == CRYPTO_MODE)
}
-fn keepalive() -> Value {
- ObjectBuilder::new()
- .insert("op", VoiceOpCode::KeepAlive.num())
- .insert("d", Value::Null)
- .build()
-}
-
-#[inline]
-fn select_protocol(address: &[u8], port: u16) -> Value {
- ObjectBuilder::new()
- .insert("op", VoiceOpCode::SelectProtocol.num())
- .insert_object("d", |o| o
- .insert("protocol", "udp")
- .insert_object("data", |o| o
- .insert("address", address)
- .insert("mode", "xsalsa20_poly1305")))
- .insert("port", port)
- .build()
-}
-
-#[inline]
-fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket)
- -> Result<()> {
- let mut bytes = [0; 256];
-
- let (len, _) = try!(udp.recv_from(&mut bytes));
-
- let zero_index = bytes.iter()
- .skip(4)
- .position(|&x| x == 0)
- .unwrap();
-
- let address = &bytes[4..4 + zero_index];
-
- let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>());
-
- // send the acknowledgement websocket message
- let map = select_protocol(address, port);
- sender.send_json(&map).map(|_| ())
-}
-
#[inline]
fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
- -> Result<MpscReceiver<ReceiverStatus>> {
- let thread = thread::current();
- let thread_name = thread.name().unwrap_or("serenity.rs voice");
+ -> Result<ThreadItems> {
+ let (udp_close_sender, udp_close_reader) = mpsc::channel();
+ let (ws_close_sender, ws_close_reader) = mpsc::channel();
+
+ let current_thread = thread::current();
+ let thread_name = current_thread.name().unwrap_or("serenity voice");
let (tx, rx) = mpsc::channel();
let tx_clone = tx.clone();
let udp_clone = try!(udp.try_clone());
- try!(ThreadBuilder::new()
- .name(format!("{} WS", thread_name))
+ let udp_thread = try!(ThreadBuilder::new()
+ .name(format!("{} UDP", thread_name))
.spawn(move || {
- loop {
- let msg = receiver.recv_json(VoiceEvent::decode);
+ let _ = udp_clone.set_read_timeout(Some(Duration::from_millis(250)));
- if let Ok(msg) = msg {
- let send = tx.send(ReceiverStatus::Websocket(msg));
+ let mut buffer = [0; 512];
+
+ loop {
+ if let Ok((len, _)) = udp_clone.recv_from(&mut buffer) {
+ let piece = buffer[..len].iter().cloned().collect();
+ let send = tx.send(ReceiverStatus::Udp(piece));
if let Err(_why) = send {
return;
}
- } else {
- break;
+ } else if let Ok(_v) = udp_close_reader.try_recv() {
+ return;
}
}
}));
- try!(ThreadBuilder::new()
- .name(format!("{} UDP", thread_name))
+ let ws_thread = try!(ThreadBuilder::new()
+ .name(format!("{} WS", thread_name))
.spawn(move || {
- let mut buffer = [0; 512];
-
loop {
- let (len, _) = udp_clone.recv_from(&mut buffer).unwrap();
- let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len]
- .iter()
- .cloned()
- .collect()));
+ while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) {
+ if let Err(_why) = tx_clone.send(ReceiverStatus::Websocket(msg)) {
+ return;
+ }
+ }
- if let Err(_why) = req {
+ if let Ok(_v) = ws_close_reader.try_recv() {
return;
}
+
+ thread::sleep(Duration::from_millis(25));
}
}));
- Ok(rx)
+ Ok(ThreadItems {
+ rx: rx,
+ udp_close_sender: udp_close_sender,
+ udp_thread: udp_thread,
+ ws_close_sender: ws_close_sender,
+ ws_thread: ws_thread,
+ })
}
diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs
index 9115385..c257e4a 100644
--- a/src/ext/voice/connection_info.rs
+++ b/src/ext/voice/connection_info.rs
@@ -1,6 +1,7 @@
+#[derive(Clone, Debug)]
pub struct ConnectionInfo {
pub endpoint: String,
- pub server_id: u64,
pub session_id: String,
+ pub target_id: u64,
pub token: String,
}
diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs
index b1f3251..a49229a 100644
--- a/src/ext/voice/error.rs
+++ b/src/ext/voice/error.rs
@@ -1,10 +1,18 @@
+use serde_json::Value;
+use std::process::Output;
+
#[derive(Debug)]
pub enum VoiceError {
// An indicator that an endpoint URL was invalid.
EndpointUrl,
ExpectedHandshake,
+ FindingByte,
HostnameResolve,
KeyGen,
+ Streams,
VoiceModeInvalid,
VoiceModeUnavailable,
+ YouTubeDLRun(Output),
+ YouTubeDLProcessing(Value),
+ YouTubeDLUrl(Value),
}
diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs
index cbdd033..16f700a 100644
--- a/src/ext/voice/handler.rs
+++ b/src/ext/voice/handler.rs
@@ -1,5 +1,6 @@
use serde_json::builder::ObjectBuilder;
-use std::sync::mpsc::{self, Sender};
+use std::sync::mpsc::{self, Sender as MpscSender};
+use super::{AudioReceiver, AudioSource};
use super::connection_info::ConnectionInfo;
use super::{Status as VoiceStatus, Target};
use ::client::gateway::GatewayStatus;
@@ -36,10 +37,10 @@ pub struct Handler {
guild_id: Option<GuildId>,
self_deaf: bool,
self_mute: bool,
- sender: Sender<VoiceStatus>,
+ sender: MpscSender<VoiceStatus>,
session_id: Option<String>,
user_id: u64,
- ws: Sender<GatewayStatus>,
+ ws: MpscSender<GatewayStatus>,
}
impl Handler {
@@ -52,7 +53,7 @@ impl Handler {
///
/// [`Manager::join`]: struct.Manager.html#method.join
#[doc(hidden)]
- pub fn new(target: Target, ws: Sender<GatewayStatus>, user_id: u64)
+ pub fn new(target: Target, ws: MpscSender<GatewayStatus>, user_id: u64)
-> Self {
let (tx, rx) = mpsc::channel();
@@ -170,6 +171,17 @@ impl Handler {
}
}
+ /// Sets a receiver, i.e. a way to receive audio. Most use cases for bots do
+ /// not require this.
+ ///
+ /// The `receiver` argument can be thought of as an "optional Option". You
+ /// can pass in just a boxed receiver, and do not need to specify `Some`.
+ ///
+ /// Pass `None` to drop the current receiver, if one exists.
+ pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O) {
+ self.send(VoiceStatus::SetReceiver(receiver.into()))
+ }
+
/// Sets whether the current connection is to be muted.
///
/// If there is no live voice connection, then this only acts as a settings
@@ -182,6 +194,19 @@ impl Handler {
}
}
+ /// Plays audio from a source. This can be a source created via
+ /// [`voice::ffmpeg`] or [`voice::ytdl`].
+ ///
+ /// [`voice::ffmpeg`]: fn.ffmpeg.html
+ /// [`voice::ytdl`]: fn.ytdl.html
+ pub fn play(&mut self, source: Box<AudioSource>) {
+ self.send(VoiceStatus::SetSender(Some(source)))
+ }
+
+ pub fn stop(&mut self) {
+ self.send(VoiceStatus::SetSender(None))
+ }
+
/// Switches the current connected voice channel to the given `channel_id`.
///
/// This has 3 separate behaviors:
@@ -243,8 +268,8 @@ impl Handler {
self.send(VoiceStatus::Connect(ConnectionInfo {
endpoint: endpoint,
- server_id: target_id,
session_id: session_id,
+ target_id: target_id,
token: token,
}))
}
@@ -264,10 +289,8 @@ impl Handler {
}
fn send(&mut self, status: VoiceStatus) {
- let send = self.sender.send(status);
-
- // Reconnect if it errored.
- if let Err(mpsc::SendError(status)) = send {
+ // Restart thread if it errored.
+ if let Err(mpsc::SendError(status)) = self.sender.send(status) {
let (tx, rx) = mpsc::channel();
self.sender = tx;
@@ -282,15 +305,14 @@ impl Handler {
/// You probably shouldn't use this if you're reading the source code.
#[doc(hidden)]
pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) {
- if let Some(ref endpoint) = *endpoint {
- let endpoint = endpoint.clone();
+ if let Some(endpoint) = endpoint.clone() {
let token = token.to_owned();
- let session_id = match self.session_id {
- Some(ref session_id) => session_id.clone(),
- None => return,
- };
- self.connect_with_data(session_id, endpoint, token);
+ if let Some(session_id) = self.session_id.clone() {
+ self.connect_with_data(session_id, endpoint, token);
+ } else {
+ self.endpoint_token = Some((endpoint, token));
+ }
} else {
self.leave();
}
diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs
index 107a473..eb63f27 100644
--- a/src/ext/voice/mod.rs
+++ b/src/ext/voice/mod.rs
@@ -4,13 +4,16 @@ mod connection_info;
mod error;
mod manager;
mod handler;
+mod payload;
+mod streamer;
mod threading;
+pub use self::audio::{AudioReceiver, AudioSource};
pub use self::error::VoiceError;
pub use self::handler::Handler;
pub use self::manager::Manager;
+pub use self::streamer::{ffmpeg, ytdl};
-use self::audio::{AudioReceiver, AudioSource};
use self::connection_info::ConnectionInfo;
use ::model::{ChannelId, GuildId};
diff --git a/src/ext/voice/payload.rs b/src/ext/voice/payload.rs
new file mode 100644
index 0000000..c9ceeeb
--- /dev/null
+++ b/src/ext/voice/payload.rs
@@ -0,0 +1,48 @@
+use serde_json::builder::ObjectBuilder;
+use serde_json::Value;
+use super::connection_info::ConnectionInfo;
+use ::constants::VoiceOpCode;
+use ::client::CACHE;
+
+#[inline]
+pub fn build_identify(info: &ConnectionInfo) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Identify.num())
+ .insert_object("d", |o| o
+ .insert("server_id", info.target_id)
+ .insert("session_id", &info.session_id)
+ .insert("token", &info.token)
+ .insert("user_id", CACHE.read().unwrap().user.id.0))
+ .build()
+}
+
+#[inline]
+pub fn build_keepalive() -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::KeepAlive.num())
+ .insert("d", Value::Null)
+ .build()
+}
+
+#[inline]
+pub fn build_select_protocol(address: ::std::borrow::Cow<str>, port: u16) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::SelectProtocol.num())
+ .insert_object("d", |o| o
+ .insert("protocol", "udp")
+ .insert_object("data", |o| o
+ .insert("address", address)
+ .insert("mode", super::CRYPTO_MODE)
+ .insert("port", port)))
+ .build()
+}
+
+#[inline]
+pub fn build_speaking(speaking: bool) -> Value {
+ ObjectBuilder::new()
+ .insert("op", VoiceOpCode::Speaking.num())
+ .insert_object("d", |o| o
+ .insert("delay", 0)
+ .insert("speaking", speaking))
+ .build()
+}
diff --git a/src/ext/voice/streamer.rs b/src/ext/voice/streamer.rs
new file mode 100644
index 0000000..ed1c620
--- /dev/null
+++ b/src/ext/voice/streamer.rs
@@ -0,0 +1,149 @@
+use byteorder::{LittleEndian, ReadBytesExt};
+use serde_json;
+use std::ffi::OsStr;
+use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult};
+use std::process::{Child, Command, Stdio};
+use super::{AudioSource, VoiceError};
+use ::internal::prelude::*;
+
+struct ChildContainer(Child);
+
+impl Read for ChildContainer {
+ fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> {
+ self.0.stdout.as_mut().unwrap().read(buffer)
+ }
+}
+
+struct PcmSource<R: Read + Send + 'static>(bool, R);
+
+impl<R: Read + Send> AudioSource for PcmSource<R> {
+ fn is_stereo(&mut self) -> bool {
+ self.0
+ }
+
+ fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> {
+ for (i, v) in buffer.iter_mut().enumerate() {
+ *v = match self.1.read_i16::<LittleEndian>() {
+ Ok(v) => v,
+ Err(ref e) => return if e.kind() == IoErrorKind::UnexpectedEof {
+ Some(i)
+ } else {
+ None
+ },
+ }
+ }
+
+ Some(buffer.len())
+ }
+}
+
+pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
+ let path = path.as_ref();
+
+ /// Will fail if the path is not to a file on the fs. Likely a YouTube URI.
+ let is_stereo = is_stereo(path).unwrap_or(false);
+ let stereo_val = if is_stereo {
+ "2"
+ } else {
+ "1"
+ };
+
+ let args = [
+ "-f",
+ "s16le",
+ "-ac",
+ stereo_val,
+ "-ar",
+ "48000",
+ "-acodec",
+ "pcm_s16le",
+ "-",
+ ];
+
+ let command = try!(Command::new("ffmpeg")
+ .arg("-i")
+ .arg(path)
+ .args(&args)
+ .stderr(Stdio::null())
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .spawn());
+
+ Ok(pcm(is_stereo, ChildContainer(command)))
+}
+
+pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R)
+ -> Box<AudioSource> {
+ Box::new(PcmSource(is_stereo, reader))
+}
+
+pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> {
+ let args = [
+ "-f",
+ "webm[abr>0]/bestaudio/best",
+ "--no-playlist",
+ "--print-json",
+ "--skip-download",
+ uri,
+ ];
+
+ let out = try!(Command::new("youtube-dl")
+ .args(&args)
+ .stdin(Stdio::null())
+ .output());
+
+ if !out.status.success() {
+ return Err(Error::Voice(VoiceError::YouTubeDLRun(out)));
+ }
+
+ let value = try!(serde_json::from_reader(&out.stdout[..]));
+ let mut obj = match value {
+ Value::Object(obj) => obj,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLProcessing(other))),
+ };
+
+ let uri = match obj.remove("url") {
+ Some(v) => match v {
+ Value::String(uri) => uri,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ },
+ None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))),
+ };
+
+ ffmpeg(&uri)
+}
+
+fn is_stereo(path: &OsStr) -> Result<bool> {
+ let args = [
+ "-v",
+ "quiet",
+ "-of",
+ "json",
+ "-show-streams",
+ "-i",
+ ];
+
+ let out = try!(Command::new("ffprobe")
+ .args(&args)
+ .arg(path)
+ .stdin(Stdio::null())
+ .output());
+
+ let value: Value = try!(serde_json::from_reader(&out.stdout[..]));
+
+ let streams = try!(value.as_object()
+ .and_then(|m| m.get("streams"))
+ .and_then(|v| v.as_array())
+ .ok_or(Error::Voice(VoiceError::Streams)));
+
+ let check = streams.iter()
+ .any(|stream| {
+ let channels = stream.as_object()
+ .and_then(|m| m.get("channels")
+ .and_then(|v| v.as_i64()));
+
+ channels == Some(2)
+ });
+
+ Ok(check)
+}
diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs
index 113c23d..bbbffd1 100644
--- a/src/ext/voice/threading.rs
+++ b/src/ext/voice/threading.rs
@@ -13,7 +13,7 @@ pub fn start(target_id: Target, rx: MpscReceiver<Status>) {
ThreadBuilder::new()
.name(name)
.spawn(move || runner(rx))
- .expect("Error starting voice");
+ .expect(&format!("[Voice] Error starting target: {:?}", target_id));
}
fn runner(rx: MpscReceiver<Status>) {
@@ -27,9 +27,11 @@ fn runner(rx: MpscReceiver<Status>) {
match rx.try_recv() {
Ok(Status::Connect(info)) => {
connection = match Connection::new(info) {
- Ok(connection) => Some(connection),
+ Ok(connection) => {
+ Some(connection)
+ },
Err(why) => {
- error!("Error connecting via voice: {:?}", why);
+ warn!("[Voice] Error connecting: {:?}", why);
None
},
@@ -71,7 +73,7 @@ fn runner(rx: MpscReceiver<Status>) {
match update {
Ok(()) => false,
Err(why) => {
- error!("Error updating voice connection: {:?}", why);
+ error!("[Voice] Error updating connection: {:?}", why);
true
},
diff --git a/src/model/voice.rs b/src/model/voice.rs
index a888029..61035c6 100644
--- a/src/model/voice.rs
+++ b/src/model/voice.rs
@@ -4,16 +4,7 @@ use ::constants::VoiceOpCode;
use ::internal::prelude::*;
use ::utils::decode_array;
-#[derive(Clone, Debug)]
-pub struct VoiceHandshake {
- pub heartbeat_interval: u64,
- pub ip: Option<String>,
- pub modes: Vec<String>,
- pub port: u16,
- pub ssrc: u32,
-}
-
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
pub struct VoiceHeartbeat {
pub heartbeat_interval: u64,
}
@@ -21,6 +12,7 @@ pub struct VoiceHeartbeat {
#[derive(Clone, Debug)]
pub struct VoiceHello {
pub heartbeat_interval: u64,
+ pub ip: String,
pub modes: Vec<String>,
pub port: u16,
pub ssrc: u32,
@@ -32,7 +24,7 @@ pub struct VoiceReady {
pub secret_key: Vec<u8>,
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
pub struct VoiceSpeaking {
pub speaking: bool,
pub ssrc: u32,
@@ -41,7 +33,6 @@ pub struct VoiceSpeaking {
#[derive(Clone, Debug)]
pub enum VoiceEvent {
- Handshake(VoiceHandshake),
Heartbeat(VoiceHeartbeat),
Hello(VoiceHello),
Ready(VoiceReady),
@@ -56,16 +47,20 @@ impl VoiceEvent {
let op = req!(try!(remove(&mut value, "op")).as_u64());
let mut map = try!(remove(&mut value, "d").and_then(into_map));
- match try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode))) {
+ let opcode = try!(VoiceOpCode::from_num(op)
+ .ok_or(Error::Client(ClientError::InvalidOpCode)));
+
+ match opcode {
VoiceOpCode::Heartbeat => {
missing!(map, VoiceEvent::Heartbeat(VoiceHeartbeat {
- heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()),
+ heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval")).as_u64()),
}))
},
VoiceOpCode::Hello => {
missing!(map, VoiceEvent::Hello(VoiceHello {
heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval"))
.as_u64()),
+ ip: try!(remove(&mut map, "ip").and_then(into_string)),
modes: try!(decode_array(try!(remove(&mut map, "modes")),
into_string)),
port: req!(try!(remove(&mut map, "port"))