aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAdelyn Breelove <[email protected]>2019-01-21 12:32:37 -0700
committerAdelyn Breelove <[email protected]>2019-01-21 12:32:37 -0700
commit5b728870c2b1475e4d9a30f43400c2ee5653c8d4 (patch)
tree6777234202c71ee409f57e0fd1f9abd2c74a3214 /lib
parentMore signature improvements (diff)
downloaddisml-5b728870c2b1475e4d9a30f43400c2ee5653c8d4.tar.xz
disml-5b728870c2b1475e4d9a30f43400c2ee5653c8d4.zip
Fix sharding logic
Diffstat (limited to 'lib')
-rw-r--r--lib/sharder.ml64
1 files changed, 20 insertions, 44 deletions
diff --git a/lib/sharder.ml b/lib/sharder.ml
index f03c7dd..afa8965 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -7,7 +7,7 @@ exception Failure_to_Establish_Heartbeat
module Shard = struct
type shard = {
- hb: unit Ivar.t option;
+ hb_interval: Time.Span.t Ivar.t;
seq: int;
session: string option;
pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t;
@@ -20,7 +20,8 @@ module Shard = struct
mutable state: 'a;
}
- let identify_lock = Mutex.create ()
+ let identify_lock = Mvar.create ()
+ let _ = Mvar.set identify_lock ()
let parse (frame:[`Ok of Frame.t | `Eof]) =
match frame with
@@ -33,7 +34,6 @@ module Shard = struct
| `Eof -> None
let push_frame ?payload ~ev shard =
- Logs.debug (fun m -> m "Pushing frame with Opcode: %s" (Opcode.to_string ev));
let content = match payload with
| None -> ""
| Some p ->
@@ -48,6 +48,7 @@ module Shard = struct
shard
let heartbeat shard =
+ Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
let payload = match shard.seq with
| 0 -> `Null
| i -> `Int i
@@ -61,6 +62,8 @@ module Shard = struct
let data = J.member "d" payload in
let session = if t = "READY" then begin
Ivar.fill_if_empty shard.ready ();
+ Clock.after (Core.Time.Span.create ~sec:5 ())
+ >>> (fun _ -> Mvar.put identify_lock () >>> ignore);
J.(member "session_id" data |> to_string_option)
end else None in
Event.handle_event ~ev:t data;
@@ -107,34 +110,15 @@ module Shard = struct
let initialize ?data shard =
let module J = Yojson.Safe.Util in
- let hb = match shard.hb with
- | None -> begin
- match data with
- | Some data ->
- let hb_interval = J.(member "heartbeat_interval" data |> to_int) in
- let stop_hb = Ivar.create () in
- let stopper i =
- Ivar.read stop_hb
- >>> fun () ->
- Ivar.fill_if_empty i ()
- in
- let stop = Deferred.create stopper in
- Clock.every'
- ~continue_on_error:true
- ~stop
- (Core.Time.Span.create ~ms:hb_interval ())
- (fun () -> heartbeat shard >>= fun _ -> return ());
- stop_hb
- | None -> raise Failure_to_Establish_Heartbeat
- end
- | Some s -> s
+ let _ = match data with
+ | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ())
+ | None -> raise Failure_to_Establish_Heartbeat
in
- let shard = { shard with hb = Some hb; } in
- let (cur, max) = shard.id in
- let shards = [`Int cur; `Int max] in
+ let shards = [`Int (fst shard.id); `Int (snd shard.id)] in
match shard.session with
| None -> begin
- Mutex.lock identify_lock;
+ Mvar.take identify_lock >>= fun () ->
+ Logs.debug (fun m -> m "Identifying shard [%d, %d]" (fst shard.id) (snd shard.id));
let payload = `Assoc [
("token", `String !Client_options.token);
("properties", `Assoc [
@@ -147,11 +131,7 @@ module Shard = struct
("shard", `List shards);
] in
push_frame ~payload ~ev:IDENTIFY shard
- >>| fun s -> begin
- Clock.after (Core.Time.Span.create ~sec:5 ())
- >>> (fun _ -> Mutex.unlock identify_lock);
- s
- end
+ >>| fun s -> s
end
| Some s ->
let payload = `Assoc [
@@ -163,13 +143,12 @@ module Shard = struct
let handle_frame ~f shard =
let module J = Yojson.Safe.Util in
- let op = J.(member "op" f |> to_int)
- |> Opcode.from_int
- in
+ let op = J.(member "op" f |> to_int) |> Opcode.from_int in
match op with
| DISPATCH -> dispatch ~payload:f shard
| HEARTBEAT -> heartbeat shard
| INVALID_SESSION -> begin
+ Logs.err (fun m -> m "Received OP 9 on Shard [%d, %d]: %s" (fst shard.id) (snd shard.id) (Yojson.Safe.pretty_to_string f));
if J.(member "d" f |> to_bool) then
initialize shard
else begin
@@ -180,7 +159,7 @@ module Shard = struct
| HELLO -> initialize ~data:(J.member "d" f) shard
| HEARTBEAT_ACK -> return shard
| opcode ->
- print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode;
+ Logs.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode));
return shard
let rec make_client
@@ -242,7 +221,7 @@ module Shard = struct
{
pipe = (read, write);
ready = Ivar.create ();
- hb = None;
+ hb_interval = Ivar.create ();
seq = 0;
id = shards;
session = None;
@@ -267,10 +246,6 @@ module Shard = struct
let recreate shard =
print_endline "Reconnecting...";
- (match shard.hb with
- | Some hb -> Ivar.fill_if_empty hb ()
- | None -> ()
- );
create ~url:(shard.url) ~shards:(shard.id) ()
end
@@ -298,11 +273,11 @@ let start ?count () =
(match Shard.parse frame with
| Some f -> begin
Shard.handle_frame ~f t.state
- >>| fun s -> (t.state <- s; t)
+ >>| fun s -> t.state <- s; t
end
| None -> begin
Shard.recreate t.state
- >>| fun s -> (t.state <- s; t)
+ >>| fun s -> t.state <- s; t
end)
>>= fun t ->
ev_loop t
@@ -314,6 +289,7 @@ let start ?count () =
Shard.create ~url ~shards:(id, total) ()
>>= fun shard ->
let t = Shard.{ state = shard; } in
+ let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in
ev_loop t >>> ignore;
gen_shards (id+1, total) (t :: a)
in