diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-24 15:48:02 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-09-24 15:53:23 -0700 |
| commit | 6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch) | |
| tree | e3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client/bridge/gateway/shard_runner.rs | |
| parent | Use $crate for CommandError (diff) | |
| download | serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip | |
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client/bridge/gateway/shard_runner.rs')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs new file mode 100644 index 0000000..8bdbb35 --- /dev/null +++ b/src/client/bridge/gateway/shard_runner.rs @@ -0,0 +1,171 @@ +use internal::prelude::*; +use internal::ws_impl::ReceiverExt; +use model::event::{Event, GatewayEvent}; +use parking_lot::Mutex as ParkingLotMutex; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use super::super::super::{EventHandler, dispatch}; +use super::{LockedShard, ShardId, ShardManagerMessage}; +use typemap::ShareMap; +use websocket::WebSocketError; + +#[cfg(feature = "framework")] +use framework::Framework; + +enum EventRetrieval { + Some() +} + +pub struct ShardRunner<H: EventHandler + 'static> { + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>, + #[cfg(feature = "framework")] + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + manager_tx: Sender<ShardManagerMessage>, + runner_rx: Receiver<ShardManagerMessage>, + runner_tx: Sender<ShardManagerMessage>, + shard: LockedShard, +} + +impl<H: EventHandler + 'static> ShardRunner<H> { + pub fn new(shard: LockedShard, + manager_tx: Sender<ShardManagerMessage>, + framework: Arc<Mutex<Option<Box<Framework + Send>>>>, + data: Arc<ParkingLotMutex<ShareMap>>, + event_handler: Arc<H>) -> Self { + let (tx, rx) = mpsc::channel(); + + Self { + runner_rx: rx, + runner_tx: tx, + data, + event_handler, + framework, + manager_tx, + shard, + } + } + + pub fn run(&mut self) -> Result<()> { + loop { + { + let mut shard = self.shard.lock(); + let incoming = self.runner_rx.try_recv(); + + // Check for an incoming message over the runner channel. + // + // If the message is to shutdown, first verify the ID so we know + // for certain this runner is to shutdown. + if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { + if id.0 == shard.shard_info()[0] { + let _ = shard.shutdown_clean(); + + return Ok(()); + } + } + + if let Err(why) = shard.check_heartbeat() { + error!("Failed to heartbeat and reconnect: {:?}", why); + + let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); + let _ = self.manager_tx.send(msg); + + return Ok(()); + } + + #[cfg(feature = "voice")] + { + shard.cycle_voice_recv(); + } + } + + let events = self.recv_events(); + + for event in events { + feature_framework! {{ + dispatch(event, + &self.shard, + &self.framework, + &self.data, + &self.event_handler); + } else { + dispatch(event, + &info.shard, + &info.data, + &info.event_handler, + &handle); + }} + } + } + } + + pub(super) fn runner_tx(&self) -> Sender<ShardManagerMessage> { + self.runner_tx.clone() + } + + fn recv_events(&mut self) -> Vec<Event> { + let mut shard = self.shard.lock(); + + let mut events = vec![]; + + loop { + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; + + if seconds_passed <= interval_in_secs * 2 { + break; + } + } else { + break; + } + + debug!("Attempting to auto-reconnect"); + + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } + + break; + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => break, + other => other, + }; + + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => break, + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => continue, + Err(why) => { + error!("Shard handler received err: {:?}", why); + + continue; + }, + }; + + events.push(event); + + if events.len() > 5 { + break; + } + }; + + events + } +} |