aboutsummaryrefslogtreecommitdiff
path: root/src/client/bridge/gateway/shard_queuer.rs
blob: ea49de424e9c731ad9ccea5a0573066882a5f21d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex as ParkingLotMutex;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::super::super::EventHandler;
use super::{
    ShardId,
    ShardManagerMessage,
    ShardQueuerMessage,
    ShardRunner,
    ShardRunnerInfo,
};
use threadpool::ThreadPool;
use typemap::ShareMap;

#[cfg(feature = "framework")]
use framework::Framework;

/// Manages the startup of shards from a simple, indefinitely running loop.
///
/// The loop in `run` blocks on the message receiver and may additionally put
/// the thread to sleep so that consecutive shard starts are spaced 5 seconds
/// apart. Because of this, a shard queuer instance _should_ be given a thread
/// of its own.
pub struct ShardQueuer<H>
where
    H: EventHandler + Send + Sync + 'static,
{
    /// Shared data map; a clone of this handle is given to every shard runner
    /// the queuer creates.
    pub data: Arc<ParkingLotMutex<ShareMap>>,
    /// Event handler; a clone of this handle is given to every shard runner
    /// the queuer creates.
    pub event_handler: Arc<H>,
    /// Optional framework; a clone of this handle is given to every shard
    /// runner the queuer creates.
    #[cfg(feature = "framework")]
    pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
    /// When the most recent shard start was attempted, or `None` if no start
    /// has been attempted yet.
    pub last_start: Option<Instant>,
    /// Sender of shard manager messages; cloned into every shard runner the
    /// queuer creates.
    pub manager_tx: Sender<ShardManagerMessage>,
    /// Map into which the info of each started shard runner is inserted,
    /// keyed by shard ID.
    pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
    /// Receiver of the messages that drive the queuer's loop.
    pub rx: Receiver<ShardQueuerMessage>,
    /// Thread pool; a clone is given to every shard runner the queuer
    /// creates.
    pub threadpool: ThreadPool,
    /// Token handed to `Shard::new` for every shard that is started.
    pub token: Arc<Mutex<String>>,
    /// URL handed to `Shard::new` for every shard that is started.
    pub ws_url: Arc<Mutex<String>>,
}

impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
    pub fn run(&mut self) {
        while let Ok(msg) = self.rx.recv() {
            match msg {
                ShardQueuerMessage::Shutdown => break,
                ShardQueuerMessage::Start(shard_id, shard_total) => {
                    self.check_last_start();

                    if let Err(why) = self.start(shard_id, shard_total) {
                        warn!("Err starting shard {}: {:?}", shard_id, why);
                    }

                    self.last_start = Some(Instant::now());
                },
            }
        }
    }

    fn check_last_start(&mut self) {
        let instant = match self.last_start {
            Some(instant) => instant,
            None => return,
        };

        // We must wait 5 seconds between IDENTIFYs to avoid session
        // invalidations.
        let duration = Duration::from_secs(5);
        let elapsed = instant.elapsed();

        if elapsed >= duration {
            return;
        }

        let to_sleep = duration - elapsed;

        thread::sleep(to_sleep);
    }

    fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
        let shard_info = [shard_id.0, shard_total.0];
        let shard = Shard::new(self.ws_url.clone(), self.token.clone(), shard_info)?;
        let locked = Arc::new(ParkingLotMutex::new(shard));

        let mut runner = feature_framework! {{
            ShardRunner::new(
                locked.clone(),
                self.manager_tx.clone(),
                self.framework.clone(),
                self.data.clone(),
                self.event_handler.clone(),
                self.threadpool.clone(),
            )
        } else {
            ShardRunner::new(
                locked.clone(),
                self.manager_tx.clone(),
                self.data.clone(),
                self.event_handler.clone(),
                self.threadpool.clone(),
            )
        }};

        let runner_info = ShardRunnerInfo {
            runner_tx: runner.runner_tx(),
            shard: locked,
        };

        thread::spawn(move || {
            let _ = runner.run();
        });

        self.runners.lock().insert(shard_id, runner_info);

        Ok(())
    }
}