1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex as ParkingLotMutex;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::super::super::EventHandler;
use super::{
ShardId,
ShardManagerMessage,
ShardQueuerMessage,
ShardRunner,
ShardRunnerInfo,
};
use threadpool::ThreadPool;
use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
/// The shard queuer is a simple loop that runs indefinitely to manage the
/// startup of shards.
///
/// A shard queuer instance _should_ be run in its own thread, due to the
/// blocking nature of the loop itself as well as a 5 second thread sleep
/// between shard starts.
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
pub data: Arc<ParkingLotMutex<ShareMap>>,
pub event_handler: Arc<H>,
#[cfg(feature = "framework")]
pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
pub last_start: Option<Instant>,
pub manager_tx: Sender<ShardManagerMessage>,
pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
pub rx: Receiver<ShardQueuerMessage>,
pub threadpool: ThreadPool,
pub token: Arc<Mutex<String>>,
pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
pub fn run(&mut self) {
while let Ok(msg) = self.rx.recv() {
match msg {
ShardQueuerMessage::Shutdown => break,
ShardQueuerMessage::Start(shard_id, shard_total) => {
self.check_last_start();
if let Err(why) = self.start(shard_id, shard_total) {
warn!("Err starting shard {}: {:?}", shard_id, why);
}
self.last_start = Some(Instant::now());
},
}
}
}
fn check_last_start(&mut self) {
let instant = match self.last_start {
Some(instant) => instant,
None => return,
};
// We must wait 5 seconds between IDENTIFYs to avoid session
// invalidations.
let duration = Duration::from_secs(5);
let elapsed = instant.elapsed();
if elapsed >= duration {
return;
}
let to_sleep = duration - elapsed;
thread::sleep(to_sleep);
}
fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
let shard_info = [shard_id.0, shard_total.0];
let shard = Shard::new(self.ws_url.clone(), self.token.clone(), shard_info)?;
let locked = Arc::new(ParkingLotMutex::new(shard));
let mut runner = feature_framework! {{
ShardRunner::new(
locked.clone(),
self.manager_tx.clone(),
self.framework.clone(),
self.data.clone(),
self.event_handler.clone(),
self.threadpool.clone(),
)
} else {
ShardRunner::new(
locked.clone(),
self.manager_tx.clone(),
self.data.clone(),
self.event_handler.clone(),
self.threadpool.clone(),
)
}};
let runner_info = ShardRunnerInfo {
runner_tx: runner.runner_tx(),
shard: locked,
};
thread::spawn(move || {
let _ = runner.run();
});
self.runners.lock().insert(shard_id, runner_info);
Ok(())
}
}
|