diff options
| author | Zeyla Hellyer <[email protected]> | 2017-06-16 20:29:57 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-06-16 20:29:57 -0700 |
| commit | 601704acb94601a134ae43e795474afe8574b2ae (patch) | |
| tree | 16194482225b4877ce70613962d277e81b13660b /src/client/mod.rs | |
| parent | Fix broken link from ModelError (diff) | |
| download | serenity-601704acb94601a134ae43e795474afe8574b2ae.tar.xz serenity-601704acb94601a134ae43e795474afe8574b2ae.zip | |
Rework shard logic and shard handling
Diffstat (limited to 'src/client/mod.rs')
| -rw-r--r-- | src/client/mod.rs | 176 |
1 files changed, 49 insertions, 127 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 3a6b786..40859f2 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -35,7 +35,6 @@ pub use ::http as rest; #[cfg(feature="cache")] pub use ::CACHE; -use chrono::UTC; use self::dispatch::dispatch; use self::event_store::EventStore; use std::collections::HashMap; @@ -170,7 +169,7 @@ pub struct Client { event_store: Arc<RwLock<EventStore>>, #[cfg(feature="framework")] framework: Arc<Mutex<Framework>>, - token: String, + token: Arc<Mutex<String>>, } #[allow(type_complexity)] @@ -307,7 +306,7 @@ impl Client { /// /// [gateway docs]: gateway/index.html#sharding pub fn start(&mut self) -> Result<()> { - self.start_connection(None, http::get_gateway()?.url) + self.start_connection([0, 0, 1], http::get_gateway()?.url) } /// Establish the connection(s) and start listening for events. @@ -362,7 +361,7 @@ impl Client { drop(res); - self.start_connection(Some([0, x, y]), url) + self.start_connection([0, x, y], url) } /// Establish a sharded connection and start listening for events. @@ -434,7 +433,7 @@ impl Client { /// [`start_autosharded`]: #method.start_autosharded /// [gateway docs]: gateway/index.html#sharding pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> { - self.start_connection(Some([shard, shard, shards]), http::get_gateway()?.url) + self.start_connection([shard, shard, shards], http::get_gateway()?.url) } /// Establish sharded connections and start listening for events. @@ -483,7 +482,7 @@ impl Client { /// [`start_shard_range`]: #method.start_shard_range /// [Gateway docs]: gateway/index.html#sharding pub fn start_shards(&mut self, total_shards: u64) -> Result<()> { - self.start_connection(Some([0, total_shards - 1, total_shards]), http::get_gateway()?.url) + self.start_connection([0, total_shards - 1, total_shards], http::get_gateway()?.url) } /// Establish a range of sharded connections and start listening for events. @@ -544,7 +543,7 @@ impl Client { /// [`start_shards`]: #method.start_shards /// [Gateway docs]: gateway/index.html#sharding pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> { - self.start_connection(Some([range[0], range[1], total_shards]), http::get_gateway()?.url) + self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url) } /// Attaches a handler for when a [`ChannelCreate`] is received. @@ -950,7 +949,7 @@ impl Client { // an error. // // [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown - fn start_connection(&mut self, shard_data: Option<[u64; 3]>, url: String) + fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> { // Update the framework's current user if the feature is enabled. // @@ -966,13 +965,13 @@ impl Client { let gateway_url = Arc::new(Mutex::new(url)); - let shards_index = shard_data.map_or(0, |x| x[0]); - let shards_total = shard_data.map_or(1, |x| x[1] + 1); + let shards_index = shard_data[0]; + let shards_total = shard_data[1] + 1; let mut threads = vec![]; for shard_number in shards_index..shards_total { - let shard_info = shard_data.map(|s| [shard_number, s[2]]); + let shard_info = [shard_number, shard_data[2]]; let boot = boot_shard(&BootInfo { gateway_url: gateway_url.clone(), @@ -981,29 +980,9 @@ impl Client { }); match boot { - Ok((shard, ready)) => { - #[cfg(feature="cache")] - { - CACHE.write() - .unwrap() - .update_with_ready(&ready); - } - + Ok(shard) => { let shard = Arc::new(Mutex::new(shard)); - feature_framework! {{ - dispatch(Event::Ready(ready), - &shard, - &self.framework, - &self.data, - &self.event_store); - } else { - dispatch(Event::Ready(ready), - &shard, - &self.data, - &self.event_store); - }} - let monitor_info = feature_framework! {{ MonitorInfo { data: self.data.clone(), @@ -1241,8 +1220,8 @@ impl Client { struct BootInfo { gateway_url: Arc<Mutex<String>>, - shard_info: Option<[u64; 2]>, - token: String, + shard_info: [u64; 2], + token: Arc<Mutex<String>>, } #[cfg(feature="framework")] @@ -1252,8 +1231,8 @@ struct MonitorInfo { framework: Arc<Mutex<Framework>>, gateway_url: Arc<Mutex<String>>, shard: Arc<Mutex<Shard>>, - shard_info: Option<[u64; 2]>, - token: String, + shard_info: [u64; 2], + token: Arc<Mutex<String>>, } #[cfg(not(feature="framework"))] @@ -1262,11 +1241,11 @@ struct MonitorInfo { event_store: Arc<RwLock<EventStore>>, gateway_url: Arc<Mutex<String>>, shard: Arc<Mutex<Shard>>, - shard_info: Option<[u64; 2]>, + shard_info: [u64; 2], token: String, } -fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> { +fn boot_shard(info: &BootInfo) -> Result<Shard> { // Make ten attempts to boot the shard, exponentially backing off; if it // still doesn't boot after that, accept it as a failure. // @@ -1288,22 +1267,15 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> { } } - let attempt = Shard::new(&info.gateway_url.lock().unwrap(), - &info.token, + let attempt = Shard::new(info.gateway_url.clone(), + info.token.clone(), info.shard_info); match attempt { - Ok((shard, ready)) => { - #[cfg(feature="cache")] - { - CACHE.write() - .unwrap() - .update_with_ready(&ready); - } - + Ok(shard) => { info!("Successfully booted shard: {:?}", info.shard_info); - return Ok((shard, ready)); + return Ok(shard); }, Err(why) => warn!("Failed to boot shard: {:?}", why), } @@ -1327,29 +1299,11 @@ fn monitor_shard(mut info: MonitorInfo) { }); match boot { - Ok((new_shard, ready)) => { - #[cfg(feature="cache")] - { - CACHE.write().unwrap().update_with_ready(&ready); - } - + Ok(new_shard) => { *info.shard.lock().unwrap() = new_shard; boot_successful = true; - feature_framework! {{ - dispatch(Event::Ready(ready), - &info.shard, - &info.framework, - &info.data, - &info.event_store); - } else { - dispatch(Event::Ready(ready), - &info.shard, - &info.data, - &info.event_store); - }} - break; }, Err(why) => warn!("Failed to boot shard: {:?}", why), @@ -1370,43 +1324,11 @@ fn monitor_shard(mut info: MonitorInfo) { fn handle_shard(info: &mut MonitorInfo) { // This is currently all ducktape. Redo this. - let mut last_ack_time = UTC::now().timestamp(); - let mut last_heartbeat_sent = UTC::now().timestamp(); - loop { - let in_secs = { - let shard = info.shard.lock().unwrap(); - - shard.heartbeat_interval() / 1000 - }; - - if UTC::now().timestamp() - last_heartbeat_sent > in_secs { + { let mut shard = info.shard.lock().unwrap(); - // If the last heartbeat didn't receive an acknowledgement, then - // shutdown and auto-reconnect. - if !shard.last_heartbeat_acknowledged() { - debug!("Last heartbeat not acknowledged; re-connecting"); - - match shard.resume() { - Ok(_) => { - debug!("Successfully resumed shard"); - - last_ack_time = UTC::now().timestamp(); - last_heartbeat_sent = UTC::now().timestamp(); - - continue; - }, - Err(why) => { - warn!("Err resuming shard: {:?}", why); - - return; - }, - } - } - - let _ = shard.heartbeat(); - last_heartbeat_sent = UTC::now().timestamp(); + shard.check_heartbeat(); } #[cfg(feature="voice")] @@ -1420,40 +1342,39 @@ fn handle_shard(info: &mut MonitorInfo) { let mut shard = info.shard.lock().unwrap(); let event = match shard.client.recv_json(GatewayEvent::decode) { - Ok(GatewayEvent::HeartbeatAck) => { - last_ack_time = UTC::now().timestamp(); - - Ok(GatewayEvent::HeartbeatAck) - }, Err(Error::WebSocket(WebSocketError::IoError(_))) => { - if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; + + if seconds_passed <= interval_in_secs * 2 { + continue; + } + } else { continue; } - debug!("Attempting to shutdown receiver/sender"); - - match shard.resume() { - Ok(_) => { - debug!("Successfully resumed shard"); + debug!("Attempting to auto-reconnect"); - last_ack_time = UTC::now().timestamp(); - last_heartbeat_sent = UTC::now().timestamp(); - - continue; - }, - Err(why) => { - warn!("Err resuming shard: {:?}", why); - - return; - }, + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); } + + continue; }, Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue, other => other, }; - trace!("Received event on shard handler: {:?}", event); - match shard.handle_event(event) { Ok(Some(event)) => event, Ok(None) => continue, @@ -1482,19 +1403,20 @@ fn handle_shard(info: &mut MonitorInfo) { fn init_client(token: String) -> Client { http::set_token(&token); + let locked = Arc::new(Mutex::new(token)); feature_framework! {{ Client { data: Arc::new(Mutex::new(ShareMap::custom())), event_store: Arc::new(RwLock::new(EventStore::default())), framework: Arc::new(Mutex::new(Framework::default())), - token: token, + token: locked, } } else { Client { data: Arc::new(Mutex::new(ShareMap::custom())), event_store: Arc::new(RwLock::new(EventStore::default())), - token: token, + token: locked, } }} } |