From 7c4b052d7b5a50f234721249bd0221f037e48ea9 Mon Sep 17 00:00:00 2001 From: Zeyla Hellyer Date: Mon, 18 Sep 2017 18:10:12 -0700 Subject: Fix block on spawning multiple shards When spawning multiple shards (via an equal number of futures - one per shard) joined on a core.run use, the very first future executed would block forever due to a sync, blocking `monitor_shard` use. While this defeats the purpose of tokio, this was meant to be a first step to an async serenity implementation. To "fix" this blocking call until a deeper async implementation is made, spawn a new thread per tokio core (and thus per shard). This causes the same expected behaviour, just with multiple threads like before. --- src/client/context.rs | 1 + src/client/dispatch.rs | 2 +- src/client/mod.rs | 47 ++++++++++++++++++++++++++--------------------- 3 files changed, 28 insertions(+), 22 deletions(-) (limited to 'src/client') diff --git a/src/client/context.rs b/src/client/context.rs index 9ed3652..614f79a 100644 --- a/src/client/context.rs +++ b/src/client/context.rs @@ -400,6 +400,7 @@ impl Context { /// [`Client::start`]: ./struct.Client.html#method.start pub fn quit(&self) -> Result<()> { let mut shard = self.shard.lock(); + shard.shutdown_clean() } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 494ab6d..490f4b6 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -45,7 +45,7 @@ fn context(conn: &Arc>, data: &Arc>, handle: &Handl #[cfg(feature = "framework")] pub fn dispatch(event: Event, conn: &Arc>, - framework: &Arc>>>, + framework: &Arc>>>, data: &Arc>, event_handler: &Arc, tokio_handle: &Handle) { diff --git a/src/client/mod.rs b/src/client/mod.rs index 6bd1579..5eec673 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -43,7 +43,7 @@ use tokio_core::reactor::Core; use futures; use std::collections::HashMap; use std::time::Duration; -use std::mem; +use std::{mem, thread}; use super::gateway::Shard; use typemap::ShareMap; use websocket::result::WebSocketError; @@ -107,7 +107,7 @@ impl CloseHandle { /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate /// [sharding docs]: gateway/index.html#sharding #[derive(Clone)] -pub struct Client { +pub struct Client { /// A ShareMap which requires types to be Send + Sync. This is a map that /// can be safely shared across contexts. /// @@ -195,7 +195,7 @@ pub struct Client { /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`on_ready`]: #method.on_ready event_handler: Arc, - #[cfg(feature = "framework")] framework: Arc>>>, + #[cfg(feature = "framework")] framework: Arc>>>, /// A HashMap of all shards instantiated by the Client. /// /// The key is the shard ID and the value is the shard itself. @@ -250,7 +250,7 @@ pub struct Client { token: Arc>, } -impl Client { +impl Client { /// Creates a Client for a bot user. /// /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which @@ -408,7 +408,7 @@ impl Client { /// [`on_message`]: #method.on_message /// [framework docs]: ../framework/index.html #[cfg(feature = "framework")] - pub fn with_framework(&mut self, f: F) { + pub fn with_framework(&mut self, f: F) { self.framework = Arc::new(sync::Mutex::new(Some(Box::new(f)))); } @@ -740,8 +740,6 @@ impl Client { fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> { HANDLE_STILL.store(true, Ordering::Relaxed); - let mut core = Core::new().unwrap(); - // Update the framework's current user if the feature is enabled. // // This also acts as a form of check to ensure the token is correct. @@ -759,7 +757,7 @@ impl Client { let shards_index = shard_data[0]; let shards_total = shard_data[1] + 1; - let mut futures = vec![]; + let mut threads = vec![]; for shard_number in shards_index..shards_total { let shard_info = [shard_number, shard_data[2]]; @@ -799,28 +797,35 @@ impl Client { } }}; - futures.push(futures::future::lazy(move || { - monitor_shard(monitor_info); - futures::future::ok::<(), ()>(()) + threads.push(thread::spawn(move || { + let mut core = Core::new().unwrap(); + + core.run(futures::future::lazy(move || { + monitor_shard(monitor_info); + + futures::future::ok::<(), ()>(()) + })).unwrap(); })); }, Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), } - // Wait 5 seconds between shard boots. + // We need to wait at least 5 seconds between IDENTIFYs. // - // We need to wait at least 5 seconds between READYs. - core.turn(Some(Duration::from_secs(5))); + // Add an extra second as a buffer. + thread::sleep(Duration::from_secs(6)); } - core.run(futures::future::join_all(futures)).unwrap(); + for thread in threads { + let _ = thread.join(); + } Err(Error::Client(ClientError::Shutdown)) } } -impl Drop for Client { +impl Drop for Client { fn drop(&mut self) { self.close_handle().close(); } } @@ -831,10 +836,10 @@ struct BootInfo { } #[cfg(feature = "framework")] -struct MonitorInfo { +struct MonitorInfo { data: Arc>, event_handler: Arc, - framework: Arc>>>, + framework: Arc>>>, gateway_url: Arc>, shard: Arc>, shard_info: [u64; 2], @@ -842,7 +847,7 @@ struct MonitorInfo { } #[cfg(not(feature = "framework"))] -struct MonitorInfo { +struct MonitorInfo { data: Arc>, event_handler: Arc, gateway_url: Arc>, @@ -894,7 +899,7 @@ fn boot_shard(info: &BootInfo) -> Result { Err(Error::Client(ClientError::ShardBootFailure)) } -fn monitor_shard(mut info: MonitorInfo) { +fn monitor_shard(mut info: MonitorInfo) { handle_shard(&mut info); let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed); @@ -948,7 +953,7 @@ fn monitor_shard(mut info: MonitorInfo) { } } -fn handle_shard(info: &mut MonitorInfo) { +fn handle_shard(info: &mut MonitorInfo) { let mut core = Core::new().unwrap(); let handle = core.handle(); -- cgit v1.2.3