diff options
| author | Kyle Simpson <[email protected]> | 2018-01-31 19:12:56 +0000 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2018-01-31 11:12:56 -0800 |
| commit | 324a288fbb0dd7d135aa9aab876cf39dabb6a02e (patch) | |
| tree | 24f13043f4ddcb5bc656b7a547170223d826aedb /src | |
| parent | Fix case insensitivity for aliases (#262) (diff) | |
| download | serenity-324a288fbb0dd7d135aa9aab876cf39dabb6a02e.tar.xz serenity-324a288fbb0dd7d135aa9aab876cf39dabb6a02e.zip | |
Multiple audio stream playback, volume control, pausing
* Fix Speaking state, use latest voice API version
* Speaking state would remain stuck on after playing particularly long
stretches of audio. So far as I can tell, playing 5 frames of silence
BEFORE changing the state seems to do the trick.
* Added new constant to make sure the library uses v3 of the voice API,
which it is written for.
* Heartbeat interval multiplied by 0.75, as recommended by Discord.
* Initial version of new Audio wrapper.
* Single audio file case, as before.
* Loop over all available audio samples.
* Combine audio streams, account for volume.
* Cheaper explicit Opus silence frames.
As per Discord's recommendation, use a well-known 3-byte silence frame when needed.
* A bit of cleanup
Clean up some of the code, rename some short-form fields to longer forms
(e.g. `s/src/source`), and remove a breaking change.
`Handler::play` was changed to return `LockedAudio` instead of `()`. If
someone were to rely on `Handler::play` returning `()`, the return type
change would break their code. Instead, this functionality has been
added to a new `Handler::play_returning` function.
Diffstat (limited to 'src')
| -rw-r--r-- | src/constants.rs | 2 | ||||
| -rw-r--r-- | src/voice/audio.rs | 49 | ||||
| -rw-r--r-- | src/voice/connection.rs | 102 | ||||
| -rw-r--r-- | src/voice/handler.rs | 32 | ||||
| -rw-r--r-- | src/voice/mod.rs | 5 | ||||
| -rw-r--r-- | src/voice/threading.rs | 13 |
6 files changed, 174 insertions, 29 deletions
diff --git a/src/constants.rs b/src/constants.rs index 9f1807a..c9f22ac 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -5,6 +5,8 @@ pub const EMBED_MAX_LENGTH: u16 = 6000; /// The gateway version used by the library. The gateway URI is retrieved via /// the REST API. pub const GATEWAY_VERSION: u8 = 6; +/// The voice gateway version used by the library. +pub const VOICE_GATEWAY_VERSION: u8 = 3; /// The large threshold to send on identify. pub const LARGE_THRESHOLD: u8 = 250; /// The maximum unicode code points allowed within a message by Discord. diff --git a/src/voice/audio.rs b/src/voice/audio.rs index 21f8f31..8a001d1 100644 --- a/src/voice/audio.rs +++ b/src/voice/audio.rs @@ -1,3 +1,6 @@ +use parking_lot::Mutex; +use std::sync::Arc; + pub const HEADER_LEN: usize = 12; pub const SAMPLE_RATE: u32 = 48_000; @@ -29,3 +32,49 @@ pub enum AudioType { Opus, Pcm, } + +/// Control object for audio playback. +/// +/// Accessed by both commands and the playback code -- as such, access is +/// always guarded. +pub struct Audio { + pub playing: bool, + pub volume: f32, + pub finished: bool, + pub source: Box<AudioSource>, +} + +impl Audio { + pub fn new(source: Box<AudioSource>) -> Self { + Self { + playing: true, + volume: 1.0, + finished: false, + source, + } + } + + pub fn play(&mut self) -> &mut Self { + self.playing = true; + + self + } + + pub fn pause(&mut self) -> &mut Self { + self.playing = false; + + self + } + + pub fn volume(&mut self, volume: f32) -> &mut Self { + self.volume = volume; + + self + } +} + +/// Threadsafe form of an instance of the [`Audio`] struct, locked behind a +/// Mutex. 
+/// +/// [`Audio`]: struct.Audio.html +pub type LockedAudio = Arc<Mutex<Audio>>; diff --git a/src/voice/connection.rs b/src/voice/connection.rs index 29a426c..3588818 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -1,4 +1,5 @@ use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; +use constants::VOICE_GATEWAY_VERSION; use internal::prelude::*; use internal::ws_impl::{ReceiverExt, SenderExt}; use internal::Timer; @@ -21,7 +22,7 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; use std::sync::Arc; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; -use super::audio::{AudioReceiver, AudioSource, AudioType, HEADER_LEN, SAMPLE_RATE}; +use super::audio::{AudioReceiver, AudioType, HEADER_LEN, SAMPLE_RATE, LockedAudio}; use super::connection_info::ConnectionInfo; use super::{payload, VoiceError, CRYPTO_MODE}; use websocket::client::Url as WebsocketUrl; @@ -150,6 +151,10 @@ impl Connection { let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?; + // Per discord dev team's current recommendations: + // (https://discordapp.com/developers/docs/topics/voice-connections#heartbeating) + let temp_heartbeat = (hello.heartbeat_interval as f64 * 0.75) as u64; + Ok(Connection { audio_timer: Timer::new(1000 * 60 * 4), client: mutexed_client, @@ -158,7 +163,7 @@ impl Connection { encoder: encoder, encoder_stereo: false, key: key, - keepalive_timer: Timer::new(hello.heartbeat_interval), + keepalive_timer: Timer::new(temp_heartbeat), udp: udp, sequence: 0, silence_frames: 0, @@ -172,11 +177,12 @@ impl Connection { #[allow(unused_variables)] pub fn cycle(&mut self, - source: &mut Option<Box<AudioSource>>, + sources: &mut Vec<LockedAudio>, receiver: &mut Option<Box<AudioReceiver>>, audio_timer: &mut Timer) -> Result<()> { let mut buffer = [0i16; 960 * 2]; + let mut mix_buffer = [0f32; 960 * 2]; let mut packet = [0u8; 512]; let mut nonce = 
secretbox::Nonce([0; 24]); @@ -264,9 +270,35 @@ impl Connection { let mut opus_frame = Vec::new(); - let len = match source.as_mut() { - Some(stream) => { - let is_stereo = stream.is_stereo(); + let mut len = 0; + + // Walk over all the audio files, removing those which have finished. + // For this purpose, we need a while loop in Rust. + let mut i = 0; + + while i < sources.len() { + let mut finished = false; + + let aud_lock = (&sources[i]).clone(); + let mut aud = aud_lock.lock(); + + let vol = aud.volume; + let skip = !aud.playing; + + { + let stream = &mut aud.source; + + if skip { + i += 1; + + continue; + } + + // Assume this for now, at least. + // We'll be fusing streams, so we can either keep + // as stereo or downmix to mono. + let is_stereo = true; + let source_stereo = stream.is_stereo(); if is_stereo != self.encoder_stereo { let channels = if is_stereo { @@ -278,7 +310,8 @@ impl Connection { self.encoder_stereo = is_stereo; } - match stream.get_type() { + let temp_len = match stream.get_type() { + // TODO: decode back to raw, then include. AudioType::Opus => match stream.read_opus_frame() { Some(frame) => { opus_frame = frame; @@ -287,28 +320,42 @@ impl Connection { None => 0, }, AudioType::Pcm => { - let buffer_len = if is_stereo { 960 * 2 } else { 960 }; + let buffer_len = if source_stereo { 960 * 2 } else { 960 }; match stream.read_pcm_frame(&mut buffer[..buffer_len]) { Some(len) => len, None => 0, } }, - } - }, - None => 0, + }; + + // May need to force interleave/copy. + combine_audio(buffer, &mut mix_buffer, source_stereo, vol); + + len = len.max(temp_len); + i += if temp_len > 0 { + 1 + } else { + sources.remove(i); + finished = true; + + 0 + }; + } + + aud.finished = finished; }; if len == 0 { - self.set_speaking(false)?; - if self.silence_frames > 0 { self.silence_frames -= 1; - for value in &mut buffer[..] { - *value = 0; - } + // Explicit "Silence" frame. 
+ opus_frame.extend_from_slice(&[0xf, 0x8, 0xf, 0xf, 0xf, 0xe]); } else { + // Per official guidelines, send 5x silence BEFORE we stop speaking. + self.set_speaking(false)?; + audio_timer.await(); return Ok(()); @@ -323,7 +370,7 @@ impl Connection { self.set_speaking(true)?; - let index = self.prep_packet(&mut packet, buffer, &opus_frame, nonce)?; + let index = self.prep_packet(&mut packet, mix_buffer, &opus_frame, nonce)?; audio_timer.await(); self.udp.send_to(&packet[..index], self.destination)?; @@ -334,7 +381,7 @@ impl Connection { fn prep_packet(&mut self, packet: &mut [u8; 512], - buffer: [i16; 1920], + buffer: [f32; 1920], opus_frame: &[u8], mut nonce: Nonce) -> Result<usize> { @@ -354,7 +401,7 @@ impl Connection { let len = if opus_frame.is_empty() { self.encoder - .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])? + .encode_float(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])? } else { let len = opus_frame.len(); packet[HEADER_LEN..HEADER_LEN + len] @@ -395,6 +442,21 @@ impl Drop for Connection { } } +#[inline] +fn combine_audio( + raw_buffer: [i16; 1920], + float_buffer: &mut [f32; 1920], + true_stereo: bool, + volume: f32, +) { + for i in 0..1920 { + let sample_index = if true_stereo { i } else { i/2 }; + let sample = (raw_buffer[sample_index] as f32) / 32768.0; + + float_buffer[i] = (float_buffer[i] + sample*volume).max(-1.0).min(1.0); + } +} + fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { if endpoint.ends_with(":80") { let len = endpoint.len(); @@ -402,7 +464,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { endpoint.truncate(len - 3); } - WebsocketUrl::parse(&format!("wss://{}", endpoint)) + WebsocketUrl::parse(&format!("wss://{}/?v={}", endpoint, VOICE_GATEWAY_VERSION)) .or(Err(Error::Voice(VoiceError::EndpointUrl))) } diff --git a/src/voice/handler.rs b/src/voice/handler.rs index c3a5f05..8113d94 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -2,9 +2,11 @@ use 
constants::VoiceOpCode; use gateway::InterMessage; use model::id::{ChannelId, GuildId, UserId}; use model::voice::VoiceState; +use parking_lot::Mutex; +use std::sync::Arc; use std::sync::mpsc::{self, Sender as MpscSender}; use super::connection_info::ConnectionInfo; -use super::{AudioReceiver, AudioSource, Status as VoiceStatus, threading}; +use super::{Audio, AudioReceiver, AudioSource, Status as VoiceStatus, threading, LockedAudio}; /// The handler is responsible for "handling" a single voice connection, acting /// as a clean API above the inner connection. @@ -249,13 +251,35 @@ impl Handler { } } - /// Plays audio from a source. This can be a source created via - /// [`voice::ffmpeg`] or [`voice::ytdl`]. + /// Plays audio from a source. + /// + /// This can be a source created via [`voice::ffmpeg`] or [`voice::ytdl`]. /// /// [`voice::ffmpeg`]: fn.ffmpeg.html /// [`voice::ytdl`]: fn.ytdl.html pub fn play(&mut self, source: Box<AudioSource>) { - self.send(VoiceStatus::SetSender(Some(source))) + self.play_returning(source); + } + + /// Plays audio from a source, returning the locked audio source. + pub fn play_returning(&mut self, source: Box<AudioSource>) -> LockedAudio { + let player = Arc::new(Mutex::new(Audio::new(source))); + self.send(VoiceStatus::AddSender(player.clone())); + + player + } + + /// Plays audio from a source. + /// + /// Unlike `play`, this stops all other sources attached + /// to the channel. + /// + /// [`play`]: #method.play + pub fn play_only(&mut self, source: Box<AudioSource>) -> LockedAudio { + let player = Arc::new(Mutex::new(Audio::new(source))); + self.send(VoiceStatus::SetSender(Some(player.clone()))); + + player } /// Stops playing audio from a source, if one is set. 
diff --git a/src/voice/mod.rs b/src/voice/mod.rs index 70f1ae2..cae84a7 100644 --- a/src/voice/mod.rs +++ b/src/voice/mod.rs @@ -11,7 +11,7 @@ mod payload; mod streamer; mod threading; -pub use self::audio::{AudioReceiver, AudioSource, AudioType}; +pub use self::audio::{Audio, AudioReceiver, AudioSource, AudioType, LockedAudio}; pub use self::dca::DcaMetadata; pub use self::error::{DcaError, VoiceError}; pub use self::handler::Handler; @@ -26,5 +26,6 @@ pub(crate) enum Status { Connect(ConnectionInfo), #[allow(dead_code)] Disconnect, SetReceiver(Option<Box<AudioReceiver>>), - SetSender(Option<Box<AudioSource>>), + SetSender(Option<LockedAudio>), + AddSender(LockedAudio), } diff --git a/src/voice/threading.rs b/src/voice/threading.rs index 39b72ed..5731e86 100644 --- a/src/voice/threading.rs +++ b/src/voice/threading.rs @@ -15,7 +15,7 @@ pub(crate) fn start(guild_id: GuildId, rx: MpscReceiver<Status>) { } fn runner(rx: &MpscReceiver<Status>) { - let mut sender = None; + let mut senders = Vec::new(); let mut receiver = None; let mut connection = None; let mut timer = Timer::new(20); @@ -40,7 +40,14 @@ fn runner(rx: &MpscReceiver<Status>) { receiver = r; }, Ok(Status::SetSender(s)) => { - sender = s; + senders.clear(); + + if let Some(aud) = s { + senders.push(aud); + } + }, + Ok(Status::AddSender(s)) => { + senders.push(s); }, Err(TryRecvError::Empty) => { // If we receieved nothing, then we can perform an update. @@ -62,7 +69,7 @@ fn runner(rx: &MpscReceiver<Status>) { // another event. let error = match connection.as_mut() { Some(connection) => { - let cycle = connection.cycle(&mut sender, &mut receiver, &mut timer); + let cycle = connection.cycle(&mut senders, &mut receiver, &mut timer); match cycle { Ok(()) => false, |