1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
use parking_lot::Mutex;
use std::sync::{
mpsc::Receiver,
Arc
};
use super::{ShardManager, ShardManagerMessage};
/// The shard manager monitor does what it says on the tin -- it monitors the
/// shard manager and performs actions on it as received.
///
/// The monitor is essentially responsible for running in its own thread and
/// receiving [`ShardManagerMessage`]s, such as whether to restart a shard,
/// shut down a shard, or shut down everything entirely.
///
/// [`ShardManagerMessage`]: enum.ShardManagerMessage.html
#[derive(Debug)]
pub struct ShardManagerMonitor {
/// A clone of the `Arc` to the manager itself, locked whenever a received
/// message has to be applied to the manager.
pub manager: Arc<Mutex<ShardManager>>,
/// The mpsc `Receiver` channel to receive shard manager messages over.
pub rx: Receiver<ShardManagerMessage>,
}
impl ShardManagerMonitor {
    /// Drives the monitor: blocks on the receiver and applies every incoming
    /// [`ShardManagerMessage`] to the shard manager.
    ///
    /// Because this loops and blocks while waiting for the next message, it
    /// is meant to be called from a dedicated thread.
    ///
    /// The loop ends as soon as one of the following happens:
    ///
    /// - a [`ShardManagerMessage::ShutdownAll`] is received (every shard is
    /// shut down through the manager first)
    /// - a [`ShardManagerMessage::ShutdownInitiated`] is received
    /// - receiving from the channel fails (probably indicating that the shard
    /// manager should stop anyway)
    ///
    /// [`ShardManagerMessage`]: enum.ShardManagerMessage.html
    /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll
    /// [`ShardManagerMessage::ShutdownInitiated`]: enum.ShardManagerMessage.html#variant.ShutdownInitiated
    pub fn run(&mut self) {
        debug!("Starting shard manager worker");

        // `Receiver::iter` blocks for each message and finishes once the
        // channel reports an error, so leaving the `for` loop is the
        // "receive failed" exit path.
        for message in self.rx.iter() {
            match message {
                // Terminal messages: nothing runs after the loop, so
                // returning here is the same as breaking out of it.
                ShardManagerMessage::ShutdownInitiated => return,
                ShardManagerMessage::ShutdownAll => {
                    self.manager.lock().shutdown_all();

                    return;
                },
                ShardManagerMessage::Shutdown(shard_id) => {
                    self.manager.lock().shutdown(shard_id);
                },
                ShardManagerMessage::Restart(shard_id) => {
                    self.manager.lock().restart(shard_id);
                },
                ShardManagerMessage::ShardUpdate { id, latency, stage } => {
                    // Lock order is manager first, then its runners map; the
                    // runners guard is released before the manager guard.
                    let guard = self.manager.lock();

                    // Updates for ids without a registered runner are ignored.
                    if let Some(info) = guard.runners.lock().get_mut(&id) {
                        info.latency = latency;
                        info.stage = stage;
                    }
                },
            }
        }
    }
}
|