aboutsummaryrefslogtreecommitdiff
path: root/src/client/mod.rs
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-06-16 20:29:57 -0700
committerZeyla Hellyer <[email protected]>2017-06-16 20:29:57 -0700
commit601704acb94601a134ae43e795474afe8574b2ae (patch)
tree16194482225b4877ce70613962d277e81b13660b /src/client/mod.rs
parentFix broken link from ModelError (diff)
downloadserenity-601704acb94601a134ae43e795474afe8574b2ae.tar.xz
serenity-601704acb94601a134ae43e795474afe8574b2ae.zip
Rework shard logic and shard handling
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r--src/client/mod.rs176
1 files changed, 49 insertions, 127 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 3a6b786..40859f2 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -35,7 +35,6 @@ pub use ::http as rest;
#[cfg(feature="cache")]
pub use ::CACHE;
-use chrono::UTC;
use self::dispatch::dispatch;
use self::event_store::EventStore;
use std::collections::HashMap;
@@ -170,7 +169,7 @@ pub struct Client {
event_store: Arc<RwLock<EventStore>>,
#[cfg(feature="framework")]
framework: Arc<Mutex<Framework>>,
- token: String,
+ token: Arc<Mutex<String>>,
}
#[allow(type_complexity)]
@@ -307,7 +306,7 @@ impl Client {
///
/// [gateway docs]: gateway/index.html#sharding
pub fn start(&mut self) -> Result<()> {
- self.start_connection(None, http::get_gateway()?.url)
+ self.start_connection([0, 0, 1], http::get_gateway()?.url)
}
/// Establish the connection(s) and start listening for events.
@@ -362,7 +361,7 @@ impl Client {
drop(res);
- self.start_connection(Some([0, x, y]), url)
+ self.start_connection([0, x, y], url)
}
/// Establish a sharded connection and start listening for events.
@@ -434,7 +433,7 @@ impl Client {
/// [`start_autosharded`]: #method.start_autosharded
/// [gateway docs]: gateway/index.html#sharding
pub fn start_shard(&mut self, shard: u64, shards: u64) -> Result<()> {
- self.start_connection(Some([shard, shard, shards]), http::get_gateway()?.url)
+ self.start_connection([shard, shard, shards], http::get_gateway()?.url)
}
/// Establish sharded connections and start listening for events.
@@ -483,7 +482,7 @@ impl Client {
/// [`start_shard_range`]: #method.start_shard_range
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shards(&mut self, total_shards: u64) -> Result<()> {
- self.start_connection(Some([0, total_shards - 1, total_shards]), http::get_gateway()?.url)
+ self.start_connection([0, total_shards - 1, total_shards], http::get_gateway()?.url)
}
/// Establish a range of sharded connections and start listening for events.
@@ -544,7 +543,7 @@ impl Client {
/// [`start_shards`]: #method.start_shards
/// [Gateway docs]: gateway/index.html#sharding
pub fn start_shard_range(&mut self, range: [u64; 2], total_shards: u64) -> Result<()> {
- self.start_connection(Some([range[0], range[1], total_shards]), http::get_gateway()?.url)
+ self.start_connection([range[0], range[1], total_shards], http::get_gateway()?.url)
}
/// Attaches a handler for when a [`ChannelCreate`] is received.
@@ -950,7 +949,7 @@ impl Client {
// an error.
//
// [`ClientError::Shutdown`]: enum.ClientError.html#variant.Shutdown
- fn start_connection(&mut self, shard_data: Option<[u64; 3]>, url: String)
+ fn start_connection(&mut self, shard_data: [u64; 3], url: String)
-> Result<()> {
// Update the framework's current user if the feature is enabled.
//
@@ -966,13 +965,13 @@ impl Client {
let gateway_url = Arc::new(Mutex::new(url));
- let shards_index = shard_data.map_or(0, |x| x[0]);
- let shards_total = shard_data.map_or(1, |x| x[1] + 1);
+ let shards_index = shard_data[0];
+ let shards_total = shard_data[1] + 1;
let mut threads = vec![];
for shard_number in shards_index..shards_total {
- let shard_info = shard_data.map(|s| [shard_number, s[2]]);
+ let shard_info = [shard_number, shard_data[2]];
let boot = boot_shard(&BootInfo {
gateway_url: gateway_url.clone(),
@@ -981,29 +980,9 @@ impl Client {
});
match boot {
- Ok((shard, ready)) => {
- #[cfg(feature="cache")]
- {
- CACHE.write()
- .unwrap()
- .update_with_ready(&ready);
- }
-
+ Ok(shard) => {
let shard = Arc::new(Mutex::new(shard));
- feature_framework! {{
- dispatch(Event::Ready(ready),
- &shard,
- &self.framework,
- &self.data,
- &self.event_store);
- } else {
- dispatch(Event::Ready(ready),
- &shard,
- &self.data,
- &self.event_store);
- }}
-
let monitor_info = feature_framework! {{
MonitorInfo {
data: self.data.clone(),
@@ -1241,8 +1220,8 @@ impl Client {
struct BootInfo {
gateway_url: Arc<Mutex<String>>,
- shard_info: Option<[u64; 2]>,
- token: String,
+ shard_info: [u64; 2],
+ token: Arc<Mutex<String>>,
}
#[cfg(feature="framework")]
@@ -1252,8 +1231,8 @@ struct MonitorInfo {
framework: Arc<Mutex<Framework>>,
gateway_url: Arc<Mutex<String>>,
shard: Arc<Mutex<Shard>>,
- shard_info: Option<[u64; 2]>,
- token: String,
+ shard_info: [u64; 2],
+ token: Arc<Mutex<String>>,
}
#[cfg(not(feature="framework"))]
@@ -1262,11 +1241,11 @@ struct MonitorInfo {
event_store: Arc<RwLock<EventStore>>,
gateway_url: Arc<Mutex<String>>,
shard: Arc<Mutex<Shard>>,
- shard_info: Option<[u64; 2]>,
+ shard_info: [u64; 2],
token: String,
}
-fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
+fn boot_shard(info: &BootInfo) -> Result<Shard> {
// Make ten attempts to boot the shard, exponentially backing off; if it
// still doesn't boot after that, accept it as a failure.
//
@@ -1288,22 +1267,15 @@ fn boot_shard(info: &BootInfo) -> Result<(Shard, ReadyEvent)> {
}
}
- let attempt = Shard::new(&info.gateway_url.lock().unwrap(),
- &info.token,
+ let attempt = Shard::new(info.gateway_url.clone(),
+ info.token.clone(),
info.shard_info);
match attempt {
- Ok((shard, ready)) => {
- #[cfg(feature="cache")]
- {
- CACHE.write()
- .unwrap()
- .update_with_ready(&ready);
- }
-
+ Ok(shard) => {
info!("Successfully booted shard: {:?}", info.shard_info);
- return Ok((shard, ready));
+ return Ok(shard);
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
}
@@ -1327,29 +1299,11 @@ fn monitor_shard(mut info: MonitorInfo) {
});
match boot {
- Ok((new_shard, ready)) => {
- #[cfg(feature="cache")]
- {
- CACHE.write().unwrap().update_with_ready(&ready);
- }
-
+ Ok(new_shard) => {
*info.shard.lock().unwrap() = new_shard;
boot_successful = true;
- feature_framework! {{
- dispatch(Event::Ready(ready),
- &info.shard,
- &info.framework,
- &info.data,
- &info.event_store);
- } else {
- dispatch(Event::Ready(ready),
- &info.shard,
- &info.data,
- &info.event_store);
- }}
-
break;
},
Err(why) => warn!("Failed to boot shard: {:?}", why),
@@ -1370,43 +1324,11 @@ fn monitor_shard(mut info: MonitorInfo) {
fn handle_shard(info: &mut MonitorInfo) {
// This is currently all ducktape. Redo this.
- let mut last_ack_time = UTC::now().timestamp();
- let mut last_heartbeat_sent = UTC::now().timestamp();
-
loop {
- let in_secs = {
- let shard = info.shard.lock().unwrap();
-
- shard.heartbeat_interval() / 1000
- };
-
- if UTC::now().timestamp() - last_heartbeat_sent > in_secs {
+ {
let mut shard = info.shard.lock().unwrap();
- // If the last heartbeat didn't receive an acknowledgement, then
- // shutdown and auto-reconnect.
- if !shard.last_heartbeat_acknowledged() {
- debug!("Last heartbeat not acknowledged; re-connecting");
-
- match shard.resume() {
- Ok(_) => {
- debug!("Successfully resumed shard");
-
- last_ack_time = UTC::now().timestamp();
- last_heartbeat_sent = UTC::now().timestamp();
-
- continue;
- },
- Err(why) => {
- warn!("Err resuming shard: {:?}", why);
-
- return;
- },
- }
- }
-
- let _ = shard.heartbeat();
- last_heartbeat_sent = UTC::now().timestamp();
+ shard.check_heartbeat();
}
#[cfg(feature="voice")]
@@ -1420,40 +1342,39 @@ fn handle_shard(info: &mut MonitorInfo) {
let mut shard = info.shard.lock().unwrap();
let event = match shard.client.recv_json(GatewayEvent::decode) {
- Ok(GatewayEvent::HeartbeatAck) => {
- last_ack_time = UTC::now().timestamp();
-
- Ok(GatewayEvent::HeartbeatAck)
- },
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
- if shard.last_heartbeat_acknowledged() || UTC::now().timestamp() - 90 < last_ack_time {
+ // Check that an amount of time at least double the
+ // heartbeat_interval has passed.
+ //
+ // If not, continue on trying to receive messages.
+ //
+ // If it has, attempt to auto-reconnect.
+ let last = shard.last_heartbeat_ack();
+ let interval = shard.heartbeat_interval();
+
+ if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
+ let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
+ let interval_in_secs = interval / 1000;
+
+ if seconds_passed <= interval_in_secs * 2 {
+ continue;
+ }
+ } else {
continue;
}
- debug!("Attempting to shutdown receiver/sender");
-
- match shard.resume() {
- Ok(_) => {
- debug!("Successfully resumed shard");
+ debug!("Attempting to auto-reconnect");
- last_ack_time = UTC::now().timestamp();
- last_heartbeat_sent = UTC::now().timestamp();
-
- continue;
- },
- Err(why) => {
- warn!("Err resuming shard: {:?}", why);
-
- return;
- },
+ if let Err(why) = shard.autoreconnect() {
+ error!("Failed to auto-reconnect: {:?}", why);
}
+
+ continue;
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => continue,
other => other,
};
- trace!("Received event on shard handler: {:?}", event);
-
match shard.handle_event(event) {
Ok(Some(event)) => event,
Ok(None) => continue,
@@ -1482,19 +1403,20 @@ fn handle_shard(info: &mut MonitorInfo) {
fn init_client(token: String) -> Client {
http::set_token(&token);
+ let locked = Arc::new(Mutex::new(token));
feature_framework! {{
Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_store: Arc::new(RwLock::new(EventStore::default())),
framework: Arc::new(Mutex::new(Framework::default())),
- token: token,
+ token: locked,
}
} else {
Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_store: Arc::new(RwLock::new(EventStore::default())),
- token: token,
+ token: locked,
}
}}
}