aboutsummaryrefslogtreecommitdiff
path: root/lib/client/sharder.ml
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-11-04 20:20:27 -0700
committerMishio595 <[email protected]>2018-11-04 20:20:27 -0700
commit450f62a3d695a58bc66f13f77cba0f267c71dba6 (patch)
treefa7d03d987ccaa0d130f79df152ad0c8b2104de9 /lib/client/sharder.ml
parentidek (diff)
downloaddisml-450f62a3d695a58bc66f13f77cba0f267c71dba6.tar.xz
disml-450f62a3d695a58bc66f13f77cba0f267c71dba6.zip
Major changes and refactoring
Diffstat (limited to 'lib/client/sharder.ml')
-rw-r--r--lib/client/sharder.ml126
1 files changed, 126 insertions, 0 deletions
diff --git a/lib/client/sharder.ml b/lib/client/sharder.ml
new file mode 100644
index 0000000..7150f07
--- /dev/null
+++ b/lib/client/sharder.ml
@@ -0,0 +1,126 @@
+open Lwt.Infix
+open Websocket
+open Websocket_lwt_unix
+
+type data = {
+ shards: int list;
+ token: string;
+ url: string;
+}
+
+module Shard = struct
+ type t = {
+ mutable hb: Lwt_engine.event option;
+ mutable seq: int;
+ session: string option;
+ token: string;
+ shard: int list;
+ conn: Connected_client.t;
+ }
+
+ let parse frame =
+ frame |> Frame.show |> Yojson.Basic.from_string
+
+ let encode term =
+ let content = term |> Yojson.Basic.to_string in
+ Frame.create ~content ()
+
+ let push_frame ?payload shard (ev : Opcode.t) =
+ let content = match payload with
+ | None -> None
+ | Some p -> Some (Yojson.Basic.to_string p)
+ in
+ let frame = Frame.create ?content () in
+ Connected_client.send shard.conn frame
+ |> ignore;
+ shard
+
+ let initialize shard data =
+ let hb = match shard.hb with
+ | None -> begin
+ let hb_interval = List.assoc "hb_interval" @@
+ Yojson.Basic.Util.to_assoc data
+ |> Yojson.Basic.Util.to_int
+ in
+ Lwt_engine.on_timer
+ (Float.of_int hb_interval)
+ true
+ (fun _ev -> push_frame shard HEARTBEAT |> ignore)
+ end
+ | Some s -> s
+ in
+ let shard = { shard with hb = Some hb; } in
+ match shard.session with
+ | None ->
+ let payload = `Assoc [
+ ("token", `String shard.token);
+ ("properties", `Assoc [
+ ("$os", `String Sys.os_type);
+ ("$device", `String "animus");
+ ("$browser", `String "animus")
+ ]);
+ ("compress", `Bool true);
+ ("large_threshold", `Int 250);
+ ("shard", `List (List.map (fun i -> `Int i) shard.shard))
+ ] in
+ push_frame ~payload shard IDENTIFY
+ | Some s ->
+ let payload = `Assoc [
+ ("token", `String shard.token);
+ ("session_id", `String s);
+ ("seq", `Int shard.seq)
+ ] in
+ push_frame ~payload shard RECONNECT
+
+
+
+ let handle_frame shard (term : Yojson.Basic.json) =
+ match term with
+ | `Assoc [
+ ("op", `Int 0);
+ ("t", `String t);
+ ("s", `Int s);
+ ("d", d)
+ ] -> shard (* TODO dispatch *)
+ | `Assoc [
+ ("op", `Int 1)
+ ] -> push_frame shard HEARTBEAT
+ | `Assoc [
+ ("op", `Int 7)
+ ] -> shard (* TODO reconnect *)
+ | `Assoc [
+ ("op", `Int 9)
+ ] -> shard (* TODO invalid session *)
+ | `Assoc [
+ ("op", `Int 10);
+ ("d", data)
+ ] -> initialize shard data
+ | _data -> shard
+
+ let create data =
+ let uri = (data.url ^ "?v=7&encoding=json") |> Uri.of_string in
+ let headers = Http.Base.process_request_headers () in
+ let req = Cohttp_lwt.Request.make ~headers uri in
+ let ic, oc = Lwt_io.pipe () in
+ let client = Connected_client.create req (`TCP (V4 Ipaddr.V4.any, 443)) ic oc in
+ let rec recv_forever shard = begin
+ Connected_client.recv client
+ >>= fun frame ->
+ Lwt.return @@ handle_frame shard @@ parse frame
+ >>= fun shard -> recv_forever shard
+ end in
+ let shard = {
+ conn = client;
+ hb = None;
+ seq = 0;
+ shard = data.shards;
+ session = None;
+ token = data.token;
+ } in
+ recv_forever shard |> ignore;
+ shard
+end
+
+type t = {
+ shards: Shard.t list;
+} \ No newline at end of file