diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-24 15:48:02 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-09-24 15:53:23 -0700 |
| commit | 6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch) | |
| tree | e3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client | |
| parent | Use $crate for CommandError (diff) | |
| download | serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip | |
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/mod.rs | 46 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_manager.rs | 171 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_queuer.rs | 118 | ||||
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 171 | ||||
| -rw-r--r-- | src/client/bridge/mod.rs | 1 | ||||
| -rw-r--r-- | src/client/mod.rs | 287 |
6 files changed, 526 insertions, 268 deletions
diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs new file mode 100644 index 0000000..0bfd4e6 --- /dev/null +++ b/src/client/bridge/gateway/mod.rs @@ -0,0 +1,46 @@ +mod shard_manager; +mod shard_queuer; +mod shard_runner; + +pub use self::shard_manager::ShardManager; +pub use self::shard_queuer::ShardQueuer; +pub use self::shard_runner::ShardRunner; + +use gateway::Shard; +use parking_lot::Mutex; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::sync::mpsc::Sender; +use std::sync::Arc; + +type Parked<T> = Arc<Mutex<T>>; +type LockedShard = Parked<Shard>; + +#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub enum ShardManagerMessage { + Restart(ShardId), + Shutdown(ShardId), + ShutdownAll, +} + +pub enum ShardQueuerMessage { + /// Message to start a shard, where the 0-index element is the ID of the + /// Shard to start and the 1-index element is the total shards in use. + Start(ShardId, ShardId), + /// Message to shutdown the shard queuer. + Shutdown, +} + +// A light tuplestruct wrapper around a u64 to verify type correctness when +// working with the IDs of shards. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub struct ShardId(pub u64); + +impl Display for ShardId { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{}", self.0) + } +} + +pub struct ShardRunnerInfo { + runner_tx: Sender<ShardManagerMessage>, +} diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs new file mode 100644 index 0000000..0ac9d19 --- /dev/null +++ b/src/client/bridge/gateway/shard_manager.rs @@ -0,0 +1,171 @@ +use internal::prelude::*; +use parking_lot::Mutex as ParkingLotMutex; +use std::collections::HashMap; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread; +use super::super::super::EventHandler; +use super::{ + ShardId, + ShardManagerMessage, + ShardQueuer, + ShardQueuerMessage, + ShardRunnerInfo, +}; +use typemap::ShareMap; + +#[cfg(feature = "framework")] +use framework::Framework; + +pub struct ShardManager { + #[cfg(feature = "framework")] + runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>, + /// The index of the first shard to initialize, 0-indexed. + shard_index: u64, + /// The number of shards to initialize. + shard_init: u64, + /// The total shards in use, 1-indexed. + shard_total: u64, + shard_queuer: Sender<ShardQueuerMessage>, + thread_rx: Receiver<ShardManagerMessage>, +} + +impl ShardManager { + #[cfg(feature = "framework")] + pub fn new<H>( + shard_index: u64, + shard_init: u64, + shard_total: u64, + ws_url: Arc<Mutex<String>>, + token: Arc<Mutex<String>>, + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + ) -> Self where H: EventHandler + Send + Sync + 'static { + let (thread_tx, thread_rx) = mpsc::channel(); + let (shard_queue_tx, shard_queue_rx) = mpsc::channel(); + + let runners = Arc::new(ParkingLotMutex::new(HashMap::new())); + + let mut shard_queuer = feature_framework! {{ + ShardQueuer { + data: data.clone(), + event_handler: event_handler.clone(), + framework: framework.clone(), + last_start: None, + manager_tx: thread_tx.clone(), + runners: runners.clone(), + rx: shard_queue_rx, + token: token.clone(), + ws_url: ws_url.clone(), + } + } else { + ShardQueuer { + data: data.clone(), + event_handler: event_handler.clone(), + last_start: None, + manager_tx: thread_tx.clone(), + runners: runners.clone(), + rx: shard_queue_rx, + rx: shard_queue_rx, + token: token.clone(), + ws_url: ws_url.clone(), + } + }}; + + thread::spawn(move || { + shard_queuer.run(); + }); + + Self { + shard_queuer: shard_queue_tx, + thread_rx: thread_rx, + runners, + shard_index, + shard_init, + shard_total, + } + } + + pub fn initialize(&mut self) -> Result<()> { + let shard_to = self.shard_index + self.shard_init; + + debug!("{}, {}", self.shard_index, self.shard_init); + + for shard_id in self.shard_index..shard_to { + let shard_total = self.shard_total; + + self.boot([ShardId(shard_id), ShardId(shard_total)]); + } + + Ok(()) + } + + pub fn run(&mut self) { + loop { + let value = match self.thread_rx.recv() { + Ok(value) => value, + Err(_) => break, + }; + + match value { + ShardManagerMessage::Restart(shard_id) => self.restart(shard_id), + ShardManagerMessage::Shutdown(shard_id) => self.shutdown(shard_id), + ShardManagerMessage::ShutdownAll => { + self.shutdown_all(); + + break; + }, + } + } + } + + pub fn shutdown_all(&mut self) { + info!("Shutting down all shards"); + let keys = { + self.runners.lock().keys().cloned().collect::<Vec<ShardId>>() + }; + + for shard_id in keys { + self.shutdown(shard_id); + } + } + + fn boot(&mut self, shard_info: [ShardId; 2]) { + info!("Telling shard queuer to start shard {}", shard_info[0]); + + let msg = ShardQueuerMessage::Start(shard_info[0], shard_info[1]); + let _ = self.shard_queuer.send(msg); + } + + fn restart(&mut self, shard_id: ShardId) { + info!("Restarting shard {}", shard_id); + self.shutdown(shard_id); + + let shard_total = self.shard_total; + + self.boot([shard_id, ShardId(shard_total)]); + } + + fn shutdown(&mut self, shard_id: ShardId) { + info!("Shutting down shard {}", shard_id); + + if let Some(runner) = self.runners.lock().get(&shard_id) { + let msg = ShardManagerMessage::Shutdown(shard_id); + + if let Err(why) = runner.runner_tx.send(msg) { + warn!("Failed to cleanly shutdown shard {}: {:?}", shard_id, why); + } + } + + self.runners.lock().remove(&shard_id); + } +} + +impl Drop for ShardManager { + fn drop(&mut self) { + if let Err(why) = self.shard_queuer.send(ShardQueuerMessage::Shutdown) { + warn!("Failed to send shutdown to shard queuer: {:?}", why); + } + } +} diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs new file mode 100644 index 0000000..8d3dbe1 --- /dev/null +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -0,0 +1,118 @@ +use framework::Framework; +use gateway::Shard; +use internal::prelude::*; +use parking_lot::Mutex as ParkingLotMutex; +use std::collections::HashMap; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; +use super::super::super::EventHandler; +use super::{ + ShardId, + ShardManagerMessage, + ShardQueuerMessage, + ShardRunner, + ShardRunnerInfo, +}; +use typemap::ShareMap; + +/// The shard queuer is a simple loop that runs indefinitely to manage the +/// startup of shards. +/// +/// A shard queuer instance _should_ be run in its own thread, due to the +/// blocking nature of the loop itself as well as a 5 second thread sleep +/// between shard starts. +pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> { + pub data: Arc<ParkingLotMutex<ShareMap>>, + pub event_handler: Arc<H>, + #[cfg(feature = "framework")] + pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + pub last_start: Option<Instant>, + pub manager_tx: Sender<ShardManagerMessage>, + pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>, + pub rx: Receiver<ShardQueuerMessage>, + pub token: Arc<Mutex<String>>, + pub ws_url: Arc<Mutex<String>>, +} + +impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> { + pub fn run(&mut self) { + loop { + let msg = match self.rx.recv() { + Ok(msg) => msg, + Err(_) => { + break; + } + }; + + match msg { + ShardQueuerMessage::Shutdown => break, + ShardQueuerMessage::Start(shard_id, shard_total) => { + self.check_last_start(); + + if let Err(why) = self.start(shard_id, shard_total) { + warn!("Err starting shard {}: {:?}", shard_id, why); + } + + self.last_start = Some(Instant::now()); + }, + } + } + } + + fn check_last_start(&mut self) { + let instant = match self.last_start { + Some(instant) => instant, + None => return, + }; + + // We must wait 5 seconds between IDENTIFYs to avoid session + // invalidations. + let duration = Duration::from_secs(5); + let elapsed = instant.elapsed(); + + if elapsed >= duration { + return; + } + + let to_sleep = duration - elapsed; + + thread::sleep(to_sleep); + } + + fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> { + let shard_info = [shard_id.0, shard_total.0]; + let shard = Shard::new(self.ws_url.clone(), self.token.clone(), shard_info)?; + let locked = Arc::new(ParkingLotMutex::new(shard)); + + let mut runner = feature_framework! {{ + ShardRunner::new( + locked.clone(), + self.manager_tx.clone(), + self.framework.clone(), + self.data.clone(), + self.event_handler.clone(), + ) + } else { + ShardRunner::new( + locked.clone(), + self.manager_tx.clone(), + self.data.clone(), + self.event_handler.clone(), + ) + }}; + + let runner_info = ShardRunnerInfo { + runner_tx: runner.runner_tx(), + }; + + thread::spawn(move || { + let _ = runner.run(); + }); + + self.runners.lock().insert(shard_id, runner_info); + + Ok(()) + } +} diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs new file mode 100644 index 0000000..8bdbb35 --- /dev/null +++ b/src/client/bridge/gateway/shard_runner.rs @@ -0,0 +1,171 @@ +use internal::prelude::*; +use internal::ws_impl::ReceiverExt; +use model::event::{Event, GatewayEvent}; +use parking_lot::Mutex as ParkingLotMutex; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use super::super::super::{EventHandler, dispatch}; +use super::{LockedShard, ShardId, ShardManagerMessage}; +use typemap::ShareMap; +use websocket::WebSocketError; + +#[cfg(feature = "framework")] +use framework::Framework; + +enum EventRetrieval { + Some() +} + +pub struct ShardRunner<H: EventHandler + 'static> { + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>, + #[cfg(feature = "framework")] + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + manager_tx: Sender<ShardManagerMessage>, + runner_rx: Receiver<ShardManagerMessage>, + runner_tx: Sender<ShardManagerMessage>, + shard: LockedShard, +} + +impl<H: EventHandler + 'static> ShardRunner<H> { + pub fn new(shard: LockedShard, + manager_tx: Sender<ShardManagerMessage>, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>) -> Self { + let (tx, rx) = mpsc::channel(); + + Self { + runner_rx: rx, + runner_tx: tx, + data, + event_handler, + framework, + manager_tx, + shard, + } + } + + pub fn run(&mut self) -> Result<()> { + loop { + { + let mut shard = self.shard.lock(); + let incoming = self.runner_rx.try_recv(); + + // Check for an incoming message over the runner channel. + // + // If the message is to shutdown, first verify the ID so we know + // for certain this runner is to shutdown. + if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { + if id.0 == shard.shard_info()[0] { + let _ = shard.shutdown_clean(); + + return Ok(()); + } + } + + if let Err(why) = shard.check_heartbeat() { + error!("Failed to heartbeat and reconnect: {:?}", why); + + let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); + let _ = self.manager_tx.send(msg); + + return Ok(()); + } + + #[cfg(feature = "voice")] + { + shard.cycle_voice_recv(); + } + } + + let events = self.recv_events(); + + for event in events { + feature_framework! {{ + dispatch(event, + &self.shard, + &self.framework, + &self.data, + &self.event_handler); + } else { + dispatch(event, + &info.shard, + &info.data, + &info.event_handler, + &handle); + }} + } + } + } + + pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> { + self.runner_tx.clone() + } + + fn recv_events(&mut self) -> Vec<Event> { + let mut shard = self.shard.lock(); + + let mut events = vec![]; + + loop { + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; + + if seconds_passed <= interval_in_secs * 2 { + break; + } + } else { + break; + } + + debug!("Attempting to auto-reconnect"); + + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } + + break; + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => break, + other => other, + }; + + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => break, + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => continue, + Err(why) => { + error!("Shard handler received err: {:?}", why); + + continue; + }, + }; + + events.push(event); + + if events.len() > 5 { + break; + } + }; + + events + } +} diff --git a/src/client/bridge/mod.rs b/src/client/bridge/mod.rs new file mode 100644 index 0000000..4f27526 --- /dev/null +++ b/src/client/bridge/mod.rs @@ -0,0 +1 @@ +pub mod gateway; diff --git a/src/client/mod.rs b/src/client/mod.rs index 20c2bd6..6ab6951 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -19,6 +19,7 @@ //! [Client examples]: struct.Client.html#examples #![allow(zero_ptr)] +mod bridge; mod context; mod dispatch; mod error; @@ -35,20 +36,17 @@ pub use http as rest; #[cfg(feature = "cache")] pub use CACHE; +use self::bridge::gateway::ShardManager; use self::dispatch::dispatch; use std::sync::{self, Arc}; use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT}; use parking_lot::Mutex; use std::collections::HashMap; -use std::time::Duration; -use std::{mem, thread}; +use std::mem; use super::gateway::Shard; use typemap::ShareMap; -use websocket::result::WebSocketError; use http; use internal::prelude::*; -use internal::ws_impl::ReceiverExt; -use model::event::*; #[cfg(feature = "framework")] use framework::Framework; @@ -752,283 +750,36 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { let gateway_url = Arc::new(sync::Mutex::new(url)); - let shards_index = shard_data[0]; - let shards_total = shard_data[1] + 1; + let mut manager = ShardManager::new( + shard_data[0], + shard_data[1] - shard_data[0] + 1, + shard_data[2], + gateway_url.clone(), + self.token.clone(), + self.data.clone(), + self.event_handler.clone(), + self.framework.clone(), + ); - let mut threads = vec![]; + if let Err(why) = manager.initialize() { + error!("Failed to boot a shard: {:?}", why); + info!("Shutting down all shards"); - for shard_number in shards_index..shards_total { - let shard_info = [shard_number, shard_data[2]]; + manager.shutdown_all(); - let boot_info = BootInfo { - gateway_url: gateway_url.clone(), - shard_info: shard_info, - token: self.token.clone(), - }; - - let boot = boot_shard(&boot_info); - - match boot { - Ok(shard) => { - let shard = Arc::new(Mutex::new(shard)); - - self.shards.lock().insert(shard_number, shard.clone()); - - let monitor_info = feature_framework! {{ - MonitorInfo { - data: self.data.clone(), - event_handler: self.event_handler.clone(), - framework: self.framework.clone(), - gateway_url: gateway_url.clone(), - shard: shard, - shard_info: shard_info, - token: self.token.clone(), - } - } else { - MonitorInfo { - data: self.data.clone(), - event_handler: self.event_handler.clone(), - gateway_url: gateway_url.clone(), - shard: shard, - shard_info: shard_info, - token: self.token.clone(), - } - }}; - - threads.push(thread::spawn(move || { - monitor_shard(monitor_info); - })); - }, - Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), - } - - // We need to wait at least 5 seconds between IDENTIFYs. - // - // Add an extra second as a buffer. - thread::sleep(Duration::from_secs(6)); + return Err(Error::Client(ClientError::ShardBootFailure)); } - for thread in threads { - let _ = thread.join(); - } + manager.run(); Err(Error::Client(ClientError::Shutdown)) } } - impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> { fn drop(&mut self) { self.close_handle().close(); } } -struct BootInfo { - gateway_url: Arc<sync::Mutex<String>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -#[cfg(feature = "framework")] -struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>, - gateway_url: Arc<sync::Mutex<String>>, - shard: Arc<Mutex<Shard>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -#[cfg(not(feature = "framework"))] -struct MonitorInfo<H: EventHandler + Send + 'static> { - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - gateway_url: Arc<sync::Mutex<String>>, - shard: Arc<Mutex<Shard>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -fn boot_shard(info: &BootInfo) -> Result<Shard> { - // Make ten attempts to boot the shard, exponentially backing off; if it - // still doesn't boot after that, accept it as a failure. - // - // After three attempts, start re-retrieving the gateway URL. Before that, - // use the cached one. - for attempt_number in 1..3u64 { - let BootInfo { - ref gateway_url, - ref token, - shard_info, - } = *info; - // If we've tried over 3 times so far, get a new gateway URL. - // - // If doing so fails, count this as a boot attempt. - if attempt_number > 3 { - match http::get_gateway() { - Ok(g) => *gateway_url.lock().unwrap() = g.url, - Err(why) => { - warn!("Failed to retrieve gateway URL: {:?}", why); - - // Failed -- start over. - continue; - }, - } - } - - let attempt = Shard::new(gateway_url.clone(), token.clone(), shard_info); - - match attempt { - Ok(shard) => { - info!("Successfully booted shard: {:?}", shard_info); - - return Ok(shard); - }, - Err(why) => warn!("Failed to boot shard: {:?}", why), - } - } - - // Hopefully _never_ happens? - Err(Error::Client(ClientError::ShardBootFailure)) -} - -fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) { - handle_shard(&mut info); - - let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed); - - while handle_still { - let mut boot_successful = false; - - for _ in 0..3 { - let boot = boot_shard(&BootInfo { - gateway_url: info.gateway_url.clone(), - shard_info: info.shard_info, - token: info.token.clone(), - }); - - match boot { - Ok(new_shard) => { - *info.shard.lock() = new_shard; - - boot_successful = true; - - break; - }, - Err(why) => warn!("Failed to boot shard: {:?}", why), - } - } - - if boot_successful { - handle_shard(&mut info); - } else { - break; - } - - // The shard died: redo the cycle, unless client close was requested. - handle_still = HANDLE_STILL.load(Ordering::Relaxed); - } - - if handle_still { - error!("Completely failed to reboot shard"); - } else { - info!("Client close was requested. Shutting down."); - - let mut shard = info.shard.lock(); - - if let Err(e) = shard.shutdown_clean() { - error!( - "Error shutting down shard {:?}: {:?}", - shard.shard_info(), - e - ); - } - } -} - -fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) { - - // This is currently all ducktape. Redo this. - while HANDLE_STILL.load(Ordering::Relaxed) { - { - let mut shard = info.shard.lock(); - - if let Err(why) = shard.check_heartbeat() { - error!("Failed to heartbeat and reconnect: {:?}", why); - - return; - } - } - - #[cfg(feature = "voice")] - { - let mut shard = info.shard.lock(); - - shard.cycle_voice_recv(); - } - - let event = { - let mut shard = info.shard.lock(); - - let event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - continue; - } - } else { - continue; - } - - debug!("Attempting to auto-reconnect"); - - if let Err(why) = shard.autoreconnect() { - error!("Failed to auto-reconnect: {:?}", why); - } - - continue; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, - other => other, - }; - - match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - } - }; - - feature_framework! {{ - dispatch(event, - &info.shard, - &info.framework, - &info.data, - &info.event_handler); - } else { - dispatch(event, - &info.shard, - &info.data, - &info.event_handler); - }} - } -} - /// Validates that a token is likely in a valid format. /// /// This performs the following checks on a given token: |