aboutsummaryrefslogtreecommitdiff
path: root/src/voice/connection.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-05-22 17:02:00 -0700
committerZeyla Hellyer <[email protected]>2017-05-22 17:02:00 -0700
commit9969be60cf320797c37b317da24d9a08fd5eafa5 (patch)
treef27bf7a57af95bbc11990b1edcea9cca99276964 /src/voice/connection.rs
parentReasonably derive Debug on items (diff)
downloadserenity-9969be60cf320797c37b317da24d9a08fd5eafa5.tar.xz
serenity-9969be60cf320797c37b317da24d9a08fd5eafa5.zip
Restructure modules
Modules are now separated into a fashion where the library can be used for most use cases, without needing to compile the rest. The core of serenity, with no features enabled, contains only the struct (model) definitions, constants, and prelude. Models do not have most functions compiled in, as that is separated into the `model` feature. The `client` module has been split into 3 modules: `client`, `gateway`, and `http`. `http` contains functions to interact with the REST API. `gateway` contains the Shard to interact with the gateway, requiring `http` for retrieving the gateway URL. `client` requires both of the other features and acts as an abstracted interface over both the gateway and REST APIs, handling the event loop. The `builder` module has been separated from `utils`, and can now be optionally compiled in. It and the `http` feature are required by the `model` feature due to a large number of methods requiring access to them. `utils` now contains a number of utilities, such as the Colour struct, the `MessageBuilder`, and mention parsing functions. Each of the original `ext` modules are still featured, with `cache` not requiring any feature to be enabled, `framework` requiring the `client`, `model`, and `utils`, and `voice` requiring `gateway`. In total the features and their requirements are: - `builder`: none - `cache`: none - `client`: `gateway`, `http` - `framework`: `client`, `model`, `utils` - `gateway`: `http` - `http`: none - `model`: `builder`, `http` - `utils`: none - `voice`: `gateway` The default features are `builder`, `cache`, `client`, `framework`, `gateway`, `model`, `http`, and `utils`. To help with forwards compatibility, modules have been re-exported from their original locations.
Diffstat (limited to 'src/voice/connection.rs')
-rw-r--r--src/voice/connection.rs477
1 files changed, 477 insertions, 0 deletions
diff --git a/src/voice/connection.rs b/src/voice/connection.rs
new file mode 100644
index 0000000..698f469
--- /dev/null
+++ b/src/voice/connection.rs
@@ -0,0 +1,477 @@
+use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
+use opus::{
+ Channels,
+ CodingMode,
+ Decoder as OpusDecoder,
+ Encoder as OpusEncoder,
+ packet as opus_packet,
+};
+use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
+use std::collections::HashMap;
+use std::io::Write;
+use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
+use std::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
+use std::thread::{self, Builder as ThreadBuilder, JoinHandle};
+use std::time::Duration;
+use super::audio::{HEADER_LEN, SAMPLE_RATE, AudioReceiver, AudioSource};
+use super::connection_info::ConnectionInfo;
+use super::{CRYPTO_MODE, VoiceError, payload};
+use websocket::client::request::Url as WebsocketUrl;
+use websocket::client::{
+ Client as WsClient,
+ Receiver as WsReceiver,
+ Sender as WsSender
+};
+use websocket::stream::WebSocketStream;
+use ::internal::prelude::*;
+use ::internal::ws_impl::{ReceiverExt, SenderExt};
+use ::internal::Timer;
+use ::model::event::VoiceEvent;
+use ::model::UserId;
+
+enum ReceiverStatus {
+ Udp(Vec<u8>),
+ Websocket(VoiceEvent),
+}
+
+#[allow(dead_code)]
+struct ThreadItems {
+ rx: MpscReceiver<ReceiverStatus>,
+ udp_close_sender: MpscSender<i32>,
+ udp_thread: JoinHandle<()>,
+ ws_close_sender: MpscSender<i32>,
+ ws_thread: JoinHandle<()>,
+}
+
+#[allow(dead_code)]
+pub struct Connection {
+ audio_timer: Timer,
+ decoder_map: HashMap<(u32, Channels), OpusDecoder>,
+ destination: SocketAddr,
+ encoder: OpusEncoder,
+ encoder_stereo: bool,
+ keepalive_timer: Timer,
+ key: Key,
+ sender: WsSender<WebSocketStream>,
+ sequence: u16,
+ silence_frames: u8,
+ speaking: bool,
+ ssrc: u32,
+ thread_items: ThreadItems,
+ timestamp: u32,
+ udp: UdpSocket,
+ user_id: UserId,
+}
+
+impl Connection {
+ pub fn new(mut info: ConnectionInfo) -> Result<Connection> {
+ let url = generate_url(&mut info.endpoint)?;
+
+ let response = WsClient::connect(url)?.send()?;
+ response.validate()?;
+ let (mut sender, mut receiver) = response.begin().split();
+
+ sender.send_json(&payload::build_identify(&info))?;
+
+ let hello = {
+ let hello;
+
+ loop {
+ match receiver.recv_json(VoiceEvent::decode)? {
+ VoiceEvent::Hello(received_hello) => {
+ hello = received_hello;
+
+ break;
+ },
+ VoiceEvent::Heartbeat(_) => continue,
+ other => {
+ debug!("[Voice] Expected hello/heartbeat; got: {:?}",
+ other);
+
+ return Err(Error::Voice(VoiceError::ExpectedHandshake));
+ },
+ }
+ }
+
+ hello
+ };
+
+ if !has_valid_mode(&hello.modes) {
+ return Err(Error::Voice(VoiceError::VoiceModeUnavailable));
+ }
+
+ let destination = (&info.endpoint[..], hello.port)
+ .to_socket_addrs()?
+ .next()
+ .ok_or(Error::Voice(VoiceError::HostnameResolve))?;
+
+ // Important to note here: the length of the packet can be of either 4
+ // or 70 bytes. If it is 4 bytes, then we need to send a 70-byte packet
+ // to determine the IP.
+ //
+ // Past the initial 4 bytes, the packet _must_ be completely empty data.
+ //
+ // The returned packet will be a null-terminated string of the IP, and
+ // the port encoded in LE in the last two bytes of the packet.
+ let udp = UdpSocket::bind("0.0.0.0:0")?;
+
+ {
+ let mut bytes = [0; 70];
+
+ (&mut bytes[..]).write_u32::<BigEndian>(hello.ssrc)?;
+ udp.send_to(&bytes, destination)?;
+
+ let mut bytes = [0; 256];
+ let (len, _addr) = udp.recv_from(&mut bytes)?;
+
+ // Find the position in the bytes that contains the first byte of 0,
+ // indicating the "end of the address".
+ let index = bytes.iter().skip(4).position(|&x| x == 0)
+ .ok_or(Error::Voice(VoiceError::FindingByte))?;
+
+ let pos = 4 + index;
+ let addr = String::from_utf8_lossy(&bytes[4..pos]);
+ let port_pos = len - 2;
+ let port = (&bytes[port_pos..]).read_u16::<LittleEndian>()?;
+
+ sender.send_json(&payload::build_select_protocol(addr, port))?;
+ }
+
+ let key = encryption_key(&mut receiver)?;
+
+ let thread_items = start_threads(receiver, &udp)?;
+
+ info!("[Voice] Connected to: {}", info.endpoint);
+
+ let encoder = OpusEncoder::new(SAMPLE_RATE, Channels::Mono, CodingMode::Audio)?;
+
+ Ok(Connection {
+ audio_timer: Timer::new(1000 * 60 * 4),
+ decoder_map: HashMap::new(),
+ destination: destination,
+ encoder: encoder,
+ encoder_stereo: false,
+ key: key,
+ keepalive_timer: Timer::new(hello.heartbeat_interval),
+ udp: udp,
+ sender: sender,
+ sequence: 0,
+ silence_frames: 0,
+ speaking: false,
+ ssrc: hello.ssrc,
+ thread_items: thread_items,
+ timestamp: 0,
+ user_id: info.user_id,
+ })
+ }
+
+ #[allow(unused_variables)]
+ pub fn cycle(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ receiver: &mut Option<Box<AudioReceiver>>,
+ audio_timer: &mut Timer)
+ -> Result<()> {
+ let mut buffer = [0i16; 960 * 2];
+ let mut packet = [0u8; 512];
+ let mut nonce = secretbox::Nonce([0; 24]);
+
+ if let Some(receiver) = receiver.as_mut() {
+ while let Ok(status) = self.thread_items.rx.try_recv() {
+ match status {
+ ReceiverStatus::Udp(packet) => {
+ let mut handle = &packet[2..];
+ let seq = handle.read_u16::<BigEndian>()?;
+ let timestamp = handle.read_u32::<BigEndian>()?;
+ let ssrc = handle.read_u32::<BigEndian>()?;
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ if let Ok(decrypted) = secretbox::open(&packet[HEADER_LEN..], &nonce, &self.key) {
+ let channels = opus_packet::get_nb_channels(&decrypted)?;
+
+ let entry = self.decoder_map.entry((ssrc, channels))
+ .or_insert_with(|| OpusDecoder::new(SAMPLE_RATE,
+ channels)
+ .unwrap());
+
+ let len = entry.decode(&decrypted, &mut buffer, false)?;
+
+ let is_stereo = channels == Channels::Stereo;
+
+ let b = if is_stereo {
+ len * 2
+ } else {
+ len
+ };
+
+ receiver.voice_packet(ssrc, seq, timestamp, is_stereo, &buffer[..b]);
+ }
+ },
+ ReceiverStatus::Websocket(VoiceEvent::Speaking(ev)) => {
+ receiver.speaking_update(ev.ssrc,
+ ev.user_id.0,
+ ev.speaking);
+ },
+ ReceiverStatus::Websocket(other) => {
+ info!("[Voice] Received other websocket data: {:?}",
+ other);
+ },
+ }
+ }
+ } else {
+ loop {
+ if self.thread_items.rx.try_recv().is_err() {
+ break;
+ }
+ }
+ }
+
+ // Send the voice websocket keepalive if it's time
+ if self.keepalive_timer.check() {
+ self.sender.send_json(&payload::build_keepalive())?;
+ }
+
+ // Send UDP keepalive if it's time
+ if self.audio_timer.check() {
+ let mut bytes = [0; 4];
+ (&mut bytes[..]).write_u32::<BigEndian>(self.ssrc)?;
+ self.udp.send_to(&bytes, self.destination)?;
+ }
+
+ let len = self.read(source, &mut buffer)?;
+
+ if len == 0 {
+ self.set_speaking(false)?;
+
+ if self.silence_frames > 0 {
+ self.silence_frames -= 1;
+
+ for value in &mut buffer[..] {
+ *value = 0;
+ }
+ } else {
+ audio_timer.await();
+
+ return Ok(());
+ }
+ } else {
+ self.silence_frames = 5;
+
+ for value in &mut buffer[len..] {
+ *value = 0;
+ }
+ }
+
+ self.set_speaking(true)?;
+ let index = self.prep_packet(&mut packet, buffer, nonce)?;
+
+ audio_timer.await();
+ self.udp.send_to(&packet[..index], self.destination)?;
+ self.audio_timer.reset();
+
+ Ok(())
+ }
+
+ fn prep_packet(&mut self,
+ packet: &mut [u8; 512],
+ buffer: [i16; 1920],
+ mut nonce: Nonce)
+ -> Result<usize> {
+ {
+ let mut cursor = &mut packet[..HEADER_LEN];
+ cursor.write_all(&[0x80, 0x78])?;
+ cursor.write_u16::<BigEndian>(self.sequence)?;
+ cursor.write_u32::<BigEndian>(self.timestamp)?;
+ cursor.write_u32::<BigEndian>(self.ssrc)?;
+ }
+
+ nonce.0[..HEADER_LEN].clone_from_slice(&packet[..HEADER_LEN]);
+
+ let sl_index = packet.len() - 16;
+ let buffer_len = if self.encoder_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ let len = self.encoder.encode(&buffer[..buffer_len],
+ &mut packet[HEADER_LEN..sl_index])?;
+ let crypted = {
+ let slice = &packet[HEADER_LEN..HEADER_LEN + len];
+ secretbox::seal(slice, &nonce, &self.key)
+ };
+ let index = HEADER_LEN + crypted.len();
+ packet[HEADER_LEN..index].clone_from_slice(&crypted);
+
+ self.sequence = self.sequence.wrapping_add(1);
+ self.timestamp = self.timestamp.wrapping_add(960);
+
+ Ok(HEADER_LEN + crypted.len())
+ }
+
+ fn read(&mut self,
+ source: &mut Option<Box<AudioSource>>,
+ buffer: &mut [i16; 1920])
+ -> Result<usize> {
+ let mut clear = false;
+
+ let len = match source.as_mut() {
+ Some(source) => {
+ let is_stereo = source.is_stereo();
+
+ if is_stereo != self.encoder_stereo {
+ let channels = if is_stereo {
+ Channels::Stereo
+ } else {
+ Channels::Mono
+ };
+ self.encoder = OpusEncoder::new(SAMPLE_RATE,
+ channels,
+ CodingMode::Audio)?;
+ self.encoder_stereo = is_stereo;
+ }
+
+ let buffer_len = if is_stereo {
+ 960 * 2
+ } else {
+ 960
+ };
+
+ match source.read_frame(&mut buffer[..buffer_len]) {
+ Some(len) => len,
+ None => {
+ clear = true;
+
+ 0
+ },
+ }
+ },
+ None => 0,
+ };
+
+ if clear {
+ *source = None;
+ }
+
+ Ok(len)
+ }
+
+ fn set_speaking(&mut self, speaking: bool) -> Result<()> {
+ if self.speaking == speaking {
+ return Ok(());
+ }
+
+ self.speaking = speaking;
+
+ self.sender.send_json(&payload::build_speaking(speaking))
+ }
+}
+
+impl Drop for Connection {
+ fn drop(&mut self) {
+ let _ = self.thread_items.udp_close_sender.send(0);
+ let _ = self.thread_items.ws_close_sender.send(0);
+
+ info!("[Voice] Disconnected");
+ }
+}
+
+fn generate_url(endpoint: &mut String) -> Result<WebsocketUrl> {
+ if endpoint.ends_with(":80") {
+ let len = endpoint.len();
+
+ endpoint.truncate(len - 3);
+ }
+
+ WebsocketUrl::parse(&format!("wss://{}", endpoint))
+ .or(Err(Error::Voice(VoiceError::EndpointUrl)))
+}
+
+#[inline]
+fn encryption_key(receiver: &mut WsReceiver<WebSocketStream>)
+ -> Result<Key> {
+ loop {
+ match receiver.recv_json(VoiceEvent::decode)? {
+ VoiceEvent::Ready(ready) => {
+ if ready.mode != CRYPTO_MODE {
+ return Err(Error::Voice(VoiceError::VoiceModeInvalid));
+ }
+
+ return Key::from_slice(&ready.secret_key)
+ .ok_or(Error::Voice(VoiceError::KeyGen));
+ },
+ VoiceEvent::Unknown(op, value) => {
+ debug!("[Voice] Expected ready for key; got: op{}/v{:?}",
+ op.num(),
+ value);
+ },
+ _ => {},
+ }
+ }
+}
+
+#[inline]
+fn has_valid_mode(modes: &[String]) -> bool {
+ modes.iter().any(|s| s == CRYPTO_MODE)
+}
+
+#[inline]
+fn start_threads(mut receiver: WsReceiver<WebSocketStream>, udp: &UdpSocket)
+ -> Result<ThreadItems> {
+ let (udp_close_sender, udp_close_reader) = mpsc::channel();
+ let (ws_close_sender, ws_close_reader) = mpsc::channel();
+
+ let current_thread = thread::current();
+ let thread_name = current_thread.name().unwrap_or("serenity voice");
+
+ let (tx, rx) = mpsc::channel();
+ let tx_clone = tx.clone();
+ let udp_clone = udp.try_clone()?;
+
+ let udp_thread = ThreadBuilder::new()
+ .name(format!("{} UDP", thread_name))
+ .spawn(move || {
+ let _ = udp_clone.set_read_timeout(Some(Duration::from_millis(250)));
+
+ let mut buffer = [0; 512];
+
+ loop {
+ if let Ok((len, _)) = udp_clone.recv_from(&mut buffer) {
+ let piece = buffer[..len].to_vec();
+ let send = tx.send(ReceiverStatus::Udp(piece));
+
+ if send.is_err() {
+ return;
+ }
+ } else if udp_close_reader.try_recv().is_ok() {
+ return;
+ }
+ }
+ })?;
+
+ let ws_thread = ThreadBuilder::new()
+ .name(format!("{} WS", thread_name))
+ .spawn(move || {
+ loop {
+ while let Ok(msg) = receiver.recv_json(VoiceEvent::decode) {
+ if tx_clone.send(ReceiverStatus::Websocket(msg)).is_ok() {
+ return;
+ }
+ }
+
+ if ws_close_reader.try_recv().is_ok() {
+ return;
+ }
+
+ thread::sleep(Duration::from_millis(25));
+ }
+ })?;
+
+ Ok(ThreadItems {
+ rx: rx,
+ udp_close_sender: udp_close_sender,
+ udp_thread: udp_thread,
+ ws_close_sender: ws_close_sender,
+ ws_thread: ws_thread,
+ })
+}