diff options
| -rw-r--r-- | src/client/connection.rs | 101 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 13 | ||||
| -rw-r--r-- | src/client/mod.rs | 37 |
3 files changed, 70 insertions, 81 deletions
diff --git a/src/client/connection.rs b/src/client/connection.rs index 1e7f2a0..8937710 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -137,7 +137,6 @@ pub struct Connection { keepalive_channel: MpscSender<Status>, last_sequence: u64, login_type: LoginType, - receiver: Receiver<WebSocketStream>, session_id: Option<String>, shard_info: Option<[u8; 2]>, token: String, @@ -173,7 +172,7 @@ impl Connection { token: &str, shard_info: Option<[u8; 2]>, login_type: LoginType) - -> Result<(Connection, ReadyEvent)> { + -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> { let url = try!(build_gateway_url(base_url)); let response = try!(try!(WsClient::connect(url)).send()); @@ -216,7 +215,6 @@ impl Connection { keepalive_channel: tx.clone(), last_sequence: sequence, login_type: login_type, - receiver: receiver, token: token.to_owned(), session_id: Some(ready.ready.session_id.clone()), shard_info: shard_info, @@ -228,13 +226,12 @@ impl Connection { keepalive_channel: tx.clone(), last_sequence: sequence, login_type: login_type, - receiver: receiver, token: token.to_owned(), session_id: Some(ready.ready.session_id.clone()), shard_info: shard_info, ws_url: base_url.to_owned(), } - }}, ready)) + }}, ready, receiver)) } pub fn shard_info(&self) -> Option<[u8; 2]> { @@ -273,13 +270,18 @@ impl Connection { let _ = self.keepalive_channel.send(Status::SendMessage(msg)); } - pub fn receive(&mut self) -> Result<Event> { - match self.receiver.recv_json(GatewayEvent::decode) { + pub fn handle_event(&mut self, + event: Result<GatewayEvent>, + mut receiver: &mut Receiver<WebSocketStream>) + -> Result<Option<(Event, Option<Receiver<WebSocketStream>>)>> { + match event { Ok(GatewayEvent::Dispatch(sequence, event)) => { let status = Status::Sequence(sequence); let _ = self.keepalive_channel.send(status); - Ok(self.handle_dispatch(event)) + self.handle_dispatch(&event); + + Ok(Some((event, None))) }, Ok(GatewayEvent::Heartbeat(sequence)) => { let map = ObjectBuilder::new() @@ -288,15 +290,15 @@ impl Connection { .build(); let _ = self.keepalive_channel.send(Status::SendMessage(map)); - self.receive() + Ok(None) }, Ok(GatewayEvent::HeartbeatAck) => { - self.receive() + Ok(None) }, Ok(GatewayEvent::Hello(interval)) => { let _ = self.keepalive_channel.send(Status::ChangeInterval(interval)); - self.receive() + Ok(None) }, Ok(GatewayEvent::InvalidateSession) => { self.session_id = None; @@ -307,10 +309,10 @@ impl Connection { let _ = self.keepalive_channel.send(status); - self.receive() + Ok(None) }, Ok(GatewayEvent::Reconnect) => { - self.reconnect() + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, Err(Error::Connection(ConnectionError::Closed(num, message))) => { warn!("Closing with {:?}: {:?}", num, message); @@ -322,14 +324,14 @@ impl Connection { // Otherwise, fallback to reconnecting. if num != Some(1000) { if let Some(session_id) = self.session_id.clone() { - match self.resume(session_id) { - Ok(event) => return Ok(event), + match self.resume(session_id, receiver) { + Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), Err(why) => debug!("Err resuming: {:?}", why), } } } - self.reconnect() + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, Err(Error::WebSocket(why)) => { warn!("Websocket error: {:?}", why); @@ -341,27 +343,27 @@ impl Connection { // // Otherwise, fallback to reconnecting. if let Some(session_id) = self.session_id.clone() { - match self.resume(session_id) { - Ok(event) => return Ok(event), + match self.resume(session_id, &mut receiver) { + Ok((ev, rec)) => return Ok(Some((ev, Some(rec)))), Err(why) => debug!("Err resuming: {:?}", why), } } - self.reconnect() + self.reconnect(receiver).map(|(ev, rec)| Some((ev, Some(rec)))) }, Err(error) => Err(error), } } - fn handle_dispatch(&mut self, event: Event) -> Event { - if let Event::Resumed(ref ev) = event { + fn handle_dispatch(&mut self, event: &Event) { + if let &Event::Resumed(ref ev) = event { let status = Status::ChangeInterval(ev.heartbeat_interval); let _ = self.keepalive_channel.send(status); } feature_voice_enabled! {{ - if let Event::VoiceStateUpdate(ref update) = event { + if let &Event::VoiceStateUpdate(ref update) = event { if let Some(guild_id) = update.guild_id { if let Some(handler) = self.manager.get(guild_id) { handler.update_state(&update.voice_state); @@ -369,7 +371,7 @@ impl Connection { } } - if let Event::VoiceServerUpdate(ref update) = event { + if let &Event::VoiceServerUpdate(ref update) = event { if let Some(guild_id) = update.guild_id { if let Some(handler) = self.manager.get(guild_id) { handler.update_server(&update.endpoint, &update.token); @@ -377,11 +379,9 @@ impl Connection { } } }} - - event } - fn reconnect(&mut self) -> Result<Event> { + fn reconnect(&mut self, mut receiver: &mut Receiver<WebSocketStream>) -> Result<(Event, Receiver<WebSocketStream>)> { debug!("Reconnecting"); // Take a few attempts at reconnecting; otherwise fall back to @@ -392,12 +392,12 @@ impl Connection { self.shard_info, self.login_type); - if let Ok((connection, ready)) = connection { - try!(mem::replace(self, connection).shutdown()); + if let Ok((connection, ready, receiver_new)) = connection { + try!(mem::replace(self, connection).shutdown(&mut receiver)); self.session_id = Some(ready.ready.session_id.clone()); - return Ok(Event::Ready(ready)); + return Ok((Event::Ready(ready), receiver_new)); } thread::sleep(StdDuration::from_secs(1)); @@ -409,7 +409,7 @@ impl Connection { // Client. This client _does not_ replace the current client(s) that the // user has. This client will then connect to gateway. This new // connection will be used to replace _this_ connection. - let (connection, ready) = { + let (connection, ready, receiver_new) = { let mut client = Client::login_raw(&self.token.clone(), self.login_type); @@ -418,15 +418,16 @@ impl Connection { // Replace this connection with a new one, and shutdown the now-old // connection. - try!(mem::replace(self, connection).shutdown()); + try!(mem::replace(self, connection).shutdown(&mut receiver)); self.session_id = Some(ready.ready.session_id.clone()); - Ok(Event::Ready(ready)) + Ok((Event::Ready(ready), receiver_new)) } - fn resume(&mut self, session_id: String) -> Result<Event> { - try!(self.receiver.get_mut().get_mut().shutdown(Shutdown::Both)); + fn resume(&mut self, session_id: String, receiver: &mut Receiver<WebSocketStream>) + -> Result<(Event, Receiver<WebSocketStream>)> { + try!(receiver.get_mut().get_mut().shutdown(Shutdown::Both)); let url = try!(build_gateway_url(&self.ws_url)); let response = try!(try!(WsClient::connect(url)).send()); @@ -468,14 +469,14 @@ impl Connection { } } - self.receiver = receiver; let _ = self.keepalive_channel.send(Status::ChangeSender(sender)); - Ok(first_event) + Ok((first_event, receiver)) } - pub fn shutdown(&mut self) -> Result<()> { - let stream = self.receiver.get_mut().get_mut(); + pub fn shutdown(&mut self, receiver: &mut Receiver<WebSocketStream>) + -> Result<()> { + let stream = receiver.get_mut().get_mut(); { let mut sender = Sender::new(stream.by_ref(), true); @@ -513,30 +514,6 @@ impl Connection { } } -impl Drop for Connection { - fn drop(&mut self) { - match self.shutdown() { - Ok(()) => { - if let Some(info) = self.shard_info { - println!("Correctly shutdown shard {}/{}", info[0], info[1] - 1); - } else { - println!("Correctly shutdown connection"); - } - }, - Err(why) => { - if let Some(info) = self.shard_info { - println!("Failed to shutdown shard {}/{}: {:?}", - info[0], - info[1] - 1, - why); - } else { - println!("Failed to shutdown connection: {:?}", why); - } - } - } - } -} - #[inline] fn parse_ready(event: GatewayEvent, tx: &MpscSender<Status>, diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 8798723..80b2061 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -3,7 +3,6 @@ use std::thread; use super::event_store::EventStore; use super::login_type::LoginType; use super::{Connection, Context}; -use ::internal::prelude::*; use ::model::{ChannelId, Event, Message}; #[cfg(feature="framework")] @@ -42,13 +41,13 @@ fn context(channel_id: Option<ChannelId>, } #[cfg(feature="framework")] -pub fn dispatch(event: Result<Event>, +pub fn dispatch(event: Event, conn: Arc<Mutex<Connection>>, framework: Arc<Mutex<Framework>>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>) { match event { - Ok(Event::MessageCreate(event)) => { + Event::MessageCreate(event) => { let context = context(Some(event.message.channel_id), conn, login_type); @@ -64,8 +63,7 @@ pub fn dispatch(event: Result<Event>, dispatch_message(context, event.message, event_store); } }, - Ok(other) => handle_event(other, conn, login_type, event_store), - Err(_why) => {}, + other => handle_event(other, conn, login_type, event_store), } } @@ -75,7 +73,7 @@ pub fn dispatch(event: Result<Event>, login_type: LoginType, event_store: Arc<Mutex<EventStore>>) { match event { - Ok(Event::MessageCreate(event)) => { + Event::MessageCreate(event) => { let context = context(Some(event.message.channel_id), conn, login_type); @@ -83,8 +81,7 @@ pub fn dispatch(event: Result<Event>, event.message.clone(), event_store); }, - Ok(other) => handle_event(other, conn, login_type, event_store), - Err(_why) => {}, + other => handle_event(other, conn, login_type, event_store), } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 3dac20d..6b9be4c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -45,7 +45,10 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; +use websocket::client::Receiver; +use websocket::stream::WebSocketStream; use ::internal::prelude::*; +use ::internal::ws_impl::ReceiverExt; use ::model::*; #[cfg(feature = "framework")] @@ -870,7 +873,7 @@ impl Client { shard_data.map(|s| [i, s[2]]), self.login_type); match connection { - Ok((connection, ready)) => { + Ok((connection, ready, receiver)) => { self.connections.push(Arc::new(Mutex::new(connection))); feature_state_enabled! {{ @@ -882,13 +885,13 @@ impl Client { match self.connections.last() { Some(connection) => { feature_framework! {{ - dispatch(Ok(Event::Ready(ready)), + dispatch(Event::Ready(ready), connection.clone(), self.framework.clone(), self.login_type, self.event_store.clone()); } else { - dispatch(Ok(Event::Ready(ready)), + dispatch(Event::Ready(ready), connection.clone(), self.login_type, self.event_store.clone()); @@ -905,13 +908,15 @@ impl Client { handle_connection(connection_clone, framework, login_type, - event_store) + event_store, + receiver) }); } else { thread::spawn(move || { handle_connection(connection_clone, login_type, - event_store) + event_store, + receiver) }); }} }, @@ -935,7 +940,7 @@ impl Client { #[doc(hidden)] pub fn boot_connection(&mut self, shard_info: Option<[u8; 2]>) - -> Result<(Connection, ReadyEvent)> { + -> Result<(Connection, ReadyEvent, Receiver<WebSocketStream>)> { let gateway_url = try!(http::get_gateway()).url; Connection::new(&gateway_url, &self.token, shard_info, self.login_type) @@ -1250,12 +1255,21 @@ impl Client { fn handle_connection(connection: Arc<Mutex<Connection>>, framework: Arc<Mutex<Framework>>, login_type: LoginType, - event_store: Arc<Mutex<EventStore>>) { + event_store: Arc<Mutex<EventStore>>, + mut receiver: Receiver<WebSocketStream>) { loop { - let event = { - let mut connection = connection.lock().unwrap(); + let event = receiver.recv_json(GatewayEvent::decode); - connection.receive() + let event = match connection.lock().unwrap().handle_event(event, &mut receiver) { + Ok(Some(x)) => match x { + (event, Some(new_receiver)) => { + receiver = new_receiver; + + event + }, + (event, None) => event, + }, + _ => continue, }; dispatch(event, @@ -1269,7 +1283,8 @@ fn handle_connection(connection: Arc<Mutex<Connection>>, #[cfg(not(feature="framework"))] fn handle_connection(connection: Arc<Mutex<Connection>>, login_type: LoginType, - event_store: Arc<Mutex<EventStore>>) { + event_store: Arc<Mutex<EventStore>>, + receiver: Receiver<WebSocketStream>) { loop { let event = { let mut connection = connection.lock().unwrap(); |