diff options
| author | Adelyn Breedlove <[email protected]> | 2019-03-04 20:26:39 -0700 |
|---|---|---|
| committer | Adelyn Breedlove <[email protected]> | 2019-03-04 20:26:39 -0700 |
| commit | a697f234233e97cacbda619dcdf1d7c86528d816 (patch) | |
| tree | 9a8e5336eae7bb2989e4f77ea0d1b31d1c833a20 /lib | |
| parent | Add basic time back to RL handler (diff) | |
| download | disml-a697f234233e97cacbda619dcdf1d7c86528d816.tar.xz disml-a697f234233e97cacbda619dcdf1d7c86528d816.zip | |
Improve shard shutdown/restart and handle heartbeat acks
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/gateway/sharder.ml | 29 | ||||
| -rw-r--r-- | lib/gateway/sharder.mli | 1 |
2 files changed, 21 insertions, 9 deletions
diff --git a/lib/gateway/sharder.ml b/lib/gateway/sharder.ml index d29f3c0..4de01d1 100644 --- a/lib/gateway/sharder.ml +++ b/lib/gateway/sharder.ml @@ -19,7 +19,7 @@ let decompress src = Zlib_inflate.bytes in_buf out_buf
(fun dst ->
let len = min 0xFFFF (src_len - !pos) in
- Caml.Bytes.blit_string src !pos dst 0 len;
+ Bytes.blit_string src !pos dst 0 len;
pos := !pos + len;
len)
(fun obuf len ->
@@ -34,6 +34,7 @@ module Shard = struct { compress: bool
; hb_interval: int Lwt.t * int Lwt.u
; hb_stopper: Lwt_engine.event option
+ ; hb_acked: bool
; id: int
; large_threshold: int
; ready: unit Lwt.t * unit Lwt.u
@@ -82,8 +83,12 @@ module Shard = struct match shard.seq with
| 0 -> Lwt.return shard
| i ->
+ if not shard.hb_acked then
+ shard.send (Frame.close 1001) >|= fun () -> shard
+ else
Logs_lwt.info (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
- push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
+ push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard >|= fun shard ->
+ { shard with hb_acked = false }
let dispatch ~payload shard =
let module J = Yojson.Safe.Util in
@@ -179,7 +184,7 @@ module Shard = struct end
| RECONNECT -> initialize shard
| HELLO -> initialize ~data:(J.member "d" f) shard
- | HEARTBEAT_ACK -> Lwt.return shard
+ | HEARTBEAT_ACK -> Lwt.return { shard with hb_acked = true }
| opcode ->
Logs_lwt.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode)) >|= fun () ->
shard
@@ -198,6 +203,7 @@ module Shard = struct { compress
; hb_interval = Lwt.wait ()
; hb_stopper = None
+ ; hb_acked = true
; id = fst shards
; large_threshold
; ready = Lwt.wait ()
@@ -210,13 +216,18 @@ module Shard = struct }
let shutdown ?(clean=false) ?(restart=true) t =
- let _ = clean in
t.can_resume <- restart;
- Lwt.wakeup_later (snd t.stop) ();
- Logs_lwt.info (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
- t.state.send (Frame.close 1001) >|= fun () ->
- Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev)
- |> ignore
+ if clean then
+ let re = if restart then "restart" else "shutdown" in
+ Lwt.wakeup_later (snd t.stop) ();
+ Logs_lwt.info (fun m -> m "Performing clean %s. Shard [%d, %d]" re t.state.id t.state.shard_count) >>= fun () ->
+ t.state.send (Frame.close 1000) >|= fun () ->
+ Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev) |> ignore
+ else
+ let re = if restart then "restarting..." else "shutting down." in
+ Logs_lwt.info (fun m -> m "Shard closed unexpectedly, %s Shard [%d, %d]" re t.state.id t.state.shard_count) >>= fun () ->
+ t.state.send (Frame.close 1000) >|= fun () ->
+ Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev) |> ignore
end
type t = { shards: (Shard.shard Shard.t) list }
diff --git a/lib/gateway/sharder.mli b/lib/gateway/sharder.mli index 57b5bc3..3f36d29 100644 --- a/lib/gateway/sharder.mli +++ b/lib/gateway/sharder.mli @@ -12,6 +12,7 @@ module Shard : sig { compress: bool (** Whether to compress payloads. *)
; hb_interval: int Lwt.t * int Lwt.u (** Time between heartbeats. Not known until HELLO is received. *)
; hb_stopper: Lwt_engine.event option (** Used to cancel heartbeat sequencer *)
+ ; hb_acked: bool (** Whether the last heartbeat was acked. Missing an ack will reconnect the shard. *)
; id: int (** ID of the current shard. Must be less than shard_count. *)
; large_threshold: int (** Minimum number of members needed for a guild to be considered large. *)
; ready: unit Lwt.t * unit Lwt.u (** A simple promise indicating if the shard has received READY. *)
|