aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAdelyn Breedlove <[email protected]>2019-03-04 20:26:39 -0700
committerAdelyn Breedlove <[email protected]>2019-03-04 20:26:39 -0700
commita697f234233e97cacbda619dcdf1d7c86528d816 (patch)
tree9a8e5336eae7bb2989e4f77ea0d1b31d1c833a20 /lib
parentAdd basic time back to RL handler (diff)
downloaddisml-a697f234233e97cacbda619dcdf1d7c86528d816.tar.xz
disml-a697f234233e97cacbda619dcdf1d7c86528d816.zip
Improve shard shutdown/restart and handle heartbeat acks
Diffstat (limited to 'lib')
-rw-r--r--lib/gateway/sharder.ml29
-rw-r--r--lib/gateway/sharder.mli1
2 files changed, 21 insertions, 9 deletions
diff --git a/lib/gateway/sharder.ml b/lib/gateway/sharder.ml
index d29f3c0..4de01d1 100644
--- a/lib/gateway/sharder.ml
+++ b/lib/gateway/sharder.ml
@@ -19,7 +19,7 @@ let decompress src =
Zlib_inflate.bytes in_buf out_buf
(fun dst ->
let len = min 0xFFFF (src_len - !pos) in
- Caml.Bytes.blit_string src !pos dst 0 len;
+ Bytes.blit_string src !pos dst 0 len;
pos := !pos + len;
len)
(fun obuf len ->
@@ -34,6 +34,7 @@ module Shard = struct
{ compress: bool
; hb_interval: int Lwt.t * int Lwt.u
; hb_stopper: Lwt_engine.event option
+ ; hb_acked: bool
; id: int
; large_threshold: int
; ready: unit Lwt.t * unit Lwt.u
@@ -82,8 +83,12 @@ module Shard = struct
match shard.seq with
| 0 -> Lwt.return shard
| i ->
+ if not shard.hb_acked then
+ shard.send (Frame.close 1001) >|= fun () -> shard
+ else
Logs_lwt.info (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
- push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
+ push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard >|= fun shard ->
+ { shard with hb_acked = false }
let dispatch ~payload shard =
let module J = Yojson.Safe.Util in
@@ -179,7 +184,7 @@ module Shard = struct
end
| RECONNECT -> initialize shard
| HELLO -> initialize ~data:(J.member "d" f) shard
- | HEARTBEAT_ACK -> Lwt.return shard
+ | HEARTBEAT_ACK -> Lwt.return { shard with hb_acked = true }
| opcode ->
Logs_lwt.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode)) >|= fun () ->
shard
@@ -198,6 +203,7 @@ module Shard = struct
{ compress
; hb_interval = Lwt.wait ()
; hb_stopper = None
+ ; hb_acked = true
; id = fst shards
; large_threshold
; ready = Lwt.wait ()
@@ -210,13 +216,18 @@ module Shard = struct
}
let shutdown ?(clean=false) ?(restart=true) t =
- let _ = clean in
t.can_resume <- restart;
- Lwt.wakeup_later (snd t.stop) ();
- Logs_lwt.info (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
- t.state.send (Frame.close 1001) >|= fun () ->
- Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev)
- |> ignore
+ if clean then
+ let re = if restart then "restart" else "shutdown" in
+ Lwt.wakeup_later (snd t.stop) ();
+ Logs_lwt.info (fun m -> m "Performing clean %s. Shard [%d, %d]" re t.state.id t.state.shard_count) >>= fun () ->
+ t.state.send (Frame.close 1000) >|= fun () ->
+ Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev) |> ignore
+ else
+ let re = if restart then "restarting..." else "shutting down." in
+ Logs_lwt.info (fun m -> m "Shard closed unexpectedly, %s Shard [%d, %d]" re t.state.id t.state.shard_count) >>= fun () ->
+ t.state.send (Frame.close 1000) >|= fun () ->
+ Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev) |> ignore
end
type t = { shards: (Shard.shard Shard.t) list }
diff --git a/lib/gateway/sharder.mli b/lib/gateway/sharder.mli
index 57b5bc3..3f36d29 100644
--- a/lib/gateway/sharder.mli
+++ b/lib/gateway/sharder.mli
@@ -12,6 +12,7 @@ module Shard : sig
{ compress: bool (** Whether to compress payloads. *)
; hb_interval: int Lwt.t * int Lwt.u (** Time between heartbeats. Not known until HELLO is received. *)
; hb_stopper: Lwt_engine.event option (** Used to cancel heartbeat sequencer *)
+ ; hb_acked: bool (** Whether the last heartbeat was acked. Missing an ack will reconnect the shard. *)
; id: int (** ID of the current shard. Must be less than shard_count. *)
; large_threshold: int (** Minimum number of members needed for a guild to be considered large. *)
; ready: unit Lwt.t * unit Lwt.u (** A simple promise indicating if the shard has received READY. *)