aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-11-04 23:14:24 -0700
committerMishio595 <[email protected]>2018-11-04 23:14:24 -0700
commita5c83afc6ba737734ffbb5f510e34fcce2a5a3b1 (patch)
tree92c43be8effe3904d5d7af632812fb4990f7d4c7 /lib
parentFix build file (diff)
downloaddisml-a5c83afc6ba737734ffbb5f510e34fcce2a5a3b1.tar.xz
disml-a5c83afc6ba737734ffbb5f510e34fcce2a5a3b1.zip
Single shard mostly working
Diffstat (limited to 'lib')
-rw-r--r--lib/client/sharder.ml95
1 files changed, 59 insertions, 36 deletions
diff --git a/lib/client/sharder.ml b/lib/client/sharder.ml
index 7150f07..df5767e 100644
--- a/lib/client/sharder.ml
+++ b/lib/client/sharder.ml
@@ -1,6 +1,5 @@
open Lwt.Infix
open Websocket
-open Websocket_lwt_unix
type data = {
shards: int list;
@@ -15,37 +14,57 @@ module Shard = struct
session: string option;
token: string;
shard: int list;
- conn: Connected_client.t;
+ send: Frame.t -> unit Lwt.t;
+ recv: unit -> Frame.t Lwt.t;
}
- let parse frame =
- frame |> Frame.show |> Yojson.Basic.from_string
+ let parse (frame : Frame.t) =
+ try
+ frame.content
+ |> Yojson.Basic.from_string
+ with Yojson.Json_error err ->
+ print_endline err;
+ `String ""
let encode term =
let content = term |> Yojson.Basic.to_string in
Frame.create ~content ()
let push_frame ?payload shard (ev : Opcode.t) =
+ print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev;
let content = match payload with
| None -> None
- | Some p -> Some (Yojson.Basic.to_string p)
+ | Some p ->
+ Some (Yojson.Basic.to_string (`Assoc [
+ ("op", `Int (Opcode.to_int ev));
+ ("d", p);
+ ]))
in
let frame = Frame.create ?content () in
- Connected_client.send shard.conn frame
+ shard.send frame
|> ignore;
shard
let initialize shard data =
+ print_endline "Initializing...";
let hb = match shard.hb with
| None -> begin
- let hb_interval = List.assoc "hb_interval" @@
+ let hb_interval = List.assoc "heartbeat_interval" @@
Yojson.Basic.Util.to_assoc data
|> Yojson.Basic.Util.to_int
in
+ let seq = match shard.seq with
+ | 0 -> `Null
+ | i -> `Int i
+ in
+ let payload = `Assoc [
+ ("op", `Int 1);
+ ("d", seq);
+ ] in
Lwt_engine.on_timer
(Float.of_int hb_interval)
true
- (fun _ev -> push_frame shard HEARTBEAT |> ignore)
+ (fun _ev -> push_frame ~payload shard HEARTBEAT |> ignore)
end
| Some s -> s
in
@@ -59,7 +78,7 @@ module Shard = struct
("$device", `String "animus");
("$browser", `String "animus")
]);
- ("compress", `Bool true);
+ ("compress", `Bool false); (* TODO add compression handling*)
("large_threshold", `Int 250);
("shard", `List (List.map (fun i -> `Int i) shard.shard))
] in
@@ -75,50 +94,54 @@ module Shard = struct
let handle_frame shard (term : Yojson.Basic.json) =
+ Yojson.Basic.pretty_print Format.std_formatter term;
+ print_newline ();
match term with
- | `Assoc [
- ("op", `Int 0);
- ("t", `String t);
- ("s", `Int s);
- ("d", d)
- ] -> shard (* TODO dispatch *)
- | `Assoc [
- ("op", `Int 1)
- ] -> push_frame shard HEARTBEAT
- | `Assoc [
- ("op", `Int 7)
- ] -> shard (* TODO reconnect *)
- | `Assoc [
- ("op", `Int 9)
- ] -> shard (* TODO invalid session *)
- | `Assoc [
- ("op", `Int 10);
- ("d", data)
- ] -> initialize shard data
- | _data -> shard
+ | `Assoc term -> begin
+ let op = List.assoc "op" term
+ |> Yojson.Basic.Util.to_int
+ |> Opcode.from_int
+ in
+ match op with
+ | DISPATCH -> print_endline "OP 0"; shard (* TODO dispatch *)
+ | HEARTBEAT -> push_frame shard HEARTBEAT
+ | RECONNECT -> print_endline "OP 7"; shard (* TODO reconnect *)
+ | INVALID_SESSION -> print_endline "OP 9"; shard (* TODO invalid session *)
+ | HELLO ->
+ let data = List.assoc "d" term in
+ initialize shard data
+ | _opcode -> print_endline "no match"; shard
+ end
+ | _ -> shard
let create data =
let uri = (data.url ^ "?v=7&encoding=json") |> Uri.of_string in
+ let http_uri = Uri.with_scheme uri (Some "https") in
let headers = Http.Base.process_request_headers () in
- let req = Cohttp_lwt.Request.make ~headers uri in
- let ic, oc = Lwt_io.pipe () in
- let client = Connected_client.create req (`TCP (V4 Ipaddr.V4.any, 443)) ic oc in
+ Resolver_lwt.resolve_uri ~uri:http_uri Resolver_lwt_unix.system >>= fun endp ->
+ let ctx = Conduit_lwt_unix.default_ctx in
+ Conduit_lwt_unix.endp_to_client ~ctx endp >>= fun client ->
+ Websocket_lwt_unix.with_connection
+ ~extra_headers:headers
+ client
+ uri
+ >>= fun (recv, send) ->
let rec recv_forever shard = begin
- Connected_client.recv client
+ recv ()
>>= fun frame ->
Lwt.return @@ handle_frame shard @@ parse frame
>>= fun shard -> recv_forever shard
end in
let shard = {
- conn = client;
+ send;
+ recv;
hb = None;
seq = 0;
shard = data.shards;
session = None;
token = data.token;
} in
- recv_forever shard |> ignore;
- shard
+ recv_forever shard
end
type t = {