aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorAlex Lyon <[email protected]>2017-07-13 21:30:00 -0700
committeralex <[email protected]>2017-07-14 06:30:00 +0200
commit88765d0a978001ff88a1ee12798a725b7f5a90e9 (patch)
treed66970df218ac9e9c4aa3b038e56ee6ce7c81292 /src/client
parentFix the doc on `PrivateChannel::name` (diff)
downloadserenity-88765d0a978001ff88a1ee12798a725b7f5a90e9.tar.xz
serenity-88765d0a978001ff88a1ee12798a725b7f5a90e9.zip
Switch to tokio for events (#122)
Diffstat (limited to 'src/client')
-rw-r--r--src/client/context.rs21
-rw-r--r--src/client/dispatch.rs298
-rw-r--r--src/client/mod.rs52
3 files changed, 268 insertions, 103 deletions
diff --git a/src/client/context.rs b/src/client/context.rs
index 97cde50..3053648 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -1,7 +1,8 @@
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use typemap::ShareMap;
use ::gateway::Shard;
use ::model::*;
+use parking_lot::Mutex;
#[cfg(feature="cache")]
use super::CACHE;
@@ -125,7 +126,7 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn online(&self) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_status(OnlineStatus::Online);
}
@@ -154,7 +155,7 @@ impl Context {
///
/// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle
pub fn idle(&self) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_status(OnlineStatus::Idle);
}
@@ -183,7 +184,7 @@ impl Context {
///
/// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb
pub fn dnd(&self) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_status(OnlineStatus::DoNotDisturb);
}
@@ -213,7 +214,7 @@ impl Context {
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible
pub fn invisible(&self) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_status(OnlineStatus::Invisible);
}
@@ -245,7 +246,7 @@ impl Context {
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
/// [`set_presence`]: #method.set_presence
pub fn reset_presence(&self) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_presence(None, OnlineStatus::Online, false)
}
@@ -280,7 +281,7 @@ impl Context {
///
/// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online
pub fn set_game(&self, game: Game) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_presence(Some(game), OnlineStatus::Online, false);
}
@@ -326,7 +327,7 @@ impl Context {
url: None,
};
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_presence(Some(game), OnlineStatus::Online, false);
}
@@ -380,7 +381,7 @@ impl Context {
game: Option<Game>,
status: OnlineStatus,
afk: bool) {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.set_presence(game, status, afk)
}
@@ -391,7 +392,7 @@ impl Context {
///
/// [`Client::start`]: ./struct.Client.html#method.start
pub fn quit(&self) -> Result<()> {
- let mut shard = self.shard.lock().unwrap();
+ let mut shard = self.shard.lock();
shard.shutdown_clean()
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 0def95e..f2850ec 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,4 +1,5 @@
-use std::sync::{Arc, Mutex};
+use std::sync::{self, Arc};
+use parking_lot::Mutex;
use std::thread;
use std::time;
use super::event_handler::EventHandler;
@@ -8,6 +9,7 @@ use ::gateway::Shard;
use ::model::event::Event;
use ::model::{Message, Reaction, GuildId, Channel};
use chrono::{Utc, Timelike};
+use tokio_core::reactor::Handle;
#[cfg(feature="framework")]
use ::ext::framework::{Framework, ReactionAction};
@@ -61,14 +63,15 @@ fn context(conn: &Arc<Mutex<Shard>>,
// Heck you macro hygiene.
macro_rules! impl_reaction_events {
- (($event:ident, $conn:ident, $data:ident, $event_handler:ident, $framework:ident), $type_of_action:ident, $dispatch_name:ident) => {
+ (($event:ident, $conn:ident, $data:ident, $event_handler:ident, $framework:ident, $handle:ident), $type_of_action:ident, $dispatch_name:ident) => {
let context = context($conn, $data);
let framework = $framework.lock().unwrap();
if framework.initialized {
$dispatch_name(context.clone(),
$event.reaction.clone(),
- $event_handler);
+ $event_handler,
+ $handle);
let res = framework.reaction_actions
.iter()
@@ -84,7 +87,7 @@ macro_rules! impl_reaction_events {
f(context, $event.reaction.message_id, $event.reaction.channel_id);
}
} else {
- $dispatch_name(context, $event.reaction, $event_handler);
+ $dispatch_name(context, $event.reaction, $event_handler, $handle);
}
}
}
@@ -92,9 +95,10 @@ macro_rules! impl_reaction_events {
#[cfg(feature="framework")]
pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
- framework: &Arc<Mutex<Framework>>,
+ framework: &Arc<sync::Mutex<Framework>>,
data: &Arc<Mutex<ShareMap>>,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
match event {
Event::MessageCreate(event) => {
let context = context(conn, data);
@@ -103,20 +107,21 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
if framework.initialized {
dispatch_message(context.clone(),
event.message.clone(),
- event_handler);
+ event_handler,
+ tokio_handle);
- framework.dispatch(context, event.message);
+ framework.dispatch(context, event.message, tokio_handle);
} else {
- dispatch_message(context, event.message, event_handler);
+ dispatch_message(context, event.message, event_handler, tokio_handle);
}
},
Event::ReactionAdd(event) => {
- impl_reaction_events!((event, conn, data, event_handler, framework), Add, dispatch_reaction_add);
+ impl_reaction_events!((event, conn, data, event_handler, framework, tokio_handle), Add, dispatch_reaction_add);
},
Event::ReactionRemove(event) => {
- impl_reaction_events!((event, conn, data, event_handler, framework), Remove, dispatch_reaction_remove);
+ impl_reaction_events!((event, conn, data, event_handler, framework, tokio_handle), Remove, dispatch_reaction_remove);
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, conn, data, event_handler, tokio_handle),
}
}
@@ -124,56 +129,65 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
data: &Arc<Mutex<ShareMap>>,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
match event {
Event::MessageCreate(event) => {
let context = context(conn, data);
dispatch_message(context,
event.message,
- event_handler);
+ event_handler,
+ tokio_handle);
},
Event::ReactionAdd(event) => {
let context = context(conn, data);
- dispatch_reaction_add(context, event.reaction);
+ dispatch_reaction_add(context, event.reaction, tokio_handle);
},
Event::ReactionRemove(event) => {
let context = context(conn, data);
- dispatch_reaction_remove(context, event.reaction);
+ dispatch_reaction_remove(context, event.reaction, tokio_handle);
},
- other => handle_event(other, conn, data, event_handler),
+ other => handle_event(other, conn, data, event_handler, tokio_handle),
}
}
#[allow(unused_mut)]
fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context,
mut message: Message,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
let h = event_handler.clone();
- thread::spawn(move || {
+ tokio_handle.spawn_fn(move || {
#[cfg(feature="model")]
{
message.transform_content();
}
h.on_message(context, message);
+
+ Ok(())
});
}
fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Context,
reaction: Reaction,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
let h = event_handler.clone();
- thread::spawn(move || {
+ tokio_handle.spawn_fn(move || {
h.on_reaction_add(context, reaction);
+ Ok(())
});
}
fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Context,
reaction: Reaction,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
let h = event_handler.clone();
- thread::spawn(move || {
+ tokio_handle.spawn_fn(move || {
h.on_reaction_remove(context, reaction);
+ Ok(())
});
}
@@ -181,7 +195,8 @@ fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Co
fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
data: &Arc<Mutex<ShareMap>>,
- event_handler: &Arc<H>) {
+ event_handler: &Arc<H>,
+ tokio_handle: &Handle) {
#[cfg(feature="cache")]
let mut last_guild_create_time = now!();
@@ -208,11 +223,17 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
match event.channel {
Channel::Private(channel) => {
- thread::spawn(move || h.on_private_channel_create(context, channel));
+ tokio_handle.spawn_fn(move || {
+ h.on_private_channel_create(context, channel);
+ Ok(())
+ });
},
Channel::Group(_) => {},
Channel::Guild(channel) => {
- thread::spawn(move || h.on_channel_create(context, channel));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_create(context, channel);
+ Ok(())
+ });
},
}
},
@@ -225,7 +246,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
Channel::Private(_) | Channel::Group(_) => {}
Channel::Guild(channel) => {
let h = event_handler.clone();
- thread::spawn(move || h.on_channel_delete(context, channel));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_delete(context, channel);
+ Ok(())
+ });
},
}
},
@@ -233,7 +257,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_channel_pins_update(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_pins_update(context, event);
+ Ok(())
+ });
},
Event::ChannelRecipientAdd(mut event) => {
update!(update_with_channel_recipient_add, @event);
@@ -241,7 +268,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_channel_recipient_addition(context, event.channel_id, event.user));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_recipient_addition(context, event.channel_id, event.user);
+ Ok(())
+ });
},
Event::ChannelRecipientRemove(event) => {
update!(update_with_channel_recipient_remove, event);
@@ -249,7 +279,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_channel_recipient_removal(context, event.channel_id, event.user));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_recipient_removal(context, event.channel_id, event.user);
+ Ok(())
+ });
},
Event::ChannelUpdate(event) => {
update!(update_with_channel_update, event);
@@ -259,22 +292,34 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
let before = CACHE.read().unwrap().channel(event.channel.id());
- thread::spawn(move || h.on_channel_update(context, before, event.channel));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_update(context, before, event.channel);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_channel_update(context, event.channel));
+ tokio_handle.spawn_fn(move || {
+ h.on_channel_update(context, event.channel);
+ Ok(())
+ });
}}
},
Event::GuildBanAdd(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_ban_addition(context, event.guild_id, event.user));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_ban_addition(context, event.guild_id, event.user);
+ Ok(())
+ });
},
Event::GuildBanRemove(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_ban_removal(context, event.guild_id, event.user));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_ban_removal(context, event.guild_id, event.user);
+ Ok(())
+ });
},
Event::GuildCreate(event) => {
#[cfg(feature="cache")]
@@ -301,7 +346,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
.map(|(&id, _)| id)
.collect::<Vec<GuildId>>();
- thread::spawn(move || h.on_cached(context, guild_amount));
+ tokio_handle.spawn_fn(move || {
+ h.on_cached(context, guild_amount);
+ Ok(())
+ });
}
}
@@ -309,9 +357,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_guild_create(context, event.guild, _is_new));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_create(context, event.guild, _is_new);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_create(context, event.guild));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_create(context, event.guild);
+ Ok(())
+ });
}}
},
Event::GuildDelete(event) => {
@@ -320,9 +374,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_guild_delete(context, event.guild, _full));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_delete(context, event.guild, _full);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_delete(context, event.guild));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_delete(context, event.guild);
+ Ok(())
+ });
}}
},
Event::GuildEmojisUpdate(event) => {
@@ -331,13 +391,19 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_emojis_update(context, event.guild_id, event.emojis));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_emojis_update(context, event.guild_id, event.emojis);
+ Ok(())
+ });
},
Event::GuildIntegrationsUpdate(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_integrations_update(context, event.guild_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_integrations_update(context, event.guild_id);
+ Ok(())
+ });
},
Event::GuildMemberAdd(mut event) => {
update!(update_with_guild_member_add, @event);
@@ -345,7 +411,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_member_addition(context, event.guild_id, event.member));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_member_addition(context, event.guild_id, event.member);
+ Ok(())
+ });
},
Event::GuildMemberRemove(event) => {
let _member = update!(update_with_guild_member_remove, event);
@@ -353,9 +422,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_guild_member_removal(context, event.guild_id, event.user, _member));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_member_removal(context, event.guild_id, event.user, _member);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_member_removal(context, event.guild_id, event.user));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_member_removal(context, event.guild_id, event.user);
+ Ok(())
+ });
}}
},
Event::GuildMemberUpdate(event) => {
@@ -373,9 +448,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
.unwrap()
.clone();
- thread::spawn(move || h.on_guild_member_update(context, _before, after));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_member_update(context, _before, after);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_member_update(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_member_update(context, event);
+ Ok(())
+ });
}}
},
Event::GuildMembersChunk(event) => {
@@ -384,7 +465,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_members_chunk(context, event.guild_id, event.members));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_members_chunk(context, event.guild_id, event.members);
+ Ok(())
+ });
},
Event::GuildRoleCreate(event) => {
update!(update_with_guild_role_create, event);
@@ -392,7 +476,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_role_create(context, event.guild_id, event.role));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_role_create(context, event.guild_id, event.role);
+ Ok(())
+ });
},
Event::GuildRoleDelete(event) => {
let _role = update!(update_with_guild_role_delete, event);
@@ -400,9 +487,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_guild_role_delete(context, event.guild_id, event.role_id, _role));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_role_delete(context, event.guild_id, event.role_id, _role);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_role_delete(context, event.guild_id, event.role_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_role_delete(context, event.guild_id, event.role_id);
+ Ok(())
+ });
}}
},
Event::GuildRoleUpdate(event) => {
@@ -411,9 +504,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_guild_role_update(context, event.guild_id, _before, event.role));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_role_update(context, event.guild_id, _before, event.role);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_role_update(context, event.guild_id, event.role));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_role_update(context, event.guild_id, event.role);
+ Ok(())
+ });
}}
},
Event::GuildUnavailable(event) => {
@@ -422,7 +521,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_guild_unavailable(context, event.guild_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_unavailable(context, event.guild_id);
+ Ok(())
+ });
},
Event::GuildUpdate(event) => {
update!(update_with_guild_update, event);
@@ -437,9 +539,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
.get(&event.guild.id)
.cloned();
- thread::spawn(move || h.on_guild_update(context, before, event.guild));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_update(context, before, event.guild);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_guild_update(context, event.guild));
+ tokio_handle.spawn_fn(move || {
+ h.on_guild_update(context, event.guild);
+ Ok(())
+ });
}}
},
// Already handled by the framework check macro
@@ -448,19 +556,28 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_message_delete_bulk(context, event.channel_id, event.ids));
+ tokio_handle.spawn_fn(move || {
+ h.on_message_delete_bulk(context, event.channel_id, event.ids);
+ Ok(())
+ });
},
Event::MessageDelete(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_message_delete(context, event.channel_id, event.message_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_message_delete(context, event.channel_id, event.message_id);
+ Ok(())
+ });
},
Event::MessageUpdate(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_message_update(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_message_update(context, event);
+ Ok(())
+ });
},
Event::PresencesReplace(event) => {
update!(update_with_presences_replace, event);
@@ -468,7 +585,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_presence_replace(context, event.presences));
+ tokio_handle.spawn_fn(move || {
+ h.on_presence_replace(context, event.presences);
+ Ok(())
+ });
},
Event::PresenceUpdate(mut event) => {
update!(update_with_presence_update, @event);
@@ -476,7 +596,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_presence_update(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_presence_update(context, event);
+ Ok(())
+ });
},
// Already handled by the framework check macro
@@ -486,7 +609,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_reaction_remove_all(context, event.channel_id, event.message_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_reaction_remove_all(context, event.channel_id, event.message_id);
+ Ok(())
+ });
},
Event::Ready(event) => {
update!(update_with_ready, event);
@@ -499,32 +625,47 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_ready(context, event.ready));
+ tokio_handle.spawn_fn(move || {
+ h.on_ready(context, event.ready);
+ Ok(())
+ });
});
} else {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_ready(context, event.ready));
+ tokio_handle.spawn_fn(move || {
+ h.on_ready(context, event.ready);
+ Ok(())
+ });
}}
},
Event::Resumed(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_resume(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_resume(context, event);
+ Ok(())
+ });
},
Event::TypingStart(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_typing_start(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_typing_start(context, event);
+ Ok(())
+ });
},
Event::Unknown(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_unknown(context, event.kind, event.value));
+ tokio_handle.spawn_fn(move || {
+ h.on_unknown(context, event.kind, event.value);
+ Ok(())
+ });
},
Event::UserUpdate(event) => {
let _before = update!(update_with_user_update, event);
@@ -532,16 +673,25 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let h = event_handler.clone();
feature_cache! {{
- thread::spawn(move || h.on_user_update(context, _before, event.current_user));
+ tokio_handle.spawn_fn(move || {
+ h.on_user_update(context, _before, event.current_user);
+ Ok(())
+ });
} else {
- thread::spawn(move || h.on_user_update(context, event.current_user));
+ tokio_handle.spawn_fn(move || {
+ h.on_user_update(context, event.current_user);
+ Ok(())
+ });
}}
},
Event::VoiceServerUpdate(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_voice_server_update(context, event));
+ tokio_handle.spawn_fn(move || {
+ h.on_voice_server_update(context, event);
+ Ok(())
+ });
},
Event::VoiceStateUpdate(event) => {
update!(update_with_voice_state_update, event);
@@ -549,13 +699,19 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_voice_state_update(context, event.guild_id, event.voice_state));
+ tokio_handle.spawn_fn(move || {
+ h.on_voice_state_update(context, event.guild_id, event.voice_state);
+ Ok(())
+ });
},
Event::WebhookUpdate(event) => {
let context = context(conn, data);
let h = event_handler.clone();
- thread::spawn(move || h.on_webhook_update(context, event.guild_id, event.channel_id));
+ tokio_handle.spawn_fn(move || {
+ h.on_webhook_update(context, event.guild_id, event.channel_id);
+ Ok(())
+ });
},
}
}
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 4ffbf3f..12b3152 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -36,7 +36,9 @@ pub use ::http as rest;
pub use ::CACHE;
use self::dispatch::dispatch;
-use std::sync::{Arc, Mutex};
+use std::sync::{self, Arc};
+use parking_lot::Mutex;
+use tokio_core::reactor::Core;
use std::time::Duration;
use std::{mem, thread};
use super::gateway::Shard;
@@ -133,7 +135,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
/// macro_rules! reg {
/// ($ctx:ident $name:expr) => {
/// {
- /// let mut data = $ctx.data.lock().unwrap();
+ /// let mut data = $ctx.data.lock();
/// let counter = data.get_mut::<MessageEventCounter>().unwrap();
/// let entry = counter.entry($name).or_insert(0);
/// *entry += 1;
@@ -153,7 +155,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
/// let mut client = Client::new(&env::var("DISCORD_TOKEN").unwrap(), Handler);
///
/// {
- /// let mut data = client.data.lock().unwrap();
+ /// let mut data = client.data.lock();
/// data.insert::<MessageEventCounter>(HashMap::default());
/// }
///
@@ -177,8 +179,8 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
/// [`on_ready`]: #method.on_ready
event_handler: Arc<H>,
#[cfg(feature="framework")]
- framework: Arc<Mutex<Framework>>,
- token: Arc<Mutex<String>>,
+ framework: Arc<sync::Mutex<Framework>>,
+ token: Arc<sync::Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> Client<H> {
@@ -263,7 +265,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
#[cfg(feature="framework")]
pub fn with_framework<F>(&mut self, f: F)
where F: FnOnce(Framework) -> Framework + Send + Sync + 'static {
- self.framework = Arc::new(Mutex::new(f(Framework::default())));
+ self.framework = Arc::new(sync::Mutex::new(f(Framework::default())));
}
/// Establish the connection and start listening for events.
@@ -599,7 +601,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
.update_current_user(user.id, user.bot);
}
- let gateway_url = Arc::new(Mutex::new(url));
+ let gateway_url = Arc::new(sync::Mutex::new(url));
let shards_index = shard_data[0];
let shards_total = shard_data[1] + 1;
@@ -662,30 +664,30 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
}
struct BootInfo {
- gateway_url: Arc<Mutex<String>>,
+ gateway_url: Arc<sync::Mutex<String>>,
shard_info: [u64; 2],
- token: Arc<Mutex<String>>,
+ token: Arc<sync::Mutex<String>>,
}
#[cfg(feature="framework")]
struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
- framework: Arc<Mutex<Framework>>,
- gateway_url: Arc<Mutex<String>>,
+ framework: Arc<sync::Mutex<Framework>>,
+ gateway_url: Arc<sync::Mutex<String>>,
shard: Arc<Mutex<Shard>>,
shard_info: [u64; 2],
- token: Arc<Mutex<String>>,
+ token: Arc<sync::Mutex<String>>,
}
#[cfg(not(feature="framework"))]
struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
- gateway_url: Arc<Mutex<String>>,
+ gateway_url: Arc<sync::Mutex<String>>,
shard: Arc<Mutex<Shard>>,
shard_info: [u64; 2],
- token: Arc<Mutex<String>>,
+ token: Arc<sync::Mutex<String>>,
}
fn boot_shard(info: &BootInfo) -> Result<Shard> {
@@ -743,7 +745,7 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<
match boot {
Ok(new_shard) => {
- *info.shard.lock().unwrap() = new_shard;
+ *info.shard.lock() = new_shard;
boot_successful = true;
@@ -767,9 +769,11 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<
fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) {
// This is currently all ducktape. Redo this.
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
loop {
{
- let mut shard = info.shard.lock().unwrap();
+ let mut shard = info.shard.lock();
if let Err(why) = shard.check_heartbeat() {
error!("Failed to heartbeat and reconnect: {:?}", why);
@@ -780,13 +784,13 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<
#[cfg(feature="voice")]
{
- let mut shard = info.shard.lock().unwrap();
+ let mut shard = info.shard.lock();
shard.cycle_voice_recv();
}
let event = {
- let mut shard = info.shard.lock().unwrap();
+ let mut shard = info.shard.lock();
let event = match shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
@@ -838,25 +842,29 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<
&info.shard,
&info.framework,
&info.data,
- &info.event_handler);
+ &info.event_handler,
+ &handle);
} else {
dispatch(event,
&info.shard,
&info.data,
- &info.event_handler);
+ &info.event_handler,
+ &handle);
}}
+
+ core.turn(None);
}
}
fn init_client<H: EventHandler + Send + Sync + 'static>(token: String, handler: H) -> Client<H> {
http::set_token(&token);
- let locked = Arc::new(Mutex::new(token));
+ let locked = Arc::new(sync::Mutex::new(token));
feature_framework! {{
Client {
data: Arc::new(Mutex::new(ShareMap::custom())),
event_handler: Arc::new(handler),
- framework: Arc::new(Mutex::new(Framework::default())),
+ framework: Arc::new(sync::Mutex::new(Framework::default())),
token: locked,
}
} else {