aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-12-20 00:57:12 +0800
committerAustin Hellyer <[email protected]>2016-12-29 11:50:53 -0800
commit94fc85bd60b31c55231a9a0ed61101237c4cf989 (patch)
tree9aef4b1d96c646dfbb1d9cdb6141edb0a7628a37 /src
parentUse conditional blocks over macros (diff)
downloadserenity-94fc85bd60b31c55231a9a0ed61101237c4cf989.tar.xz
serenity-94fc85bd60b31c55231a9a0ed61101237c4cf989.zip
Round 1
Diffstat (limited to 'src')
-rw-r--r--src/client/gateway/error.rs2
-rw-r--r--src/client/gateway/prep.rs4
-rw-r--r--src/client/gateway/shard.rs135
-rw-r--r--src/client/gateway/status.rs4
-rw-r--r--src/client/mod.rs39
5 files changed, 123 insertions, 61 deletions
diff --git a/src/client/gateway/error.rs b/src/client/gateway/error.rs
index 3e355f5..30e87f0 100644
--- a/src/client/gateway/error.rs
+++ b/src/client/gateway/error.rs
@@ -6,7 +6,7 @@ use std::fmt::{self, Display};
/// you manually handle these.
#[derive(Clone, Debug)]
pub enum Error {
- /// The connection unexpectedly (read: non-cleanly) closed.
+ /// The connection closed, potentially uncleanly.
Closed(Option<u16>, String),
/// Expected a Hello during a handshake
ExpectedHello,
diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs
index 40ce9a6..82a096f 100644
--- a/src/client/gateway/prep.rs
+++ b/src/client/gateway/prep.rs
@@ -100,10 +100,10 @@ pub fn keepalive(interval: u64,
loop {
match channel.try_recv() {
- Ok(GatewayStatus::ChangeInterval(interval)) => {
+ Ok(GatewayStatus::Interval(interval)) => {
base_interval = Duration::milliseconds(interval as i64);
},
- Ok(GatewayStatus::ChangeSender(new_sender)) => {
+ Ok(GatewayStatus::Sender(new_sender)) => {
sender = new_sender;
},
Ok(GatewayStatus::SendMessage(val)) => {
diff --git a/src/client/gateway/shard.rs b/src/client/gateway/shard.rs
index 7c3b385..8554714 100644
--- a/src/client/gateway/shard.rs
+++ b/src/client/gateway/shard.rs
@@ -64,7 +64,7 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool);
pub struct Shard {
current_presence: CurrentPresence,
keepalive_channel: MpscSender<GatewayStatus>,
- last_sequence: u64,
+ seq: u64,
login_type: LoginType,
session_id: Option<String>,
shard_info: Option<[u64; 2]>,
@@ -118,7 +118,7 @@ impl Shard {
let heartbeat_interval = match receiver.recv_json(GatewayEvent::decode)? {
GatewayEvent::Hello(interval) => interval,
other => {
- debug!("Unexpected event during connection start: {:?}", other);
+ debug!("Unexpected event during shard start: {:?}", other);
return Err(Error::Gateway(GatewayError::ExpectedHello));
},
@@ -138,15 +138,15 @@ impl Shard {
// Parse READY
let event = receiver.recv_json(GatewayEvent::decode)?;
let (ready, sequence) = prep::parse_ready(event,
- &tx,
- &mut receiver,
- identification)?;
+ &tx,
+ &mut receiver,
+ identification)?;
Ok((feature_voice! {{
Shard {
current_presence: (None, OnlineStatus::Online, false),
keepalive_channel: tx.clone(),
- last_sequence: sequence,
+ seq: sequence,
login_type: login_type,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
@@ -158,7 +158,7 @@ impl Shard {
Shard {
current_presence: (None, OnlineStatus::Online, false),
keepalive_channel: tx.clone(),
- last_sequence: sequence,
+ seq: sequence,
login_type: login_type,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
@@ -267,20 +267,23 @@ impl Shard {
mut receiver: &mut Receiver<WebSocketStream>)
-> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> {
match event {
- Ok(GatewayEvent::Dispatch(sequence, event)) => {
- let status = GatewayStatus::Sequence(sequence);
+ Ok(GatewayEvent::Dispatch(seq, event)) => {
+ let status = GatewayStatus::Sequence(seq);
let _ = self.keepalive_channel.send(status);
self.handle_dispatch(&event);
Ok(Some((event, None)))
},
- Ok(GatewayEvent::Heartbeat(sequence)) => {
+ Ok(GatewayEvent::Heartbeat(seq)) => {
+ info!("Received shard heartbeat; seq: {}", seq);
+
let map = ObjectBuilder::new()
- .insert("d", sequence)
+ .insert("d", seq)
.insert("op", OpCode::Heartbeat.num())
.build();
- let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(map));
+ let status = GatewayStatus::SendMessage(map);
+ let _ = self.keepalive_channel.send(status);
Ok(None)
},
@@ -288,17 +291,20 @@ impl Shard {
Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
- let _ = self.keepalive_channel.send(GatewayStatus::ChangeInterval(interval));
+ if interval > 0 {
+ let status = GatewayStatus::Interval(interval);
+ let _ = self.keepalive_channel.send(status);
+ }
Ok(None)
},
Ok(GatewayEvent::InvalidateSession) => {
+ info!("Received session invalidation; re-identifying");
+ self.seq = 0;
self.session_id = None;
let identification = prep::identify(&self.token, self.shard_info);
-
let status = GatewayStatus::SendMessage(identification);
-
let _ = self.keepalive_channel.send(status);
Ok(None)
@@ -307,14 +313,39 @@ impl Shard {
self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(Error::Gateway(GatewayError::Closed(num, message))) => {
- warn!("Closing with {:?}: {:?}", num, message);
+ let clean = num == Some(1000);
- // Attempt to resume if the following was not received:
- //
- // - 1000: Close.
- //
- // Otherwise, fallback to reconnecting.
- if num != Some(1000) {
+ {
+ let kind = if clean { "Cleanly" } else { "Uncleanly" };
+
+ info!("{} closing with {:?}: {}", kind, num, message);
+ }
+
+ match num {
+ Some(4001) => warn!("Sent invalid opcode"),
+ Some(4002) => warn!("Sent invalid message"),
+ Some(4003) => warn!("Sent no authentication"),
+ Some(4004) => warn!("Sent invalid authentication"),
+ Some(4005) => warn!("Already authenticated"),
+ Some(4007) => {
+ warn!("Sent invalid seq: {}", self.seq);
+
+ self.seq = 0;
+ },
+ Some(4008) => warn!("Gateway ratelimited"),
+ Some(4010) => warn!("Sent invalid shard"),
+ Some(4006) | Some(4009) => {
+ info!("Invalid session");
+
+ self.session_id = None;
+ },
+ Some(other) if !clean => {
+ warn!("Unknown unclean close {}: {:?}", other, message);
+ },
+ _ => {},
+ }
+
+ if !clean && num != Some(1000) && num != Some(4004) {
if let Some(session_id) = self.session_id.clone() {
match self.resume(session_id, receiver) {
Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
@@ -328,7 +359,7 @@ impl Shard {
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => Ok(None),
Err(Error::WebSocket(why)) => {
warn!("Websocket error: {:?}", why);
- info!("Reconnecting");
+ info!("Will attempt to reconnect or resume");
// Attempt to resume if the following was not received:
//
@@ -338,10 +369,12 @@ impl Shard {
if let Some(session_id) = self.session_id.clone() {
match self.resume(session_id, &mut receiver) {
Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
- Err(why) => debug!("Error resuming: {:?}", why),
+ Err(why) => info!("Error resuming: {:?}", why),
}
}
+ info!("Reconnecting");
+
self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(error) => Err(error),
@@ -351,19 +384,28 @@ impl Shard {
/// Shuts down the receiver by attempting to cleanly close the
/// connection.
#[doc(hidden)]
- pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>)
+ pub fn shutdown_clean(receiver: &mut Receiver<WebSocketStream>)
-> Result<()> {
- let stream = receiver.get_mut().get_mut();
+ let r = receiver.get_mut().get_mut();
{
- let mut sender = Sender::new(stream.by_ref(), true);
+ let mut sender = Sender::new(r.by_ref(), true);
let message = WsMessage::close_because(1000, "");
sender.send_message(&message)?;
}
- stream.flush()?;
- stream.shutdown(Shutdown::Both)?;
+ r.flush()?;
+ r.shutdown(Shutdown::Both)?;
+
+ Ok(())
+ }
+
+ pub fn shutdown(receiver: &mut Receiver<WebSocketStream>) -> Result<()> {
+ let r = receiver.get_mut().get_mut();
+
+ r.flush()?;
+ r.shutdown(Shutdown::Both)?;
Ok(())
}
@@ -407,7 +449,7 @@ impl Shard {
fn handle_dispatch(&mut self, event: &Event) {
if let Event::Resumed(ref ev) = *event {
- let status = GatewayStatus::ChangeInterval(ev.heartbeat_interval);
+ let status = GatewayStatus::Interval(ev.heartbeat_interval);
let _ = self.keepalive_channel.send(status);
}
@@ -434,7 +476,7 @@ impl Shard {
fn reconnect(&mut self, mut receiver: &mut Receiver<WebSocketStream>)
-> Result<(Event, Receiver<WebSocketStream>)> {
- debug!("Reconnecting");
+ info!("Attempting to reconnect");
// Take a few attempts at reconnecting.
for i in 1u64..11u64 {
@@ -446,8 +488,9 @@ impl Shard {
self.login_type);
if let Ok((shard, ready, receiver_new)) = shard {
- mem::replace(self, shard).shutdown(&mut receiver)?;
+ let _ = Shard::shutdown(&mut receiver);
+ mem::replace(self, shard);
self.session_id = Some(ready.ready.session_id.clone());
return Ok((Event::Ready(ready), receiver_new));
@@ -474,26 +517,34 @@ impl Shard {
sender.send_json(&ObjectBuilder::new()
.insert_object("d", |o| o
.insert("session_id", session_id)
- .insert("seq", self.last_sequence)
- .insert("token", &self.token)
- )
+ .insert("seq", self.seq)
+ .insert("token", &self.token))
.insert("op", OpCode::Resume.num())
.build())?;
- let first_event;
+ // Note to self when this gets accepted in a decade:
+ // https://github.com/rust-lang/rfcs/issues/961
+ let ev;
loop {
match receiver.recv_json(GatewayEvent::decode)? {
GatewayEvent::Dispatch(seq, event) => {
- if let Event::Ready(ref event) = event {
- self.session_id = Some(event.ready.session_id.clone());
+ match event {
+ Event::Ready(ref ready) => {
+ self.session_id = Some(ready.ready.session_id.clone());
+ },
+ Event::Resumed { .. } => info!("Resumed"),
+ ref other => warn!("Unknown resume event: {:?}", other),
}
- self.last_sequence = seq;
- first_event = event;
+ self.seq = seq;
+ ev = event;
break;
},
+ GatewayEvent::Hello(i) => {
+ let _ = self.keepalive_channel.send(GatewayStatus::Interval(i));
+ }
GatewayEvent::InvalidateSession => {
sender.send_json(&prep::identify(&self.token, self.shard_info))?;
},
@@ -505,9 +556,9 @@ impl Shard {
}
}
- let _ = self.keepalive_channel.send(GatewayStatus::ChangeSender(sender));
+ let _ = self.keepalive_channel.send(GatewayStatus::Sender(sender));
- Ok((first_event, receiver))
+ Ok((ev, receiver))
}
fn update_presence(&self) {
diff --git a/src/client/gateway/status.rs b/src/client/gateway/status.rs
index 4409e65..f6e5ec2 100644
--- a/src/client/gateway/status.rs
+++ b/src/client/gateway/status.rs
@@ -4,8 +4,8 @@ use websocket::stream::WebSocketStream;
#[doc(hidden)]
pub enum Status {
+ Interval(u64),
+ Sender(Sender<WebSocketStream>),
SendMessage(Value),
Sequence(u64),
- ChangeInterval(u64),
- ChangeSender(Sender<WebSocketStream>),
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index bbc2bc1..aed11a0 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -1187,16 +1187,23 @@ fn handle_shard(shard: Arc<Mutex<Shard>>,
loop {
let event = receiver.recv_json(GatewayEvent::decode);
+ // This will only lock when _updating_ the shard, resuming, etc. Most
+ // of the time, this won't be locked (i.e. when receiving an event over
+ // the receiver, separate from the shard itself).
let event = match shard.lock().unwrap().handle_event(event, &mut receiver) {
- Ok(Some(x)) => match x {
- (event, Some(new_receiver)) => {
- receiver = new_receiver;
+ Ok(Some((event, Some(new_receiver)))) => {
+ receiver = new_receiver;
- event
- },
- (event, None) => event,
+ event
+ },
+ Ok(Some((event, None))) => event,
+ Ok(None) => continue,
+ Err(why) => {
+ // This is potentially causing problems -- let's see.
+ error!("Shard handler received err: {:?}", why);
+
+ continue;
},
- _ => continue,
};
dispatch(event,
@@ -1209,6 +1216,7 @@ fn handle_shard(shard: Arc<Mutex<Shard>>,
}
#[cfg(not(feature="framework"))]
+// aaaaaaaaaaaaaaaaaaaaaaaaaaaaa
fn handle_shard(shard: Arc<Mutex<Shard>>,
data: Arc<Mutex<ShareMap>>,
login_type: LoginType,
@@ -1218,15 +1226,18 @@ fn handle_shard(shard: Arc<Mutex<Shard>>,
let event = receiver.recv_json(GatewayEvent::decode);
let event = match shard.lock().unwrap().handle_event(event, &mut receiver) {
- Ok(Some(x)) => match x {
- (event, Some(new_receiver)) => {
- receiver = new_receiver;
+ Ok(Some((event, Some(new_receiver)))) => {
+ receiver = new_receiver;
- event
- },
- (event, None) => event,
+ event
+ },
+ Ok(Some((event, None))) => event,
+ Ok(None) => continue,
+ Err(why) => {
+ error!("Shard handler received err: {:?}", why);
+
+ continue;
},
- _ => continue,
};
dispatch(event,