1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex;
use std::{
collections::{HashMap, VecDeque},
sync::{
mpsc::{
Receiver,
RecvTimeoutError,
Sender},
Arc
},
thread,
time::{Duration, Instant}
};
use super::super::super::EventHandler;
use super::{
ShardId,
ShardManagerMessage,
ShardQueuerMessage,
ShardRunner,
ShardRunnerInfo,
ShardRunnerOptions,
};
use threadpool::ThreadPool;
use typemap::ShareMap;
use ::gateway::ConnectionStage;
#[cfg(feature = "voice")]
use client::bridge::voice::ClientVoiceManager;
#[cfg(feature = "framework")]
use framework::Framework;
/// Minimum number of seconds to leave between two consecutive shard start
/// attempts, so that IDENTIFYs are spaced far enough apart to avoid session
/// invalidations.
///
/// The queuer loop also uses this value as the timeout when waiting on its
/// receiver channel.
const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5;
/// The shard queuer is a simple loop that runs indefinitely to manage the
/// startup of shards.
///
/// Start requests arrive over [`rx`]; shards whose start attempt failed are
/// kept in [`queue`] and retried when the channel has been idle for a while.
///
/// A shard queuer instance _should_ be run in its own thread, due to the
/// blocking nature of the loop itself as well as a 5 second thread sleep
/// between shard starts.
///
/// [`queue`]: #structfield.queue
/// [`rx`]: #structfield.rx
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
/// A copy of [`Client::data`] to be given to runners for contextual
/// dispatching.
///
/// The handle is cloned into the options of every runner that is started.
///
/// [`Client::data`]: ../../struct.Client.html#structfield.data
pub data: Arc<Mutex<ShareMap>>,
/// A reference to an `EventHandler`, such as the one given to the
/// [`Client`].
///
/// The handle is cloned into the options of every runner that is started.
///
/// [`Client`]: ../../struct.Client.html
pub event_handler: Arc<H>,
/// A shared handle to the (optional) framework, cloned into the options of
/// every runner that is started.
///
/// Only present when the `framework` feature is enabled.
#[cfg(feature = "framework")]
pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
/// The instant that a shard was last started.
///
/// This is used to determine how long to wait between shard IDENTIFYs. It
/// is refreshed after every start attempt, whether or not the attempt
/// succeeded. While it is `None`, no wait is applied before a start.
pub last_start: Option<Instant>,
/// A copy of the sender channel to communicate with the
/// [`ShardManagerMonitor`].
///
/// A clone of this sender is handed to every runner that is started.
///
/// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
/// The shards that are queued for booting, as `(shard ID, shard total)`
/// pairs.
///
/// This will typically be filled with previously failed boots: a shard is
/// pushed to the back when starting it returns an error, and the front
/// entry is retried when a receive on the channel times out.
pub queue: VecDeque<(u64, u64)>,
/// A copy of the map of shard runners.
///
/// An entry is inserted for every shard that the queuer starts.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// A receiver channel for the shard queuer to be told to start shards or
/// to shut down.
pub rx: Receiver<ShardQueuerMessage>,
/// A copy of a threadpool to give shard runners.
///
/// For example, when using the [`Client`], this will be a copy of
/// [`Client::threadpool`].
///
/// [`Client`]: ../../struct.Client.html
/// [`Client::threadpool`]: ../../struct.Client.html#structfield.threadpool
pub threadpool: ThreadPool,
/// A copy of the token to connect with; passed to every new shard.
pub token: Arc<Mutex<String>>,
/// A copy of the client's voice manager, cloned into the options of every
/// runner that is started.
///
/// Only present when the `voice` feature is enabled.
#[cfg(feature = "voice")]
pub voice_manager: Arc<Mutex<ClientVoiceManager>>,
/// A copy of the URI to use to connect to the gateway; passed to every new
/// shard.
pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
    /// Runs the queuer until it is told to stop.
    ///
    /// Every iteration waits on [`rx`] for a [`ShardQueuerMessage`], giving up
    /// after `WAIT_BETWEEN_BOOTS_IN_SECONDS` seconds:
    ///
    /// - [`ShardQueuerMessage::Start`]: the requested shard is started, after
    ///   first sleeping off whatever remains of the minimum gap since the
    ///   previous start attempt.
    /// - [`ShardQueuerMessage::Shutdown`], or the sending half of the channel
    ///   having disconnected: the method returns.
    /// - No message within the timeout: the shard at the front of the retry
    ///   queue, if there is one, is started in the same way.
    ///
    /// **Note**: This blocks the calling thread, so it should be run in a
    /// thread of its own.
    ///
    /// [`ShardQueuerMessage`]: enum.ShardQueuerMessage.html
    /// [`ShardQueuerMessage::Shutdown`]: enum.ShardQueuerMessage.html#variant.Shutdown
    /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
    /// [`rx`]: #structfield.rx
    pub fn run(&mut self) {
        // Receives are bounded by a timeout so that an idle channel still
        // gives previously failed shards a chance to be retried.
        let poll_interval = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);

        loop {
            // Work out which shard (if any) to boot on this iteration.
            let pending = match self.rx.recv_timeout(poll_interval) {
                Ok(ShardQueuerMessage::Start(id, total)) => Some((id.0, total.0)),
                // Nothing was requested in time: fall back to the retry queue.
                Err(RecvTimeoutError::Timeout) => self.queue.pop_front(),
                // Either an explicit shutdown, or the sender half is gone and
                // the queuer has outlived its purpose.
                Ok(ShardQueuerMessage::Shutdown) |
                Err(RecvTimeoutError::Disconnected) => return,
            };

            if let Some((id, total)) = pending {
                self.checked_start(id, total);
            }
        }
    }

    /// Blocks the current thread until at least
    /// `WAIT_BETWEEN_BOOTS_IN_SECONDS` seconds have passed since the last
    /// recorded start.
    ///
    /// Returns immediately if no start has been recorded yet or if enough
    /// time has already elapsed.
    fn check_last_start(&mut self) {
        // IDENTIFYs sent closer together than this risk session
        // invalidations.
        let min_gap = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);

        if let Some(previous) = self.last_start {
            let since_previous = previous.elapsed();

            if since_previous < min_gap {
                thread::sleep(min_gap - since_previous);
            }
        }
    }

    /// Starts a shard while honouring the minimum gap between starts.
    ///
    /// If the start fails, the failure is logged and the `(id, total)` pair
    /// is pushed to the back of the retry queue. The time of the attempt is
    /// recorded as the last start regardless of the outcome.
    fn checked_start(&mut self, id: u64, total: u64) {
        self.check_last_start();

        match self.start(id, total) {
            Ok(()) => {},
            Err(why) => {
                warn!("Err starting shard {}: {:?}", id, why);
                info!("Re-queueing start of shard {}", id);

                self.queue.push_back((id, total));
            },
        }

        self.last_start = Some(Instant::now());
    }

    /// Creates a shard and its runner, runs the runner on a newly spawned
    /// thread, and registers the runner in the shared runner map.
    ///
    /// The registered entry starts with no latency and a stage of
    /// `ConnectionStage::Disconnected`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Shard::new` if the shard could not be created;
    /// in that case no thread is spawned and nothing is registered.
    fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> {
        let shard = Shard::new(
            Arc::clone(&self.ws_url),
            Arc::clone(&self.token),
            [shard_id, shard_total],
        )?;

        let options = ShardRunnerOptions {
            data: Arc::clone(&self.data),
            event_handler: Arc::clone(&self.event_handler),
            #[cfg(feature = "framework")]
            framework: Arc::clone(&self.framework),
            manager_tx: self.manager_tx.clone(),
            threadpool: self.threadpool.clone(),
            #[cfg(feature = "voice")]
            voice_manager: Arc::clone(&self.voice_manager),
            shard,
        };
        let mut runner = ShardRunner::new(options);

        // Take the runner's sender before the runner is moved into its
        // thread.
        let runner_tx = runner.runner_tx();

        thread::spawn(move || {
            // The outcome of the runner's loop is intentionally discarded.
            let _ = runner.run();
        });

        let info = ShardRunnerInfo {
            latency: None,
            runner_tx,
            stage: ConnectionStage::Disconnected,
        };
        self.runners.lock().insert(ShardId(shard_id), info);

        Ok(())
    }
}
|