aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKyle Simpson <[email protected]>2018-01-31 19:12:56 +0000
committerZeyla Hellyer <[email protected]>2018-01-31 11:12:56 -0800
commit324a288fbb0dd7d135aa9aab876cf39dabb6a02e (patch)
tree24f13043f4ddcb5bc656b7a547170223d826aedb /src
parentFix case insensitivity for aliases (#262) (diff)
downloadserenity-324a288fbb0dd7d135aa9aab876cf39dabb6a02e.tar.xz
serenity-324a288fbb0dd7d135aa9aab876cf39dabb6a02e.zip
Multiple audio stream playback, volume control, pausing
* Fix Speaking state, use latest voice API version * Speaking state would remain stuck on after playing particularly long stretches of audio. So far as I can tell, playing 5 frames of silence BEFORE changing the state seems to do the trick. * Added new constant to make sure the library uses v3 of the voice api, which it is written for. * Heartbeat interval adjusted by * .75 as recommended by Discord. * Initial version of new Audio wrapper. * Single audio file case, as before.. * Loop over all available audio samples. * Combine audio streams, account for volume. * Cheaper explicit Opus silence frames. As per Discord's recommendation, use a well-known 3-byte silence frame when needed. * A bit of cleanup Cleanup some of the code, rename some short-form fields to longer forms (e.g. `s/src/source`), and remove a breaking change. `Handler::play` was changed to return `LockedAudio` instead of `()`. If someone were to rely on `Handler::play` returning `()`, the return type change would break their code. Instead, this functionality has been added to a new `Handler::play_returning` function.
Diffstat (limited to 'src')
-rw-r--r--src/constants.rs2
-rw-r--r--src/voice/audio.rs49
-rw-r--r--src/voice/connection.rs102
-rw-r--r--src/voice/handler.rs32
-rw-r--r--src/voice/mod.rs5
-rw-r--r--src/voice/threading.rs13
6 files changed, 174 insertions, 29 deletions
diff --git a/src/constants.rs b/src/constants.rs
index 9f1807a..c9f22ac 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -5,6 +5,8 @@ pub const EMBED_MAX_LENGTH: u16 = 6000;
/// The gateway version used by the library. The gateway URI is retrieved via
/// the REST API.
pub const GATEWAY_VERSION: u8 = 6;
+/// The voice gateway version used by the library.
+pub const VOICE_GATEWAY_VERSION: u8 = 3;
/// The large threshold to send on identify.
pub const LARGE_THRESHOLD: u8 = 250;
/// The maximum unicode code points allowed within a message by Discord.
diff --git a/src/voice/audio.rs b/src/voice/audio.rs
index 21f8f31..8a001d1 100644
--- a/src/voice/audio.rs
+++ b/src/voice/audio.rs
@@ -1,3 +1,6 @@
+use parking_lot::Mutex;
+use std::sync::Arc;
+
pub const HEADER_LEN: usize = 12;
pub const SAMPLE_RATE: u32 = 48_000;
@@ -29,3 +32,49 @@ pub enum AudioType {
Opus,
Pcm,
}
+
+/// Control object for audio playback.
+///
+/// Accessed by both commands and the playback code -- as such, access is
+/// always guarded.
+pub struct Audio {
+ pub playing: bool,
+ pub volume: f32,
+ pub finished: bool,
+ pub source: Box<AudioSource>,
+}
+
+impl Audio {
+ pub fn new(source: Box<AudioSource>) -> Self {
+ Self {
+ playing: true,
+ volume: 1.0,
+ finished: false,
+ source,
+ }
+ }
+
+ pub fn play(&mut self) -> &mut Self {
+ self.playing = true;
+
+ self
+ }
+
+ pub fn pause(&mut self) -> &mut Self {
+ self.playing = false;
+
+ self
+ }
+
+ pub fn volume(&mut self, volume: f32) -> &mut Self {
+ self.volume = volume;
+
+ self
+ }
+}
+
+/// Threadsafe form of an instance of the [`Audio`] struct, locked behind a
+/// Mutex.
+///
+/// [`Audio`]: struct.Audio.html
+pub type LockedAudio = Arc<Mutex<Audio>>;
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index 29a426c..3588818 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -1,4 +1,5 @@
use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
+use constants::VOICE_GATEWAY_VERSION;
use internal::prelude::*;
use internal::ws_impl::{ReceiverExt, SenderExt};
use internal::Timer;
@@ -21,7 +22,7 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use std::sync::Arc;
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
-use super::audio::{AudioReceiver, AudioSource, AudioType, HEADER_LEN, SAMPLE_RATE};
+use super::audio::{AudioReceiver, AudioType, HEADER_LEN, SAMPLE_RATE, LockedAudio};
use super::connection_info::ConnectionInfo;
use super::{payload, VoiceError, CRYPTO_MODE};
use websocket::client::Url as WebsocketUrl;
@@ -150,6 +151,10 @@ impl Connection {
let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?;
+ // Per discord dev team's current recommendations:
+ // (https://discordapp.com/developers/docs/topics/voice-connections#heartbeating)
+ let temp_heartbeat = (hello.heartbeat_interval as f64 * 0.75) as u64;
+
Ok(Connection {
audio_timer: Timer::new(1000 * 60 * 4),
client: mutexed_client,
@@ -158,7 +163,7 @@ impl Connection {
encoder: encoder,
encoder_stereo: false,
key: key,
- keepalive_timer: Timer::new(hello.heartbeat_interval),
+ keepalive_timer: Timer::new(temp_heartbeat),
udp: udp,
sequence: 0,
silence_frames: 0,
@@ -172,11 +177,12 @@ impl Connection {
#[allow(unused_variables)]
pub fn cycle(&mut self,
- source: &mut Option<Box<AudioSource>>,
+ sources: &mut Vec<LockedAudio>,
receiver: &mut Option<Box<AudioReceiver>>,
audio_timer: &mut Timer)
-> Result<()> {
let mut buffer = [0i16; 960 * 2];
+ let mut mix_buffer = [0f32; 960 * 2];
let mut packet = [0u8; 512];
let mut nonce = secretbox::Nonce([0; 24]);
@@ -264,9 +270,35 @@ impl Connection {
let mut opus_frame = Vec::new();
- let len = match source.as_mut() {
- Some(stream) => {
- let is_stereo = stream.is_stereo();
+ let mut len = 0;
+
+ // Walk over all the audio files, removing those which have finished.
+ // For this purpose, we need a while loop in Rust.
+ let mut i = 0;
+
+ while i < sources.len() {
+ let mut finished = false;
+
+ let aud_lock = (&sources[i]).clone();
+ let mut aud = aud_lock.lock();
+
+ let vol = aud.volume;
+ let skip = !aud.playing;
+
+ {
+ let stream = &mut aud.source;
+
+ if skip {
+ i += 1;
+
+ continue;
+ }
+
+ // Assume this for now, at least.
+ // We'll be fusing streams, so we can either keep
+ // as stereo or downmix to mono.
+ let is_stereo = true;
+ let source_stereo = stream.is_stereo();
if is_stereo != self.encoder_stereo {
let channels = if is_stereo {
@@ -278,7 +310,8 @@ impl Connection {
self.encoder_stereo = is_stereo;
}
- match stream.get_type() {
+ let temp_len = match stream.get_type() {
+ // TODO: decode back to raw, then include.
AudioType::Opus => match stream.read_opus_frame() {
Some(frame) => {
opus_frame = frame;
@@ -287,28 +320,42 @@ impl Connection {
None => 0,
},
AudioType::Pcm => {
- let buffer_len = if is_stereo { 960 * 2 } else { 960 };
+ let buffer_len = if source_stereo { 960 * 2 } else { 960 };
match stream.read_pcm_frame(&mut buffer[..buffer_len]) {
Some(len) => len,
None => 0,
}
},
- }
- },
- None => 0,
+ };
+
+ // May need to force interleave/copy.
+ combine_audio(buffer, &mut mix_buffer, source_stereo, vol);
+
+ len = len.max(temp_len);
+ i += if temp_len > 0 {
+ 1
+ } else {
+ sources.remove(i);
+ finished = true;
+
+ 0
+ };
+ }
+
+ aud.finished = finished;
};
if len == 0 {
- self.set_speaking(false)?;
-
if self.silence_frames > 0 {
self.silence_frames -= 1;
- for value in &mut buffer[..] {
- *value = 0;
- }
+ // Explicit "Silence" frame.
+ opus_frame.extend_from_slice(&[0xf, 0x8, 0xf, 0xf, 0xf, 0xe]);
} else {
+ // Per official guidelines, send 5x silence BEFORE we stop speaking.
+ self.set_speaking(false)?;
+
audio_timer.await();
return Ok(());
@@ -323,7 +370,7 @@ impl Connection {
self.set_speaking(true)?;
- let index = self.prep_packet(&mut packet, buffer, &opus_frame, nonce)?;
+ let index = self.prep_packet(&mut packet, mix_buffer, &opus_frame, nonce)?;
audio_timer.await();
self.udp.send_to(&packet[..index], self.destination)?;
@@ -334,7 +381,7 @@ impl Connection {
fn prep_packet(&mut self,
packet: &mut [u8; 512],
- buffer: [i16; 1920],
+ buffer: [f32; 1920],
opus_frame: &[u8],
mut nonce: Nonce)
-> Result<usize> {
@@ -354,7 +401,7 @@ impl Connection {
let len = if opus_frame.is_empty() {
self.encoder
- .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?
+ .encode_float(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?
} else {
let len = opus_frame.len();
packet[HEADER_LEN..HEADER_LEN + len]
@@ -395,6 +442,21 @@ impl Drop for Connection {
}
}
+#[inline]
+fn combine_audio(
+ raw_buffer: [i16; 1920],
+ float_buffer: &mut [f32; 1920],
+ true_stereo: bool,
+ volume: f32,
+) {
+ for i in 0..1920 {
+ let sample_index = if true_stereo { i } else { i/2 };
+ let sample = (raw_buffer[sample_index] as f32) / 32768.0;
+
+ float_buffer[i] = (float_buffer[i] + sample*volume).max(-1.0).min(1.0);
+ }
+}
+
fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
if endpoint.ends_with(":80") {
let len = endpoint.len();
@@ -402,7 +464,7 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
endpoint.truncate(len - 3);
}
- WebsocketUrl::parse(&format!("wss://{}", endpoint))
+ WebsocketUrl::parse(&format!("wss://{}/?v={}", endpoint, VOICE_GATEWAY_VERSION))
.or(Err(Error::Voice(VoiceError::EndpointUrl)))
}
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
index c3a5f05..8113d94 100644
--- a/src/voice/handler.rs
+++ b/src/voice/handler.rs
@@ -2,9 +2,11 @@ use constants::VoiceOpCode;
use gateway::InterMessage;
use model::id::{ChannelId, GuildId, UserId};
use model::voice::VoiceState;
+use parking_lot::Mutex;
+use std::sync::Arc;
use std::sync::mpsc::{self, Sender as MpscSender};
use super::connection_info::ConnectionInfo;
-use super::{AudioReceiver, AudioSource, Status as VoiceStatus, threading};
+use super::{Audio, AudioReceiver, AudioSource, Status as VoiceStatus, threading, LockedAudio};
/// The handler is responsible for "handling" a single voice connection, acting
/// as a clean API above the inner connection.
@@ -249,13 +251,35 @@ impl Handler {
}
}
- /// Plays audio from a source. This can be a source created via
- /// [`voice::ffmpeg`] or [`voice::ytdl`].
+ /// Plays audio from a source.
+ ///
+ /// This can be a source created via [`voice::ffmpeg`] or [`voice::ytdl`].
///
/// [`voice::ffmpeg`]: fn.ffmpeg.html
/// [`voice::ytdl`]: fn.ytdl.html
pub fn play(&mut self, source: Box<AudioSource>) {
- self.send(VoiceStatus::SetSender(Some(source)))
+ self.play_returning(source);
+ }
+
+ /// Plays audio from a source, returning the locked audio source.
+ pub fn play_returning(&mut self, source: Box<AudioSource>) -> LockedAudio {
+ let player = Arc::new(Mutex::new(Audio::new(source)));
+ self.send(VoiceStatus::AddSender(player.clone()));
+
+ player
+ }
+
+ /// Plays audio from a source.
+ ///
+ /// Unlike `play`, this stops all other sources attached
+ /// to the channel.
+ ///
+ /// [`play`]: #method.play
+ pub fn play_only(&mut self, source: Box<AudioSource>) -> LockedAudio {
+ let player = Arc::new(Mutex::new(Audio::new(source)));
+ self.send(VoiceStatus::SetSender(Some(player.clone())));
+
+ player
}
/// Stops playing audio from a source, if one is set.
diff --git a/src/voice/mod.rs b/src/voice/mod.rs
index 70f1ae2..cae84a7 100644
--- a/src/voice/mod.rs
+++ b/src/voice/mod.rs
@@ -11,7 +11,7 @@ mod payload;
mod streamer;
mod threading;
-pub use self::audio::{AudioReceiver, AudioSource, AudioType};
+pub use self::audio::{Audio, AudioReceiver, AudioSource, AudioType, LockedAudio};
pub use self::dca::DcaMetadata;
pub use self::error::{DcaError, VoiceError};
pub use self::handler::Handler;
@@ -26,5 +26,6 @@ pub(crate) enum Status {
Connect(ConnectionInfo),
#[allow(dead_code)] Disconnect,
SetReceiver(Option<Box<AudioReceiver>>),
- SetSender(Option<Box<AudioSource>>),
+ SetSender(Option<LockedAudio>),
+ AddSender(LockedAudio),
}
diff --git a/src/voice/threading.rs b/src/voice/threading.rs
index 39b72ed..5731e86 100644
--- a/src/voice/threading.rs
+++ b/src/voice/threading.rs
@@ -15,7 +15,7 @@ pub(crate) fn start(guild_id: GuildId, rx: MpscReceiver<Status>) {
}
fn runner(rx: &MpscReceiver<Status>) {
- let mut sender = None;
+ let mut senders = Vec::new();
let mut receiver = None;
let mut connection = None;
let mut timer = Timer::new(20);
@@ -40,7 +40,14 @@ fn runner(rx: &MpscReceiver<Status>) {
receiver = r;
},
Ok(Status::SetSender(s)) => {
- sender = s;
+ senders.clear();
+
+ if let Some(aud) = s {
+ senders.push(aud);
+ }
+ },
+ Ok(Status::AddSender(s)) => {
+ senders.push(s);
},
Err(TryRecvError::Empty) => {
// If we receieved nothing, then we can perform an update.
@@ -62,7 +69,7 @@ fn runner(rx: &MpscReceiver<Status>) {
// another event.
let error = match connection.as_mut() {
Some(connection) => {
- let cycle = connection.cycle(&mut sender, &mut receiver, &mut timer);
+ let cycle = connection.cycle(&mut senders, &mut receiver, &mut timer);
match cycle {
Ok(()) => false,