diff options
| author | Zeyla Hellyer <[email protected]> | 2018-05-28 16:34:38 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2018-05-28 16:34:38 -0700 |
| commit | 6b5f3b98084b86b00e3f7e78b5eb9512e75e78a0 (patch) | |
| tree | 4011d56b63d88999eb8169e332c54f3eafe972ae /src/client | |
| parent | Make Message Builder use &mut self instead of self (diff) | |
| parent | Futures shard manager #298 (WIP) (#300) (diff) | |
| download | serenity-6b5f3b98084b86b00e3f7e78b5eb9512e75e78a0.tar.xz serenity-6b5f3b98084b86b00e3f7e78b5eb9512e75e78a0.zip | |
Merge branch 'futures' into v0.6.x
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/event.rs | 29 | ||||
| -rw-r--r-- | src/client/bridge/gateway/mod.rs | 168 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 364 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager_monitor.rs | 66 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_messenger.rs | 277 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_queuer.rs | 207 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 496 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner_message.rs | 48 | ||||
| -rw-r--r-- | src/client/bridge/mod.rs | 13 | ||||
| -rw-r--r-- | src/client/bridge/voice/mod.rs | 117 | ||||
| -rw-r--r-- | src/client/context.rs | 436 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 647 | ||||
| -rw-r--r-- | src/client/event_handler.rs | 290 | ||||
| -rw-r--r-- | src/client/mod.rs | 1032 | ||||
| -rw-r--r-- | src/client/shard_manager.rs | 62 |
15 files changed, 197 insertions, 4055 deletions
diff --git a/src/client/bridge/gateway/event.rs b/src/client/bridge/gateway/event.rs deleted file mode 100644 index a967ab5..0000000 --- a/src/client/bridge/gateway/event.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! A collection of events created by the client, not a part of the Discord API -//! itself. - -use super::ShardId; -use ::gateway::ConnectionStage; - -#[derive(Clone, Debug)] -pub(crate) enum ClientEvent { - ShardStageUpdate(ShardStageUpdateEvent), -} - -/// An event denoting that a shard's connection stage was changed. -/// -/// # Examples -/// -/// This might happen when a shard changes from [`ConnectionStage::Identifying`] -/// to [`ConnectionStage::Connected`]. -/// -/// [`ConnectionStage::Connected`]: ../../../../gateway/enum.ConnectionStage.html#variant.Connected -/// [`ConnectionStage::Identifying`]: ../../../../gateway/enum.ConnectionStage.html#variant.Identifying -#[derive(Clone, Debug)] -pub struct ShardStageUpdateEvent { - /// The new connection stage. - pub new: ConnectionStage, - /// The old connection stage. - pub old: ConnectionStage, - /// The ID of the shard that had its connection stage change. - pub shard_id: ShardId, -} diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs deleted file mode 100644 index 16f0f07..0000000 --- a/src/client/bridge/gateway/mod.rs +++ /dev/null @@ -1,168 +0,0 @@ -//! The client gateway bridge is support essential for the [`client`] module. -//! -//! This is made available for user use if one wishes to be lower-level or avoid -//! the higher functionality of the [`Client`]. -//! -//! Of interest are three pieces: -//! -//! ### [`ShardManager`] -//! -//! The shard manager is responsible for being a clean interface between the -//! user and the shard runners, providing essential functions such as -//! [`ShardManager::shutdown`] to shutdown a shard and [`ShardManager::restart`] -//! to restart a shard. -//! -//! If you are using the `Client`, this is likely the only piece of interest to -//! you. Refer to [its documentation][`ShardManager`] for more information. -//! -//! ### [`ShardQueuer`] -//! -//! The shard queuer is a light wrapper around an mpsc receiver that receives -//! [`ShardManagerMessage`]s. It should be run in its own thread so it can -//! receive messages to start shards in a queue. -//! -//! Refer to [its documentation][`ShardQueuer`] for more information. -//! -//! ### [`ShardRunner`] -//! -//! The shard runner is responsible for actually running a shard and -//! communicating with its respective WebSocket client. -//! -//! It is performs all actions such as sending a presence update over the client -//! and, with the help of the [`Shard`], will be able to determine what to do. -//! This is, for example, whether to reconnect, resume, or identify with the -//! gateway. -//! -//! ### In Conclusion -//! -//! For almost every - if not every - use case, you only need to _possibly_ be -//! concerned about the [`ShardManager`] in this module. -//! -//! [`Client`]: ../../struct.Client.html -//! [`client`]: ../.. -//! [`Shard`]: ../../../gateway/struct.Shard.html -//! [`ShardManager`]: struct.ShardManager.html -//! [`ShardManager::restart`]: struct.ShardManager.html#method.restart -//! [`ShardManager::shutdown`]: struct.ShardManager.html#method.shutdown -//! [`ShardQueuer`]: struct.ShardQueuer.html -//! [`ShardRunner`]: struct.ShardRunner.html - -pub mod event; - -mod shard_manager; -mod shard_manager_monitor; -mod shard_messenger; -mod shard_queuer; -mod shard_runner; -mod shard_runner_message; - -pub use self::shard_manager::{ShardManager, ShardManagerOptions}; -pub use self::shard_manager_monitor::ShardManagerMonitor; -pub use self::shard_messenger::ShardMessenger; -pub use self::shard_queuer::ShardQueuer; -pub use self::shard_runner::{ShardRunner, ShardRunnerOptions}; -pub use self::shard_runner_message::ShardRunnerMessage; - -use std::{ - fmt::{ - Display, - Formatter, - Result as FmtResult - }, - sync::mpsc::Sender, - time::Duration as StdDuration -}; -use ::gateway::{ConnectionStage, InterMessage}; - -/// A message either for a [`ShardManager`] or a [`ShardRunner`]. -/// -/// [`ShardManager`]: struct.ShardManager.html -/// [`ShardRunner`]: struct.ShardRunner.html -#[derive(Clone, Debug)] -pub enum ShardClientMessage { - /// A message intended to be worked with by a [`ShardManager`]. - /// - /// [`ShardManager`]: struct.ShardManager.html - Manager(ShardManagerMessage), - /// A message intended to be worked with by a [`ShardRunner`]. - /// - /// [`ShardRunner`]: struct.ShardRunner.html - Runner(ShardRunnerMessage), -} - -/// A message for a [`ShardManager`] relating to an operation with a shard. -/// -/// [`ShardManager`]: struct.ShardManager.html -#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub enum ShardManagerMessage { - /// Indicator that a [`ShardManagerMonitor`] should restart a shard. - /// - /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html - Restart(ShardId), - /// An update from a shard runner, - ShardUpdate { - id: ShardId, - latency: Option<StdDuration>, - stage: ConnectionStage, - }, - /// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard - /// without bringing it back up. - /// - /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html - Shutdown(ShardId), - /// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards - /// and end its monitoring process for the [`ShardManager`]. - /// - /// [`ShardManager`]: struct.ShardManager.html - /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html - ShutdownAll, - /// Indicator that a [`ShardManager`] has initiated a shutdown, and for the - /// component that receives this to also shutdown with no further action - /// taken. - ShutdownInitiated, -} - -/// A message to be sent to the [`ShardQueuer`]. -/// -/// This should usually be wrapped in a [`ShardClientMessage`]. -/// -/// [`ShardClientMessage`]: enum.ShardClientMessage.html -/// [`ShardQueuer`]: enum.ShardQueuer.html -#[derive(Clone, Debug)] -pub enum ShardQueuerMessage { - /// Message to start a shard, where the 0-index element is the ID of the - /// Shard to start and the 1-index element is the total shards in use. - Start(ShardId, ShardId), - /// Message to shutdown the shard queuer. - Shutdown, -} - -/// A light tuplestruct wrapper around a u64 to verify type correctness when -/// working with the IDs of shards. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub struct ShardId(pub u64); - -impl Display for ShardId { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - write!(f, "{}", self.0) - } -} - -/// Information about a [`ShardRunner`]. -/// -/// The [`ShardId`] is not included because, as it stands, you probably already -/// know the Id if you obtained this. -/// -/// [`ShardId`]: struct.ShardId.html -/// [`ShardRunner`]: struct.ShardRunner.html -#[derive(Debug)] -pub struct ShardRunnerInfo { - /// The latency between when a heartbeat was sent and when the - /// acknowledgement was received. - pub latency: Option<StdDuration>, - /// The channel used to communicate with the shard runner, telling it - /// what to do with regards to its status. - pub runner_tx: Sender<InterMessage>, - /// The current connection stage of the shard. - pub stage: ConnectionStage, -} diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs deleted file mode 100644 index a6cea31..0000000 --- a/src/client/bridge/gateway/shard_manager.rs +++ /dev/null @@ -1,364 +0,0 @@ -use gateway::InterMessage; -use internal::prelude::*; -use parking_lot::Mutex; -use std::{ - collections::{HashMap, VecDeque}, - sync::{ - mpsc::{self, Sender}, - Arc - }, - thread -}; -use super::super::super::EventHandler; -use super::{ - ShardClientMessage, - ShardId, - ShardManagerMessage, - ShardManagerMonitor, - ShardQueuer, - ShardQueuerMessage, - ShardRunnerInfo, -}; -use threadpool::ThreadPool; -use typemap::ShareMap; - -#[cfg(feature = "framework")] -use framework::Framework; -#[cfg(feature = "voice")] -use client::bridge::voice::ClientVoiceManager; - -/// A manager for handling the status of shards by starting them, restarting -/// them, and stopping them when required. -/// -/// **Note**: The [`Client`] internally uses a shard manager. If you are using a -/// Client, then you do not need to make one of these. -/// -/// # Examples -/// -/// Initialize a shard manager with a framework responsible for shards 0 through -/// 2, of 5 total shards: -/// -/// ```rust,no_run -/// extern crate parking_lot; -/// extern crate serenity; -/// extern crate threadpool; -/// extern crate typemap; -/// -/// # use std::error::Error; -/// # -/// # #[cfg(feature = "voice")] -/// # use serenity::client::bridge::voice::ClientVoiceManager; -/// # #[cfg(feature = "voice")] -/// # use serenity::model::id::UserId; -/// # -/// # #[cfg(feature = "framework")] -/// # fn try_main() -> Result<(), Box<Error>> { -/// # -/// use parking_lot::Mutex; -/// use serenity::client::bridge::gateway::{ShardManager, ShardManagerOptions}; -/// use serenity::client::EventHandler; -/// use serenity::http; -/// use std::sync::Arc; -/// use std::env; -/// use threadpool::ThreadPool; -/// use typemap::ShareMap; -/// -/// struct Handler; -/// -/// impl EventHandler for Handler { } -/// -/// let token = env::var("DISCORD_TOKEN")?; -/// http::set_token(&token); -/// let token = Arc::new(Mutex::new(token)); -/// -/// let gateway_url = Arc::new(Mutex::new(http::get_gateway()?.url)); -/// let data = Arc::new(Mutex::new(ShareMap::custom())); -/// let event_handler = Arc::new(Handler); -/// let framework = Arc::new(Mutex::new(None)); -/// let threadpool = ThreadPool::with_name("my threadpool".to_owned(), 5); -/// -/// ShardManager::new(ShardManagerOptions { -/// data: &data, -/// event_handler: &event_handler, -/// framework: &framework, -/// // the shard index to start initiating from -/// shard_index: 0, -/// // the number of shards to initiate (this initiates 0, 1, and 2) -/// shard_init: 3, -/// // the total number of shards in use -/// shard_total: 5, -/// token: &token, -/// threadpool, -/// # #[cfg(feature = "voice")] -/// # voice_manager: &Arc::new(Mutex::new(ClientVoiceManager::new(0, UserId(0)))), -/// ws_url: &gateway_url, -/// }); -/// # Ok(()) -/// # } -/// # -/// # #[cfg(not(feature = "framework"))] -/// # fn try_main() -> Result<(), Box<Error>> { -/// # Ok(()) -/// # } -/// # -/// # fn main() { -/// # try_main().unwrap(); -/// # } -/// ``` -/// -/// [`Client`]: ../../struct.Client.html -#[derive(Debug)] -pub struct ShardManager { - monitor_tx: Sender<ShardManagerMessage>, - /// The shard runners currently managed. - /// - /// **Note**: It is highly unrecommended to mutate this yourself unless you - /// need to. Instead prefer to use methods on this struct that are provided - /// where possible. - pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, - /// The index of the first shard to initialize, 0-indexed. - shard_index: u64, - /// The number of shards to initialize. - shard_init: u64, - /// The total shards in use, 1-indexed. - shard_total: u64, - shard_queuer: Sender<ShardQueuerMessage>, -} - -impl ShardManager { - /// Creates a new shard manager, returning both the manager and a monitor - /// for usage in a separate thread. - pub fn new<'a, H>( - opt: ShardManagerOptions<'a, H>, - ) -> (Arc<Mutex<Self>>, ShardManagerMonitor) where H: EventHandler + Send + Sync + 'static { - let (thread_tx, thread_rx) = mpsc::channel(); - let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); - - let runners = Arc::new(Mutex::new(HashMap::new())); - - let mut shard_queuer = ShardQueuer { - data: Arc::clone(opt.data), - event_handler: Arc::clone(opt.event_handler), - #[cfg(feature = "framework")] - framework: Arc::clone(opt.framework), - last_start: None, - manager_tx: thread_tx.clone(), - queue: VecDeque::new(), - runners: Arc::clone(&runners), - rx: shard_queue_rx, - threadpool: opt.threadpool, - token: Arc::clone(opt.token), - #[cfg(feature = "voice")] - voice_manager: Arc::clone(opt.voice_manager), - ws_url: Arc::clone(opt.ws_url), - }; - - thread::spawn(move || { - shard_queuer.run(); - }); - - let manager = Arc::new(Mutex::new(Self { - monitor_tx: thread_tx, - shard_index: opt.shard_index, - shard_init: opt.shard_init, - shard_queuer: shard_queue_tx, - shard_total: opt.shard_total, - runners, - })); - - (Arc::clone(&manager), ShardManagerMonitor { - rx: thread_rx, - manager, - }) - } - - /// Returns whether the shard manager contains either an active instance of - /// a shard runner responsible for the given ID. - /// - /// If a shard has been queued but has not yet been initiated, then this - /// will return `false`. Consider double-checking [`is_responsible_for`] to - /// determine whether this shard manager is responsible for the given shard. - /// - /// [`is_responsible_for`]: #method.is_responsible_for - pub fn has(&self, shard_id: ShardId) -> bool { - self.runners.lock().contains_key(&shard_id) - } - - /// Initializes all shards that the manager is responsible for. - /// - /// This will communicate shard boots with the [`ShardQueuer`] so that they - /// are properly queued. - /// - /// [`ShardQueuer`]: struct.ShardQueuer.html - pub fn initialize(&mut self) -> Result<()> { - let shard_to = self.shard_index + self.shard_init; - - for shard_id in self.shard_index..shard_to { - let shard_total = self.shard_total; - - self.boot([ShardId(shard_id), ShardId(shard_total)]); - } - - Ok(()) - } - - /// Sets the new sharding information for the manager. - /// - /// This will shutdown all existing shards. - /// - /// This will _not_ instantiate the new shards. - pub fn set_shards(&mut self, index: u64, init: u64, total: u64) { - self.shutdown_all(); - - self.shard_index = index; - self.shard_init = init; - self.shard_total = total; - } - - /// Restarts a shard runner. - /// - /// This sends a shutdown signal to a shard's associated [`ShardRunner`], - /// and then queues a initialization of a shard runner for the same shard - /// via the [`ShardQueuer`]. - /// - /// # Examples - /// - /// Creating a client and then restarting a shard by ID: - /// - /// _(note: in reality this precise code doesn't have an effect since the - /// shard would not yet have been initialized via [`initialize`], but the - /// concept is the same)_ - /// - /// ```rust,no_run - /// use serenity::client::bridge::gateway::ShardId; - /// use serenity::client::{Client, EventHandler}; - /// use std::env; - /// - /// struct Handler; - /// - /// impl EventHandler for Handler { } - /// - /// let token = env::var("DISCORD_TOKEN").unwrap(); - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// // restart shard ID 7 - /// client.shard_manager.lock().restart(ShardId(7)); - /// ``` - /// - /// [`ShardQueuer`]: struct.ShardQueuer.html - /// [`ShardRunner`]: struct.ShardRunner.html - pub fn restart(&mut self, shard_id: ShardId) { - info!("Restarting shard {}", shard_id); - self.shutdown(shard_id); - - let shard_total = self.shard_total; - - self.boot([shard_id, ShardId(shard_total)]); - } - - /// Returns the [`ShardId`]s of the shards that have been instantiated and - /// currently have a valid [`ShardRunner`]. - /// - /// [`ShardId`]: struct.ShardId.html - /// [`ShardRunner`]: struct.ShardRunner.html - pub fn shards_instantiated(&self) -> Vec<ShardId> { - self.runners.lock().keys().cloned().collect() - } - - /// Attempts to shut down the shard runner by Id. - /// - /// Returns a boolean indicating whether a shard runner was present. This is - /// _not_ necessary an indicator of whether the shard runner was - /// successfully shut down. - /// - /// **Note**: If the receiving end of an mpsc channel - theoretically owned - /// by the shard runner - no longer exists, then the shard runner will not - /// know it should shut down. This _should never happen_. It may already be - /// stopped. - pub fn shutdown(&mut self, shard_id: ShardId) -> bool { - info!("Shutting down shard {}", shard_id); - - if let Some(runner) = self.runners.lock().get(&shard_id) { - let shutdown = ShardManagerMessage::Shutdown(shard_id); - let client_msg = ShardClientMessage::Manager(shutdown); - let msg = InterMessage::Client(client_msg); - - if let Err(why) = runner.runner_tx.send(msg) { - warn!( - "Failed to cleanly shutdown shard {}: {:?}", - shard_id, - why, - ); - } - } - - self.runners.lock().remove(&shard_id).is_some() - } - - /// Sends a shutdown message for all shards that the manager is responsible - /// for that are still known to be running. - /// - /// If you only need to shutdown a select number of shards, prefer looping - /// over the [`shutdown`] method. - /// - /// [`shutdown`]: #method.shutdown - pub fn shutdown_all(&mut self) { - let keys = { - let runners = self.runners.lock(); - - if runners.is_empty() { - return; - } - - runners.keys().cloned().collect::<Vec<_>>() - }; - - info!("Shutting down all shards"); - - for shard_id in keys { - self.shutdown(shard_id); - } - - let _ = self.shard_queuer.send(ShardQueuerMessage::Shutdown); - let _ = self.monitor_tx.send(ShardManagerMessage::ShutdownInitiated); - } - - fn boot(&mut self, shard_info: [ShardId; 2]) { - info!("Telling shard queuer to start shard {}", shard_info[0]); - - let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); - let _ = self.shard_queuer.send(msg); - } -} - -impl Drop for ShardManager { - /// A custom drop implementation to clean up after the manager. - /// - /// This shuts down all active [`ShardRunner`]s and attempts to tell the - /// [`ShardQueuer`] to shutdown. - /// - /// [`ShardQueuer`]: struct.ShardQueuer.html - /// [`ShardRunner`]: struct.ShardRunner.html - fn drop(&mut self) { - self.shutdown_all(); - - if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) { - warn!("Failed to send shutdown to shard queuer: {:?}", why); - } - } -} - -pub struct ShardManagerOptions<'a, H: EventHandler + Send + Sync + 'static> { - pub data: &'a Arc<Mutex<ShareMap>>, - pub event_handler: &'a Arc<H>, - #[cfg(feature = "framework")] - pub framework: &'a Arc<Mutex<Option<Box<Framework + Send>>>>, - pub shard_index: u64, - pub shard_init: u64, - pub shard_total: u64, - pub threadpool: ThreadPool, - pub token: &'a Arc<Mutex<String>>, - #[cfg(feature = "voice")] - pub voice_manager: &'a Arc<Mutex<ClientVoiceManager>>, - pub ws_url: &'a Arc<Mutex<String>>, -} diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs deleted file mode 100644 index 16cc001..0000000 --- a/src/client/bridge/gateway/shard_manager_monitor.rs +++ /dev/null @@ -1,66 +0,0 @@ -use parking_lot::Mutex; -use std::sync::{ - mpsc::Receiver, - Arc -}; -use super::{ShardManager, ShardManagerMessage}; - -/// The shard manager monitor does what it says on the tin -- it monitors the -/// shard manager and performs actions on it as received. -/// -/// The monitor is essentially responsible for running in its own thread and -/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or -/// shutdown everything entirely. -/// -/// [`ShardManagerMessage`]: struct.ShardManagerMessage.html -#[derive(Debug)] -pub struct ShardManagerMonitor { - /// An clone of the Arc to the manager itself. - pub manager: Arc<Mutex<ShardManager>>, - /// The mpsc Receiver channel to receive shard manager messages over. - pub rx: Receiver<ShardManagerMessage>, -} - -impl ShardManagerMonitor { - /// "Runs" the monitor, waiting for messages over the Receiver. - /// - /// This should be called in its own thread due to its blocking, looped - /// nature. - /// - /// This will continue running until either: - /// - /// - a [`ShardManagerMessage::ShutdownAll`] has been received - /// - an error is returned while receiving a message from the - /// channel (probably indicating that the shard manager should stop anyway) - /// - /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll - pub fn run(&mut self) { - debug!("Starting shard manager worker"); - - while let Ok(value) = self.rx.recv() { - match value { - ShardManagerMessage::Restart(shard_id) => { - self.manager.lock().restart(shard_id); - }, - ShardManagerMessage::ShardUpdate { id, latency, stage } => { - let manager = self.manager.lock(); - let mut runners = manager.runners.lock(); - - runners.get_mut(&id).map(|runner| { - runner.latency = latency; - runner.stage = stage; - }); - } - ShardManagerMessage::Shutdown(shard_id) => { - self.manager.lock().shutdown(shard_id); - }, - ShardManagerMessage::ShutdownAll => { - self.manager.lock().shutdown_all(); - - break; - }, - ShardManagerMessage::ShutdownInitiated => break, - } - } - } -} diff --git a/src/client/bridge/gateway/shard_messenger.rs b/src/client/bridge/gateway/shard_messenger.rs deleted file mode 100644 index efec9fc..0000000 --- a/src/client/bridge/gateway/shard_messenger.rs +++ /dev/null @@ -1,277 +0,0 @@ -use gateway::InterMessage; -use model::prelude::*; -use super::{ShardClientMessage, ShardRunnerMessage}; -use std::sync::mpsc::{SendError, Sender}; -use websocket::message::OwnedMessage; - -/// A lightweight wrapper around an mpsc sender. -/// -/// This is used to cleanly communicate with a shard's respective -/// [`ShardRunner`]. This can be used for actions such as setting the activity -/// via [`set_activity`] or shutting down via [`shutdown`]. -/// -/// [`ShardRunner`]: struct.ShardRunner.html -/// [`set_activity`]: #method.set_activity -/// [`shutdown`]: #method.shutdown -#[derive(Clone, Debug)] -pub struct ShardMessenger { - tx: Sender<InterMessage>, -} - -impl ShardMessenger { - /// Creates a new shard messenger. - /// - /// If you are using the [`Client`], you do not need to do this. - /// - /// [`Client`]: ../../struct.Client.html - #[inline] - pub fn new(tx: Sender<InterMessage>) -> Self { - Self { - tx, - } - } - - /// Requests that one or multiple [`Guild`]s be chunked. - /// - /// This will ask the gateway to start sending member chunks for large - /// guilds (250 members+). If a guild is over 250 members, then a full - /// member list will not be downloaded, and must instead be requested to be - /// sent in "chunks" containing members. - /// - /// Member chunks are sent as the [`Event::GuildMembersChunk`] event. Each - /// chunk only contains a partial amount of the total members. - /// - /// If the `cache` feature is enabled, the cache will automatically be - /// updated with member chunks. - /// - /// # Examples - /// - /// Chunk a single guild by Id, limiting to 2000 [`Member`]s, and not - /// specifying a query parameter: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; - /// # - /// use serenity::model::id::GuildId; - /// - /// let guild_ids = vec![GuildId(81384788765712384)]; - /// - /// shard.chunk_guilds(guild_ids, Some(2000), None); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// Chunk a single guild by Id, limiting to 20 members, and specifying a - /// query parameter of `"do"`: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1])?; - /// # - /// use serenity::model::id::GuildId; - /// - /// let guild_ids = vec![GuildId(81384788765712384)]; - /// - /// shard.chunk_guilds(guild_ids, Some(20), Some("do")); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`Event::GuildMembersChunk`]: - /// ../../model/event/enum.Event.html#variant.GuildMembersChunk - /// [`Guild`]: ../../model/guild/struct.Guild.html - /// [`Member`]: ../../model/guild/struct.Member.html - pub fn chunk_guilds<It>( - &self, - guild_ids: It, - limit: Option<u16>, - query: Option<String>, - ) where It: IntoIterator<Item=GuildId> { - let guilds = guild_ids.into_iter().collect::<Vec<GuildId>>(); - - let _ = self.send(ShardRunnerMessage::ChunkGuilds { - guild_ids: guilds, - limit, - query, - }); - } - - /// Sets the user's current activity, if any. - /// - /// Other presence settings are maintained. - /// - /// # Examples - /// - /// Setting the current activity to playing `"Heroes of the Storm"`: - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::gateway::Activity; - /// - /// shard.set_activity(Some(Activity::playing("Heroes of the Storm"))); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn set_activity(&self, activity: Option<Activity>) { - let _ = self.send(ShardRunnerMessage::SetActivity(activity)); - } - - /// Sets the user's full presence information. - /// - /// Consider using the individual setters if you only need to modify one of - /// these. - /// - /// # Examples - /// - /// Set the current user as playing `"Heroes of the Storm"` and being - /// online: - /// - /// ```rust,ignore - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::{Activity, OnlineStatus}; - /// - /// shard.set_presence(Some(Activity::playing("Heroes of the Storm")), OnlineStatus::Online); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn set_presence(&self, activity: Option<Activity>, mut status: OnlineStatus) { - if status == OnlineStatus::Offline { - status = OnlineStatus::Invisible; - } - - let _ = self.send(ShardRunnerMessage::SetPresence(status, activity)); - } - - /// Sets the user's current online status. - /// - /// Note that [`Offline`] is not a valid online status, so it is - /// automatically converted to [`Invisible`]. - /// - /// Other presence settings are maintained. - /// - /// # Examples - /// - /// Setting the current online status for the shard to [`DoNotDisturb`]. - /// - /// ```rust,no_run - /// # extern crate parking_lot; - /// # extern crate serenity; - /// # - /// # use parking_lot::Mutex; - /// # use serenity::client::gateway::Shard; - /// # use std::error::Error; - /// # use std::sync::Arc; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// # let mutex = Arc::new(Mutex::new("".to_string())); - /// # - /// # let mut shard = Shard::new(mutex.clone(), mutex, [0, 1]).unwrap(); - /// # - /// use serenity::model::user::OnlineStatus; - /// - /// shard.set_status(OnlineStatus::DoNotDisturb); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`DoNotDisturb`]: ../../model/user/enum.OnlineStatus.html#variant.DoNotDisturb - /// [`Invisible`]: ../../model/user/enum.OnlineStatus.html#variant.Invisible - /// [`Offline`]: ../../model/user/enum.OnlineStatus.html#variant.Offline - pub fn set_status(&self, mut online_status: OnlineStatus) { - if online_status == OnlineStatus::Offline { - online_status = OnlineStatus::Invisible; - } - - let _ = self.send(ShardRunnerMessage::SetStatus(online_status)); - } - - /// Shuts down the websocket by attempting to cleanly close the - /// connection. - pub fn shutdown_clean(&self) { - let _ = self.send(ShardRunnerMessage::Close(1000, None)); - } - - /// Sends a raw message over the WebSocket. - /// - /// The given message is not mutated in any way, and is sent as-is. - /// - /// You should only use this if you know what you're doing. If you're - /// wanting to, for example, send a presence update, prefer the usage of - /// the [`set_presence`] method. - /// - /// [`set_presence`]: #method.set_presence - pub fn websocket_message(&self, message: OwnedMessage) { - let _ = self.send(ShardRunnerMessage::Message(message)); - } - - #[inline] - fn send(&self, msg: ShardRunnerMessage) - -> Result<(), SendError<InterMessage>> { - self.tx.send(InterMessage::Client(ShardClientMessage::Runner(msg))) - } -} diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs deleted file mode 100644 index edcb6ab..0000000 --- a/src/client/bridge/gateway/shard_queuer.rs +++ /dev/null @@ -1,207 +0,0 @@ -use gateway::Shard; -use internal::prelude::*; -use parking_lot::Mutex; -use std::{ - collections::{HashMap, VecDeque}, - sync::{ - mpsc::{ - Receiver, - RecvTimeoutError, - Sender}, - Arc - }, - thread, - time::{Duration, Instant} -}; -use super::super::super::EventHandler; -use super::{ - ShardId, - ShardManagerMessage, - ShardQueuerMessage, - ShardRunner, - ShardRunnerInfo, - ShardRunnerOptions, -}; -use threadpool::ThreadPool; -use typemap::ShareMap; -use ::gateway::ConnectionStage; - -#[cfg(feature = "voice")] -use client::bridge::voice::ClientVoiceManager; -#[cfg(feature = "framework")] -use framework::Framework; - -const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5; - -/// The shard queuer is a simple loop that runs indefinitely to manage the -/// startup of shards. -/// -/// A shard queuer instance _should_ be run in its own thread, due to the -/// blocking nature of the loop itself as well as a 5 second thread sleep -/// between shard starts. -pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> { - /// A copy of [`Client::data`] to be given to runners for contextual - /// dispatching. - /// - /// [`Client::data`]: ../../struct.Client.html#structfield.data - pub data: Arc<Mutex<ShareMap>>, - /// A reference to an `EventHandler`, such as the one given to the - /// [`Client`]. - /// - /// [`Client`]: ../../struct.Client.html - pub event_handler: Arc<H>, - /// A copy of the framework - #[cfg(feature = "framework")] - pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - /// The instant that a shard was last started. - /// - /// This is used to determine how long to wait between shard IDENTIFYs. - pub last_start: Option<Instant>, - /// A copy of the sender channel to communicate with the - /// [`ShardManagerMonitor`]. - /// - /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html - pub manager_tx: Sender<ShardManagerMessage>, - /// The shards that are queued for booting. - /// - /// This will typically be filled with previously failed boots. - pub queue: VecDeque<(u64, u64)>, - /// A copy of the map of shard runners. - pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>, - /// A receiver channel for the shard queuer to be told to start shards. - pub rx: Receiver<ShardQueuerMessage>, - /// A copy of a threadpool to give shard runners. - /// - /// For example, when using the [`Client`], this will be a copy of - /// [`Client::threadpool`]. - /// - /// [`Client`]: ../../struct.Client.html - /// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool - pub threadpool: ThreadPool, - /// A copy of the token to connect with. - pub token: Arc<Mutex<String>>, - /// A copy of the client's voice manager. - #[cfg(feature = "voice")] - pub voice_manager: Arc<Mutex<ClientVoiceManager>>, - /// A copy of the URI to use to connect to the gateway. - pub ws_url: Arc<Mutex<String>>, -} - -impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { - /// Begins the shard queuer loop. - /// - /// This will loop over the internal [`rx`] for [`ShardQueuerMessage`]s, - /// blocking for messages on what to do. - /// - /// If a [`ShardQueuerMessage::Start`] is received, this will: - /// - /// 1. Check how much time has passed since the last shard was started - /// 2. If the amount of time is less than the ratelimit, it will sleep until - /// that time has passed - /// 3. Start the shard by ID - /// - /// If a [`ShardQueuerMessage::Shutdown`] is received, this will return and - /// the loop will be over. - /// - /// **Note**: This should be run in its own thread due to the blocking - /// nature of the loop. - /// - /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html - /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown - /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start - /// [`rx`]: #structfield.rx - pub fn run(&mut self) { - // The duration to timeout from reads over the Rx channel. This can be - // done in a loop, and if the read times out then a shard can be - // started if one is presently waiting in the queue. - let wait_duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); - - loop { - match self.rx.recv_timeout(wait_duration) { - Ok(ShardQueuerMessage::Shutdown) => break, - Ok(ShardQueuerMessage::Start(id, total)) => { - self.checked_start(id.0, total.0); - }, - Err(RecvTimeoutError::Disconnected) => { - // If the sender half has disconnected then the queuer's - // lifespan has passed and can shutdown. - break; - }, - Err(RecvTimeoutError::Timeout) => { - if let Some((id, total)) = self.queue.pop_front() { - self.checked_start(id, total); - } - } - } - } - } - - fn check_last_start(&mut self) { - let instant = match self.last_start { - Some(instant) => instant, - None => return, - }; - - // We must wait 5 seconds between IDENTIFYs to avoid session - // invalidations. - let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); - let elapsed = instant.elapsed(); - - if elapsed >= duration { - return; - } - - let to_sleep = duration - elapsed; - - thread::sleep(to_sleep); - } - - fn checked_start(&mut self, id: u64, total: u64) { - self.check_last_start(); - - if let Err(why) = self.start(id, total) { - warn!("Err starting shard {}: {:?}", id, why); - info!("Re-queueing start of shard {}", id); - - self.queue.push_back((id, total)); - } - - self.last_start = Some(Instant::now()); - } - - fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> { - let shard_info = [shard_id, shard_total]; - - let shard = Shard::new( - Arc::clone(&self.ws_url), - Arc::clone(&self.token), - shard_info, - )?; - - let mut runner = ShardRunner::new(ShardRunnerOptions { - data: Arc::clone(&self.data), - event_handler: Arc::clone(&self.event_handler), - #[cfg(feature = "framework")] - framework: Arc::clone(&self.framework), - manager_tx: self.manager_tx.clone(), - threadpool: self.threadpool.clone(), - #[cfg(feature = "voice")] - voice_manager: Arc::clone(&self.voice_manager), - shard, - }); - - let runner_info = ShardRunnerInfo { - latency: None, - runner_tx: runner.runner_tx(), - stage: ConnectionStage::Disconnected, - }; - - thread::spawn(move || { - let _ = runner.run(); - }); - - self.runners.lock().insert(ShardId(shard_id), runner_info); - - Ok(()) - } -} diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs deleted file mode 100644 index 0f244bd..0000000 --- a/src/client/bridge/gateway/shard_runner.rs +++ /dev/null @@ -1,496 +0,0 @@ -use gateway::{InterMessage, ReconnectType, Shard, ShardAction}; -use internal::prelude::*; -use internal::ws_impl::{ReceiverExt, SenderExt}; -use model::event::{Event, GatewayEvent}; -use parking_lot::Mutex; -use serde::Deserialize; -use std::sync::{ - mpsc::{ - self, - Receiver, - Sender, - TryRecvError - }, - Arc -}; -use super::super::super::dispatch::{DispatchEvent, dispatch}; -use super::super::super::EventHandler; -use super::event::{ClientEvent, ShardStageUpdateEvent}; -use super::{ShardClientMessage, ShardId, ShardManagerMessage, ShardRunnerMessage}; -use threadpool::ThreadPool; -use typemap::ShareMap; -use websocket::{ - message::{CloseData, OwnedMessage}, - WebSocketError -}; - -#[cfg(feature = "framework")] -use framework::Framework; -#[cfg(feature = "voice")] -use super::super::voice::ClientVoiceManager; - -/// A runner for managing a [`Shard`] and its respective WebSocket client. -/// -/// [`Shard`]: ../../../gateway/struct.Shard.html -pub struct ShardRunner<H: EventHandler + Send + Sync + 'static> { - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - #[cfg(feature = "framework")] - framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - manager_tx: Sender<ShardManagerMessage>, - // channel to receive messages from the shard manager and dispatches - runner_rx: Receiver<InterMessage>, - // channel to send messages to the shard runner from the shard manager - runner_tx: Sender<InterMessage>, - shard: Shard, - threadpool: ThreadPool, - #[cfg(feature = "voice")] - voice_manager: Arc<Mutex<ClientVoiceManager>>, -} - -impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { - /// Creates a new runner for a Shard. - pub fn new(opt: ShardRunnerOptions<H>) -> Self { - let (tx, rx) = mpsc::channel(); - - Self { - runner_rx: rx, - runner_tx: tx, - data: opt.data, - event_handler: opt.event_handler, - #[cfg(feature = "framework")] - framework: opt.framework, - manager_tx: opt.manager_tx, - shard: opt.shard, - threadpool: opt.threadpool, - #[cfg(feature = "voice")] - voice_manager: opt.voice_manager, - } - } - - /// Starts the runner's loop to receive events. - /// - /// This runs a loop that performs the following in each iteration: - /// - /// 1. checks the receiver for [`ShardRunnerMessage`]s, possibly from the - /// [`ShardManager`], and if there is one, acts on it. - /// - /// 2. checks if a heartbeat should be sent to the discord Gateway, and if - /// so, sends one. - /// - /// 3. attempts to retrieve a message from the WebSocket, processing it into - /// a [`GatewayEvent`]. This will block for 100ms before assuming there is - /// no message available. - /// - /// 4. Checks with the [`Shard`] to determine if the gateway event is - /// specifying an action to take (e.g. resuming, reconnecting, heartbeating) - /// and then performs that action, if any. - /// - /// 5. Dispatches the event via the Client. - /// - /// 6. Go back to 1. - /// - /// [`GatewayEvent`]: ../../../model/event/enum.GatewayEvent.html - /// [`Shard`]: ../../../gateway/struct.Shard.html - /// [`ShardManager`]: struct.ShardManager.html - /// [`ShardRunnerMessage`]: enum.ShardRunnerMessage.html - pub fn run(&mut self) -> Result<()> { - debug!("[ShardRunner {:?}] Running", self.shard.shard_info()); - - loop { - if !self.recv()? { - return Ok(()); - } - - // check heartbeat - if !self.shard.check_heartbeat() { - warn!( - "[ShardRunner {:?}] Error heartbeating", - self.shard.shard_info(), - ); - - return self.request_restart(); - } - - let pre = self.shard.stage(); - let (event, action, successful) = self.recv_event(); - let post = self.shard.stage(); - - if post != pre { - self.update_manager(); - - let e = ClientEvent::ShardStageUpdate(ShardStageUpdateEvent { - new: post, - old: pre, - shard_id: ShardId(self.shard.shard_info()[0]), - }); - self.dispatch(DispatchEvent::Client(e)); - } - - match action { - Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => { - return self.request_restart() - }, - Some(other) => { - let _ = self.action(&other); - }, - None => {}, - } - - if let Some(event) = event { - self.dispatch(DispatchEvent::Model(event)); - } - - if !successful && !self.shard.stage().is_connecting() { - return self.request_restart(); - } - } - } - - /// Clones the internal copy of the Sender to the shard runner. - pub(super) fn runner_tx(&self) -> Sender<InterMessage> { - self.runner_tx.clone() - } - - /// Takes an action that a [`Shard`] has determined should happen and then - /// does it. - /// - /// For example, if the shard says that an Identify message needs to be - /// sent, this will do that. - /// - /// # Errors - /// - /// Returns - fn action(&mut self, action: &ShardAction) -> Result<()> { - match *action { - ShardAction::Reconnect(ReconnectType::Reidentify) => { - self.request_restart() - }, - ShardAction::Reconnect(ReconnectType::Resume) => { - self.shard.resume() - }, - ShardAction::Heartbeat => self.shard.heartbeat(), - ShardAction::Identify => self.shard.identify(), - } - } - - // Checks if the ID received to shutdown is equivalent to the ID of the - // shard this runner is responsible. If so, it shuts down the WebSocket - // client. - // - // Returns whether the WebSocket client is still active. - // - // If true, the WebSocket client was _not_ shutdown. If false, it was. - fn checked_shutdown(&mut self, id: ShardId) -> bool { - // First verify the ID so we know for certain this runner is - // to shutdown. - if id.0 != self.shard.shard_info()[0] { - // Not meant for this runner for some reason, don't - // shutdown. - return true; - } - - let close_data = CloseData::new(1000, String::new()); - let msg = OwnedMessage::Close(Some(close_data)); - let _ = self.shard.client.send_message(&msg); - - false - } - - #[inline] - fn dispatch(&self, event: DispatchEvent) { - dispatch( - event, - #[cfg(feature = "framework")] - &self.framework, - &self.data, - &self.event_handler, - &self.runner_tx, - &self.threadpool, - self.shard.shard_info()[0], - ); - } - - // Handles a received value over the shard runner rx channel. - // - // Returns a boolean on whether the shard runner can continue. - // - // This always returns true, except in the case that the shard manager asked - // the runner to shutdown. - fn handle_rx_value(&mut self, value: InterMessage) -> bool { - match value { - InterMessage::Client(ShardClientMessage::Manager(x)) => match x { - ShardManagerMessage::Restart(id) | - ShardManagerMessage::Shutdown(id) => { - self.checked_shutdown(id) - }, - ShardManagerMessage::ShutdownAll => { - // This variant should never be received. - warn!( - "[ShardRunner {:?}] Received a ShutdownAll?", - self.shard.shard_info(), - ); - - true - }, - ShardManagerMessage::ShardUpdate { .. } - | ShardManagerMessage::ShutdownInitiated => { - // nb: not sent here - - true - }, - }, - InterMessage::Client(ShardClientMessage::Runner(x)) => match x { - ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query } => { - self.shard.chunk_guilds( - guild_ids, - limit, - query.as_ref().map(String::as_str), - ).is_ok() - }, - ShardRunnerMessage::Close(code, reason) => { - let reason = reason.unwrap_or_else(String::new); - let data = CloseData::new(code, reason); - let msg = OwnedMessage::Close(Some(data)); - - self.shard.client.send_message(&msg).is_ok() - }, - ShardRunnerMessage::Message(msg) => { - self.shard.client.send_message(&msg).is_ok() - }, - ShardRunnerMessage::SetActivity(activity) => { - // To avoid a clone of `activity`, we do a little bit of - // trickery here: - // - // First, we obtain a reference to the current presence of - // the shard, and create a new presence tuple of the new - // activity we received over the channel as well as the - // online status that the shard already had. - // - // We then (attempt to) send the websocket message with the - // status update, expressively returning: - // - // - whether the message successfully sent - // - the original activity we received over the channel - self.shard.set_activity(activity); - - self.shard.update_presence().is_ok() - }, - ShardRunnerMessage::SetPresence(status, activity) => { - self.shard.set_presence(status, activity); - - self.shard.update_presence().is_ok() - }, - ShardRunnerMessage::SetStatus(status) => { - self.shard.set_status(status); - - self.shard.update_presence().is_ok() - }, - }, - InterMessage::Json(value) => { - // Value must be forwarded over the websocket - self.shard.client.send_json(&value).is_ok() - }, - } - } - - #[cfg(feature = "voice")] - fn handle_voice_event(&self, event: &Event) { - match *event { - Event::Ready(_) => { - self.voice_manager.lock().set( - self.shard.shard_info()[0], - self.runner_tx.clone(), - ); - }, - Event::VoiceServerUpdate(ref event) => { - if let Some(guild_id) = event.guild_id { - let mut manager = self.voice_manager.lock(); - let mut search = manager.get_mut(guild_id); - - if let Some(handler) = search { - handler.update_server(&event.endpoint, &event.token); - } - } - }, - Event::VoiceStateUpdate(ref event) => { - if let Some(guild_id) = event.guild_id { - let mut manager = self.voice_manager.lock(); - let mut search = manager.get_mut(guild_id); - - if let Some(handler) = search { - handler.update_state(&event.voice_state); - } - } - }, - _ => {}, - } - } - - // Receives values over the internal shard runner rx channel and handles - // them. - // - // This will loop over values until there is no longer one. - // - // Requests a restart if the sending half of the channel disconnects. This - // should _never_ happen, as the sending half is kept on the runner. - - // Returns whether the shard runner is in a state that can continue. - fn recv(&mut self) -> Result<bool> { - loop { - match self.runner_rx.try_recv() { - Ok(value) => { - if !self.handle_rx_value(value) { - return Ok(false); - } - }, - Err(TryRecvError::Disconnected) => { - warn!( - "[ShardRunner {:?}] Sending half DC; restarting", - self.shard.shard_info(), - ); - - let _ = self.request_restart(); - - return Ok(false); - }, - Err(TryRecvError::Empty) => break, - } - } - - // There are no longer any values available. - - Ok(true) - } - - /// Returns a received event, as well as whether reading the potentially - /// present event was successful. - fn recv_event(&mut self) -> (Option<Event>, Option<ShardAction>, bool) { - let gw_event = match self.shard.client.recv_json() { - Ok(Some(value)) => { - GatewayEvent::deserialize(value).map(Some).map_err(From::from) - }, - Ok(None) => Ok(None), - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - { - let last = self.shard.last_heartbeat_ack(); - let interval = self.shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - return (None, None, true); - } - } else { - return (None, None, true); - } - } - - debug!("Attempting to auto-reconnect"); - - match self.shard.reconnection_type() { - ReconnectType::Reidentify => return (None, None, false), - ReconnectType::Resume => { - if let Err(why) = self.shard.resume() { - warn!("Failed to resume: {:?}", why); - - return (None, None, false); - } - }, - } - - return (None, None, true); - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - return (None, None, false); - }, - Err(why) => Err(why), - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => return (None, None, true), - Err(why) => Err(why), - }; - - let action = match self.shard.handle_event(&event) { - Ok(Some(action)) => Some(action), - Ok(None) => None, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - return (None, None, true); - }, - }; - - if let Ok(GatewayEvent::HeartbeatAck) = event { - self.update_manager(); - } - - #[cfg(feature = "voice")] - { - if let Ok(GatewayEvent::Dispatch(_, ref event)) = event { - self.handle_voice_event(&event); - } - } - - let event = match event { - Ok(GatewayEvent::Dispatch(_, event)) => Some(event), - _ => None, - }; - - (event, action, true) - } - - fn request_restart(&self) -> Result<()> { - self.update_manager(); - - debug!( - "[ShardRunner {:?}] Requesting restart", - self.shard.shard_info(), - ); - let shard_id = ShardId(self.shard.shard_info()[0]); - let msg = ShardManagerMessage::Restart(shard_id); - let _ = self.manager_tx.send(msg); - - #[cfg(feature = "voice")] - { - self.voice_manager.lock().manager_remove(&shard_id.0); - } - - Ok(()) - } - - fn update_manager(&self) { - let _ = self.manager_tx.send(ShardManagerMessage::ShardUpdate { - id: ShardId(self.shard.shard_info()[0]), - latency: self.shard.latency(), - stage: self.shard.stage(), - }); - } -} - -/// Options to be passed to [`ShardRunner::new`]. -/// -/// [`ShardRunner::new`]: struct.ShardRunner.html#method.new -pub struct ShardRunnerOptions<H: EventHandler + Send + Sync + 'static> { - pub data: Arc<Mutex<ShareMap>>, - pub event_handler: Arc<H>, - #[cfg(feature = "framework")] - pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - pub manager_tx: Sender<ShardManagerMessage>, - pub shard: Shard, - pub threadpool: ThreadPool, - #[cfg(feature = "voice")] - pub voice_manager: Arc<Mutex<ClientVoiceManager>>, -} diff --git a/src/client/bridge/gateway/shard_runner_message.rs b/src/client/bridge/gateway/shard_runner_message.rs deleted file mode 100644 index 2e262d8..0000000 --- a/src/client/bridge/gateway/shard_runner_message.rs +++ /dev/null @@ -1,48 +0,0 @@ -use model::{ - gateway::Activity, - id::GuildId, - user::OnlineStatus, -}; -use websocket::message::OwnedMessage; - -/// A message to send from a shard over a WebSocket. -#[derive(Clone, Debug)] -pub enum ShardRunnerMessage { - /// Indicates that the client is to send a member chunk message. - ChunkGuilds { - /// The IDs of the [`Guild`]s to chunk. - /// - /// [`Guild`]: ../../../model/guild/struct.Guild.html - guild_ids: Vec<GuildId>, - /// The maximum number of members to receive [`GuildMembersChunkEvent`]s - /// for. - /// - /// [`GuildMembersChunkEvent`]: ../../../model/event/GuildMembersChunkEvent.html - limit: Option<u16>, - /// Text to filter members by. - /// - /// For example, a query of `"s"` will cause only [`Member`]s whose - /// usernames start with `"s"` to be chunked. - /// - /// [`Member`]: ../../../model/guild/struct.Member.html - query: Option<String>, - }, - /// Indicates that the client is to close with the given status code and - /// reason. - /// - /// You should rarely - if _ever_ - need this, but the option is available. - /// Prefer to use the [`ShardManager`] to shutdown WebSocket clients if you - /// are intending to send a 1000 close code. - /// - /// [`ShardManager`]: struct.ShardManager.html - Close(u16, Option<String>), - /// Indicates that the client is to send a custom WebSocket message. - Message(OwnedMessage), - /// Indicates that the client is to update the shard's presence's activity. - SetActivity(Option<Activity>), - /// Indicates that the client is to update the shard's presence in its - /// entirity. - SetPresence(OnlineStatus, Option<Activity>), - /// Indicates that the client is to update the shard's presence's status. - SetStatus(OnlineStatus), -} diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs deleted file mode 100644 index 41fcdec..0000000 --- a/src/client/bridge/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! A collection of bridged support between the [`client`] module and other -//! modules. -//! -//! **Warning**: You likely _do not_ need to mess with anything in here. Beware. -//! This is lower-level functionality abstracted by the [`Client`]. -//! -//! [`Client`]: ../struct.Client.html -//! [`client`]: ../ - -pub mod gateway; - -#[cfg(feature = "voice")] -pub mod voice; diff --git a/src/client/bridge/voice/mod.rs b/src/client/bridge/voice/mod.rs deleted file mode 100644 index e67366b..0000000 --- a/src/client/bridge/voice/mod.rs +++ /dev/null @@ -1,117 +0,0 @@ -use gateway::InterMessage; -use std::collections::HashMap; -use std::sync::mpsc::Sender as MpscSender; -use ::model::id::{ChannelId, GuildId, UserId}; -use ::voice::{Handler, Manager}; -use ::utils; - -pub struct ClientVoiceManager { - managers: HashMap<u64, Manager>, - shard_count: u64, - user_id: UserId, -} - -impl ClientVoiceManager { - pub fn new(shard_count: u64, user_id: UserId) -> Self { - Self { - managers: HashMap::default(), - shard_count, - user_id, - } - } - - pub fn get<G: Into<GuildId>>(&self, guild_id: G) -> Option<&Handler> { - let (gid, sid) = self.manager_info(guild_id); - - self.managers.get(&sid)?.get(gid) - } - - pub fn get_mut<G: Into<GuildId>>(&mut self, guild_id: G) - -> Option<&mut Handler> { - let (gid, sid) = self.manager_info(guild_id); - - self.managers.get_mut(&sid)?.get_mut(gid) - } - - /// Refer to [`Manager::join`]. - /// - /// This is a shortcut to retrieving the inner [`Manager`] and then calling - /// its `join` method. - /// - /// [`Manager`]: ../../../voice/struct.Manager.html - /// [`Manager::join`]: ../../../voice/struct.Manager.html#method.join - pub fn join<C, G>(&mut self, guild_id: G, channel_id: C) - -> Option<&mut Handler> where C: Into<ChannelId>, G: Into<GuildId> { - let (gid, sid) = self.manager_info(guild_id); - - self.managers.get_mut(&sid).map(|manager| manager.join(gid, channel_id)) - } - - /// Refer to [`Manager::leave`]. - /// - /// This is a shortcut to retrieving the inner [`Manager`] and then calling - /// its `leave` method. - /// - /// [`Manager`]: ../../../voice/struct.Manager.html - /// [`Manager::leave`]: ../../../voice/struct.Manager.html#method.leave - pub fn leave<G: Into<GuildId>>(&mut self, guild_id: G) -> Option<()> { - let (gid, sid) = self.manager_info(guild_id); - - self.managers.get_mut(&sid).map(|manager| manager.leave(gid)) - } - - /// Refer to [`Manager::remove`]. - /// - /// This is a shortcut to retrieving the inner [`Manager`] and then calling - /// its `remove` method. - /// - /// [`Manager`]: ../../../voice/struct.Manager.html - /// [`Manager::remove`]: ../../../voice/struct.Manager.html#method.remove - pub fn remove<G: Into<GuildId>>(&mut self, guild_id: G) -> Option<()> { - let (gid, sid) = self.manager_info(guild_id); - - self.managers.get_mut(&sid).map(|manager| manager.leave(gid)) - } - - pub fn set(&mut self, shard_id: u64, sender: MpscSender<InterMessage>) { - self.managers.insert(shard_id, Manager::new(sender, self.user_id)); - } - - /// Sets the number of shards for the voice manager to use when calculating - /// guilds' shard numbers. - /// - /// You probably should not call this. - #[doc(hidden)] - pub fn set_shard_count(&mut self, shard_count: u64) { - self.shard_count = shard_count; - } - - /// Sets the ID of the user for the voice manager. - /// - /// You probably _really_ should not call this. - /// - /// But it's there if you need it. For some reason. - #[doc(hidden)] - pub fn set_user_id(&mut self, user_id: UserId) { - self.user_id = user_id; - } - - pub fn manager_get(&self, shard_id: &u64) -> Option<&Manager> { - self.managers.get(shard_id) - } - - pub fn manager_get_mut(&mut self, shard_id: &u64) -> Option<&mut Manager> { - self.managers.get_mut(shard_id) - } - - pub fn manager_remove(&mut self, shard_id: &u64) -> Option<Manager> { - self.managers.remove(shard_id) - } - - fn manager_info<G: Into<GuildId>>(&self, guild_id: G) -> (GuildId, u64) { - let guild_id = guild_id.into(); - let shard_id = utils::shard_id(guild_id.0, self.shard_count); - - (guild_id, shard_id) - } -} diff --git a/src/client/context.rs b/src/client/context.rs deleted file mode 100644 index 72ca372..0000000 --- a/src/client/context.rs +++ /dev/null @@ -1,436 +0,0 @@ -use client::bridge::gateway::ShardMessenger; -use gateway::InterMessage; -use model::prelude::*; -use parking_lot::Mutex; -use std::sync::{ - mpsc::Sender, - Arc -}; -use typemap::ShareMap; - -#[cfg(feature = "builder")] -use builder::EditProfile; -#[cfg(feature = "builder")] -use internal::prelude::*; -#[cfg(all(feature = "builder", feature = "cache"))] -use super::CACHE; -#[cfg(feature = "builder")] -use {Result, http}; -#[cfg(feature = "builder")] -use utils::{self, VecMap}; - -/// The context is a general utility struct provided on event dispatches, which -/// helps with dealing with the current "context" of the event dispatch. -/// The context also acts as a general high-level interface over the associated -/// [`Shard`] which received the event, or the low-level [`http`] module. -/// -/// The context contains "shortcuts", like for interacting with the shard. -/// Methods like [`set_activity`] will unlock the shard and perform an update for -/// you to save a bit of work. -/// -/// A context will only live for the event it was dispatched for. After the -/// event handler finished, it is destroyed and will not be re-used. -/// -/// [`Shard`]: ../gateway/struct.Shard.html -/// [`http`]: ../http/index.html -/// [`set_activity`]: #method.set_activity -#[derive(Clone)] -pub struct Context { - /// A clone of [`Client::data`]. Refer to its documentation for more - /// information. - /// - /// [`Client::data`]: struct.Client.html#structfield.data - pub data: Arc<Mutex<ShareMap>>, - /// The messenger to communicate with the shard runner. - pub shard: ShardMessenger, - /// The ID of the shard this context is related to. - pub shard_id: u64, -} - -impl Context { - /// Create a new Context to be passed to an event handler. - pub(crate) fn new( - data: Arc<Mutex<ShareMap>>, - runner_tx: Sender<InterMessage>, - shard_id: u64, - ) -> Context { - Context { - shard: ShardMessenger::new(runner_tx), - shard_id, - data, - } - } - - /// Edits the current user's profile settings. - /// - /// Refer to `EditProfile`'s documentation for its methods. - /// - /// # Examples - /// - /// Change the current user's username: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::channel::Message; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "!changename" { - /// ctx.edit_profile(|mut e| { - /// e.username("Edward Elric"); - /// - /// e - /// }); - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - #[cfg(feature = "builder")] - pub fn edit_profile<F: FnOnce(EditProfile) -> EditProfile>(&self, f: F) -> Result<CurrentUser> { - let mut map = VecMap::with_capacity(2); - - feature_cache! { - { - let cache = CACHE.read(); - - map.insert("username", Value::String(cache.user.name.clone())); - - if let Some(email) = cache.user.email.clone() { - map.insert("email", Value::String(email)); - } - } else { - let user = http::get_current_user()?; - - map.insert("username", Value::String(user.name)); - - if let Some(email) = user.email { - map.insert("email", Value::String(email)); - } - } - } - - let edited = utils::vecmap_to_json_map(f(EditProfile(map)).0); - - http::edit_profile(&edited) - } - - - /// Sets the current user as being [`Online`]. This maintains the current - /// activity. - /// - /// # Examples - /// - /// Set the current user to being online on the shard: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::channel::Message; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "!online" { - /// ctx.online(); - /// } - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Online`]: ../model/user/enum.OnlineStatus.html#variant.Online - #[inline] - pub fn online(&self) { - self.shard.set_status(OnlineStatus::Online); - } - - /// Sets the current user as being [`Idle`]. This maintains the current - /// activity. - /// - /// # Examples - /// - /// Set the current user to being idle on the shard: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::channel::Message; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "!idle" { - /// ctx.idle(); - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Idle`]: ../model/user/enum.OnlineStatus.html#variant.Idle - #[inline] - pub fn idle(&self) { - self.shard.set_status(OnlineStatus::Idle); - } - - /// Sets the current user as being [`DoNotDisturb`]. This maintains the - /// current activity. - /// - /// # Examples - /// - /// Set the current user to being Do Not Disturb on the shard: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::channel::Message; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// if msg.content == "!dnd" { - /// ctx.dnd(); - /// } - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`DoNotDisturb`]: ../model/user/enum.OnlineStatus.html#variant.DoNotDisturb - #[inline] - pub fn dnd(&self) { - self.shard.set_status(OnlineStatus::DoNotDisturb); - } - - /// Sets the current user as being [`Invisible`]. This maintains the current - /// activity. - /// - /// # Examples - /// - /// Set the current user to being invisible on the shard when an - /// [`Event::Ready`] is received: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::gateway::Ready; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn ready(&self, ctx: Context, _: Ready) { - /// ctx.invisible(); - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready - /// [`Invisible`]: ../model/user/enum.OnlineStatus.html#variant.Invisible - #[inline] - pub fn invisible(&self) { - self.shard.set_status(OnlineStatus::Invisible); - } - - /// "Resets" the current user's presence, by setting the activity to `None` - /// and the online status to [`Online`]. - /// - /// Use [`set_presence`] for fine-grained control over individual details. - /// - /// # Examples - /// - /// Reset the presence when an [`Event::Resumed`] is received: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::event::ResumedEvent; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn resume(&self, ctx: Context, _: ResumedEvent) { - /// ctx.reset_presence(); - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`Event::Resumed`]: ../model/event/enum.Event.html#variant.Resumed - /// [`Online`]: ../model/user/enum.OnlineStatus.html#variant.Online - /// [`set_presence`]: #method.set_presence - #[inline] - pub fn reset_presence(&self) { - self.shard.set_presence(None, OnlineStatus::Online); - } - - /// Sets the current activity, defaulting to an online status of [`Online`]. - /// - /// # Examples - /// - /// Create a command named `~setgame` that accepts a name of a game to be - /// playing: - /// - /// ```rust,no_run - /// # #[cfg(feature = "model")] - /// # fn main() { - /// # use serenity::prelude::*; - /// # use serenity::model::channel::Message; - /// # - /// use serenity::model::gateway::Activity; - /// - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn message(&self, ctx: Context, msg: Message) { - /// let args = msg.content.splitn(2, ' ').collect::<Vec<&str>>(); - /// - /// if args.len() < 2 || *unsafe { args.get_unchecked(0) } != "~setgame" { - /// return; - /// } - /// - /// ctx.set_activity(Activity::playing(*unsafe { args.get_unchecked(1) })); - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// # } - /// - /// # #[cfg(not(feature = "model"))] - /// # fn main() {} - /// ``` - /// - /// [`Online`]: ../model/user/enum.OnlineStatus.html#variant.Online - #[inline] - pub fn set_activity(&self, activity: Activity) { - self.shard.set_presence(Some(activity), OnlineStatus::Online); - } - - /// Sets the current activity, passing in only its name. This will - /// automatically set the current user's [`OnlineStatus`] to [`Online`], and - /// its [`ActivityType`] as [`Playing`]. - /// - /// Use [`reset_presence`] to clear the current activity, or - /// [`set_presence`] for more fine-grained control. - /// - /// **Note**: Maximum length is 128. - /// - /// # Examples - /// - /// When an [`Event::Ready`] is received, set the activity name to `"test"`: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::gateway::Ready; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn ready(&self, ctx: Context, _: Ready) { - /// ctx.set_game_name("test"); - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// client.start().unwrap(); - /// ``` - /// - /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready - /// [`ActivityType`]: ../model/gateway/enum.ActivityType.html - /// [`Online`]: ../model/user/enum.OnlineStatus.html#variant.Online - /// [`OnlineStatus`]: ../model/user/enum.OnlineStatus.html - /// [`Playing`]: ../model/gateway/enum.ActivityType.html#variant.Playing - /// [`reset_presence`]: #method.reset_presence - /// [`set_presence`]: #method.set_presence - pub fn set_game_name(&self, game_name: &str) { - let activity = Activity::playing(game_name); - - self.shard.set_presence(Some(activity), OnlineStatus::Online); - } - - /// Sets the current user's presence, providing all fields to be passed. - /// - /// # Examples - /// - /// Setting the current user as having no activity and being [`Idle`]: - /// - /// ```rust,no_run - /// # use serenity::prelude::*; - /// # use serenity::model::gateway::Ready; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn ready(&self, ctx: Context, _: Ready) { - /// use serenity::model::user::OnlineStatus; - /// - /// ctx.set_presence(None, OnlineStatus::Idle); - /// } - /// } - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// Setting the current user as playing `"Heroes of the Storm"`, while being - /// [`DoNotDisturb`]: - /// - /// ```rust,ignore - /// # use serenity::prelude::*; - /// # use serenity::model::gateway::Ready; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn ready(&self, context: Context, _: Ready) { - /// use serenity::model::gateway::Activity; - /// use serenity::model::user::OnlineStatus; - /// - /// let activity = Activity::playing("Heroes of the Storm"); - /// let status = OnlineStatus::DoNotDisturb; - /// - /// context.set_presence(Some(activity), status); - /// } - /// } - /// - /// let mut client = Client::new("token", Handler).unwrap(); - /// - /// client.start().unwrap(); - /// ``` - /// - /// [`DoNotDisturb`]: ../model/user/enum.OnlineStatus.html#variant.DoNotDisturb - /// [`Idle`]: ../model/user/enum.OnlineStatus.html#variant.Idle - #[inline] - pub fn set_presence(&self, activity: Option<Activity>, status: OnlineStatus) { - self.shard.set_presence(activity, status); - } - - /// Disconnects the shard from the websocket, essentially "quiting" it. - /// Note however that this will only exit the one which the `Context` was given. - /// If it's just one shard that's on, then serenity will stop any further actions - /// until [`Client::start`] and vice versa are called again. - /// - /// [`Client::start`]: ./struct.Client.html#method.start - #[inline] - pub fn quit(&self) { - self.shard.shutdown_clean(); - } -} diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs deleted file mode 100644 index b91b8d7..0000000 --- a/src/client/dispatch.rs +++ /dev/null @@ -1,647 +0,0 @@ -use gateway::InterMessage; -use model::{ - channel::{Channel, Message}, - event::Event, - guild::Member, -}; -use std::sync::Arc; -use parking_lot::Mutex; -use super::{ - bridge::gateway::event::ClientEvent, - event_handler::EventHandler, - Context -}; -use std::sync::mpsc::Sender; -use threadpool::ThreadPool; -use typemap::ShareMap; - -#[cfg(feature = "cache")] -use chrono::{Timelike, Utc}; -#[cfg(feature = "framework")] -use framework::Framework; -#[cfg(feature = "cache")] -use model::id::GuildId; -#[cfg(feature = "cache")] -use std::{thread, time}; - -#[cfg(feature = "cache")] -use super::CACHE; - -macro_rules! update { - ($event:expr) => { - { - #[cfg(feature = "cache")] - { - CACHE.write().update(&mut $event) - } - } - }; -} - -#[cfg(feature = "cache")] -macro_rules! now { - () => (Utc::now().time().second() * 1000) -} - -fn context( - data: &Arc<Mutex<ShareMap>>, - runner_tx: &Sender<InterMessage>, - shard_id: u64, -) -> Context { - Context::new(Arc::clone(data), runner_tx.clone(), shard_id) -} - -pub(crate) enum DispatchEvent { - Client(ClientEvent), - Model(Event), -} - -#[cfg(feature = "framework")] -#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] -pub(crate) fn dispatch<H: EventHandler + Send + Sync + 'static>( - event: DispatchEvent, - framework: &Arc<Mutex<Option<Box<Framework + Send>>>>, - data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>, - runner_tx: &Sender<InterMessage>, - threadpool: &ThreadPool, - shard_id: u64, -) { - match event { - DispatchEvent::Model(Event::MessageCreate(event)) => { - let context = context(data, runner_tx, shard_id); - dispatch_message( - context.clone(), - event.message.clone(), - event_handler, - threadpool, - ); - - if let Some(ref mut framework) = *framework.lock() { - framework.dispatch(context, event.message, threadpool); - } - }, - other => handle_event( - other, - data, - event_handler, - runner_tx, - threadpool, - shard_id, - ), - } -} - -#[cfg(not(feature = "framework"))] -pub(crate) fn dispatch<H: EventHandler + Send + Sync + 'static>( - event: DispatchEvent, - data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>, - runner_tx: &Sender<InterMessage>, - threadpool: &ThreadPool, - shard_id: u64, -) { - match event { - DispatchEvent::Model(Event::MessageCreate(event)) => { - let context = context(data, runner_tx, shard_id); - dispatch_message(context, event.message, event_handler, threadpool); - }, - other => handle_event( - other, - data, - event_handler, - runner_tx, - threadpool, - shard_id, - ), - } -} - -#[allow(unused_mut)] -fn dispatch_message<H>( - context: Context, - mut message: Message, - event_handler: &Arc<H>, - threadpool: &ThreadPool, -) where H: EventHandler + Send + Sync + 'static { - #[cfg(feature = "model")] - { - message.transform_content(); - } - - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.message(context, message); - }); -} - -#[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event<H: EventHandler + Send + Sync + 'static>( - event: DispatchEvent, - data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>, - runner_tx: &Sender<InterMessage>, - threadpool: &ThreadPool, - shard_id: u64, -) { - #[cfg(feature = "cache")] - let mut last_guild_create_time = now!(); - - #[cfg(feature = "cache")] - let wait_for_guilds = move || -> ::Result<()> { - let unavailable_guilds = CACHE.read().unavailable_guilds.len(); - - while unavailable_guilds != 0 && (now!() < last_guild_create_time + 2000) { - thread::sleep(time::Duration::from_millis(500)); - } - - Ok(()) - }; - - match event { - DispatchEvent::Client(ClientEvent::ShardStageUpdate(event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.shard_stage_update(context, event); - }); - } - DispatchEvent::Model(Event::ChannelCreate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - - // This different channel_create dispatching is only due to the fact that - // each time the bot receives a dm, this event is also fired. - // So in short, only exists to reduce unnecessary clutter. - match event.channel { - Channel::Private(channel) => { - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.private_channel_create(context, channel); - }); - }, - Channel::Group(_) => {}, - Channel::Guild(channel) => { - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.channel_create(context, channel); - }); - }, - Channel::Category(channel) => { - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.category_create(context, channel); - }); - }, - } - }, - DispatchEvent::Model(Event::ChannelDelete(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - - match event.channel { - Channel::Private(_) | Channel::Group(_) => {}, - Channel::Guild(channel) => { - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.channel_delete(context, channel); - }); - }, - Channel::Category(channel) => { - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.category_delete(context, channel); - }); - }, - } - }, - DispatchEvent::Model(Event::ChannelPinsUpdate(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.channel_pins_update(context, event); - }); - }, - DispatchEvent::Model(Event::ChannelRecipientAdd(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.channel_recipient_addition( - context, - event.channel_id, - event.user, - ); - }); - }, - DispatchEvent::Model(Event::ChannelRecipientRemove(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.channel_recipient_removal( - context, - event.channel_id, - event.user, - ); - }); - }, - DispatchEvent::Model(Event::ChannelUpdate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - let before = CACHE.read().channel(event.channel.id()); - - event_handler.channel_update(context, before, event.channel); - } else { - event_handler.channel_update(context, event.channel); - }} - }); - }, - DispatchEvent::Model(Event::GuildBanAdd(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_ban_addition(context, event.guild_id, event.user); - }); - }, - DispatchEvent::Model(Event::GuildBanRemove(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_ban_removal(context, event.guild_id, event.user); - }); - }, - DispatchEvent::Model(Event::GuildCreate(mut event)) => { - #[cfg(feature = "cache")] - let _is_new = { - let cache = CACHE.read(); - - !cache.unavailable_guilds.contains(&event.guild.id) - }; - - update!(event); - - #[cfg(feature = "cache")] - { - last_guild_create_time = now!(); - - let cache = CACHE.read(); - - if cache.unavailable_guilds.is_empty() { - let context = context(data, runner_tx, shard_id); - - let guild_amount = cache - .guilds - .iter() - .map(|(&id, _)| id) - .collect::<Vec<GuildId>>(); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.cached(context, guild_amount); - }); - } - } - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.guild_create(context, event.guild, _is_new); - } else { - event_handler.guild_create(context, event.guild); - }} - }); - }, - DispatchEvent::Model(Event::GuildDelete(mut event)) => { - let _full = update!(event); - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.guild_delete(context, event.guild, _full); - } else { - event_handler.guild_delete(context, event.guild); - }} - }); - }, - DispatchEvent::Model(Event::GuildEmojisUpdate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_emojis_update(context, event.guild_id, event.emojis); - }); - }, - DispatchEvent::Model(Event::GuildIntegrationsUpdate(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_integrations_update(context, event.guild_id); - }); - }, - DispatchEvent::Model(Event::GuildMemberAdd(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_member_addition(context, event.guild_id, event.member); - }); - }, - DispatchEvent::Model(Event::GuildMemberRemove(mut event)) => { - let _member = update!(event); - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.guild_member_removal(context, event.guild_id, event.user, _member); - } else { - event_handler.guild_member_removal(context, event.guild_id, event.user); - }} - }); - }, - DispatchEvent::Model(Event::GuildMemberUpdate(mut event)) => { - let _before = update!(event); - - let _after: Option<Member> = feature_cache! {{ - CACHE.read().member(event.guild_id, event.user.id) - } else { - None - }}; - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - if let Some(after) = _after { - event_handler.guild_member_update(context, _before, after); - } - } else { - event_handler.guild_member_update(context, event); - }} - }); - }, - DispatchEvent::Model(Event::GuildMembersChunk(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_members_chunk(context, event.guild_id, event.members); - }); - }, - DispatchEvent::Model(Event::GuildRoleCreate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_role_create(context, event.guild_id, event.role); - }); - }, - DispatchEvent::Model(Event::GuildRoleDelete(mut event)) => { - let _role = update!(event); - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); - } else { - event_handler.guild_role_delete(context, event.guild_id, event.role_id); - }} - }); - }, - DispatchEvent::Model(Event::GuildRoleUpdate(mut event)) => { - let _before = update!(event); - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.guild_role_update(context, event.guild_id, _before, event.role); - } else { - event_handler.guild_role_update(context, event.guild_id, event.role); - }} - }); - }, - DispatchEvent::Model(Event::GuildUnavailable(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.guild_unavailable(context, event.guild_id); - }); - }, - DispatchEvent::Model(Event::GuildUpdate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - let before = CACHE.read() - .guilds - .get(&event.guild.id) - .cloned(); - - event_handler.guild_update(context, before, event.guild); - } else { - event_handler.guild_update(context, event.guild); - }} - }); - }, - // Already handled by the framework check macro - DispatchEvent::Model(Event::MessageCreate(_)) => {}, - DispatchEvent::Model(Event::MessageDeleteBulk(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.message_delete_bulk(context, event.channel_id, event.ids); - }); - }, - DispatchEvent::Model(Event::MessageDelete(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.message_delete(context, event.channel_id, event.message_id); - }); - }, - DispatchEvent::Model(Event::MessageUpdate(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.message_update(context, event); - }); - }, - DispatchEvent::Model(Event::PresencesReplace(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.presence_replace(context, event.presences); - }); - }, - DispatchEvent::Model(Event::PresenceUpdate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.presence_update(context, event); - }); - }, - DispatchEvent::Model(Event::ReactionAdd(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.reaction_add(context, event.reaction); - }); - }, - DispatchEvent::Model(Event::ReactionRemove(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.reaction_remove(context, event.reaction); - }); - }, - DispatchEvent::Model(Event::ReactionRemoveAll(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.reaction_remove_all(context, event.channel_id, event.message_id); - }); - }, - DispatchEvent::Model(Event::Ready(mut event)) => { - update!(event); - - let event_handler = Arc::clone(event_handler); - - feature_cache! {{ - last_guild_create_time = now!(); - - let _ = wait_for_guilds() - .map(move |_| { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(&event_handler); - - threadpool.execute(move || { - event_handler.ready(context, event.ready); - }); - }); - } else { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(&event_handler); - - threadpool.execute(move || { - event_handler.ready(context, event.ready); - }); - }} - }, - DispatchEvent::Model(Event::Resumed(mut event)) => { - let context = context(data, runner_tx, shard_id); - - event_handler.resume(context, event); - }, - DispatchEvent::Model(Event::TypingStart(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.typing_start(context, event); - }); - }, - DispatchEvent::Model(Event::Unknown(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.unknown(context, event.kind, event.value); - }); - }, - DispatchEvent::Model(Event::UserUpdate(mut event)) => { - let _before = update!(event); - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - feature_cache! {{ - event_handler.user_update(context, _before.unwrap(), event.current_user); - } else { - event_handler.user_update(context, event.current_user); - }} - }); - }, - DispatchEvent::Model(Event::VoiceServerUpdate(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.voice_server_update(context, event); - }); - }, - DispatchEvent::Model(Event::VoiceStateUpdate(mut event)) => { - update!(event); - - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.voice_state_update(context, event.guild_id, event.voice_state); - }); - }, - DispatchEvent::Model(Event::WebhookUpdate(mut event)) => { - let context = context(data, runner_tx, shard_id); - let event_handler = Arc::clone(event_handler); - - threadpool.execute(move || { - event_handler.webhook_update(context, event.guild_id, event.channel_id); - }); - }, - } -} diff --git a/src/client/event_handler.rs b/src/client/event_handler.rs deleted file mode 100644 index 32fdaa1..0000000 --- a/src/client/event_handler.rs +++ /dev/null @@ -1,290 +0,0 @@ -use model::prelude::*; -use parking_lot::RwLock; -use serde_json::Value; -use std::{ - collections::HashMap, - sync::Arc -}; -use super::context::Context; -use ::client::bridge::gateway::event::*; - -/// The core trait for handling events by serenity. -pub trait EventHandler { - /// Dispatched when the cache gets full. - /// - /// Provides the cached guilds' ids. - #[cfg(feature = "cache")] - fn cached(&self, _ctx: Context, _guilds: Vec<GuildId>) {} - - /// Dispatched when a channel is created. - /// - /// Provides said channel's data. - fn channel_create(&self, _ctx: Context, _channel: Arc<RwLock<GuildChannel>>) {} - - /// Dispatched when a category is created. - /// - /// Provides said category's data. - fn category_create(&self, _ctx: Context, _category: Arc<RwLock<ChannelCategory>>) {} - - /// Dispatched when a category is deleted. - /// - /// Provides said category's data. - fn category_delete(&self, _ctx: Context, _category: Arc<RwLock<ChannelCategory>>) {} - - /// Dispatched when a private channel is created. - /// - /// Provides said channel's data. - fn private_channel_create(&self, _ctx: Context, _channel: Arc<RwLock<PrivateChannel>>) {} - - /// Dispatched when a channel is deleted. - /// - /// Provides said channel's data. - fn channel_delete(&self, _ctx: Context, _channel: Arc<RwLock<GuildChannel>>) {} - - /// Dispatched when a pin is added, deleted. - /// - /// Provides said pin's data. - fn channel_pins_update(&self, _ctx: Context, _pin: ChannelPinsUpdateEvent) {} - - /// Dispatched when a user is added to a `Group`. - /// - /// Provides the group's id and the user's data. - fn channel_recipient_addition(&self, _ctx: Context, _group_id: ChannelId, _user: User) {} - - /// Dispatched when a user is removed to a `Group`. - /// - /// Provides the group's id and the user's data. - fn channel_recipient_removal(&self, _ctx: Context, _group_id: ChannelId, _user: User) {} - - /// Dispatched when a channel is updated. - /// - /// Provides the old channel data, and the new data. - #[cfg(feature = "cache")] - fn channel_update(&self, _ctx: Context, _old: Option<Channel>, _new: Channel) {} - - /// Dispatched when a channel is updated. - /// - /// Provides the new data. - #[cfg(not(feature = "cache"))] - fn channel_update(&self, _ctx: Context, _new_data: Channel) {} - - /// Dispatched when a user is banned from a guild. - /// - /// Provides the guild's id and the banned user's data. - fn guild_ban_addition(&self, _ctx: Context, _guild_id: GuildId, _banned_user: User) {} - - /// Dispatched when a user's ban is lifted from a guild. - /// - /// Provides the guild's id and the lifted user's data. - fn guild_ban_removal(&self, _ctx: Context, _guild_id: GuildId, _unbanned_user: User) {} - - /// Dispatched when a guild is created; - /// or an existing guild's data is sent to us. - /// - /// Provides the guild's data and whether the guild is new. - #[cfg(feature = "cache")] - fn guild_create(&self, _ctx: Context, _guild: Guild, _is_new: bool) {} - - /// Dispatched when a guild is created; - /// or an existing guild's data is sent to us. - /// - /// Provides the guild's data. - #[cfg(not(feature = "cache"))] - fn guild_create(&self, _ctx: Context, _guild: Guild) {} - - /// Dispatched when a guild is deleted. - /// - /// Provides the partial data of the guild sent by discord, - /// and the full data from the cache, if available. - #[cfg(feature = "cache")] - fn guild_delete(&self, _ctx: Context, _incomplete: PartialGuild, _full: Option<Arc<RwLock<Guild>>>) {} - - /// Dispatched when a guild is deleted. - /// - /// Provides the partial data of the guild sent by discord. - #[cfg(not(feature = "cache"))] - fn guild_delete(&self, _ctx: Context, _incomplete: PartialGuild) {} - - /* the emojis were updated. */ - - /// Dispatched when the emojis are updated. - /// - /// Provides the guild's id and the new state of the emojis in the guild. - fn guild_emojis_update(&self, _ctx: Context, _guild_id: GuildId, _current_state: HashMap<EmojiId, Emoji>) {} - - /// Dispatched when a guild's integration is added, updated or removed. - /// - /// Provides the guild's id. - fn guild_integrations_update(&self, _ctx: Context, _guild_id: GuildId) {} - - /// Dispatched when a user joins a guild. - /// - /// Provides the guild's id and the user's member data. - fn guild_member_addition(&self, _ctx: Context, _guild_id: GuildId, _new_member: Member) {} - - /// Dispatched when a user is removed (kicked). - /// - /// Provides the guild's id, the user's data, and the user's member data if available. - #[cfg(feature = "cache")] - fn guild_member_removal(&self, _ctx: Context, _guild: GuildId, _user: User, _member_data_if_available: Option<Member>) {} - - /// Dispatched when a user is removed (kicked). - /// - /// Provides the guild's id, the user's data. - #[cfg(not(feature = "cache"))] - fn guild_member_removal(&self, _ctx: Context, _guild_id: GuildId, _kicked: User) {} - - /// Dispatched when a member is updated (e.g their nickname is updated) - /// - /// Provides the member's old data (if available) and the new data. - #[cfg(feature = "cache")] - fn guild_member_update(&self, _ctx: Context, _old_if_available: Option<Member>, _new: Member) {} - - /// Dispatched when a member is updated (e.g their nickname is updated) - /// - /// Provides the new data. - #[cfg(not(feature = "cache"))] - fn guild_member_update(&self, _ctx: Context, _new: GuildMemberUpdateEvent) {} - - /// Dispatched when the data for offline members was requested. - /// - /// Provides the guild's id and the data. - fn guild_members_chunk(&self, _ctx: Context, _guild_id: GuildId, _offline_members: HashMap<UserId, Member>) {} - - /// Dispatched when a role is created. - /// - /// Provides the guild's id and the new role's data. - fn guild_role_create(&self, _ctx: Context, _guild_id: GuildId, _new: Role) {} - - /// Dispatched when a role is deleted. - /// - /// Provides the guild's id, the role's id and its data if available. - #[cfg(feature = "cache")] - fn guild_role_delete(&self, _ctx: Context, _guild_id: GuildId, _removed_role_id: RoleId, _removed_role_data_if_available: Option<Role>) {} - - /// Dispatched when a role is deleted. - /// - /// Provides the guild's id, the role's id. - #[cfg(not(feature = "cache"))] - fn guild_role_delete(&self, _ctx: Context, _guild_id: GuildId, _removed_role_id: RoleId) {} - - /// Dispatched when a role is updated. - /// - /// Provides the guild's id, the role's old (if available) and new data. - #[cfg(feature = "cache")] - fn guild_role_update(&self, _ctx: Context, _guild_id: GuildId, _old_data_if_available: Option<Role>, _new: Role) {} - - /// Dispatched when a role is updated. - /// - /// Provides the guild's id and the role's new data. - #[cfg(not(feature = "cache"))] - fn guild_role_update(&self, _ctx: Context, _guild_id: GuildId, _new_data: Role) {} - - /// Dispatched when a guild became unavailable. - /// - /// Provides the guild's id. - fn guild_unavailable(&self, _ctx: Context, _guild_id: GuildId) {} - - /// Dispatched when the guild is updated. - /// - /// Provides the guild's old full data (if available) and the new, albeit partial data. - #[cfg(feature = "cache")] - fn guild_update(&self, _ctx: Context, _old_data_if_available: Option<Arc<RwLock<Guild>>>, _new_but_incomplete: PartialGuild) {} - - /// Dispatched when the guild is updated. - /// - /// Provides the guild's new, albeit partial data. - #[cfg(not(feature = "cache"))] - fn guild_update(&self, _ctx: Context, _new_but_incomplete_data: PartialGuild) {} - - /// Dispatched when a message is created. - /// - /// Provides the message's data. - fn message(&self, _ctx: Context, _new_message: Message) {} - - /// Dispatched when a message is deleted. - /// - /// Provides the channel's id and the message's id. - fn message_delete(&self, _ctx: Context, _channel_id: ChannelId, _deleted_message_id: MessageId) {} - - /// Dispatched when multiple messages were deleted at once. - /// - /// Provides the channel's id and the deleted messages' ids. - fn message_delete_bulk(&self, _ctx: Context, _channel_id: ChannelId, _multiple_deleted_messages_ids: Vec<MessageId>) {} - - /// Dispatched when a new reaction is attached to a message. - /// - /// Provides the reaction's data. - fn reaction_add(&self, _ctx: Context, _add_reaction: Reaction) {} - - /// Dispatched when a reaction is dettached from a message. - /// - /// Provides the reaction's data. - fn reaction_remove(&self, _ctx: Context, _removed_reaction: Reaction) {} - - /// Dispatched when all reactions of a message are dettached from a message. - /// - /// Provides the channel's id and the message's id. - fn reaction_remove_all(&self, _ctx: Context, _channel_id: ChannelId, _removed_from_message_id: MessageId) {} - - /// Dispatched when a message is updated. - /// - /// Provides the new data of the message. - fn message_update(&self, _ctx: Context, _new_data: MessageUpdateEvent) {} - - fn presence_replace(&self, _ctx: Context, _: Vec<Presence>) {} - - /// Dispatched when a user's presence is updated (e.g off -> on). - /// - /// Provides the presence's new data. - fn presence_update(&self, _ctx: Context, _new_data: PresenceUpdateEvent) {} - - /// Dispatched upon startup. - /// - /// Provides data about the bot and the guilds it's in. - fn ready(&self, _ctx: Context, _data_about_bot: Ready) {} - - /// Dispatched upon reconnection. - fn resume(&self, _ctx: Context, _: ResumedEvent) {} - - /// Dispatched when a shard's connection stage is updated - /// - /// Provides the context of the shard and the event information about the update. - fn shard_stage_update(&self, _ctx: Context, _: ShardStageUpdateEvent) {} - - /// Dispatched when a user starts typing. - fn typing_start(&self, _ctx: Context, _: TypingStartEvent) {} - - /// Dispatched when an unknown event was sent from discord. - /// - /// Provides the event's name and its unparsed data. - fn unknown(&self, _ctx: Context, _name: String, _raw: Value) {} - - /// Dispatched when the bot's data is updated. - /// - /// Provides the old and new data. - #[cfg(feature = "cache")] - fn user_update(&self, _ctx: Context, _old_data: CurrentUser, _new: CurrentUser) {} - - /// Dispatched when the bot's data is updated. - /// - /// Provides the new data. - #[cfg(not(feature = "cache"))] - fn user_update(&self, _ctx: Context, _new_data: CurrentUser) {} - - /// Dispatched when a guild's voice server was updated (or changed to another one). - /// - /// Provides the voice server's data. - fn voice_server_update(&self, _ctx: Context, _: VoiceServerUpdateEvent) {} - - /// Dispatched when a user joins, leaves or moves a voice channel. - /// - /// Provides the guild's id (if available) and - /// the new state of the guild's voice channels. - fn voice_state_update(&self, _ctx: Context, _: Option<GuildId>, _: VoiceState) {} - - /// Dispatched when a guild's webhook is updated. - /// - /// Provides the guild's id and the channel's id the webhook belongs in. - fn webhook_update(&self, _ctx: Context, _guild_id: GuildId, _belongs_to_channel_id: ChannelId) {} -} diff --git a/src/client/mod.rs b/src/client/mod.rs index 0efd82f..57d2235 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -17,922 +17,160 @@ //! [`Client`]: struct.Client.html#examples //! [`Context`]: struct.Context.html //! [Client examples]: struct.Client.html#examples -#![allow(zero_ptr)] -pub mod bridge; +pub mod shard_manager; -mod context; -mod dispatch; mod error; -mod event_handler; -pub use self::{ - context::Context, - error::Error as ClientError, - event_handler::EventHandler -}; +pub use self::error::Error as ClientError; -// Note: the following re-exports are here for backwards compatibility -pub use gateway; -pub use http as rest; +use futures::Future; +use hyper::client::{Client as HyperClient, HttpConnector}; +use hyper::Body; +use hyper_tls::HttpsConnector; +use self::shard_manager::{ShardManager, ShardManagerOptions, ShardingStrategy}; +use std::rc::Rc; +use super::http::Client as HttpClient; +use tokio_core::reactor::Handle; +use Error; #[cfg(feature = "cache")] -pub use CACHE; - -use http; -use internal::prelude::*; -use parking_lot::Mutex; -use self::bridge::gateway::{ShardManager, ShardManagerMonitor, ShardManagerOptions}; -use std::sync::Arc; -use threadpool::ThreadPool; -use typemap::ShareMap; - -#[cfg(feature = "framework")] -use framework::Framework; -#[cfg(feature = "voice")] -use model::id::UserId; -#[cfg(feature = "voice")] -use self::bridge::voice::ClientVoiceManager; +use cache::Cache; + +#[derive(Debug)] +pub struct ClientOptions { + pub handle: Handle, + pub http_client: Rc<HyperClient<HttpsConnector<HttpConnector>, Body>>, + pub sharding: ShardingStrategy, + pub token: String, +} -/// The Client is the way to be able to start sending authenticated requests -/// over the REST API, as well as initializing a WebSocket connection through -/// [`Shard`]s. Refer to the [documentation on using sharding][sharding docs] -/// for more information. -/// -/// # Event Handlers -/// -/// Event handlers can be configured. For example, the event handler -/// [`EventHandler::on_message`] will be dispatched to whenever a [`Event::MessageCreate`] is -/// received over the connection. -/// -/// Note that you do not need to manually handle events, as they are handled -/// internally and then dispatched to your event handlers. -/// -/// # Examples -/// -/// Creating a Client instance and adding a handler on every message -/// receive, acting as a "ping-pong" bot is simple: -/// -/// ```rust,ignore -/// use serenity::prelude::*; -/// use serenity::model::*; -/// -/// struct Handler; -/// -/// impl EventHandler for Handler { -/// fn on_message(&self, _: Context, msg: Message) { -/// if msg.content == "!ping" { -/// let _ = msg.channel_id.say("Pong!"); -/// } -/// } -/// } -/// -/// let mut client = Client::new("my token here", Handler); -/// -/// client.start(); -/// ``` -/// -/// [`Shard`]: gateway/struct.Shard.html -/// [`on_message`]: #method.on_message -/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate -/// [sharding docs]: gateway/index.html#sharding +#[derive(Debug)] pub struct Client { - /// A ShareMap which requires types to be Send + Sync. This is a map that - /// can be safely shared across contexts. - /// - /// The purpose of the data field is to be accessible and persistent across - /// contexts; that is, data can be modified by one context, and will persist - /// through the future and be accessible through other contexts. This is - /// useful for anything that should "live" through the program: counters, - /// database connections, custom user caches, etc. - /// - /// In the meaning of a context, this data can be accessed through - /// [`Context::data`]. - /// - /// # Examples - /// - /// Create a `MessageEventCounter` to track the following events: - /// - /// - [`Event::MessageCreate`] - /// - [`Event::MessageDelete`] - /// - [`Event::MessageDeleteBulk`] - /// - [`Event::MessageUpdate`] - /// - /// ```rust,ignore - /// extern crate serenity; - /// extern crate typemap; - /// - /// use serenity::prelude::*; - /// use serenity::model::*; - /// use std::collections::HashMap; - /// use std::env; - /// use typemap::Key; - /// - /// struct MessageEventCounter; - /// - /// impl Key for MessageEventCounter { - /// type Value = HashMap<String, u64>; - /// } - /// - /// macro_rules! reg { - /// ($ctx:ident $name:expr) => { - /// { - /// let mut data = $ctx.data.lock(); - /// let counter = data.get_mut::<MessageEventCounter>().unwrap(); - /// let entry = counter.entry($name).or_insert(0); - /// *entry += 1; - /// } - /// }; - /// } - /// - /// struct Handler; - /// - /// impl EventHandler for Handler { - /// fn on_message(&self, ctx: Context, _: Message) { reg!(ctx "MessageCreate") } - /// fn on_message_delete(&self, ctx: Context, _: ChannelId, _: MessageId) { - /// reg!(ctx "MessageDelete") } - /// fn on_message_delete_bulk(&self, ctx: Context, _: ChannelId, _: Vec<MessageId>) { - /// reg!(ctx "MessageDeleteBulk") } - /// fn on_message_update(&self, ctx: Context, _: ChannelId, _: MessageId) { - /// reg!(ctx "MessageUpdate") } - /// } - /// - /// let mut client = Client::new(&env::var("DISCORD_TOKEN").unwrap(), Handler); - /// - /// { - /// let mut data = client.data.lock(); - /// data.insert::<MessageEventCounter>(HashMap::default()); - /// } - /// - /// client.start().unwrap(); - /// ``` - /// - /// Refer to [example 05] for an example on using the `data` field. - /// - /// [`Context::data`]: struct.Context.html#method.data - /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate - /// [`Event::MessageDelete`]: ../model/event/enum.Event.html#variant.MessageDelete - /// [`Event::MessageDeleteBulk`]: ../model/event/enum.Event.html#variant.MessageDeleteBulk - /// [`Event::MessageUpdate`]: ../model/event/enum.Event.html#variant.MessageUpdate - /// [example 05]: - /// https://github.com/serenity-rs/serenity/tree/master/examples/05_command_framework - pub data: Arc<Mutex<ShareMap>>, - /// A vector of all active shards that have received their [`Event::Ready`] - /// payload, and have dispatched to [`on_ready`] if an event handler was - /// configured. - /// - /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready - /// [`on_ready`]: #method.on_ready - #[cfg(feature = "framework")] framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - /// A HashMap of all shards instantiated by the Client. - /// - /// The key is the shard ID and the value is the shard itself. - /// - /// # Examples - /// - /// If you call [`client.start_shard(3, 5)`][`Client::start_shard`], this - /// HashMap will only ever contain a single key of `3`, as that's the only - /// Shard the client is responsible for. - /// - /// If you call [`client.start_shards(10)`][`Client::start_shards`], this - /// HashMap will contain keys 0 through 9, one for each shard handled by the - /// client. - /// - /// Printing the number of shards currently instantiated by the client every - /// 5 seconds: - /// - /// ```rust,no_run - /// # use serenity::client::{Client, EventHandler}; - /// # use std::error::Error; - /// # use std::time::Duration; - /// # use std::{env, thread}; - /// - /// # fn try_main() -> Result<(), Box<Error>> { - /// - /// struct Handler; - /// - /// impl EventHandler for Handler { } - /// - /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?; - /// - /// let shard_manager = client.shard_manager.clone(); - /// - /// thread::spawn(move || { - /// loop { - /// println!("Shard count instantiated: {}", - /// shard_manager.lock().shards_instantiated().len()); - /// - /// thread::sleep(Duration::from_millis(5000)); - /// } - /// }); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// Shutting down all connections after one minute of operation: - /// - /// ```rust,no_run - /// # use std::error::Error; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::{Client, EventHandler}; - /// use std::time::Duration; - /// use std::{env, thread}; - /// - /// struct Handler; - /// - /// impl EventHandler for Handler { } - /// - /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?; - /// - /// // Create a clone of the `Arc` containing the shard manager. - /// let shard_manager = client.shard_manager.clone(); - /// - /// // Create a thread which will sleep for 60 seconds and then have the - /// // shard manager shutdown. - /// thread::spawn(move || { - /// thread::sleep(Duration::from_secs(60)); - /// - /// shard_manager.lock().shutdown_all(); - /// - /// println!("Shutdown shard manager!"); - /// }); - /// - /// println!("Client shutdown: {:?}", client.start()); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [`Client::start_shard`]: #method.start_shard - /// [`Client::start_shards`]: #method.start_shards - pub shard_manager: Arc<Mutex<ShardManager>>, - shard_manager_worker: ShardManagerMonitor, - /// The threadpool shared by all shards. - /// - /// Defaults to 5 threads, which should suffice small bots. Consider - /// increasing this number as your bot grows. - pub threadpool: ThreadPool, - /// The token in use by the client. - pub token: Arc<Mutex<String>>, - /// The voice manager for the client. - /// - /// This is an ergonomic structure for interfacing over shards' voice - /// connections. - #[cfg(feature = "voice")] - pub voice_manager: Arc<Mutex<ClientVoiceManager>>, - /// URI that the client's shards will use to connect to the gateway. - /// - /// This is likely not important for production usage and is, at best, used - /// for debugging. - /// - /// This is wrapped in an `Arc<Mutex<T>>` so all shards will have an updated - /// value available. - pub ws_uri: Arc<Mutex<String>>, + #[cfg(feature = "cache")] + pub cache: Rc<RefCell<Cache>>, + pub handle: Handle, + pub http: Rc<HttpClient>, + pub shard_manager: ShardManager, + token: Rc<String>, + ws_uri: Rc<String>, } impl Client { - /// Creates a Client for a bot user. - /// - /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which - /// this function will automatically do for you if not already included. - /// - /// # Examples - /// - /// Create a Client, using a token from an environment variable: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # use std::error::Error; - /// # - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let client = Client::new(&token, Handler)?; - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - pub fn new<H>(token: &str, handler: H) -> Result<Self> - where H: EventHandler + Send + Sync + 'static { - let token = token.trim(); - - let token = if token.starts_with("Bot ") { - token.to_string() - } else { - format!("Bot {}", token) - }; - - http::set_token(&token); - let locked = Arc::new(Mutex::new(token)); - - let name = "serenity client".to_owned(); - let threadpool = ThreadPool::with_name(name, 5); - let url = Arc::new(Mutex::new(http::get_gateway()?.url)); - let data = Arc::new(Mutex::new(ShareMap::custom())); - let event_handler = Arc::new(handler); - - #[cfg(feature = "framework")] - let framework = Arc::new(Mutex::new(None)); - #[cfg(feature = "voice")] - let voice_manager = Arc::new(Mutex::new(ClientVoiceManager::new( - 0, - UserId(0), - ))); - - let (shard_manager, shard_manager_worker) = { - ShardManager::new(ShardManagerOptions { - data: &data, - event_handler: &event_handler, - #[cfg(feature = "framework")] - framework: &framework, - shard_index: 0, - shard_init: 0, - shard_total: 0, - threadpool: threadpool.clone(), - token: &locked, - #[cfg(feature = "voice")] - voice_manager: &voice_manager, - ws_url: &url, + pub fn new(options: ClientOptions) -> Box<Future<Item = Self, Error = Error>> { + let token = { + let trimmed = options.token.trim(); + + Rc::new(if trimmed.starts_with("Bot ") { + trimmed.to_string() + } else { + format!("Bot {}", trimmed) }) }; - Ok(Client { - token: locked, - ws_uri: url, - #[cfg(feature = "framework")] - framework, - data, - shard_manager, - shard_manager_worker, - threadpool, - #[cfg(feature = "voice")] - voice_manager, - }) - } - - /// Sets a framework to be used with the client. All message events will be - /// passed through the framework _after_ being passed to the [`on_message`] - /// event handler. - /// - /// See the [framework module-level documentation][framework docs] for more - /// information on usage. - /// - /// # Examples - /// - /// Create a simple framework that responds to a `~ping` command: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// use serenity::framework::StandardFramework; - /// - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::Client; - /// use std::env; - /// - /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?; - /// client.with_framework(StandardFramework::new() - /// .configure(|c| c.prefix("~")) - /// .on("ping", |_, msg, _| { - /// msg.channel_id.say("Pong!")?; - /// - /// Ok(()) - /// })); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// Using your own framework: - /// - /// ```rust,ignore - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// use serenity::Framework; - /// use serenity::client::Context; - /// use serenity::model::*; - /// use tokio_core::reactor::Handle; - /// use std::collections::HashMap; - /// - /// - /// struct MyFramework { - /// commands: HashMap<String, Box<Fn(Message, Vec<String>)>>, - /// } - /// - /// impl Framework for MyFramework { - /// fn dispatch(&mut self, _: Context, msg: Message, tokio_handle: &Handle) { - /// let args = msg.content.split_whitespace(); - /// let command = match args.next() { - /// Some(command) => { - /// if !command.starts_with('*') { return; } - /// command - /// }, - /// None => return, - /// }; - /// - /// let command = match self.commands.get(&command) { - /// Some(command) => command, None => return, - /// }; - /// - /// tokio_handle.spawn_fn(move || { (command)(msg, args); Ok() }); - /// } - /// } - /// - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::Client; - /// use std::env; - /// - /// let mut client = Client::new(&token, Handler).unwrap(); - /// client.with_framework(MyFramework { commands: { - /// let mut map = HashMap::new(); - /// map.insert("ping".to_string(), Box::new(|msg, _| msg.channel_id.say("pong!"))); - /// map - /// }}); - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// Refer to the documentation for the `framework` module for more in-depth - /// information. - /// - /// [`on_message`]: #method.on_message - /// [framework docs]: ../framework/index.html - #[cfg(feature = "framework")] - pub fn with_framework<F: Framework + Send + 'static>(&mut self, f: F) { - *self.framework.lock() = Some(Box::new(f)); - } - - /// Establish the connection and start listening for events. - /// - /// This will start receiving events in a loop and start dispatching the - /// events to your registered handlers. - /// - /// Note that this should be used only for users and for bots which are in - /// less than 2500 guilds. If you have a reason for sharding and/or are in - /// more than 2500 guilds, use one of these depending on your use case: - /// - /// Refer to the [Gateway documentation][gateway docs] for more information - /// on effectively using sharding. - /// - /// # Examples - /// - /// Starting a Client with only 1 shard, out of 1 total: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// if let Err(why) = client.start() { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// [gateway docs]: gateway/index.html#sharding - pub fn start(&mut self) -> Result<()> { - self.start_connection([0, 0, 1]) - } - - /// Establish the connection(s) and start listening for events. - /// - /// This will start receiving events in a loop and start dispatching the - /// events to your registered handlers. - /// - /// This will retrieve an automatically determined number of shards to use - /// from the API - determined by Discord - and then open a number of shards - /// equivalent to that amount. - /// - /// Refer to the [Gateway documentation][gateway docs] for more information - /// on effectively using sharding. - /// - /// # Examples - /// - /// Start as many shards as needed using autosharding: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// if let Err(why) = client.start_autosharded() { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// # Errors - /// - /// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to - /// an error. - /// - /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - /// [gateway docs]: gateway/index.html#sharding - pub fn start_autosharded(&mut self) -> Result<()> { - let (x, y) = { - let res = http::get_bot_gateway()?; - - (res.shards as u64 - 1, res.shards as u64) - }; - - self.start_connection([0, x, y]) - } - - /// Establish a sharded connection and start listening for events. - /// - /// This will start receiving events and dispatch them to your registered - /// handlers. - /// - /// This will create a single shard by ID. If using one shard per process, - /// you will need to start other processes with the other shard IDs in some - /// way. - /// - /// Refer to the [Gateway documentation][gateway docs] for more information - /// on effectively using sharding. - /// - /// # Examples - /// - /// Start shard 3 of 5: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// if let Err(why) = client.start_shard(3, 5) { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// Start shard 0 of 1 (you may also be interested in [`start`] or - /// [`start_autosharded`]): - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let mut client = Client::new(&env::var("DISCORD_TOKEN")?, Handler)?; - /// - /// if let Err(why) = client.start_shard(0, 1) { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// # Errors - /// - /// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to - /// an error. - /// - /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - /// [`start`]: #method.start - /// [`start_autosharded`]: #method.start_autosharded - /// [gateway docs]: gateway/index.html#sharding - pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> { - self.start_connection([shard, shard, shards]) - } - - /// Establish sharded connections and start listening for events. - /// - /// This will start receiving events and dispatch them to your registered - /// handlers. - /// - /// This will create and handle all shards within this single process. If - /// you only need to start a single shard within the process, or a range of - /// shards, use [`start_shard`] or [`start_shard_range`], respectively. - /// - /// Refer to the [Gateway documentation][gateway docs] for more information - /// on effectively using sharding. - /// - /// # Examples - /// - /// Start all of 8 shards: - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// if let Err(why) = client.start_shards(8) { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// # Errors - /// - /// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to - /// an error. - /// - /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - /// [`start_shard`]: #method.start_shard - /// [`start_shard_range`]: #method.start_shard_range - /// [Gateway docs]: gateway/index.html#sharding - pub fn start_shards(&mut self, total_shards: u64) -> Result<()> { - self.start_connection([0, total_shards - 1, total_shards]) - } - - /// Establish a range of sharded connections and start listening for events. - /// - /// This will start receiving events and dispatch them to your registered - /// handlers. - /// - /// This will create and handle all shards within a given range within this - /// single process. If you only need to start a single shard within the - /// process, or all shards within the process, use [`start_shard`] or - /// [`start_shards`], respectively. - /// - /// Refer to the [Gateway documentation][gateway docs] for more - /// information on effectively using sharding. - /// - /// # Examples - /// - /// For a bot using a total of 10 shards, initialize shards 4 through 7: - /// - /// ```rust,ignore - /// # use serenity::prelude::EventHandler; - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// use serenity::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN").unwrap(); - /// let mut client = Client::new(&token, Handler); - /// - /// let _ = client.start_shard_range([4, 7], 10); - /// ``` - /// - /// ```rust,no_run - /// # use serenity::prelude::EventHandler; - /// # use std::error::Error; - /// # - /// struct Handler; - /// - /// impl EventHandler for Handler {} - /// # fn try_main() -> Result<(), Box<Error>> { - /// use serenity::client::Client; - /// use std::env; - /// - /// let token = env::var("DISCORD_TOKEN")?; - /// let mut client = Client::new(&token, Handler).unwrap(); - /// - /// if let Err(why) = client.start_shard_range([4, 7], 10) { - /// println!("Err with client: {:?}", why); - /// } - /// # Ok(()) - /// # } - /// # - /// # fn main() { - /// # try_main().unwrap(); - /// # } - /// ``` - /// - /// # Errors - /// - /// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to - /// an error. - /// - /// - /// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - /// [`start_shard`]: #method.start_shard - /// [`start_shards`]: #method.start_shards - /// [Gateway docs]: gateway/index.html#sharding - pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> { - self.start_connection([range[0], range[1], total_shards]) - } - - // Shard data layout is: - // 0: first shard number to initialize - // 1: shard number to initialize up to and including - // 2: total number of shards the bot is sharding for - // - // Not all shards need to be initialized in this process. - // - // # Errors - // - // Returns a [`ClientError::Shutdown`] when all shards have shutdown due to - // an error. - // - // [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - fn start_connection(&mut self, shard_data: [u64; 3]) -> Result<()> { - #[cfg(feature = "voice")] - self.voice_manager.lock().set_shard_count(shard_data[2]); - - // This is kind of gross, but oh well. - // - // Both the framework and voice bridge need the user's ID, so we'll only - // retrieve it over REST if at least one of those are enabled. - #[cfg(any(all(feature = "standard_framework", feature = "framework"), - feature = "voice"))] - { - let user = http::get_current_user()?; - - // Update the framework's current user if the feature is enabled. - // - // This also acts as a form of check to ensure the token is correct. - #[cfg(all(feature = "standard_framework", feature = "framework"))] - { - if let Some(ref mut framework) = *self.framework.lock() { - framework.update_current_user(user.id); - } - } - - #[cfg(feature = "voice")] - { - self.voice_manager.lock().set_user_id(user.id); - } - } - - { - let mut manager = self.shard_manager.lock(); - - let init = shard_data[1] - shard_data[0] + 1; - - manager.set_shards(shard_data[0], init, shard_data[2]); - - debug!( - "Initializing shard info: {} - {}/{}", - shard_data[0], - init, - shard_data[2], - ); - - if let Err(why) = manager.initialize() { - error!("Failed to boot a shard: {:?}", why); - info!("Shutting down all shards"); - - manager.shutdown_all(); + let h2 = options.handle.clone(); + let strategy = options.sharding; + let client = Rc::new(ftry!(HttpClient::new( + options.http_client, + options.handle.clone(), + Rc::clone(&token), + ))); - return Err(Error::Client(ClientError::ShardBootFailure)); + let done = client.get_bot_gateway().map(move |gateway| { + let uri = Rc::new(gateway.url); + + Self { + #[cfg(feature = "cache")] + cache: Rc::new(RefCell::new(Cache::default())), + handle: h2, + http: client, + shard_manager: ShardManager::new(ShardManagerOptions { + strategy: strategy, + token: Rc::clone(&token), + ws_uri: Rc::clone(&uri), + }), + token: token, + ws_uri: Rc::clone(&uri), } - } + }).from_err(); - self.shard_manager_worker.run(); - - Ok(()) - } -} - -/// Validates that a token is likely in a valid format. -/// -/// This performs the following checks on a given token: -/// -/// - At least one character long; -/// - Contains 3 parts (split by the period char `'.'`); -/// - The second part of the token is at least 6 characters long; -/// - The token does not contain any whitespace prior to or after the token. -/// -/// # Examples -/// -/// Validate that a token is valid and that a number of invalid tokens are -/// actually invalid: -/// -/// ```rust,no_run -/// use serenity::client::validate_token; -/// -/// // ensure a valid token is in fact valid: -/// assert!(validate_token("Mjg4NzYwMjQxMzYzODc3ODg4.C_ikow.j3VupLBuE1QWZng3TMGH0z_UAwg").is_ok()); -/// -/// // "cat" isn't a valid token: -/// assert!(validate_token("cat").is_err()); -/// -/// // tokens must have three parts, separated by periods (this is still -/// // actually an invalid token): -/// assert!(validate_token("aaa.abcdefgh.bbb").is_ok()); -/// -/// // the second part must be _at least_ 6 characters long: -/// assert!(validate_token("a.abcdef.b").is_ok()); -/// assert!(validate_token("a.abcde.b").is_err()); -/// ``` -/// -/// # Errors -/// -/// Returns a [`ClientError::InvalidToken`] when one of the above checks fail. -/// The type of failure is not specified. -/// -/// [`ClientError::InvalidToken`]: enum.ClientError.html#variant.InvalidToken -pub fn validate_token(token: &str) -> Result<()> { - if token.is_empty() { - return Err(Error::Client(ClientError::InvalidToken)); - } - - let parts: Vec<&str> = token.split('.').collect(); - - // Check that the token has a total of 3 parts. - if parts.len() != 3 { - return Err(Error::Client(ClientError::InvalidToken)); - } - - // Check that the second part is at least 6 characters long. - if parts[1].len() < 6 { - return Err(Error::Client(ClientError::InvalidToken)); + Box::new(done) } - // Check that there is no whitespace before/after the token. - if token.trim() != token { - return Err(Error::Client(ClientError::InvalidToken)); - } + // pub fn connect(&self) -> ::futures::Stream<Item = Dispatch, Error = ::Error> { + // self.shard_manager.start().map(|(shard_id, msg)| { + // Dispatch { + // msg, + // shard_id, + // } + // }) + // } +} - Ok(()) +pub struct Dispatch { + pub msg: ::model::event::GatewayEvent, + pub shard_id: u64, } + +// Validates that a token is likely in a valid format. +// +// This performs the following checks on a given token: +// +// - At least one character long; +// - Contains 3 parts (split by the period char `'.'`); +// - The second part of the token is at least 6 characters long; +// - The token does not contain any whitespace prior to or after the token. +// +// # Examples +// +// Validate that a token is valid and that a number of invalid tokens are +// actually invalid: +// +// ```rust,no_run +// use serenity::client::validate_token; +// +// // ensure a valid token is in fact valid: +// assert!(validate_token"Mjg4NzYwMjQxMzYzODc3ODg4.C_ikow.j3VupLBuE1QWZng3TMGH0z_UAwg").is_ok()); +// +// // "cat" isn't a valid token: +// assert!(validate_token("cat").is_err()); +// +// // tokens must have three parts, separated by periods (this is still +// // actually an invalid token): +// assert!(validate_token("aaa.abcdefgh.bbb").is_ok()); +// +// // the second part must be _at least_ 6 characters long: +// assert!(validate_token("a.abcdef.b").is_ok()); +// assert!(validate_token("a.abcde.b").is_err()); +// ``` +// +// # Errors +// +// Returns a [`ClientError::InvalidToken`] when one of the above checks fail. +// The type of failure is not specified. +// +// [`ClientError::InvalidToken`]: enum.ClientError.html#variant.InvalidToken +// pub fn validate_token(token: &str) -> Result<()> { +// if token.is_empty() { +// return Err(Error::Client(ClientError::InvalidToken)); +// } + +// let parts: Vec<&str> = token.split('.').collect(); + +// // Check that the token has a total of 3 parts. +// if parts.len() != 3 { +// return Err(Error::Client(ClientError::InvalidToken)); +// } + +// // Check that the second part is at least 6 characters long. +// if parts[1].len() < 6 { +// return Err(Error::Client(ClientError::InvalidToken)); +// } + +// // Check that there is no whitespace before/after the token. +// if token.trim() != token { +// return Err(Error::Client(ClientError::InvalidToken)); +// } + +// Ok(()) +// } diff --git a/src/client/shard_manager.rs b/src/client/shard_manager.rs new file mode 100644 index 0000000..0de3deb --- /dev/null +++ b/src/client/shard_manager.rs @@ -0,0 +1,62 @@ +use std::collections::VecDeque; +use std::rc::Rc; + +#[derive(Clone, Copy, Debug)] +pub enum ShardingStrategy { + Autoshard, + Range([u64; 3]), +} + +impl ShardingStrategy { + pub fn auto() -> Self { + ShardingStrategy::Autoshard + } + + pub fn multi(count: u64) -> Self { + ShardingStrategy::Range([0, count, count]) + } + + pub fn simple() -> Self { + ShardingStrategy::Range([0, 1, 1]) + } + + pub fn range(index: u64, count: u64, total: u64) -> Self { + ShardingStrategy::Range([index, count, total]) + } +} + +impl Default for ShardingStrategy { + fn default() -> Self { + ShardingStrategy::Autoshard + } +} + +#[derive(Clone, Debug, Default)] +pub struct ShardManagerOptions { + pub strategy: ShardingStrategy, + pub token: Rc<String>, + pub ws_uri: Rc<String>, +} + +#[derive(Debug)] +pub struct ShardManager { + pub queue: VecDeque<u64>, + pub shards: (), + pub strategy: ShardingStrategy, + pub token: Rc<String>, + pub ws_uri: Rc<String>, + non_exhaustive: (), +} + +impl ShardManager { + pub fn new(options: ShardManagerOptions) -> Self { + Self { + queue: VecDeque::new(), + shards: (), + strategy: options.strategy, + token: options.token, + ws_uri: options.ws_uri, + non_exhaustive: (), + } + } +} |