diff options
| author | Zeyla Hellyer <[email protected]> | 2018-01-18 08:33:27 -0800 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2018-01-18 08:33:27 -0800 |
| commit | 9232b8f065deb4637a74e7f85ab617bb527c51be (patch) | |
| tree | 9c2cc2bd6b6238a1bd039e9ad4900e8705197af2 /src | |
| parent | Fix travis' cache by chmoding $HOME/.cargo (#252) (diff) | |
| download | serenity-9232b8f065deb4637a74e7f85ab617bb527c51be.tar.xz serenity-9232b8f065deb4637a74e7f85ab617bb527c51be.zip | |
Use an InterMessage to communicate over gateway
Instead of communicating over the gateway in a split form of a
`serde_json::Value` or a `client::bridge::gateway::ShardClientMessage`,
wrap them both into a single enum for better interaction between the
client, gateway, and voice modules.
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/bridge/gateway/mod.rs | 5 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 4 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_messenger.rs | 9 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 22 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner_message.rs | 1 | ||||
| -rw-r--r-- | src/client/context.rs | 5 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 10 | ||||
| -rw-r--r-- | src/framework/standard/mod.rs | 2 | ||||
| -rw-r--r-- | src/gateway/mod.rs | 16 | ||||
| -rw-r--r-- | src/gateway/shard.rs | 8 | ||||
| -rw-r--r-- | src/voice/handler.rs | 18 | ||||
| -rw-r--r-- | src/voice/manager.rs | 6 |
12 files changed, 71 insertions, 35 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs index 2907877..06f5ea4 100644 --- a/src/client/bridge/gateway/mod.rs +++ b/src/client/bridge/gateway/mod.rs @@ -66,12 +66,13 @@ pub use self::shard_runner_message::ShardRunnerMessage; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::mpsc::Sender; use std::time::Duration as StdDuration; -use ::gateway::ConnectionStage; +use ::gateway::{ConnectionStage, InterMessage}; /// A message either for a [`ShardManager`] or a [`ShardRunner`]. /// /// [`ShardManager`]: struct.ShardManager.html /// [`ShardRunner`]: struct.ShardRunner.html +#[derive(Clone, Debug)] pub enum ShardClientMessage { /// A message intended to be worked with by a [`ShardManager`]. /// @@ -155,7 +156,7 @@ pub struct ShardRunnerInfo { pub latency: Option<StdDuration>, /// The channel used to communicate with the shard runner, telling it /// what to do with regards to its status. - pub runner_tx: Sender<ShardClientMessage>, + pub runner_tx: Sender<InterMessage>, /// The current connection stage of the shard. pub stage: ConnectionStage, } diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index c3c091c..ea3ad1e 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -1,3 +1,4 @@ +use gateway::InterMessage; use internal::prelude::*; use parking_lot::Mutex; use std::collections::{HashMap, VecDeque}; @@ -264,7 +265,8 @@ impl ShardManager { if let Some(runner) = self.runners.lock().get(&shard_id) { let shutdown = ShardManagerMessage::Shutdown(shard_id); - let msg = ShardClientMessage::Manager(shutdown); + let client_msg = ShardClientMessage::Manager(shutdown); + let msg = InterMessage::Client(client_msg); if let Err(why) = runner.runner_tx.send(msg) { warn!( diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs index a4b63f3..b2a5ce1 100644 --- a/src/client/bridge/gateway/shard_messenger.rs +++ b/src/client/bridge/gateway/shard_messenger.rs @@ -1,3 +1,4 @@ +use gateway::InterMessage; use model::prelude::*; use super::{ShardClientMessage, ShardRunnerMessage}; use std::sync::mpsc::{SendError, Sender}; @@ -14,7 +15,7 @@ use websocket::message::OwnedMessage; /// [`shutdown`]: #method.shutdown #[derive(Clone, Debug)] pub struct ShardMessenger { - tx: Sender<ShardClientMessage>, + tx: Sender<InterMessage>, } impl ShardMessenger { @@ -24,7 +25,7 @@ impl ShardMessenger { /// /// [`Client`]: ../../struct.Client.html #[inline] - pub fn new(tx: Sender<ShardClientMessage>) -> Self { + pub fn new(tx: Sender<InterMessage>) -> Self { Self { tx, } @@ -270,7 +271,7 @@ impl ShardMessenger { #[inline] fn send(&self, msg: ShardRunnerMessage) - -> Result<(), SendError<ShardClientMessage>> { - self.tx.send(ShardClientMessage::Runner(msg)) + -> Result<(), SendError<InterMessage>> { + self.tx.send(InterMessage::Client(ShardClientMessage::Runner(msg))) } } diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 8781149..ba9e00e 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -1,6 +1,6 @@ -use gateway::{ReconnectType, Shard, ShardAction}; +use gateway::{InterMessage, ReconnectType, Shard, ShardAction}; use internal::prelude::*; -use internal::ws_impl::ReceiverExt; +use internal::ws_impl::{ReceiverExt, SenderExt}; use model::event::{Event, GatewayEvent}; use parking_lot::Mutex; use serde::Deserialize; @@ -17,8 +17,6 @@ use websocket::WebSocketError; #[cfg(feature = "framework")] use framework::Framework; -#[cfg(feature = "voice")] -use internal::ws_impl::SenderExt; /// A runner for managing a [`Shard`] and its respective WebSocket client. /// @@ -30,9 +28,9 @@ pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { framework: Arc<Mutex<Option<Box<Framework + Send>>>>, manager_tx: Sender<ShardManagerMessage>, // channel to receive messages from the shard manager and dispatches - runner_rx: Receiver<ShardClientMessage>, + runner_rx: Receiver<InterMessage>, // channel to send messages to the shard runner from the shard manager - runner_tx: Sender<ShardClientMessage>, + runner_tx: Sender<InterMessage>, shard: Shard, threadpool: ThreadPool, } @@ -181,7 +179,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { } /// Clones the internal copy of the Sender to the shard runner. - pub(super) fn runner_tx(&self) -> Sender<ShardClientMessage> { + pub(super) fn runner_tx(&self) -> Sender<InterMessage> { self.runner_tx.clone() } @@ -250,9 +248,9 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { // // This always returns true, except in the case that the shard manager asked // the runner to shutdown. - fn handle_rx_value(&mut self, value: ShardClientMessage) -> bool { + fn handle_rx_value(&mut self, value: InterMessage) -> bool { match value { - ShardClientMessage::Manager(x) => match x { + InterMessage::Client(ShardClientMessage::Manager(x)) => match x { ShardManagerMessage::Restart(id) | ShardManagerMessage::Shutdown(id) => { self.checked_shutdown(id) @@ -273,7 +271,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { true }, }, - ShardClientMessage::Runner(x) => match x { + InterMessage::Client(ShardClientMessage::Runner(x)) => match x { ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { self.shard.chunk_guilds( guild_ids, @@ -320,6 +318,10 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { self.shard.update_presence().is_ok() }, }, + InterMessage::Json(value) => { + // Value must be forwarded over the websocket + self.shard.client.send_json(&value).is_ok() + }, } } diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs index 5c5512d..d6a18f5 100644 --- a/src/client/bridge/gateway/shard_runner_message.rs +++ b/src/client/bridge/gateway/shard_runner_message.rs @@ -4,6 +4,7 @@ use model::id::GuildId; use websocket::message::OwnedMessage; /// A message to send from a shard over a WebSocket. +#[derive(Clone, Debug)] pub enum ShardRunnerMessage { /// Indicates that the client is to send a member chunk message. ChunkGuilds { diff --git a/src/client/context.rs b/src/client/context.rs index 0f66e78..f44225f 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -1,4 +1,5 @@ -use client::bridge::gateway::{ShardClientMessage, ShardMessenger}; +use client::bridge::gateway::ShardMessenger; +use gateway::InterMessage; use model::prelude::*; use parking_lot::Mutex; use std::sync::mpsc::Sender; @@ -48,7 +49,7 @@ impl Context { /// Create a new Context to be passed to an event handler. pub(crate) fn new( data: Arc<Mutex<ShareMap>>, - runner_tx: Sender<ShardClientMessage>, + runner_tx: Sender<InterMessage>, shard_id: u64, ) -> Context { Context { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 86ea693..bdd55d6 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,9 +1,9 @@ +use gateway::InterMessage; use model::event::Event; use model::channel::{Channel, Message}; use std::sync::Arc; use parking_lot::Mutex; use super::bridge::gateway::event::ClientEvent; -use super::bridge::gateway::ShardClientMessage; use super::event_handler::EventHandler; use super::Context; use std::sync::mpsc::Sender; @@ -40,7 +40,7 @@ macro_rules! now { fn context( data: &Arc<Mutex<ShareMap>>, - runner_tx: &Sender<ShardClientMessage>, + runner_tx: &Sender<InterMessage>, shard_id: u64, ) -> Context { Context::new(Arc::clone(data), runner_tx.clone(), shard_id) @@ -58,7 +58,7 @@ pub(crate) fn dispatch<H: EventHandler + Send + Sync + 'static>( framework: &Arc<Mutex<Option<Box<Framework + Send>>>>, data: &Arc<Mutex<ShareMap>>, event_handler: &Arc<H>, - runner_tx: &Sender<ShardClientMessage>, + runner_tx: &Sender<InterMessage>, threadpool: &ThreadPool, shard_id: u64, ) { @@ -92,7 +92,7 @@ pub(crate) fn dispatch<H: EventHandler + Send + Sync + 'static>( event: DispatchEvent, data: &Arc<Mutex<ShareMap>>, event_handler: &Arc<H>, - runner_tx: &Sender<ShardClientMessage>, + runner_tx: &Sender<InterMessage>, threadpool: &ThreadPool, shard_id: u64, ) { @@ -136,7 +136,7 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>( event: DispatchEvent, data: &Arc<Mutex<ShareMap>>, event_handler: &Arc<H>, - runner_tx: &Sender<ShardClientMessage>, + runner_tx: &Sender<InterMessage>, threadpool: &ThreadPool, shard_id: u64, ) { diff --git a/src/framework/standard/mod.rs b/src/framework/standard/mod.rs index 4e43cf8..6a03344 100644 --- a/src/framework/standard/mod.rs +++ b/src/framework/standard/mod.rs @@ -810,7 +810,7 @@ impl StandardFramework { /// /// client.with_framework(StandardFramework::new() /// .before(|ctx, msg, cmd_name| { - /// println!("Running command {}", cmd_name); +/// println!("Running command {}", cmd_name); /// true /// })); /// ``` diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 8445b29..c4748a0 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -59,10 +59,14 @@ pub use self::ws_client_ext::WebSocketGatewayClientExt; use model::gateway::Game; use model::user::OnlineStatus; +use serde_json::Value; use std::fmt::{Display, Formatter, Result as FmtResult}; use websocket::sync::client::Client; use websocket::sync::stream::{TcpStream, TlsStream}; +#[cfg(feature = "client")] +use client::bridge::gateway::ShardClientMessage; + pub type CurrentPresence = (Option<Game>, OnlineStatus); pub type WsClient = Client<TlsStream<TcpStream>>; @@ -163,6 +167,18 @@ impl Display for ConnectionStage { } } +/// A message to be passed around within the library. +/// +/// As a user you usually don't need to worry about this, but when working with +/// the lower-level internals of the `client`, `gateway, and `voice` modules it +/// may be necessary. +#[derive(Clone, Debug)] +pub enum InterMessage { + #[cfg(feature = "client")] + Client(ShardClientMessage), + Json(Value), +} + pub enum ShardAction { Heartbeat, Identify, diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index a890029..af7b76b 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -26,6 +26,8 @@ use serde_json::Value; #[cfg(feature = "voice")] use std::sync::mpsc::{self, Receiver as MpscReceiver}; #[cfg(feature = "voice")] +use super::InterMessage; +#[cfg(feature = "voice")] use voice::Manager as VoiceManager; #[cfg(feature = "voice")] use http; @@ -94,7 +96,8 @@ pub struct Shard { /// update the voice connections' states. #[cfg(feature = "voice")] pub manager: VoiceManager, - #[cfg(feature = "voice")] manager_rx: MpscReceiver<Value>, + #[cfg(feature = "voice")] + manager_rx: MpscReceiver<InterMessage>, } impl Shard { @@ -684,9 +687,10 @@ impl Shard { pub(crate) fn cycle_voice_recv(&mut self) -> Vec<Value> { let mut messages = vec![]; - while let Ok(v) = self.manager_rx.try_recv() { + while let Ok(InterMessage::Json(v)) = self.manager_rx.try_recv() { messages.push(v); } + self.shutdown = true; debug!("[Shard {:?}] Cleanly shutdown shard", self.shard_info); diff --git a/src/voice/handler.rs b/src/voice/handler.rs index 94f7c07..d4085d5 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -1,7 +1,7 @@ use constants::VoiceOpCode; +use gateway::InterMessage; use model::id::{ChannelId, GuildId, UserId}; use model::voice::VoiceState; -use serde_json::Value; use std::sync::mpsc::{self, Sender as MpscSender}; use super::connection_info::ConnectionInfo; use super::{AudioReceiver, AudioSource, Status as VoiceStatus, threading}; @@ -98,13 +98,17 @@ pub struct Handler { /// /// When set via [`standalone`][`Handler::standalone`], it will not be /// present. - ws: Option<MpscSender<Value>>, + ws: Option<MpscSender<InterMessage>>, } impl Handler { /// Creates a new Handler. #[inline] - pub(crate) fn new(guild_id: GuildId, ws: MpscSender<Value>, user_id: UserId) -> Self { + pub(crate) fn new( + guild_id: GuildId, + ws: MpscSender<InterMessage>, + user_id: UserId, + ) -> Self { Self::new_raw(guild_id, Some(ws), user_id) } @@ -346,7 +350,11 @@ impl Handler { } } - fn new_raw(guild_id: GuildId, ws: Option<MpscSender<Value>>, user_id: UserId) -> Self { + fn new_raw( + guild_id: GuildId, + ws: Option<MpscSender<InterMessage>>, + user_id: UserId, + ) -> Self { let (tx, rx) = mpsc::channel(); threading::start(guild_id, rx); @@ -407,7 +415,7 @@ impl Handler { } }); - let _ = ws.send(map); + let _ = ws.send(InterMessage::Json(map)); } } } diff --git a/src/voice/manager.rs b/src/voice/manager.rs index 4b8e43e..a6bcd2f 100644 --- a/src/voice/manager.rs +++ b/src/voice/manager.rs @@ -1,5 +1,5 @@ +use gateway::InterMessage; use model::id::{ChannelId, GuildId, UserId}; -use serde_json::Value; use std::collections::HashMap; use std::sync::mpsc::Sender as MpscSender; use super::Handler; @@ -24,11 +24,11 @@ use super::Handler; pub struct Manager { handlers: HashMap<GuildId, Handler>, user_id: UserId, - ws: MpscSender<Value>, + ws: MpscSender<InterMessage>, } impl Manager { - pub(crate) fn new(ws: MpscSender<Value>, user_id: UserId) -> Manager { + pub(crate) fn new(ws: MpscSender<InterMessage>, user_id: UserId) -> Manager { Manager { handlers: HashMap::new(), user_id: user_id, |