aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoracdenisSK <[email protected]>2017-07-15 01:49:59 +0200
committeracdenisSK <[email protected]>2017-07-15 01:49:59 +0200
commit4ce2ddfb8dd04eefd769a1d17b590854f66af17c (patch)
treee0f997ad8c025f3e304e8434e667a909d904dfc1 /src
parentAdd a way to close all shards explictly (diff)
downloadserenity-4ce2ddfb8dd04eefd769a1d17b590854f66af17c.tar.xz
serenity-4ce2ddfb8dd04eefd769a1d17b590854f66af17c.zip
Remove more threads with futures
Diffstat (limited to 'src')
-rw-r--r--src/client/dispatch.rs12
-rw-r--r--src/client/mod.rs45
2 files changed, 28 insertions, 29 deletions
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index f2850ec..68190d1 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -93,7 +93,7 @@ macro_rules! impl_reaction_events {
}
#[cfg(feature="framework")]
-pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
+pub fn dispatch<H: EventHandler + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
framework: &Arc<sync::Mutex<Framework>>,
data: &Arc<Mutex<ShareMap>>,
@@ -126,7 +126,7 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
}
#[cfg(not(feature="framework"))]
-pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
+pub fn dispatch<H: EventHandler + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
data: &Arc<Mutex<ShareMap>>,
event_handler: &Arc<H>,
@@ -152,7 +152,7 @@ pub fn dispatch<H: EventHandler + Send + Sync + 'static>(event: Event,
}
#[allow(unused_mut)]
-fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context,
+fn dispatch_message<H: EventHandler + 'static>(context: Context,
mut message: Message,
event_handler: &Arc<H>,
tokio_handle: &Handle) {
@@ -169,7 +169,7 @@ fn dispatch_message<H: EventHandler + Send + Sync + 'static>(context: Context,
});
}
-fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Context,
+fn dispatch_reaction_add<H: EventHandler + 'static>(context: Context,
reaction: Reaction,
event_handler: &Arc<H>,
tokio_handle: &Handle) {
@@ -180,7 +180,7 @@ fn dispatch_reaction_add<H: EventHandler + Send + Sync + 'static>(context: Conte
});
}
-fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Context,
+fn dispatch_reaction_remove<H: EventHandler + 'static>(context: Context,
reaction: Reaction,
event_handler: &Arc<H>,
tokio_handle: &Handle) {
@@ -192,7 +192,7 @@ fn dispatch_reaction_remove<H: EventHandler + Send + Sync + 'static>(context: Co
}
#[allow(cyclomatic_complexity, unused_assignments, unused_mut)]
-fn handle_event<H: EventHandler + Send + Sync + 'static>(event: Event,
+fn handle_event<H: EventHandler + 'static>(event: Event,
conn: &Arc<Mutex<Shard>>,
data: &Arc<Mutex<ShareMap>>,
event_handler: &Arc<H>,
diff --git a/src/client/mod.rs b/src/client/mod.rs
index e59c87a..a0e6ffb 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -39,9 +39,10 @@ use self::dispatch::dispatch;
use std::sync::{self, Arc};
use std::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
use parking_lot::Mutex;
-use tokio_core::reactor::Core;
+use tokio_core::reactor::{Core, Handle};
+use futures;
use std::time::Duration;
-use std::{mem, thread};
+use std::mem;
use super::gateway::Shard;
use typemap::ShareMap;
use websocket::result::WebSocketError;
@@ -97,7 +98,7 @@ static HANDLE_STILL: AtomicBool = ATOMIC_BOOL_INIT;
/// [`on_message`]: #method.on_message
/// [`Event::MessageCreate`]: ../model/event/enum.Event.html#variant.MessageCreate
/// [sharding docs]: gateway/index.html#sharding
-pub struct Client<H: EventHandler + Send + Sync + 'static> {
+pub struct Client<H: EventHandler + 'static> {
/// A ShareMap which requires types to be Send + Sync. This is a map that
/// can be safely shared across contexts.
///
@@ -186,7 +187,7 @@ pub struct Client<H: EventHandler + Send + Sync + 'static> {
token: Arc<sync::Mutex<String>>,
}
-impl<H: EventHandler + Send + Sync + 'static> Client<H> {
+impl<H: EventHandler + 'static> Client<H> {
/// Creates a Client for a bot user.
///
/// Discord has a requirement of prefixing bot tokens with `"Bot "`, which
@@ -614,6 +615,9 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
fn start_connection(&mut self, shard_data: [u64; 3], url: String)
-> Result<()> {
HANDLE_STILL.store(true, Ordering::Relaxed);
+
+ let mut core = Core::new().unwrap();
+
// Update the framework's current user if the feature is enabled.
//
// This also acts as a form of check to ensure the token is correct.
@@ -631,7 +635,7 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
let shards_index = shard_data[0];
let shards_total = shard_data[1] + 1;
- let mut threads = vec![];
+ let mut futures = vec![];
for shard_number in shards_index..shards_total {
let shard_info = [shard_number, shard_data[2]];
@@ -667,8 +671,10 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
}
}};
- threads.push(thread::spawn(move || {
- monitor_shard(monitor_info);
+ let handle = core.handle();
+ futures.push(futures::future::lazy(move || {
+ monitor_shard(monitor_info, handle);
+ futures::future::ok::<(), ()>(())
}));
},
Err(why) => warn!("Error starting shard {:?}: {:?}", shard_info, why),
@@ -677,19 +683,17 @@ impl<H: EventHandler + Send + Sync + 'static> Client<H> {
// Wait 5 seconds between shard boots.
//
// We need to wait at least 5 seconds between READYs.
- thread::sleep(Duration::from_secs(5));
+ core.turn(Some(Duration::from_secs(5)));
}
- for thread in threads {
- let _ = thread.join();
- }
+ core.run(futures::future::join_all(futures)).unwrap();
Err(Error::Client(ClientError::Shutdown))
}
}
-impl<H: EventHandler + Send + Sync + 'static> Drop for Client<H> {
+impl<H: EventHandler + 'static> Drop for Client<H> {
fn drop(&mut self) {
self.close();
}
@@ -702,7 +706,7 @@ struct BootInfo {
}
#[cfg(feature="framework")]
-struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
+struct MonitorInfo<H: EventHandler + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
framework: Arc<sync::Mutex<Framework>>,
@@ -713,7 +717,7 @@ struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
}
#[cfg(not(feature="framework"))]
-struct MonitorInfo<H: EventHandler + Send + Sync + 'static> {
+struct MonitorInfo<H: EventHandler + 'static> {
data: Arc<Mutex<ShareMap>>,
event_handler: Arc<H>,
gateway_url: Arc<sync::Mutex<String>>,
@@ -762,8 +766,8 @@ fn boot_shard(info: &BootInfo) -> Result<Shard> {
Err(Error::Client(ClientError::ShardBootFailure))
}
-fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<H>) {
- handle_shard(&mut info);
+fn monitor_shard<H: EventHandler + 'static>(mut info: MonitorInfo<H>, handle: Handle) {
+ handle_shard(&mut info, &handle);
loop {
let mut boot_successful = false;
@@ -788,7 +792,7 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<
}
if boot_successful {
- handle_shard(&mut info);
+ handle_shard(&mut info, &handle);
} else {
break;
}
@@ -799,11 +803,8 @@ fn monitor_shard<H: EventHandler + Send + Sync + 'static>(mut info: MonitorInfo<
error!("Completely failed to reboot shard");
}
-fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<H>) {
+fn handle_shard<H: EventHandler + 'static>(info: &mut MonitorInfo<H>, handle: &Handle) {
// This is currently all ducktape. Redo this.
- let mut core = Core::new().unwrap();
- let handle = core.handle();
-
let handle_still = HANDLE_STILL.load(Ordering::Relaxed);
while handle_still {
{
@@ -885,8 +886,6 @@ fn handle_shard<H: EventHandler + Send + Sync + 'static>(info: &mut MonitorInfo<
&info.event_handler,
&handle);
}}
-
- core.turn(None);
}
}