aboutsummaryrefslogtreecommitdiff
path: root/lib/gateway
diff options
context:
space:
mode:
authorAdelyn Breedlove <[email protected]>2019-03-03 11:51:39 -0700
committerAdelyn Breedlove <[email protected]>2019-03-03 11:51:39 -0700
commit7114103dbdc74118ffc519db4859da3f5eef1ba4 (patch)
treea544f485603c8b0988cf402fcbccabe8b1fda1f8 /lib/gateway
parentMaybe it wasn't opam (diff)
downloaddisml-7114103dbdc74118ffc519db4859da3f5eef1ba4.tar.xz
disml-7114103dbdc74118ffc519db4859da3f5eef1ba4.zip
IT WORKS
Diffstat (limited to 'lib/gateway')
-rw-r--r--lib/gateway/sharder.ml46
-rw-r--r--lib/gateway/sharder.mli34
2 files changed, 42 insertions, 38 deletions
diff --git a/lib/gateway/sharder.ml b/lib/gateway/sharder.ml
index 9c98979..d29f3c0 100644
--- a/lib/gateway/sharder.ml
+++ b/lib/gateway/sharder.ml
@@ -33,11 +33,11 @@ module Shard = struct
type shard =
{ compress: bool
; hb_interval: int Lwt.t * int Lwt.u
- ; hb_stopper: unit Lwt.t * unit Lwt.u
+ ; hb_stopper: Lwt_engine.event option
; id: int
; large_threshold: int
; ready: unit Lwt.t * unit Lwt.u
- ; recv: Frame.t Lwt_stream.t
+ ; recv: (unit -> Frame.t Lwt.t)
; send: (Frame.t -> unit Lwt.t)
; seq: int
; session: string option
@@ -47,7 +47,7 @@ module Shard = struct
type 'a t =
{ mutable state: 'a
- ; mutable stopped: bool
+ ; mutable stop: unit Lwt.t * unit Lwt.u
; mutable can_resume: bool
}
@@ -82,7 +82,7 @@ module Shard = struct
match shard.seq with
| 0 -> Lwt.return shard
| i ->
- Logs_lwt.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
+ Logs_lwt.info (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
let dispatch ~payload shard =
@@ -92,9 +92,8 @@ module Shard = struct
let data = J.member "d" payload in
let session = if t = "READY" then begin
Lwt.wakeup_later (snd shard.ready) ();
- (* TODO figure out action after time in Lwt *)
- (* Clock.after (Core.Time.Span.create ~sec:5 ())
- >>> (fun _ -> Lwt_mvar.put identify_lock () >>> ignore); *)
+ let _ = Lwt_engine.on_timer 5.0 false
+ (fun _ -> Lwt.async (fun () -> Lwt_mvar.put identify_lock ())) in
J.(member "session_id" data |> to_string_option)
end else shard.session in
Event.handle_event ~ev:t data >|= fun () ->
@@ -143,7 +142,7 @@ module Shard = struct
match shard.session with
| None -> begin
Lwt_mvar.take identify_lock >>= fun () ->
- Logs_lwt.debug (fun m -> m "Identifying shard [%d, %d]" shard.id shard.shard_count) >>= fun () ->
+ Logs_lwt.info (fun m -> m "Identifying shard [%d, %d]" shard.id shard.shard_count) >>= fun () ->
let payload = `Assoc
[ "token", `String !Client_options.token
; "properties", `Assoc
@@ -196,10 +195,9 @@ module Shard = struct
let uri = Uri.(with_query' (of_string url) ["encoding", "json"; "v", "6"]) in
let extra_headers = Http.Base.process_request_headers () in
make_client ~extra_headers uri >|= fun (recv, send) ->
- let recv = mk_frame_stream recv in
{ compress
; hb_interval = Lwt.wait ()
- ; hb_stopper = Lwt.wait ()
+ ; hb_stopper = None
; id = fst shards
; large_threshold
; ready = Lwt.wait ()
@@ -214,10 +212,11 @@ module Shard = struct
let shutdown ?(clean=false) ?(restart=true) t =
let _ = clean in
t.can_resume <- restart;
- t.stopped <- true;
- Logs_lwt.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
+ Lwt.wakeup_later (snd t.stop) ();
+ Logs_lwt.info (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
t.state.send (Frame.close 1001) >|= fun () ->
- Lwt.wakeup_later (snd t.state.hb_stopper) ()
+ Option.map t.state.hb_stopper ~f:(fun ev -> Lwt_engine.stop_event ev)
+ |> ignore
end
type t = { shards: (Shard.shard Shard.t) list }
@@ -238,7 +237,7 @@ let start ?count ?compress ?large_threshold () =
Logs.info (fun m -> m "Connecting to %s" url);
let rec ev_loop (t:Shard.shard Shard.t) =
let step (t:Shard.shard Shard.t) =
- Lwt_stream.get t.state.recv >>= function None -> Lwt.return t | Some frame ->
+ t.state.recv () >>= function frame ->
begin match Shard.parse ~compress:t.state.compress frame with
| `Ok f ->
Shard.handle_frame ~f t.state >|= fun s ->
@@ -254,7 +253,7 @@ let start ?count ?compress ?large_threshold () =
Shard.shutdown t
end >|= fun () -> t
in
- if t.stopped then Lwt.return_unit
+ if not (Lwt.is_sleeping (fst t.stop)) then Lwt.return_unit
else step t >>= ev_loop
in
let rec gen_shards l a =
@@ -264,11 +263,11 @@ let start ?count ?compress ?large_threshold () =
let wrap ?(reuse:Shard.shard Shard.t option) state = match reuse with
| Some t ->
t.state <- state;
- t.stopped <- false;
+ t.stop <- Lwt.wait ();
Lwt.return t
| None ->
Lwt.return Shard.{ state
- ; stopped = false
+ ; stop = Lwt.wait ()
; can_resume = true
}
in
@@ -277,12 +276,15 @@ let start ?count ?compress ?large_threshold () =
in
let rec bind (t:Shard.shard Shard.t) =
Lwt.async (fun () ->
- fst t.state.hb_interval >|= fun _hb -> ()
- (* TODO figure out clocks in Lwt *)
+ fst t.state.hb_interval >>= fun hb ->
+ Logs_lwt.info (fun m -> m "Starting heartbeats") >|= fun () ->
+ let hb = float_of_int hb /. 1000.0 in
+ let ev = Lwt_engine.on_timer hb true
+ (fun _ -> Lwt.async (fun () -> Shard.heartbeat t.state)) in
+ t.state <- { t.state with hb_stopper = Some ev }
);
- Lwt.async (fun () -> ev_loop t >>= fun () -> Logs_lwt.debug (fun m -> m "Event loop stopped."));
- (* TODO figure out how to bind to closed websocket *)
- Lwt.async (fun () -> Lwt_stream.closed t.state.recv >>= fun () ->
+ Lwt.async (fun () -> ev_loop t >>= fun () -> Logs_lwt.info (fun m -> m "Event loop stopped."));
+ Lwt.async (fun () -> fst t.stop >>= fun () ->
if t.can_resume then create () >>= wrap ~reuse:t >>= bind >|= ignore
else Lwt.return_unit);
Lwt.return t
diff --git a/lib/gateway/sharder.mli b/lib/gateway/sharder.mli
index 20186f8..57b5bc3 100644
--- a/lib/gateway/sharder.mli
+++ b/lib/gateway/sharder.mli
@@ -5,27 +5,17 @@ open Websocket
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
-type t
-
-(** Start the Sharder. This is called by {!Client.start}. *)
-val start :
- ?count:int ->
- ?compress:bool ->
- ?large_threshold:int ->
- unit ->
- t Lwt.t
-
(** Module representing a single shard. *)
module Shard : sig
(** Representation of the state of a shard. *)
type shard =
{ compress: bool (** Whether to compress payloads. *)
; hb_interval: int Lwt.t * int Lwt.u (** Time between heartbeats. Not known until HELLO is received. *)
- ; hb_stopper: unit Lwt.t * unit Lwt.u (** Stops the heartbeat sequencing when filled *)
+ ; hb_stopper: Lwt_engine.event option (** Used to cancel heartbeat sequencer *)
; id: int (** ID of the current shard. Must be less than shard_count. *)
; large_threshold: int (** Minimum number of members needed for a guild to be considered large. *)
; ready: unit Lwt.t * unit Lwt.u (** A simple promise indicating if the shard has received READY. *)
- ; recv: Frame.t Lwt_stream.t (** Receiver function for the websocket. *)
+ ; recv: (unit -> Frame.t Lwt.t) (** Receiver function for the websocket. *)
; send: (Frame.t -> unit Lwt.t) (** Sender function for the websocket. *)
; seq: int (** Current sequence number for the session. *)
; session: string option (** Current session ID *)
@@ -34,10 +24,10 @@ module Shard : sig
}
(** Wrapper around an internal state, used to wrap {!shard}. *)
- type 'a t = {
- mutable state: 'a;
- mutable stopped: bool;
- mutable can_resume: bool;
+ type 'a t =
+ { mutable state: 'a
+ ; mutable stop: unit Lwt.t * unit Lwt.u
+ ; mutable can_resume: bool
}
(** Send a heartbeat to Discord. This is handled automatically. *)
@@ -79,6 +69,18 @@ module Shard : sig
unit Lwt.t
end
+type t =
+{ shards: Shard.shard Shard.t list
+}
+
+(** Start the Sharder. This is called by {!Client.start}. *)
+val start :
+ ?count:int ->
+ ?compress:bool ->
+ ?large_threshold:int ->
+ unit ->
+ t Lwt.t
+
(** Calls {!Shard.set_status} for each shard registered with the sharder. *)
val set_status :
?status:string ->