aboutsummaryrefslogtreecommitdiff
path: root/lib/client
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-11-17 19:49:45 -0700
committerMishio595 <[email protected]>2018-11-17 19:49:45 -0700
commitd92fe94da312c0a69dbe9bd7c2e525c594f20e40 (patch)
treeccd4a634996d97e4ba89f92cd904fbb0632f2fa4 /lib/client
parentstupid dune-project file... (diff)
downloaddisml-d92fe94da312c0a69dbe9bd7c2e525c594f20e40.tar.xz
disml-d92fe94da312c0a69dbe9bd7c2e525c594f20e40.zip
Rewrite from Lwt to Async
Diffstat (limited to 'lib/client')
-rw-r--r--lib/client/client.ml5
-rw-r--r--lib/client/sharder.ml250
2 files changed, 136 insertions, 119 deletions
diff --git a/lib/client/client.ml b/lib/client/client.ml
index b1f6f25..e69de29 100644
--- a/lib/client/client.ml
+++ b/lib/client/client.ml
@@ -1,5 +0,0 @@
-let notify t data =
- Yojson.Basic.pretty_print Format.std_formatter @@ `Assoc data;
- print_newline ();
- print_endline t;
- () \ No newline at end of file
diff --git a/lib/client/sharder.ml b/lib/client/sharder.ml
index 18d1da0..ce3f983 100644
--- a/lib/client/sharder.ml
+++ b/lib/client/sharder.ml
@@ -1,32 +1,34 @@
-open Lwt.Infix
-open Websocket
+open Async
+open Websocket_async
exception Invalid_Payload
-exception Invalid_Shards
type data = {
- shards: int list;
+ shards: int * int;
token: string;
url: string;
}
module Shard = struct
type t = {
- mutable hb: Lwt_engine.event option;
+ mutable hb: unit Ivar.t option;
mutable seq: int;
mutable session: string option;
token: string;
- shard: int list;
- send: Frame.t -> unit Lwt.t;
- recv: unit -> Frame.t Lwt.t;
- ready: unit Lwt.t;
+ shard: int * int;
+ write: string Pipe.Writer.t;
+ read: string Pipe.Reader.t;
+ ready: unit Ivar.t;
}
- let id_rt = Lwt_mutex.create ()
+ let identify_lock = Mutex.create ()
- let parse (frame : Frame.t) =
- frame.content
- |> Yojson.Basic.from_string
+ let parse frame =
+ match frame with
+ | `Ok s ->
+ (* print_endline s; *)
+ Yojson.Basic.from_string s
+ | `Eof -> raise Invalid_Payload
let encode term =
let content = term |> Yojson.Basic.to_string in
@@ -35,17 +37,16 @@ module Shard = struct
let push_frame ?payload shard (ev : Opcode.t) =
print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev;
let content = match payload with
- | None -> None
+ | None -> ""
| Some p ->
- Some (Yojson.Basic.to_string @@ `Assoc [
+ Yojson.Basic.to_string @@ `Assoc [
("op", `Int (Opcode.to_int ev));
("d", p);
- ])
+ ]
in
- let frame = Frame.create ?content () in
- print_endline @@ Frame.show frame;
- shard.send frame
- >|= fun () ->
+ print_endline content;
+ Pipe.write shard.write content
+ >>| fun () ->
shard
let heartbeat shard =
@@ -59,24 +60,25 @@ module Shard = struct
] in
push_frame ~payload shard HEARTBEAT
- let dispatch shard payload resolver =
- let t = List.assoc "t" payload
- |> Yojson.Basic.Util.to_string in
+ let dispatch shard payload =
+ let module J = Yojson.Basic.Util in
let seq = List.assoc "s" payload
- |> Yojson.Basic.Util.to_int in
- let data = List.assoc "d" payload
- |> Yojson.Basic.Util.to_assoc in
+ |> J.to_int in
shard.seq <- seq;
+ let t = List.assoc "t" payload
+ |> J.to_string in
+ let data = List.assoc "d" payload
+ |> J.to_assoc in
let _ = match t with
| "READY" ->
- Lwt.wakeup resolver ();
+ Ivar.fill_if_empty shard.ready ();
let session = List.assoc "session_id" data
- |> Yojson.Basic.Util.to_string in
+ |> J.to_string in
shard.session <- Some session;
| _ -> ()
in
- Client.notify t data;
- Lwt.return shard
+ (* Client.notify t data; *)
+ return shard
let set_status shard status =
let payload = match status with
@@ -102,7 +104,8 @@ module Shard = struct
]
| _ -> raise Invalid_Payload
in
- shard.ready >|= fun _ -> push_frame ~payload shard STATUS_UPDATE
+ Ivar.read shard.ready >>= fun _ ->
+ push_frame ~payload shard STATUS_UPDATE
let request_guild_members ~guild ?(query="") ?(limit=0) shard =
let payload = `Assoc [
@@ -110,28 +113,30 @@ module Shard = struct
("query", `String query);
("limit", `Int limit);
] in
- shard.ready >|= fun _ -> push_frame ~payload shard REQUEST_GUILD_MEMBERS
-
-
+ Ivar.read shard.ready >>= fun _ ->
+ push_frame ~payload shard REQUEST_GUILD_MEMBERS
let initialize shard data =
- print_endline "Initializing...";
+ let module J = Yojson.Basic.Util in
let hb = match shard.hb with
| None -> begin
- let hb_interval = List.assoc "heartbeat_interval" @@
- Yojson.Basic.Util.to_assoc data
- |> Yojson.Basic.Util.to_int
+ let hb_interval = J.member "heartbeat_interval" data
+ |> J.to_int
in
- Lwt_engine.on_timer
- (Float.of_int hb_interval /. 1000.0)
- true
- (fun _ev -> heartbeat shard |> ignore)
+ let finished = Ivar.create () in
+ Clock.every'
+ ~continue_on_error:true
+ ~finished
+ (Core.Time.Span.create ~ms:hb_interval ())
+ (fun () -> heartbeat shard >>= fun _ -> return ());
+ finished
end
| Some s -> s
in
shard.hb <- Some hb;
- Lwt_mutex.lock id_rt
- >>= fun () ->
+ Mutex.lock identify_lock;
+ let (cur, max) = shard.shard in
+ let shards = [`Int cur; `Int max] in
match shard.session with
| None ->
let payload = `Assoc [
@@ -143,7 +148,7 @@ module Shard = struct
]);
("compress", `Bool false); (* TODO add compression handling*)
("large_threshold", `Int 250);
- ("shard", `List (List.map (fun i -> `Int i) shard.shard))
+ ("shard", `List shards);
] in
push_frame ~payload shard IDENTIFY
| Some s ->
@@ -153,119 +158,136 @@ module Shard = struct
("seq", `Int shard.seq)
] in
push_frame ~payload shard RESUME
- >|= fun s ->
- Lwt_engine.on_timer 5.0 false (fun _ -> Lwt_mutex.unlock id_rt)
+ >>| fun s ->
+ Clock.after (Core.Time.Span.create ~sec:5 ())
+ >>| (fun _ -> Mutex.unlock identify_lock)
|> ignore;
s
- let handle_frame shard (term : Yojson.Basic.json) resolver =
+ let handle_frame shard (term : Yojson.Basic.json)=
match term with
| `Assoc term -> begin
(* Yojson.Basic.pretty_print Format.std_formatter @@ `Assoc term;
print_newline (); *)
let op = List.assoc "op" term
- |> Yojson.Basic.Util.to_int
- |> Opcode.from_int
+ |> Yojson.Basic.Util.to_int
+ |> Opcode.from_int
in
match op with
- | DISPATCH -> dispatch shard term resolver
+ | DISPATCH -> dispatch shard term
| HEARTBEAT -> heartbeat shard
- | RECONNECT -> print_endline "OP 7"; Lwt.return shard (* TODO reconnect *)
- | INVALID_SESSION -> print_endline "OP 9"; Lwt.return shard (* TODO invalid session *)
+ | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *)
+ | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *)
| HELLO -> initialize shard @@ List.assoc "d" term
- | HEARTBEAT_ACK -> Lwt.return shard
+ | HEARTBEAT_ACK -> return shard
| opcode ->
print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode;
- Lwt.return shard
+ return shard
end
| _ ->
print_endline "Invalid payload";
- Lwt.return shard
+ return shard
let create data =
+ let open Core in
let uri = (data.url ^ "?v=6&encoding=json") |> Uri.of_string in
- let http_uri = Uri.with_scheme uri (Some "https") in
- let headers = Http.Base.process_request_headers () in
- Resolver_lwt.resolve_uri ~uri:http_uri Resolver_lwt_unix.system >>= fun endp ->
- let ctx = Conduit_lwt_unix.default_ctx in
- Conduit_lwt_unix.endp_to_client ~ctx endp >>= fun client ->
- Websocket_lwt_unix.with_connection
- ~extra_headers:headers
- client
- uri
- >>= fun (recv, send) ->
- let (ready, ready_resolver) = Lwt.task () in
- let rec recv_forever s = begin
- s.recv ()
- >>= fun frame ->
- let p = parse frame in
- handle_frame s p ready_resolver
- >>= fun s -> recv_forever s
- end in
- let shard = {
- send;
- recv;
- ready;
- hb = None;
- seq = 0;
- shard = data.shards;
- session = None;
- token = data.token;
- } in
- Lwt.return (shard, recv_forever shard)
+ let extra_headers = Http.Base.process_request_headers () in
+ let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in
+ let port =
+ match Uri.port uri, Uri_services.tcp_port_of_uri uri with
+ | Some p, _ -> p
+ | None, Some p -> p
+ | _ -> 443 in
+ let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in
+ let tcp_fun (r,w) =
+ let (read, write) = client_ez
+ ~extra_headers
+ uri r w
+ in
+ let rec ev_loop shard =
+ Pipe.read read
+ >>= fun frame ->
+ handle_frame shard @@ parse frame
+ >>= fun shard ->
+ ev_loop shard
+ in
+ let shard = {
+ read;
+ write;
+ ready = Ivar.create ();
+ hb = None;
+ seq = 0;
+ shard = data.shards;
+ session = None;
+ token = data.token;
+ }
+ in
+ ev_loop shard |> ignore;
+ return shard
+ in
+ match Unix.getaddrinfo host (string_of_int port) [] with
+ | [] -> failwithf "DNS resolution failed for %s" host ()
+ | { ai_addr; _ } :: _ ->
+ let addr =
+ match scheme, ai_addr with
+ | _, ADDR_UNIX path -> `Unix_domain_socket path
+ | "https", ADDR_INET (h, p)
+ | "wss", ADDR_INET (h, p) ->
+ let h = Ipaddr_unix.of_inet_addr h in
+ `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ())
+ | _, ADDR_INET (h, p) ->
+ let h = Ipaddr_unix.of_inet_addr h in
+ `TCP (h, p)
+ in
+ Conduit_async.V2.connect addr >>= tcp_fun
end
type 'a t = {
- shards: (Shard.t * 'a Lwt.t) list;
- promise: 'a Lwt.t;
+ shards: Shard.t list;
}
let start ?count token =
- Http.get_gateway_bot ()
- >|= fun data ->
- let data = Yojson.Basic.Util.to_assoc data in
- let url = List.assoc "url" data
- |> Yojson.Basic.Util.to_string in
+ let module J = Yojson.Basic.Util in
+ Http.get_gateway_bot () >>| fun data ->
+ print_endline "Gateway obtained";
+ let url = J.member "url" data
+ |> J.to_string in
let count = match count with
| Some c -> c
- | None -> List.assoc "shards" data
- |> Yojson.Basic.Util.to_int
+ | None -> J.member "shards" data
+ |> J.to_int
in
- let shard_list = [0; count] in
- let rec gen_shards l accum =
+ let shard_list = (0, count) in
+ let rec gen_shards l a =
match l with
- | [id; total;] when id < total ->
- let shard_data = Lwt_main.run @@ Shard.create {
+ | (id, total) when id >= total -> return a
+ | (id, total) ->
+ Shard.create {
url;
- shards = [id; total;];
+ shards = (id, total);
token;
- } in
- shard_data :: gen_shards [id+1; total;] accum
- | [_; _;] -> accum
- | _ -> raise Invalid_Shards
+ }
+ >>= fun shard ->
+ let a = shard :: a in
+ gen_shards (id+1, total) a
in
- let shards = gen_shards shard_list [] in
- let p_list = List.map (fun (_, loop) -> loop) shards in
- let promise = Lwt.choose p_list in
+ gen_shards shard_list []
+ >>| fun shards ->
{
shards;
- promise;
}
let set_status sharder status =
- List.map (fun (shard, _) ->
+ Deferred.all @@ List.map (fun shard ->
Shard.set_status shard status
) sharder.shards
- |> Lwt.nchoose
let set_status_with sharder f =
- List.map (fun (shard, _) ->
+ Deferred.all @@ List.map (fun shard ->
Shard.set_status shard @@ f shard
) sharder.shards
- |> Lwt.nchoose
let request_guild_members ~guild ?query ?limit sharder =
- List.map (fun (shard, _) ->
+ Deferred.all @@ List.map (fun shard ->
Shard.request_guild_members ~guild ?query ?limit shard
- ) sharder.shards
- |> Lwt.nchoose \ No newline at end of file
+ ) sharder.shards \ No newline at end of file