diff options
| author | Adelyn Breelove <[email protected]> | 2019-01-21 12:32:37 -0700 |
|---|---|---|
| committer | Adelyn Breelove <[email protected]> | 2019-01-21 12:32:37 -0700 |
| commit | 5b728870c2b1475e4d9a30f43400c2ee5653c8d4 (patch) | |
| tree | 6777234202c71ee409f57e0fd1f9abd2c74a3214 /lib | |
| parent | More signature improvements (diff) | |
| download | disml-5b728870c2b1475e4d9a30f43400c2ee5653c8d4.tar.xz disml-5b728870c2b1475e4d9a30f43400c2ee5653c8d4.zip | |
Fix sharding logic
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sharder.ml | 64 |
1 files changed, 20 insertions, 44 deletions
diff --git a/lib/sharder.ml b/lib/sharder.ml index f03c7dd..afa8965 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -7,7 +7,7 @@ exception Failure_to_Establish_Heartbeat module Shard = struct type shard = { - hb: unit Ivar.t option; + hb_interval: Time.Span.t Ivar.t; seq: int; session: string option; pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; @@ -20,7 +20,8 @@ module Shard = struct mutable state: 'a; } - let identify_lock = Mutex.create () + let identify_lock = Mvar.create () + let _ = Mvar.set identify_lock () let parse (frame:[`Ok of Frame.t | `Eof]) = match frame with @@ -33,7 +34,6 @@ module Shard = struct | `Eof -> None let push_frame ?payload ~ev shard = - Logs.debug (fun m -> m "Pushing frame with Opcode: %s" (Opcode.to_string ev)); let content = match payload with | None -> "" | Some p -> @@ -48,6 +48,7 @@ module Shard = struct shard let heartbeat shard = + Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq)); let payload = match shard.seq with | 0 -> `Null | i -> `Int i @@ -61,6 +62,8 @@ module Shard = struct let data = J.member "d" payload in let session = if t = "READY" then begin Ivar.fill_if_empty shard.ready (); + Clock.after (Core.Time.Span.create ~sec:5 ()) + >>> (fun _ -> Mvar.put identify_lock () >>> ignore); J.(member "session_id" data |> to_string_option) end else None in Event.handle_event ~ev:t data; @@ -107,34 +110,15 @@ module Shard = struct let initialize ?data shard = let module J = Yojson.Safe.Util in - let hb = match shard.hb with - | None -> begin - match data with - | Some data -> - let hb_interval = J.(member "heartbeat_interval" data |> to_int) in - let stop_hb = Ivar.create () in - let stopper i = - Ivar.read stop_hb - >>> fun () -> - Ivar.fill_if_empty i () - in - let stop = Deferred.create stopper in - Clock.every' - ~continue_on_error:true - ~stop - (Core.Time.Span.create ~ms:hb_interval ()) - (fun () -> heartbeat shard >>= fun _ -> return ()); - stop_hb - | None -> raise Failure_to_Establish_Heartbeat - end - | Some s -> s + let _ = match data with + | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ()) + | None -> raise Failure_to_Establish_Heartbeat in - let shard = { shard with hb = Some hb; } in - let (cur, max) = shard.id in - let shards = [`Int cur; `Int max] in + let shards = [`Int (fst shard.id); `Int (snd shard.id)] in match shard.session with | None -> begin - Mutex.lock identify_lock; + Mvar.take identify_lock >>= fun () -> + Logs.debug (fun m -> m "Identifying shard [%d, %d]" (fst shard.id) (snd shard.id)); let payload = `Assoc [ ("token", `String !Client_options.token); ("properties", `Assoc [ @@ -147,11 +131,7 @@ module Shard = struct ("shard", `List shards); ] in push_frame ~payload ~ev:IDENTIFY shard - >>| fun s -> begin - Clock.after (Core.Time.Span.create ~sec:5 ()) - >>> (fun _ -> Mutex.unlock identify_lock); - s - end + >>| fun s -> s end | Some s -> let payload = `Assoc [ @@ -163,13 +143,12 @@ module Shard = struct let handle_frame ~f shard = let module J = Yojson.Safe.Util in - let op = J.(member "op" f |> to_int) - |> Opcode.from_int - in + let op = J.(member "op" f |> to_int) |> Opcode.from_int in match op with | DISPATCH -> dispatch ~payload:f shard | HEARTBEAT -> heartbeat shard | INVALID_SESSION -> begin + Logs.err (fun m -> m "Received OP 9 on Shard [%d, %d]: %s" (fst shard.id) (snd shard.id) (Yojson.Safe.pretty_to_string f)); if J.(member "d" f |> to_bool) then initialize shard else begin @@ -180,7 +159,7 @@ module Shard = struct | HELLO -> initialize ~data:(J.member "d" f) shard | HEARTBEAT_ACK -> return shard | opcode -> - print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; + Logs.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode)); return shard let rec make_client @@ -242,7 +221,7 @@ module Shard = struct { pipe = (read, write); ready = Ivar.create (); - hb = None; + hb_interval = Ivar.create (); seq = 0; id = shards; session = None; @@ -267,10 +246,6 @@ module Shard = struct let recreate shard = print_endline "Reconnecting..."; - (match shard.hb with - | Some hb -> Ivar.fill_if_empty hb () - | None -> () - ); create ~url:(shard.url) ~shards:(shard.id) () end @@ -298,11 +273,11 @@ let start ?count () = (match Shard.parse frame with | Some f -> begin Shard.handle_frame ~f t.state - >>| fun s -> (t.state <- s; t) + >>| fun s -> t.state <- s; t end | None -> begin Shard.recreate t.state - >>| fun s -> (t.state <- s; t) + >>| fun s -> t.state <- s; t end) >>= fun t -> ev_loop t @@ -314,6 +289,7 @@ let start ?count () = Shard.create ~url ~shards:(id, total) () >>= fun shard -> let t = Shard.{ state = shard; } in + let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in ev_loop t >>> ignore; gen_shards (id+1, total) (t :: a) in |