aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_manager_monitor.rs
blob: 4989ef29be4609d42fa4fa2cbf17e4319362322b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use parking_lot::Mutex;
use std::sync::{
    mpsc::Receiver,
    Arc
};
use super::{ShardManager, ShardManagerMessage};

/// The shard manager monitor does what it says on the tin -- it monitors the
/// shard manager and performs actions on it as received.
///
/// The monitor is essentially responsible for running in its own thread and
/// receiving [`ShardManagerMessage`]s, such as whether to shutdown a shard or
/// shutdown everything entirely.
///
/// [`ShardManagerMessage`]: enum.ShardManagerMessage.html
#[derive(Debug)]
pub struct ShardManagerMonitor {
    /// An clone of the Arc to the manager itself.
    pub manager: Arc<Mutex<ShardManager>>,
    /// The mpsc Receiver channel to receive shard manager messages over.
    pub rx: Receiver<ShardManagerMessage>,
}

impl ShardManagerMonitor {
    /// "Runs" the monitor, waiting for messages over the Receiver.
    ///
    /// This should be called in its own thread due to its blocking, looped
    /// nature.
    ///
    /// This will continue running until either:
    ///
    /// - a [`ShardManagerMessage::ShutdownAll`] has been received
    /// - an error is returned while receiving a message from the
    /// channel (probably indicating that the shard manager should stop anyway)
    ///
    /// [`ShardManagerMessage::ShutdownAll`]: enum.ShardManagerMessage.html#variant.ShutdownAll
    pub fn run(&mut self) {
        debug!("Starting shard manager worker");

        while let Ok(value) = self.rx.recv() {
            match value {
                ShardManagerMessage::Restart(shard_id) => {
                    self.manager.lock().restart(shard_id);
                },
                ShardManagerMessage::ShardUpdate { id, latency, stage } => {
                    let manager = self.manager.lock();
                    let mut runners = manager.runners.lock();

                    if let Some(runner) = runners.get_mut(&id) {
                        runner.latency = latency;
                        runner.stage = stage;
                    }
                }
                ShardManagerMessage::Shutdown(shard_id) => {
                    self.manager.lock().shutdown(shard_id);
                },
                ShardManagerMessage::ShutdownAll => {
                    self.manager.lock().shutdown_all();

                    break;
                },
                ShardManagerMessage::ShutdownInitiated => break,
            }
        }
    }
}