diff options
| author | Zeyla Hellyer <[email protected]> | 2017-09-18 18:10:12 -0700 |
|---|---|---|
| committer | Zeyla Hellyer <[email protected]> | 2017-09-18 18:10:12 -0700 |
| commit | 7c4b052d7b5a50f234721249bd0221f037e48ea9 (patch) | |
| tree | 03e7637731782f72ff1490747019491194167b54 /src/client/mod.rs | |
| parent | travis: don't test rustfmt (diff) | |
| download | serenity-7c4b052d7b5a50f234721249bd0221f037e48ea9.tar.xz serenity-7c4b052d7b5a50f234721249bd0221f037e48ea9.zip | |
Fix block on spawning multiple shards
When spawning multiple shards (via an equal number of futures - one
per shard) joined on a core.run use, the very first future executed
would block forever due to a sync, blocking `monitor_shard` use. While
this defeats the purpose of tokio, this was meant to be a first step to
an async serenity implementation.
To "fix" this blocking call until a deeper async implementation is made,
spawn a new thread per tokio core (and thus per shard). This causes the
same expected behaviour, just with multiple threads like before.
Diffstat (limited to 'src/client/mod.rs')
| -rw-r--r-- | src/client/mod.rs | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 6bd1579..5eec673 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -43,7 +43,7 @@ use tokio_core::reactor::Core; use futures; use std::collections::HashMap; use std::time::Duration; -use std::mem; +use std::{mem, thread}; use super::gateway::Shard; use typemap::ShareMap; use websocket::result::WebSocketError; @@ -107,7 +107,7 @@ impl CloseHandle { /// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate /// [sharding docs]: gateway/index.html#sharding #[derive(Clone)] -pub struct Client<H: EventHandler + 'static> { +pub struct Client<H: EventHandler + Send + Sync + 'static> { /// A ShareMap which requires types to be Send + Sync. This is a map that /// can be safely shared across contexts. /// @@ -195,7 +195,7 @@ pub struct Client<H: EventHandler + 'static> { /// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready /// [`on_ready`]: #method.on_ready event_handler: Arc<H>, - #[cfg(feature = "framework")] framework: Arc<sync::Mutex<Option<Box<Framework>>>>, + #[cfg(feature = "framework")] framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>, /// A HashMap of all shards instantiated by the Client. /// /// The key is the shard ID and the value is the shard itself. @@ -250,7 +250,7 @@ pub struct Client<H: EventHandler + 'static> { token: Arc<sync::Mutex<String>>, } -impl<H: EventHandler + 'static> Client<H> { +impl<H: EventHandler + Send + Sync + 'static> Client<H> { /// Creates a Client for a bot user. /// /// Discord has a requirement of prefixing bot tokens with `"Bot "`, which @@ -408,7 +408,7 @@ impl<H: EventHandler + 'static> Client<H> { /// [`on_message`]: #method.on_message /// [framework docs]: ../framework/index.html #[cfg(feature = "framework")] - pub fn with_framework<F: Framework + 'static>(&mut self, f: F) { + pub fn with_framework<F: Framework + Send + 'static>(&mut self, f: F) { self.framework = Arc::new(sync::Mutex::new(Some(Box::new(f)))); } @@ -740,8 +740,6 @@ impl<H: EventHandler + 'static> Client<H> { fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> { HANDLE_STILL.store(true, Ordering::Relaxed); - let mut core = Core::new().unwrap(); - // Update the framework's current user if the feature is enabled. // // This also acts as a form of check to ensure the token is correct. @@ -759,7 +757,7 @@ impl<H: EventHandler + 'static> Client<H> { let shards_index = shard_data[0]; let shards_total = shard_data[1] + 1; - let mut futures = vec![]; + let mut threads = vec![]; for shard_number in shards_index..shards_total { let shard_info = [shard_number, shard_data[2]]; @@ -799,28 +797,35 @@ impl<H: EventHandler + 'static> Client<H> { } }}; - futures.push(futures::future::lazy(move || { - monitor_shard(monitor_info); - futures::future::ok::<(), ()>(()) + threads.push(thread::spawn(move || { + let mut core = Core::new().unwrap(); + + core.run(futures::future::lazy(move || { + monitor_shard(monitor_info); + + futures::future::ok::<(), ()>(()) + })).unwrap(); })); }, Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why), } - // Wait 5 seconds between shard boots. + // We need to wait at least 5 seconds between IDENTIFYs. // - // We need to wait at least 5 seconds between READYs. - core.turn(Some(Duration::from_secs(5))); + // Add an extra second as a buffer. + thread::sleep(Duration::from_secs(6)); } - core.run(futures::future::join_all(futures)).unwrap(); + for thread in threads { + let _ = thread.join(); + } Err(Error::Client(ClientError::Shutdown)) } } -impl<H: EventHandler + 'static> Drop for Client<H> { +impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> { fn drop(&mut self) { self.close_handle().close(); } } @@ -831,10 +836,10 @@ struct BootInfo { } #[cfg(feature = "framework")] -struct MonitorInfo<H: EventHandler + 'static> { +struct MonitorInfo<H: EventHandler + Send + Sync + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, - framework: Arc<sync::Mutex<Option<Box<Framework>>>>, + framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>, gateway_url: Arc<sync::Mutex<String>>, shard: Arc<Mutex<Shard>>, shard_info: [u64; 2], @@ -842,7 +847,7 @@ struct MonitorInfo<H: EventHandler + 'static> { } #[cfg(not(feature = "framework"))] -struct MonitorInfo<H: EventHandler + 'static> { +struct MonitorInfo<H: EventHandler + Send + 'static> { data: Arc<Mutex<ShareMap>>, event_handler: Arc<H>, gateway_url: Arc<sync::Mutex<String>>, @@ -894,7 +899,7 @@ fn boot_shard(info: &BootInfo) -> Result<Shard> { Err(Error::Client(ClientError::ShardBootFailure)) } -fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>) { +fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) { handle_shard(&mut info); let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed); @@ -948,7 +953,7 @@ fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>) { } } -fn handle_shard<H: EventHandler + 'static>(info: &mut MonitorInfo<H>) { +fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) { let mut core = Core::new().unwrap(); let handle = core.handle(); |