aboutsummaryrefslogtreecommitdiff
path: root/lib/sharder.ml
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sharder.ml')
-rw-r--r--lib/sharder.ml41
1 files changed, 25 insertions, 16 deletions
diff --git a/lib/sharder.ml b/lib/sharder.ml
index ccfb047..defcfe1 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -11,17 +11,17 @@ module Shard = struct
mutable session: string option;
token: string;
shard: int * int;
- write: string Pipe.Writer.t;
- read: string Pipe.Reader.t;
+ write: Frame.t Pipe.Writer.t;
+ read: Frame.t Pipe.Reader.t;
ready: unit Ivar.t;
}
let identify_lock = Mutex.create ()
- let parse frame =
+ let parse (frame:[`Ok of Frame.t | `Eof]) =
match frame with
- | `Ok s -> Yojson.Basic.from_string s
- | `Eof -> raise Invalid_Payload (* This needs to go into reconnect code, or stop using client_ez and handle frames manually *)
+ | `Ok s -> Yojson.Basic.from_string s.content (* TODO Handler non-text frames *)
+ | `Eof -> raise Invalid_Payload (* TODO This needs to go into reconnect code, or stop using client_ez and handle frames manually *)
let push_frame ?payload ~ev shard =
print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev;
@@ -33,7 +33,7 @@ module Shard = struct
("d", p);
]
in
- Pipe.write shard.write content
+ Pipe.write shard.write @@ Frame.create ~content ()
>>| fun () ->
shard
@@ -156,7 +156,7 @@ module Shard = struct
| HELLO -> initialize ~data:(J.member "d" f) shard
| HEARTBEAT_ACK -> return shard
| opcode ->
- print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode;
+ print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode;
return shard
let create ~url ~shards ~token () =
@@ -170,17 +170,26 @@ module Shard = struct
| None, Some p -> p
| _ -> 443 in
let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in
- let tcp_fun (r,w) =
- let (read, write) = client_ez
+ let tcp_fun (net_to_ws, ws_to_net) =
+ let (app_to_ws, write) = Pipe.create () in
+ let (read, ws_to_app) = Pipe.create () in
+ let initialized = Ivar.create () in
+ client
+ ~initialized
~extra_headers
- uri r w
- in
- let rec ev_loop shard =
- Pipe.read read
+ ~app_to_ws
+ ~ws_to_app
+ ~net_to_ws
+ ~ws_to_net
+ uri
+ >>> ignore;
+ Ivar.read initialized >>| fun () ->
+ let rec ev_loop ~reader shard =
+ Pipe.read reader
>>= fun frame ->
handle_frame ~f:(parse frame) shard
>>= fun shard ->
- ev_loop shard
+ ev_loop ~reader shard
in
let shard = {
read;
@@ -193,8 +202,8 @@ module Shard = struct
token = token;
}
in
- ev_loop shard |> ignore;
- return shard
+ ev_loop ~reader:read shard |> ignore;
+ shard
in
match Unix.getaddrinfo host (string_of_int port) [] with
| [] -> failwithf "DNS resolution failed for %s" host ()