diff options
| author | Maiddog <[email protected]> | 2017-08-26 17:55:43 -0500 |
|---|---|---|
| committer | alex <[email protected]> | 2017-08-27 00:55:43 +0200 |
| commit | 3e0b1032d80a1847558a752e8316d97f9ae58f04 (patch) | |
| tree | ca65390091cb3c0ab98b6497a1447ba69df3d20d /src/voice | |
| parent | Use `$crate` for `Args` (diff) | |
| download | serenity-3e0b1032d80a1847558a752e8316d97f9ae58f04.tar.xz serenity-3e0b1032d80a1847558a752e8316d97f9ae58f04.zip | |
Add ability to play DCA and Opus files. (#148)
Diffstat (limited to 'src/voice')
| -rw-r--r-- | src/voice/audio.rs | 12 | ||||
| -rw-r--r-- | src/voice/connection.rs | 192 | ||||
| -rw-r--r-- | src/voice/dca.rs | 14 | ||||
| -rw-r--r-- | src/voice/error.rs | 30 | ||||
| -rw-r--r-- | src/voice/handler.rs | 2 | ||||
| -rw-r--r-- | src/voice/mod.rs | 11 | ||||
| -rw-r--r-- | src/voice/streamer.rs | 120 |
7 files changed, 265 insertions, 116 deletions
diff --git a/src/voice/audio.rs b/src/voice/audio.rs index 14b9ccd..be3ae60 100644 --- a/src/voice/audio.rs +++ b/src/voice/audio.rs @@ -5,7 +5,11 @@ pub const SAMPLE_RATE: u32 = 48000; pub trait AudioSource: Send { fn is_stereo(&mut self) -> bool; - fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize>; + fn get_type(&self) -> AudioType; + + fn read_pcm_frame(&mut self, buffer: &mut [i16]) -> Option<usize>; + + fn read_opus_frame(&mut self) -> Option<Vec<u8>>; } /// A receiver for incoming audio. @@ -19,3 +23,9 @@ pub trait AudioReceiver: Send { stereo: bool, data: &[i16]); } + +#[derive(Clone)] +pub enum AudioType { + Opus, + Pcm, +} diff --git a/src/voice/connection.rs b/src/voice/connection.rs index 7cc8b3a..7f2ac23 100644 --- a/src/voice/connection.rs +++ b/src/voice/connection.rs @@ -1,11 +1,6 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; -use opus::{ - packet as opus_packet, - Application as CodingMode, - Channels, - Decoder as OpusDecoder, - Encoder as OpusEncoder, -}; +use opus::{Application as CodingMode, Channels, Decoder as OpusDecoder, Encoder as OpusEncoder, + packet as opus_packet}; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::collections::HashMap; use std::io::Write; @@ -14,9 +9,9 @@ use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; -use super::audio::{AudioReceiver, AudioSource, HEADER_LEN, SAMPLE_RATE}; +use super::audio::{AudioReceiver, AudioSource, AudioType, HEADER_LEN, SAMPLE_RATE}; use super::connection_info::ConnectionInfo; -use super::{payload, VoiceError, CRYPTO_MODE}; +use super::{CRYPTO_MODE, VoiceError, payload}; use websocket::client::Url as WebsocketUrl; use websocket::sync::client::ClientBuilder; use websocket::sync::stream::{AsTcpStream, TcpStream, TlsStream}; @@ -115,27 +110,27 @@ impl Connection { // Find the position in the bytes that contains the first byte of 0, // indicating the "end of the address". - let index = bytes - .iter() - .skip(4) - .position(|&x| x == 0) - .ok_or(Error::Voice(VoiceError::FindingByte))?; + let index = bytes.iter().skip(4).position(|&x| x == 0).ok_or( + Error::Voice( + VoiceError::FindingByte, + ), + )?; let pos = 4 + index; let addr = String::from_utf8_lossy(&bytes[4..pos]); let port_pos = len - 2; let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?; - client - .send_json(&payload::build_select_protocol(addr, port))?; + client.send_json( + &payload::build_select_protocol(addr, port), + )?; } let key = encryption_key(&mut client)?; - let _ = client - .stream_ref() - .as_tcp() - .set_read_timeout(Some(Duration::from_millis(25))); + let _ = client.stream_ref().as_tcp().set_read_timeout( + Some(Duration::from_millis(25)), + ); let mutexed_client = Arc::new(Mutex::new(client)); let thread_items = start_threads(mutexed_client.clone(), &udp)?; @@ -183,17 +178,21 @@ impl Connection { let timestamp = handle.read_u32::<BigEndian>()?; let ssrc = handle.read_u32::<BigEndian>()?; - nonce.0[..HEADER_LEN] - .clone_from_slice(&packet[..HEADER_LEN]); + nonce.0[..HEADER_LEN].clone_from_slice( + &packet[..HEADER_LEN], + ); - if let Ok(decrypted) = - secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) { + if let Ok(decrypted) = secretbox::open( + &packet[HEADER_LEN..], + &nonce, + &self.key, + ) { let channels = opus_packet::get_nb_channels(&decrypted)?; let entry = - self.decoder_map.entry((ssrc, channels)).or_insert_with( - || OpusDecoder::new(SAMPLE_RATE, channels).unwrap(), - ); + self.decoder_map.entry((ssrc, channels)).or_insert_with(|| { + OpusDecoder::new(SAMPLE_RATE, channels).unwrap() + }); let len = entry.decode(&decrypted, &mut buffer, false)?; @@ -201,8 +200,13 @@ impl Connection { let b = if is_stereo { len * 2 } else { len }; - receiver - .voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]); + receiver.voice_packet( + ssrc, + seq, + timestamp, + is_stereo, + &buffer[..b], + ); } }, ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => { @@ -223,10 +227,9 @@ impl Connection { // Send the voice websocket keepalive if it's time if self.keepalive_timer.check() { - self.client - .lock() - .unwrap() - .send_json(&payload::build_keepalive())?; + self.client.lock().unwrap().send_json( + &payload::build_keepalive(), + )?; } // Send UDP keepalive if it's time @@ -236,7 +239,45 @@ impl Connection { self.udp.send_to(&bytes, self.destination)?; } - let len = self.read(source, &mut buffer)?; + + let mut opus_frame = Vec::new(); + + let len = match source.as_mut() { + Some(stream) => { + let is_stereo = stream.is_stereo(); + + if is_stereo != self.encoder_stereo { + let channels = if is_stereo { + Channels::Stereo + } else { + Channels::Mono + }; + self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?; + self.encoder_stereo = is_stereo; + } + + match stream.get_type() { + AudioType::Opus => { + match stream.read_opus_frame() { + Some(frame) => { + opus_frame = frame; + opus_frame.len() + }, + None => 0, + } + }, + AudioType::Pcm => { + let buffer_len = if is_stereo { 960 * 2 } else { 960 }; + + match stream.read_pcm_frame(&mut buffer[..buffer_len]) { + Some(len) => len, + None => 0, + } + }, + } + }, + None => 0, + }; if len == 0 { self.set_speaking(false)?; @@ -262,7 +303,7 @@ impl Connection { self.set_speaking(true)?; - let index = self.prep_packet(&mut packet, buffer, nonce)?; + let index = self.prep_packet(&mut packet, buffer, &opus_frame, nonce)?; audio_timer.await(); self.udp.send_to(&packet[..index], self.destination)?; @@ -274,6 +315,7 @@ impl Connection { fn prep_packet(&mut self, packet: &mut [u8; 512], buffer: [i16; 1920], + opus_frame: &[u8], mut nonce: Nonce) -> Result<usize> { { @@ -284,14 +326,26 @@ impl Connection { cursor.write_u32::<BigEndian>(self.ssrc)?; } - nonce.0[..HEADER_LEN] - .clone_from_slice(&packet[..HEADER_LEN]); + nonce.0[..HEADER_LEN].clone_from_slice( + &packet[..HEADER_LEN], + ); let sl_index = packet.len() - 16; let buffer_len = if self.encoder_stereo { 960 * 2 } else { 960 }; - let len = self.encoder - .encode(&buffer[..buffer_len], &mut packet[HEADER_LEN..sl_index])?; + let len = if opus_frame.is_empty() { + self.encoder.encode( + &buffer[..buffer_len], + &mut packet[HEADER_LEN..sl_index], + )? + } else { + let len = opus_frame.len(); + packet[HEADER_LEN..HEADER_LEN + len].clone_from_slice( + opus_frame, + ); + len + }; + let crypted = { let slice = &packet[HEADER_LEN..HEADER_LEN + len]; secretbox::seal(slice, &nonce, &self.key) @@ -305,47 +359,6 @@ impl Connection { Ok(HEADER_LEN + crypted.len()) } - fn read(&mut self, - source: &mut Option<Box<AudioSource>>, - buffer: &mut [i16; 1920]) - -> Result<usize> { - let mut clear = false; - - let len = match source.as_mut() { - Some(source) => { - let is_stereo = source.is_stereo(); - - if is_stereo != self.encoder_stereo { - let channels = if is_stereo { - Channels::Stereo - } else { - Channels::Mono - }; - self.encoder = OpusEncoder::new(SAMPLE_RATE, channels, CodingMode::Audio)?; - self.encoder_stereo = is_stereo; - } - - let buffer_len = if is_stereo { 960 * 2 } else { 960 }; - - match source.read_frame(&mut buffer[..buffer_len]) { - Some(len) => len, - None => { - clear = true; - - 0 - }, - } - }, - None => 0, - }; - - if clear { - *source = None; - } - - Ok(len) - } - fn set_speaking(&mut self, speaking: bool) -> Result<()> { if self.speaking == speaking { return Ok(()); @@ -353,10 +366,11 @@ impl Connection { self.speaking = speaking; - self.client - .lock() - .unwrap() - .send_json(&payload::build_speaking(speaking)) + self.client.lock().unwrap().send_json( + &payload::build_speaking( + speaking, + ), + ) } } @@ -376,8 +390,9 @@ fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> { endpoint.truncate(len - 3); } - WebsocketUrl::parse(&format!("wss://{}", endpoint)) - .or(Err(Error::Voice(VoiceError::EndpointUrl))) + WebsocketUrl::parse(&format!("wss://{}", endpoint)).or(Err( + Error::Voice(VoiceError::EndpointUrl), + )) } #[inline] @@ -389,8 +404,9 @@ fn encryption_key(client: &mut Client) -> Result<Key> { return Err(Error::Voice(VoiceError::VoiceModeInvalid)); } - return Key::from_slice(&ready.secret_key) - .ok_or(Error::Voice(VoiceError::KeyGen)); + return Key::from_slice(&ready.secret_key).ok_or(Error::Voice( + VoiceError::KeyGen, + )); }, VoiceEvent::Unknown(op, value) => { debug!( diff --git a/src/voice/dca.rs b/src/voice/dca.rs new file mode 100644 index 0000000..4f3d7e7 --- /dev/null +++ b/src/voice/dca.rs @@ -0,0 +1,14 @@ +#[derive(Debug, Deserialize)] +pub struct DcaMetadata { + opus: OpusInfo, +} + +#[derive(Debug, Deserialize)] +struct OpusInfo { + /// Number of channels + channels: u8, +} + +impl DcaMetadata { + pub fn is_stereo(&self) -> bool { self.opus.channels == 2 } +} diff --git a/src/voice/error.rs b/src/voice/error.rs index b756bfb..a0bb11a 100644 --- a/src/voice/error.rs +++ b/src/voice/error.rs @@ -1,5 +1,6 @@ -use serde_json::Value; +use serde_json::{self, Value}; use std::process::Output; +use std; /// An error returned from the voice module. // Errors which are not visible to the end user are hidden. @@ -7,14 +8,20 @@ use std::process::Output; pub enum VoiceError { /// An indicator that an endpoint URL was invalid. EndpointUrl, - #[doc(hidden)] ExpectedHandshake, - #[doc(hidden)] FindingByte, - #[doc(hidden)] HostnameResolve, - #[doc(hidden)] KeyGen, + #[doc(hidden)] + ExpectedHandshake, + #[doc(hidden)] + FindingByte, + #[doc(hidden)] + HostnameResolve, + #[doc(hidden)] + KeyGen, /// An error occurred while checking if a path is stereo. Streams, - #[doc(hidden)] VoiceModeInvalid, - #[doc(hidden)] VoiceModeUnavailable, + #[doc(hidden)] + VoiceModeInvalid, + #[doc(hidden)] + VoiceModeUnavailable, /// An error occurred while running `youtube-dl`. YouTubeDLRun(Output), /// An error occurred while processing the JSON output from `youtube-dl`. @@ -26,3 +33,12 @@ pub enum VoiceError { /// The JSON output is given. YouTubeDLUrl(Value), } + +/// An error returned from the dca method. +#[derive(Debug)] +pub enum DcaError { + IoError(std::io::Error), + InvalidHeader, + InvalidMetadata(serde_json::Error), + InvalidSize(i32), +} diff --git a/src/voice/handler.rs b/src/voice/handler.rs index 9c3f691..24b3cd9 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -225,7 +225,7 @@ impl Handler { /// can pass in just a boxed receiver, and do not need to specify `Some`. /// /// Pass `None` to drop the current receiver, if one exists. -pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O){ + pub fn listen<O: Into<Option<Box<AudioReceiver>>>>(&mut self, receiver: O) { self.send(VoiceStatus::SetReceiver(receiver.into())) } diff --git a/src/voice/mod.rs b/src/voice/mod.rs index a94c8a0..6af5926 100644 --- a/src/voice/mod.rs +++ b/src/voice/mod.rs @@ -3,6 +3,7 @@ mod audio; mod connection; mod connection_info; +mod dca; mod error; mod manager; mod handler; @@ -10,11 +11,12 @@ mod payload; mod streamer; mod threading; -pub use self::audio::{AudioReceiver, AudioSource}; -pub use self::error::VoiceError; +pub use self::audio::{AudioReceiver, AudioSource, AudioType}; +pub use self::dca::DcaMetadata; +pub use self::error::{DcaError, VoiceError}; pub use self::handler::Handler; pub use self::manager::Manager; -pub use self::streamer::{ffmpeg, pcm, ytdl}; +pub use self::streamer::{dca, ffmpeg, opus, pcm, ytdl}; use self::connection_info::ConnectionInfo; @@ -22,7 +24,8 @@ const CRYPTO_MODE: &'static str = "xsalsa20_poly1305"; pub(crate) enum Status { Connect(ConnectionInfo), - #[allow(dead_code)] Disconnect, + #[allow(dead_code)] + Disconnect, SetReceiver(Option<Box<AudioReceiver>>), SetSender(Option<Box<AudioSource>>), } diff --git a/src/voice/streamer.rs b/src/voice/streamer.rs index c8400f0..5f05879 100644 --- a/src/voice/streamer.rs +++ b/src/voice/streamer.rs @@ -1,11 +1,14 @@ use byteorder::{LittleEndian, ReadBytesExt}; use serde_json; use std::ffi::OsStr; -use std::io::{ErrorKind as IoErrorKind, Read, Result as IoResult}; +use std::io::{BufReader, ErrorKind as IoErrorKind, Read, Result as IoResult}; use std::process::{Child, Command, Stdio}; -use super::{AudioSource, VoiceError}; +use super::{AudioSource, AudioType, DcaError, DcaMetadata, VoiceError}; use internal::prelude::*; +use std::fs::File; +use std::result::Result as StdResult; + struct ChildContainer(Child); impl Read for ChildContainer { @@ -14,14 +17,20 @@ impl Read for ChildContainer { } } -struct PcmSource<R: Read + Send + 'static>(bool, R); +struct InputSource<R: Read + Send + 'static> { + stereo: bool, + reader: R, + kind: AudioType, +} + +impl<R: Read + Send> AudioSource for InputSource<R> { + fn is_stereo(&mut self) -> bool { self.stereo } -impl<R: Read + Send> AudioSource for PcmSource<R> { - fn is_stereo(&mut self) -> bool { self.0 } + fn get_type(&self) -> AudioType { self.kind.clone() } - fn read_frame(&mut self, buffer: &mut [i16]) -> Option<usize> { + fn read_pcm_frame(&mut self, buffer: &mut [i16]) -> Option<usize> { for (i, v) in buffer.iter_mut().enumerate() { - *v = match self.1.read_i16::<LittleEndian>() { + *v = match self.reader.read_i16::<LittleEndian>() { Ok(v) => v, Err(ref e) => { return if e.kind() == IoErrorKind::UnexpectedEof { @@ -35,13 +44,38 @@ impl<R: Read + Send> AudioSource for PcmSource<R> { Some(buffer.len()) } + + fn read_opus_frame(&mut self) -> Option<Vec<u8>> { + match self.reader.read_i16::<LittleEndian>() { + Ok(size) => { + let mut frame = Vec::with_capacity(size as usize); + + { + let reader = self.reader.by_ref(); + + if reader.take(size as u64).read_to_end(&mut frame).is_err() { + return None; + } + } + + Some(frame) + }, + Err(ref e) => { + if e.kind() == IoErrorKind::UnexpectedEof { + Some(Vec::new()) + } else { + None + } + }, + } + } } /// Opens an audio file through `ffmpeg` and creates an audio source. pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> { let path = path.as_ref(); - /// Will fail if the path is not to a file on the fs. Likely a YouTube URI. + // Will fail if the path is not to a file on the fs. Likely a YouTube URI. let is_stereo = is_stereo(path).unwrap_or(false); let stereo_val = if is_stereo { "2" } else { "1" }; @@ -69,9 +103,63 @@ pub fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Box<AudioSource>> { Ok(pcm(is_stereo, ChildContainer(command))) } +/// Creates a streamed audio source from a DCA file. +/// Currently only accepts the DCA1 format. +pub fn dca<P: AsRef<OsStr>>(path: P) -> StdResult<Box<AudioSource>, DcaError> { + let file = File::open(path.as_ref()).map_err(DcaError::IoError)?; + + let mut reader = BufReader::new(file); + + let mut header = [0u8; 4]; + + // Read in the magic number to verify it's a DCA file. + reader.read_exact(&mut header).map_err(DcaError::IoError)?; + + if header != b"DCA1"[..] { + return Err(DcaError::InvalidHeader); + } + + reader.read_exact(&mut header).map_err(DcaError::IoError)?; + + let size = (&header[..]).read_i32::<LittleEndian>().unwrap(); + + // Sanity check + if size < 2 { + return Err(DcaError::InvalidSize(size)); + } + + let mut raw_json = Vec::with_capacity(size as usize); + + { + let json_reader = reader.by_ref(); + json_reader + .take(size as u64) + .read_to_end(&mut raw_json) + .map_err(DcaError::IoError)?; + } + + let metadata = serde_json::from_slice::<DcaMetadata>(raw_json.as_slice()) + .map_err(DcaError::InvalidMetadata)?; + + Ok(opus(metadata.is_stereo(), reader)) +} + +/// Creates an Opus audio source. +pub fn opus<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> { + Box::new(InputSource { + stereo: is_stereo, + reader: reader, + kind: AudioType::Opus, + }) +} + /// Creates a PCM audio source. pub fn pcm<R: Read + Send + 'static>(is_stereo: bool, reader: R) -> Box<AudioSource> { - Box::new(PcmSource(is_stereo, reader)) + Box::new(InputSource { + stereo: is_stereo, + reader: reader, + kind: AudioType::Pcm, + }) } /// Creates a streamed audio source with `youtube-dl` and `ffmpeg`. @@ -101,9 +189,11 @@ pub fn ytdl(uri: &str) -> Result<Box<AudioSource>> { }; let uri = match obj.remove("url") { - Some(v) => match v { - Value::String(uri) => uri, - other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))), + Some(v) => { + match v { + Value::String(uri) => uri, + other => return Err(Error::Voice(VoiceError::YouTubeDLUrl(other))), + } }, None => return Err(Error::Voice(VoiceError::YouTubeDLUrl(Value::Object(obj)))), }; @@ -129,9 +219,9 @@ fn is_stereo(path: &OsStr) -> Result<bool> { .ok_or(Error::Voice(VoiceError::Streams))?; let check = streams.iter().any(|stream| { - let channels = stream - .as_object() - .and_then(|m| m.get("channels").and_then(|v| v.as_i64())); + let channels = stream.as_object().and_then(|m| { + m.get("channels").and_then(|v| v.as_i64()) + }); channels == Some(2) }); |