From d7621aa4dfb2a3dea22e7848eb97e2b4cc1ade14 Mon Sep 17 00:00:00 2001 From: Zeyla Hellyer Date: Sat, 30 Sep 2017 21:19:29 -0700 Subject: Add a threadpool to the shard runner A threadpool will help with giving event dispatches a threaded behaviour while still allowing the library the ability to perform other actions, such as receiving new events and heartbeating over the websocket client. --- src/client/bridge/gateway/shard_runner.rs | 46 ++++++++++++++++++++++++------- src/client/dispatch.rs | 35 +++++++++++------------ src/lib.rs | 2 ++ 3 files changed, 56 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 78ac725..6f6b050 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -6,6 +6,7 @@ use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::Arc; use super::super::super::{EventHandler, dispatch}; use super::{LockedShard, ShardId, ShardManagerMessage}; +use threadpool::ThreadPool; use typemap::ShareMap; use websocket::WebSocketError; @@ -14,7 +15,7 @@ use framework::Framework; #[cfg(feature = "framework")] use std::sync::Mutex; -pub struct ShardRunner { +pub struct ShardRunner { data: Arc>, event_handler: Arc, #[cfg(feature = "framework")] @@ -24,9 +25,10 @@ pub struct ShardRunner { runner_tx: Sender, shard: LockedShard, shard_info: [u64; 2], + threadpool: ThreadPool, } -impl ShardRunner { +impl ShardRunner { #[cfg(feature = "framework")] pub fn new(shard: LockedShard, manager_tx: Sender, @@ -36,6 +38,8 @@ impl ShardRunner { let (tx, rx) = mpsc::channel(); let shard_info = shard.lock().shard_info(); + let name = format!("threadpool {:?}", shard_info); + Self { runner_rx: rx, runner_tx: tx, @@ -45,6 +49,7 @@ impl ShardRunner { manager_tx, shard, shard_info, + threadpool: ThreadPool::with_name(name, 15), } } @@ -56,6 +61,8 @@ impl ShardRunner { let (tx, rx) = mpsc::channel(); let shard_info = shard.lock().shard_info(); + let name = format!("threadpool {:?}", shard_info); + Self { runner_rx: rx, runner_tx: tx, @@ -64,6 +71,7 @@ impl ShardRunner { manager_tx, shard, shard_info, + threadpool: ThreadPool::with_name(name, 15), } } @@ -102,14 +110,32 @@ impl ShardRunner { let (event, successful) = self.recv_event(); if let Some(event) = event { - dispatch( - event, - &self.shard, - #[cfg(feature = "framework")] - &self.framework, - &self.data, - &self.event_handler, - ); + let data = self.data.clone(); + let event_handler = self.event_handler.clone(); + let shard = self.shard.clone(); + + feature_framework! {{ + let framework = self.framework.clone(); + + self.threadpool.execute(|| { + dispatch( + event, + shard, + framework, + data, + event_handler, + ); + }); + } else { + self.threadpool.execute(|| { + dispatch( + event, + shard, + data, + event_handler, + ); + }); + }} } if !successful && !self.shard.lock().stage().is_connecting() { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 6c2cfc4..709e34b 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -37,16 +37,16 @@ macro_rules! now { () => (Utc::now().time().second() * 1000) } -fn context(conn: &Arc>, data: &Arc>) -> Context { - Context::new(conn.clone(), data.clone()) +fn context(conn: Arc>, data: Arc>) -> Context { + Context::new(conn, data) } #[cfg(feature = "framework")] pub fn dispatch(event: Event, - conn: &Arc>, - framework: &Arc>>>, - data: &Arc>, - event_handler: &Arc) { + conn: Arc>, + framework: Arc>>>, + data: Arc>, + event_handler: Arc) { match event { Event::MessageCreate(event) => { let context = context(conn, data); @@ -66,9 +66,9 @@ pub fn dispatch(event: Event, #[cfg(not(feature = "framework"))] pub fn dispatch(event: Event, - conn: &Arc>, - data: &Arc>, - event_handler: &Arc) { + conn: Arc>, + data: Arc>, + event_handler: Arc) { match event { Event::MessageCreate(event) => { let context = context(conn, data); @@ -79,10 +79,11 @@ pub fn dispatch(event: Event, } #[allow(unused_mut)] -fn dispatch_message(context: Context, - mut message: Message, - event_handler: &Arc) { - +fn dispatch_message( + context: Context, + mut message: Message, + event_handler: Arc +) where H: EventHandler + 'static { #[cfg(feature = "model")] { message.transform_content(); @@ -93,9 +94,9 @@ fn dispatch_message(context: Context, #[allow(cyclomatic_complexity, unused_assignments, unused_mut)] fn handle_event(event: Event, - conn: &Arc>, - data: &Arc>, - event_handler: &Arc) { + conn: Arc>, + data: Arc>, + event_handler: Arc) { #[cfg(feature = "cache")] let mut last_guild_create_time = now!(); @@ -205,7 +206,7 @@ fn handle_event(event: Event, let cache = CACHE.read().unwrap(); if cache.unavailable_guilds.is_empty() { - let context = context(conn, data); + let context = context(conn.clone(), data.clone()); let guild_amount = cache .guilds diff --git a/src/lib.rs b/src/lib.rs index 9ee6949..612972a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,8 @@ extern crate opus; extern crate parking_lot; #[cfg(feature = "voice")] extern crate sodiumoxide; +#[cfg(feature = "threadpool")] +extern crate threadpool; #[cfg(feature = "client")] extern crate typemap; #[cfg(feature = "standard_framework")] -- cgit v1.2.3