aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-11-13 13:28:08 -0800
committerZeyla Hellyer <[email protected]>2017-11-13 13:29:03 -0800
commitf2c21ef5b15ef1f345cdc30f4b793e55905f15f4 (patch)
tree33a0d5e68976a31d750cefaf78df8b2577b63578 /src/client
parentFix strange behaviour when the prefix has spaces (#215) (diff)
downloadserenity-f2c21ef5b15ef1f345cdc30f4b793e55905f15f4.tar.xz
serenity-f2c21ef5b15ef1f345cdc30f4b793e55905f15f4.zip
Use the threadpool for framework command execution
Instead of executing framework commands in the shard runner thread (potentially blocking the shard runner from reading new messages over the websocket and heartbeating), dispatch framework commands to the shard runner's threadpool.
Diffstat (limited to 'src/client')
-rw-r--r--src/client/bridge/gateway/shard_runner.rs40
-rw-r--r--src/client/dispatch.rs404
2 files changed, 294 insertions, 150 deletions
diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs
index a5fde3d..005f321 100644
--- a/src/client/bridge/gateway/shard_runner.rs
+++ b/src/client/bridge/gateway/shard_runner.rs
@@ -203,36 +203,18 @@ impl<H: EventHandler + Send + Sync + 'static> ShardRunner<H> {
false
}
+ #[inline]
fn dispatch(&self, event: Event) {
- let data = Arc::clone(&self.data);
- let event_handler = Arc::clone(&self.event_handler);
- let runner_tx = self.runner_tx.clone();
- let shard_id = self.shard.shard_info()[0];
-
- feature_framework! {{
- let framework = Arc::clone(&self.framework);
-
- self.threadpool.execute(move || {
- dispatch(
- event,
- framework,
- data,
- event_handler,
- runner_tx,
- shard_id,
- );
- });
- } else {
- self.threadpool.execute(move || {
- dispatch(
- event,
- data,
- event_handler,
- runner_tx,
- shard_id,
- );
- });
- }}
+ dispatch(
+ event,
+ #[cfg(feature = "framework")]
+ &self.framework,
+ &self.data,
+ &self.event_handler,
+ &self.runner_tx,
+ &self.threadpool,
+ self.shard.shard_info()[0],
+ );
}
// Handles a received value over the shard runner rx channel.
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 38ab570..f79203c 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -6,6 +6,7 @@ use super::bridge::gateway::ShardClientMessage;
use super::event_handler::EventHandler;
use super::Context;
use std::sync::mpsc::Sender;
+use threadpool::ThreadPool;
use typemap::ShareMap;
#[cfg(feature = "cache")]
@@ -37,20 +38,21 @@ macro_rules! now {
}
fn context(
- data: Arc<Mutex<ShareMap>>,
- runner_tx: Sender<ShardClientMessage>,
+ data: &Arc<Mutex<ShareMap>>,
+ runner_tx: &Sender<ShardClientMessage>,
shard_id: u64,
) -> Context {
- Context::new(data, runner_tx, shard_id)
+ Context::new(Arc::clone(&data), runner_tx.clone(), shard_id)
}
#[cfg(feature = "framework")]
-pub fn dispatch<H: EventHandler + 'static>(
+pub fn dispatch<H: EventHandler + Send + Sync + 'static>(
event: Event,
- framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- runner_tx: Sender<ShardClientMessage>,
+ framework: &Arc<Mutex<Option<Box<Framework + Send>>>>,
+ data: &Arc<Mutex<ShareMap>>,
+ event_handler: &Arc<H>,
+ runner_tx: &Sender<ShardClientMessage>,
+ threadpool: &ThreadPool,
shard_id: u64,
) {
match event {
@@ -60,30 +62,46 @@ pub fn dispatch<H: EventHandler + 'static>(
context.clone(),
event.message.clone(),
event_handler,
+ threadpool,
);
if let Some(ref mut framework) = *framework.lock() {
- framework.dispatch(context, event.message);
+ framework.dispatch(context, event.message, threadpool);
}
},
- other => handle_event(other, data, event_handler, runner_tx, shard_id),
+ other => handle_event(
+ other,
+ data,
+ event_handler,
+ runner_tx,
+ threadpool,
+ shard_id,
+ ),
}
}
#[cfg(not(feature = "framework"))]
-pub fn dispatch<H: EventHandler + 'static>(
+pub fn dispatch<H: EventHandler + Send + Sync + 'static>(
event: Event,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- runner_tx: Sender<ShardClientMessage>,
+ data: &Arc<Mutex<ShareMap>>,
+ event_handler: &Arc<H>,
+ runner_tx: &Sender<ShardClientMessage>,
+ threadpool: &ThreadPool,
shard_id: u64,
) {
match event {
Event::MessageCreate(event) => {
let context = context(data, runner_tx, shard_id);
- dispatch_message(context, event.message, event_handler);
+ dispatch_message(context, event.message, event_handler, threadpool);
},
- other => handle_event(other, data, event_handler, runner_tx, shard_id),
+ other => handle_event(
+ other,
+ data,
+ event_handler,
+ runner_tx,
+ threadpool,
+ shard_id,
+ ),
}
}
@@ -91,22 +109,28 @@ pub fn dispatch<H: EventHandler + 'static>(
fn dispatch_message<H>(
context: Context,
mut message: Message,
- event_handler: Arc<H>
-) where H: EventHandler + 'static {
+ event_handler: &Arc<H>,
+ threadpool: &ThreadPool,
+) where H: EventHandler + Send + Sync + 'static {
#[cfg(feature = "model")]
{
message.transform_content();
}
- event_handler.message(context, message);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.message(context, message);
+ });
}
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)]
-fn handle_event<H: EventHandler + 'static>(
+fn handle_event<H: EventHandler + Send + Sync + 'static>(
event: Event,
- data: Arc<Mutex<ShareMap>>,
- event_handler: Arc<H>,
- runner_tx: Sender<ShardClientMessage>,
+ data: &Arc<Mutex<ShareMap>>,
+ event_handler: &Arc<H>,
+ runner_tx: &Sender<ShardClientMessage>,
+ threadpool: &ThreadPool,
shard_id: u64,
) {
#[cfg(feature = "cache")]
@@ -134,14 +158,26 @@ fn handle_event<H: EventHandler + 'static>(
// So in short, only exists to reduce unnecessary clutter.
match event.channel {
Channel::Private(channel) => {
- event_handler.private_channel_create(context, channel);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.private_channel_create(context, channel);
+ });
},
Channel::Group(_) => {},
Channel::Guild(channel) => {
- event_handler.channel_create(context, channel);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.channel_create(context, channel);
+ });
},
Channel::Category(channel) => {
- event_handler.category_create(context, channel);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.category_create(context, channel);
+ });
},
}
},
@@ -153,53 +189,89 @@ fn handle_event<H: EventHandler + 'static>(
match event.channel {
Channel::Private(_) | Channel::Group(_) => {},
Channel::Guild(channel) => {
- event_handler.channel_delete(context, channel);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.channel_delete(context, channel);
+ });
},
Channel::Category(channel) => {
- event_handler.category_delete(context, channel);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.category_delete(context, channel);
+ });
},
}
},
Event::ChannelPinsUpdate(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.channel_pins_update(context, event);
+ threadpool.execute(move || {
+ event_handler.channel_pins_update(context, event);
+ });
},
Event::ChannelRecipientAdd(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
- event_handler.channel_recipient_addition(context, event.channel_id, event.user);
+ let event_handler = Arc::clone(&event_handler);
+
+ threadpool.execute(move || {
+ event_handler.channel_recipient_addition(
+ context,
+ event.channel_id,
+ event.user,
+ );
+ });
},
Event::ChannelRecipientRemove(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.channel_recipient_removal(context, event.channel_id, event.user);
+ threadpool.execute(move || {
+ event_handler.channel_recipient_removal(
+ context,
+ event.channel_id,
+ event.user,
+ );
+ });
},
Event::ChannelUpdate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- let before = CACHE.read().channel(event.channel.id());
- event_handler.channel_update(context, before, event.channel);
- } else {
- event_handler.channel_update(context, event.channel);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ let before = CACHE.read().channel(event.channel.id());
+
+ event_handler.channel_update(context, before, event.channel);
+ } else {
+ event_handler.channel_update(context, event.channel);
+ }}
+ });
},
Event::GuildBanAdd(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
+ threadpool.execute(move || {
event_handler.guild_ban_addition(context, event.guild_id, event.user);
+ });
},
Event::GuildBanRemove(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_ban_removal(context, event.guild_id, event.user);
+ threadpool.execute(move || {
+ event_handler.guild_ban_removal(context, event.guild_id, event.user);
+ });
},
Event::GuildCreate(mut event) => {
#[cfg(feature = "cache")]
@@ -218,205 +290,277 @@ fn handle_event<H: EventHandler + 'static>(
let cache = CACHE.read();
if cache.unavailable_guilds.is_empty() {
- let context = context(Arc::clone(&data), runner_tx.clone(), shard_id);
+ let context = context(data, runner_tx, shard_id);
let guild_amount = cache
.guilds
.iter()
.map(|(&id, _)| id)
.collect::<Vec<GuildId>>();
+ let event_handler = Arc::clone(&event_handler);
- event_handler.cached(context, guild_amount);
+ threadpool.execute(move || {
+ event_handler.cached(context, guild_amount);
+ });
}
}
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.guild_create(context, event.guild, _is_new);
- } else {
- event_handler.guild_create(context, event.guild);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.guild_create(context, event.guild, _is_new);
+ } else {
+ event_handler.guild_create(context, event.guild);
+ }}
+ });
},
Event::GuildDelete(mut event) => {
let _full = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.guild_delete(context, event.guild, _full);
- } else {
- event_handler.guild_delete(context, event.guild);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.guild_delete(context, event.guild, _full);
+ } else {
+ event_handler.guild_delete(context, event.guild);
+ }}
+ });
},
Event::GuildEmojisUpdate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
+ threadpool.execute(move || {
+ event_handler.guild_emojis_update(context, event.guild_id, event.emojis);
+ });
},
Event::GuildIntegrationsUpdate(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_integrations_update(context, event.guild_id);
+ threadpool.execute(move || {
+ event_handler.guild_integrations_update(context, event.guild_id);
+ });
},
Event::GuildMemberAdd(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_member_addition(context, event.guild_id, event.member);
+ threadpool.execute(move || {
+ event_handler.guild_member_addition(context, event.guild_id, event.member);
+ });
},
Event::GuildMemberRemove(mut event) => {
let _member = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
- } else {
- event_handler.guild_member_removal(context, event.guild_id, event.user);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.guild_member_removal(context, event.guild_id, event.user, _member);
+ } else {
+ event_handler.guild_member_removal(context, event.guild_id, event.user);
+ }}
+ });
},
Event::GuildMemberUpdate(mut event) => {
let _before = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- // This is safe to unwrap, as the update would have created
- // the member if it did not exist. So, there is be _no_ way
- // that this could fail under any circumstance.
- let after = CACHE.read()
- .member(event.guild_id, event.user.id)
- .unwrap()
- .clone();
-
- event_handler.guild_member_update(context, _before, after);
- } else {
- event_handler.guild_member_update(context, event);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ // This is safe to unwrap, as the update would have created
+ // the member if it did not exist. So, there is be _no_ way
+ // that this could fail under any circumstance.
+ let after = CACHE.read()
+ .member(event.guild_id, event.user.id)
+ .unwrap()
+ .clone();
+
+ event_handler.guild_member_update(context, _before, after);
+ } else {
+ event_handler.guild_member_update(context, event);
+ }}
+ });
},
Event::GuildMembersChunk(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_members_chunk(context, event.guild_id, event.members);
+ threadpool.execute(move || {
+ event_handler.guild_members_chunk(context, event.guild_id, event.members);
+ });
},
Event::GuildRoleCreate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_role_create(context, event.guild_id, event.role);
+ threadpool.execute(move || {
+ event_handler.guild_role_create(context, event.guild_id, event.role);
+ });
},
Event::GuildRoleDelete(mut event) => {
let _role = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
- } else {
- event_handler.guild_role_delete(context, event.guild_id, event.role_id);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.guild_role_delete(context, event.guild_id, event.role_id, _role);
+ } else {
+ event_handler.guild_role_delete(context, event.guild_id, event.role_id);
+ }}
+ });
},
Event::GuildRoleUpdate(mut event) => {
let _before = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.guild_role_update(context, event.guild_id, _before, event.role);
- } else {
- event_handler.guild_role_update(context, event.guild_id, event.role);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.guild_role_update(context, event.guild_id, _before, event.role);
+ } else {
+ event_handler.guild_role_update(context, event.guild_id, event.role);
+ }}
+ });
},
Event::GuildUnavailable(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.guild_unavailable(context, event.guild_id);
+ threadpool.execute(move || {
+ event_handler.guild_unavailable(context, event.guild_id);
+ });
},
Event::GuildUpdate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- let before = CACHE.read()
- .guilds
- .get(&event.guild.id)
- .cloned();
+ threadpool.execute(move || {
+ feature_cache! {{
+ let before = CACHE.read()
+ .guilds
+ .get(&event.guild.id)
+ .cloned();
- event_handler.guild_update(context, before, event.guild);
- } else {
- event_handler.guild_update(context, event.guild);
- }}
+ event_handler.guild_update(context, before, event.guild);
+ } else {
+ event_handler.guild_update(context, event.guild);
+ }}
+ });
},
// Already handled by the framework check macro
Event::MessageCreate(_) => {},
Event::MessageDeleteBulk(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.message_delete_bulk(context, event.channel_id, event.ids);
+ threadpool.execute(move || {
+ event_handler.message_delete_bulk(context, event.channel_id, event.ids);
+ });
},
Event::MessageDelete(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.message_delete(context, event.channel_id, event.message_id);
+ threadpool.execute(move || {
+ event_handler.message_delete(context, event.channel_id, event.message_id);
+ });
},
Event::MessageUpdate(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.message_update(context, event);
+ threadpool.execute(move || {
+ event_handler.message_update(context, event);
+ });
},
Event::PresencesReplace(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.presence_replace(context, event.presences);
+ threadpool.execute(move || {
+ event_handler.presence_replace(context, event.presences);
+ });
},
Event::PresenceUpdate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.presence_update(context, event);
+ threadpool.execute(move || {
+ event_handler.presence_update(context, event);
+ });
},
Event::ReactionAdd(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.reaction_add(context, event.reaction);
+ threadpool.execute(move || {
+ event_handler.reaction_add(context, event.reaction);
+ });
},
Event::ReactionRemove(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.reaction_remove(context, event.reaction);
+ threadpool.execute(move || {
+ event_handler.reaction_remove(context, event.reaction);
+ });
},
Event::ReactionRemoveAll(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
+ threadpool.execute(move || {
+ event_handler.reaction_remove_all(context, event.channel_id, event.message_id);
+ });
},
Event::Ready(mut event) => {
update!(event);
- feature_cache!{
- {
- last_guild_create_time = now!();
+ let event_handler = Arc::clone(&event_handler);
+
+ feature_cache! {{
+ last_guild_create_time = now!();
- let _ = wait_for_guilds()
- .map(move |_| {
- let context = context(data, runner_tx, shard_id);
+ let _ = wait_for_guilds()
+ .map(move |_| {
+ let context = context(data, &runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
+ threadpool.execute(move || {
event_handler.ready(context, event.ready);
});
- } else {
- let context = context(data, runner_tx, shard_id);
+ });
+ } else {
+ let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
+ threadpool.execute(move || {
event_handler.ready(context, event.ready);
- }
- }
+ });
+ }}
},
Event::Resumed(mut event) => {
let context = context(data, runner_tx, shard_id);
@@ -425,40 +569,58 @@ fn handle_event<H: EventHandler + 'static>(
},
Event::TypingStart(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.typing_start(context, event);
+ threadpool.execute(move || {
+ event_handler.typing_start(context, event);
+ });
},
Event::Unknown(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.unknown(context, event.kind, event.value);
+ threadpool.execute(move || {
+ event_handler.unknown(context, event.kind, event.value);
+ });
},
Event::UserUpdate(mut event) => {
let _before = update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- feature_cache! {{
- event_handler.user_update(context, _before.unwrap(), event.current_user);
- } else {
- event_handler.user_update(context, event.current_user);
- }}
+ threadpool.execute(move || {
+ feature_cache! {{
+ event_handler.user_update(context, _before.unwrap(), event.current_user);
+ } else {
+ event_handler.user_update(context, event.current_user);
+ }}
+ });
},
Event::VoiceServerUpdate(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.voice_server_update(context, event);
+ threadpool.execute(move || {
+ event_handler.voice_server_update(context, event);
+ });
},
Event::VoiceStateUpdate(mut event) => {
update!(event);
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.voice_state_update(context, event.guild_id, event.voice_state);
+ threadpool.execute(move || {
+ event_handler.voice_state_update(context, event.guild_id, event.voice_state);
+ });
},
Event::WebhookUpdate(mut event) => {
let context = context(data, runner_tx, shard_id);
+ let event_handler = Arc::clone(&event_handler);
- event_handler.webhook_update(context, event.guild_id, event.channel_id);
+ threadpool.execute(move || {
+ event_handler.webhook_update(context, event.guild_id, event.channel_id);
+ });
},
}
}