aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2017-01-20 15:58:00 -0800
committerAustin Hellyer <[email protected]>2017-01-20 15:58:00 -0800
commit76f9095c012a8769c7bd27aca6540b7018574c28 (patch)
tree3a99e4f470711a36218392858bfa6ce23e43a37c
parentFix application decoding w/ rpc_origins (diff)
downloadserenity-76f9095c012a8769c7bd27aca6540b7018574c28.tar.xz
serenity-76f9095c012a8769c7bd27aca6540b7018574c28.zip
Reboot shard on broken pipe
If the receiver or sender breaks the pipe for one reason or another, shut down both. Afterwards, close down the keepalive and perform a reboot of the shard.
-rw-r--r--src/client/error.rs1
-rw-r--r--src/client/gateway/prep.rs19
-rw-r--r--src/client/mod.rs299
3 files changed, 211 insertions, 108 deletions
diff --git a/src/client/error.rs b/src/client/error.rs
index 14818d9..3975576 100644
--- a/src/client/error.rs
+++ b/src/client/error.rs
@@ -122,6 +122,7 @@ pub enum Error {
///
/// [`Context::edit_role`]: struct.Context.html#method.edit_role
RecordNotFound,
+ ShardBootFailure,
/// When the shard being retrieved from within the Client could not be
/// found after being inserted into the Client's internal vector of
/// [`Shard`]s.
diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs
index b1c08bb..4aa5519 100644
--- a/src/client/gateway/prep.rs
+++ b/src/client/gateway/prep.rs
@@ -1,6 +1,5 @@
use serde_json::builder::ObjectBuilder;
use serde_json::Value;
-use std::net::Shutdown;
use std::sync::mpsc::{
Receiver as MpscReceiver,
Sender as MpscSender,
@@ -96,6 +95,7 @@ pub fn keepalive(interval: u64,
let mut next_tick = time::get_time() + base_interval;
let mut last_sequence = 0;
+ let mut last_successful = false;
'outer: loop {
thread::sleep(StdDuration::from_millis(100));
@@ -137,12 +137,25 @@ pub fn keepalive(interval: u64,
*heartbeat_sent.lock().unwrap() = now;
},
- Err(why) => warn!("Error sending keepalive: {:?}", why),
+ Err(why) => {
+ warn!("Error sending keepalive: {:?}", why);
+
+ if last_successful {
+ debug!("If next keepalive fails, closing");
+ } else {
+ break;
+ }
+
+ last_successful = false;
+ },
}
}
}
debug!("Closing keepalive");
- let _ = sender.get_mut().shutdown(Shutdown::Both);
+ match sender.shutdown_all() {
+ Ok(_) => debug!("Successfully shutdown sender/receiver"),
+ Err(why) => warn!("Failed to shutdown sender/receiver: {:?}", why),
+ }
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index f4f1135..3d712a5 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -43,17 +43,18 @@ use std::thread;
use std::time::Duration;
use typemap::ShareMap;
use websocket::client::Receiver;
+use websocket::result::WebSocketError;
use websocket::stream::WebSocketStream;
use ::internal::prelude::{Error, Result, Value};
use ::internal::ws_impl::ReceiverExt;
use ::model::event::{
ChannelPinsAckEvent,
ChannelPinsUpdateEvent,
- Event,
GatewayEvent,
GuildSyncEvent,
MessageUpdateEvent,
PresenceUpdateEvent,
+ ReadyEvent,
ResumedEvent,
TypingStartEvent,
VoiceServerUpdateEvent,
@@ -177,7 +178,6 @@ pub struct Client {
#[cfg(feature="framework")]
framework: Arc<Mutex<Framework>>,
login_type: LoginType,
- shards: Vec<Arc<Mutex<Shard>>>,
token: String,
}
@@ -780,74 +780,65 @@ impl Client {
.update_current_user(user.id, user.bot);
}
- let gateway_url = rest::get_gateway()?.url;
+ let gateway_url = Arc::new(Mutex::new(rest::get_gateway()?.url));
- for i in shard_data.map_or(0, |x| x[0])..shard_data.map_or(1, |x| x[1] + 1) {
- let shard = Shard::new(&gateway_url,
- &self.token,
- shard_data.map(|s| [i, s[2]]),
- self.login_type);
- match shard {
- Ok((shard, ready, receiver)) => {
- self.shards.push(Arc::new(Mutex::new(shard)));
+ let shards_index = shard_data.map_or(0, |x| x[0]);
+ let shards_total = shard_data.map_or(1, |x| x[1] + 1);
+ for shard_number in shards_index..shards_total {
+ let shard_info = shard_data.map(|s| [shard_number, s[2]]);
+
+ let boot = boot_shard(BootInfo {
+ gateway_url: gateway_url.clone(),
+ login_type: self.login_type,
+ shard_info: shard_info,
+ token: self.token.clone(),
+ });
+
+ match boot {
+ Ok((shard, _ready, receiver)) => {
#[cfg(feature="cache")]
{
CACHE.write()
.unwrap()
- .update_with_ready(&ready);
+ .update_with_ready(&_ready);
}
- match self.shards.last() {
- Some(shard) => {
- feature_framework! {{
- dispatch(Event::Ready(ready),
- shard.clone(),
- self.framework.clone(),
- self.data.clone(),
- self.login_type,
- self.event_store.clone());
- } else {
- dispatch(Event::Ready(ready),
- shard.clone(),
- self.data.clone(),
- self.login_type,
- self.event_store.clone());
- }}
-
- let data_clone = self.data.clone();
- let event_store = self.event_store.clone();
- let login_type = self.login_type;
- let shard_clone = shard.clone();
-
- feature_framework! {{
- let framework = self.framework.clone();
-
- thread::spawn(move || {
- handle_shard(shard_clone,
- framework,
- data_clone,
- login_type,
- event_store,
- receiver)
- });
- } else {
- thread::spawn(move || {
- handle_shard(shard_clone,
- data_clone,
- login_type,
- event_store,
- receiver)
- });
- }}
- },
- None => return Err(Error::Client(ClientError::ShardUnknown)),
- }
+ let monitor_info = feature_framework! {{
+ MonitorInfo {
+ data: self.data.clone(),
+ event_store: self.event_store.clone(),
+ framework: self.framework.clone(),
+ gateway_url: gateway_url.clone(),
+ login_type: self.login_type,
+ receiver: receiver,
+ shard: Arc::new(Mutex::new(shard)),
+ shard_info: shard_info,
+ token: self.token.clone(),
+ }
+ } else {
+ MonitorInfo {
+ data: self.data.clone(),
+ event_store: self.event_store.clone(),
+ gateway_url: gateway_url.clone(),
+ login_type: self.login_type,
+ receiver: receiver,
+ shard: Arc::new(Mutex::new(shard)),
+ shard_info: shard_info,
+ token: self.token.clone(),
+ }
+ }};
+
+ thread::spawn(move || {
+ monitor_shard(monitor_info);
+ });
},
- Err(why) => return Err(why),
+ Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why),
}
// Wait 5 seconds between shard boots.
+ //
+ // We need to wait at least 5 seconds between READYs.
thread::sleep(Duration::from_secs(5));
}
@@ -1161,61 +1152,152 @@ impl Client {
}
}
+pub struct BootInfo {
+ gateway_url: Arc<Mutex<String>>,
+ login_type: LoginType,
+ shard_info: Option<[u64; 2]>,
+ token: String,
+}
+
#[cfg(feature="framework")]
-fn handle_shard(shard: Arc<Mutex<Shard>>,
- framework: Arc<Mutex<Framework>>,
- data: Arc<Mutex<ShareMap>>,
- login_type: LoginType,
- event_store: Arc<RwLock<EventStore>>,
- mut receiver: Receiver<WebSocketStream>) {
- loop {
- let event = receiver.recv_json(GatewayEvent::decode);
+pub struct MonitorInfo {
+ data: Arc<Mutex<ShareMap>>,
+ event_store: Arc<RwLock<EventStore>>,
+ framework: Arc<Mutex<Framework>>,
+ gateway_url: Arc<Mutex<String>>,
+ login_type: LoginType,
+ receiver: Receiver<WebSocketStream>,
+ shard: Arc<Mutex<Shard>>,
+ shard_info: Option<[u64; 2]>,
+ token: String,
+}
- trace!("Received event on shard handler: {:?}", event);
+#[cfg(not(feature="framework"))]
+pub struct MonitorInfo {
+ data: Arc<Mutex<ShareMap>>,
+ event_store: Arc<RwLock<EventStore>>,
+ gateway_url: Arc<Mutex<String>>,
+ login_type: LoginType,
+ receiver: Receiver<WebSocketStream>,
+ shard: Arc<Mutex<Shard>>,
+ shard_info: Option<[u64; 2]>,
+ token: String,
+}
- // This will only lock when _updating_ the shard, resuming, etc. Most
- // of the time, this won't be locked (i.e. when receiving an event over
- // the receiver, separate from the shard itself).
- let event = match shard.lock().unwrap().handle_event(event, &mut receiver) {
- Ok(Some((event, Some(new_receiver)))) => {
- receiver = new_receiver;
+fn boot_shard(info: BootInfo) -> Result<(Shard, ReadyEvent, Receiver<WebSocketStream>)> {
+ // Make ten attempts to boot the shard, exponentially backing off; if it
+ // still doesn't boot after that, accept it as a failure.
+ //
+ // After three attempts, start re-retrieving the gateway URL. Before that,
+ // use the cached one.
+ for attempt_number in 1..11u64 {
+ // If we've tried over 3 times so far, get a new gateway URL.
+ //
+ // If doing so fails, count this as a boot attempt.
+ if attempt_number > 3 {
+ match rest::get_gateway() {
+ Ok(g) => *info.gateway_url.lock().unwrap() = g.url,
+ Err(why) => {
+ warn!("Failed to retrieve gateway URL: {:?}", why);
+
+ // Failed -- start over.
+ continue;
+ },
+ }
+ }
- event
- },
- Ok(Some((event, None))) => event,
- Ok(None) => continue,
- Err(why) => {
- // This is potentially causing problems -- let's see.
- error!("Shard handler received err: {:?}", why);
+ let attempt = Shard::new(&info.gateway_url.lock().unwrap(),
+ &info.token,
+ info.shard_info,
+ info.login_type);
- continue;
+ match attempt {
+ Ok((shard, ready, receiver)) => {
+ #[cfg(feature="cache")]
+ {
+ CACHE.write()
+ .unwrap()
+ .update_with_ready(&ready);
+ }
+
+ info!("Successfully booted shard: {:?}", info.shard_info);
+
+ return Ok((shard, ready, receiver));
},
- };
+ Err(why) => warn!("Failed to boot shard: {:?}", why),
+ }
+ }
+
+ // Hopefully _never_ happens?
+ Err(Error::Client(ClientError::ShardBootFailure))
+}
+
+fn monitor_shard(mut info: MonitorInfo) {
+ loop {
+ let mut boot_successful = false;
+
+ for _ in 0..3 {
+ let boot = boot_shard(BootInfo {
+ gateway_url: info.gateway_url.clone(),
+ login_type: info.login_type,
+ shard_info: info.shard_info,
+ token: info.token.clone(),
+ });
+
+ match boot {
+ Ok((new_shard, _ready, new_receiver)) => {
+ #[cfg(feature="cache")]
+ {
+ CACHE.write().unwrap().update_with_ready(&_ready);
+ }
+
+ *info.shard.lock().unwrap() = new_shard;
+ info.receiver = new_receiver;
- dispatch(event,
- shard.clone(),
- framework.clone(),
- data.clone(),
- login_type,
- event_store.clone());
+ boot_successful = true;
+
+ break;
+ },
+ Err(why) => warn!("Failed to boot shard: {:?}", why),
+ }
+ }
+
+ if boot_successful {
+ handle_shard(&mut info);
+ } else {
+ break;
+ }
+
+ // The shard died: redo the cycle.
}
+
+ error!("Completely failed to reboot shard");
}
-#[cfg(not(feature="framework"))]
-// aaaaaaaaaaaaaaaaaaaaaaaaaaaaa
-fn handle_shard(shard: Arc<Mutex<Shard>>,
- data: Arc<Mutex<ShareMap>>,
- login_type: LoginType,
- event_store: Arc<RwLock<EventStore>>,
- mut receiver: Receiver<WebSocketStream>) {
+fn handle_shard(info: &mut MonitorInfo) {
loop {
- let event = receiver.recv_json(GatewayEvent::decode);
+ let event = match info.receiver.recv_json(GatewayEvent::decode) {
+ Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
+ debug!("Attempting to shutdown receiver/sender");
+
+ match info.receiver.shutdown_all() {
+ Ok(_) => debug!("Successfully shutdown receiver/sender"),
+ Err(why) => warn!("Err shutting down receiver/sender: {:?}", why),
+ }
+
+ return;
+ },
+ other => other,
+ };
trace!("Received event on shard handler: {:?}", event);
- let event = match shard.lock().unwrap().handle_event(event, &mut receiver) {
+ // This will only lock when _updating_ the shard, resuming, etc. Most
+ // of the time, this won't be locked (i.e. when receiving an event over
+ // the receiver, separate from the shard itself).
+ let event = match info.shard.lock().unwrap().handle_event(event, &mut info.receiver) {
Ok(Some((event, Some(new_receiver)))) => {
- receiver = new_receiver;
+ info.receiver = new_receiver;
event
},
@@ -1228,11 +1310,20 @@ fn handle_shard(shard: Arc<Mutex<Shard>>,
},
};
- dispatch(event,
- shard.clone(),
- data.clone(),
- login_type,
- event_store.clone());
+ feature_framework! {{
+ dispatch(event,
+ info.shard.clone(),
+ info.framework.clone(),
+ info.data.clone(),
+ info.login_type,
+ info.event_store.clone());
+ } else {
+ dispatch(event,
+ info.shard.clone(),
+ info.data.clone(),
+ info.login_type,
+ info.event_store.clone());
+ }}
}
}
@@ -1247,7 +1338,6 @@ fn login(token: &str, login_type: LoginType) -> Client {
event_store: Arc::new(RwLock::new(EventStore::default())),
framework: Arc::new(Mutex::new(Framework::default())),
login_type: login_type,
- shards: Vec::default(),
token: token.to_owned(),
}
} else {
@@ -1255,7 +1345,6 @@ fn login(token: &str, login_type: LoginType) -> Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_store: Arc::new(RwLock::new(EventStore::default())),
login_type: login_type,
- shards: Vec::default(),
token: token.to_owned(),
}
}}