aboutsummaryrefslogtreecommitdiff
path: root/src/client/mod.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-09-24 15:48:02 -0700
committerZeyla Hellyer <[email protected]>2017-09-24 15:53:23 -0700
commit6c43fed3702be3fdc1eafed26a2f6335acd71843 (patch)
treee3dd142b36f221f33fb8e35c511bbf4e9e9471b6 /src/client/mod.rs
parentUse $crate for CommandError (diff)
downloadserenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.tar.xz
serenity-6c43fed3702be3fdc1eafed26a2f6335acd71843.zip
Add a shard manager
The shard manager will queue up shards for booting.
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r--src/client/mod.rs287
1 files changed, 19 insertions, 268 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 20c2bd6..6ab6951 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -19,6 +19,7 @@
//! [Client examples]: struct.Client.html#examples
#![allow(zero_ptr)]
+mod bridge;
mod context;
mod dispatch;
mod error;
@@ -35,20 +36,17 @@ pub use http as rest;
#[cfg(feature = "cache")]
pub use CACHE;
+use self::bridge::gateway::ShardManager;
use self::dispatch::dispatch;
use std::sync::{self, Arc};
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use parking_lot::Mutex;
use std::collections::HashMap;
-use std::time::Duration;
-use std::{mem, thread};
+use std::mem;
use super::gateway::Shard;
use typemap::ShareMap;
-use websocket::result::WebSocketError;
use http;
use internal::prelude::*;
-use internal::ws_impl::ReceiverExt;
-use model::event::*;
#[cfg(feature = "framework")]
use framework::Framework;
@@ -752,283 +750,36 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let gateway_url = Arc::new(sync::Mutex::new(url));
- let shards_index = shard_data[0];
- let shards_total = shard_data[1] + 1;
+ let mut manager = ShardManager::new(
+ shard_data[0],
+ shard_data[1] - shard_data[0] + 1,
+ shard_data[2],
+ gateway_url.clone(),
+ self.token.clone(),
+ self.data.clone(),
+ self.event_handler.clone(),
+ self.framework.clone(),
+ );
- let mut threads = vec![];
+ if let Err(why) = manager.initialize() {
+ error!("Failed to boot a shard: {:?}", why);
+ info!("Shutting down all shards");
- for shard_number in shards_index..shards_total {
- let shard_info = [shard_number, shard_data[2]];
+ manager.shutdown_all();
- let boot_info = BootInfo {
- gateway_url: gateway_url.clone(),
- shard_info: shard_info,
- token: self.token.clone(),
- };
-
- let boot = boot_shard(&boot_info);
-
- match boot {
- Ok(shard) => {
- let shard = Arc::new(Mutex::new(shard));
-
- self.shards.lock().insert(shard_number, shard.clone());
-
- let monitor_info = feature_framework! {{
- MonitorInfo {
- data: self.data.clone(),
- event_handler: self.event_handler.clone(),
- framework: self.framework.clone(),
- gateway_url: gateway_url.clone(),
- shard: shard,
- shard_info: shard_info,
- token: self.token.clone(),
- }
- } else {
- MonitorInfo {
- data: self.data.clone(),
- event_handler: self.event_handler.clone(),
- gateway_url: gateway_url.clone(),
- shard: shard,
- shard_info: shard_info,
- token: self.token.clone(),
- }
- }};
-
- threads.push(thread::spawn(move || {
- monitor_shard(monitor_info);
- }));
- },
- Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why),
- }
-
- // We need to wait at least 5 seconds between IDENTIFYs.
- //
- // Add an extra second as a buffer.
- thread::sleep(Duration::from_secs(6));
+ return Err(Error::Client(ClientError::ShardBootFailure));
}
- for thread in threads {
- let _ = thread.join();
- }
+ manager.run();
Err(Error::Client(ClientError::Shutdown))
}
}
-
impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
fn drop(&mut self) { self.close_handle().close(); }
}
-struct BootInfo {
- gateway_url: Arc<sync::Mutex<String>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-#[cfg(feature = "framework")]
-struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>,
- gateway_url: Arc<sync::Mutex<String>>,
- shard: Arc<Mutex<Shard>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-#[cfg(not(feature = "framework"))]
-struct MonitorInfo<H: EventHandler + Send + 'static> {
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- gateway_url: Arc<sync::Mutex<String>>,
- shard: Arc<Mutex<Shard>>,
- shard_info: [u64; 2],
- token: Arc<sync::Mutex<String>>,
-}
-
-fn boot_shard(info: &BootInfo) -> Result<Shard> {
- // Make ten attempts to boot the shard, exponentially backing off; if it
- // still doesn't boot after that, accept it as a failure.
- //
- // After three attempts, start re-retrieving the gateway URL. Before that,
- // use the cached one.
- for attempt_number in 1..3u64 {
- let BootInfo {
- ref gateway_url,
- ref token,
- shard_info,
- } = *info;
- // If we've tried over 3 times so far, get a new gateway URL.
- //
- // If doing so fails, count this as a boot attempt.
- if attempt_number > 3 {
- match http::get_gateway() {
- Ok(g) => *gateway_url.lock().unwrap() = g.url,
- Err(why) => {
- warn!("Failed to retrieve gateway URL: {:?}", why);
-
- // Failed -- start over.
- continue;
- },
- }
- }
-
- let attempt = Shard::new(gateway_url.clone(), token.clone(), shard_info);
-
- match attempt {
- Ok(shard) => {
- info!("Successfully booted shard: {:?}", shard_info);
-
- return Ok(shard);
- },
- Err(why) => warn!("Failed to boot shard: {:?}", why),
- }
- }
-
- // Hopefully _never_ happens?
- Err(Error::Client(ClientError::ShardBootFailure))
-}
-
-fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) {
- handle_shard(&mut info);
-
- let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed);
-
- while handle_still {
- let mut boot_successful = false;
-
- for _ in 0..3 {
- let boot = boot_shard(&BootInfo {
- gateway_url: info.gateway_url.clone(),
- shard_info: info.shard_info,
- token: info.token.clone(),
- });
-
- match boot {
- Ok(new_shard) => {
- *info.shard.lock() = new_shard;
-
- boot_successful = true;
-
- break;
- },
- Err(why) => warn!("Failed to boot shard: {:?}", why),
- }
- }
-
- if boot_successful {
- handle_shard(&mut info);
- } else {
- break;
- }
-
- // The shard died: redo the cycle, unless client close was requested.
- handle_still = HANDLE_STILL.load(Ordering::Relaxed);
- }
-
- if handle_still {
- error!("Completely failed to reboot shard");
- } else {
- info!("Client close was requested. Shutting down.");
-
- let mut shard = info.shard.lock();
-
- if let Err(e) = shard.shutdown_clean() {
- error!(
- "Error shutting down shard {:?}: {:?}",
- shard.shard_info(),
- e
- );
- }
- }
-}
-
-fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) {
-
- // This is currently all ducktape. Redo this.
- while HANDLE_STILL.load(Ordering::Relaxed) {
- {
- let mut shard = info.shard.lock();
-
- if let Err(why) = shard.check_heartbeat() {
- error!("Failed to heartbeat and reconnect: {:?}", why);
-
- return;
- }
- }
-
- #[cfg(feature = "voice")]
- {
- let mut shard = info.shard.lock();
-
- shard.cycle_voice_recv();
- }
-
- let event = {
- let mut shard = info.shard.lock();
-
- let event = match shard.client.recv_json(GatewayEvent::decode) {
- Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- // Check that an amount of time at least double the
- // heartbeat_interval has passed.
- //
- // If not, continue on trying to receive messages.
- //
- // If it has, attempt to auto-reconnect.
- let last = shard.last_heartbeat_ack();
- let interval = shard.heartbeat_interval();
-
- if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
- let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
- let interval_in_secs = interval / 1000;
-
- if seconds_passed <= interval_in_secs * 2 {
- continue;
- }
- } else {
- continue;
- }
-
- debug!("Attempting to auto-reconnect");
-
- if let Err(why) = shard.autoreconnect() {
- error!("Failed to auto-reconnect: {:?}", why);
- }
-
- continue;
- },
- Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
- other => other,
- };
-
- match shard.handle_event(event) {
- Ok(Some(event)) => event,
- Ok(None) => continue,
- Err(why) => {
- error!("Shard handler received err: {:?}", why);
-
- continue;
- },
- }
- };
-
- feature_framework! {{
- dispatch(event,
- &info.shard,
- &info.framework,
- &info.data,
- &info.event_handler);
- } else {
- dispatch(event,
- &info.shard,
- &info.data,
- &info.event_handler);
- }}
- }
-}
-
/// Validates that a token is likely in a valid format.
///
/// This performs the following checks on a given token: