aboutsummaryrefslogtreecommitdiff
path: root/lib/client/sharder/shard.ml
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-10-29 06:25:22 -0600
committerMishio595 <[email protected]>2018-10-29 06:25:22 -0600
commit5a1aae149fafa2848c2cd89a3c75992653380201 (patch)
treed6c2567ab3854aa6fb51aa4a2d715cf42f76ff52 /lib/client/sharder/shard.ml
parentMore random stuff with sharding (diff)
downloaddisml-5a1aae149fafa2848c2cd89a3c75992653380201.tar.xz
disml-5a1aae149fafa2848c2cd89a3c75992653380201.zip
Moving to LWT branch
Diffstat (limited to 'lib/client/sharder/shard.ml')
-rw-r--r--lib/client/sharder/shard.ml62
1 files changed, 29 insertions, 33 deletions
diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml
index 3b9cf8d..7be4aad 100644
--- a/lib/client/sharder/shard.ml
+++ b/lib/client/sharder/shard.ml
@@ -1,27 +1,28 @@
+open Lwt.Infix
+open Websocket
+
type t = {
- conn = Conn.t;
+ send: (Frame.t -> unit Lwt.t);
id: int;
+ total_shards: int;
hb_interval: int;
session_id: string;
seq: int;
+ token: string;
}
-let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~conn ~id () =
- { conn; id; hb_interval; session_id; seq; }
-
-let compare s1 s2 =
- Pervasives.compare s1.id s2.id
+let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~send ~id ~total_shards ~token () =
+ { send; id; total_shards; hb_interval; session_id; seq; token; }
-let push_frame ~payload shard =
- payload
- |> Conn.push_frame ~conn:shard.conn
+let push_frame shard frame =
+ shard.send frame
-let process_frame ~frame shard =
+let process_frame shard frame =
let json = frame |> Yojson.Basic.from_string in
match json with
- | `Assoc [("s", s); ("d", d); ("t", t); ("op", op);] -> begin
+ | `Assoc [("s", `Int _s); ("d", _d); ("t", `String _t); ("op", `Int op);] -> begin
match op |> Opcode.from_int with
- | DISPATCH -> dispatch t d (* Need to write the dispatcher and other ops *)
+ | DISPATCH -> () (* dispatch t d Need to write the dispatcher and other ops *)
| HEARTBEAT -> ()
| IDENTIFY -> ()
| STATUS_UPDATE -> ()
@@ -33,8 +34,8 @@ let process_frame ~frame shard =
| HELLO -> ()
| HEARTBEAT_ACK -> ()
|> ignore;
- { shard with seq = s; }
- end
+ (* { shard with seq = s; } *)
+ end
| _ -> shard
let wrap_payload d op =
@@ -46,40 +47,35 @@ let wrap_payload d op =
let create_frame content =
Frame.create ~content ()
-let identify ?(threshold=250) ~total ~token shard =
+let identify ?(threshold=250) shard=
let p = wrap_payload (`Assoc [
- ("token", `String token);
+ ("token", `String shard.token);
("properties", `Assoc [
("$os", `String Sys.os_type);
("$browser", `String "animus");
("$device", `String "animus");
]);
("large_threshold", `Int threshold);
- ("shard", `List [`Int shard.id; `Int total]);
- ]) (IDENTIFY |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ ("shard", `List [`Int shard.id; `Int shard.total_shards]);
+ ]) (Opcode.to_int IDENTIFY) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
-let resume ~token shard =
+let resume shard =
let p = wrap_payload (`Assoc [
- ("token", `String token);
+ ("token", `String shard.token);
("session_id", `String shard.session_id);
("seq", `Int shard.seq);
- ]) (RESUME |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ ]) (Opcode.to_int RESUME) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
let heartbeat shard =
- let p = wrap_payload (`Int shard.seq) (HEARTBEAT |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
-let connect ~options ~uri ~id ~total ~token () =
+let connect ~_options ~uri ~id ~total ~token () =
let url = uri |> Uri.to_string in
let ip = Ipaddr.V4 Ipaddr.V4.any in
+ let client = Websocket.Connected_Client.create
Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *)
>|= fun (recv, send) ->
- let shard = init send id in
- heartbeat shard >>= (fun _ ->
- identify shard total token
- ) |> ignore; (* This feels really hacky *)
- let handle_frame = process_frame shard in
- let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in
- shard \ No newline at end of file
+ init ~send ~id ~token ~total_shards:total () \ No newline at end of file