From f2c21ef5b15ef1f345cdc30f4b793e55905f15f4 Mon Sep 17 00:00:00 2001 From: Zeyla Hellyer Date: Mon, 13 Nov 2017 13:28:08 -0800 Subject: Use the threadpool for framework command execution Instead of executing framework commands in the shard runner thread (potentially blocking the shard runner from reading new messages over the websocket and heartbeating), dispatch framework commands to the shard runner's threadpool. --- src/client/bridge/gateway/shard_runner.rs | 40 +-- src/client/dispatch.rs | 404 +++++++++++++++++++++--------- src/framework/mod.rs | 11 +- src/framework/standard/mod.rs | 44 ++-- 4 files changed, 326 insertions(+), 173 deletions(-) (limited to 'src') diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index a5fde3d..005f321 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -203,36 +203,18 @@ impl ShardRunner { false } + #[inline] fn dispatch(&self, event: Event) { - let data = Arc::clone(&self.data); - let event_handler = Arc::clone(&self.event_handler); - let runner_tx = self.runner_tx.clone(); - let shard_id = self.shard.shard_info()[0]; - - feature_framework! {{ - let framework = Arc::clone(&self.framework); - - self.threadpool.execute(move || { - dispatch( - event, - framework, - data, - event_handler, - runner_tx, - shard_id, - ); - }); - } else { - self.threadpool.execute(move || { - dispatch( - event, - data, - event_handler, - runner_tx, - shard_id, - ); - }); - }} + dispatch( + event, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + &self.runner_tx, + &self.threadpool, + self.shard.shard_info()[0], + ); } // Handles a received value over the shard runner rx channel. diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 38ab570..f79203c 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -6,6 +6,7 @@ use super::bridge::gateway::ShardClientMessage; use super::event_handler::EventHandler; use super::Context; use std::sync::mpsc::Sender; +use threadpool::ThreadPool; use typemap::ShareMap; #[cfg(feature = "cache")] @@ -37,20 +38,21 @@ macro_rules! now { } fn context( - data: Arc>, - runner_tx: Sender, + data: &Arc>, + runner_tx: &Sender, shard_id: u64, ) -> Context { - Context::new(data, runner_tx, shard_id) + Context::new(Arc::clone(&data), runner_tx.clone(), shard_id) } #[cfg(feature = "framework")] -pub fn dispatch( +pub fn dispatch( event: Event, - framework: Arc>>>, - data: Arc>, - event_handler: Arc, - runner_tx: Sender, + framework: &Arc>>>, + data: &Arc>, + event_handler: &Arc, + runner_tx: &Sender, + threadpool: &ThreadPool, shard_id: u64, ) { match event { @@ -60,30 +62,46 @@ pub fn dispatch( context.clone(), event.message.clone(), event_handler, + threadpool, ); if let Some(ref mut framework) = *framework.lock() { - framework.dispatch(context, event.message); + framework.dispatch(context, event.message, threadpool); } }, - other => handle_event(other, data, event_handler, runner_tx, shard_id), + other => handle_event( + other, + data, + event_handler, + runner_tx, + threadpool, + shard_id, + ), } } #[cfg(not(feature = "framework"))] -pub fn dispatch( +pub fn dispatch( event: Event, - data: Arc>, - event_handler: Arc, - runner_tx: Sender, + data: &Arc>, + event_handler: &Arc, + runner_tx: &Sender, + threadpool: &ThreadPool, shard_id: u64, ) { match event { Event::MessageCreate(event) => { let context = context(data, runner_tx, shard_id); - dispatch_message(context, event.message, event_handler); + dispatch_message(context, event.message, event_handler, threadpool); }, - other => handle_event(other, data, event_handler, runner_tx, shard_id), + other => handle_event( + other, + data, + event_handler, + runner_tx, + threadpool, + shard_id, + ), } } @@ -91,22 +109,28 @@ pub fn dispatch( fn dispatch_message( context: Context, mut message: Message, - event_handler: Arc -) where H: EventHandler + 'static { + event_handler: &Arc, + threadpool: &ThreadPool, +) where H: EventHandler + Send + Sync + 'static { #[cfg(feature = "model")] { message.transform_content(); } - event_handler.message(context, message); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.message(context, message); + }); } #[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event( +fn handle_event( event: Event, - data: Arc>, - event_handler: Arc, - runner_tx: Sender, + data: &Arc>, + event_handler: &Arc, + runner_tx: &Sender, + threadpool: &ThreadPool, shard_id: u64, ) { #[cfg(feature = "cache")] @@ -134,14 +158,26 @@ fn handle_event( // So in short, only exists to reduce unnecessary clutter. match event.channel { Channel::Private(channel) => { - event_handler.private_channel_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.private_channel_create(context, channel); + }); }, Channel::Group(_) => {}, Channel::Guild(channel) => { - event_handler.channel_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_create(context, channel); + }); }, Channel::Category(channel) => { - event_handler.category_create(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.category_create(context, channel); + }); }, } }, @@ -153,53 +189,89 @@ fn handle_event( match event.channel { Channel::Private(_) | Channel::Group(_) => {}, Channel::Guild(channel) => { - event_handler.channel_delete(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_delete(context, channel); + }); }, Channel::Category(channel) => { - event_handler.category_delete(context, channel); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.category_delete(context, channel); + }); }, } }, Event::ChannelPinsUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.channel_pins_update(context, event); + threadpool.execute(move || { + event_handler.channel_pins_update(context, event); + }); }, Event::ChannelRecipientAdd(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); - event_handler.channel_recipient_addition(context, event.channel_id, event.user); + let event_handler = Arc::clone(&event_handler); + + threadpool.execute(move || { + event_handler.channel_recipient_addition( + context, + event.channel_id, + event.user, + ); + }); }, Event::ChannelRecipientRemove(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.channel_recipient_removal(context, event.channel_id, event.user); + threadpool.execute(move || { + event_handler.channel_recipient_removal( + context, + event.channel_id, + event.user, + ); + }); }, Event::ChannelUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - let before = CACHE.read().channel(event.channel.id()); - event_handler.channel_update(context, before, event.channel); - } else { - event_handler.channel_update(context, event.channel); - }} + threadpool.execute(move || { + feature_cache! {{ + let before = CACHE.read().channel(event.channel.id()); + + event_handler.channel_update(context, before, event.channel); + } else { + event_handler.channel_update(context, event.channel); + }} + }); }, Event::GuildBanAdd(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.guild_ban_addition(context, event.guild_id, event.user); + }); }, Event::GuildBanRemove(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_ban_removal(context, event.guild_id, event.user); + threadpool.execute(move || { + event_handler.guild_ban_removal(context, event.guild_id, event.user); + }); }, Event::GuildCreate(mut event) => { #[cfg(feature = "cache")] @@ -218,205 +290,277 @@ fn handle_event( let cache = CACHE.read(); if cache.unavailable_guilds.is_empty() { - let context = context(Arc::clone(&data), runner_tx.clone(), shard_id); + let context = context(data, runner_tx, shard_id); let guild_amount = cache .guilds .iter() .map(|(&id, _)| id) .collect::>(); + let event_handler = Arc::clone(&event_handler); - event_handler.cached(context, guild_amount); + threadpool.execute(move || { + event_handler.cached(context, guild_amount); + }); } } let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_create(context, event.guild, _is_new); - } else { - event_handler.guild_create(context, event.guild); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_create(context, event.guild, _is_new); + } else { + event_handler.guild_create(context, event.guild); + }} + }); }, Event::GuildDelete(mut event) => { let _full = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_delete(context, event.guild, _full); - } else { - event_handler.guild_delete(context, event.guild); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_delete(context, event.guild, _full); + } else { + event_handler.guild_delete(context, event.guild); + }} + }); }, Event::GuildEmojisUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_emojis_update(context, event.guild_id, event.emojis); + threadpool.execute(move || { + event_handler.guild_emojis_update(context, event.guild_id, event.emojis); + }); }, Event::GuildIntegrationsUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_integrations_update(context, event.guild_id); + threadpool.execute(move || { + event_handler.guild_integrations_update(context, event.guild_id); + }); }, Event::GuildMemberAdd(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_member_addition(context, event.guild_id, event.member); + threadpool.execute(move || { + event_handler.guild_member_addition(context, event.guild_id, event.member); + }); }, Event::GuildMemberRemove(mut event) => { let _member = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_member_removal(context, event.guild_id, event.user, _member); - } else { - event_handler.guild_member_removal(context, event.guild_id, event.user); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_member_removal(context, event.guild_id, event.user, _member); + } else { + event_handler.guild_member_removal(context, event.guild_id, event.user); + }} + }); }, Event::GuildMemberUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - // This is safe to unwrap, as the update would have created - // the member if it did not exist. So, there is be _no_ way - // that this could fail under any circumstance. - let after = CACHE.read() - .member(event.guild_id, event.user.id) - .unwrap() - .clone(); - - event_handler.guild_member_update(context, _before, after); - } else { - event_handler.guild_member_update(context, event); - }} + threadpool.execute(move || { + feature_cache! {{ + // This is safe to unwrap, as the update would have created + // the member if it did not exist. So, there is be _no_ way + // that this could fail under any circumstance. + let after = CACHE.read() + .member(event.guild_id, event.user.id) + .unwrap() + .clone(); + + event_handler.guild_member_update(context, _before, after); + } else { + event_handler.guild_member_update(context, event); + }} + }); }, Event::GuildMembersChunk(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_members_chunk(context, event.guild_id, event.members); + threadpool.execute(move || { + event_handler.guild_members_chunk(context, event.guild_id, event.members); + }); }, Event::GuildRoleCreate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_role_create(context, event.guild_id, event.role); + threadpool.execute(move || { + event_handler.guild_role_create(context, event.guild_id, event.role); + }); }, Event::GuildRoleDelete(mut event) => { let _role = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); - } else { - event_handler.guild_role_delete(context, event.guild_id, event.role_id); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role); + } else { + event_handler.guild_role_delete(context, event.guild_id, event.role_id); + }} + }); }, Event::GuildRoleUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.guild_role_update(context, event.guild_id, _before, event.role); - } else { - event_handler.guild_role_update(context, event.guild_id, event.role); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.guild_role_update(context, event.guild_id, _before, event.role); + } else { + event_handler.guild_role_update(context, event.guild_id, event.role); + }} + }); }, Event::GuildUnavailable(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.guild_unavailable(context, event.guild_id); + threadpool.execute(move || { + event_handler.guild_unavailable(context, event.guild_id); + }); }, Event::GuildUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - let before = CACHE.read() - .guilds - .get(&event.guild.id) - .cloned(); + threadpool.execute(move || { + feature_cache! {{ + let before = CACHE.read() + .guilds + .get(&event.guild.id) + .cloned(); - event_handler.guild_update(context, before, event.guild); - } else { - event_handler.guild_update(context, event.guild); - }} + event_handler.guild_update(context, before, event.guild); + } else { + event_handler.guild_update(context, event.guild); + }} + }); }, // Already handled by the framework check macro Event::MessageCreate(_) => {}, Event::MessageDeleteBulk(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_delete_bulk(context, event.channel_id, event.ids); + threadpool.execute(move || { + event_handler.message_delete_bulk(context, event.channel_id, event.ids); + }); }, Event::MessageDelete(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_delete(context, event.channel_id, event.message_id); + threadpool.execute(move || { + event_handler.message_delete(context, event.channel_id, event.message_id); + }); }, Event::MessageUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.message_update(context, event); + threadpool.execute(move || { + event_handler.message_update(context, event); + }); }, Event::PresencesReplace(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.presence_replace(context, event.presences); + threadpool.execute(move || { + event_handler.presence_replace(context, event.presences); + }); }, Event::PresenceUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.presence_update(context, event); + threadpool.execute(move || { + event_handler.presence_update(context, event); + }); }, Event::ReactionAdd(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_add(context, event.reaction); + threadpool.execute(move || { + event_handler.reaction_add(context, event.reaction); + }); }, Event::ReactionRemove(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_remove(context, event.reaction); + threadpool.execute(move || { + event_handler.reaction_remove(context, event.reaction); + }); }, Event::ReactionRemoveAll(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.reaction_remove_all(context, event.channel_id, event.message_id); + threadpool.execute(move || { + event_handler.reaction_remove_all(context, event.channel_id, event.message_id); + }); }, Event::Ready(mut event) => { update!(event); - feature_cache!{ - { - last_guild_create_time = now!(); + let event_handler = Arc::clone(&event_handler); + + feature_cache! {{ + last_guild_create_time = now!(); - let _ = wait_for_guilds() - .map(move |_| { - let context = context(data, runner_tx, shard_id); + let _ = wait_for_guilds() + .map(move |_| { + let context = context(data, &runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.ready(context, event.ready); }); - } else { - let context = context(data, runner_tx, shard_id); + }); + } else { + let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); + threadpool.execute(move || { event_handler.ready(context, event.ready); - } - } + }); + }} }, Event::Resumed(mut event) => { let context = context(data, runner_tx, shard_id); @@ -425,40 +569,58 @@ fn handle_event( }, Event::TypingStart(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.typing_start(context, event); + threadpool.execute(move || { + event_handler.typing_start(context, event); + }); }, Event::Unknown(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.unknown(context, event.kind, event.value); + threadpool.execute(move || { + event_handler.unknown(context, event.kind, event.value); + }); }, Event::UserUpdate(mut event) => { let _before = update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - feature_cache! {{ - event_handler.user_update(context, _before.unwrap(), event.current_user); - } else { - event_handler.user_update(context, event.current_user); - }} + threadpool.execute(move || { + feature_cache! {{ + event_handler.user_update(context, _before.unwrap(), event.current_user); + } else { + event_handler.user_update(context, event.current_user); + }} + }); }, Event::VoiceServerUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.voice_server_update(context, event); + threadpool.execute(move || { + event_handler.voice_server_update(context, event); + }); }, Event::VoiceStateUpdate(mut event) => { update!(event); let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.voice_state_update(context, event.guild_id, event.voice_state); + threadpool.execute(move || { + event_handler.voice_state_update(context, event.guild_id, event.voice_state); + }); }, Event::WebhookUpdate(mut event) => { let context = context(data, runner_tx, shard_id); + let event_handler = Arc::clone(&event_handler); - event_handler.webhook_update(context, event.guild_id, event.channel_id); + threadpool.execute(move || { + event_handler.webhook_update(context, event.guild_id, event.channel_id); + }); }, } } diff --git a/src/framework/mod.rs b/src/framework/mod.rs index 85ae6f4..c5ddb04 100644 --- a/src/framework/mod.rs +++ b/src/framework/mod.rs @@ -62,13 +62,14 @@ pub use self::standard::StandardFramework; use client::Context; use model::Message; +use threadpool::ThreadPool; #[cfg(feature = "standard_framework")] use model::UserId; /// This trait allows for serenity to either use its builtin framework, or yours. pub trait Framework { - fn dispatch(&mut self, Context, Message); + fn dispatch(&mut self, Context, Message, &ThreadPool); #[doc(hidden)] #[cfg(feature = "standard_framework")] @@ -76,8 +77,8 @@ pub trait Framework { } impl Framework for Box { - fn dispatch(&mut self, ctx: Context, msg: Message) { - (**self).dispatch(ctx, msg); + fn dispatch(&mut self, ctx: Context, msg: Message, threadpool: &ThreadPool) { + (**self).dispatch(ctx, msg, threadpool); } #[cfg(feature = "standard_framework")] @@ -87,8 +88,8 @@ impl Framework for Box { } impl<'a, F: Framework + ?Sized> Framework for &'a mut F { - fn dispatch(&mut self, ctx: Context, msg: Message) { - (**self).dispatch(ctx, msg); + fn dispatch(&mut self, ctx: Context, msg: Message, threadpool: &ThreadPool) { + (**self).dispatch(ctx, msg, threadpool); } #[cfg(feature = "standard_framework")] diff --git a/src/framework/standard/mod.rs b/src/framework/standard/mod.rs index 12ad797..dd7aaae 100644 --- a/src/framework/standard/mod.rs +++ b/src/framework/standard/mod.rs @@ -25,6 +25,7 @@ use std::collections::HashMap; use std::default::Default; use std::sync::Arc; use super::Framework; +use threadpool::ThreadPool; #[cfg(feature = "cache")] use client::CACHE; @@ -836,7 +837,12 @@ impl StandardFramework { } impl Framework for StandardFramework { - fn dispatch(&mut self, mut context: Context, message: Message) { + fn dispatch( + &mut self, + mut context: Context, + message: Message, + threadpool: &ThreadPool, + ) { let res = command::positions(&mut context, &message, &self.configuration); let positions = match res { @@ -926,27 +932,29 @@ impl Framework for StandardFramework { return; } - if let Some(before) = before { - if !(before)(&mut context, &message, &built) { - return; + threadpool.execute(move || { + if let Some(before) = before { + if !(before)(&mut context, &message, &built) { + return; + } } - } - let result = match command.exec { - CommandType::StringResponse(ref x) => { - let _ = message.channel_id.say(x); + let result = match command.exec { + CommandType::StringResponse(ref x) => { + let _ = message.channel_id.say(x); - Ok(()) - }, - CommandType::Basic(ref x) => (x)(&mut context, &message, args), - CommandType::WithCommands(ref x) => { - (x)(&mut context, &message, groups, args) - }, - }; + Ok(()) + }, + CommandType::Basic(ref x) => (x)(&mut context, &message, args), + CommandType::WithCommands(ref x) => { + (x)(&mut context, &message, groups, args) + }, + }; - if let Some(after) = after { - (after)(&mut context, &message, &built, result); - } + if let Some(after) = after { + (after)(&mut context, &message, &built, result); + } + }); return; } -- cgit v1.2.3