From d342c4cf9fe907d2107cd815f9988f8ad147218b Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Sat, 24 Nov 2018 09:51:03 -0700 Subject: Major structural changes --- lib/sharder.ml | 267 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 lib/sharder.ml (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml new file mode 100644 index 0000000..9e2ef74 --- /dev/null +++ b/lib/sharder.ml @@ -0,0 +1,267 @@ +open Async +open Core +open Websocket_async + +exception Invalid_Payload + +module Shard = struct + type t = { + mutable hb: unit Ivar.t option; + mutable seq: int; + mutable session: string option; + handler: (string * Model.t) Pipe.Writer.t; + token: string; + shard: int * int; + write: string Pipe.Writer.t; + read: string Pipe.Reader.t; + ready: unit Ivar.t; + } + + let identify_lock = Mutex.create () + + let parse frame = + match frame with + | `Ok s -> Yojson.Basic.from_string s + | `Eof -> raise Invalid_Payload (* This needs to go into reconnect code, or stop using client_ez and handle frames manually *) + + let push_frame ?payload shard ev = + print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; + let content = match payload with + | None -> "" + | Some p -> + Yojson.Basic.to_string @@ `Assoc [ + ("op", `Int (Opcode.to_int ev)); + ("d", p); + ] + in + Pipe.write shard.write content + >>| fun () -> + shard + + let heartbeat shard = + let seq = match shard.seq with + | 0 -> `Null + | i -> `Int i + in + let payload = `Assoc [ + ("op", `Int 1); + ("d", seq); + ] in + push_frame ~payload shard HEARTBEAT + + let dispatch shard payload = + let module J = Yojson.Basic.Util in + let seq = J.(member "s" payload |> to_int) in + shard.seq <- seq; + let t = J.(member "t" payload |> to_string) in + let data = J.member "d" payload in + let _ = match t with + | "READY" -> + Ivar.fill_if_empty shard.ready (); + let session = J.(member "session_id" data |> to_string) in + shard.session <- Some session + | "MESSAGE_CREATE" -> + let msg = Model.Message.from_json data in + Pipe.write shard.handler (t, Message msg) >>> ignore + | "GUILD_CREATE" -> + let guild = Model.Guild.from_json data in + Pipe.write shard.handler (t, Guild guild) >>> ignore + | _ -> () + in + return shard + + let set_status shard status = + let payload = match status with + | `Assoc [("name", `String name); ("type", `Int t)] -> + `Assoc [ + ("status", `String "online"); + ("afk", `Bool false); + ("since", `Null); + ("game", `Assoc [ + ("name", `String name); + ("type", `Int t) + ]) + ] + | `String name -> + `Assoc [ + ("status", `String "online"); + ("afk", `Bool false); + ("since", `Null); + ("game", `Assoc [ + ("name", `String name); + ("type", `Int 0) + ]) + ] + | _ -> raise Invalid_Payload + in + Ivar.read shard.ready >>= fun _ -> + push_frame ~payload shard STATUS_UPDATE + + let request_guild_members ~guild ?(query="") ?(limit=0) shard = + let payload = `Assoc [ + ("guild_id", `String (string_of_int guild)); + ("query", `String query); + ("limit", `Int limit); + ] in + Ivar.read shard.ready >>= fun _ -> + push_frame ~payload shard REQUEST_GUILD_MEMBERS + + let initialize shard data = + let module J = Yojson.Basic.Util in + let hb = match shard.hb with + | None -> begin + let hb_interval = J.(member "heartbeat_interval" data |> to_int) in + let finished = Ivar.create () in + Clock.every' + ~continue_on_error:true + ~finished + (Core.Time.Span.create ~ms:hb_interval ()) + (fun () -> heartbeat shard >>= fun _ -> return ()); + finished + end + | Some s -> s + in + shard.hb <- Some hb; + Mutex.lock identify_lock; + let (cur, max) = shard.shard in + let shards = [`Int cur; `Int max] in + match shard.session with + | None -> + let payload = `Assoc [ + ("token", `String shard.token); + ("properties", `Assoc [ + ("$os", `String Sys.os_type); + ("$device", `String "dis.ml"); + ("$browser", `String "dis.ml") + ]); + ("compress", `Bool false); (* TODO add compression handling*) + ("large_threshold", `Int 250); + ("shard", `List shards); + ] in + push_frame ~payload shard IDENTIFY + | Some s -> + let payload = `Assoc [ + ("token", `String shard.token); + ("session_id", `String s); + ("seq", `Int shard.seq) + ] in + push_frame ~payload shard RESUME + >>| fun s -> + Clock.after (Core.Time.Span.create ~sec:5 ()) + >>| (fun _ -> Mutex.unlock identify_lock) + |> ignore; + s + + let handle_frame shard term = + let module J = Yojson.Basic.Util in + let op = J.(member "op" term |> to_int) + |> Opcode.from_int + in + match op with + | DISPATCH -> dispatch shard term + | HEARTBEAT -> heartbeat shard + | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *) + | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *) + | HELLO -> initialize shard @@ J.member "d" term + | HEARTBEAT_ACK -> return shard + | opcode -> + print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode; + return shard + + let create ~url ~shards ~token ~handler () = + let open Core in + let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in + let extra_headers = Http.Base.process_request_headers () in + let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in + let port = + match Uri.port uri, Uri_services.tcp_port_of_uri uri with + | Some p, _ -> p + | None, Some p -> p + | _ -> 443 in + let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in + let tcp_fun (r,w) = + let (read, write) = client_ez + ~extra_headers + uri r w + in + let rec ev_loop shard = + Pipe.read read + >>= fun frame -> + handle_frame shard @@ parse frame + >>= fun shard -> + ev_loop shard + in + let shard = { + read; + write; + handler; + ready = Ivar.create (); + hb = None; + seq = 0; + shard = shards; + session = None; + token = token; + } + in + ev_loop shard |> ignore; + return shard + in + match Unix.getaddrinfo host (string_of_int port) [] with + | [] -> failwithf "DNS resolution failed for %s" host () + | { ai_addr; _ } :: _ -> + let addr = + match scheme, ai_addr with + | _, ADDR_UNIX path -> `Unix_domain_socket path + | "https", ADDR_INET (h, p) + | "wss", ADDR_INET (h, p) -> + let h = Ipaddr_unix.of_inet_addr h in + `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ()) + | _, ADDR_INET (h, p) -> + let h = Ipaddr_unix.of_inet_addr h in + `TCP (h, p) + in + Conduit_async.V2.connect addr >>= tcp_fun +end + +type t = { + shards: Shard.t list; +} + +let start ?count ~handler token = + let module J = Yojson.Basic.Util in + Http.get_gateway_bot () >>= fun data -> + let url = J.(member "url" data |> to_string) in + let count = match count with + | Some c -> c + | None -> J.(member "shards" data |> to_int) + in + let shard_list = (0, count) in + let rec gen_shards l a = + match l with + | (id, total) when id >= total -> return a + | (id, total) -> + Shard.create ~url ~shards:(id, total) ~token ~handler () + >>= fun shard -> + let a = shard :: a in + gen_shards (id+1, total) a + in + gen_shards shard_list [] + >>| fun shards -> + { + shards; + } + +let set_status sharder status = + Deferred.all @@ List.map ~f:(fun shard -> + Shard.set_status shard status + ) sharder.shards + +let set_status_with sharder f = + Deferred.all @@ List.map ~f:(fun shard -> + Shard.set_status shard @@ f shard + ) sharder.shards + +let request_guild_members ~guild ?query ?limit sharder = + Deferred.all @@ List.map ~f:(fun shard -> + Shard.request_guild_members ~guild ?query ?limit shard + ) sharder.shards -- cgit v1.2.3 From 011e3224c0292dfcb0024daf474d4ef1e00b82f0 Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Sun, 25 Nov 2018 16:02:37 -0700 Subject: A lot is going on... --- lib/sharder.ml | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 9e2ef74..5d665b9 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -9,7 +9,6 @@ module Shard = struct mutable hb: unit Ivar.t option; mutable seq: int; mutable session: string option; - handler: (string * Model.t) Pipe.Writer.t; token: string; shard: int * int; write: string Pipe.Writer.t; @@ -55,19 +54,11 @@ module Shard = struct shard.seq <- seq; let t = J.(member "t" payload |> to_string) in let data = J.member "d" payload in - let _ = match t with - | "READY" -> + if t = "READY" then begin Ivar.fill_if_empty shard.ready (); let session = J.(member "session_id" data |> to_string) in shard.session <- Some session - | "MESSAGE_CREATE" -> - let msg = Model.Message.from_json data in - Pipe.write shard.handler (t, Message msg) >>> ignore - | "GUILD_CREATE" -> - let guild = Model.Guild.from_json data in - Pipe.write shard.handler (t, Guild guild) >>> ignore - | _ -> () - in + end; return shard let set_status shard status = @@ -168,7 +159,7 @@ module Shard = struct print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode; return shard - let create ~url ~shards ~token ~handler () = + let create ~url ~shards ~token () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = Http.Base.process_request_headers () in @@ -194,7 +185,6 @@ module Shard = struct let shard = { read; write; - handler; ready = Ivar.create (); hb = None; seq = 0; @@ -227,7 +217,7 @@ type t = { shards: Shard.t list; } -let start ?count ~handler token = +let start ?count token = let module J = Yojson.Basic.Util in Http.get_gateway_bot () >>= fun data -> let url = J.(member "url" data |> to_string) in @@ -240,7 +230,7 @@ let start ?count ~handler token = match l with | (id, total) when id >= total -> return a | (id, total) -> - Shard.create ~url ~shards:(id, total) ~token ~handler () + Shard.create ~url ~shards:(id, total) ~token () >>= fun shard -> let a = shard :: a in gen_shards (id+1, total) a -- cgit v1.2.3 From 048fe6388e84eaccb3440fe0d0c7101b3877701d Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Sun, 25 Nov 2018 16:13:59 -0700 Subject: Naming consistency --- lib/sharder.ml | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 5d665b9..ccfb047 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -23,7 +23,7 @@ module Shard = struct | `Ok s -> Yojson.Basic.from_string s | `Eof -> raise Invalid_Payload (* This needs to go into reconnect code, or stop using client_ez and handle frames manually *) - let push_frame ?payload shard ev = + let push_frame ?payload ~ev shard = print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; let content = match payload with | None -> "" @@ -46,9 +46,9 @@ module Shard = struct ("op", `Int 1); ("d", seq); ] in - push_frame ~payload shard HEARTBEAT + push_frame ~payload ~ev:HEARTBEAT shard - let dispatch shard payload = + let dispatch ~payload shard = let module J = Yojson.Basic.Util in let seq = J.(member "s" payload |> to_int) in shard.seq <- seq; @@ -61,7 +61,7 @@ module Shard = struct end; return shard - let set_status shard status = + let set_status ~status shard = let payload = match status with | `Assoc [("name", `String name); ("type", `Int t)] -> `Assoc [ @@ -86,7 +86,7 @@ module Shard = struct | _ -> raise Invalid_Payload in Ivar.read shard.ready >>= fun _ -> - push_frame ~payload shard STATUS_UPDATE + push_frame ~payload ~ev:STATUS_UPDATE shard let request_guild_members ~guild ?(query="") ?(limit=0) shard = let payload = `Assoc [ @@ -95,9 +95,9 @@ module Shard = struct ("limit", `Int limit); ] in Ivar.read shard.ready >>= fun _ -> - push_frame ~payload shard REQUEST_GUILD_MEMBERS + push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard - let initialize shard data = + let initialize ~data shard = let module J = Yojson.Basic.Util in let hb = match shard.hb with | None -> begin @@ -129,31 +129,31 @@ module Shard = struct ("large_threshold", `Int 250); ("shard", `List shards); ] in - push_frame ~payload shard IDENTIFY + push_frame ~payload ~ev:IDENTIFY shard | Some s -> let payload = `Assoc [ ("token", `String shard.token); ("session_id", `String s); ("seq", `Int shard.seq) ] in - push_frame ~payload shard RESUME + push_frame ~payload ~ev:RESUME shard >>| fun s -> Clock.after (Core.Time.Span.create ~sec:5 ()) >>| (fun _ -> Mutex.unlock identify_lock) |> ignore; s - let handle_frame shard term = + let handle_frame ~f shard = let module J = Yojson.Basic.Util in - let op = J.(member "op" term |> to_int) + let op = J.(member "op" f |> to_int) |> Opcode.from_int in match op with - | DISPATCH -> dispatch shard term + | DISPATCH -> dispatch ~payload:f shard | HEARTBEAT -> heartbeat shard | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *) | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *) - | HELLO -> initialize shard @@ J.member "d" term + | HELLO -> initialize ~data:(J.member "d" f) shard | HEARTBEAT_ACK -> return shard | opcode -> print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode; @@ -178,7 +178,7 @@ module Shard = struct let rec ev_loop shard = Pipe.read read >>= fun frame -> - handle_frame shard @@ parse frame + handle_frame ~f:(parse frame) shard >>= fun shard -> ev_loop shard in @@ -243,12 +243,12 @@ let start ?count token = let set_status sharder status = Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status shard status + Shard.set_status ~status shard ) sharder.shards let set_status_with sharder f = Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status shard @@ f shard + Shard.set_status ~status:(f shard) shard ) sharder.shards let request_guild_members ~guild ?query ?limit sharder = -- cgit v1.2.3 From ad8a13b186683cb1e6c5ef405df503f58b751ffa Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Sun, 25 Nov 2018 18:51:44 -0700 Subject: convert to client from client_ez --- lib/sharder.ml | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index ccfb047..defcfe1 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -11,17 +11,17 @@ module Shard = struct mutable session: string option; token: string; shard: int * int; - write: string Pipe.Writer.t; - read: string Pipe.Reader.t; + write: Frame.t Pipe.Writer.t; + read: Frame.t Pipe.Reader.t; ready: unit Ivar.t; } let identify_lock = Mutex.create () - let parse frame = + let parse (frame:[`Ok of Frame.t | `Eof]) = match frame with - | `Ok s -> Yojson.Basic.from_string s - | `Eof -> raise Invalid_Payload (* This needs to go into reconnect code, or stop using client_ez and handle frames manually *) + | `Ok s -> Yojson.Basic.from_string s.content (* TODO Handler non-text frames *) + | `Eof -> raise Invalid_Payload (* TODO This needs to go into reconnect code, or stop using client_ez and handle frames manually *) let push_frame ?payload ~ev shard = print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; @@ -33,7 +33,7 @@ module Shard = struct ("d", p); ] in - Pipe.write shard.write content + Pipe.write shard.write @@ Frame.create ~content () >>| fun () -> shard @@ -156,7 +156,7 @@ module Shard = struct | HELLO -> initialize ~data:(J.member "d" f) shard | HEARTBEAT_ACK -> return shard | opcode -> - print_endline @@ "Invalid Opcode:" ^ Opcode.to_string opcode; + print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard let create ~url ~shards ~token () = @@ -170,17 +170,26 @@ module Shard = struct | None, Some p -> p | _ -> 443 in let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in - let tcp_fun (r,w) = - let (read, write) = client_ez + let tcp_fun (net_to_ws, ws_to_net) = + let (app_to_ws, write) = Pipe.create () in + let (read, ws_to_app) = Pipe.create () in + let initialized = Ivar.create () in + client + ~initialized ~extra_headers - uri r w - in - let rec ev_loop shard = - Pipe.read read + ~app_to_ws + ~ws_to_app + ~net_to_ws + ~ws_to_net + uri + >>> ignore; + Ivar.read initialized >>| fun () -> + let rec ev_loop ~reader shard = + Pipe.read reader >>= fun frame -> handle_frame ~f:(parse frame) shard >>= fun shard -> - ev_loop shard + ev_loop ~reader shard in let shard = { read; @@ -193,8 +202,8 @@ module Shard = struct token = token; } in - ev_loop shard |> ignore; - return shard + ev_loop ~reader:read shard |> ignore; + shard in match Unix.getaddrinfo host (string_of_int port) [] with | [] -> failwithf "DNS resolution failed for %s" host () -- cgit v1.2.3 From 77f522a5f3fd74749e7a2cd4c849e520f2b6ba89 Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Thu, 29 Nov 2018 06:16:23 -0700 Subject: Some sharding work, reconnect is mostly working --- lib/sharder.ml | 161 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 99 insertions(+), 62 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index defcfe1..74870bd 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -3,25 +3,39 @@ open Core open Websocket_async exception Invalid_Payload +exception Failure_to_Establish_Heartbeat module Shard = struct - type t = { - mutable hb: unit Ivar.t option; - mutable seq: int; - mutable session: string option; - token: string; - shard: int * int; - write: Frame.t Pipe.Writer.t; - read: Frame.t Pipe.Reader.t; + type shard = { + hb: unit Ivar.t option; + seq: int; + session: string option; + pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; ready: unit Ivar.t; + token: string; + url: string; + id: int * int; + } + + type 'a t = { + mutable shard: 'a; + mutable binds: ('a -> unit) list; } let identify_lock = Mutex.create () + let bind ~f t = + t.binds <- f :: t.binds + let parse (frame:[`Ok of Frame.t | `Eof]) = match frame with - | `Ok s -> Yojson.Basic.from_string s.content (* TODO Handler non-text frames *) - | `Eof -> raise Invalid_Payload (* TODO This needs to go into reconnect code, or stop using client_ez and handle frames manually *) + | `Ok s -> begin + let open Frame.Opcode in + match s.opcode with + | Text -> Some (Yojson.Basic.from_string s.content) + | _ -> None + end + | `Eof -> None let push_frame ?payload ~ev shard = print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; @@ -33,33 +47,31 @@ module Shard = struct ("d", p); ] in - Pipe.write shard.write @@ Frame.create ~content () + let (_, write) = shard.pipe in + Pipe.write_if_open write @@ Frame.create ~content () >>| fun () -> shard let heartbeat shard = - let seq = match shard.seq with + let payload = match shard.seq with | 0 -> `Null | i -> `Int i in - let payload = `Assoc [ - ("op", `Int 1); - ("d", seq); - ] in push_frame ~payload ~ev:HEARTBEAT shard let dispatch ~payload shard = let module J = Yojson.Basic.Util in let seq = J.(member "s" payload |> to_int) in - shard.seq <- seq; let t = J.(member "t" payload |> to_string) in let data = J.member "d" payload in + let session = J.(member "session_id" data |> to_string_option) in if t = "READY" then begin Ivar.fill_if_empty shard.ready (); - let session = J.(member "session_id" data |> to_string) in - shard.session <- Some session end; - return shard + return { shard with + seq = seq; + session = session; + } let set_status ~status shard = let payload = match status with @@ -97,27 +109,30 @@ module Shard = struct Ivar.read shard.ready >>= fun _ -> push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard - let initialize ~data shard = + let initialize ?data shard = let module J = Yojson.Basic.Util in let hb = match shard.hb with | None -> begin - let hb_interval = J.(member "heartbeat_interval" data |> to_int) in - let finished = Ivar.create () in - Clock.every' - ~continue_on_error:true - ~finished - (Core.Time.Span.create ~ms:hb_interval ()) - (fun () -> heartbeat shard >>= fun _ -> return ()); - finished + match data with + | Some data -> + let hb_interval = J.(member "heartbeat_interval" data |> to_int) in + let finished = Ivar.create () in + Clock.every' + ~continue_on_error:true + ~finished + (Core.Time.Span.create ~ms:hb_interval ()) + (fun () -> heartbeat shard >>= fun _ -> return ()); + finished + | None -> raise Failure_to_Establish_Heartbeat end | Some s -> s in - shard.hb <- Some hb; - Mutex.lock identify_lock; - let (cur, max) = shard.shard in + let shard = { shard with hb = Some hb; } in + let (cur, max) = shard.id in let shards = [`Int cur; `Int max] in match shard.session with - | None -> + | None -> begin + Mutex.lock identify_lock; let payload = `Assoc [ ("token", `String shard.token); ("properties", `Assoc [ @@ -130,6 +145,12 @@ module Shard = struct ("shard", `List shards); ] in push_frame ~payload ~ev:IDENTIFY shard + >>| fun s -> begin + Clock.after (Core.Time.Span.create ~sec:5 ()) + >>> (fun _ -> Mutex.unlock identify_lock); + s + end + end | Some s -> let payload = `Assoc [ ("token", `String shard.token); @@ -137,11 +158,6 @@ module Shard = struct ("seq", `Int shard.seq) ] in push_frame ~payload ~ev:RESUME shard - >>| fun s -> - Clock.after (Core.Time.Span.create ~sec:5 ()) - >>| (fun _ -> Mutex.unlock identify_lock) - |> ignore; - s let handle_frame ~f shard = let module J = Yojson.Basic.Util in @@ -151,15 +167,21 @@ module Shard = struct match op with | DISPATCH -> dispatch ~payload:f shard | HEARTBEAT -> heartbeat shard - | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *) - | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *) + | INVALID_SESSION -> begin + if J.(member "d" f |> to_bool) then + initialize shard + else begin + initialize { shard with session = None; } + end + end + | RECONNECT -> initialize shard | HELLO -> initialize ~data:(J.member "d" f) shard | HEARTBEAT_ACK -> return shard | opcode -> print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard - let create ~url ~shards ~token () = + let rec create ~url ~shards ~token () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = Http.Base.process_request_headers () in @@ -184,26 +206,36 @@ module Shard = struct uri >>> ignore; Ivar.read initialized >>| fun () -> - let rec ev_loop ~reader shard = - Pipe.read reader + let rec ev_loop t = + let (read, _) = t.shard.pipe in + Pipe.read read >>= fun frame -> - handle_frame ~f:(parse frame) shard - >>= fun shard -> - ev_loop ~reader shard + (match parse frame with + | Some f -> begin + handle_frame ~f t.shard + >>| fun shard -> + t.shard <- shard; + t + end + | None -> recreate t.shard) + >>= fun t -> + List.iter ~f:(fun f -> f t.shard) t.binds; + ev_loop t in let shard = { - read; - write; + pipe = (read, write); ready = Ivar.create (); hb = None; seq = 0; - shard = shards; + id = shards; session = None; - token = token; + token; + url; } in - ev_loop ~reader:read shard |> ignore; - shard + let t = { shard; binds = []; } in + ev_loop t >>> ignore; + t in match Unix.getaddrinfo host (string_of_int port) [] with | [] -> failwithf "DNS resolution failed for %s" host () @@ -220,10 +252,17 @@ module Shard = struct `TCP (h, p) in Conduit_async.V2.connect addr >>= tcp_fun + and recreate shard = + print_endline "Reconnecting..."; + (match shard.hb with + | Some hb -> Ivar.fill_if_empty hb () + | None -> () + ); + create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) () end type t = { - shards: Shard.t list; + mutable shards: (Shard.shard Shard.t) list; } let start ?count token = @@ -246,21 +285,19 @@ let start ?count token = in gen_shards shard_list [] >>| fun shards -> - { - shards; - } + { shards; } let set_status sharder status = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status ~status shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status t.shard ) sharder.shards let set_status_with sharder f = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status ~status:(f shard) shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status:(f t.shard) t.shard ) sharder.shards let request_guild_members ~guild ?query ?limit sharder = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.request_guild_members ~guild ?query ?limit shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.request_guild_members ~guild ?query ?limit t.shard ) sharder.shards -- cgit v1.2.3 From 7e5604758c5326c2800ef48bdde64dec2ab37409 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 29 Nov 2018 10:23:20 -0700 Subject: hopefully fixes the shard changes not propogating on reconnect --- lib/sharder.ml | 46 ++++++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 24 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 74870bd..27b3b41 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -204,25 +204,9 @@ module Shard = struct ~net_to_ws ~ws_to_net uri - >>> ignore; + >>> ignore; (* TODO this needs to error check and retry with backoff *) Ivar.read initialized >>| fun () -> - let rec ev_loop t = - let (read, _) = t.shard.pipe in - Pipe.read read - >>= fun frame -> - (match parse frame with - | Some f -> begin - handle_frame ~f t.shard - >>| fun shard -> - t.shard <- shard; - t - end - | None -> recreate t.shard) - >>= fun t -> - List.iter ~f:(fun f -> f t.shard) t.binds; - ev_loop t - in - let shard = { + { pipe = (read, write); ready = Ivar.create (); hb = None; @@ -232,10 +216,6 @@ module Shard = struct token; url; } - in - let t = { shard; binds = []; } in - ev_loop t >>> ignore; - t in match Unix.getaddrinfo host (string_of_int port) [] with | [] -> failwithf "DNS resolution failed for %s" host () @@ -262,7 +242,7 @@ module Shard = struct end type t = { - mutable shards: (Shard.shard Shard.t) list; + shards: (Shard.shard Shard.t) list; } let start ?count token = @@ -280,7 +260,25 @@ let start ?count token = | (id, total) -> Shard.create ~url ~shards:(id, total) ~token () >>= fun shard -> - let a = shard :: a in + let rec ev_loop t = + let (read, _) = t.shard.pipe in + Pipe.read read + >>= fun frame -> + (match parse frame with + | Some f -> begin + handle_frame ~f t.shard + >>| fun shard -> + t.shard <- shard; + t + end + | None -> recreate t.shard) + >>= fun t -> + List.iter ~f:(fun f -> f t.shard) t.binds; + ev_loop t + in + let t = { shard; binds = []; } in + ev_loop t >>> ignore; + let a = t :: a in gen_shards (id+1, total) a in gen_shards shard_list [] -- cgit v1.2.3 From b50ec1c46e8c73c7993898d52a567e1d662218cc Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 29 Nov 2018 10:27:06 -0700 Subject: Missed a couple things --- lib/sharder.ml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 27b3b41..1deae72 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -263,15 +263,16 @@ let start ?count token = let rec ev_loop t = let (read, _) = t.shard.pipe in Pipe.read read - >>= fun frame -> - (match parse frame with + >>| fun frame -> + let _ = match parse frame with | Some f -> begin handle_frame ~f t.shard - >>| fun shard -> + >>> fun shard -> t.shard <- shard; - t end - | None -> recreate t.shard) + | None -> t.shard <- recreate t.shard; + in + t >>= fun t -> List.iter ~f:(fun f -> f t.shard) t.binds; ev_loop t -- cgit v1.2.3 From a1e0eed9739c2006c8012a8a9e0f8cde5f3c250d Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 29 Nov 2018 10:29:27 -0700 Subject: Clean up a bit --- lib/sharder.ml | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 1deae72..7f66b4e 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -254,33 +254,32 @@ let start ?count token = | None -> J.(member "shards" data |> to_int) in let shard_list = (0, count) in + let rec ev_loop t = + let (read, _) = t.shard.pipe in + Pipe.read read + >>| fun frame -> + let _ = match parse frame with + | Some f -> begin + handle_frame ~f t.shard + >>> fun shard -> + t.shard <- shard; + end + | None -> t.shard <- recreate t.shard; + in + t + >>= fun t -> + List.iter ~f:(fun f -> f t.shard) t.binds; + ev_loop t + in let rec gen_shards l a = match l with | (id, total) when id >= total -> return a | (id, total) -> Shard.create ~url ~shards:(id, total) ~token () >>= fun shard -> - let rec ev_loop t = - let (read, _) = t.shard.pipe in - Pipe.read read - >>| fun frame -> - let _ = match parse frame with - | Some f -> begin - handle_frame ~f t.shard - >>> fun shard -> - t.shard <- shard; - end - | None -> t.shard <- recreate t.shard; - in - t - >>= fun t -> - List.iter ~f:(fun f -> f t.shard) t.binds; - ev_loop t - in let t = { shard; binds = []; } in ev_loop t >>> ignore; - let a = t :: a in - gen_shards (id+1, total) a + gen_shards (id+1, total) (t :: a) in gen_shards shard_list [] >>| fun shards -> -- cgit v1.2.3 From 473072f66e6c7e228b4f26730cbc7304941fb12b Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 29 Nov 2018 12:55:10 -0700 Subject: functors! --- lib/sharder.ml | 549 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 276 insertions(+), 273 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 7f66b4e..1fa97a0 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -2,300 +2,303 @@ open Async open Core open Websocket_async -exception Invalid_Payload -exception Failure_to_Establish_Heartbeat +module Make(H: S.Http) = struct + exception Invalid_Payload + exception Failure_to_Establish_Heartbeat -module Shard = struct - type shard = { - hb: unit Ivar.t option; - seq: int; - session: string option; - pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; - ready: unit Ivar.t; - token: string; - url: string; - id: int * int; - } + let token = H.token - type 'a t = { - mutable shard: 'a; - mutable binds: ('a -> unit) list; - } + module Shard = struct + type shard = { + hb: unit Ivar.t option; + seq: int; + session: string option; + pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; + ready: unit Ivar.t; + url: string; + id: int * int; + } - let identify_lock = Mutex.create () + type 'a t = { + mutable shard: 'a; + mutable binds: ('a -> unit) list; + } - let bind ~f t = - t.binds <- f :: t.binds + let identify_lock = Mutex.create () - let parse (frame:[`Ok of Frame.t | `Eof]) = - match frame with - | `Ok s -> begin - let open Frame.Opcode in - match s.opcode with - | Text -> Some (Yojson.Basic.from_string s.content) - | _ -> None - end - | `Eof -> None + let bind ~f t = + t.binds <- f :: t.binds - let push_frame ?payload ~ev shard = - print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; - let content = match payload with - | None -> "" - | Some p -> - Yojson.Basic.to_string @@ `Assoc [ - ("op", `Int (Opcode.to_int ev)); - ("d", p); - ] - in - let (_, write) = shard.pipe in - Pipe.write_if_open write @@ Frame.create ~content () - >>| fun () -> - shard + let parse (frame:[`Ok of Frame.t | `Eof]) = + match frame with + | `Ok s -> begin + let open Frame.Opcode in + match s.opcode with + | Text -> Some (Yojson.Basic.from_string s.content) + | _ -> None + end + | `Eof -> None - let heartbeat shard = - let payload = match shard.seq with - | 0 -> `Null - | i -> `Int i - in - push_frame ~payload ~ev:HEARTBEAT shard + let push_frame ?payload ~ev shard = + print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; + let content = match payload with + | None -> "" + | Some p -> + Yojson.Basic.to_string @@ `Assoc [ + ("op", `Int (Opcode.to_int ev)); + ("d", p); + ] + in + let (_, write) = shard.pipe in + Pipe.write_if_open write @@ Frame.create ~content () + >>| fun () -> + shard - let dispatch ~payload shard = - let module J = Yojson.Basic.Util in - let seq = J.(member "s" payload |> to_int) in - let t = J.(member "t" payload |> to_string) in - let data = J.member "d" payload in - let session = J.(member "session_id" data |> to_string_option) in - if t = "READY" then begin - Ivar.fill_if_empty shard.ready (); - end; - return { shard with - seq = seq; - session = session; - } + let heartbeat shard = + let payload = match shard.seq with + | 0 -> `Null + | i -> `Int i + in + push_frame ~payload ~ev:HEARTBEAT shard - let set_status ~status shard = - let payload = match status with - | `Assoc [("name", `String name); ("type", `Int t)] -> - `Assoc [ - ("status", `String "online"); - ("afk", `Bool false); - ("since", `Null); - ("game", `Assoc [ - ("name", `String name); - ("type", `Int t) - ]) - ] - | `String name -> - `Assoc [ - ("status", `String "online"); - ("afk", `Bool false); - ("since", `Null); - ("game", `Assoc [ - ("name", `String name); - ("type", `Int 0) - ]) - ] - | _ -> raise Invalid_Payload - in - Ivar.read shard.ready >>= fun _ -> - push_frame ~payload ~ev:STATUS_UPDATE shard + let dispatch ~payload shard = + let module J = Yojson.Basic.Util in + let seq = J.(member "s" payload |> to_int) in + let t = J.(member "t" payload |> to_string) in + let data = J.member "d" payload in + let session = J.(member "session_id" data |> to_string_option) in + if t = "READY" then begin + Ivar.fill_if_empty shard.ready (); + end; + return { shard with + seq = seq; + session = session; + } - let request_guild_members ~guild ?(query="") ?(limit=0) shard = - let payload = `Assoc [ - ("guild_id", `String (string_of_int guild)); - ("query", `String query); - ("limit", `Int limit); - ] in - Ivar.read shard.ready >>= fun _ -> - push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard + let set_status ~status shard = + let payload = match status with + | `Assoc [("name", `String name); ("type", `Int t)] -> + `Assoc [ + ("status", `String "online"); + ("afk", `Bool false); + ("since", `Null); + ("game", `Assoc [ + ("name", `String name); + ("type", `Int t) + ]) + ] + | `String name -> + `Assoc [ + ("status", `String "online"); + ("afk", `Bool false); + ("since", `Null); + ("game", `Assoc [ + ("name", `String name); + ("type", `Int 0) + ]) + ] + | _ -> raise Invalid_Payload + in + Ivar.read shard.ready >>= fun _ -> + push_frame ~payload ~ev:STATUS_UPDATE shard - let initialize ?data shard = - let module J = Yojson.Basic.Util in - let hb = match shard.hb with - | None -> begin - match data with - | Some data -> - let hb_interval = J.(member "heartbeat_interval" data |> to_int) in - let finished = Ivar.create () in - Clock.every' - ~continue_on_error:true - ~finished - (Core.Time.Span.create ~ms:hb_interval ()) - (fun () -> heartbeat shard >>= fun _ -> return ()); - finished - | None -> raise Failure_to_Establish_Heartbeat - end - | Some s -> s - in - let shard = { shard with hb = Some hb; } in - let (cur, max) = shard.id in - let shards = [`Int cur; `Int max] in - match shard.session with - | None -> begin - Mutex.lock identify_lock; + let request_guild_members ~guild ?(query="") ?(limit=0) shard = let payload = `Assoc [ - ("token", `String shard.token); - ("properties", `Assoc [ - ("$os", `String Sys.os_type); - ("$device", `String "dis.ml"); - ("$browser", `String "dis.ml") - ]); - ("compress", `Bool false); (* TODO add compression handling*) - ("large_threshold", `Int 250); - ("shard", `List shards); + ("guild_id", `String (string_of_int guild)); + ("query", `String query); + ("limit", `Int limit); ] in - push_frame ~payload ~ev:IDENTIFY shard - >>| fun s -> begin - Clock.after (Core.Time.Span.create ~sec:5 ()) - >>> (fun _ -> Mutex.unlock identify_lock); - s + Ivar.read shard.ready >>= fun _ -> + push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard + + let initialize ?data shard = + let module J = Yojson.Basic.Util in + let hb = match shard.hb with + | None -> begin + match data with + | Some data -> + let hb_interval = J.(member "heartbeat_interval" data |> to_int) in + let finished = Ivar.create () in + Clock.every' + ~continue_on_error:true + ~finished + (Core.Time.Span.create ~ms:hb_interval ()) + (fun () -> heartbeat shard >>= fun _ -> return ()); + finished + | None -> raise Failure_to_Establish_Heartbeat end - end - | Some s -> - let payload = `Assoc [ - ("token", `String shard.token); - ("session_id", `String s); - ("seq", `Int shard.seq) - ] in - push_frame ~payload ~ev:RESUME shard + | Some s -> s + in + let shard = { shard with hb = Some hb; } in + let (cur, max) = shard.id in + let shards = [`Int cur; `Int max] in + match shard.session with + | None -> begin + Mutex.lock identify_lock; + let payload = `Assoc [ + ("token", `String shard.token); + ("properties", `Assoc [ + ("$os", `String Sys.os_type); + ("$device", `String "dis.ml"); + ("$browser", `String "dis.ml") + ]); + ("compress", `Bool false); (* TODO add compression handling*) + ("large_threshold", `Int 250); + ("shard", `List shards); + ] in + push_frame ~payload ~ev:IDENTIFY shard + >>| fun s -> begin + Clock.after (Core.Time.Span.create ~sec:5 ()) + >>> (fun _ -> Mutex.unlock identify_lock); + s + end + end + | Some s -> + let payload = `Assoc [ + ("token", `String shard.token); + ("session_id", `String s); + ("seq", `Int shard.seq) + ] in + push_frame ~payload ~ev:RESUME shard - let handle_frame ~f shard = - let module J = Yojson.Basic.Util in - let op = J.(member "op" f |> to_int) - |> Opcode.from_int - in - match op with - | DISPATCH -> dispatch ~payload:f shard - | HEARTBEAT -> heartbeat shard - | INVALID_SESSION -> begin - if J.(member "d" f |> to_bool) then - initialize shard - else begin - initialize { shard with session = None; } + let handle_frame ~f shard = + let module J = Yojson.Basic.Util in + let op = J.(member "op" f |> to_int) + |> Opcode.from_int + in + match op with + | DISPATCH -> dispatch ~payload:f shard + | HEARTBEAT -> heartbeat shard + | INVALID_SESSION -> begin + if J.(member "d" f |> to_bool) then + initialize shard + else begin + initialize { shard with session = None; } + end end - end - | RECONNECT -> initialize shard - | HELLO -> initialize ~data:(J.member "d" f) shard - | HEARTBEAT_ACK -> return shard - | opcode -> - print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; - return shard + | RECONNECT -> initialize shard + | HELLO -> initialize ~data:(J.member "d" f) shard + | HEARTBEAT_ACK -> return shard + | opcode -> + print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; + return shard - let rec create ~url ~shards ~token () = - let open Core in - let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in - let extra_headers = Http.Base.process_request_headers () in - let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in - let port = - match Uri.port uri, Uri_services.tcp_port_of_uri uri with - | Some p, _ -> p - | None, Some p -> p - | _ -> 443 in - let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in - let tcp_fun (net_to_ws, ws_to_net) = - let (app_to_ws, write) = Pipe.create () in - let (read, ws_to_app) = Pipe.create () in - let initialized = Ivar.create () in - client - ~initialized - ~extra_headers - ~app_to_ws - ~ws_to_app - ~net_to_ws - ~ws_to_net - uri - >>> ignore; (* TODO this needs to error check and retry with backoff *) - Ivar.read initialized >>| fun () -> - { - pipe = (read, write); - ready = Ivar.create (); - hb = None; - seq = 0; - id = shards; - session = None; - token; - url; - } - in - match Unix.getaddrinfo host (string_of_int port) [] with - | [] -> failwithf "DNS resolution failed for %s" host () - | { ai_addr; _ } :: _ -> - let addr = - match scheme, ai_addr with - | _, ADDR_UNIX path -> `Unix_domain_socket path - | "https", ADDR_INET (h, p) - | "wss", ADDR_INET (h, p) -> - let h = Ipaddr_unix.of_inet_addr h in - `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ()) - | _, ADDR_INET (h, p) -> - let h = Ipaddr_unix.of_inet_addr h in - `TCP (h, p) + let rec create ~url ~shards ~token () = + let open Core in + let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in + let extra_headers = H.Base.process_request_headers () in + let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in + let port = + match Uri.port uri, Uri_services.tcp_port_of_uri uri with + | Some p, _ -> p + | None, Some p -> p + | _ -> 443 in + let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in + let tcp_fun (net_to_ws, ws_to_net) = + let (app_to_ws, write) = Pipe.create () in + let (read, ws_to_app) = Pipe.create () in + let initialized = Ivar.create () in + client + ~initialized + ~extra_headers + ~app_to_ws + ~ws_to_app + ~net_to_ws + ~ws_to_net + uri + >>> ignore; (* TODO this needs to error check and retry with backoff *) + Ivar.read initialized >>| fun () -> + { + pipe = (read, write); + ready = Ivar.create (); + hb = None; + seq = 0; + id = shards; + session = None; + token; + url; + } in - Conduit_async.V2.connect addr >>= tcp_fun - and recreate shard = - print_endline "Reconnecting..."; - (match shard.hb with - | Some hb -> Ivar.fill_if_empty hb () - | None -> () - ); - create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) () -end + match Unix.getaddrinfo host (string_of_int port) [] with + | [] -> failwithf "DNS resolution failed for %s" host () + | { ai_addr; _ } :: _ -> + let addr = + match scheme, ai_addr with + | _, ADDR_UNIX path -> `Unix_domain_socket path + | "https", ADDR_INET (h, p) + | "wss", ADDR_INET (h, p) -> + let h = Ipaddr_unix.of_inet_addr h in + `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ()) + | _, ADDR_INET (h, p) -> + let h = Ipaddr_unix.of_inet_addr h in + `TCP (h, p) + in + Conduit_async.V2.connect addr >>= tcp_fun + and recreate shard = + print_endline "Reconnecting..."; + (match shard.hb with + | Some hb -> Ivar.fill_if_empty hb () + | None -> () + ); + create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) () + end -type t = { - shards: (Shard.shard Shard.t) list; -} + type t = { + shards: (Shard.shard Shard.t) list; + } -let start ?count token = - let module J = Yojson.Basic.Util in - Http.get_gateway_bot () >>= fun data -> - let url = J.(member "url" data |> to_string) in - let count = match count with - | Some c -> c - | None -> J.(member "shards" data |> to_int) - in - let shard_list = (0, count) in - let rec ev_loop t = - let (read, _) = t.shard.pipe in - Pipe.read read - >>| fun frame -> - let _ = match parse frame with - | Some f -> begin - handle_frame ~f t.shard - >>> fun shard -> - t.shard <- shard; - end - | None -> t.shard <- recreate t.shard; + let start ?count token = + let module J = Yojson.Basic.Util in + Http.get_gateway_bot () >>= fun data -> + let url = J.(member "url" data |> to_string) in + let count = match count with + | Some c -> c + | None -> J.(member "shards" data |> to_int) + in + let shard_list = (0, count) in + let rec ev_loop t = + let (read, _) = t.shard.pipe in + Pipe.read read + >>| fun frame -> + let _ = match parse frame with + | Some f -> begin + handle_frame ~f t.shard + >>> fun shard -> + t.shard <- shard; + end + | None -> t.shard <- recreate t.shard; + in + t + >>= fun t -> + List.iter ~f:(fun f -> f t.shard) t.binds; + ev_loop t + in + let rec gen_shards l a = + match l with + | (id, total) when id >= total -> return a + | (id, total) -> + Shard.create ~url ~shards:(id, total) ~token () + >>= fun shard -> + let t = { shard; binds = []; } in + ev_loop t >>> ignore; + gen_shards (id+1, total) (t :: a) in - t - >>= fun t -> - List.iter ~f:(fun f -> f t.shard) t.binds; - ev_loop t - in - let rec gen_shards l a = - match l with - | (id, total) when id >= total -> return a - | (id, total) -> - Shard.create ~url ~shards:(id, total) ~token () - >>= fun shard -> - let t = { shard; binds = []; } in - ev_loop t >>> ignore; - gen_shards (id+1, total) (t :: a) - in - gen_shards shard_list [] - >>| fun shards -> - { shards; } + gen_shards shard_list [] + >>| fun shards -> + { shards; } -let set_status sharder status = - Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status t.shard - ) sharder.shards + let set_status sharder status = + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status t.shard + ) sharder.shards -let set_status_with sharder f = - Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status:(f t.shard) t.shard - ) sharder.shards + let set_status_with sharder f = + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status:(f t.shard) t.shard + ) sharder.shards -let request_guild_members ~guild ?query ?limit sharder = - Deferred.all @@ List.map ~f:(fun t -> - Shard.request_guild_members ~guild ?query ?limit t.shard - ) sharder.shards + let request_guild_members ~guild ?query ?limit sharder = + Deferred.all @@ List.map ~f:(fun t -> + Shard.request_guild_members ~guild ?query ?limit t.shard + ) sharder.shards +end \ No newline at end of file -- cgit v1.2.3 From 18f4b7e8cada448f6fc15ee8ee18944dcb0b1676 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 29 Nov 2018 13:50:53 -0700 Subject: Try to make it a more properly structured lib --- lib/sharder.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 1fa97a0..0984050 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -104,7 +104,7 @@ module Make(H: S.Http) = struct let request_guild_members ~guild ?(query="") ?(limit=0) shard = let payload = `Assoc [ - ("guild_id", `String (string_of_int guild)); + ("guild_id", `String (Snowflake.to_string guild)); ("query", `String query); ("limit", `Int limit); ] in @@ -287,12 +287,12 @@ module Make(H: S.Http) = struct >>| fun shards -> { shards; } - let set_status sharder status = + let set_status ~status sharder = Deferred.all @@ List.map ~f:(fun t -> Shard.set_status ~status t.shard ) sharder.shards - let set_status_with sharder f = + let set_status_with ~f sharder = Deferred.all @@ List.map ~f:(fun t -> Shard.set_status ~status:(f t.shard) t.shard ) sharder.shards -- cgit v1.2.3 From eaccd45894e5b519bca82662d0b950b5f1d9c598 Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Thu, 29 Nov 2018 18:10:45 -0700 Subject: Fix all the errors from coding without merlin --- lib/sharder.ml | 57 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 28 insertions(+), 29 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 0984050..7b14884 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -1,8 +1,8 @@ -open Async -open Core -open Websocket_async - module Make(H: S.Http) = struct + open Async + open Core + open Websocket_async + exception Invalid_Payload exception Failure_to_Establish_Heartbeat @@ -20,7 +20,7 @@ module Make(H: S.Http) = struct } type 'a t = { - mutable shard: 'a; + mutable state: 'a; mutable binds: ('a -> unit) list; } @@ -75,7 +75,7 @@ module Make(H: S.Http) = struct session = session; } - let set_status ~status shard = + let set_status ~(status:Yojson.Basic.json) shard = let payload = match status with | `Assoc [("name", `String name); ("type", `Int t)] -> `Assoc [ @@ -102,7 +102,7 @@ module Make(H: S.Http) = struct Ivar.read shard.ready >>= fun _ -> push_frame ~payload ~ev:STATUS_UPDATE shard - let request_guild_members ~guild ?(query="") ?(limit=0) shard = + let request_guild_members ?(query="") ?(limit=0) ~guild shard = let payload = `Assoc [ ("guild_id", `String (Snowflake.to_string guild)); ("query", `String query); @@ -136,7 +136,7 @@ module Make(H: S.Http) = struct | None -> begin Mutex.lock identify_lock; let payload = `Assoc [ - ("token", `String shard.token); + ("token", `String token); ("properties", `Assoc [ ("$os", `String Sys.os_type); ("$device", `String "dis.ml"); @@ -155,7 +155,7 @@ module Make(H: S.Http) = struct end | Some s -> let payload = `Assoc [ - ("token", `String shard.token); + ("token", `String token); ("session_id", `String s); ("seq", `Int shard.seq) ] in @@ -183,7 +183,7 @@ module Make(H: S.Http) = struct print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard - let rec create ~url ~shards ~token () = + let rec create ~url ~shards () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = H.Base.process_request_headers () in @@ -215,7 +215,6 @@ module Make(H: S.Http) = struct seq = 0; id = shards; session = None; - token; url; } in @@ -240,46 +239,46 @@ module Make(H: S.Http) = struct | Some hb -> Ivar.fill_if_empty hb () | None -> () ); - create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) () + create ~url:(shard.url) ~shards:(shard.id) () end type t = { shards: (Shard.shard Shard.t) list; } - let start ?count token = + let start ?count () = let module J = Yojson.Basic.Util in - Http.get_gateway_bot () >>= fun data -> + H.get_gateway_bot () >>= fun data -> let url = J.(member "url" data |> to_string) in let count = match count with | Some c -> c | None -> J.(member "shards" data |> to_int) in let shard_list = (0, count) in - let rec ev_loop t = - let (read, _) = t.shard.pipe in + let rec ev_loop (t:Shard.shard Shard.t) = + let (read, _) = t.state.pipe in Pipe.read read - >>| fun frame -> - let _ = match parse frame with + >>= fun frame -> + let _ = match Shard.parse frame with | Some f -> begin - handle_frame ~f t.shard + Shard.handle_frame ~f t.state >>> fun shard -> - t.shard <- shard; + t.state <- shard; end - | None -> t.shard <- recreate t.shard; + | None -> Shard.recreate t.state >>> fun s -> t.state <- s; in - t + return t >>= fun t -> - List.iter ~f:(fun f -> f t.shard) t.binds; + List.iter ~f:(fun f -> f t.state) t.binds; ev_loop t in let rec gen_shards l a = match l with | (id, total) when id >= total -> return a | (id, total) -> - Shard.create ~url ~shards:(id, total) ~token () + Shard.create ~url ~shards:(id, total) () >>= fun shard -> - let t = { shard; binds = []; } in + let t = Shard.{ state = shard; binds = []; } in ev_loop t >>> ignore; gen_shards (id+1, total) (t :: a) in @@ -289,16 +288,16 @@ module Make(H: S.Http) = struct let set_status ~status sharder = Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status t.shard + Shard.set_status ~status t.state ) sharder.shards let set_status_with ~f sharder = Deferred.all @@ List.map ~f:(fun t -> - Shard.set_status ~status:(f t.shard) t.shard + Shard.set_status ~status:(f t.state) t.state ) sharder.shards - let request_guild_members ~guild ?query ?limit sharder = + let request_guild_members ?query ?limit ~guild sharder = Deferred.all @@ List.map ~f:(fun t -> - Shard.request_guild_members ~guild ?query ?limit t.shard + Shard.request_guild_members ~guild ?query ?limit t.state ) sharder.shards end \ No newline at end of file -- cgit v1.2.3 From 38a968c585f87736397fd5b576c51f7dbe5d393a Mon Sep 17 00:00:00 2001 From: Mishio595 Date: Fri, 30 Nov 2018 07:41:33 -0700 Subject: some improvements --- lib/sharder.ml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 7b14884..8e8c1f9 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -183,7 +183,7 @@ module Make(H: S.Http) = struct print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard - let rec create ~url ~shards () = + let create ~url ~shards () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = H.Base.process_request_headers () in @@ -233,7 +233,8 @@ module Make(H: S.Http) = struct `TCP (h, p) in Conduit_async.V2.connect addr >>= tcp_fun - and recreate shard = + + let recreate shard = print_endline "Reconnecting..."; (match shard.hb with | Some hb -> Ivar.fill_if_empty hb () @@ -259,14 +260,14 @@ module Make(H: S.Http) = struct let (read, _) = t.state.pipe in Pipe.read read >>= fun frame -> - let _ = match Shard.parse frame with + (match Shard.parse frame with | Some f -> begin Shard.handle_frame ~f t.state >>> fun shard -> t.state <- shard; end | None -> Shard.recreate t.state >>> fun s -> t.state <- s; - in + ); return t >>= fun t -> List.iter ~f:(fun f -> f t.state) t.binds; -- cgit v1.2.3 From dd14dca65cc468d9bd7a537f2d9a0e07e53b7f07 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Fri, 30 Nov 2018 10:21:51 -0700 Subject: Fix the multiple reconnect issue --- lib/sharder.ml | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 8e8c1f9..9b64bba 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -50,7 +50,7 @@ module Make(H: S.Http) = struct ] in let (_, write) = shard.pipe in - Pipe.write_if_open write @@ Frame.create ~content () + Pipe.write write @@ Frame.create ~content () >>| fun () -> shard @@ -118,13 +118,19 @@ module Make(H: S.Http) = struct match data with | Some data -> let hb_interval = J.(member "heartbeat_interval" data |> to_int) in - let finished = Ivar.create () in + let stop_hb = Ivar.create () in + let stopper i = + Ivar.read stop_hb + >>> fun () -> + Ivar.fill_if_empty i; + in + let stop = Deferred.create stopper in Clock.every' ~continue_on_error:true - ~finished + ~stop (Core.Time.Span.create ~ms:hb_interval ()) (fun () -> heartbeat shard >>= fun _ -> return ()); - finished + stop_hb | None -> raise Failure_to_Establish_Heartbeat end | Some s -> s @@ -263,12 +269,12 @@ module Make(H: S.Http) = struct (match Shard.parse frame with | Some f -> begin Shard.handle_frame ~f t.state - >>> fun shard -> - t.state <- shard; + >>| fun s -> (t.state <- s; t) end - | None -> Shard.recreate t.state >>> fun s -> t.state <- s; - ); - return t + | None -> begin + Shard.recreate t.state + >>| fun s -> (t.state <- s; t) + end) >>= fun t -> List.iter ~f:(fun f -> f t.state) t.binds; ev_loop t -- cgit v1.2.3 From d9fb404c8345cb71583f1fdc6988ad63de3002f2 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Mon, 3 Dec 2018 15:57:58 -0700 Subject: Add retry on connect fail logic --- lib/sharder.ml | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 9b64bba..1c26d8c 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -122,7 +122,7 @@ module Make(H: S.Http) = struct let stopper i = Ivar.read stop_hb >>> fun () -> - Ivar.fill_if_empty i; + Ivar.fill_if_empty i () in let stop = Deferred.create stopper in Clock.every' @@ -189,6 +189,38 @@ module Make(H: S.Http) = struct print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard + let rec make_client + ~initialized + ~extra_headers + ~app_to_ws + ~ws_to_app + ~net_to_ws + ~ws_to_net + uri = + client + ~initialized + ~extra_headers + ~app_to_ws + ~ws_to_app + ~net_to_ws + ~ws_to_net + uri + >>> fun res -> + match res with + | Ok () -> () + | Error _ -> + let backoff = Time.Span.create ~ms:500 () in + Clock.after backoff >>> (fun () -> + make_client + ~initialized + ~extra_headers + ~app_to_ws + ~ws_to_app + ~net_to_ws + ~ws_to_net + uri) + + let create ~url ~shards () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in @@ -204,15 +236,14 @@ module Make(H: S.Http) = struct let (app_to_ws, write) = Pipe.create () in let (read, ws_to_app) = Pipe.create () in let initialized = Ivar.create () in - client + make_client ~initialized ~extra_headers ~app_to_ws ~ws_to_app ~net_to_ws ~ws_to_net - uri - >>> ignore; (* TODO this needs to error check and retry with backoff *) + uri; Ivar.read initialized >>| fun () -> { pipe = (read, write); -- cgit v1.2.3 From 260ccd9960b852b9c69b88e9840d5a8b22bb8e1d Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Wed, 12 Dec 2018 15:00:46 -0700 Subject: Work on event dispatch and add model derives --- lib/sharder.ml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 1c26d8c..b1f5d62 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -34,7 +34,7 @@ module Make(H: S.Http) = struct | `Ok s -> begin let open Frame.Opcode in match s.opcode with - | Text -> Some (Yojson.Basic.from_string s.content) + | Text -> Some (Yojson.Safe.from_string s.content) | _ -> None end | `Eof -> None @@ -44,7 +44,7 @@ module Make(H: S.Http) = struct let content = match payload with | None -> "" | Some p -> - Yojson.Basic.to_string @@ `Assoc [ + Yojson.Safe.to_string @@ `Assoc [ ("op", `Int (Opcode.to_int ev)); ("d", p); ] @@ -62,7 +62,7 @@ module Make(H: S.Http) = struct push_frame ~payload ~ev:HEARTBEAT shard let dispatch ~payload shard = - let module J = Yojson.Basic.Util in + let module J = Yojson.Safe.Util in let seq = J.(member "s" payload |> to_int) in let t = J.(member "t" payload |> to_string) in let data = J.member "d" payload in @@ -75,7 +75,7 @@ module Make(H: S.Http) = struct session = session; } - let set_status ~(status:Yojson.Basic.json) shard = + let set_status ~(status:Yojson.Safe.json) shard = let payload = match status with | `Assoc [("name", `String name); ("type", `Int t)] -> `Assoc [ @@ -112,7 +112,7 @@ module Make(H: S.Http) = struct push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard let initialize ?data shard = - let module J = Yojson.Basic.Util in + let module J = Yojson.Safe.Util in let hb = match shard.hb with | None -> begin match data with @@ -168,7 +168,7 @@ module Make(H: S.Http) = struct push_frame ~payload ~ev:RESUME shard let handle_frame ~f shard = - let module J = Yojson.Basic.Util in + let module J = Yojson.Safe.Util in let op = J.(member "op" f |> to_int) |> Opcode.from_int in @@ -285,7 +285,7 @@ module Make(H: S.Http) = struct } let start ?count () = - let module J = Yojson.Basic.Util in + let module J = Yojson.Safe.Util in H.get_gateway_bot () >>= fun data -> let url = J.(member "url" data |> to_string) in let count = match count with -- cgit v1.2.3 From c046760eb599e42226c683aecbe33753dfc4d500 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Wed, 12 Dec 2018 15:23:14 -0700 Subject: Complete event dispatch --- lib/sharder.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index b1f5d62..e28a306 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -1,4 +1,4 @@ -module Make(H: S.Http) = struct +module Make(H: S.Http)(D : S.Dispatch) = struct open Async open Core open Websocket_async @@ -68,8 +68,9 @@ module Make(H: S.Http) = struct let data = J.member "d" payload in let session = J.(member "session_id" data |> to_string_option) in if t = "READY" then begin - Ivar.fill_if_empty shard.ready (); + Ivar.fill_if_empty shard.ready () end; + D.dispatch ~ev:t data; return { shard with seq = seq; session = session; -- cgit v1.2.3 From 31fe810ad9679df6a5f83071717c94315058bfd4 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Wed, 12 Dec 2018 15:54:59 -0700 Subject: Fix Sharder.Make sig --- lib/sharder.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index e28a306..6ac8584 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -1,4 +1,4 @@ -module Make(H: S.Http)(D : S.Dispatch) = struct +module Make(H : S.Http)(D : S.Dispatch) : S.Sharder = struct open Async open Core open Websocket_async -- cgit v1.2.3 From 73d115ce6260e97f5f7ee47f743d842ffd292662 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Thu, 13 Dec 2018 14:11:54 -0700 Subject: Working on deriving types from json --- lib/sharder.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 6ac8584..7366fd9 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -70,7 +70,8 @@ module Make(H : S.Http)(D : S.Dispatch) : S.Sharder = struct if t = "READY" then begin Ivar.fill_if_empty shard.ready () end; - D.dispatch ~ev:t data; + print_endline @@ Yojson.Safe.pretty_to_string data; + D.dispatch ~ev:t (Yojson.Safe.to_string data); return { shard with seq = seq; session = session; @@ -105,7 +106,7 @@ module Make(H : S.Http)(D : S.Dispatch) : S.Sharder = struct let request_guild_members ?(query="") ?(limit=0) ~guild shard = let payload = `Assoc [ - ("guild_id", `String (Snowflake.to_string guild)); + ("guild_id", `String (Int.to_string guild)); ("query", `String query); ("limit", `Int limit); ] in -- cgit v1.2.3 From af684566617ebce536e9f30693aa3e225af906c4 Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Fri, 14 Dec 2018 10:52:36 -0700 Subject: There's a lot going on --- lib/sharder.ml | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 7366fd9..98df132 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -70,7 +70,6 @@ module Make(H : S.Http)(D : S.Dispatch) : S.Sharder = struct if t = "READY" then begin Ivar.fill_if_empty shard.ready () end; - print_endline @@ Yojson.Safe.pretty_to_string data; D.dispatch ~ev:t (Yojson.Safe.to_string data); return { shard with seq = seq; -- cgit v1.2.3