aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client/connection.rs101
-rw-r--r--src/client/dispatch.rs13
-rw-r--r--src/client/mod.rs37
3 files changed, 70 insertions, 81 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 1e7f2a0..8937710 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -137,7 +137,6 @@ pub struct Connection {
keepalive_channel: MpscSender<Status>,
last_sequence: u64,
login_type: LoginType,
- receiver: Receiver<WebSocketStream>,
session_id: Option<String>,
shard_info: Option<[u8; 2]>,
token: String,
@@ -173,7 +172,7 @@ impl Connection {
token: &str,
shard_info: Option<[u8; 2]>,
login_type: LoginType)
- -> Result<(Connection, ReadyEvent)> {
+ -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> {
let url = try!(build_gateway_url(base_url));
let response = try!(try!(WsClient::connect(url)).send());
@@ -216,7 +215,6 @@ impl Connection {
keepalive_channel: tx.clone(),
last_sequence: sequence,
login_type: login_type,
- receiver: receiver,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
shard_info: shard_info,
@@ -228,13 +226,12 @@ impl Connection {
keepalive_channel: tx.clone(),
last_sequence: sequence,
login_type: login_type,
- receiver: receiver,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
shard_info: shard_info,
ws_url: base_url.to_owned(),
}
- }}, ready))
+ }}, ready, receiver))
}
pub fn shard_info(&self) -> Option<[u8; 2]> {
@@ -273,13 +270,18 @@ impl Connection {
let _ = self.keepalive_channel.send(Status::SendMessage(msg));
}
- pub fn receive(&mut self) -> Result<Event> {
- match self.receiver.recv_json(GatewayEvent::decode) {
+ pub fn handle_event(&mut self,
+ event: Result<GatewayEvent>,
+ mut receiver: &mut Receiver<WebSocketStream>)
+ -> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> {
+ match event {
Ok(GatewayEvent::Dispatch(sequence, event)) => {
let status = Status::Sequence(sequence);
let _ = self.keepalive_channel.send(status);
- Ok(self.handle_dispatch(event))
+ self.handle_dispatch(&event);
+
+ Ok(Some((event, None)))
},
Ok(GatewayEvent::Heartbeat(sequence)) => {
let map = ObjectBuilder::new()
@@ -288,15 +290,15 @@ impl Connection {
.build();
let _ = self.keepalive_channel.send(Status::SendMessage(map));
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::HeartbeatAck) => {
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
let _ = self.keepalive_channel.send(Status::ChangeInterval(interval));
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::InvalidateSession) => {
self.session_id = None;
@@ -307,10 +309,10 @@ impl Connection {
let _ = self.keepalive_channel.send(status);
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::Reconnect) => {
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(Error::Connection(ConnectionError::Closed(num, message))) => {
warn!("Closing with {:?}: {:?}", num, message);
@@ -322,14 +324,14 @@ impl Connection {
// Otherwise, fallback to reconnecting.
if num != Some(1000) {
if let Some(session_id) = self.session_id.clone() {
- match self.resume(session_id) {
- Ok(event) => return Ok(event),
+ match self.resume(session_id, receiver) {
+ Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
Err(why) => debug!("Err resuming: {:?}", why),
}
}
}
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(Error::WebSocket(why)) => {
warn!("Websocket error: {:?}", why);
@@ -341,27 +343,27 @@ impl Connection {
//
// Otherwise, fallback to reconnecting.
if let Some(session_id) = self.session_id.clone() {
- match self.resume(session_id) {
- Ok(event) => return Ok(event),
+ match self.resume(session_id, &mut receiver) {
+ Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
Err(why) => debug!("Err resuming: {:?}", why),
}
}
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(error) => Err(error),
}
}
- fn handle_dispatch(&mut self, event: Event) -> Event {
- if let Event::Resumed(ref ev) = event {
+ fn handle_dispatch(&mut self, event: &Event) {
+ if let &Event::Resumed(ref ev) = event {
let status = Status::ChangeInterval(ev.heartbeat_interval);
let _ = self.keepalive_channel.send(status);
}
feature_voice_enabled! {{
- if let Event::VoiceStateUpdate(ref update) = event {
+ if let &Event::VoiceStateUpdate(ref update) = event {
if let Some(guild_id) = update.guild_id {
if let Some(handler) = self.manager.get(guild_id) {
handler.update_state(&update.voice_state);
@@ -369,7 +371,7 @@ impl Connection {
}
}
- if let Event::VoiceServerUpdate(ref update) = event {
+ if let &Event::VoiceServerUpdate(ref update) = event {
if let Some(guild_id) = update.guild_id {
if let Some(handler) = self.manager.get(guild_id) {
handler.update_server(&update.endpoint, &update.token);
@@ -377,11 +379,9 @@ impl Connection {
}
}
}}
-
- event
}
- fn reconnect(&mut self) -> Result<Event> {
+ fn reconnect(&mut self, mut receiver: &mut Receiver<WebSocketStream>) -> Result<(Event, Receiver<WebSocketStream>)> {
debug!("Reconnecting");
// Take a few attempts at reconnecting; otherwise fall back to
@@ -392,12 +392,12 @@ impl Connection {
self.shard_info,
self.login_type);
- if let Ok((connection, ready)) = connection {
- try!(mem::replace(self, connection).shutdown());
+ if let Ok((connection, ready, receiver_new)) = connection {
+ try!(mem::replace(self, connection).shutdown(&mut receiver));
self.session_id = Some(ready.ready.session_id.clone());
- return Ok(Event::Ready(ready));
+ return Ok((Event::Ready(ready), receiver_new));
}
thread::sleep(StdDuration::from_secs(1));
@@ -409,7 +409,7 @@ impl Connection {
// Client. This client _does not_ replace the current client(s) that the
// user has. This client will then connect to gateway. This new
// connection will be used to replace _this_ connection.
- let (connection, ready) = {
+ let (connection, ready, receiver_new) = {
let mut client = Client::login_raw(&self.token.clone(),
self.login_type);
@@ -418,15 +418,16 @@ impl Connection {
// Replace this connection with a new one, and shutdown the now-old
// connection.
- try!(mem::replace(self, connection).shutdown());
+ try!(mem::replace(self, connection).shutdown(&mut receiver));
self.session_id = Some(ready.ready.session_id.clone());
- Ok(Event::Ready(ready))
+ Ok((Event::Ready(ready), receiver_new))
}
- fn resume(&mut self, session_id: String) -> Result<Event> {
- try!(self.receiver.get_mut().get_mut().shutdown(Shutdown::Both));
+ fn resume(&mut self, session_id: String, receiver: &mut Receiver<WebSocketStream>)
+ -> Result<(Event, Receiver<WebSocketStream>)> {
+ try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both));
let url = try!(build_gateway_url(&self.ws_url));
let response = try!(try!(WsClient::connect(url)).send());
@@ -468,14 +469,14 @@ impl Connection {
}
}
- self.receiver = receiver;
let _ = self.keepalive_channel.send(Status::ChangeSender(sender));
- Ok(first_event)
+ Ok((first_event, receiver))
}
- pub fn shutdown(&mut self) -> Result<()> {
- let stream = self.receiver.get_mut().get_mut();
+ pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>)
+ -> Result<()> {
+ let stream = receiver.get_mut().get_mut();
{
let mut sender = Sender::new(stream.by_ref(), true);
@@ -513,30 +514,6 @@ impl Connection {
}
}
-impl Drop for Connection {
- fn drop(&mut self) {
- match self.shutdown() {
- Ok(()) => {
- if let Some(info) = self.shard_info {
- println!("Correctly shutdown shard {}/{}", info[0], info[1] - 1);
- } else {
- println!("Correctly shutdown connection");
- }
- },
- Err(why) => {
- if let Some(info) = self.shard_info {
- println!("Failed to shutdown shard {}/{}: {:?}",
- info[0],
- info[1] - 1,
- why);
- } else {
- println!("Failed to shutdown connection: {:?}", why);
- }
- }
- }
- }
-}
-
#[inline]
fn parse_ready(event: GatewayEvent,
tx: &MpscSender<Status>,
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 8798723..80b2061 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -3,7 +3,6 @@ use std::thread;
use super::event_store::EventStore;
use super::login_type::LoginType;
use super::{Connection, Context};
-use ::internal::prelude::*;
use ::model::{ChannelId, Event, Message};
#[cfg(feature="framework")]
@@ -42,13 +41,13 @@ fn context(channel_id: Option<ChannelId>,
}
#[cfg(feature="framework")]
-pub fn dispatch(event: Result<Event>,
+pub fn dispatch(event: Event,
conn: Arc<Mutex<Connection>>,
framework: Arc<Mutex<Framework>>,
login_type: LoginType,
event_store: Arc<Mutex<EventStore>>) {
match event {
- Ok(Event::MessageCreate(event)) => {
+ Event::MessageCreate(event) => {
let context = context(Some(event.message.channel_id),
conn,
login_type);
@@ -64,8 +63,7 @@ pub fn dispatch(event: Result<Event>,
dispatch_message(context, event.message, event_store);
}
},
- Ok(other) => handle_event(other, conn, login_type, event_store),
- Err(_why) => {},
+ other => handle_event(other, conn, login_type, event_store),
}
}
@@ -75,7 +73,7 @@ pub fn dispatch(event: Result<Event>,
login_type: LoginType,
event_store: Arc<Mutex<EventStore>>) {
match event {
- Ok(Event::MessageCreate(event)) => {
+ Event::MessageCreate(event) => {
let context = context(Some(event.message.channel_id),
conn,
login_type);
@@ -83,8 +81,7 @@ pub fn dispatch(event: Result<Event>,
event.message.clone(),
event_store);
},
- Ok(other) => handle_event(other, conn, login_type, event_store),
- Err(_why) => {},
+ other => handle_event(other, conn, login_type, event_store),
}
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 3dac20d..6b9be4c 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -45,7 +45,10 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
+use websocket::client::Receiver;
+use websocket::stream::WebSocketStream;
use ::internal::prelude::*;
+use ::internal::ws_impl::ReceiverExt;
use ::model::*;
#[cfg(feature = "framework")]
@@ -870,7 +873,7 @@ impl Client {
shard_data.map(|s| [i, s[2]]),
self.login_type);
match connection {
- Ok((connection, ready)) => {
+ Ok((connection, ready, receiver)) => {
self.connections.push(Arc::new(Mutex::new(connection)));
feature_state_enabled! {{
@@ -882,13 +885,13 @@ impl Client {
match self.connections.last() {
Some(connection) => {
feature_framework! {{
- dispatch(Ok(Event::Ready(ready)),
+ dispatch(Event::Ready(ready),
connection.clone(),
self.framework.clone(),
self.login_type,
self.event_store.clone());
} else {
- dispatch(Ok(Event::Ready(ready)),
+ dispatch(Event::Ready(ready),
connection.clone(),
self.login_type,
self.event_store.clone());
@@ -905,13 +908,15 @@ impl Client {
handle_connection(connection_clone,
framework,
login_type,
- event_store)
+ event_store,
+ receiver)
});
} else {
thread::spawn(move || {
handle_connection(connection_clone,
login_type,
- event_store)
+ event_store,
+ receiver)
});
}}
},
@@ -935,7 +940,7 @@ impl Client {
#[doc(hidden)]
pub fn boot_connection(&mut self,
shard_info: Option<[u8; 2]>)
- -> Result<(Connection, ReadyEvent)> {
+ -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> {
let gateway_url = try!(http::get_gateway()).url;
Connection::new(&gateway_url, &self.token, shard_info, self.login_type)
@@ -1250,12 +1255,21 @@ impl Client {
fn handle_connection(connection: Arc<Mutex<Connection>>,
framework: Arc<Mutex<Framework>>,
login_type: LoginType,
- event_store: Arc<Mutex<EventStore>>) {
+ event_store: Arc<Mutex<EventStore>>,
+ mut receiver: Receiver<WebSocketStream>) {
loop {
- let event = {
- let mut connection = connection.lock().unwrap();
+ let event = receiver.recv_json(GatewayEvent::decode);
- connection.receive()
+ let event = match connection.lock().unwrap().handle_event(event, &mut receiver) {
+ Ok(Some(x)) => match x {
+ (event, Some(new_receiver)) => {
+ receiver = new_receiver;
+
+ event
+ },
+ (event, None) => event,
+ },
+ _ => continue,
};
dispatch(event,
@@ -1269,7 +1283,8 @@ fn handle_connection(connection: Arc<Mutex<Connection>>,
#[cfg(not(feature="framework"))]
fn handle_connection(connection: Arc<Mutex<Connection>>,
login_type: LoginType,
- event_store: Arc<Mutex<EventStore>>) {
+ event_store: Arc<Mutex<EventStore>>,
+ receiver: Receiver<WebSocketStream>) {
loop {
let event = {
let mut connection = connection.lock().unwrap();