aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-10-29 06:25:22 -0600
committerMishio595 <[email protected]>2018-10-29 06:25:22 -0600
commit5a1aae149fafa2848c2cd89a3c75992653380201 (patch)
treed6c2567ab3854aa6fb51aa4a2d715cf42f76ff52 /lib
parentMore random stuff with sharding (diff)
downloaddisml-5a1aae149fafa2848c2cd89a3c75992653380201.tar.xz
disml-5a1aae149fafa2848c2cd89a3c75992653380201.zip
Moving to LWT branch
Diffstat (limited to 'lib')
-rw-r--r--lib/client/sharder/conn.ml5
-rw-r--r--lib/client/sharder/opcode.ml2
-rw-r--r--lib/client/sharder/shard.ml62
-rw-r--r--lib/client/sharder/shardManager.ml32
-rw-r--r--lib/client/sharder/shard_manager.ml31
5 files changed, 62 insertions, 70 deletions
diff --git a/lib/client/sharder/conn.ml b/lib/client/sharder/conn.ml
deleted file mode 100644
index 396c170..0000000
--- a/lib/client/sharder/conn.ml
+++ /dev/null
@@ -1,5 +0,0 @@
-type t = {
-}
-
-let push_frame ~conn frame =
- () \ No newline at end of file
diff --git a/lib/client/sharder/opcode.ml b/lib/client/sharder/opcode.ml
index 679f45e..2462d05 100644
--- a/lib/client/sharder/opcode.ml
+++ b/lib/client/sharder/opcode.ml
@@ -38,7 +38,7 @@ let from_int = function
| 9 -> INVALID_SESSION
| 10 -> HELLO
| 11 -> HEARTBEAT_ACK
- | op -> raise Invalid_Opcode op
+ | op -> raise (Invalid_Opcode op)
let to_string = function
| DISPATCH -> "DISPATCH"
diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml
index 3b9cf8d..7be4aad 100644
--- a/lib/client/sharder/shard.ml
+++ b/lib/client/sharder/shard.ml
@@ -1,27 +1,28 @@
+open Lwt.Infix
+open Websocket
+
type t = {
- conn = Conn.t;
+ send: (Frame.t -> unit Lwt.t);
id: int;
+ total_shards: int;
hb_interval: int;
session_id: string;
seq: int;
+ token: string;
}
-let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~conn ~id () =
- { conn; id; hb_interval; session_id; seq; }
-
-let compare s1 s2 =
- Pervasives.compare s1.id s2.id
+let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~send ~id ~total_shards ~token () =
+ { send; id; total_shards; hb_interval; session_id; seq; token; }
-let push_frame ~payload shard =
- payload
- |> Conn.push_frame ~conn:shard.conn
+let push_frame shard frame =
+ shard.send frame
-let process_frame ~frame shard =
+let process_frame shard frame =
let json = frame |> Yojson.Basic.from_string in
match json with
- | `Assoc [("s", s); ("d", d); ("t", t); ("op", op);] -> begin
+ | `Assoc [("s", `Int _s); ("d", _d); ("t", `String _t); ("op", `Int op);] -> begin
match op |> Opcode.from_int with
- | DISPATCH -> dispatch t d (* Need to write the dispatcher and other ops *)
+ | DISPATCH -> () (* dispatch t d Need to write the dispatcher and other ops *)
| HEARTBEAT -> ()
| IDENTIFY -> ()
| STATUS_UPDATE -> ()
@@ -33,8 +34,8 @@ let process_frame ~frame shard =
| HELLO -> ()
| HEARTBEAT_ACK -> ()
|> ignore;
- { shard with seq = s; }
- end
+ (* { shard with seq = s; } *)
+ end
| _ -> shard
let wrap_payload d op =
@@ -46,40 +47,35 @@ let wrap_payload d op =
let create_frame content =
Frame.create ~content ()
-let identify ?(threshold=250) ~total ~token shard =
+let identify ?(threshold=250) shard=
let p = wrap_payload (`Assoc [
- ("token", `String token);
+ ("token", `String shard.token);
("properties", `Assoc [
("$os", `String Sys.os_type);
("$browser", `String "animus");
("$device", `String "animus");
]);
("large_threshold", `Int threshold);
- ("shard", `List [`Int shard.id; `Int total]);
- ]) (IDENTIFY |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ ("shard", `List [`Int shard.id; `Int shard.total_shards]);
+ ]) (Opcode.to_int IDENTIFY) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
-let resume ~token shard =
+let resume shard =
let p = wrap_payload (`Assoc [
- ("token", `String token);
+ ("token", `String shard.token);
("session_id", `String shard.session_id);
("seq", `Int shard.seq);
- ]) (RESUME |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ ]) (Opcode.to_int RESUME) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
let heartbeat shard =
- let p = wrap_payload (`Int shard.seq) (HEARTBEAT |> Opcode.to_int) in
- push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+ let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in
+ push_frame shard (Yojson.Basic.to_string p |> create_frame)
-let connect ~options ~uri ~id ~total ~token () =
+let connect ~_options ~uri ~id ~total ~token () =
let url = uri |> Uri.to_string in
let ip = Ipaddr.V4 Ipaddr.V4.any in
+ let client = Websocket.Connected_Client.create
Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *)
>|= fun (recv, send) ->
- let shard = init send id in
- heartbeat shard >>= (fun _ ->
- identify shard total token
- ) |> ignore; (* This feels really hacky *)
- let handle_frame = process_frame shard in
- let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in
- shard \ No newline at end of file
+ init ~send ~id ~token ~total_shards:total () \ No newline at end of file
diff --git a/lib/client/sharder/shardManager.ml b/lib/client/sharder/shardManager.ml
new file mode 100644
index 0000000..c5919f3
--- /dev/null
+++ b/lib/client/sharder/shardManager.ml
@@ -0,0 +1,32 @@
+open Lwt.Infix
+
+module ShardSet = Set.Make(struct
+ type t = Shard.t
+ let compare (s1:t) (s2:t) = Pervasives.compare s1.id s2.id
+end)
+
+type t = {
+ shards: ShardSet.t;
+ gateway_url: Uri.t;
+ token: string;
+}
+
+let create_shard ?(options=[]) manager =
+ let id = (ShardSet.cardinal manager.shards) + 1 in
+ Shard.connect ~_options:options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token ()
+ >|= fun shard ->
+ ShardSet.add shard manager.shards
+
+let update_shard manager shard =
+ match ShardSet.mem shard manager.shards with
+ | true -> ShardSet.add shard manager.shards
+ | false -> manager.shards
+
+let heartbeat _manager shard =
+ Shard.heartbeat shard
+
+let identify _manager shard =
+ Shard.identify shard
+
+let resume _manager shard =
+ Shard.resume shard \ No newline at end of file
diff --git a/lib/client/sharder/shard_manager.ml b/lib/client/sharder/shard_manager.ml
deleted file mode 100644
index 68b3d13..0000000
--- a/lib/client/sharder/shard_manager.ml
+++ /dev/null
@@ -1,31 +0,0 @@
-open Lwt.Infix
-open Websocket
-
-module ShardSet = Set.Make(Shard)
-
-type t = {
- shards: Shard.t ShardSet.t;
- gateway_url: Uri.t;
- token: string;
-}
-
-let create_shard ?(options=[]) manager =
- let id = (ShardSet.cardinal manager.shards) + 1 in
- Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token ()
- >|= fun shard ->
- ShardSet.add shard manager.shards
-
-let update_shard shard manager =
- match ShardSet.mem shard manager.shards with
- | true -> ShardSet.add shard manager.shards
- | false -> manager.shards
-
-let heartbeat shard manager =
- Shard.heartbeat shard
-
-let identify shard manager =
- let total = ShardSet.cardinal manager.shards in
- Shard.identify total manager.token shard
-
-let resume shard manager =
- Shard.resume manager.token shard \ No newline at end of file