diff options
| author | Adelyn Breedlove <[email protected]> | 2019-01-28 10:31:51 -0700 |
|---|---|---|
| committer | Adelyn Breedlove <[email protected]> | 2019-01-28 10:31:51 -0700 |
| commit | 8662e92987c437f59d09896a247ec2b5d82c4528 (patch) | |
| tree | f004cc14598351d4ad6b19d8e993d2f629c5e738 /lib/sharder.ml | |
| parent | Add more docs (diff) | |
| download | disml-8662e92987c437f59d09896a247ec2b5d82c4528.tar.xz disml-8662e92987c437f59d09896a247ec2b5d82c4528.zip | |
Publish docs updates
Diffstat (limited to 'lib/sharder.ml')
| -rw-r--r-- | lib/sharder.ml | 652 |
1 files changed, 326 insertions, 326 deletions
diff --git a/lib/sharder.ml b/lib/sharder.ml index 11a6bd8..f98ee8f 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -1,327 +1,327 @@ -open Async -open Core -open Websocket_async - -exception Invalid_Payload -exception Failure_to_Establish_Heartbeat - -module Shard = struct - type shard = { - hb_interval: Time.Span.t Ivar.t; - seq: int; - session: string option; - pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; - ready: unit Ivar.t; - url: string; - id: int * int; - _internal: Reader.t * Writer.t; - } - - type 'a t = { - mutable state: 'a; - } - - let identify_lock = Mvar.create () - let _ = Mvar.set identify_lock () - - let parse (frame:[`Ok of Frame.t | `Eof]) = - match frame with - | `Ok s -> begin - let open Frame.Opcode in - match s.opcode with - | Text -> Some (Yojson.Safe.from_string s.content) - | _ -> None - end - | `Eof -> None - - let push_frame ?payload ~ev shard = - let content = match payload with - | None -> "" - | Some p -> - Yojson.Safe.to_string @@ `Assoc [ - ("op", `Int (Opcode.to_int ev)); - ("d", p); - ] - in - let (_, write) = shard.pipe in - Pipe.write write @@ Frame.create ~content () - >>| fun () -> - shard - - let heartbeat shard = - Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq)); - let payload = match shard.seq with - | 0 -> `Null - | i -> `Int i - in - push_frame ~payload ~ev:HEARTBEAT shard - - let dispatch ~payload shard = - let module J = Yojson.Safe.Util in - let seq = J.(member "s" payload |> to_int) in - let t = J.(member "t" payload |> to_string) in - let data = J.member "d" payload in - let session = if t = "READY" then begin - Ivar.fill_if_empty shard.ready (); - Clock.after (Core.Time.Span.create ~sec:5 ()) - >>> (fun _ -> Mvar.put identify_lock () >>> ignore); - J.(member "session_id" data |> to_string_option) - end else None in - Event.handle_event ~ev:t data; - return { shard with - seq = seq; - session = session; - } - - let set_status ~(status:Yojson.Safe.json) shard = - let payload = match status with - | `Assoc [("name", `String name); ("type", `Int t)] -> - `Assoc [ - ("status", `String "online"); - ("afk", `Bool false); - ("since", `Null); - ("game", `Assoc [ - ("name", `String name); - ("type", `Int t) - ]) - ] - | `String name -> - `Assoc [ - ("status", `String "online"); - ("afk", `Bool false); - ("since", `Null); - ("game", `Assoc [ - ("name", `String name); - ("type", `Int 0) - ]) - ] - | _ -> raise Invalid_Payload - in - Ivar.read shard.ready >>= fun _ -> - push_frame ~payload ~ev:STATUS_UPDATE shard - - let request_guild_members ?(query="") ?(limit=0) ~guild shard = - let payload = `Assoc [ - ("guild_id", `String (Int.to_string guild)); - ("query", `String query); - ("limit", `Int limit); - ] in - Ivar.read shard.ready >>= fun _ -> - push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard - - let initialize ?data shard = - let module J = Yojson.Safe.Util in - let _ = match data with - | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ()) - | None -> raise Failure_to_Establish_Heartbeat - in - let shards = [`Int (fst shard.id); `Int (snd shard.id)] in - match shard.session with - | None -> begin - Mvar.take identify_lock >>= fun () -> - Logs.debug (fun m -> m "Identifying shard [%d, %d]" (fst shard.id) (snd shard.id)); - let payload = `Assoc [ - ("token", `String !Client_options.token); - ("properties", `Assoc [ - ("$os", `String Sys.os_type); - ("$device", `String "dis.ml"); - ("$browser", `String "dis.ml") - ]); - ("compress", `Bool false); (* TODO add compression handling*) - ("large_threshold", `Int 250); - ("shard", `List shards); - ] in - push_frame ~payload ~ev:IDENTIFY shard - >>| fun s -> s - end - | Some s -> - let payload = `Assoc [ - ("token", `String !Client_options.token); - ("session_id", `String s); - ("seq", `Int shard.seq) - ] in - push_frame ~payload ~ev:RESUME shard - - let handle_frame ~f shard = - let module J = Yojson.Safe.Util in - let op = J.(member "op" f |> to_int) |> Opcode.from_int in - match op with - | DISPATCH -> dispatch ~payload:f shard - | HEARTBEAT -> heartbeat shard - | INVALID_SESSION -> begin - Logs.err (fun m -> m "Received OP 9 on Shard [%d, %d]: %s" (fst shard.id) (snd shard.id) (Yojson.Safe.pretty_to_string f)); - if J.(member "d" f |> to_bool) then - initialize shard - else begin - initialize { shard with session = None; } - end - end - | RECONNECT -> initialize shard - | HELLO -> initialize ~data:(J.member "d" f) shard - | HEARTBEAT_ACK -> return shard - | opcode -> - Logs.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode)); - return shard - - let rec make_client - ~initialized - ~extra_headers - ~app_to_ws - ~ws_to_app - ~net_to_ws - ~ws_to_net - uri = - client - ~initialized - ~extra_headers - ~app_to_ws - ~ws_to_app - ~net_to_ws - ~ws_to_net - uri - >>> fun res -> - match res with - | Ok () -> () - | Error _ -> - let backoff = Time.Span.create ~ms:500 () in - Clock.after backoff >>> (fun () -> - make_client - ~initialized - ~extra_headers - ~app_to_ws - ~ws_to_app - ~net_to_ws - ~ws_to_net - uri) - - - let create ~url ~shards () = - let open Core in - let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in - let extra_headers = Http.Base.process_request_headers () in - let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in - let port = - match Uri.port uri, Uri_services.tcp_port_of_uri uri with - | Some p, _ -> p - | None, Some p -> p - | _ -> 443 in - let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in - let tcp_fun (net_to_ws, ws_to_net) = - (* Writer.monitor ws_to_net - |> Monitor.detach_and_get_error_stream - |> Stream.iter ~f:(fun e -> Logs.err (fun m -> m "Socket Connection Error: %s" (Exn.sexp_of_t e |> Sexp.to_string_hum))); *) - let (app_to_ws, write) = Pipe.create () in - let (read, ws_to_app) = Pipe.create () in - let initialized = Ivar.create () in - make_client - ~initialized - ~extra_headers - ~app_to_ws - ~ws_to_app - ~net_to_ws - ~ws_to_net - uri; - Ivar.read initialized >>| fun () -> - { - pipe = (read, write); - ready = Ivar.create (); - hb_interval = Ivar.create (); - seq = 0; - id = shards; - session = None; - url; - _internal = (net_to_ws, ws_to_net); - } - in - match Unix.getaddrinfo host (string_of_int port) [] with - | [] -> failwithf "DNS resolution failed for %s" host () - | { ai_addr; _ } :: _ -> - let addr = - match scheme, ai_addr with - | _, ADDR_UNIX path -> `Unix_domain_socket path - | "https", ADDR_INET (h, p) - | "wss", ADDR_INET (h, p) -> - let h = Ipaddr_unix.of_inet_addr h in - `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ()) - | _, ADDR_INET (h, p) -> - let h = Ipaddr_unix.of_inet_addr h in - `TCP (h, p) - in - Conduit_async.V2.connect addr >>= tcp_fun - - let shutdown_clean shard = - let (_,w) = shard._internal in - Writer.close w - - let recreate shard = - shutdown_clean shard >>= fun () -> - create ~url:(shard.url) ~shards:(shard.id) () -end - -type t = { - shards: (Shard.shard Shard.t) list; -} - -let start ?count () = - let module J = Yojson.Safe.Util in - Http.get_gateway_bot () >>= fun data -> - let data = match data with - | Ok d -> d - | Error e -> Error.raise e - in - let url = J.(member "url" data |> to_string) in - let count = match count with - | Some c -> c - | None -> J.(member "shards" data |> to_int) - in - let shard_list = (0, count) in - let rec ev_loop (t:Shard.shard Shard.t) = - let (read, _) = t.state.pipe in - Pipe.read read - >>= fun frame -> - (match Shard.parse frame with - | Some f -> begin - Shard.handle_frame ~f t.state - >>| fun s -> t.state <- s; t - end - | None -> begin - Shard.recreate t.state - >>| fun s -> t.state <- s; t - end) - >>= fun t -> - ev_loop t - in - let rec gen_shards l a = - match l with - | (id, total) when id >= total -> return a - | (id, total) -> - Shard.create ~url ~shards:(id, total) () - >>= fun shard -> - let t = Shard.{ state = shard; } in - let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in - ev_loop t >>> ignore; - gen_shards (id+1, total) (t :: a) - in - gen_shards shard_list [] - >>| fun shards -> - { shards; } - -let set_status ~status sharder = - Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status t.state - ) sharder.shards - -let set_status_with ~f sharder = - Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status:(f t.state) t.state - ) sharder.shards - -let request_guild_members ?query ?limit ~guild sharder = - Deferred.all @@ List.map ~f:(fun t -> - Shard.request_guild_members ~guild ?query ?limit t.state - ) sharder.shards - -let shutdown_all sharder = - Deferred.all @@ List.map ~f:(fun t -> - Shard.shutdown_clean t.state +open Async
+open Core
+open Websocket_async
+
+exception Invalid_Payload
+exception Failure_to_Establish_Heartbeat
+
+module Shard = struct
+ type shard = {
+ hb_interval: Time.Span.t Ivar.t;
+ seq: int;
+ session: string option;
+ pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t;
+ ready: unit Ivar.t;
+ url: string;
+ id: int * int;
+ _internal: Reader.t * Writer.t;
+ }
+
+ type 'a t = {
+ mutable state: 'a;
+ }
+
+ let identify_lock = Mvar.create ()
+ let _ = Mvar.set identify_lock ()
+
+ let parse (frame:[`Ok of Frame.t | `Eof]) =
+ match frame with
+ | `Ok s -> begin
+ let open Frame.Opcode in
+ match s.opcode with
+ | Text -> Some (Yojson.Safe.from_string s.content)
+ | _ -> None
+ end
+ | `Eof -> None
+
+ let push_frame ?payload ~ev shard =
+ let content = match payload with
+ | None -> ""
+ | Some p ->
+ Yojson.Safe.to_string @@ `Assoc [
+ ("op", `Int (Opcode.to_int ev));
+ ("d", p);
+ ]
+ in
+ let (_, write) = shard.pipe in
+ Pipe.write write @@ Frame.create ~content ()
+ >>| fun () ->
+ shard
+
+ let heartbeat shard =
+ Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
+ let payload = match shard.seq with
+ | 0 -> `Null
+ | i -> `Int i
+ in
+ push_frame ~payload ~ev:HEARTBEAT shard
+
+ let dispatch ~payload shard =
+ let module J = Yojson.Safe.Util in
+ let seq = J.(member "s" payload |> to_int) in
+ let t = J.(member "t" payload |> to_string) in
+ let data = J.member "d" payload in
+ let session = if t = "READY" then begin
+ Ivar.fill_if_empty shard.ready ();
+ Clock.after (Core.Time.Span.create ~sec:5 ())
+ >>> (fun _ -> Mvar.put identify_lock () >>> ignore);
+ J.(member "session_id" data |> to_string_option)
+ end else None in
+ Event.handle_event ~ev:t data;
+ return { shard with
+ seq = seq;
+ session = session;
+ }
+
+ let set_status ~(status:Yojson.Safe.json) shard =
+ let payload = match status with
+ | `Assoc [("name", `String name); ("type", `Int t)] ->
+ `Assoc [
+ ("status", `String "online");
+ ("afk", `Bool false);
+ ("since", `Null);
+ ("game", `Assoc [
+ ("name", `String name);
+ ("type", `Int t)
+ ])
+ ]
+ | `String name ->
+ `Assoc [
+ ("status", `String "online");
+ ("afk", `Bool false);
+ ("since", `Null);
+ ("game", `Assoc [
+ ("name", `String name);
+ ("type", `Int 0)
+ ])
+ ]
+ | _ -> raise Invalid_Payload
+ in
+ Ivar.read shard.ready >>= fun _ ->
+ push_frame ~payload ~ev:STATUS_UPDATE shard
+
+ let request_guild_members ?(query="") ?(limit=0) ~guild shard =
+ let payload = `Assoc [
+ ("guild_id", `String (Int.to_string guild));
+ ("query", `String query);
+ ("limit", `Int limit);
+ ] in
+ Ivar.read shard.ready >>= fun _ ->
+ push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard
+
+ let initialize ?data shard =
+ let module J = Yojson.Safe.Util in
+ let _ = match data with
+ | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ())
+ | None -> raise Failure_to_Establish_Heartbeat
+ in
+ let shards = [`Int (fst shard.id); `Int (snd shard.id)] in
+ match shard.session with
+ | None -> begin
+ Mvar.take identify_lock >>= fun () ->
+ Logs.debug (fun m -> m "Identifying shard [%d, %d]" (fst shard.id) (snd shard.id));
+ let payload = `Assoc [
+ ("token", `String !Client_options.token);
+ ("properties", `Assoc [
+ ("$os", `String Sys.os_type);
+ ("$device", `String "dis.ml");
+ ("$browser", `String "dis.ml")
+ ]);
+ ("compress", `Bool false); (* TODO add compression handling*)
+ ("large_threshold", `Int 250);
+ ("shard", `List shards);
+ ] in
+ push_frame ~payload ~ev:IDENTIFY shard
+ >>| fun s -> s
+ end
+ | Some s ->
+ let payload = `Assoc [
+ ("token", `String !Client_options.token);
+ ("session_id", `String s);
+ ("seq", `Int shard.seq)
+ ] in
+ push_frame ~payload ~ev:RESUME shard
+
+ let handle_frame ~f shard =
+ let module J = Yojson.Safe.Util in
+ let op = J.(member "op" f |> to_int) |> Opcode.from_int in
+ match op with
+ | DISPATCH -> dispatch ~payload:f shard
+ | HEARTBEAT -> heartbeat shard
+ | INVALID_SESSION -> begin
+ Logs.err (fun m -> m "Received OP 9 on Shard [%d, %d]: %s" (fst shard.id) (snd shard.id) (Yojson.Safe.pretty_to_string f));
+ if J.(member "d" f |> to_bool) then
+ initialize shard
+ else begin
+ initialize { shard with session = None; }
+ end
+ end
+ | RECONNECT -> initialize shard
+ | HELLO -> initialize ~data:(J.member "d" f) shard
+ | HEARTBEAT_ACK -> return shard
+ | opcode ->
+ Logs.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode));
+ return shard
+
+ let rec make_client
+ ~initialized
+ ~extra_headers
+ ~app_to_ws
+ ~ws_to_app
+ ~net_to_ws
+ ~ws_to_net
+ uri =
+ client
+ ~initialized
+ ~extra_headers
+ ~app_to_ws
+ ~ws_to_app
+ ~net_to_ws
+ ~ws_to_net
+ uri
+ >>> fun res ->
+ match res with
+ | Ok () -> ()
+ | Error _ ->
+ let backoff = Time.Span.create ~ms:500 () in
+ Clock.after backoff >>> (fun () ->
+ make_client
+ ~initialized
+ ~extra_headers
+ ~app_to_ws
+ ~ws_to_app
+ ~net_to_ws
+ ~ws_to_net
+ uri)
+
+
+ let create ~url ~shards () =
+ let open Core in
+ let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in
+ let extra_headers = Http.Base.process_request_headers () in
+ let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in
+ let port =
+ match Uri.port uri, Uri_services.tcp_port_of_uri uri with
+ | Some p, _ -> p
+ | None, Some p -> p
+ | _ -> 443 in
+ let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in
+ let tcp_fun (net_to_ws, ws_to_net) =
+ (* Writer.monitor ws_to_net
+ |> Monitor.detach_and_get_error_stream
+ |> Stream.iter ~f:(fun e -> Logs.err (fun m -> m "Socket Connection Error: %s" (Exn.sexp_of_t e |> Sexp.to_string_hum))); *)
+ let (app_to_ws, write) = Pipe.create () in
+ let (read, ws_to_app) = Pipe.create () in
+ let initialized = Ivar.create () in
+ make_client
+ ~initialized
+ ~extra_headers
+ ~app_to_ws
+ ~ws_to_app
+ ~net_to_ws
+ ~ws_to_net
+ uri;
+ Ivar.read initialized >>| fun () ->
+ {
+ pipe = (read, write);
+ ready = Ivar.create ();
+ hb_interval = Ivar.create ();
+ seq = 0;
+ id = shards;
+ session = None;
+ url;
+ _internal = (net_to_ws, ws_to_net);
+ }
+ in
+ match Unix.getaddrinfo host (string_of_int port) [] with
+ | [] -> failwithf "DNS resolution failed for %s" host ()
+ | { ai_addr; _ } :: _ ->
+ let addr =
+ match scheme, ai_addr with
+ | _, ADDR_UNIX path -> `Unix_domain_socket path
+ | "https", ADDR_INET (h, p)
+ | "wss", ADDR_INET (h, p) ->
+ let h = Ipaddr_unix.of_inet_addr h in
+ `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ())
+ | _, ADDR_INET (h, p) ->
+ let h = Ipaddr_unix.of_inet_addr h in
+ `TCP (h, p)
+ in
+ Conduit_async.V2.connect addr >>= tcp_fun
+
+ let shutdown_clean shard =
+ let (_,w) = shard._internal in
+ Writer.close w
+
+ let recreate shard =
+ shutdown_clean shard >>= fun () ->
+ create ~url:(shard.url) ~shards:(shard.id) ()
+end
+
+type t = {
+ shards: (Shard.shard Shard.t) list;
+}
+
+let start ?count () =
+ let module J = Yojson.Safe.Util in
+ Http.get_gateway_bot () >>= fun data ->
+ let data = match data with
+ | Ok d -> d
+ | Error e -> Error.raise e
+ in
+ let url = J.(member "url" data |> to_string) in
+ let count = match count with
+ | Some c -> c
+ | None -> J.(member "shards" data |> to_int)
+ in
+ let shard_list = (0, count) in
+ let rec ev_loop (t:Shard.shard Shard.t) =
+ let (read, _) = t.state.pipe in
+ Pipe.read read
+ >>= fun frame ->
+ (match Shard.parse frame with
+ | Some f -> begin
+ Shard.handle_frame ~f t.state
+ >>| fun s -> t.state <- s; t
+ end
+ | None -> begin
+ Shard.recreate t.state
+ >>| fun s -> t.state <- s; t
+ end)
+ >>= fun t ->
+ ev_loop t
+ in
+ let rec gen_shards l a =
+ match l with
+ | (id, total) when id >= total -> return a
+ | (id, total) ->
+ Shard.create ~url ~shards:(id, total) ()
+ >>= fun shard ->
+ let t = Shard.{ state = shard; } in
+ let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in
+ ev_loop t >>> ignore;
+ gen_shards (id+1, total) (t :: a)
+ in
+ gen_shards shard_list []
+ >>| fun shards ->
+ { shards; }
+
+let set_status ~status sharder =
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.set_status ~status t.state
+ ) sharder.shards
+
+let set_status_with ~f sharder =
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.set_status ~status:(f t.state) t.state
+ ) sharder.shards
+
+let request_guild_members ?query ?limit ~guild sharder =
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.request_guild_members ~guild ?query ?limit t.state
+ ) sharder.shards
+
+let shutdown_all sharder =
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.shutdown_clean t.state
) sharder.shards
\ No newline at end of file |