diff options
| author | Alex Lyon <[email protected]> | 2017-07-13 21:30:00 -0700 |
|---|---|---|
| committer | alex <[email protected]> | 2017-07-14 06:30:00 +0200 |
| commit | 88765d0a978001ff88a1ee12798a725b7f5a90e9 (patch) | |
| tree | d66970df218ac9e9c4aa3b038e56ee6ce7c81292 /src/client | |
| parent | Fix the doc on `PrivateChannel::name` (diff) | |
| download | serenity-88765d0a978001ff88a1ee12798a725b7f5a90e9.tar.xz serenity-88765d0a978001ff88a1ee12798a725b7f5a90e9.zip | |
Switch to tokio for events (#122)
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/context.rs | 21 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 298 | ||||
| -rw-r--r-- | src/client/mod.rs | 52 |
3 files changed, 268 insertions, 103 deletions
diff --git a/src/client/context.rs b/src/client/context.rs index 97cde50..3053648 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -1,7 +1,8 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use typemap::ShareMap; use ::gateway::Shard; use ::model::*; +use parking_lot::Mutex; #[cfg(feature="cache")] use super::CACHE; @@ -125,7 +126,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn online(&self) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_status(OnlineStatus::Online); } @@ -154,7 +155,7 @@ impl Context { /// /// [`Idle`]: ../model/enum.OnlineStatus.html#variant.Idle pub fn idle(&self) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_status(OnlineStatus::Idle); } @@ -183,7 +184,7 @@ impl Context { /// /// [`DoNotDisturb`]: ../model/enum.OnlineStatus.html#variant.DoNotDisturb pub fn dnd(&self) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_status(OnlineStatus::DoNotDisturb); } @@ -213,7 +214,7 @@ impl Context { /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`Invisible`]: ../model/enum.OnlineStatus.html#variant.Invisible pub fn invisible(&self) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_status(OnlineStatus::Invisible); } @@ -245,7 +246,7 @@ impl Context { /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online /// [`set_presence`]: #method.set_presence pub fn reset_presence(&self) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_presence(None, OnlineStatus::Online, false) } @@ -280,7 +281,7 @@ impl Context { /// /// [`Online`]: ../model/enum.OnlineStatus.html#variant.Online pub fn set_game(&self, game: Game) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_presence(Some(game), OnlineStatus::Online, false); } @@ -326,7 +327,7 @@ impl Context { url: None, }; - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_presence(Some(game), OnlineStatus::Online, false); } @@ -380,7 +381,7 @@ impl Context { game: Option<Game>, status: OnlineStatus, afk: bool) { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.set_presence(game, status, afk) } @@ -391,7 +392,7 @@ impl Context { /// /// [`Client::start`]: ./struct.Client.html#method.start pub fn quit(&self) -> Result<()> { - let mut shard = self.shard.lock().unwrap(); + let mut shard = self.shard.lock(); shard.shutdown_clean() } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 0def95e..f2850ec 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{self, Arc}; +use parking_lot::Mutex; use std::thread; use std::time; use super::event_handler::EventHandler; @@ -8,6 +9,7 @@ use ::gateway::Shard; use ::model::event::Event; use ::model::{Message, Reaction, GuildId, Channel}; use chrono::{Utc, Timelike}; +use tokio_core::reactor::Handle; #[cfg(feature="framework")] use ::ext::framework::{Framework, ReactionAction}; @@ -61,14 +63,15 @@ fn context(conn: &Arc<Mutex<Shard>>, // Heck you macro hygiene. macro_rules! impl_reaction_events { - (($event:ident, $conn:ident, $data:ident, $event_handler:ident, $framework:ident), $type_of_action:ident, $dispatch_name:ident) => { + (($event:ident, $conn:ident, $data:ident, $event_handler:ident, $framework:ident, $handle:ident), $type_of_action:ident, $dispatch_name:ident) => { let context = context($conn, $data); let framework = $framework.lock().unwrap(); if framework.initialized { $dispatch_name(context.clone(), $event.reaction.clone(), - $event_handler); + $event_handler, + $handle); let res = framework.reaction_actions .iter() @@ -84,7 +87,7 @@ macro_rules! impl_reaction_events { f(context, $event.reaction.message_id, $event.reaction.channel_id); } } else { - $dispatch_name(context, $event.reaction, $event_handler); + $dispatch_name(context, $event.reaction, $event_handler, $handle); } } } @@ -92,9 +95,10 @@ macro_rules! impl_reaction_events { #[cfg(feature="framework")] pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, - framework: &Arc<Mutex<Framework>>, + framework: &Arc<sync::Mutex<Framework>>, data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { match event { Event::MessageCreate(event) => { let context = context(conn, data); @@ -103,20 +107,21 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, if framework.initialized { dispatch_message(context.clone(), event.message.clone(), - event_handler); + event_handler, + tokio_handle); - framework.dispatch(context, event.message); + framework.dispatch(context, event.message, tokio_handle); } else { - dispatch_message(context, event.message, event_handler); + dispatch_message(context, event.message, event_handler, tokio_handle); } }, Event::ReactionAdd(event) => { - impl_reaction_events!((event, conn, data, event_handler, framework), Add, dispatch_reaction_add); + impl_reaction_events!((event, conn, data, event_handler, framework, tokio_handle), Add, dispatch_reaction_add); }, Event::ReactionRemove(event) => { - impl_reaction_events!((event, conn, data, event_handler, framework), Remove, dispatch_reaction_remove); + impl_reaction_events!((event, conn, data, event_handler, framework, tokio_handle), Remove, dispatch_reaction_remove); }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, conn, data, event_handler, tokio_handle), } } @@ -124,56 +129,65 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { match event { Event::MessageCreate(event) => { let context = context(conn, data); dispatch_message(context, event.message, - event_handler); + event_handler, + tokio_handle); }, Event::ReactionAdd(event) => { let context = context(conn, data); - dispatch_reaction_add(context, event.reaction); + dispatch_reaction_add(context, event.reaction, tokio_handle); }, Event::ReactionRemove(event) => { let context = context(conn, data); - dispatch_reaction_remove(context, event.reaction); + dispatch_reaction_remove(context, event.reaction, tokio_handle); }, - other => handle_event(other, conn, data, event_handler), + other => handle_event(other, conn, data, event_handler, tokio_handle), } } #[allow(unused_mut)] fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context, mut message: Message, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { let h = event_handler.clone(); - thread::spawn(move || { + tokio_handle.spawn_fn(move || { #[cfg(feature="model")] { message.transform_content(); } h.on_message(context, message); + + Ok(()) }); } fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Context, reaction: Reaction, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { let h = event_handler.clone(); - thread::spawn(move || { + tokio_handle.spawn_fn(move || { h.on_reaction_add(context, reaction); + Ok(()) }); } fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Context, reaction: Reaction, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { let h = event_handler.clone(); - thread::spawn(move || { + tokio_handle.spawn_fn(move || { h.on_reaction_remove(context, reaction); + Ok(()) }); } @@ -181,7 +195,8 @@ fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Co fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, data: &Arc<Mutex<ShareMap>>, - event_handler: &Arc<H>) { + event_handler: &Arc<H>, + tokio_handle: &Handle) { #[cfg(feature="cache")] let mut last_guild_create_time = now!(); @@ -208,11 +223,17 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); match event.channel { Channel::Private(channel) => { - thread::spawn(move || h.on_private_channel_create(context, channel)); + tokio_handle.spawn_fn(move || { + h.on_private_channel_create(context, channel); + Ok(()) + }); }, Channel::Group(_) => {}, Channel::Guild(channel) => { - thread::spawn(move || h.on_channel_create(context, channel)); + tokio_handle.spawn_fn(move || { + h.on_channel_create(context, channel); + Ok(()) + }); }, } }, @@ -225,7 +246,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, Channel::Private(_) | Channel::Group(_) => {} Channel::Guild(channel) => { let h = event_handler.clone(); - thread::spawn(move || h.on_channel_delete(context, channel)); + tokio_handle.spawn_fn(move || { + h.on_channel_delete(context, channel); + Ok(()) + }); }, } }, @@ -233,7 +257,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_channel_pins_update(context, event)); + tokio_handle.spawn_fn(move || { + h.on_channel_pins_update(context, event); + Ok(()) + }); }, Event::ChannelRecipientAdd(mut event) => { update!(update_with_channel_recipient_add, @event); @@ -241,7 +268,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_channel_recipient_addition(context, event.channel_id, event.user)); + tokio_handle.spawn_fn(move || { + h.on_channel_recipient_addition(context, event.channel_id, event.user); + Ok(()) + }); }, Event::ChannelRecipientRemove(event) => { update!(update_with_channel_recipient_remove, event); @@ -249,7 +279,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_channel_recipient_removal(context, event.channel_id, event.user)); + tokio_handle.spawn_fn(move || { + h.on_channel_recipient_removal(context, event.channel_id, event.user); + Ok(()) + }); }, Event::ChannelUpdate(event) => { update!(update_with_channel_update, event); @@ -259,22 +292,34 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ let before = CACHE.read().unwrap().channel(event.channel.id()); - thread::spawn(move || h.on_channel_update(context, before, event.channel)); + tokio_handle.spawn_fn(move || { + h.on_channel_update(context, before, event.channel); + Ok(()) + }); } else { - thread::spawn(move || h.on_channel_update(context, event.channel)); + tokio_handle.spawn_fn(move || { + h.on_channel_update(context, event.channel); + Ok(()) + }); }} }, Event::GuildBanAdd(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_ban_addition(context, event.guild_id, event.user)); + tokio_handle.spawn_fn(move || { + h.on_guild_ban_addition(context, event.guild_id, event.user); + Ok(()) + }); }, Event::GuildBanRemove(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_ban_removal(context, event.guild_id, event.user)); + tokio_handle.spawn_fn(move || { + h.on_guild_ban_removal(context, event.guild_id, event.user); + Ok(()) + }); }, Event::GuildCreate(event) => { #[cfg(feature="cache")] @@ -301,7 +346,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, .map(|(&id, _)| id) .collect::<Vec<GuildId>>(); - thread::spawn(move || h.on_cached(context, guild_amount)); + tokio_handle.spawn_fn(move || { + h.on_cached(context, guild_amount); + Ok(()) + }); } } @@ -309,9 +357,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_guild_create(context, event.guild, _is_new)); + tokio_handle.spawn_fn(move || { + h.on_guild_create(context, event.guild, _is_new); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_create(context, event.guild)); + tokio_handle.spawn_fn(move || { + h.on_guild_create(context, event.guild); + Ok(()) + }); }} }, Event::GuildDelete(event) => { @@ -320,9 +374,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_guild_delete(context, event.guild, _full)); + tokio_handle.spawn_fn(move || { + h.on_guild_delete(context, event.guild, _full); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_delete(context, event.guild)); + tokio_handle.spawn_fn(move || { + h.on_guild_delete(context, event.guild); + Ok(()) + }); }} }, Event::GuildEmojisUpdate(event) => { @@ -331,13 +391,19 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_emojis_update(context, event.guild_id, event.emojis)); + tokio_handle.spawn_fn(move || { + h.on_guild_emojis_update(context, event.guild_id, event.emojis); + Ok(()) + }); }, Event::GuildIntegrationsUpdate(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_integrations_update(context, event.guild_id)); + tokio_handle.spawn_fn(move || { + h.on_guild_integrations_update(context, event.guild_id); + Ok(()) + }); }, Event::GuildMemberAdd(mut event) => { update!(update_with_guild_member_add, @event); @@ -345,7 +411,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_member_addition(context, event.guild_id, event.member)); + tokio_handle.spawn_fn(move || { + h.on_guild_member_addition(context, event.guild_id, event.member); + Ok(()) + }); }, Event::GuildMemberRemove(event) => { let _member = update!(update_with_guild_member_remove, event); @@ -353,9 +422,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_guild_member_removal(context, event.guild_id, event.user, _member)); + tokio_handle.spawn_fn(move || { + h.on_guild_member_removal(context, event.guild_id, event.user, _member); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_member_removal(context, event.guild_id, event.user)); + tokio_handle.spawn_fn(move || { + h.on_guild_member_removal(context, event.guild_id, event.user); + Ok(()) + }); }} }, Event::GuildMemberUpdate(event) => { @@ -373,9 +448,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, .unwrap() .clone(); - thread::spawn(move || h.on_guild_member_update(context, _before, after)); + tokio_handle.spawn_fn(move || { + h.on_guild_member_update(context, _before, after); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_member_update(context, event)); + tokio_handle.spawn_fn(move || { + h.on_guild_member_update(context, event); + Ok(()) + }); }} }, Event::GuildMembersChunk(event) => { @@ -384,7 +465,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_members_chunk(context, event.guild_id, event.members)); + tokio_handle.spawn_fn(move || { + h.on_guild_members_chunk(context, event.guild_id, event.members); + Ok(()) + }); }, Event::GuildRoleCreate(event) => { update!(update_with_guild_role_create, event); @@ -392,7 +476,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_role_create(context, event.guild_id, event.role)); + tokio_handle.spawn_fn(move || { + h.on_guild_role_create(context, event.guild_id, event.role); + Ok(()) + }); }, Event::GuildRoleDelete(event) => { let _role = update!(update_with_guild_role_delete, event); @@ -400,9 +487,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_guild_role_delete(context, event.guild_id, event.role_id, _role)); + tokio_handle.spawn_fn(move || { + h.on_guild_role_delete(context, event.guild_id, event.role_id, _role); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_role_delete(context, event.guild_id, event.role_id)); + tokio_handle.spawn_fn(move || { + h.on_guild_role_delete(context, event.guild_id, event.role_id); + Ok(()) + }); }} }, Event::GuildRoleUpdate(event) => { @@ -411,9 +504,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_guild_role_update(context, event.guild_id, _before, event.role)); + tokio_handle.spawn_fn(move || { + h.on_guild_role_update(context, event.guild_id, _before, event.role); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_role_update(context, event.guild_id, event.role)); + tokio_handle.spawn_fn(move || { + h.on_guild_role_update(context, event.guild_id, event.role); + Ok(()) + }); }} }, Event::GuildUnavailable(event) => { @@ -422,7 +521,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_guild_unavailable(context, event.guild_id)); + tokio_handle.spawn_fn(move || { + h.on_guild_unavailable(context, event.guild_id); + Ok(()) + }); }, Event::GuildUpdate(event) => { update!(update_with_guild_update, event); @@ -437,9 +539,15 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, .get(&event.guild.id) .cloned(); - thread::spawn(move || h.on_guild_update(context, before, event.guild)); + tokio_handle.spawn_fn(move || { + h.on_guild_update(context, before, event.guild); + Ok(()) + }); } else { - thread::spawn(move || h.on_guild_update(context, event.guild)); + tokio_handle.spawn_fn(move || { + h.on_guild_update(context, event.guild); + Ok(()) + }); }} }, // Already handled by the framework check macro @@ -448,19 +556,28 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_message_delete_bulk(context, event.channel_id, event.ids)); + tokio_handle.spawn_fn(move || { + h.on_message_delete_bulk(context, event.channel_id, event.ids); + Ok(()) + }); }, Event::MessageDelete(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_message_delete(context, event.channel_id, event.message_id)); + tokio_handle.spawn_fn(move || { + h.on_message_delete(context, event.channel_id, event.message_id); + Ok(()) + }); }, Event::MessageUpdate(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_message_update(context, event)); + tokio_handle.spawn_fn(move || { + h.on_message_update(context, event); + Ok(()) + }); }, Event::PresencesReplace(event) => { update!(update_with_presences_replace, event); @@ -468,7 +585,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_presence_replace(context, event.presences)); + tokio_handle.spawn_fn(move || { + h.on_presence_replace(context, event.presences); + Ok(()) + }); }, Event::PresenceUpdate(mut event) => { update!(update_with_presence_update, @event); @@ -476,7 +596,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_presence_update(context, event)); + tokio_handle.spawn_fn(move || { + h.on_presence_update(context, event); + Ok(()) + }); }, // Already handled by the framework check macro @@ -486,7 +609,10 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_reaction_remove_all(context, event.channel_id, event.message_id)); + tokio_handle.spawn_fn(move || { + h.on_reaction_remove_all(context, event.channel_id, event.message_id); + Ok(()) + }); }, Event::Ready(event) => { update!(update_with_ready, event); @@ -499,32 +625,47 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_ready(context, event.ready)); + tokio_handle.spawn_fn(move || { + h.on_ready(context, event.ready); + Ok(()) + }); }); } else { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_ready(context, event.ready)); + tokio_handle.spawn_fn(move || { + h.on_ready(context, event.ready); + Ok(()) + }); }} }, Event::Resumed(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_resume(context, event)); + tokio_handle.spawn_fn(move || { + h.on_resume(context, event); + Ok(()) + }); }, Event::TypingStart(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_typing_start(context, event)); + tokio_handle.spawn_fn(move || { + h.on_typing_start(context, event); + Ok(()) + }); }, Event::Unknown(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_unknown(context, event.kind, event.value)); + tokio_handle.spawn_fn(move || { + h.on_unknown(context, event.kind, event.value); + Ok(()) + }); }, Event::UserUpdate(event) => { let _before = update!(update_with_user_update, event); @@ -532,16 +673,25 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let h = event_handler.clone(); feature_cache! {{ - thread::spawn(move || h.on_user_update(context, _before, event.current_user)); + tokio_handle.spawn_fn(move || { + h.on_user_update(context, _before, event.current_user); + Ok(()) + }); } else { - thread::spawn(move || h.on_user_update(context, event.current_user)); + tokio_handle.spawn_fn(move || { + h.on_user_update(context, event.current_user); + Ok(()) + }); }} }, Event::VoiceServerUpdate(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_voice_server_update(context, event)); + tokio_handle.spawn_fn(move || { + h.on_voice_server_update(context, event); + Ok(()) + }); }, Event::VoiceStateUpdate(event) => { update!(update_with_voice_state_update, event); @@ -549,13 +699,19 @@ fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_voice_state_update(context, event.guild_id, event.voice_state)); + tokio_handle.spawn_fn(move || { + h.on_voice_state_update(context, event.guild_id, event.voice_state); + Ok(()) + }); }, Event::WebhookUpdate(event) => { let context = context(conn, data); let h = event_handler.clone(); - thread::spawn(move || h.on_webhook_update(context, event.guild_id, event.channel_id)); + tokio_handle.spawn_fn(move || { + h.on_webhook_update(context, event.guild_id, event.channel_id); + Ok(()) + }); }, } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 4ffbf3f..12b3152 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -36,7 +36,9 @@ pub use ::http as rest; pub use ::CACHE; use self::dispatch::dispatch; -use std::sync::{Arc, Mutex}; +use std::sync::{self, Arc}; +use parking_lot::Mutex; +use tokio_core::reactor::Core; use std::time::Duration; use std::{mem, thread}; use super::gateway::Shard; @@ -133,7 +135,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// macro_rules! reg { /// ($ctx:ident $name:expr) => { /// { - /// let mut data = $ctx.data.lock().unwrap(); + /// let mut data = $ctx.data.lock(); /// let counter = data.get_mut::<MessageEventCounter>().unwrap(); /// let entry = counter.entry($name).or_insert(0); /// *entry += 1; @@ -153,7 +155,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// let mut client = Client::new(&env::var("DISCORD_TOKEN").unwrap(), Handler); /// /// { - /// let mut data = client.data.lock().unwrap(); + /// let mut data = client.data.lock(); /// data.insert::<MessageEventCounter>(HashMap::default()); /// } /// @@ -177,8 +179,8 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { /// [`on_ready`]: #method.on_ready event_handler: Arc<H>, #[cfg(feature="framework")] - framework: Arc<Mutex<Framework>>, - token: Arc<Mutex<String>>, + framework: Arc<sync::Mutex<Framework>>, + token: Arc<sync::Mutex<String>>, } impl<H: EventHandler + Send + Sync + 'static> Client<H> { @@ -263,7 +265,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { #[cfg(feature="framework")] pub fn with_framework<F>(&mut self, f: F) where F: FnOnce(Framework) -> Framework + Send + Sync + 'static { - self.framework = Arc::new(Mutex::new(f(Framework::default()))); + self.framework = Arc::new(sync::Mutex::new(f(Framework::default()))); } /// Establish the connection and start listening for events. @@ -599,7 +601,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { .update_current_user(user.id, user.bot); } - let gateway_url = Arc::new(Mutex::new(url)); + let gateway_url = Arc::new(sync::Mutex::new(url)); let shards_index = shard_data[0]; let shards_total = shard_data[1] + 1; @@ -662,30 +664,30 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { } struct BootInfo { - gateway_url: Arc<Mutex<String>>, + gateway_url: Arc<sync::Mutex<String>>, shard_info: [u64; 2], - token: Arc<Mutex<String>>, + token: Arc<sync::Mutex<String>>, } #[cfg(feature="framework")] struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, - framework: Arc<Mutex<Framework>>, - gateway_url: Arc<Mutex<String>>, + framework: Arc<sync::Mutex<Framework>>, + gateway_url: Arc<sync::Mutex<String>>, shard: Arc<Mutex<Shard>>, shard_info: [u64; 2], - token: Arc<Mutex<String>>, + token: Arc<sync::Mutex<String>>, } #[cfg(not(feature="framework"))] struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, - gateway_url: Arc<Mutex<String>>, + gateway_url: Arc<sync::Mutex<String>>, shard: Arc<Mutex<Shard>>, shard_info: [u64; 2], - token: Arc<Mutex<String>>, + token: Arc<sync::Mutex<String>>, } fn boot_shard(info: &BootInfo) -> Result<Shard> { @@ -743,7 +745,7 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo< match boot { Ok(new_shard) => { - *info.shard.lock().unwrap() = new_shard; + *info.shard.lock() = new_shard; boot_successful = true; @@ -767,9 +769,11 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo< fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) { // This is currently all ducktape. Redo this. + let mut core = Core::new().unwrap(); + let handle = core.handle(); loop { { - let mut shard = info.shard.lock().unwrap(); + let mut shard = info.shard.lock(); if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); @@ -780,13 +784,13 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo< #[cfg(feature="voice")] { - let mut shard = info.shard.lock().unwrap(); + let mut shard = info.shard.lock(); shard.cycle_voice_recv(); } let event = { - let mut shard = info.shard.lock().unwrap(); + let mut shard = info.shard.lock(); let event = match shard.client.recv_json(GatewayEvent::decode) { Err(Error::WebSocket(WebSocketError::IoError(_))) => { @@ -838,25 +842,29 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo< &info.shard, &info.framework, &info.data, - &info.event_handler); + &info.event_handler, + &handle); } else { dispatch(event, &info.shard, &info.data, - &info.event_handler); + &info.event_handler, + &handle); }} + + core.turn(None); } } fn init_client<H: EventHandler + Send + Sync + 'static>(token: String, handler: H) -> Client<H> { http::set_token(&token); - let locked = Arc::new(Mutex::new(token)); + let locked = Arc::new(sync::Mutex::new(token)); feature_framework! {{ Client { data: Arc::new(Mutex::new(ShareMap::custom())), event_handler: Arc::new(handler), - framework: Arc::new(Mutex::new(Framework::default())), + framework: Arc::new(sync::Mutex::new(Framework::default())), token: locked, } } else { |