diff options
| author | Mishio595 <[email protected]> | 2018-11-04 23:14:24 -0700 |
|---|---|---|
| committer | Mishio595 <[email protected]> | 2018-11-04 23:14:24 -0700 |
| commit | a5c83afc6ba737734ffbb5f510e34fcce2a5a3b1 (patch) | |
| tree | 92c43be8effe3904d5d7af632812fb4990f7d4c7 /lib | |
| parent | Fix build file (diff) | |
| download | disml-a5c83afc6ba737734ffbb5f510e34fcce2a5a3b1.tar.xz disml-a5c83afc6ba737734ffbb5f510e34fcce2a5a3b1.zip | |
Single shard mostly working
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/client/sharder.ml | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/lib/client/sharder.ml b/lib/client/sharder.ml index 7150f07..df5767e 100644 --- a/lib/client/sharder.ml +++ b/lib/client/sharder.ml @@ -1,6 +1,5 @@ open Lwt.Infix open Websocket -open Websocket_lwt_unix type data = { shards: int list; @@ -15,37 +14,57 @@ module Shard = struct session: string option; token: string; shard: int list; - conn: Connected_client.t; + send: Frame.t -> unit Lwt.t; + recv: unit -> Frame.t Lwt.t; } - let parse frame = - frame |> Frame.show |> Yojson.Basic.from_string + let parse (frame : Frame.t) = + try + frame.content + |> Yojson.Basic.from_string + with Yojson.Json_error err -> + print_endline err; + `String "" let encode term = let content = term |> Yojson.Basic.to_string in Frame.create ~content () let push_frame ?payload shard (ev : Opcode.t) = + print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; let content = match payload with | None -> None - | Some p -> Some (Yojson.Basic.to_string p) + | Some p -> + Some (Yojson.Basic.to_string (`Assoc [ + ("op", `Int (Opcode.to_int ev)); + ("d", p); + ])) in let frame = Frame.create ?content () in - Connected_client.send shard.conn frame + shard.send frame |> ignore; shard let initialize shard data = + print_endline "Initializing..."; let hb = match shard.hb with | None -> begin - let hb_interval = List.assoc "hb_interval" @@ + let hb_interval = List.assoc "heartbeat_interval" @@ Yojson.Basic.Util.to_assoc data |> Yojson.Basic.Util.to_int in + let seq = match shard.seq with + | 0 -> `Null + | i -> `Int i + in + let payload = `Assoc [ + ("op", `Int 1); + ("d", seq); + ] in Lwt_engine.on_timer (Float.of_int hb_interval) true - (fun _ev -> push_frame shard HEARTBEAT |> ignore) + (fun _ev -> push_frame ~payload shard HEARTBEAT |> ignore) end | Some s -> s in @@ -59,7 +78,7 @@ module Shard = struct ("$device", `String "animus"); ("$browser", `String "animus") ]); - ("compress", `Bool true); + ("compress", `Bool false); (* TODO add compression handling*) ("large_threshold", `Int 250); ("shard", `List (List.map (fun i -> `Int i) shard.shard)) ] in @@ -75,50 +94,54 @@ module Shard = struct let handle_frame shard (term : Yojson.Basic.json) = + Yojson.Basic.pretty_print Format.std_formatter term; + print_newline (); match term with - | `Assoc [ - ("op", `Int 0); - ("t", `String t); - ("s", `Int s); - ("d", d) - ] -> shard (* TODO dispatch *) - | `Assoc [ - ("op", `Int 1) - ] -> push_frame shard HEARTBEAT - | `Assoc [ - ("op", `Int 7) - ] -> shard (* TODO reconnect *) - | `Assoc [ - ("op", `Int 9) - ] -> shard (* TODO invalid session *) - | `Assoc [ - ("op", `Int 10); - ("d", data) - ] -> initialize shard data - | _data -> shard + | `Assoc term -> begin + let op = List.assoc "op" term + |> Yojson.Basic.Util.to_int + |> Opcode.from_int + in + match op with + | DISPATCH -> print_endline "OP 0"; shard (* TODO dispatch *) + | HEARTBEAT -> push_frame shard HEARTBEAT + | RECONNECT -> print_endline "OP 7"; shard (* TODO reconnect *) + | INVALID_SESSION -> print_endline "OP 9"; shard (* TODO invalid session *) + | HELLO -> + let data = List.assoc "d" term in + initialize shard data + | _opcode -> print_endline "no match"; shard + end + | _ -> shard let create data = let uri = (data.url ^ "?v=7&encoding=json") |> Uri.of_string in + let http_uri = Uri.with_scheme uri (Some "https") in let headers = Http.Base.process_request_headers () in - let req = Cohttp_lwt.Request.make ~headers uri in - let ic, oc = Lwt_io.pipe () in - let client = Connected_client.create req (`TCP (V4 Ipaddr.V4.any, 443)) ic oc in + Resolver_lwt.resolve_uri ~uri:http_uri Resolver_lwt_unix.system >>= fun endp -> + let ctx = Conduit_lwt_unix.default_ctx in + Conduit_lwt_unix.endp_to_client ~ctx endp >>= fun client -> + Websocket_lwt_unix.with_connection + ~extra_headers:headers + client + uri + >>= fun (recv, send) -> let rec recv_forever shard = begin - Connected_client.recv client + recv () >>= fun frame -> Lwt.return @@ handle_frame shard @@ parse frame >>= fun shard -> recv_forever shard end in let shard = { - conn = client; + send; + recv; hb = None; seq = 0; shard = data.shards; session = None; token = data.token; } in - recv_forever shard |> ignore; - shard + recv_forever shard end type t = { |