diff options
| author | Austin Hellyer <[email protected]> | 2016-11-29 11:22:33 -0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-11-29 11:22:09 -0800 |
| commit | b7f70c6252125a1739066b531ea9e5dff07592a1 (patch) | |
| tree | 9845d27a7c978d5488bf5bd8ffc04cfbcd184448 | |
| parent | Remove duplicated gateway logic (diff) | |
| download | serenity-b7f70c6252125a1739066b531ea9e5dff07592a1.tar.xz serenity-b7f70c6252125a1739066b531ea9e5dff07592a1.zip | |
Add initial audio support
Audio can be played by passing the result of one of the following
functions into the `Handler::play` method:
`serenity::ext::voice::{ffmpeg, pcm, ytdl}`, where
- `ffmpeg` accepts a path (such as a `File`);
- `pcm` accepts a raw reader source;
- `ytdl` accepts a URI, which works with everything youtube-dl supports:
<https://github.com/rg3/youtube-dl/blob/master/docs/supportedsites.md>
The source can be stopped via [`Handler::stop`].
Receiving audio is supported through [`Handler::listen`], which accepts a
`serenity::ext::voice::AudioReceiver` implementation.
An example is provided in the form of the file located at
`./examples/07_voice.rs`, which can be run by cloning the repo and
performing the command `cargo run --example 07_voice`. Prior to running
the command, set a bot token as the value of the env variable
`DISCORD_TOKEN`. The example supports:
- `deafen`: deafens the bot;
- `join`: joins a voice channel by ID. The example is a primitive
implementation, and requires the ID of the channel to be passed to the
bot in a command such as `~join 131937933270712320`;
- `leave`: leaves the current voice channel, if in one;
- `mute`: mutes the bot, which will continue to play source audio;
- `play`: plays source audio from a URI, through a command like
`~play https://www.youtube.com/watch?v=5KJjBRm0ElA`;
- `ping`: responds with "Pong!" to ensure the bot is working properly;
- `undeafen`: undeafens the bot, if that's actually a word;
- `unmute`: unmutes the bot.
Documentation for audio can be found at:
<https://serenity.zey.moe/serenity/ext/voice/index.html>
| -rw-r--r-- | examples/07_voice.rs | 156 | ||||
| -rw-r--r-- | src/client/gateway/shard.rs | 2 | ||||
| -rw-r--r-- | src/client/mod.rs | 1 | ||||
| -rw-r--r-- | src/error.rs | 40 | ||||
| -rw-r--r-- | src/ext/voice/audio.rs | 5 | ||||
| -rw-r--r-- | src/ext/voice/connection.rs | 413 | ||||
| -rw-r--r-- | src/ext/voice/connection_info.rs | 3 | ||||
| -rw-r--r-- | src/ext/voice/error.rs | 8 | ||||
| -rw-r--r-- | src/ext/voice/handler.rs | 54 | ||||
| -rw-r--r-- | src/ext/voice/mod.rs | 5 | ||||
| -rw-r--r-- | src/ext/voice/payload.rs | 48 | ||||
| -rw-r--r-- | src/ext/voice/streamer.rs | 149 | ||||
| -rw-r--r-- | src/ext/voice/threading.rs | 10 | ||||
| -rw-r--r-- | src/model/voice.rs | 23 |
14 files changed, 713 insertions, 204 deletions
diff --git a/examples/07_voice.rs b/examples/07_voice.rs index 1f9a999..e685f37 100644 --- a/examples/07_voice.rs +++ b/examples/07_voice.rs @@ -5,8 +5,12 @@ extern crate serenity; #[cfg(feature = "voice")] use serenity::client::{CACHE, Client, Context}; #[cfg(feature = "voice")] +use serenity::ext::voice; +#[cfg(feature = "voice")] use serenity::model::{ChannelId, Message}; #[cfg(feature = "voice")] +use serenity::Result as SerenityResult; +#[cfg(feature = "voice")] use std::env; #[cfg(not(feature = "voice"))] @@ -21,19 +25,24 @@ fn main() { .expect("Expected a token in the environment"); let mut client = Client::login_bot(&token); - client.with_framework(|f| - f.configure(|c| c.prefix("~")) + client.with_framework(|f| f + .configure(|c| c + .prefix("~") + .on_mention(true)) .on("deafen", deafen) .on("join", join) .on("leave", leave) .on("mute", mute) - .on("ping", ping)); + .on("play", play) + .on("ping", ping) + .on("undeafen", undeafen) + .on("unmute", unmute)); client.on_ready(|_context, ready| { println!("{} is connected!", ready.user.name); }); - let _ = client.start(); + let _ = client.start().map_err(|why| println!("Client ended: {:?}", why)); } #[cfg(feature = "voice")] @@ -41,7 +50,7 @@ fn deafen(context: &Context, message: &Message, _args: Vec<String>) { let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { Some(channel) => channel.guild_id, None => { - let _ = context.say("Groups and DMs not supported"); + check_msg(context.say("Groups and DMs not supported")); return; }, @@ -52,18 +61,18 @@ fn deafen(context: &Context, message: &Message, _args: Vec<String>) { let handler = match shard.manager.get(guild_id) { Some(handler) => handler, None => { - let _ = message.reply("Not in a voice channel"); + check_msg(message.reply("Not in a voice channel")); return; }, }; if handler.is_deafened() { - let _ = context.say("Already deafened"); + check_msg(context.say("Already deafened")); } else { handler.deafen(true); - let _ = 
context.say("Deafened"); + check_msg(context.say("Deafened")); } } @@ -73,13 +82,13 @@ fn join(context: &Context, message: &Message, args: Vec<String>) { Some(arg) => match arg.parse::<u64>() { Ok(id) => ChannelId(id), Err(_why) => { - let _ = message.reply("Invalid voice channel ID given"); + check_msg(message.reply("Invalid voice channel ID given")); return; }, }, None => { - let _ = message.reply("Requires a voice channel ID be given"); + check_msg(message.reply("Requires a voice channel ID be given")); return; }, @@ -88,18 +97,16 @@ fn join(context: &Context, message: &Message, args: Vec<String>) { let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { Some(channel) => channel.guild_id, None => { - let _ = context.say("Groups and DMs not supported"); + check_msg(context.say("Groups and DMs not supported")); return; }, }; let mut shard = context.shard.lock().unwrap(); - let mut manager = &mut shard.manager; + shard.manager.join(Some(guild_id), connect_to); - let _handler = manager.join(Some(guild_id), connect_to); - - let _ = context.say(&format!("Joined {}", connect_to.mention())); + check_msg(context.say(&format!("Joined {}", connect_to.mention()))); } #[cfg(feature = "voice")] @@ -107,23 +114,21 @@ fn leave(context: &Context, message: &Message, _args: Vec<String>) { let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { Some(channel) => channel.guild_id, None => { - let _ = context.say("Groups and DMs not supported"); + check_msg(context.say("Groups and DMs not supported")); return; }, }; - let is_connected = match context.shard.lock().unwrap().manager.get(guild_id) { - Some(handler) => handler.channel().is_some(), - None => false, - }; + let mut shard = context.shard.lock().unwrap(); + let has_handler = shard.manager.get(guild_id).is_some(); - if is_connected { - context.shard.lock().unwrap().manager.remove(guild_id); + if has_handler { + shard.manager.remove(guild_id); - let _ = context.say("Left voice 
channel"); + check_msg(context.say("Left voice channel")); } else { - let _ = message.reply("Not in a voice channel"); + check_msg(message.reply("Not in a voice channel")); } } @@ -132,7 +137,7 @@ fn mute(context: &Context, message: &Message, _args: Vec<String>) { let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { Some(channel) => channel.guild_id, None => { - let _ = context.say("Groups and DMs not supported"); + check_msg(context.say("Groups and DMs not supported")); return; }, @@ -143,22 +148,115 @@ fn mute(context: &Context, message: &Message, _args: Vec<String>) { let handler = match shard.manager.get(guild_id) { Some(handler) => handler, None => { - let _ = message.reply("Not in a voice channel"); + check_msg(message.reply("Not in a voice channel")); return; }, }; if handler.is_muted() { - let _ = context.say("Already muted"); + check_msg(context.say("Already muted")); } else { handler.mute(true); - let _ = context.say("Now muted"); + check_msg(context.say("Now muted")); } } #[cfg(feature = "voice")] fn ping(context: &Context, _message: &Message, _args: Vec<String>) { - let _ = context.say("Pong!"); + check_msg(context.say("Pong!")); +} + +#[cfg(feature = "voice")] +fn play(context: &Context, message: &Message, args: Vec<String>) { + let url = match args.get(0) { + Some(url) => url, + None => { + check_msg(context.say("Must provide a URL to a video or audio")); + + return; + }, + }; + + if !url.starts_with("http") { + check_msg(context.say("Must provide a valid URL")); + + return; + } + + let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { + Some(channel) => channel.guild_id, + None => { + check_msg(context.say("Error finding channel info")); + + return; + }, + }; + + if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) { + let source = match voice::ytdl(url) { + Ok(source) => source, + Err(why) => { + println!("Err starting source: {:?}", why); + + 
check_msg(context.say("Error sourcing ffmpeg")); + + return; + }, + }; + + handler.play(source); + + check_msg(context.say("Playing song")); + } else { + check_msg(context.say("Not in a voice channel to play in")); + } +} + +#[cfg(feature = "voice")] +fn undeafen(context: &Context, message: &Message, _args: Vec<String>) { + let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { + Some(channel) => channel.guild_id, + None => { + check_msg(context.say("Error finding channel info")); + + return; + }, + }; + + if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) { + handler.deafen(false); + + check_msg(context.say("Undeafened")); + } else { + check_msg(context.say("Not in a voice channel to undeafen in")); + } +} + +#[cfg(feature = "voice")] +fn unmute(context: &Context, message: &Message, _args: Vec<String>) { + let guild_id = match CACHE.read().unwrap().get_guild_channel(message.channel_id) { + Some(channel) => channel.guild_id, + None => { + check_msg(context.say("Error finding channel info")); + + return; + }, + }; + + if let Some(handler) = context.shard.lock().unwrap().manager.get(guild_id) { + handler.mute(false); + + check_msg(context.say("Unmuted")); + } else { + check_msg(context.say("Not in a voice channel to undeafen in")); + } +} + +/// Checks that a message successfully sent; if not, then logs why to stdout. 
+fn check_msg(result: SerenityResult<Message>) { + if let Err(why) = result { + println!("Error sending message: {:?}", why); + } } diff --git a/src/client/gateway/shard.rs b/src/client/gateway/shard.rs index ead7fd0..13a8c06 100644 --- a/src/client/gateway/shard.rs +++ b/src/client/gateway/shard.rs @@ -6,7 +6,7 @@ use std::thread::{self, Builder as ThreadBuilder}; use std::time::Duration as StdDuration; use std::mem; use super::super::login_type::LoginType; -use super::super::{Client, rest}; +use super::super::rest; use super::{GatewayError, GatewayStatus, prep}; use websocket::client::{Client as WsClient, Sender, Receiver}; use websocket::message::Message as WsMessage; diff --git a/src/client/mod.rs b/src/client/mod.rs index 7248405..41e982a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -54,7 +54,6 @@ use ::model::event::{ GuildSyncEvent, MessageUpdateEvent, PresenceUpdateEvent, - ReadyEvent, ResumedEvent, TypingStartEvent, VoiceServerUpdateEvent, diff --git a/src/error.rs b/src/error.rs index 6c534fb..94958fd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,8 @@ use serde_json::Value; use websocket::result::WebSocketError; use ::client::gateway::GatewayError; use ::client::ClientError; +#[cfg(feature = "opus")] +use opus::Error as OpusError; #[cfg(feature="voice")] use ::ext::voice::VoiceError; @@ -59,36 +61,46 @@ pub enum Error { Other(&'static str), /// An error from the `url` crate. Url(String), + /// An error from the `rust-websocket` crate. + WebSocket(WebSocketError), + /// An error from the `opus` crate. + #[cfg(feature = "voice")] + Opus(OpusError), /// Indicating an error within the [voice module]. /// /// [voice module]: ext/voice/index.html - #[cfg(feature="voice")] + #[cfg(feature = "voice")] Voice(VoiceError), - /// An error from the `rust-websocket` crate. 
- WebSocket(WebSocketError), } impl From<IoError> for Error { - fn from(err: IoError) -> Error { - Error::Io(err) + fn from(e: IoError) -> Error { + Error::Io(e) } } impl From<HyperError> for Error { - fn from(err: HyperError) -> Error { - Error::Hyper(err) + fn from(e: HyperError) -> Error { + Error::Hyper(e) } } impl From<JsonError> for Error { - fn from(err: JsonError) -> Error { - Error::Json(err) + fn from(e: JsonError) -> Error { + Error::Json(e) + } +} + +#[cfg(feature = "voice")] +impl From<OpusError> for Error { + fn from(e: OpusError) -> Error { + Error::Opus(e) } } impl From<WebSocketError> for Error { - fn from(err: WebSocketError) -> Error { - Error::WebSocket(err) + fn from(e: WebSocketError) -> Error { + Error::WebSocket(e) } } @@ -96,9 +108,11 @@ impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Error::Hyper(ref inner) => inner.fmt(f), + Error::Io(ref inner) => inner.fmt(f), Error::Json(ref inner) => inner.fmt(f), Error::WebSocket(ref inner) => inner.fmt(f), - Error::Io(ref inner) => inner.fmt(f), + #[cfg(feature = "voice")] + Error::Opus(ref inner) => inner.fmt(f), _ => f.write_str(self.description()), } } @@ -116,6 +130,8 @@ impl StdError for Error { Error::Url(ref inner) => inner, Error::WebSocket(ref inner) => inner.description(), #[cfg(feature = "voice")] + Error::Opus(ref inner) => inner.description(), + #[cfg(feature = "voice")] Error::Voice(_) => "Voice error", } } diff --git a/src/ext/voice/audio.rs b/src/ext/voice/audio.rs index e49bd2a..814cd69 100644 --- a/src/ext/voice/audio.rs +++ b/src/ext/voice/audio.rs @@ -1,5 +1,8 @@ use ::model::UserId; +pub const HEADER_LEN: usize = 12; +pub const SAMPLE_RATE: u32 = 48000; + /// A readable audio source. pub trait AudioSource: Send { fn is_stereo(&mut self) -> bool; @@ -9,7 +12,7 @@ pub trait AudioSource: Send { /// A receiver for incoming audio. 
pub trait AudioReceiver: Send { - fn speaking_update(&mut self, ssrc: u32, user_id: &UserId, speaking: bool); + fn speaking_update(&mut self, ssrc: u32, user_id: u64, speaking: bool); fn voice_packet(&mut self, ssrc: u32, sequence: u16, timestamp: u32, stereo: bool, data: &[i16]); } diff --git a/src/ext/voice/connection.rs b/src/ext/voice/connection.rs index 1cbc5bf..8f1821d 100644 --- a/src/ext/voice/connection.rs +++ b/src/ext/voice/connection.rs @@ -1,12 +1,21 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; -use serde_json::builder::ObjectBuilder; -use sodiumoxide::crypto::secretbox::Key; +use opus::{ + Channels, + CodingMode, + Decoder as OpusDecoder, + Encoder as OpusEncoder, + packet as opus_packet, +}; +use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; +use std::collections::HashMap; +use std::io::Write; use std::net::{Shutdown, SocketAddr, ToSocketAddrs, UdpSocket}; -use std::sync::mpsc::{self, Receiver as MpscReceiver}; -use std::thread::{self, Builder as ThreadBuilder}; -use super::audio::{AudioReceiver, AudioSource}; +use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; +use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; +use std::time::Duration; +use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; -use super::{CRYPTO_MODE, VoiceError}; +use super::{CRYPTO_MODE, VoiceError, payload}; use websocket::client::request::Url as WebsocketUrl; use websocket::client::{ Client as WsClient, @@ -14,29 +23,40 @@ use websocket::client::{ Sender as WsSender }; use websocket::stream::WebSocketStream; -use ::client::CACHE; -use ::constants::VoiceOpCode; use ::internal::prelude::*; use ::internal::ws_impl::{ReceiverExt, SenderExt}; use ::internal::Timer; use ::model::VoiceEvent; -pub enum ReceiverStatus { +enum ReceiverStatus { Udp(Vec<u8>), Websocket(VoiceEvent), } #[allow(dead_code)] +struct ThreadItems { + rx: 
MpscReceiver<ReceiverStatus>, + udp_close_sender: MpscSender<i32>, + udp_thread: JoinHandle<()>, + ws_close_sender: MpscSender<i32>, + ws_thread: JoinHandle<()>, +} + +#[allow(dead_code)] pub struct Connection { audio_timer: Timer, + decoder_map: HashMap<(u32, Channels), OpusDecoder>, destination: SocketAddr, + encoder: OpusEncoder, + encoder_stereo: bool, keepalive_timer: Timer, key: Key, - receive_channel: MpscReceiver<ReceiverStatus>, sender: WsSender<WebSocketStream>, - sequence: u64, + sequence: u16, + silence_frames: u8, speaking: bool, ssrc: u32, + thread_items: ThreadItems, timestamp: u32, udp: UdpSocket, } @@ -49,50 +69,97 @@ impl Connection { try!(response.validate()); let (mut sender, mut receiver) = response.begin().split(); - try!(sender.send_json(&identify(&info))); + try!(sender.send_json(&payload::build_identify(&info))); + + let hello = { + let hello; + + loop { + let k = receiver.recv_json(VoiceEvent::decode); + + match try!(k) { + VoiceEvent::Hello(received_hello) => { + hello = received_hello; + + break; + }, + VoiceEvent::Heartbeat(_heartbeat) => continue, + other => { + debug!("[Voice] Expected hello/heartbeat; got: {:?}", + other); + + return Err(Error::Voice(VoiceError::ExpectedHandshake)); + }, + } + } - let handshake = match try!(receiver.recv_json(VoiceEvent::decode)) { - VoiceEvent::Handshake(handshake) => handshake, - _ => return Err(Error::Voice(VoiceError::ExpectedHandshake)), + hello }; - if !has_valid_mode(handshake.modes) { + if !has_valid_mode(hello.modes) { return Err(Error::Voice(VoiceError::VoiceModeUnavailable)); } - let destination = { - try!(try!((&info.endpoint[..], handshake.port) - .to_socket_addrs()) - .next() - .ok_or(Error::Voice(VoiceError::HostnameResolve))) - }; + let destination = try!(try!((&info.endpoint[..], hello.port) + .to_socket_addrs()) + .next() + .ok_or(Error::Voice(VoiceError::HostnameResolve))); + + // Important to note here: the length of the packet can be of either 4 + // or 70 bytes. 
If it is 4 bytes, then we need to send a 70-byte packet + // to determine the IP. + // + // Past the initial 4 bytes, the packet _must_ be completely empty data. + // + // The returned packet will be a null-terminated string of the IP, and + // the port encoded in LE in the last two bytes of the packet. let udp = try!(UdpSocket::bind("0.0.0.0:0")); { let mut bytes = [0; 70]; - try!((&mut bytes[..]).write_u32::<BigEndian>(handshake.ssrc)); + + try!((&mut bytes[..]).write_u32::<BigEndian>(hello.ssrc)); try!(udp.send_to(&bytes, destination)); - } - try!(send_acknowledgement(&mut sender, &udp)); + let mut bytes = [0; 256]; + let (len, _addr) = try!(udp.recv_from(&mut bytes)); + + // Find the position in the bytes that contains the first byte of 0, + // indicating the "end of the address". + let index = try!(bytes.iter().skip(4).position(|&x| x == 0) + .ok_or(Error::Voice(VoiceError::FindingByte))); + + let pos = 4 + index; + let addr = String::from_utf8_lossy(&bytes[4..pos]); + let port_pos = len - 2; + let port = try!((&bytes[port_pos..]).read_u16::<LittleEndian>()); + + try!(sender.send_json(&payload::build_select_protocol(addr, port))); + } let key = try!(get_encryption_key(&mut receiver)); - let receive_channel = try!(start_threads(receiver, &udp)); + let thread_items = try!(start_threads(receiver, &udp)); info!("[Voice] Connected to: {}", info.endpoint); + let encoder = try!(OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)); + Ok(Connection { audio_timer: Timer::new(1000 * 60 * 4), + decoder_map: HashMap::new(), destination: destination, + encoder: encoder, + encoder_stereo: false, key: key, - keepalive_timer: Timer::new(handshake.heartbeat_interval), - receive_channel: receive_channel, + keepalive_timer: Timer::new(hello.heartbeat_interval), udp: udp, sender: sender, sequence: 0, + silence_frames: 0, speaking: false, - ssrc: handshake.ssrc, + ssrc: hello.ssrc, + thread_items: thread_items, timestamp: 0, }) } @@ -103,15 +170,45 @@ impl Connection { 
receiver: &mut Option<Box<AudioReceiver>>, audio_timer: &mut Timer) -> Result<()> { + let mut buffer = [0i16; 960 * 2]; + let mut packet = [0u8; 512]; + let mut nonce = secretbox::Nonce([0; 24]); + if let Some(receiver) = receiver.as_mut() { - while let Ok(status) = self.receive_channel.try_recv() { + while let Ok(status) = self.thread_items.rx.try_recv() { match status { ReceiverStatus::Udp(packet) => { - debug!("[Voice] Received UDP packet: {:?}", packet); + let mut handle = &packet[2..]; + let seq = try!(handle.read_u16::<BigEndian>()); + let timestamp = try!(handle.read_u32::<BigEndian>()); + let ssrc = try!(handle.read_u32::<BigEndian>()); + + nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); + + if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { + let channels = try!(opus_packet::get_nb_channels(&decrypted)); + + let entry = self.decoder_map.entry((ssrc, channels)) + .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE, + channels) + .unwrap()); + + let len = try!(entry.decode(&decrypted, &mut buffer, false)); + + let is_stereo = channels == Channels::Stereo; + + let b = if is_stereo { + len * 2 + } else { + len + }; + + receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]); + } }, ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { receiver.speaking_update(ev.ssrc, - &ev.user_id, + ev.user_id.0, ev.speaking); }, ReceiverStatus::Websocket(other) => { @@ -121,12 +218,16 @@ impl Connection { } } } else { - while let Ok(_) = self.receive_channel.try_recv() {} + loop { + if let Err(_why) = self.thread_items.rx.try_recv() { + break; + } + } } // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - try!(self.sender.send_json(&keepalive())); + try!(self.sender.send_json(&payload::build_keepalive())); } // Send the UDP keepalive if it's time @@ -136,31 +237,132 @@ impl Connection { try!(self.udp.send_to(&bytes, self.destination)); } - try!(self.speaking(true)); + let len = 
try!(self.read(source, &mut buffer)); - self.sequence = self.sequence.wrapping_add(1); - self.timestamp = self.timestamp.wrapping_add(960); + if len == 0 { + try!(self.set_speaking(false)); + + if self.silence_frames > 0 { + self.silence_frames -= 1; + + for value in &mut buffer[..] { + *value = 0; + } + } else { + audio_timer.await(); + + return Ok(()); + } + } else { + self.silence_frames = 5; + + for value in &mut buffer[len..] { + *value = 0; + } + } + + try!(self.set_speaking(true)); + let index = try!(self.prep_packet(&mut packet, buffer, nonce)); audio_timer.await(); + try!(self.udp.send_to(&packet[..index], self.destination)); self.audio_timer.reset(); + Ok(()) } - fn speaking(&mut self, speaking: bool) -> Result<()> { + fn prep_packet(&mut self, + packet: &mut [u8; 512], + buffer: [i16; 1920], + mut nonce: Nonce) + -> Result<usize> { + { + let mut cursor = &mut packet[..HEADER_LEN]; + try!(cursor.write_all(&[0x80, 0x78])); + try!(cursor.write_u16::<BigEndian>(self.sequence)); + try!(cursor.write_u32::<BigEndian>(self.timestamp)); + try!(cursor.write_u32::<BigEndian>(self.ssrc)); + } + + nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]); + + let extent = packet.len() - 16; + let buffer_len = if self.encoder_stereo { + 960 * 2 + } else { + 960 + }; + + let len = try!(self.encoder.encode(&buffer[..buffer_len], + &mut packet[HEADER_LEN..extent])); + let crypted = { + let slice = &packet[HEADER_LEN..HEADER_LEN + len]; + secretbox::seal(slice, &nonce, &self.key) + }; + let index = HEADER_LEN + crypted.len(); + packet[HEADER_LEN..index].clone_from_slice(&crypted); + + self.sequence = self.sequence.wrapping_add(1); + self.timestamp = self.timestamp.wrapping_add(960); + + Ok(HEADER_LEN + crypted.len()) + } + + fn read(&mut self, + source: &mut Option<Box<AudioSource>>, + buffer: &mut [i16; 1920]) + -> Result<usize> { + let mut clear = false; + + let len = match source.as_mut() { + Some(source) => { + let is_stereo = source.is_stereo(); + + if is_stereo 
!= self.encoder_stereo { + let channels = if is_stereo { + Channels::Stereo + } else { + Channels::Mono + }; + self.encoder = try!(OpusEncoder::new(SAMPLE_RATE, + channels, + CodingMode::Audio)); + self.encoder_stereo = is_stereo; + } + + let buffer_len = if is_stereo { + 960 * 2 + } else { + 960 + }; + + match source.read_frame(&mut buffer[..buffer_len]) { + Some(len) => len, + None => { + clear = true; + + 0 + }, + } + }, + None => 0, + }; + + if clear { + *source = None; + } + + Ok(len) + } + + fn set_speaking(&mut self, speaking: bool) -> Result<()> { if self.speaking == speaking { return Ok(()); } self.speaking = speaking; - let map = ObjectBuilder::new() - .insert("op", VoiceOpCode::Speaking.num()) - .insert_object("d", |object| object - .insert("delay", 0)) - .insert("speaking", speaking) - .build(); - - self.sender.send_json(&map) + self.sender.send_json(&payload::build_speaking(speaking)) } } @@ -168,7 +370,7 @@ impl Drop for Connection { fn drop(&mut self) { let _ = self.sender.get_mut().shutdown(Shutdown::Both); - info!("Voice disconnected"); + info!("[Voice] Disconnected"); } } @@ -183,7 +385,8 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { .or(Err(Error::Voice(VoiceError::EndpointUrl))) } -pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>) +#[inline] +fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>) -> Result<Key> { loop { match try!(receiver.recv_json(VoiceEvent::decode)) { @@ -196,115 +399,77 @@ pub fn get_encryption_key(receiver: &mut WsReceiver<WebSocketStream>) .ok_or(Error::Voice(VoiceError::KeyGen)); }, VoiceEvent::Unknown(op, value) => { - debug!("Unknown message type: {}/{:?}", op.num(), value); + debug!("[Voice] Expected ready for key; got: op{}/v{:?}", + op.num(), + value); }, _ => {}, } } } -fn identify(info: &ConnectionInfo) -> Value { - ObjectBuilder::new() - .insert("op", VoiceOpCode::Identify.num()) - .insert_object("d", |o| o - .insert("server_id", info.server_id) - 
.insert("session_id", &info.session_id) - .insert("token", &info.token) - .insert("user_id", CACHE.read().unwrap().user.id.0)) - .build() -} - -#[inline(always)] +#[inline] fn has_valid_mode(modes: Vec<String>) -> bool { modes.iter().any(|s| s == CRYPTO_MODE) } -fn keepalive() -> Value { - ObjectBuilder::new() - .insert("op", VoiceOpCode::KeepAlive.num()) - .insert("d", Value::Null) - .build() -} - -#[inline] -fn select_protocol(address: &[u8], port: u16) -> Value { - ObjectBuilder::new() - .insert("op", VoiceOpCode::SelectProtocol.num()) - .insert_object("d", |o| o - .insert("protocol", "udp") - .insert_object("data", |o| o - .insert("address", address) - .insert("mode", "xsalsa20_poly1305"))) - .insert("port", port) - .build() -} - -#[inline] -fn send_acknowledgement(sender: &mut WsSender<WebSocketStream>, udp: &UdpSocket) - -> Result<()> { - let mut bytes = [0; 256]; - - let (len, _) = try!(udp.recv_from(&mut bytes)); - - let zero_index = bytes.iter() - .skip(4) - .position(|&x| x == 0) - .unwrap(); - - let address = &bytes[4..4 + zero_index]; - - let port = try!((&bytes[len - 2..]).read_u16::<LittleEndian>()); - - // send the acknowledgement websocket message - let map = select_protocol(address, port); - sender.send_json(&map).map(|_| ()) -} - #[inline] fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket) - -> Result<MpscReceiver<ReceiverStatus>> { - let thread = thread::current(); - let thread_name = thread.name().unwrap_or("serenity.rs voice"); + -> Result<ThreadItems> { + let (udp_close_sender, udp_close_reader) = mpsc::channel(); + let (ws_close_sender, ws_close_reader) = mpsc::channel(); + + let current_thread = thread::current(); + let thread_name = current_thread.name().unwrap_or("serenity voice"); let (tx, rx) = mpsc::channel(); let tx_clone = tx.clone(); let udp_clone = try!(udp.try_clone()); - try!(ThreadBuilder::new() - .name(format!("{} WS", thread_name)) + let udp_thread = try!(ThreadBuilder::new() + .name(format!("{} UDP", 
thread_name)) .spawn(move || { - loop { - let msg = receiver.recv_json(VoiceEvent::decode); + let _ = udp_clone.set_read_timeout(Some(Duration::from_millis(250))); - if let Ok(msg) = msg { - let send = tx.send(ReceiverStatus::Websocket(msg)); + let mut buffer = [0; 512]; + + loop { + if let Ok((len, _)) = udp_clone.recv_from(&mut buffer) { + let piece = buffer[..len].iter().cloned().collect(); + let send = tx.send(ReceiverStatus::Udp(piece)); if let Err(_why) = send { return; } - } else { - break; + } else if let Ok(_v) = udp_close_reader.try_recv() { + return; } } })); - try!(ThreadBuilder::new() - .name(format!("{} UDP", thread_name)) + let ws_thread = try!(ThreadBuilder::new() + .name(format!("{} WS", thread_name)) .spawn(move || { - let mut buffer = [0; 512]; - loop { - let (len, _) = udp_clone.recv_from(&mut buffer).unwrap(); - let req = tx_clone.send(ReceiverStatus::Udp(buffer[..len] - .iter() - .cloned() - .collect())); + while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) { + if let Err(_why) = tx_clone.send(ReceiverStatus::Websocket(msg)) { + return; + } + } - if let Err(_why) = req { + if let Ok(_v) = ws_close_reader.try_recv() { return; } + + thread::sleep(Duration::from_millis(25)); } })); - Ok(rx) + Ok(ThreadItems { + rx: rx, + udp_close_sender: udp_close_sender, + udp_thread: udp_thread, + ws_close_sender: ws_close_sender, + ws_thread: ws_thread, + }) } diff --git a/src/ext/voice/connection_info.rs b/src/ext/voice/connection_info.rs index 9115385..c257e4a 100644 --- a/src/ext/voice/connection_info.rs +++ b/src/ext/voice/connection_info.rs @@ -1,6 +1,7 @@ +#[derive(Clone, Debug)] pub struct ConnectionInfo { pub endpoint: String, - pub server_id: u64, pub session_id: String, + pub target_id: u64, pub token: String, } diff --git a/src/ext/voice/error.rs b/src/ext/voice/error.rs index b1f3251..a49229a 100644 --- a/src/ext/voice/error.rs +++ b/src/ext/voice/error.rs @@ -1,10 +1,18 @@ +use serde_json::Value; +use std::process::Output; + 
#[derive(Debug)] pub enum VoiceError { // An indicator that an endpoint URL was invalid. EndpointUrl, ExpectedHandshake, + FindingByte, HostnameResolve, KeyGen, + Streams, VoiceModeInvalid, VoiceModeUnavailable, + YouTubeDLRun(Output), + YouTubeDLProcessing(Value), + YouTubeDLUrl(Value), } diff --git a/src/ext/voice/handler.rs b/src/ext/voice/handler.rs index cbdd033..16f700a 100644 --- a/src/ext/voice/handler.rs +++ b/src/ext/voice/handler.rs @@ -1,5 +1,6 @@ use serde_json::builder::ObjectBuilder; -use std::sync::mpsc::{self, Sender}; +use std::sync::mpsc::{self, Sender as MpscSender}; +use super::{AudioReceiver, AudioSource}; use super::connection_info::ConnectionInfo; use super::{Status as VoiceStatus, Target}; use ::client::gateway::GatewayStatus; @@ -36,10 +37,10 @@ pub struct Handler { guild_id: Option<GuildId>, self_deaf: bool, self_mute: bool, - sender: Sender<VoiceStatus>, + sender: MpscSender<VoiceStatus>, session_id: Option<String>, user_id: u64, - ws: Sender<GatewayStatus>, + ws: MpscSender<GatewayStatus>, } impl Handler { @@ -52,7 +53,7 @@ impl Handler { /// /// [`Manager::join`]: struct.Manager.html#method.join #[doc(hidden)] - pub fn new(target: Target, ws: Sender<GatewayStatus>, user_id: u64) + pub fn new(target: Target, ws: MpscSender<GatewayStatus>, user_id: u64) -> Self { let (tx, rx) = mpsc::channel(); @@ -170,6 +171,17 @@ impl Handler { } } + /// Sets a receiver, i.e. a way to receive audio. Most use cases for bots do + /// not require this. + /// + /// The `receiver` argument can be thought of as an "optional Option". You + /// can pass in just a boxed receiver, and do not need to specify `Some`. + /// + /// Pass `None` to drop the current receiver, if one exists. + pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O) { + self.send(VoiceStatus::SetReceiver(receiver.into())) + } + /// Sets whether the current connection is to be muted. 
/// /// If there is no live voice connection, then this only acts as a settings @@ -182,6 +194,19 @@ impl Handler { } } + /// Plays audio from a source. This can be a source created via + /// [`voice::ffmpeg`] or [`voice::ytdl`]. + /// + /// [`voice::ffmpeg`]: fn.ffmpeg.html + /// [`voice::ytdl`]: fn.ytdl.html + pub fn play(&mut self, source: Box<AudioSource>) { + self.send(VoiceStatus::SetSender(Some(source))) + } + + pub fn stop(&mut self) { + self.send(VoiceStatus::SetSender(None)) + } + /// Switches the current connected voice channel to the given `channel_id`. /// /// This has 3 separate behaviors: @@ -243,8 +268,8 @@ impl Handler { self.send(VoiceStatus::Connect(ConnectionInfo { endpoint: endpoint, - server_id: target_id, session_id: session_id, + target_id: target_id, token: token, })) } @@ -264,10 +289,8 @@ impl Handler { } fn send(&mut self, status: VoiceStatus) { - let send = self.sender.send(status); - - // Reconnect if it errored. - if let Err(mpsc::SendError(status)) = send { + // Restart thread if it errored. + if let Err(mpsc::SendError(status)) = self.sender.send(status) { let (tx, rx) = mpsc::channel(); self.sender = tx; @@ -282,15 +305,14 @@ impl Handler { /// You probably shouldn't use this if you're reading the source code. 
#[doc(hidden)] pub fn update_server(&mut self, endpoint: &Option<String>, token: &str) { - if let Some(ref endpoint) = *endpoint { - let endpoint = endpoint.clone(); + if let Some(endpoint) = endpoint.clone() { let token = token.to_owned(); - let session_id = match self.session_id { - Some(ref session_id) => session_id.clone(), - None => return, - }; - self.connect_with_data(session_id, endpoint, token); + if let Some(session_id) = self.session_id.clone() { + self.connect_with_data(session_id, endpoint, token); + } else { + self.endpoint_token = Some((endpoint, token)); + } } else { self.leave(); } diff --git a/src/ext/voice/mod.rs b/src/ext/voice/mod.rs index 107a473..eb63f27 100644 --- a/src/ext/voice/mod.rs +++ b/src/ext/voice/mod.rs @@ -4,13 +4,16 @@ mod connection_info; mod error; mod manager; mod handler; +mod payload; +mod streamer; mod threading; +pub use self::audio::{AudioReceiver, AudioSource}; pub use self::error::VoiceError; pub use self::handler::Handler; pub use self::manager::Manager; +pub use self::streamer::{ffmpeg, ytdl}; -use self::audio::{AudioReceiver, AudioSource}; use self::connection_info::ConnectionInfo; use ::model::{ChannelId, GuildId}; diff --git a/src/ext/voice/payload.rs b/src/ext/voice/payload.rs new file mode 100644 index 0000000..c9ceeeb --- /dev/null +++ b/src/ext/voice/payload.rs @@ -0,0 +1,48 @@ +use serde_json::builder::ObjectBuilder; +use serde_json::Value; +use super::connection_info::ConnectionInfo; +use ::constants::VoiceOpCode; +use ::client::CACHE; + +#[inline] +pub fn build_identify(info: &ConnectionInfo) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::Identify.num()) + .insert_object("d", |o| o + .insert("server_id", info.target_id) + .insert("session_id", &info.session_id) + .insert("token", &info.token) + .insert("user_id", CACHE.read().unwrap().user.id.0)) + .build() +} + +#[inline] +pub fn build_keepalive() -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::KeepAlive.num()) + .insert("d", 
Value::Null) + .build() +} + +#[inline] +pub fn build_select_protocol(address: ::std::borrow::Cow<str>, port: u16) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::SelectProtocol.num()) + .insert_object("d", |o| o + .insert("protocol", "udp") + .insert_object("data", |o| o + .insert("address", address) + .insert("mode", super::CRYPTO_MODE) + .insert("port", port))) + .build() +} + +#[inline] +pub fn build_speaking(speaking: bool) -> Value { + ObjectBuilder::new() + .insert("op", VoiceOpCode::Speaking.num()) + .insert_object("d", |o| o + .insert("delay", 0) + .insert("speaking", speaking)) + .build() +} diff --git a/src/ext/voice/streamer.rs b/src/ext/voice/streamer.rs new file mode 100644 index 0000000..ed1c620 --- /dev/null +++ b/src/ext/voice/streamer.rs @@ -0,0 +1,149 @@ +use byteorder::{LittleEndian, ReadBytesExt}; +use serde_json; +use std::ffi::OsStr; +use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult}; +use std::process::{Child, Command, Stdio}; +use super::{AudioSource, VoiceError}; +use ::internal::prelude::*; + +struct ChildContainer(Child); + +impl Read for ChildContainer { + fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> { + self.0.stdout.as_mut().unwrap().read(buffer) + } +} + +struct PcmSource<R: Read + Send + 'static>(bool, R); + +impl<R: Read + Send> AudioSource for PcmSource<R> { + fn is_stereo(&mut self) -> bool { + self.0 + } + + fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> { + for (i, v) in buffer.iter_mut().enumerate() { + *v = match self.1.read_i16::<LittleEndian>() { + Ok(v) => v, + Err(ref e) => return if e.kind() == IoErrorKind::UnexpectedEof { + Some(i) + } else { + None + }, + } + } + + Some(buffer.len()) + } +} + +pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> { + let path = path.as_ref(); + + /// Will fail if the path is not to a file on the fs. Likely a YouTube URI. 
+ let is_stereo = is_stereo(path).unwrap_or(false); + let stereo_val = if is_stereo { + "2" + } else { + "1" + }; + + let args = [ + "-f", + "s16le", + "-ac", + stereo_val, + "-ar", + "48000", + "-acodec", + "pcm_s16le", + "-", + ]; + + let command = try!(Command::new("ffmpeg") + .arg("-i") + .arg(path) + .args(&args) + .stderr(Stdio::null()) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .spawn()); + + Ok(pcm(is_stereo, ChildContainer(command))) +} + +pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) + -> Box<AudioSource> { + Box::new(PcmSource(is_stereo, reader)) +} + +pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> { + let args = [ + "-f", + "webm[abr>0]/bestaudio/best", + "--no-playlist", + "--print-json", + "--skip-download", + uri, + ]; + + let out = try!(Command::new("youtube-dl") + .args(&args) + .stdin(Stdio::null()) + .output()); + + if !out.status.success() { + return Err(Error::Voice(VoiceError::YouTubeDLRun(out))); + } + + let value = try!(serde_json::from_reader(&out.stdout[..])); + let mut obj = match value { + Value::Object(obj) => obj, + other => return Err(Error::Voice(VoiceError::YouTubeDLProcessing(other))), + }; + + let uri = match obj.remove("url") { + Some(v) => match v { + Value::String(uri) => uri, + other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))), + }, + None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))), + }; + + ffmpeg(&uri) +} + +fn is_stereo(path: &OsStr) -> Result<bool> { + let args = [ + "-v", + "quiet", + "-of", + "json", + "-show-streams", + "-i", + ]; + + let out = try!(Command::new("ffprobe") + .args(&args) + .arg(path) + .stdin(Stdio::null()) + .output()); + + let value: Value = try!(serde_json::from_reader(&out.stdout[..])); + + let streams = try!(value.as_object() + .and_then(|m| m.get("streams")) + .and_then(|v| v.as_array()) + .ok_or(Error::Voice(VoiceError::Streams))); + + let check = streams.iter() + .any(|stream| { + let channels = stream.as_object() + 
.and_then(|m| m.get("channels") + .and_then(|v| v.as_i64())); + + channels == Some(2) + }); + + Ok(check) +} diff --git a/src/ext/voice/threading.rs b/src/ext/voice/threading.rs index 113c23d..bbbffd1 100644 --- a/src/ext/voice/threading.rs +++ b/src/ext/voice/threading.rs @@ -13,7 +13,7 @@ pub fn start(target_id: Target, rx: MpscReceiver<Status>) { ThreadBuilder::new() .name(name) .spawn(move || runner(rx)) - .expect("Error starting voice"); + .expect(&format!("[Voice] Error starting target: {:?}", target_id)); } fn runner(rx: MpscReceiver<Status>) { @@ -27,9 +27,11 @@ fn runner(rx: MpscReceiver<Status>) { match rx.try_recv() { Ok(Status::Connect(info)) => { connection = match Connection::new(info) { - Ok(connection) => Some(connection), + Ok(connection) => { + Some(connection) + }, Err(why) => { - error!("Error connecting via voice: {:?}", why); + warn!("[Voice] Error connecting: {:?}", why); None }, @@ -71,7 +73,7 @@ fn runner(rx: MpscReceiver<Status>) { match update { Ok(()) => false, Err(why) => { - error!("Error updating voice connection: {:?}", why); + error!("[Voice] Error updating connection: {:?}", why); true }, diff --git a/src/model/voice.rs b/src/model/voice.rs index a888029..61035c6 100644 --- a/src/model/voice.rs +++ b/src/model/voice.rs @@ -4,16 +4,7 @@ use ::constants::VoiceOpCode; use ::internal::prelude::*; use ::utils::decode_array; -#[derive(Clone, Debug)] -pub struct VoiceHandshake { - pub heartbeat_interval: u64, - pub ip: Option<String>, - pub modes: Vec<String>, - pub port: u16, - pub ssrc: u32, -} - -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct VoiceHeartbeat { pub heartbeat_interval: u64, } @@ -21,6 +12,7 @@ pub struct VoiceHeartbeat { #[derive(Clone, Debug)] pub struct VoiceHello { pub heartbeat_interval: u64, + pub ip: String, pub modes: Vec<String>, pub port: u16, pub ssrc: u32, @@ -32,7 +24,7 @@ pub struct VoiceReady { pub secret_key: Vec<u8>, } -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct 
VoiceSpeaking { pub speaking: bool, pub ssrc: u32, @@ -41,7 +33,6 @@ pub struct VoiceSpeaking { #[derive(Clone, Debug)] pub enum VoiceEvent { - Handshake(VoiceHandshake), Heartbeat(VoiceHeartbeat), Hello(VoiceHello), Ready(VoiceReady), @@ -56,16 +47,20 @@ impl VoiceEvent { let op = req!(try!(remove(&mut value, "op")).as_u64()); let mut map = try!(remove(&mut value, "d").and_then(into_map)); - match try!(VoiceOpCode::from_num(op).ok_or(Error::Client(ClientError::InvalidOpCode))) { + let opcode = try!(VoiceOpCode::from_num(op) + .ok_or(Error::Client(ClientError::InvalidOpCode))); + + match opcode { VoiceOpCode::Heartbeat => { missing!(map, VoiceEvent::Heartbeat(VoiceHeartbeat { - heartbeat_interval: req!(try!(remove(&mut value, "heartbeat_interval")).as_u64()), + heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval")).as_u64()), })) }, VoiceOpCode::Hello => { missing!(map, VoiceEvent::Hello(VoiceHello { heartbeat_interval: req!(try!(remove(&mut map, "heartbeat_interval")) .as_u64()), + ip: try!(remove(&mut map, "ip").and_then(into_string)), modes: try!(decode_array(try!(remove(&mut map, "modes")), into_string)), port: req!(try!(remove(&mut map, "port")) |