diff options
| author | acdenisSK <[email protected]> | 2017-07-15 01:49:59 +0200 |
|---|---|---|
| committer | acdenisSK <[email protected]> | 2017-07-15 01:49:59 +0200 |
| commit | 4ce2ddfb8dd04eefd769a1d17b590854f66af17c (patch) | |
| tree | e0f997ad8c025f3e304e8434e667a909d904dfc1 /src | |
| parent | Add a way to close all shards explictly (diff) | |
| download | serenity-4ce2ddfb8dd04eefd769a1d17b590854f66af17c.tar.xz serenity-4ce2ddfb8dd04eefd769a1d17b590854f66af17c.zip | |
Remove more threads with futures
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/dispatch.rs | 12 | ||||
| -rw-r--r-- | src/client/mod.rs | 45 |
2 files changed, 28 insertions, 29 deletions
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index f2850ec..68190d1 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -93,7 +93,7 @@ macro_rules! impl_reaction_events { } #[cfg(feature="framework")] -pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, +pub fn dispatch<H: EventHandler + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, framework: &Arc<sync::Mutex<Framework>>, data: &Arc<Mutex<ShareMap>>, @@ -126,7 +126,7 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, } #[cfg(not(feature="framework"))] -pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, +pub fn dispatch<H: EventHandler + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, data: &Arc<Mutex<ShareMap>>, event_handler: &Arc<H>, @@ -152,7 +152,7 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event, } #[allow(unused_mut)] -fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context, +fn dispatch_message<H: EventHandler + 'static>(context: Context, mut message: Message, event_handler: &Arc<H>, tokio_handle: &Handle) { @@ -169,7 +169,7 @@ fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context, }); } -fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Context, +fn dispatch_reaction_add<H: EventHandler + 'static>(context: Context, reaction: Reaction, event_handler: &Arc<H>, tokio_handle: &Handle) { @@ -180,7 +180,7 @@ fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Conte }); } -fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Context, +fn dispatch_reaction_remove<H: EventHandler + 'static>(context: Context, reaction: Reaction, event_handler: &Arc<H>, tokio_handle: &Handle) { @@ -192,7 +192,7 @@ fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Co } #[allow(cyclomatic_complexity, unused_assignments, unused_mut)] -fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event, +fn handle_event<H: EventHandler + 'static>(event: Event, conn: &Arc<Mutex<Shard>>, data: &Arc<Mutex<ShareMap>>, event_handler: &Arc<H>, diff --git a/src/client/mod.rs b/src/client/mod.rs index e59c87a..a0e6ffb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -39,9 +39,10 @@ use self::dispatch::dispatch; use std::sync::{self, Arc}; use std::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering}; use parking_lot::Mutex; -use tokio_core::reactor::Core; +use tokio_core::reactor::{Core, Handle}; +use futures; use std::time::Duration; -use std::{mem, thread}; +use std::mem; use super::gateway::Shard; use typemap::ShareMap; use websocket::result::WebSocketError; @@ -97,7 +98,7 @@ static HANDLE_STILL: AtomicBool = ATOMIC_BOOL_INIT; /// [`on_message`]: #method.on_message /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate /// [sharding docs]: gateway/index.html#sharding -pub struct Client<H: EventHandler + Send + Sync + 'static> { +pub struct Client<H: EventHandler + 'static> { /// A ShareMap which requires types to be Send + Sync. This is a map that /// can be safely shared across contexts. /// @@ -186,7 +187,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> { token: Arc<sync::Mutex<String>>, } -impl<H: EventHandler + Send + Sync + 'static> Client<H> { +impl<H: EventHandler + 'static> Client<H> { /// Creates a Client for a bot user. /// /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which @@ -614,6 +615,9 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> { HANDLE_STILL.store(true, Ordering::Relaxed); + + let mut core = Core::new().unwrap(); + // Update the framework's current user if the feature is enabled. // // This also acts as a form of check to ensure the token is correct. @@ -631,7 +635,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { let shards_index = shard_data[0]; let shards_total = shard_data[1] + 1; - let mut threads = vec![]; + let mut futures = vec![]; for shard_number in shards_index..shards_total { let shard_info = [shard_number, shard_data[2]]; @@ -667,8 +671,10 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { } }}; - threads.push(thread::spawn(move || { - monitor_shard(monitor_info); + let handle = core.handle(); + futures.push(futures::future::lazy(move || { + monitor_shard(monitor_info, handle); + futures::future::ok::<(), ()>(()) })); }, Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), @@ -677,19 +683,17 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> { // Wait 5 seconds between shard boots. // // We need to wait at least 5 seconds between READYs. - thread::sleep(Duration::from_secs(5)); + core.turn(Some(Duration::from_secs(5))); } - for thread in threads { - let _ = thread.join(); - } + core.run(futures::future::join_all(futures)).unwrap(); Err(Error::Client(ClientError::Shutdown)) } } -impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> { +impl<H: EventHandler + 'static> Drop for Client<H> { fn drop(&mut self) { self.close(); } @@ -702,7 +706,7 @@ struct BootInfo { } #[cfg(feature="framework")] -struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { +struct MonitorInfo<H: EventHandler + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, framework: Arc<sync::Mutex<Framework>>, @@ -713,7 +717,7 @@ struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { } #[cfg(not(feature="framework"))] -struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { +struct MonitorInfo<H: EventHandler + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, gateway_url: Arc<sync::Mutex<String>>, @@ -762,8 +766,8 @@ fn boot_shard(info: &BootInfo) -> Result<Shard> { Err(Error::Client(ClientError::ShardBootFailure)) } -fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) { - handle_shard(&mut info); +fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>, handle: Handle) { + handle_shard(&mut info, &handle); loop { let mut boot_successful = false; @@ -788,7 +792,7 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo< } if boot_successful { - handle_shard(&mut info); + handle_shard(&mut info, &handle); } else { break; } @@ -799,11 +803,8 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo< error!("Completely failed to reboot shard"); } -fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) { +fn handle_shard<H: EventHandler + 'static>(info: &mut MonitorInfo<H>, handle: &Handle) { // This is currently all ducktape. Redo this. - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let handle_still = HANDLE_STILL.load(Ordering::Relaxed); while handle_still { { @@ -885,8 +886,6 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo< &info.event_handler, &handle); }} - - core.turn(None); } } |