diff options
| -rw-r--r-- | dune | 6 | ||||
| -rw-r--r-- | lib/client/opcode.ml (renamed from lib/client/sharder/opcode.ml) | 0 | ||||
| -rw-r--r-- | lib/client/sharder.ml | 126 | ||||
| -rw-r--r-- | lib/client/sharder/shard.ml | 81 | ||||
| -rw-r--r-- | lib/client/sharder/shardManager.ml | 32 |
5 files changed, 129 insertions, 116 deletions
@@ -1,14 +1,14 @@ (library (name animus) - (modules endpoints client shard opcode shardManager http) - (libraries core async cohttp cohttp.async yojson websocket websocket.async) + (modules endpoints http client shard opcode shardManager) + (libraries lwt cohttp cohttp.lwt yojson websocket websocket-lwt-unix) ) ; Test executable (executable (name bot) (modules bot) - (libraries core async animus) + (libraries lwt animus) ) (include_subdirs unqualified)
\ No newline at end of file diff --git a/lib/client/sharder/opcode.ml b/lib/client/opcode.ml index 2462d05..2462d05 100644 --- a/lib/client/sharder/opcode.ml +++ b/lib/client/opcode.ml diff --git a/lib/client/sharder.ml b/lib/client/sharder.ml new file mode 100644 index 0000000..7150f07 --- /dev/null +++ b/lib/client/sharder.ml @@ -0,0 +1,126 @@ +open Lwt.Infix +open Websocket +open Websocket_lwt_unix + +type data = { + shards: int list; + token: string; + url: string; +} + +module Shard = struct + type t = { + mutable hb: Lwt_engine.event option; + mutable seq: int; + session: string option; + token: string; + shard: int list; + conn: Connected_client.t; + } + + let parse frame = + frame |> Frame.show |> Yojson.Basic.from_string + + let encode term = + let content = term |> Yojson.Basic.to_string in + Frame.create ~content () + + let push_frame ?payload shard (ev : Opcode.t) = + let content = match payload with + | None -> None + | Some p -> Some (Yojson.Basic.to_string p) + in + let frame = Frame.create ?content () in + Connected_client.send shard.conn frame + |> ignore; + shard + + let initialize shard data = + let hb = match shard.hb with + | None -> begin + let hb_interval = List.assoc "hb_interval" @@ + Yojson.Basic.Util.to_assoc data + |> Yojson.Basic.Util.to_int + in + Lwt_engine.on_timer + (Float.of_int hb_interval) + true + (fun _ev -> push_frame shard HEARTBEAT |> ignore) + end + | Some s -> s + in + let shard = { shard with hb = Some hb; } in + match shard.session with + | None -> + let payload = `Assoc [ + ("token", `String shard.token); + ("properties", `Assoc [ + ("$os", `String Sys.os_type); + ("$device", `String "animus"); + ("$browser", `String "animus") + ]); + ("compress", `Bool true); + ("large_threshold", `Int 250); + ("shard", `List (List.map (fun i -> `Int i) shard.shard)) + ] in + push_frame ~payload shard IDENTIFY + | Some s -> + let payload = `Assoc [ + ("token", `String shard.token); + ("session_id", `String s); + ("seq", `Int shard.seq) + ] in + push_frame ~payload shard RECONNECT + + + + let handle_frame shard (term : Yojson.Basic.json) = + match term with + | `Assoc [ + ("op", `Int 0); + ("t", `String t); + ("s", `Int s); + ("d", d) + ] -> shard (* TODO dispatch *) + | `Assoc [ + ("op", `Int 1) + ] -> push_frame shard HEARTBEAT + | `Assoc [ + ("op", `Int 7) + ] -> shard (* TODO reconnect *) + | `Assoc [ + ("op", `Int 9) + ] -> shard (* TODO invalid session *) + | `Assoc [ + ("op", `Int 10); + ("d", data) + ] -> initialize shard data + | _data -> shard + + let create data = + let uri = (data.url ^ "?v=7&encoding=json") |> Uri.of_string in + let headers = Http.Base.process_request_headers () in + let req = Cohttp_lwt.Request.make ~headers uri in + let ic, oc = Lwt_io.pipe () in + let client = Connected_client.create req (`TCP (V4 Ipaddr.V4.any, 443)) ic oc in + let rec recv_forever shard = begin + Connected_client.recv client + >>= fun frame -> + Lwt.return @@ handle_frame shard @@ parse frame + >>= fun shard -> recv_forever shard + end in + let shard = { + conn = client; + hb = None; + seq = 0; + shard = data.shards; + session = None; + token = data.token; + } in + recv_forever shard |> ignore; + shard +end + +type t = { + shards: Shard.t list; +}
\ No newline at end of file diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml deleted file mode 100644 index 7be4aad..0000000 --- a/lib/client/sharder/shard.ml +++ /dev/null @@ -1,81 +0,0 @@ -open Lwt.Infix -open Websocket - -type t = { - send: (Frame.t -> unit Lwt.t); - id: int; - total_shards: int; - hb_interval: int; - session_id: string; - seq: int; - token: string; -} - -let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~send ~id ~total_shards ~token () = - { send; id; total_shards; hb_interval; session_id; seq; token; } - -let push_frame shard frame = - shard.send frame - -let process_frame shard frame = - let json = frame |> Yojson.Basic.from_string in - match json with - | `Assoc [("s", `Int _s); ("d", _d); ("t", `String _t); ("op", `Int op);] -> begin - match op |> Opcode.from_int with - | DISPATCH -> () (* dispatch t d Need to write the dispatcher and other ops *) - | HEARTBEAT -> () - | IDENTIFY -> () - | STATUS_UPDATE -> () - | VOICE_STATE_UPDATE -> () - | RESUME -> () - | RECONNECT -> () - | REQUEST_GUILD_MEMBERS -> () - | INVALID_SESSION -> () - | HELLO -> () - | HEARTBEAT_ACK -> () - |> ignore; - (* { shard with seq = s; } *) - end - | _ -> shard - -let wrap_payload d op = - `Assoc [ - ("op", `Int op); - ("d", d) - ] - -let create_frame content = - Frame.create ~content () - -let identify ?(threshold=250) shard= - let p = wrap_payload (`Assoc [ - ("token", `String shard.token); - ("properties", `Assoc [ - ("$os", `String Sys.os_type); - ("$browser", `String "animus"); - ("$device", `String "animus"); - ]); - ("large_threshold", `Int threshold); - ("shard", `List [`Int shard.id; `Int shard.total_shards]); - ]) (Opcode.to_int IDENTIFY) in - push_frame shard (Yojson.Basic.to_string p |> create_frame) - -let resume shard = - let p = wrap_payload (`Assoc [ - ("token", `String shard.token); - ("session_id", `String shard.session_id); - ("seq", `Int shard.seq); - ]) (Opcode.to_int RESUME) in - push_frame shard (Yojson.Basic.to_string p |> create_frame) - -let heartbeat shard = - let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in - push_frame shard (Yojson.Basic.to_string p |> create_frame) - -let connect ~_options ~uri ~id ~total ~token () = - let url = uri |> Uri.to_string in - let ip = Ipaddr.V4 Ipaddr.V4.any in - let client = Websocket.Connected_Client.create - Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *) - >|= fun (recv, send) -> - init ~send ~id ~token ~total_shards:total ()
\ No newline at end of file diff --git a/lib/client/sharder/shardManager.ml b/lib/client/sharder/shardManager.ml deleted file mode 100644 index c5919f3..0000000 --- a/lib/client/sharder/shardManager.ml +++ /dev/null @@ -1,32 +0,0 @@ -open Lwt.Infix - -module ShardSet = Set.Make(struct - type t = Shard.t - let compare (s1:t) (s2:t) = Pervasives.compare s1.id s2.id -end) - -type t = { - shards: ShardSet.t; - gateway_url: Uri.t; - token: string; -} - -let create_shard ?(options=[]) manager = - let id = (ShardSet.cardinal manager.shards) + 1 in - Shard.connect ~_options:options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token () - >|= fun shard -> - ShardSet.add shard manager.shards - -let update_shard manager shard = - match ShardSet.mem shard manager.shards with - | true -> ShardSet.add shard manager.shards - | false -> manager.shards - -let heartbeat _manager shard = - Shard.heartbeat shard - -let identify _manager shard = - Shard.identify shard - -let resume _manager shard = - Shard.resume shard
\ No newline at end of file |