aboutsummaryrefslogtreecommitdiff
path: root/src/voice
diff options
context:
space:
mode:
authorMaiddog <[email protected]>2017-08-26 17:55:43 -0500
committeralex <[email protected]>2017-08-27 00:55:43 +0200
commit3e0b1032d80a1847558a752e8316d97f9ae58f04 (patch)
treeca65390091cb3c0ab98b6497a1447ba69df3d20d /src/voice
parentUse `$crate` for `Args` (diff)
downloadserenity-3e0b1032d80a1847558a752e8316d97f9ae58f04.tar.xz
serenity-3e0b1032d80a1847558a752e8316d97f9ae58f04.zip
Add ability to play DCA and Opus files. (#148)
Diffstat (limited to 'src/voice')
-rw-r--r--src/voice/audio.rs12
-rw-r--r--src/voice/connection.rs192
-rw-r--r--src/voice/dca.rs14
-rw-r--r--src/voice/error.rs30
-rw-r--r--src/voice/handler.rs2
-rw-r--r--src/voice/mod.rs11
-rw-r--r--src/voice/streamer.rs120
7 files changed, 265 insertions, 116 deletions
diff --git a/src/voice/audio.rs b/src/voice/audio.rs
index 14b9ccd..be3ae60 100644
--- a/src/voice/audio.rs
+++ b/src/voice/audio.rs
@@ -5,7 +5,11 @@ pub const SAMPLE_RATE: u32 = 48000;
pub trait AudioSource: Send {
fn is_stereo(&mut self) -> bool;
- fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>;
+ fn get_type(&self) -> AudioType;
+
+ fn read_pcm_frame(&mut self, buffer: &mut [i16]) -> Option<usize>;
+
+ fn read_opus_frame(&mut self) -> Option<Vec<u8>>;
}
/// A receiver for incoming audio.
@@ -19,3 +23,9 @@ pub trait AudioReceiver: Send {
stereo: bool,
data: &[i16]);
}
+
+#[derive(Clone)]
+pub enum AudioType {
+ Opus,
+ Pcm,
+}
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
index 7cc8b3a..7f2ac23 100644
--- a/src/voice/connection.rs
+++ b/src/voice/connection.rs
@@ -1,11 +1,6 @@
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
-use opus::{
- packet as opus_packet,
- Application as CodingMode,
- Channels,
- Decoder as OpusDecoder,
- Encoder as OpusEncoder,
-};
+use opus::{Application as CodingMode, Channels, Decoder as OpusDecoder, Encoder as OpusEncoder,
+ packet as opus_packet};
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::collections::HashMap;
use std::io::Write;
@@ -14,9 +9,9 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;
-use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE};
+use super::audio::{AudioReceiver, AudioSource, AudioType, HEADER_LEN, SAMPLE_RATE};
use super::connection_info::ConnectionInfo;
-use super::{payload, VoiceError, CRYPTO_MODE};
+use super::{CRYPTO_MODE, VoiceError, payload};
use websocket::client::Url as WebsocketUrl;
use websocket::sync::client::ClientBuilder;
use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream};
@@ -115,27 +110,27 @@ impl Connection {
// Find the position in the bytes that contains the first byte of 0,
// indicating the "end of the address".
- let index = bytes
- .iter()
- .skip(4)
- .position(|&x| x == 0)
- .ok_or(Error::Voice(VoiceError::FindingByte))?;
+ let index = bytes.iter().skip(4).position(|&x| x == 0).ok_or(
+ Error::Voice(
+ VoiceError::FindingByte,
+ ),
+ )?;
let pos = 4 + index;
let addr = String::from_utf8_lossy(&bytes[4..pos]);
let port_pos = len - 2;
let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
- client
- .send_json(&payload::build_select_protocol(addr, port))?;
+ client.send_json(
+ &payload::build_select_protocol(addr, port),
+ )?;
}
let key = encryption_key(&mut client)?;
- let _ = client
- .stream_ref()
- .as_tcp()
- .set_read_timeout(Some(Duration::from_millis(25)));
+ let _ = client.stream_ref().as_tcp().set_read_timeout(
+ Some(Duration::from_millis(25)),
+ );
let mutexed_client = Arc::new(Mutex::new(client));
let thread_items = start_threads(mutexed_client.clone(), &udp)?;
@@ -183,17 +178,21 @@ impl Connection {
let timestamp = handle.read_u32::<BigEndian>()?;
let ssrc = handle.read_u32::<BigEndian>()?;
- nonce.0[..HEADER_LEN]
- .clone_from_slice(&packet[..HEADER_LEN]);
+ nonce.0[..HEADER_LEN].clone_from_slice(
+ &packet[..HEADER_LEN],
+ );
- if let Ok(decrypted) =
- secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ if let Ok(decrypted) = secretbox::open(
+ &packet[HEADER_LEN..],
+ &nonce,
+ &self.key,
+ ) {
let channels = opus_packet::get_nb_channels(&decrypted)?;
let entry =
- self.decoder_map.entry((ssrc, channels)).or_insert_with(
- || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(),
- );
+ self.decoder_map.entry((ssrc, channels)).or_insert_with(|| {
+ OpusDecoder::new(SAMPLE_RATE, channels).unwrap()
+ });
let len = entry.decode(&decrypted, &mut buffer, false)?;
@@ -201,8 +200,13 @@ impl Connection {
let b = if is_stereo { len * 2 } else { len };
- receiver
- .voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
+ receiver.voice_packet(
+ ssrc,
+ seq,
+ timestamp,
+ is_stereo,
+ &buffer[..b],
+ );
}
},
ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
@@ -223,10 +227,9 @@ impl Connection {
// Send the voice websocket keepalive if it's time
if self.keepalive_timer.check() {
- self.client
- .lock()
- .unwrap()
- .send_json(&payload::build_keepalive())?;
+ self.client.lock().unwrap().send_json(
+ &payload::build_keepalive(),
+ )?;
}
// Send UDP keepalive if it's time
@@ -236,7 +239,45 @@ impl Connection {
self.udp.send_to(&bytes, self.destination)?;
}
- let len = self.read(source, &mut buffer)?;
+
+ let mut opus_frame = Vec::new();
+
+ let len = match source.as_mut() {
+ Some(stream) => {
+ let is_stereo = stream.is_stereo();
+
+ if is_stereo != self.encoder_stereo {
+ let channels = if is_stereo {
+ Channels::Stereo
+ } else {
+ Channels::Mono
+ };
+ self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?;
+ self.encoder_stereo = is_stereo;
+ }
+
+ match stream.get_type() {
+ AudioType::Opus => {
+ match stream.read_opus_frame() {
+ Some(frame) => {
+ opus_frame = frame;
+ opus_frame.len()
+ },
+ None => 0,
+ }
+ },
+ AudioType::Pcm => {
+ let buffer_len = if is_stereo { 960 * 2 } else { 960 };
+
+ match stream.read_pcm_frame(&mut buffer[..buffer_len]) {
+ Some(len) => len,
+ None => 0,
+ }
+ },
+ }
+ },
+ None => 0,
+ };
if len == 0 {
self.set_speaking(false)?;
@@ -262,7 +303,7 @@ impl Connection {
self.set_speaking(true)?;
- let index = self.prep_packet(&mut packet, buffer, nonce)?;
+ let index = self.prep_packet(&mut packet, buffer, &opus_frame, nonce)?;
audio_timer.await();
self.udp.send_to(&packet[..index], self.destination)?;
@@ -274,6 +315,7 @@ impl Connection {
fn prep_packet(&mut self,
packet: &mut [u8; 512],
buffer: [i16; 1920],
+ opus_frame: &[u8],
mut nonce: Nonce)
-> Result<usize> {
{
@@ -284,14 +326,26 @@ impl Connection {
cursor.write_u32::<BigEndian>(self.ssrc)?;
}
- nonce.0[..HEADER_LEN]
- .clone_from_slice(&packet[..HEADER_LEN]);
+ nonce.0[..HEADER_LEN].clone_from_slice(
+ &packet[..HEADER_LEN],
+ );
let sl_index = packet.len() - 16;
let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 };
- let len = self.encoder
- .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?;
+ let len = if opus_frame.is_empty() {
+ self.encoder.encode(
+ &buffer[..buffer_len],
+ &mut packet[HEADER_LEN..sl_index],
+ )?
+ } else {
+ let len = opus_frame.len();
+ packet[HEADER_LEN..HEADER_LEN + len].clone_from_slice(
+ opus_frame,
+ );
+ len
+ };
+
let crypted = {
let slice = &packet[HEADER_LEN..HEADER_LEN + len];
secretbox::seal(slice, &nonce, &self.key)
@@ -305,47 +359,6 @@ impl Connection {
Ok(HEADER_LEN + crypted.len())
}
- fn read(&mut self,
- source: &mut Option<Box<AudioSource>>,
- buffer: &mut [i16; 1920])
- -> Result<usize> {
- let mut clear = false;
-
- let len = match source.as_mut() {
- Some(source) => {
- let is_stereo = source.is_stereo();
-
- if is_stereo != self.encoder_stereo {
- let channels = if is_stereo {
- Channels::Stereo
- } else {
- Channels::Mono
- };
- self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?;
- self.encoder_stereo = is_stereo;
- }
-
- let buffer_len = if is_stereo { 960 * 2 } else { 960 };
-
- match source.read_frame(&mut buffer[..buffer_len]) {
- Some(len) => len,
- None => {
- clear = true;
-
- 0
- },
- }
- },
- None => 0,
- };
-
- if clear {
- *source = None;
- }
-
- Ok(len)
- }
-
fn set_speaking(&mut self, speaking: bool) -> Result<()> {
if self.speaking == speaking {
return Ok(());
@@ -353,10 +366,11 @@ impl Connection {
self.speaking = speaking;
- self.client
- .lock()
- .unwrap()
- .send_json(&payload::build_speaking(speaking))
+ self.client.lock().unwrap().send_json(
+ &payload::build_speaking(
+ speaking,
+ ),
+ )
}
}
@@ -376,8 +390,9 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
endpoint.truncate(len - 3);
}
- WebsocketUrl::parse(&format!("wss://{}", endpoint))
- .or(Err(Error::Voice(VoiceError::EndpointUrl)))
+ WebsocketUrl::parse(&format!("wss://{}", endpoint)).or(Err(
+ Error::Voice(VoiceError::EndpointUrl),
+ ))
}
#[inline]
@@ -389,8 +404,9 @@ fn encryption_key(client: &mut Client) -> Result<Key> {
return Err(Error::Voice(VoiceError::VoiceModeInvalid));
}
- return Key::from_slice(&ready.secret_key)
- .ok_or(Error::Voice(VoiceError::KeyGen));
+ return Key::from_slice(&ready.secret_key).ok_or(Error::Voice(
+ VoiceError::KeyGen,
+ ));
},
VoiceEvent::Unknown(op, value) => {
debug!(
diff --git a/src/voice/dca.rs b/src/voice/dca.rs
new file mode 100644
index 0000000..4f3d7e7
--- /dev/null
+++ b/src/voice/dca.rs
@@ -0,0 +1,14 @@
+#[derive(Debug, Deserialize)]
+pub struct DcaMetadata {
+ opus: OpusInfo,
+}
+
+#[derive(Debug, Deserialize)]
+struct OpusInfo {
+ /// Number of channels
+ channels: u8,
+}
+
+impl DcaMetadata {
+ pub fn is_stereo(&self) -> bool { self.opus.channels == 2 }
+}
diff --git a/src/voice/error.rs b/src/voice/error.rs
index b756bfb..a0bb11a 100644
--- a/src/voice/error.rs
+++ b/src/voice/error.rs
@@ -1,5 +1,6 @@
-use serde_json::Value;
+use serde_json::{self, Value};
use std::process::Output;
+use std;
/// An error returned from the voice module.
// Errors which are not visible to the end user are hidden.
@@ -7,14 +8,20 @@ use std::process::Output;
pub enum VoiceError {
/// An indicator that an endpoint URL was invalid.
EndpointUrl,
- #[doc(hidden)] ExpectedHandshake,
- #[doc(hidden)] FindingByte,
- #[doc(hidden)] HostnameResolve,
- #[doc(hidden)] KeyGen,
+ #[doc(hidden)]
+ ExpectedHandshake,
+ #[doc(hidden)]
+ FindingByte,
+ #[doc(hidden)]
+ HostnameResolve,
+ #[doc(hidden)]
+ KeyGen,
/// An error occurred while checking if a path is stereo.
Streams,
- #[doc(hidden)] VoiceModeInvalid,
- #[doc(hidden)] VoiceModeUnavailable,
+ #[doc(hidden)]
+ VoiceModeInvalid,
+ #[doc(hidden)]
+ VoiceModeUnavailable,
/// An error occurred while running `youtube-dl`.
YouTubeDLRun(Output),
/// An error occurred while processing the JSON output from `youtube-dl`.
@@ -26,3 +33,12 @@ pub enum VoiceError {
/// The JSON output is given.
YouTubeDLUrl(Value),
}
+
+/// An error returned from the dca method.
+#[derive(Debug)]
+pub enum DcaError {
+ IoError(std::io::Error),
+ InvalidHeader,
+ InvalidMetadata(serde_json::Error),
+ InvalidSize(i32),
+}
diff --git a/src/voice/handler.rs b/src/voice/handler.rs
index 9c3f691..24b3cd9 100644
--- a/src/voice/handler.rs
+++ b/src/voice/handler.rs
@@ -225,7 +225,7 @@ impl Handler {
/// can pass in just a boxed receiver, and do not need to specify `Some`.
///
/// Pass `None` to drop the current receiver, if one exists.
-pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O){
+ pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O) {
self.send(VoiceStatus::SetReceiver(receiver.into()))
}
diff --git a/src/voice/mod.rs b/src/voice/mod.rs
index a94c8a0..6af5926 100644
--- a/src/voice/mod.rs
+++ b/src/voice/mod.rs
@@ -3,6 +3,7 @@
mod audio;
mod connection;
mod connection_info;
+mod dca;
mod error;
mod manager;
mod handler;
@@ -10,11 +11,12 @@ mod payload;
mod streamer;
mod threading;
-pub use self::audio::{AudioReceiver, AudioSource};
-pub use self::error::VoiceError;
+pub use self::audio::{AudioReceiver, AudioSource, AudioType};
+pub use self::dca::DcaMetadata;
+pub use self::error::{DcaError, VoiceError};
pub use self::handler::Handler;
pub use self::manager::Manager;
-pub use self::streamer::{ffmpeg, pcm, ytdl};
+pub use self::streamer::{dca, ffmpeg, opus, pcm, ytdl};
use self::connection_info::ConnectionInfo;
@@ -22,7 +24,8 @@ const CRYPTO_MODE: &'static str = "xsalsa20_poly1305";
pub(crate) enum Status {
Connect(ConnectionInfo),
- #[allow(dead_code)] Disconnect,
+ #[allow(dead_code)]
+ Disconnect,
SetReceiver(Option<Box<AudioReceiver>>),
SetSender(Option<Box<AudioSource>>),
}
diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs
index c8400f0..5f05879 100644
--- a/src/voice/streamer.rs
+++ b/src/voice/streamer.rs
@@ -1,11 +1,14 @@
use byteorder::{LittleEndian, ReadBytesExt};
use serde_json;
use std::ffi::OsStr;
-use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult};
+use std::io::{BufReader, ErrorKind as IoErrorKind, Read, Result as IoResult};
use std::process::{Child, Command, Stdio};
-use super::{AudioSource, VoiceError};
+use super::{AudioSource, AudioType, DcaError, DcaMetadata, VoiceError};
use internal::prelude::*;
+use std::fs::File;
+use std::result::Result as StdResult;
+
struct ChildContainer(Child);
impl Read for ChildContainer {
@@ -14,14 +17,20 @@ impl Read for ChildContainer {
}
}
-struct PcmSource<R: Read + Send + 'static>(bool, R);
+struct InputSource<R: Read + Send + 'static> {
+ stereo: bool,
+ reader: R,
+ kind: AudioType,
+}
+
+impl<R: Read + Send> AudioSource for InputSource<R> {
+ fn is_stereo(&mut self) -> bool { self.stereo }
-impl<R: Read + Send> AudioSource for PcmSource<R> {
- fn is_stereo(&mut self) -> bool { self.0 }
+ fn get_type(&self) -> AudioType { self.kind.clone() }
- fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> {
+ fn read_pcm_frame(&mut self, buffer: &mut [i16]) -> Option<usize> {
for (i, v) in buffer.iter_mut().enumerate() {
- *v = match self.1.read_i16::<LittleEndian>() {
+ *v = match self.reader.read_i16::<LittleEndian>() {
Ok(v) => v,
Err(ref e) => {
return if e.kind() == IoErrorKind::UnexpectedEof {
@@ -35,13 +44,38 @@ impl<R: Read + Send> AudioSource for PcmSource<R> {
Some(buffer.len())
}
+
+ fn read_opus_frame(&mut self) -> Option<Vec<u8>> {
+ match self.reader.read_i16::<LittleEndian>() {
+ Ok(size) => {
+ let mut frame = Vec::with_capacity(size as usize);
+
+ {
+ let reader = self.reader.by_ref();
+
+ if reader.take(size as u64).read_to_end(&mut frame).is_err() {
+ return None;
+ }
+ }
+
+ Some(frame)
+ },
+ Err(ref e) => {
+ if e.kind() == IoErrorKind::UnexpectedEof {
+ Some(Vec::new())
+ } else {
+ None
+ }
+ },
+ }
+ }
}
/// Opens an audio file through `ffmpeg` and creates an audio source.
pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
let path = path.as_ref();
- /// Will fail if the path is not to a file on the fs. Likely a YouTube URI.
+ // Will fail if the path is not to a file on the fs. Likely a YouTube URI.
let is_stereo = is_stereo(path).unwrap_or(false);
let stereo_val = if is_stereo { "2" } else { "1" };
@@ -69,9 +103,63 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> {
Ok(pcm(is_stereo, ChildContainer(command)))
}
+/// Creates a streamed audio source from a DCA file.
+/// Currently only accepts the DCA1 format.
+pub fn dca<P: AsRef<OsStr>>(path: P) -> StdResult<Box<AudioSource>, DcaError> {
+ let file = File::open(path.as_ref()).map_err(DcaError::IoError)?;
+
+ let mut reader = BufReader::new(file);
+
+ let mut header = [0u8; 4];
+
+ // Read in the magic number to verify it's a DCA file.
+ reader.read_exact(&mut header).map_err(DcaError::IoError)?;
+
+ if header != b"DCA1"[..] {
+ return Err(DcaError::InvalidHeader);
+ }
+
+ reader.read_exact(&mut header).map_err(DcaError::IoError)?;
+
+ let size = (&header[..]).read_i32::<LittleEndian>().unwrap();
+
+ // Sanity check
+ if size < 2 {
+ return Err(DcaError::InvalidSize(size));
+ }
+
+ let mut raw_json = Vec::with_capacity(size as usize);
+
+ {
+ let json_reader = reader.by_ref();
+ json_reader
+ .take(size as u64)
+ .read_to_end(&mut raw_json)
+ .map_err(DcaError::IoError)?;
+ }
+
+ let metadata = serde_json::from_slice::<DcaMetadata>(raw_json.as_slice())
+ .map_err(DcaError::InvalidMetadata)?;
+
+ Ok(opus(metadata.is_stereo(), reader))
+}
+
+/// Creates an Opus audio source.
+pub fn opus<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> {
+ Box::new(InputSource {
+ stereo: is_stereo,
+ reader: reader,
+ kind: AudioType::Opus,
+ })
+}
+
/// Creates a PCM audio source.
pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> {
- Box::new(PcmSource(is_stereo, reader))
+ Box::new(InputSource {
+ stereo: is_stereo,
+ reader: reader,
+ kind: AudioType::Pcm,
+ })
}
/// Creates a streamed audio source with `youtube-dl` and `ffmpeg`.
@@ -101,9 +189,11 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> {
};
let uri = match obj.remove("url") {
- Some(v) => match v {
- Value::String(uri) => uri,
- other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ Some(v) => {
+ match v {
+ Value::String(uri) => uri,
+ other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))),
+ }
},
None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))),
};
@@ -129,9 +219,9 @@ fn is_stereo(path: &OsStr) -> Result<bool> {
.ok_or(Error::Voice(VoiceError::Streams))?;
let check = streams.iter().any(|stream| {
- let channels = stream
- .as_object()
- .and_then(|m| m.get("channels").and_then(|v| v.as_i64()));
+ let channels = stream.as_object().and_then(|m| {
+ m.get("channels").and_then(|v| v.as_i64())
+ });
channels == Some(2)
});