diff options
| author | Kyle Simpson <[email protected]> | 2018-02-27 18:49:03 +0000 |
|---|---|---|
| committer | Ken Swenson <[email protected]> | 2018-11-06 20:27:11 -0500 |
| commit | b8e562d58d7f8270e3166e0facc080f205c17d1b (patch) | |
| tree | d73e5051249b1169fe7fe44983d8013ec5bbfd80 /src | |
| parent | Make MessageUpdateEvent::embeds a Vec<Embed> (diff) | |
| download | serenity-b8e562d58d7f8270e3166e0facc080f205c17d1b.tar.xz serenity-b8e562d58d7f8270e3166e0facc080f205c17d1b.zip | |
Voice fixes, better API adherence, bitrate control, documentation
* Fixing silence frame.
* Messed that one up while fighting with the borrow checker. Sorry!
* Initial machinery for playback position tracking
* Mix multiple input AudioType::Opus streams
* Encode for stereo, use nicer "soft clip"
* First stab at docs for Audio.
* Better-er docs for voice::Audio etc.
* Bitrate control.
* Fix #270, Better handling of the voice api
We were mostly doing the voice API wrong, I've changed OpCode names to
be correct. We now listen for both Ready and Hello at connection init,
and do soft checks for Heartbeat ACKs.
* Adding missing voice opcodes, related structs
* Also removes events for messages which cannot be received.
@Zeyla's recommended changes.
* New docstrings now have correct style for referring to functions.
* Docstrings also have room to breathe (!)
* Rand dependency now properly moved behind the `voice` feature.
* Slightly cleaner checks at voice connection initialisation.
* Various idiomatic changes throughout.
* Prevent compilation of Audio docs example.
Likely too much machinery in the background to get a working Handler, I
think.
* Re-fixing the docstrings.
* Fixing travis for Audio docs.
Diffstat (limited to 'src')
| -rw-r--r-- | src/constants.rs | 36 | ||||
| -rw-r--r-- | src/lib.rs | 2 | ||||
| -rw-r--r-- | src/model/event.rs | 78 | ||||
| -rw-r--r-- | src/voice/audio.rs | 2 | ||||
| -rw-r--r-- | src/voice/connection.rs | 83 | ||||
| -rw-r--r-- | src/voice/handler.rs | 13 | ||||
| -rw-r--r-- | src/voice/mod.rs | 2 | ||||
| -rw-r--r-- | src/voice/payload.rs | 6 | ||||
| -rw-r--r-- | src/voice/streamer.rs | 59 | ||||
| -rw-r--r-- | src/voice/threading.rs | 10 |
10 files changed, 237 insertions, 54 deletions
diff --git a/src/constants.rs b/src/constants.rs index 5485d7e..5febc9e 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -137,26 +137,38 @@ pub enum VoiceOpCode { /// Used to select the voice protocol. SelectProtocol = 1, /// Used to complete the websocket handshake. - Hello = 2, + Ready = 2, /// Used to keep the websocket connection alive. - KeepAlive = 3, + Heartbeat = 3, /// Used to describe the session. SessionDescription = 4, /// Used to indicate which users are speaking. Speaking = 5, - /// Used to heartbeat. - Heartbeat = 8, + /// Heartbeat ACK, received by the client to show the server's receipt of a heartbeat. + HeartbeatAck = 6, + /// Sent after a disconnect to attempt to resume a session. + Resume = 7, + /// Used to determine how often the client must send a heartbeat. + Hello = 8, + /// Sent by the server if a session coulkd successfully be resumed. + Resumed = 9, + /// Message indicating that another user has disconnected from the voice channel. + ClientDisconnect = 13, } enum_number!( VoiceOpCode { Identify, SelectProtocol, - Hello, - KeepAlive, + Ready, + Heartbeat, SessionDescription, Speaking, - Heartbeat, + HeartbeatAck, + Resume, + Hello, + Resumed, + ClientDisconnect, } ); @@ -165,11 +177,15 @@ impl VoiceOpCode { match *self { VoiceOpCode::Identify => 0, VoiceOpCode::SelectProtocol => 1, - VoiceOpCode::Hello => 2, - VoiceOpCode::KeepAlive => 3, + VoiceOpCode::Ready => 2, + VoiceOpCode::Heartbeat => 3, VoiceOpCode::SessionDescription => 4, VoiceOpCode::Speaking => 5, - VoiceOpCode::Heartbeat => 8, + VoiceOpCode::HeartbeatAck => 6, + VoiceOpCode::Resume => 7, + VoiceOpCode::Hello => 8, + VoiceOpCode::Resumed => 9, + VoiceOpCode::ClientDisconnect => 13, } } } @@ -143,6 +143,8 @@ extern crate multipart; extern crate native_tls; #[cfg(feature = "opus")] extern crate opus; +#[cfg(feature = "rand")] +extern crate rand; #[cfg(feature = "sodiumoxide")] extern crate sodiumoxide; #[cfg(feature = "threadpool")] diff --git a/src/model/event.rs 
b/src/model/event.rs index 21fe2bc..9b8bb36 100644 --- a/src/model/event.rs +++ b/src/model/event.rs @@ -1874,14 +1874,19 @@ impl<'de> Deserialize<'de> for EventType { #[allow(missing_docs)] #[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct VoiceHeartbeat { - pub heartbeat_interval: u64, + pub nonce: u64, +} + +#[allow(missing_docs)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] +pub struct VoiceHeartbeatAck { + pub nonce: u64, } #[allow(missing_docs)] #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct VoiceHello { +pub struct VoiceReady { pub heartbeat_interval: u64, - pub ip: String, pub modes: Vec<String>, pub port: u16, pub ssrc: u32, @@ -1889,6 +1894,12 @@ pub struct VoiceHello { #[allow(missing_docs)] #[derive(Clone, Debug, Deserialize, Serialize)] +pub struct VoiceHello { + pub heartbeat_interval: u64, +} + +#[allow(missing_docs)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct VoiceSessionDescription { pub mode: String, pub secret_key: Vec<u8>, @@ -1902,25 +1913,48 @@ pub struct VoiceSpeaking { pub user_id: UserId, } +#[allow(missing_docs)] +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct VoiceResume { + pub server_id: String, + pub session_id: String, + pub token: String, +} + +#[allow(missing_docs)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] +pub struct VoiceClientDisconnect { + pub user_id: UserId, +} + /// A representation of data received for [`voice`] events. /// /// [`voice`]: ../../voice/index.html #[derive(Clone, Debug, Serialize)] #[serde(untagged)] pub enum VoiceEvent { - /// A voice heartbeat. - Heartbeat(VoiceHeartbeat), - /// A "hello" was received with initial voice data, such as the - /// [`heartbeat_interval`]. + /// Server's response to the client's Identify operation. + /// Contains session-specific information, e.g. + /// [`ssrc`] and supported encryption modes. 
/// - /// [`heartbeat_interval`]: struct.VoiceHello.html#structfield.heartbeat_interval - Hello(VoiceHello), - /// A simple keepalive event. - KeepAlive, + /// [`ssrc`]: struct.VoiceReady.html#structfield.ssrc + Ready(VoiceReady), /// A voice event describing the current session. - Ready(VoiceSessionDescription), + SessionDescription(VoiceSessionDescription), /// A voice event denoting that someone is speaking. Speaking(VoiceSpeaking), + /// Acknowledgement from the server for a prior voice heartbeat. + HeartbeatAck(VoiceHeartbeatAck), + /// A "hello" was received with initial voice data, such as the + /// true [`heartbeat_interval`]. + /// + /// [`heartbeat_interval`]: struct.VoiceHello.html#structfield.heartbeat_interval + Hello(VoiceHello), + /// Message received if a Resume reques was successful. + Resumed, + /// Status update in the current channel, indicating that a user has + /// disconnected. + ClientDisconnect(VoiceClientDisconnect), /// An unknown voice event not registered. Unknown(VoiceOpCode, Value), } @@ -1941,28 +1975,38 @@ impl<'de> Deserialize<'de> for VoiceEvent { .map_err(DeError::custom)?; Ok(match op { - VoiceOpCode::Heartbeat => { + VoiceOpCode::HeartbeatAck => { let v = serde_json::from_value(v).map_err(DeError::custom)?; - VoiceEvent::Heartbeat(v) + VoiceEvent::HeartbeatAck(v) + }, + VoiceOpCode::Ready => { + let v = VoiceReady::deserialize(v).map_err(DeError::custom)?; + + VoiceEvent::Ready(v) }, VoiceOpCode::Hello => { - let v = VoiceHello::deserialize(v).map_err(DeError::custom)?; + let v = serde_json::from_value(v).map_err(DeError::custom)?; VoiceEvent::Hello(v) }, - VoiceOpCode::KeepAlive => VoiceEvent::KeepAlive, VoiceOpCode::SessionDescription => { let v = VoiceSessionDescription::deserialize(v) .map_err(DeError::custom)?; - VoiceEvent::Ready(v) + VoiceEvent::SessionDescription(v) }, VoiceOpCode::Speaking => { let v = VoiceSpeaking::deserialize(v).map_err(DeError::custom)?; VoiceEvent::Speaking(v) }, + VoiceOpCode::Resumed => 
VoiceEvent::Resumed, + VoiceOpCode::ClientDisconnect => { + let v = VoiceClientDisconnect::deserialize(v).map_err(DeError::custom)?; + + VoiceEvent::ClientDisconnect(v) + }, other => VoiceEvent::Unknown(other, v), }) } diff --git a/src/voice/audio.rs b/src/voice/audio.rs index c556c8b..bf45eb8 100644 --- a/src/voice/audio.rs +++ b/src/voice/audio.rs @@ -16,6 +16,8 @@ pub trait AudioSource: Send { fn read_pcm_frame(&mut self, buffer: &mut [i16]) -> Option<usize>; fn read_opus_frame(&mut self) -> Option<Vec<u8>>; + + fn decode_and_add_opus_frame(&mut self, float_buffer: &mut [f32; 1920], volume: f32) -> Option<usize>; } /// A receiver for incoming audio. diff --git a/src/voice/connection.rs b/src/voice/connection.rs index 5f806a0..460560b 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -18,12 +18,14 @@ use model::{ use opus::{ packet as opus_packet, Application as CodingMode, + Bitrate, Channels, Decoder as OpusDecoder, Encoder as OpusEncoder, SoftClip, }; use parking_lot::Mutex; +use rand::random; use serde::Deserialize; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::{ @@ -87,6 +89,7 @@ pub struct Connection { encoder_stereo: bool, keepalive_timer: Timer, key: Key, + last_heartbeat_nonce: Option<u64>, sequence: u16, silence_frames: u8, soft_clip: SoftClip, @@ -103,32 +106,45 @@ impl Connection { let url = generate_url(&mut info.endpoint)?; let mut client = ClientBuilder::from_url(&url).connect_secure(None)?; + let mut hello = None; + let mut ready = None; client.send_json(&payload::build_identify(&info))?; - let hello = loop { + loop { let value = match client.recv_json()? { Some(value) => value, None => continue, }; match VoiceEvent::deserialize(value)? 
{ - VoiceEvent::Hello(received_hello) => { - break received_hello; + VoiceEvent::Ready(r) => { + ready = Some(r); + if hello.is_some(){ + break; + } + }, + VoiceEvent::Hello(h) => { + hello = Some(h); + if ready.is_some() { + break; + } }, - VoiceEvent::Heartbeat(_) => continue, other => { - debug!("[Voice] Expected hello/heartbeat; got: {:?}", other); + debug!("[Voice] Expected ready/hello; got: {:?}", other); return Err(Error::Voice(VoiceError::ExpectedHandshake)); }, } }; - if !has_valid_mode(&hello.modes) { + let hello = hello.expect("[Voice] Hello packet expected in connection initialisation, but not found."); + let ready = ready.expect("[Voice] Ready packet expected in connection initialisation, but not found."); + + if !has_valid_mode(&ready.modes) { return Err(Error::Voice(VoiceError::VoiceModeUnavailable)); } - let destination = (&info.endpoint[..], hello.port) + let destination = (&info.endpoint[..], ready.port) .to_socket_addrs()? .next() .ok_or(Error::Voice(VoiceError::HostnameResolve))?; @@ -146,7 +162,7 @@ impl Connection { { let mut bytes = [0; 70]; - (&mut bytes[..]).write_u32::<BigEndian>(hello.ssrc)?; + (&mut bytes[..]).write_u32::<BigEndian>(ready.ssrc)?; udp.send_to(&bytes, destination)?; let mut bytes = [0; 256]; @@ -181,7 +197,10 @@ impl Connection { info!("[Voice] Connected to: {}", info.endpoint); - let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?; + // Encode for Discord in Stereo, as required. 
+ let mut encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Stereo, CodingMode::Audio)?; + encoder.set_bitrate(Bitrate::Bits(DEFAULT_BITRATE))?; + let soft_clip = SoftClip::new(Channels::Stereo); let soft_clip = SoftClip::new(Channels::Stereo); @@ -214,11 +233,20 @@ impl Connection { pub fn cycle(&mut self, sources: &mut Vec<LockedAudio>, receiver: &mut Option<Box<AudioReceiver>>, - audio_timer: &mut Timer) + audio_timer: &mut Timer, + bitrate: Bitrate) -> Result<()> { + // We need to actually reserve enough space for the desired bitrate. + let size = match bitrate { + // If user specified, we can calculate. 20ms means 50fps. + Bitrate::Bits(b) => b.abs() / 50, + // Otherwise, just have a lot preallocated. + _ => 5120, + } + 16; + let mut buffer = [0i16; 960 * 2]; let mut mix_buffer = [0f32; 960 * 2]; - let mut packet = [0u8; 512]; + let mut packet = vec![0u8; size as usize].into_boxed_slice(); let mut nonce = secretbox::Nonce([0; 24]); if let Some(receiver) = receiver.as_mut() { @@ -277,6 +305,18 @@ impl Connection { ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { receiver.speaking_update(ev.ssrc, ev.user_id.0, ev.speaking); }, + ReceiverStatus::Websocket(VoiceEvent::HeartbeatAck(ev)) => { + match self.last_heartbeat_nonce { + Some(nonce) => { + if ev.nonce != nonce { + warn!("[Voice] Heartbeat nonce mismatch! 
Expected {}, saw {}.", nonce, ev.nonce); + } + + self.last_heartbeat_nonce = None; + }, + None => {}, + } + }, ReceiverStatus::Websocket(other) => { info!("[Voice] Received other websocket data: {:?}", other); }, @@ -292,7 +332,9 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.client.lock().send_json(&payload::build_keepalive())?; + let nonce = random::<u64>(); + self.last_heartbeat_nonce = Some(nonce); + self.client.lock().send_json(&payload::build_heartbeat(nonce))?; } // Send UDP keepalive if it's time @@ -302,6 +344,11 @@ impl Connection { self.udp.send_to(&bytes, self.destination)?; } + // Reconfigure encoder bitrate. + // From my testing, it seemed like this needed to be set every cycle. + if let Err(e) = self.encoder.set_bitrate(bitrate) { + warn!("[Voice] Bitrate set unsuccessfully: {:?}", e); + } let mut opus_frame = Vec::new(); @@ -346,10 +393,8 @@ impl Connection { } let temp_len = match stream.get_type() { - // TODO: decode back to raw, then include. - AudioType::Opus => match stream.read_opus_frame() { + AudioType::Opus => match stream.decode_and_add_opus_frame(&mut mix_buffer, vol) { Some(frame) => { - opus_frame = frame; opus_frame.len() }, None => 0, @@ -421,7 +466,7 @@ impl Connection { } fn prep_packet(&mut self, - packet: &mut [u8; 512], + packet: &mut [u8], buffer: [f32; 1920], opus_frame: &[u8], mut nonce: Nonce) @@ -518,12 +563,12 @@ fn encryption_key(client: &mut Client) -> Result<Key> { }; match VoiceEvent::deserialize(value)? 
{ - VoiceEvent::Ready(ready) => { - if ready.mode != CRYPTO_MODE { + VoiceEvent::SessionDescription(desc) => { + if desc.mode != CRYPTO_MODE { return Err(Error::Voice(VoiceError::VoiceModeInvalid)); } - return Key::from_slice(&ready.secret_key) + return Key::from_slice(&desc.secret_key) .ok_or(Error::Voice(VoiceError::KeyGen)); }, VoiceEvent::Unknown(op, value) => { diff --git a/src/voice/handler.rs b/src/voice/handler.rs index d42b8b8..2f76e32 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -14,7 +14,7 @@ use std::sync::{ Arc }; use super::connection_info::ConnectionInfo; -use super::{Audio, AudioReceiver, AudioSource, Status as VoiceStatus, threading, LockedAudio}; +use super::{Audio, AudioReceiver, AudioSource, Bitrate, Status as VoiceStatus, threading, LockedAudio}; /// The handler is responsible for "handling" a single voice connection, acting /// as a clean API above the inner connection. @@ -277,6 +277,17 @@ impl Handler { player } + /// Sets the bitrate for encoding Opus packets sent along + /// the channel being managed. + /// + /// The default rate is 128 kbps. + /// Sensible values range between `Bits(512)` and `Bits(512_000)` + /// bits per second. + /// Alternatively, `Auto` and `Max` remain available. + pub fn set_bitrate(&mut self, bitrate: Bitrate) { + self.send(VoiceStatus::SetBitrate(bitrate)) + } + /// Stops playing audio from a source, if one is set. 
pub fn stop(&mut self) { self.send(VoiceStatus::SetSender(None)) } diff --git a/src/voice/mod.rs b/src/voice/mod.rs index 7b5aaa0..f6a5e45 100644 --- a/src/voice/mod.rs +++ b/src/voice/mod.rs @@ -32,6 +32,7 @@ pub use self::{ ytdl } }; +pub use opus::Bitrate; use self::connection_info::ConnectionInfo; @@ -43,4 +44,5 @@ pub(crate) enum Status { SetReceiver(Option<Box<AudioReceiver>>), SetSender(Option<LockedAudio>), AddSender(LockedAudio), + SetBitrate(Bitrate), } diff --git a/src/voice/payload.rs b/src/voice/payload.rs index cef19cd..7faf31b 100644 --- a/src/voice/payload.rs +++ b/src/voice/payload.rs @@ -16,10 +16,10 @@ pub fn build_identify(info: &ConnectionInfo) -> Value { } #[inline] -pub fn build_keepalive() -> Value { +pub fn build_heartbeat(nonce: u64) -> Value { json!({ - "op": VoiceOpCode::KeepAlive.num(), - "d": Value::Null, + "op": VoiceOpCode::Heartbeat.num(), + "d": nonce, }) } diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs index 893f92a..476e00f 100644 --- a/src/voice/streamer.rs +++ b/src/voice/streamer.rs @@ -1,5 +1,11 @@ use byteorder::{LittleEndian, ReadBytesExt}; use internal::prelude::*; +use opus::{ + Channels, + Decoder as OpusDecoder, + Result as OpusResult, +}; +use parking_lot::Mutex; use serde_json; use std::{ ffi::OsStr, @@ -15,14 +21,16 @@ use std::{ Command, Stdio }, - result::Result as StdResult + result::Result as StdResult, + sync::Arc, }; use super::{ AudioSource, AudioType, DcaError, DcaMetadata, - VoiceError + VoiceError, + audio, }; struct ChildContainer(Child); @@ -41,10 +49,32 @@ impl Drop for ChildContainer { } } +impl Drop for ChildContainer { + fn drop (&mut self) { + if let Err(e) = self.0.wait() { + debug!("[Voice] Error awaiting child process: {:?}", e); + } + } +} + +// Since each audio item needs its own decoder, we need to +// work around the fact that OpusDecoders aint sendable. 
+struct SendDecoder(OpusDecoder); + +impl SendDecoder { + fn decode_float(&mut self, input: &[u8], output: &mut [f32], fec: bool) -> OpusResult<usize> { + let &mut SendDecoder(ref mut sd) = self; + sd.decode_float(input, output, fec) + } +} + +unsafe impl Send for SendDecoder {} + struct InputSource<R: Read + Send + 'static> { stereo: bool, reader: R, kind: AudioType, + decoder: Option<Arc<Mutex<SendDecoder>>>, } impl<R: Read + Send> AudioSource for InputSource<R> { @@ -96,6 +126,24 @@ impl<R: Read + Send> AudioSource for InputSource<R> { }, } } + + fn decode_and_add_opus_frame(&mut self, float_buffer: &mut [f32; 1920], volume: f32) -> Option<usize> { + let decoder_lock = self.decoder.as_mut()?.clone(); + let frame = self.read_opus_frame()?; + let mut local_buf = [0f32; 960 * 2]; + + let count = { + let mut decoder = decoder_lock.lock(); + + decoder.decode_float(frame.as_slice(), &mut local_buf, false).ok()? + }; + + for i in 0..1920 { + float_buffer[i] += local_buf[i] * volume; + } + + Some(count) + } } /// Opens an audio file through `ffmpeg` and creates an audio source. @@ -225,6 +273,12 @@ pub fn opus<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSo stereo: is_stereo, reader, kind: AudioType::Opus, + decoder: Some( + Arc::new(Mutex::new( + // We always want to decode *to* stereo, for mixing reasons. 
+ SendDecoder(OpusDecoder::new(audio::SAMPLE_RATE, Channels::Stereo).unwrap()) + )) + ), }) } @@ -234,6 +288,7 @@ pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSou stereo: is_stereo, reader, kind: AudioType::Pcm, + decoder: None, }) } diff --git a/src/voice/threading.rs b/src/voice/threading.rs index 55e3651..2e4da0c 100644 --- a/src/voice/threading.rs +++ b/src/voice/threading.rs @@ -6,7 +6,9 @@ use std::{ }; use super::{ connection::Connection, - Status + Bitrate, + Status, + audio, }; pub(crate) fn start(guild_id: GuildId, rx: MpscReceiver<Status>) { @@ -23,6 +25,7 @@ fn runner(rx: &MpscReceiver<Status>) { let mut receiver = None; let mut connection = None; let mut timer = Timer::new(20); + let mut bitrate = Bitrate::Bits(audio::DEFAULT_BITRATE); 'runner: loop { loop { @@ -53,6 +56,9 @@ fn runner(rx: &MpscReceiver<Status>) { Ok(Status::AddSender(s)) => { senders.push(s); }, + Ok(Status::SetBitrate(b)) => { + bitrate = b; + }, Err(TryRecvError::Empty) => { // If we received nothing, then we can perform an update. break; @@ -73,7 +79,7 @@ fn runner(rx: &MpscReceiver<Status>) { // another event. let error = match connection.as_mut() { Some(connection) => { - let cycle = connection.cycle(&mut senders, &mut receiver, &mut timer); + let cycle = connection.cycle(&mut senders, &mut receiver, &mut timer, bitrate); match cycle { Ok(()) => false, |