aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorZeyla Hellyer <[email protected]>2017-09-18 18:10:12 -0700
committerZeyla Hellyer <[email protected]>2017-09-18 18:10:12 -0700
commit7c4b052d7b5a50f234721249bd0221f037e48ea9 (patch)
tree03e7637731782f72ff1490747019491194167b54 /src/client
parenttravis: don't test rustfmt (diff)
downloadserenity-7c4b052d7b5a50f234721249bd0221f037e48ea9.tar.xz
serenity-7c4b052d7b5a50f234721249bd0221f037e48ea9.zip
Fix block on spawning multiple shards
When spawning multiple shards (via an equal number of futures - one per shard) joined on a core.run use, the very first future executed would block forever due to a sync, blocking `monitor_shard` use. While this defeats the purpose of tokio, this was meant to be a first step to an async serenity implementation. To "fix" this blocking call until a deeper async implementation is made, spawn a new thread per tokio core (and thus per shard). This causes the same expected behaviour, just with multiple threads like before.
Diffstat (limited to 'src/client')
-rw-r--r--src/client/context.rs1
-rw-r--r--src/client/dispatch.rs2
-rw-r--r--src/client/mod.rs47
3 files changed, 28 insertions, 22 deletions
diff --git a/src/client/context.rs b/src/client/context.rs
index 9ed3652..614f79a 100644
--- a/src/client/context.rs
+++ b/src/client/context.rs
@@ -400,6 +400,7 @@ impl Context {
/// [`Client::start`]: ./struct.Client.html#method.start
pub fn quit(&self) -> Result<()> {
let mut shard = self.shard.lock();
+
shard.shutdown_clean()
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 494ab6d..490f4b6 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -45,7 +45,7 @@ fn context(conn: &Arc<Mutex<Shard>>, data: &Arc<Mutex<ShareMap>>, handle: &Handl
#[cfg(feature = "framework")]
pub fn dispatch<H: EventHandler + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
- framework: &Arc<sync::Mutex<Option<Box<Framework>>>>,
+ framework: &Arc<sync::Mutex<Option<Box<Framework + Send>>>>,
data: &Arc<Mutex<ShareMap>>,
event_handler: &Arc<H>,
tokio_handle: &Handle) {
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 6bd1579..5eec673 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -43,7 +43,7 @@ use tokio_core::reactor::Core;
use futures;
use std::collections::HashMap;
use std::time::Duration;
-use std::mem;
+use std::{mem, thread};
use super::gateway::Shard;
use typemap::ShareMap;
use websocket::result::WebSocketError;
@@ -107,7 +107,7 @@ impl CloseHandle {
/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
/// [sharding docs]: gateway/index.html#sharding
#[derive(Clone)]
-pub struct Client<H: EventHandler + 'static> {
+pub struct Client<H: EventHandler + Send + Sync + 'static> {
/// A ShareMap which requires types to be Send + Sync. This is a map that
/// can be safely shared across contexts.
///
@@ -195,7 +195,7 @@ pub struct Client<H: EventHandler + 'static> {
/// [`Event::Ready`]: ../model/event/enum.Event.html#variant.Ready
/// [`on_ready`]: #method.on_ready
event_handler: Arc<H>,
- #[cfg(feature = "framework")] framework: Arc<sync::Mutex<Option<Box<Framework>>>>,
+ #[cfg(feature = "framework")] framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>,
/// A HashMap of all shards instantiated by the Client.
///
/// The key is the shard ID and the value is the shard itself.
@@ -250,7 +250,7 @@ pub struct Client<H: EventHandler + 'static> {
token: Arc<sync::Mutex<String>>,
}
-impl<H: EventHandler + 'static> Client<H> {
+impl<H: EventHandler + Send + Sync + 'static> Client<H> {
/// Creates a Client for a bot user.
///
/// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -408,7 +408,7 @@ impl<H: EventHandler + 'static> Client<H> {
/// [`on_message`]: #method.on_message
/// [framework docs]: ../framework/index.html
#[cfg(feature = "framework")]
- pub fn with_framework<F: Framework + 'static>(&mut self, f: F) {
+ pub fn with_framework<F: Framework + Send + 'static>(&mut self, f: F) {
self.framework = Arc::new(sync::Mutex::new(Some(Box::new(f))));
}
@@ -740,8 +740,6 @@ impl<H: EventHandler + 'static> Client<H> {
fn start_connection(&mut self, shard_data: [u64; 3], url: String) -> Result<()> {
HANDLE_STILL.store(true, Ordering::Relaxed);
- let mut core = Core::new().unwrap();
-
// Update the framework's current user if the feature is enabled.
//
// This also acts as a form of check to ensure the token is correct.
@@ -759,7 +757,7 @@ impl<H: EventHandler + 'static> Client<H> {
let shards_index = shard_data[0];
let shards_total = shard_data[1] + 1;
- let mut futures = vec![];
+ let mut threads = vec![];
for shard_number in shards_index..shards_total {
let shard_info = [shard_number, shard_data[2]];
@@ -799,28 +797,35 @@ impl<H: EventHandler + 'static> Client<H> {
}
}};
- futures.push(futures::future::lazy(move || {
- monitor_shard(monitor_info);
- futures::future::ok::<(), ()>(())
+ threads.push(thread::spawn(move || {
+ let mut core = Core::new().unwrap();
+
+ core.run(futures::future::lazy(move || {
+ monitor_shard(monitor_info);
+
+ futures::future::ok::<(), ()>(())
+ })).unwrap();
}));
},
Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why),
}
- // Wait 5 seconds between shard boots.
+ // We need to wait at least 5 seconds between IDENTIFYs.
//
- // We need to wait at least 5 seconds between READYs.
- core.turn(Some(Duration::from_secs(5)));
+ // Add an extra second as a buffer.
+ thread::sleep(Duration::from_secs(6));
}
- core.run(futures::future::join_all(futures)).unwrap();
+ for thread in threads {
+ let _ = thread.join();
+ }
Err(Error::Client(ClientError::Shutdown))
}
}
-impl<H: EventHandler + 'static> Drop for Client<H> {
+impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
fn drop(&mut self) { self.close_handle().close(); }
}
@@ -831,10 +836,10 @@ struct BootInfo {
}
#[cfg(feature = "framework")]
-struct MonitorInfo<H: EventHandler + 'static> {
+struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
- framework: Arc<sync::Mutex<Option<Box<Framework>>>>,
+ framework: Arc<sync::Mutex<Option<Box<Framework + Send>>>>,
gateway_url: Arc<sync::Mutex<String>>,
shard: Arc<Mutex<Shard>>,
shard_info: [u64; 2],
@@ -842,7 +847,7 @@ struct MonitorInfo<H: EventHandler + 'static> {
}
#[cfg(not(feature = "framework"))]
-struct MonitorInfo<H: EventHandler + 'static> {
+struct MonitorInfo<H: EventHandler + Send + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
gateway_url: Arc<sync::Mutex<String>>,
@@ -894,7 +899,7 @@ fn boot_shard(info: &BootInfo) -> Result<Shard> {
Err(Error::Client(ClientError::ShardBootFailure))
}
-fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>) {
+fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) {
handle_shard(&mut info);
let mut handle_still = HANDLE_STILL.load(Ordering::Relaxed);
@@ -948,7 +953,7 @@ fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>) {
}
}
-fn handle_shard<H: EventHandler + 'static>(info: &mut MonitorInfo<H>) {
+fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) {
let mut core = Core::new().unwrap();
let handle = core.handle();