aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAustin Hellyer <[email protected]>2016-11-19 15:14:06 -0800
committerAustin Hellyer <[email protected]>2016-11-19 15:14:06 -0800
commit87f03ea4384cf3293558171c56257d03dbb6766c (patch)
tree4a70c3e2c168433344f8590f403a7167667d3b4e /src
parentFix cond. compile across multiple feature targets (diff)
downloadserenity-87f03ea4384cf3293558171c56257d03dbb6766c.tar.xz
serenity-87f03ea4384cf3293558171c56257d03dbb6766c.zip
Nonblocking connection
The connection is now non-blocking, allowing user event handlers to immediately unlock it themselves (assuming they haven't unlocked it elsewhere), rather than waiting on the library to receive an event. This is done by decoupling the receiver away from the connection, which is not necessarily "directly" associated with the connection. This needs a _lot_ of code cleanup later.
Diffstat (limited to 'src')
-rw-r--r--src/client/connection.rs101
-rw-r--r--src/client/dispatch.rs13
-rw-r--r--src/client/mod.rs37
3 files changed, 70 insertions, 81 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 1e7f2a0..8937710 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -137,7 +137,6 @@ pub struct Connection {
keepalive_channel: MpscSender<Status>,
last_sequence: u64,
login_type: LoginType,
- receiver: Receiver<WebSocketStream>,
session_id: Option<String>,
shard_info: Option<[u8; 2]>,
token: String,
@@ -173,7 +172,7 @@ impl Connection {
token: &str,
shard_info: Option<[u8; 2]>,
login_type: LoginType)
- -> Result<(Connection, ReadyEvent)> {
+ -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> {
let url = try!(build_gateway_url(base_url));
let response = try!(try!(WsClient::connect(url)).send());
@@ -216,7 +215,6 @@ impl Connection {
keepalive_channel: tx.clone(),
last_sequence: sequence,
login_type: login_type,
- receiver: receiver,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
shard_info: shard_info,
@@ -228,13 +226,12 @@ impl Connection {
keepalive_channel: tx.clone(),
last_sequence: sequence,
login_type: login_type,
- receiver: receiver,
token: token.to_owned(),
session_id: Some(ready.ready.session_id.clone()),
shard_info: shard_info,
ws_url: base_url.to_owned(),
}
- }}, ready))
+ }}, ready, receiver))
}
pub fn shard_info(&self) -> Option<[u8; 2]> {
@@ -273,13 +270,18 @@ impl Connection {
let _ = self.keepalive_channel.send(Status::SendMessage(msg));
}
- pub fn receive(&mut self) -> Result<Event> {
- match self.receiver.recv_json(GatewayEvent::decode) {
+ pub fn handle_event(&mut self,
+ event: Result<GatewayEvent>,
+ mut receiver: &mut Receiver<WebSocketStream>)
+ -> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> {
+ match event {
Ok(GatewayEvent::Dispatch(sequence, event)) => {
let status = Status::Sequence(sequence);
let _ = self.keepalive_channel.send(status);
- Ok(self.handle_dispatch(event))
+ self.handle_dispatch(&event);
+
+ Ok(Some((event, None)))
},
Ok(GatewayEvent::Heartbeat(sequence)) => {
let map = ObjectBuilder::new()
@@ -288,15 +290,15 @@ impl Connection {
.build();
let _ = self.keepalive_channel.send(Status::SendMessage(map));
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::HeartbeatAck) => {
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
let _ = self.keepalive_channel.send(Status::ChangeInterval(interval));
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::InvalidateSession) => {
self.session_id = None;
@@ -307,10 +309,10 @@ impl Connection {
let _ = self.keepalive_channel.send(status);
- self.receive()
+ Ok(None)
},
Ok(GatewayEvent::Reconnect) => {
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(Error::Connection(ConnectionError::Closed(num, message))) => {
warn!("Closing with {:?}: {:?}", num, message);
@@ -322,14 +324,14 @@ impl Connection {
// Otherwise, fallback to reconnecting.
if num != Some(1000) {
if let Some(session_id) = self.session_id.clone() {
- match self.resume(session_id) {
- Ok(event) => return Ok(event),
+ match self.resume(session_id, receiver) {
+ Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
Err(why) => debug!("Err resuming: {:?}", why),
}
}
}
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(Error::WebSocket(why)) => {
warn!("Websocket error: {:?}", why);
@@ -341,27 +343,27 @@ impl Connection {
//
// Otherwise, fallback to reconnecting.
if let Some(session_id) = self.session_id.clone() {
- match self.resume(session_id) {
- Ok(event) => return Ok(event),
+ match self.resume(session_id, &mut receiver) {
+ Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))),
Err(why) => debug!("Err resuming: {:?}", why),
}
}
- self.reconnect()
+ self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec))))
},
Err(error) => Err(error),
}
}
- fn handle_dispatch(&mut self, event: Event) -> Event {
- if let Event::Resumed(ref ev) = event {
+ fn handle_dispatch(&mut self, event: &Event) {
+ if let &Event::Resumed(ref ev) = event {
let status = Status::ChangeInterval(ev.heartbeat_interval);
let _ = self.keepalive_channel.send(status);
}
feature_voice_enabled! {{
- if let Event::VoiceStateUpdate(ref update) = event {
+ if let &Event::VoiceStateUpdate(ref update) = event {
if let Some(guild_id) = update.guild_id {
if let Some(handler) = self.manager.get(guild_id) {
handler.update_state(&update.voice_state);
@@ -369,7 +371,7 @@ impl Connection {
}
}
- if let Event::VoiceServerUpdate(ref update) = event {
+ if let &Event::VoiceServerUpdate(ref update) = event {
if let Some(guild_id) = update.guild_id {
if let Some(handler) = self.manager.get(guild_id) {
handler.update_server(&update.endpoint, &update.token);
@@ -377,11 +379,9 @@ impl Connection {
}
}
}}
-
- event
}
- fn reconnect(&mut self) -> Result<Event> {
+ fn reconnect(&mut self, mut receiver: &mut Receiver<WebSocketStream>) -> Result<(Event, Receiver<WebSocketStream>)> {
debug!("Reconnecting");
// Take a few attempts at reconnecting; otherwise fall back to
@@ -392,12 +392,12 @@ impl Connection {
self.shard_info,
self.login_type);
- if let Ok((connection, ready)) = connection {
- try!(mem::replace(self, connection).shutdown());
+ if let Ok((connection, ready, receiver_new)) = connection {
+ try!(mem::replace(self, connection).shutdown(&mut receiver));
self.session_id = Some(ready.ready.session_id.clone());
- return Ok(Event::Ready(ready));
+ return Ok((Event::Ready(ready), receiver_new));
}
thread::sleep(StdDuration::from_secs(1));
@@ -409,7 +409,7 @@ impl Connection {
// Client. This client _does not_ replace the current client(s) that the
// user has. This client will then connect to gateway. This new
// connection will be used to replace _this_ connection.
- let (connection, ready) = {
+ let (connection, ready, receiver_new) = {
let mut client = Client::login_raw(&self.token.clone(),
self.login_type);
@@ -418,15 +418,16 @@ impl Connection {
// Replace this connection with a new one, and shutdown the now-old
// connection.
- try!(mem::replace(self, connection).shutdown());
+ try!(mem::replace(self, connection).shutdown(&mut receiver));
self.session_id = Some(ready.ready.session_id.clone());
- Ok(Event::Ready(ready))
+ Ok((Event::Ready(ready), receiver_new))
}
- fn resume(&mut self, session_id: String) -> Result<Event> {
- try!(self.receiver.get_mut().get_mut().shutdown(Shutdown::Both));
+ fn resume(&mut self, session_id: String, receiver: &mut Receiver<WebSocketStream>)
+ -> Result<(Event, Receiver<WebSocketStream>)> {
+ try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both));
let url = try!(build_gateway_url(&self.ws_url));
let response = try!(try!(WsClient::connect(url)).send());
@@ -468,14 +469,14 @@ impl Connection {
}
}
- self.receiver = receiver;
let _ = self.keepalive_channel.send(Status::ChangeSender(sender));
- Ok(first_event)
+ Ok((first_event, receiver))
}
- pub fn shutdown(&mut self) -> Result<()> {
- let stream = self.receiver.get_mut().get_mut();
+ pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>)
+ -> Result<()> {
+ let stream = receiver.get_mut().get_mut();
{
let mut sender = Sender::new(stream.by_ref(), true);
@@ -513,30 +514,6 @@ impl Connection {
}
}
-impl Drop for Connection {
- fn drop(&mut self) {
- match self.shutdown() {
- Ok(()) => {
- if let Some(info) = self.shard_info {
- println!("Correctly shutdown shard {}/{}", info[0], info[1] - 1);
- } else {
- println!("Correctly shutdown connection");
- }
- },
- Err(why) => {
- if let Some(info) = self.shard_info {
- println!("Failed to shutdown shard {}/{}: {:?}",
- info[0],
- info[1] - 1,
- why);
- } else {
- println!("Failed to shutdown connection: {:?}", why);
- }
- }
- }
- }
-}
-
#[inline]
fn parse_ready(event: GatewayEvent,
tx: &MpscSender<Status>,
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 8798723..80b2061 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -3,7 +3,6 @@ use std::thread;
use super::event_store::EventStore;
use super::login_type::LoginType;
use super::{Connection, Context};
-use ::internal::prelude::*;
use ::model::{ChannelId, Event, Message};
#[cfg(feature="framework")]
@@ -42,13 +41,13 @@ fn context(channel_id: Option<ChannelId>,
}
#[cfg(feature="framework")]
-pub fn dispatch(event: Result<Event>,
+pub fn dispatch(event: Event,
conn: Arc<Mutex<Connection>>,
framework: Arc<Mutex<Framework>>,
login_type: LoginType,
event_store: Arc<Mutex<EventStore>>) {
match event {
- Ok(Event::MessageCreate(event)) => {
+ Event::MessageCreate(event) => {
let context = context(Some(event.message.channel_id),
conn,
login_type);
@@ -64,8 +63,7 @@ pub fn dispatch(event: Result<Event>,
dispatch_message(context, event.message, event_store);
}
},
- Ok(other) => handle_event(other, conn, login_type, event_store),
- Err(_why) => {},
+ other => handle_event(other, conn, login_type, event_store),
}
}
@@ -75,7 +73,7 @@ pub fn dispatch(event: Result<Event>,
login_type: LoginType,
event_store: Arc<Mutex<EventStore>>) {
match event {
- Ok(Event::MessageCreate(event)) => {
+ Event::MessageCreate(event) => {
let context = context(Some(event.message.channel_id),
conn,
login_type);
@@ -83,8 +81,7 @@ pub fn dispatch(event: Result<Event>,
event.message.clone(),
event_store);
},
- Ok(other) => handle_event(other, conn, login_type, event_store),
- Err(_why) => {},
+ other => handle_event(other, conn, login_type, event_store),
}
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 3dac20d..6b9be4c 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -45,7 +45,10 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
+use websocket::client::Receiver;
+use websocket::stream::WebSocketStream;
use ::internal::prelude::*;
+use ::internal::ws_impl::ReceiverExt;
use ::model::*;
#[cfg(feature = "framework")]
@@ -870,7 +873,7 @@ impl Client {
shard_data.map(|s| [i, s[2]]),
self.login_type);
match connection {
- Ok((connection, ready)) => {
+ Ok((connection, ready, receiver)) => {
self.connections.push(Arc::new(Mutex::new(connection)));
feature_state_enabled! {{
@@ -882,13 +885,13 @@ impl Client {
match self.connections.last() {
Some(connection) => {
feature_framework! {{
- dispatch(Ok(Event::Ready(ready)),
+ dispatch(Event::Ready(ready),
connection.clone(),
self.framework.clone(),
self.login_type,
self.event_store.clone());
} else {
- dispatch(Ok(Event::Ready(ready)),
+ dispatch(Event::Ready(ready),
connection.clone(),
self.login_type,
self.event_store.clone());
@@ -905,13 +908,15 @@ impl Client {
handle_connection(connection_clone,
framework,
login_type,
- event_store)
+ event_store,
+ receiver)
});
} else {
thread::spawn(move || {
handle_connection(connection_clone,
login_type,
- event_store)
+ event_store,
+ receiver)
});
}}
},
@@ -935,7 +940,7 @@ impl Client {
#[doc(hidden)]
pub fn boot_connection(&mut self,
shard_info: Option<[u8; 2]>)
- -> Result<(Connection, ReadyEvent)> {
+ -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> {
let gateway_url = try!(http::get_gateway()).url;
Connection::new(&gateway_url, &self.token, shard_info, self.login_type)
@@ -1250,12 +1255,21 @@ impl Client {
fn handle_connection(connection: Arc<Mutex<Connection>>,
framework: Arc<Mutex<Framework>>,
login_type: LoginType,
- event_store: Arc<Mutex<EventStore>>) {
+ event_store: Arc<Mutex<EventStore>>,
+ mut receiver: Receiver<WebSocketStream>) {
loop {
- let event = {
- let mut connection = connection.lock().unwrap();
+ let event = receiver.recv_json(GatewayEvent::decode);
- connection.receive()
+ let event = match connection.lock().unwrap().handle_event(event, &mut receiver) {
+ Ok(Some(x)) => match x {
+ (event, Some(new_receiver)) => {
+ receiver = new_receiver;
+
+ event
+ },
+ (event, None) => event,
+ },
+ _ => continue,
};
dispatch(event,
@@ -1269,7 +1283,8 @@ fn handle_connection(connection: Arc<Mutex<Connection>>,
#[cfg(not(feature="framework"))]
fn handle_connection(connection: Arc<Mutex<Connection>>,
login_type: LoginType,
- event_store: Arc<Mutex<EventStore>>) {
+ event_store: Arc<Mutex<EventStore>>,
+ receiver: Receiver<WebSocketStream>) {
loop {
let event = {
let mut connection = connection.lock().unwrap();