diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-24 15:48:02 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-09-24 15:53:23 -0700 |
| commit | 6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch) | |
| tree | e3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client/mod.rs | |
| parent | Use $crate for CommandError (diff) | |
| download | serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip | |
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client/mod.rs')
| -rw-r--r-- | src/client/mod.rs | 287 |
1 files changed, 19 insertions, 268 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 20c2bd6..6ab6951 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -19,6 +19,7 @@ //! [Client examples]: struct.Client.html#examples #![allow(zero_ptr)] +mod bridge; mod context; mod dispatch; mod error; @@ -35,20 +36,17 @@ pub use http as rest; #[cfg(feature = "cache")] pub use CACHE; +use self::bridge::gateway::ShardManager; use self::dispatch::dispatch; use std::sync::{self, Arc}; use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT}; use parking_lot::Mutex; use std::collections::HashMap; -use std::time::Duration; -use std::{mem, thread}; +use std::mem; use super::gateway::Shard; use typemap::ShareMap; -use websocket::result::WebSocketError; use http; use internal::prelude::*; -use internal::ws_impl::ReceiverExt; -use model::event::*; #[cfg(feature = "framework")] use framework::Framework; @@ -752,283 +750,36 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { let gateway_url = Arc::new(sync::Mutex::new(url)); - let shards_index = shard_data[0]; - let shards_total = shard_data[1] + 1; + let mut manager = ShardManager::new( + shard_data[0], + shard_data[1] - shard_data[0] + 1, + shard_data[2], + gateway_url.clone(), + self.token.clone(), + self.data.clone(), + self.event_handler.clone(), + self.framework.clone(), + ); - let mut threads = vec![]; + if let Err(why) = manager.initialize() { + error!("Failed to boot a shard: {:?}", why); + info!("Shutting down all shards"); - for shard_number in shards_index..shards_total { - let shard_info = [shard_number, shard_data[2]]; + manager.shutdown_all(); - let boot_info = BootInfo { - gateway_url: gateway_url.clone(), - shard_info: shard_info, - token: self.token.clone(), - }; - - let boot = boot_shard(&boot_info); - - match boot { - Ok(shard) => { - let shard = Arc::new(Mutex::new(shard)); - - self.shards.lock().insert(shard_number, shard.clone()); - - let monitor_info = feature_framework! {{ - MonitorInfo { - data: self.data.clone(), - event_handler: self.event_handler.clone(), - framework: self.framework.clone(), - gateway_url: gateway_url.clone(), - shard: shard, - shard_info: shard_info, - token: self.token.clone(), - } - } else { - MonitorInfo { - data: self.data.clone(), - event_handler: self.event_handler.clone(), - gateway_url: gateway_url.clone(), - shard: shard, - shard_info: shard_info, - token: self.token.clone(), - } - }}; - - threads.push(thread::spawn(move || { - monitor_shard(monitor_info); - })); - }, - Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), - } - - // We need to wait at least 5 seconds between IDENTIFYs. - // - // Add an extra second as a buffer. - thread::sleep(Duration::from_secs(6)); + return Err(Error::Client(ClientError::ShardBootFailure)); } - for thread in threads { - let _ = thread.join(); - } + manager.run(); Err(Error::Client(ClientError::Shutdown)) } } - impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> { fn drop(&mut self) { self.close_handle().close(); } } -struct BootInfo { - gateway_url: Arc<sync::Mutex<String>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -#[cfg(feature = "framework")] -struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>, - gateway_url: Arc<sync::Mutex<String>>, - shard: Arc<Mutex<Shard>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -#[cfg(not(feature = "framework"))] -struct MonitorInfo<H: EventHandler + Send + 'static> { - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - gateway_url: Arc<sync::Mutex<String>>, - shard: Arc<Mutex<Shard>>, - shard_info: [u64; 2], - token: Arc<sync::Mutex<String>>, -} - -fn boot_shard(info: &BootInfo) -> Result<Shard> { - // Make ten attempts to boot the shard, exponentially backing off; if it - // still doesn't boot after that, accept it as a failure. - // - // After three attempts, start re-retrieving the gateway URL. Before that, - // use the cached one. - for attempt_number in 1..3u64 { - let BootInfo { - ref gateway_url, - ref token, - shard_info, - } = *info; - // If we've tried over 3 times so far, get a new gateway URL. - // - // If doing so fails, count this as a boot attempt. - if attempt_number > 3 { - match http::get_gateway() { - Ok(g) => *gateway_url.lock().unwrap() = g.url, - Err(why) => { - warn!("Failed to retrieve gateway URL: {:?}", why); - - // Failed -- start over. - continue; - }, - } - } - - let attempt = Shard::new(gateway_url.clone(), token.clone(), shard_info); - - match attempt { - Ok(shard) => { - info!("Successfully booted shard: {:?}", shard_info); - - return Ok(shard); - }, - Err(why) => warn!("Failed to boot shard: {:?}", why), - } - } - - // Hopefully _never_ happens? - Err(Error::Client(ClientError::ShardBootFailure)) -} - -fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) { - handle_shard(&mut info); - - let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed); - - while handle_still { - let mut boot_successful = false; - - for _ in 0..3 { - let boot = boot_shard(&BootInfo { - gateway_url: info.gateway_url.clone(), - shard_info: info.shard_info, - token: info.token.clone(), - }); - - match boot { - Ok(new_shard) => { - *info.shard.lock() = new_shard; - - boot_successful = true; - - break; - }, - Err(why) => warn!("Failed to boot shard: {:?}", why), - } - } - - if boot_successful { - handle_shard(&mut info); - } else { - break; - } - - // The shard died: redo the cycle, unless client close was requested. - handle_still = HANDLE_STILL.load(Ordering::Relaxed); - } - - if handle_still { - error!("Completely failed to reboot shard"); - } else { - info!("Client close was requested. Shutting down."); - - let mut shard = info.shard.lock(); - - if let Err(e) = shard.shutdown_clean() { - error!( - "Error shutting down shard {:?}: {:?}", - shard.shard_info(), - e - ); - } - } -} - -fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) { - - // This is currently all ducktape. Redo this. - while HANDLE_STILL.load(Ordering::Relaxed) { - { - let mut shard = info.shard.lock(); - - if let Err(why) = shard.check_heartbeat() { - error!("Failed to heartbeat and reconnect: {:?}", why); - - return; - } - } - - #[cfg(feature = "voice")] - { - let mut shard = info.shard.lock(); - - shard.cycle_voice_recv(); - } - - let event = { - let mut shard = info.shard.lock(); - - let event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - continue; - } - } else { - continue; - } - - debug!("Attempting to auto-reconnect"); - - if let Err(why) = shard.autoreconnect() { - error!("Failed to auto-reconnect: {:?}", why); - } - - continue; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, - other => other, - }; - - match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - } - }; - - feature_framework! {{ - dispatch(event, - &info.shard, - &info.framework, - &info.data, - &info.event_handler); - } else { - dispatch(event, - &info.shard, - &info.data, - &info.event_handler); - }} - } -} - /// Validates that a token is likely in a valid format. /// /// This performs the following checks on a given token: |