diff options
| author | Austin Hellyer <[email protected]> | 2016-12-20 00:57:12 +0800 |
|---|---|---|
| committer | Austin Hellyer <[email protected]> | 2016-12-29 11:50:53 -0800 |
| commit | 94fc85bd60b31c55231a9a0ed61101237c4cf989 (patch) | |
| tree | 9aef4b1d96c646dfbb1d9cdb6141edb0a7628a37 /src | |
| parent | Use conditional blocks over macros (diff) | |
| download | serenity-94fc85bd60b31c55231a9a0ed61101237c4cf989.tar.xz serenity-94fc85bd60b31c55231a9a0ed61101237c4cf989.zip | |
Round 1
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/gateway/error.rs | 2 | ||||
| -rw-r--r-- | src/client/gateway/prep.rs | 4 | ||||
| -rw-r--r-- | src/client/gateway/shard.rs | 135 | ||||
| -rw-r--r-- | src/client/gateway/status.rs | 4 | ||||
| -rw-r--r-- | src/client/mod.rs | 39 |
5 files changed, 123 insertions, 61 deletions
diff --git a/src/client/gateway/error.rs b/src/client/gateway/error.rs index 3e355f5..30e87f0 100644 --- a/src/client/gateway/error.rs +++ b/src/client/gateway/error.rs @@ -6,7 +6,7 @@ use std::fmt::{self, Display}; /// you manually handle these. #[derive(Clone, Debug)] pub enum Error { - /// The connection unexpectedly (read: non-cleanly) closed. + /// The connection closed, potentially uncleanly. Closed(Option<u16>, String), /// Expected a Hello during a handshake ExpectedHello, diff --git a/src/client/gateway/prep.rs b/src/client/gateway/prep.rs index 40ce9a6..82a096f 100644 --- a/src/client/gateway/prep.rs +++ b/src/client/gateway/prep.rs @@ -100,10 +100,10 @@ pub fn keepalive(interval: u64, loop { match channel.try_recv() { - Ok(GatewayStatus::ChangeInterval(interval)) => { + Ok(GatewayStatus::Interval(interval)) => { base_interval = Duration::milliseconds(interval as i64); }, - Ok(GatewayStatus::ChangeSender(new_sender)) => { + Ok(GatewayStatus::Sender(new_sender)) => { sender = new_sender; }, Ok(GatewayStatus::SendMessage(val)) => { diff --git a/src/client/gateway/shard.rs b/src/client/gateway/shard.rs index 7c3b385..8554714 100644 --- a/src/client/gateway/shard.rs +++ b/src/client/gateway/shard.rs @@ -64,7 +64,7 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool); pub struct Shard { current_presence: CurrentPresence, keepalive_channel: MpscSender<GatewayStatus>, - last_sequence: u64, + seq: u64, login_type: LoginType, session_id: Option<String>, shard_info: Option<[u64; 2]>, @@ -118,7 +118,7 @@ impl Shard { let heartbeat_interval = match receiver.recv_json(GatewayEvent::decode)? { GatewayEvent::Hello(interval) => interval, other => { - debug!("Unexpected event during connection start: {:?}", other); + debug!("Unexpected event during shard start: {:?}", other); return Err(Error::Gateway(GatewayError::ExpectedHello)); }, @@ -138,15 +138,15 @@ impl Shard { // Parse READY let event = receiver.recv_json(GatewayEvent::decode)?; let (ready, sequence) = prep::parse_ready(event, - &tx, - &mut receiver, - identification)?; + &tx, + &mut receiver, + identification)?; Ok((feature_voice! {{ Shard { current_presence: (None, OnlineStatus::Online, false), keepalive_channel: tx.clone(), - last_sequence: sequence, + seq: sequence, login_type: login_type, token: token.to_owned(), session_id: Some(ready.ready.session_id.clone()), @@ -158,7 +158,7 @@ impl Shard { Shard { current_presence: (None, OnlineStatus::Online, false), keepalive_channel: tx.clone(), - last_sequence: sequence, + seq: sequence, login_type: login_type, token: token.to_owned(), session_id: Some(ready.ready.session_id.clone()), @@ -267,20 +267,23 @@ impl Shard { mut receiver: &mut Receiver<WebSocketStream>) -> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> { match event { - Ok(GatewayEvent::Dispatch(sequence, event)) => { - let status = GatewayStatus::Sequence(sequence); + Ok(GatewayEvent::Dispatch(seq, event)) => { + let status = GatewayStatus::Sequence(seq); let _ = self.keepalive_channel.send(status); self.handle_dispatch(&event); Ok(Some((event, None))) }, - Ok(GatewayEvent::Heartbeat(sequence)) => { + Ok(GatewayEvent::Heartbeat(seq)) => { + info!("Received shard heartbeat; seq: {}", seq); + let map = ObjectBuilder::new() - .insert("d", sequence) + .insert("d", seq) .insert("op", OpCode::Heartbeat.num()) .build(); - let _ = self.keepalive_channel.send(GatewayStatus::SendMessage(map)); + let status = GatewayStatus::SendMessage(map); + let _ = self.keepalive_channel.send(status); Ok(None) }, @@ -288,17 +291,20 @@ impl Shard { Ok(None) }, Ok(GatewayEvent::Hello(interval)) => { - let _ = self.keepalive_channel.send(GatewayStatus::ChangeInterval(interval)); + if interval > 0 { + let status = GatewayStatus::Interval(interval); + let _ = self.keepalive_channel.send(status); + } Ok(None) }, Ok(GatewayEvent::InvalidateSession) => { + info!("Received session invalidation; re-identifying"); + self.seq = 0; self.session_id = None; let identification = prep::identify(&self.token, self.shard_info); - let status = GatewayStatus::SendMessage(identification); - let _ = self.keepalive_channel.send(status); Ok(None) @@ -307,14 +313,39 @@ impl Shard { self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, Err(Error::Gateway(GatewayError::Closed(num, message))) => { - warn!("Closing with {:?}: {:?}", num, message); + let clean = num == Some(1000); - // Attempt to resume if the following was not received: - // - // - 1000: Close. - // - // Otherwise, fallback to reconnecting. - if num != Some(1000) { + { + let kind = if clean { "Cleanly" } else { "Uncleanly" }; + + info!("{} closing with {:?}: {}", kind, num, message); + } + + match num { + Some(4001) => warn!("Sent invalid opcode"), + Some(4002) => warn!("Sent invalid message"), + Some(4003) => warn!("Sent no authentication"), + Some(4004) => warn!("Sent invalid authentication"), + Some(4005) => warn!("Already authenticated"), + Some(4007) => { + warn!("Sent invalid seq: {}", self.seq); + + self.seq = 0; + }, + Some(4008) => warn!("Gateway ratelimited"), + Some(4010) => warn!("Sent invalid shard"), + Some(4006) | Some(4009) => { + info!("Invalid session"); + + self.session_id = None; + }, + Some(other) if !clean => { + warn!("Unknown unclean close {}: {:?}", other, message); + }, + _ => {}, + } + + if !clean && num != Some(1000) && num != Some(4004) { if let Some(session_id) = self.session_id.clone() { match self.resume(session_id, receiver) { Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), @@ -328,7 +359,7 @@ impl Shard { Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => Ok(None), Err(Error::WebSocket(why)) => { warn!("Websocket error: {:?}", why); - info!("Reconnecting"); + info!("Will attempt to reconnect or resume"); // Attempt to resume if the following was not received: // @@ -338,10 +369,12 @@ impl Shard { if let Some(session_id) = self.session_id.clone() { match self.resume(session_id, &mut receiver) { Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), - Err(why) => debug!("Error resuming: {:?}", why), + Err(why) => info!("Error resuming: {:?}", why), } } + info!("Reconnecting"); + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, Err(error) => Err(error), @@ -351,19 +384,28 @@ impl Shard { /// Shuts down the receiver by attempting to cleanly close the /// connection. #[doc(hidden)] - pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>) + pub fn shutdown_clean(receiver: &mut Receiver<WebSocketStream>) -> Result<()> { - let stream = receiver.get_mut().get_mut(); + let r = receiver.get_mut().get_mut(); { - let mut sender = Sender::new(stream.by_ref(), true); + let mut sender = Sender::new(r.by_ref(), true); let message = WsMessage::close_because(1000, ""); sender.send_message(&message)?; } - stream.flush()?; - stream.shutdown(Shutdown::Both)?; + r.flush()?; + r.shutdown(Shutdown::Both)?; + + Ok(()) + } + + pub fn shutdown(receiver: &mut Receiver<WebSocketStream>) -> Result<()> { + let r = receiver.get_mut().get_mut(); + + r.flush()?; + r.shutdown(Shutdown::Both)?; Ok(()) } @@ -407,7 +449,7 @@ impl Shard { fn handle_dispatch(&mut self, event: &Event) { if let Event::Resumed(ref ev) = *event { - let status = GatewayStatus::ChangeInterval(ev.heartbeat_interval); + let status = GatewayStatus::Interval(ev.heartbeat_interval); let _ = self.keepalive_channel.send(status); } @@ -434,7 +476,7 @@ impl Shard { fn reconnect(&mut self, mut receiver: &mut Receiver<WebSocketStream>) -> Result<(Event, Receiver<WebSocketStream>)> { - debug!("Reconnecting"); + info!("Attempting to reconnect"); // Take a few attempts at reconnecting. for i in 1u64..11u64 { @@ -446,8 +488,9 @@ impl Shard { self.login_type); if let Ok((shard, ready, receiver_new)) = shard { - mem::replace(self, shard).shutdown(&mut receiver)?; + let _ = Shard::shutdown(&mut receiver); + mem::replace(self, shard); self.session_id = Some(ready.ready.session_id.clone()); return Ok((Event::Ready(ready), receiver_new)); @@ -474,26 +517,34 @@ impl Shard { sender.send_json(&ObjectBuilder::new() .insert_object("d", |o| o .insert("session_id", session_id) - .insert("seq", self.last_sequence) - .insert("token", &self.token) - ) + .insert("seq", self.seq) + .insert("token", &self.token)) .insert("op", OpCode::Resume.num()) .build())?; - let first_event; + // Note to self when this gets accepted in a decade: + // https://github.com/rust-lang/rfcs/issues/961 + let ev; loop { match receiver.recv_json(GatewayEvent::decode)? { GatewayEvent::Dispatch(seq, event) => { - if let Event::Ready(ref event) = event { - self.session_id = Some(event.ready.session_id.clone()); + match event { + Event::Ready(ref ready) => { + self.session_id = Some(ready.ready.session_id.clone()); + }, + Event::Resumed { .. } => info!("Resumed"), + ref other => warn!("Unknown resume event: {:?}", other), } - self.last_sequence = seq; - first_event = event; + self.seq = seq; + ev = event; break; }, + GatewayEvent::Hello(i) => { + let _ = self.keepalive_channel.send(GatewayStatus::Interval(i)); + } GatewayEvent::InvalidateSession => { sender.send_json(&prep::identify(&self.token, self.shard_info))?; }, @@ -505,9 +556,9 @@ impl Shard { } } - let _ = self.keepalive_channel.send(GatewayStatus::ChangeSender(sender)); + let _ = self.keepalive_channel.send(GatewayStatus::Sender(sender)); - Ok((first_event, receiver)) + Ok((ev, receiver)) } fn update_presence(&self) { diff --git a/src/client/gateway/status.rs b/src/client/gateway/status.rs index 4409e65..f6e5ec2 100644 --- a/src/client/gateway/status.rs +++ b/src/client/gateway/status.rs @@ -4,8 +4,8 @@ use websocket::stream::WebSocketStream; #[doc(hidden)] pub enum Status { + Interval(u64), + Sender(Sender<WebSocketStream>), SendMessage(Value), Sequence(u64), - ChangeInterval(u64), - ChangeSender(Sender<WebSocketStream>), } diff --git a/src/client/mod.rs b/src/client/mod.rs index bbc2bc1..aed11a0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1187,16 +1187,23 @@ fn handle_shard(shard: Arc<Mutex<Shard>>, loop { let event = receiver.recv_json(GatewayEvent::decode); + // This will only lock when _updating_ the shard, resuming, etc. Most + // of the time, this won't be locked (i.e. when receiving an event over + // the receiver, separate from the shard itself). let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { - Ok(Some(x)) => match x { - (event, Some(new_receiver)) => { - receiver = new_receiver; + Ok(Some((event, Some(new_receiver)))) => { + receiver = new_receiver; - event - }, - (event, None) => event, + event + }, + Ok(Some((event, None))) => event, + Ok(None) => continue, + Err(why) => { + // This is potentially causing problems -- let's see. + error!("Shard handler received err: {:?}", why); + + continue; }, - _ => continue, }; dispatch(event, @@ -1209,6 +1216,7 @@ fn handle_shard(shard: Arc<Mutex<Shard>>, } #[cfg(not(feature="framework"))] +// aaaaaaaaaaaaaaaaaaaaaaaaaaaaa fn handle_shard(shard: Arc<Mutex<Shard>>, data: Arc<Mutex<ShareMap>>, login_type: LoginType, @@ -1218,15 +1226,18 @@ fn handle_shard(shard: Arc<Mutex<Shard>>, let event = receiver.recv_json(GatewayEvent::decode); let event = match shard.lock().unwrap().handle_event(event, &mut receiver) { - Ok(Some(x)) => match x { - (event, Some(new_receiver)) => { - receiver = new_receiver; + Ok(Some((event, Some(new_receiver)))) => { + receiver = new_receiver; - event - }, - (event, None) => event, + event + }, + Ok(Some((event, None))) => event, + Ok(None) => continue, + Err(why) => { + error!("Shard handler received err: {:?}", why); + + continue; }, - _ => continue, }; dispatch(event, |