diff options
| author | Zeyla Hellyer <[email protected]> | 2017-11-13 13:28:08 -0800 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-11-13 13:29:03 -0800 |
| commit | f2c21ef5b15ef1f345cdc30f4b793e55905f15f4 (patch) | |
| tree | 33a0d5e68976a31d750cefaf78df8b2577b63578 /src/client | |
| parent | Fix strange behaviour when the prefix has spaces (#215) (diff) | |
| download | serenity-f2c21ef5b15ef1f345cdc30f4b793e55905f15f4.tar.xz serenity-f2c21ef5b15ef1f345cdc30f4b793e55905f15f4.zip | |
Use the threadpool for framework command execution
Instead of executing framework commands in the shard runner thread
(potentially blocking the shard runner from reading new messages over
the websocket and heartbeating), dispatch framework commands to the
shard runner's threadpool.
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/bridge/gateway/shard_runner.rs | 40 | ||||
| -rw-r--r-- | src/client/dispatch.rs | 404 |
2 files changed, 294 insertions, 150 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index a5fde3d..005f321 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -203,36 +203,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> { false } + #[inline] fn dispatch(&self, event: Event) { - let data = Arc::clone(&self.data); - let event_handler = Arc::clone(&self.event_handler); - let runner_tx = self.runner_tx.clone(); - let shard_id = self.shard.shard_info()[0]; - - feature_framework! {{ - let framework = Arc::clone(&self.framework); - - self.threadpool.execute(move || { - dispatch( - event, - framework, - data, - event_handler, - runner_tx, - shard_id, - ); - }); - } else { - self.threadpool.execute(move || { - dispatch( - event, - data, - event_handler, - runner_tx, - shard_id, - ); - }); - }} + dispatch( + event, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + &self.runner_tx, + &self.threadpool, + self.shard.shard_info()[0], + ); } // Handles a received value over the shard runner rx channel. diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 38ab570..f79203c 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -6,6 +6,7 @@ use super::bridge::gateway::ShardClientMessage; use super::event_handler::EventHandler; use super::Context; use std::sync::mpsc::Sender; +use threadpool::ThreadPool; use typemap::ShareMap; #[cfg(feature = "cache")] @@ -37,20 +38,21 @@ macro_rules! now { } fn context( - data: Arc<Mutex<ShareMap>>, - runner_tx: Sender<ShardClientMessage>, + data: &Arc<Mutex<ShareMap>>, + runner_tx: &Sender<ShardClientMessage>, shard_id: u64, ) -> Context { - Context::new(data, runner_tx, shard_id) + Context::new(Arc::clone(&data), runner_tx.clone(), shard_id) } #[cfg(feature = "framework")] -pub fn dispatch<H: EventHandler + 'static>( +pub fn dispatch<H: EventHandler + Send + Sync + 'static>( event: Event, - framework: Arc<Mutex<Option<Box<Framework + Send>>>>, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - runner_tx: Sender<ShardClientMessage>, + framework: &Arc<Mutex<Option<Box<Framework + Send>>>>, + data: &Arc<Mutex<ShareMap>>, + event_handler: &Arc<H>, + runner_tx: &Sender<ShardClientMessage>, + threadpool: &ThreadPool, shard_id: u64, ) { match event { @@ -60,30 +62,46 @@ pub fn dispatch<H: EventHandler + 'static>( context.clone(), event.message.clone(), event_handler, + threadpool, ); if let Some(ref mut framework) = *framework.lock() { - framework.dispatch(context, event.message); + framework.dispatch(context, event.message, threadpool); } }, - other => handle_event(other, data, event_handler, runner_tx, shard_id), + other => handle_event( + other, + data, + event_handler, + runner_tx, + threadpool, + shard_id, + ), } } #[cfg(not(feature = "framework"))] -pub fn dispatch<H: EventHandler + 'static>( +pub fn dispatch<H: EventHandler + Send + Sync + 'static>( event: Event, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - runner_tx: Sender<ShardClientMessage>, + data: &Arc<Mutex<ShareMap>>, + event_handler: &Arc<H>, + runner_tx: &Sender<ShardClientMessage>, + threadpool: &ThreadPool, shard_id: u64, ) { match event { Event::MessageCreate(event) => { let context = context(data, runner_tx, shard_id); - dispatch_message(context, event.message, event_handler); + dispatch_message(context, event.message, event_handler, threadpool); }, - other => handle_event(other, data, event_handler, runner_tx, shard_id), + other => handle_event( + other, + data, + event_handler, + runner_tx, + threadpool, + shard_id, + ), } } @@ -91,22 +109,28 @@ pub fn dispatch<H: EventHandler + 'static>( fn dispatch_message<H>( context: Context, mut message: Message, - event_handler: Arc<H> -) where H: EventHandler + 'static { + event_handler: &Arc<H>, + threadpool: &ThreadPool, +) where H: EventHandler + Send + Sync + 'static { #[cfg(feature = "model")] { message.transform_content(); } - event_handler.message(context, message); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.message(context, message); + }); } #[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event<H: EventHandler + 'static>( +fn handle_event<H: EventHandler + Send + Sync + 'static>( event: Event, - data: Arc<Mutex<ShareMap>>, - event_handler: Arc<H>, - runner_tx: Sender<ShardClientMessage>, + data: &Arc<Mutex<ShareMap>>, + event_handler: &Arc<H>, + runner_tx: &Sender<ShardClientMessage>, + threadpool: &ThreadPool, shard_id: u64, ) { #[cfg(feature = "cache")] @@ -134,14 +158,26 @@ fn handle_event<H: EventHandler + 'static>( // So in short, only exists to reduce unnecessary clutter. match event.channel { Channel::Private(channel) => { - event_handler.private_channel_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.private_channel_create(context, channel); + }); }, Channel::Group(_) => {}, Channel::Guild(channel) => { - event_handler.channel_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_create(context, channel); + }); }, Channel::Category(channel) => { - event_handler.category_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.category_create(context, channel); + }); }, } }, @@ -153,53 +189,89 @@ fn handle_event<H: EventHandler + 'static>( match event.channel { Channel::Private(_) | Channel::Group(_) => {}, Channel::Guild(channel) => { - event_handler.channel_delete(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_delete(context, channel); + }); }, Channel::Category(channel) => { - event_handler.category_delete(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.category_delete(context, channel); + }); }, } }, Event::ChannelPinsUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.channel_pins_update(context, event); + threadpool.execute(move || { + event_handler.channel_pins_update(context, event); + }); }, Event::ChannelRecipientAdd(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); - event_handler.channel_recipient_addition(context, event.channel_id, event.user); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_recipient_addition( + context, + event.channel_id, + event.user, + ); + }); }, Event::ChannelRecipientRemove(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.channel_recipient_removal(context, event.channel_id, event.user); + threadpool.execute(move || { + event_handler.channel_recipient_removal( + context, + event.channel_id, + event.user, + ); + }); }, Event::ChannelUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - let before = CACHE.read().channel(event.channel.id()); - event_handler.channel_update(context, before, event.channel); - } else { - event_handler.channel_update(context, event.channel); - }} + threadpool.execute(move || { + feature_cache! {{ + let before = CACHE.read().channel(event.channel.id()); + + event_handler.channel_update(context, before, event.channel); + } else { + event_handler.channel_update(context, event.channel); + }} + }); }, Event::GuildBanAdd(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.guild_ban_addition(context, event.guild_id, event.user); + }); }, Event::GuildBanRemove(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_ban_removal(context, event.guild_id, event.user); + threadpool.execute(move || { + event_handler.guild_ban_removal(context, event.guild_id, event.user); + }); }, Event::GuildCreate(mut event) => { #[cfg(feature = "cache")] @@ -218,205 +290,277 @@ fn handle_event<H: EventHandler + 'static>( let cache = CACHE.read(); if cache.unavailable_guilds.is_empty() { - let context = context(Arc::clone(&data), runner_tx.clone(), shard_id); + let context = context(data, runner_tx, shard_id); let guild_amount = cache .guilds .iter() .map(|(&id, _)| id) .collect::<Vec<GuildId>>(); + let event_handler = Arc::clone(&event_handler); - event_handler.cached(context, guild_amount); + threadpool.execute(move || { + event_handler.cached(context, guild_amount); + }); } } let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_create(context, event.guild, _is_new); - } else { - event_handler.guild_create(context, event.guild); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_create(context, event.guild, _is_new); + } else { + event_handler.guild_create(context, event.guild); + }} + }); }, Event::GuildDelete(mut event) => { let _full = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_delete(context, event.guild, _full); - } else { - event_handler.guild_delete(context, event.guild); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_delete(context, event.guild, _full); + } else { + event_handler.guild_delete(context, event.guild); + }} + }); }, Event::GuildEmojisUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_emojis_update(context, event.guild_id, event.emojis); + threadpool.execute(move || { + event_handler.guild_emojis_update(context, event.guild_id, event.emojis); + }); }, Event::GuildIntegrationsUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_integrations_update(context, event.guild_id); + threadpool.execute(move || { + event_handler.guild_integrations_update(context, event.guild_id); + }); }, Event::GuildMemberAdd(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_member_addition(context, event.guild_id, event.member); + threadpool.execute(move || { + event_handler.guild_member_addition(context, event.guild_id, event.member); + }); }, Event::GuildMemberRemove(mut event) => { let _member = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_member_removal(context, event.guild_id, event.user, _member); - } else { - event_handler.guild_member_removal(context, event.guild_id, event.user); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_member_removal(context, event.guild_id, event.user, _member); + } else { + event_handler.guild_member_removal(context, event.guild_id, event.user); + }} + }); }, Event::GuildMemberUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - // This is safe to unwrap, as the update would have created - // the member if it did not exist. So, there is be _no_ way - // that this could fail under any circumstance. - let after = CACHE.read() - .member(event.guild_id, event.user.id) - .unwrap() - .clone(); - - event_handler.guild_member_update(context, _before, after); - } else { - event_handler.guild_member_update(context, event); - }} + threadpool.execute(move || { + feature_cache! {{ + // This is safe to unwrap, as the update would have created + // the member if it did not exist. So, there is be _no_ way + // that this could fail under any circumstance. + let after = CACHE.read() + .member(event.guild_id, event.user.id) + .unwrap() + .clone(); + + event_handler.guild_member_update(context, _before, after); + } else { + event_handler.guild_member_update(context, event); + }} + }); }, Event::GuildMembersChunk(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_members_chunk(context, event.guild_id, event.members); + threadpool.execute(move || { + event_handler.guild_members_chunk(context, event.guild_id, event.members); + }); }, Event::GuildRoleCreate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_role_create(context, event.guild_id, event.role); + threadpool.execute(move || { + event_handler.guild_role_create(context, event.guild_id, event.role); + }); }, Event::GuildRoleDelete(mut event) => { let _role = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); - } else { - event_handler.guild_role_delete(context, event.guild_id, event.role_id); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); + } else { + event_handler.guild_role_delete(context, event.guild_id, event.role_id); + }} + }); }, Event::GuildRoleUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_role_update(context, event.guild_id, _before, event.role); - } else { - event_handler.guild_role_update(context, event.guild_id, event.role); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_role_update(context, event.guild_id, _before, event.role); + } else { + event_handler.guild_role_update(context, event.guild_id, event.role); + }} + }); }, Event::GuildUnavailable(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_unavailable(context, event.guild_id); + threadpool.execute(move || { + event_handler.guild_unavailable(context, event.guild_id); + }); }, Event::GuildUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - let before = CACHE.read() - .guilds - .get(&event.guild.id) - .cloned(); + threadpool.execute(move || { + feature_cache! {{ + let before = CACHE.read() + .guilds + .get(&event.guild.id) + .cloned(); - event_handler.guild_update(context, before, event.guild); - } else { - event_handler.guild_update(context, event.guild); - }} + event_handler.guild_update(context, before, event.guild); + } else { + event_handler.guild_update(context, event.guild); + }} + }); }, // Already handled by the framework check macro Event::MessageCreate(_) => {}, Event::MessageDeleteBulk(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_delete_bulk(context, event.channel_id, event.ids); + threadpool.execute(move || { + event_handler.message_delete_bulk(context, event.channel_id, event.ids); + }); }, Event::MessageDelete(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_delete(context, event.channel_id, event.message_id); + threadpool.execute(move || { + event_handler.message_delete(context, event.channel_id, event.message_id); + }); }, Event::MessageUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_update(context, event); + threadpool.execute(move || { + event_handler.message_update(context, event); + }); }, Event::PresencesReplace(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.presence_replace(context, event.presences); + threadpool.execute(move || { + event_handler.presence_replace(context, event.presences); + }); }, Event::PresenceUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.presence_update(context, event); + threadpool.execute(move || { + event_handler.presence_update(context, event); + }); }, Event::ReactionAdd(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_add(context, event.reaction); + threadpool.execute(move || { + event_handler.reaction_add(context, event.reaction); + }); }, Event::ReactionRemove(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_remove(context, event.reaction); + threadpool.execute(move || { + event_handler.reaction_remove(context, event.reaction); + }); }, Event::ReactionRemoveAll(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_remove_all(context, event.channel_id, event.message_id); + threadpool.execute(move || { + event_handler.reaction_remove_all(context, event.channel_id, event.message_id); + }); }, Event::Ready(mut event) => { update!(event); - feature_cache!{ - { - last_guild_create_time = now!(); + let event_handler = Arc::clone(&event_handler); + + feature_cache! {{ + last_guild_create_time = now!(); - let _ = wait_for_guilds() - .map(move |_| { - let context = context(data, runner_tx, shard_id); + let _ = wait_for_guilds() + .map(move |_| { + let context = context(data, &runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.ready(context, event.ready); }); - } else { - let context = context(data, runner_tx, shard_id); + }); + } else { + let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.ready(context, event.ready); - } - } + }); + }} }, Event::Resumed(mut event) => { let context = context(data, runner_tx, shard_id); @@ -425,40 +569,58 @@ fn handle_event<H: EventHandler + 'static>( }, Event::TypingStart(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.typing_start(context, event); + threadpool.execute(move || { + event_handler.typing_start(context, event); + }); }, Event::Unknown(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.unknown(context, event.kind, event.value); + threadpool.execute(move || { + event_handler.unknown(context, event.kind, event.value); + }); }, Event::UserUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.user_update(context, _before.unwrap(), event.current_user); - } else { - event_handler.user_update(context, event.current_user); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.user_update(context, _before.unwrap(), event.current_user); + } else { + event_handler.user_update(context, event.current_user); + }} + }); }, Event::VoiceServerUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.voice_server_update(context, event); + threadpool.execute(move || { + event_handler.voice_server_update(context, event); + }); }, Event::VoiceStateUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.voice_state_update(context, event.guild_id, event.voice_state); + threadpool.execute(move || { + event_handler.voice_state_update(context, event.guild_id, event.voice_state); + }); }, Event::WebhookUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.webhook_update(context, event.guild_id, event.channel_id); + threadpool.execute(move || { + event_handler.webhook_update(context, event.guild_id, event.channel_id); + }); }, } } |