diff options
| author | Lyn Breedlove <[email protected]> | 2018-10-26 16:17:23 -0600 |
|---|---|---|
| committer | Lyn Breedlove <[email protected]> | 2018-10-26 16:17:23 -0600 |
| commit | 9d7cac5da110250539d1ae139cb159425b0e67b1 (patch) | |
| tree | 90f92553ac205f466aabb4c1129212c2660248d2 /lib | |
| parent | a few (possibly bad) updates (diff) | |
| download | disml-9d7cac5da110250539d1ae139cb159425b0e67b1.tar.xz disml-9d7cac5da110250539d1ae139cb159425b0e67b1.zip | |
More random stuff with sharding
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/client/shardManager.ml | 148 | ||||
| -rw-r--r-- | lib/client/sharder/conn.ml | 5 | ||||
| -rw-r--r-- | lib/client/sharder/opcode.ml | 54 | ||||
| -rw-r--r-- | lib/client/sharder/shard.ml | 85 | ||||
| -rw-r--r-- | lib/client/sharder/shard_manager.ml | 31 |
5 files changed, 175 insertions, 148 deletions
diff --git a/lib/client/shardManager.ml b/lib/client/shardManager.ml deleted file mode 100644 index 502b367..0000000 --- a/lib/client/shardManager.ml +++ /dev/null @@ -1,148 +0,0 @@ -open Lwt.Infix -open Websocket - -module Opcode = struct - type t = - | DISPATCH - | HEARTBEAT - | IDENTIFY - | STATUS_UPDATE - | VOICE_STATE_UPDATE - | RESUME - | RECONNECT - | REQUEST_GUILD_MEMBERS - | INVALID_SESSION - | HELLO - | HEARTBEAT_ACK - - let to_int = function - | DISPATCH -> 0 - | HEARTBEAT -> 1 - | IDENTIFY -> 2 - | STATUS_UPDATE -> 3 - | VOICE_STATE_UPDATE -> 4 - | RESUME -> 6 - | RECONNECT -> 7 - | REQUEST_GUILD_MEMBERS -> 8 - | INVALID_SESSION -> 9 - | HELLO -> 10 - | HEARTBEAT_ACK -> 11 - - let to_string = function - | DISPATCH -> "DISPATCH" - | HEARTBEAT -> "HEARTBEAT" - | IDENTIFY -> "IDENTIFY" - | STATUS_UPDATE -> "STATUS_UPDATE" - | VOICE_STATE_UPDATE -> "VOICE_STATE_UPDATE" - | RESUME -> "RESUME" - | RECONNECT -> "RECONNECT" - | REQUEST_GUILD_MEMBERS -> "REQUEST_GUILD_MEMBER" - | INVALID_SESSION -> "INVALID_SESSION" - | HELLO -> "HELLO" - | HEARTBEAT_ACK -> "HEARTBEAT_ACK" -end - -module Shard = struct - type t = { - send: (Frame.t -> unit Lwt.t); - id: int; - hb_interval: int; - session_id: string; - seq: int; - } - - let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) - send id = - { send; recv; id; hb_interval; session_id; seq; } - - let compare s1 s2 = - Pervasives.compare s1.id s2.id - - let send payload shard = - payload - |> shard.send - - let process_frame shard frame = - (* Add processor *) - - let wrap_payload d op = - `Assoc [ - ("op", `Int op); - ("d", d) - ] - - let create_frame content = - Frame.create ~content () - - let identify ?(threshold=250) total token shard = - let p = wrap_payload (`Assoc [ - ("token", `String token); - ("properties", `Assoc [ - ("$os", `String Sys.os_type); - ("$browser", `String "animus"); - ("$device", `String "animus"); - ]); - ("large_threshold", `Int threshold); - ("shard", `List [`Int shard.id; `Int total]); - ]) (Opcode.to_int IDENTIFY) in - let p = create_frame (Yojson.Basic.to_string p) in - send p shard - - let resume token shard = - let p = wrap_payload (`Assoc [ - ("token", `String token); - ("session_id", `String shard.session_id); - ("seq", `Int shard.seq); - ]) (Opcode.to_int RESUME) in - let p = create_frame (Yojson.Basic.to_string p) in - send p shard - - let heartbeat shard = - let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in - let p = create_frame (Yojson.Basic.to_string p) in - send p shard - - (* Use options *) - let connect ~options ~uri ~id ~total ~token () = - let url = uri |> Uri.to_string in - let ip = Ipaddr.V4 Ipaddr.V4.any in - Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *) - >|= fun (recv, send) -> - let shard = init send id in - heartbeat shard >>= (fun _ -> - identify shard total token - ) |> ignore; (* This feels really hacky *) - let handle_frame = process_frame shard in - let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in - recv_loop (); (* This is a bad way to do this. We should abstract the shard.send away and use Lwt.choose *) - shard -end - -module ShardSet = Set.Make(Shard) - -type t = { - shards: Shard.t ShardSet.t; - gateway_url: Uri.t; - token: string; -} - -let create_shard ?(options=[]) manager = - let id = (ShardSet.cardinal manager.shards) + 1 in - Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token () - >|= fun shard -> - ShardSet.add shard manager.shards - -let update_shard shard manager = - match ShardSet.mem shard manager.shards with - | true -> ShardSet.add shard manager.shards - | false -> manager.shards - -let heartbeat shard manager = - Shard.heartbeat shard - -let identify shard manager = - let total = ShardSet.cardinal manager.shards in - Shard.identify total manager.token shard - -let resume shard manager = - Shard.resume manager.token shard
\ No newline at end of file diff --git a/lib/client/sharder/conn.ml b/lib/client/sharder/conn.ml new file mode 100644 index 0000000..396c170 --- /dev/null +++ b/lib/client/sharder/conn.ml @@ -0,0 +1,5 @@ +type t = { +} + +let push_frame ~conn frame = + ()
\ No newline at end of file diff --git a/lib/client/sharder/opcode.ml b/lib/client/sharder/opcode.ml new file mode 100644 index 0000000..679f45e --- /dev/null +++ b/lib/client/sharder/opcode.ml @@ -0,0 +1,54 @@ +type t = + | DISPATCH + | HEARTBEAT + | IDENTIFY + | STATUS_UPDATE + | VOICE_STATE_UPDATE + | RESUME + | RECONNECT + | REQUEST_GUILD_MEMBERS + | INVALID_SESSION + | HELLO + | HEARTBEAT_ACK + +exception Invalid_Opcode of int + +let to_int = function + | DISPATCH -> 0 + | HEARTBEAT -> 1 + | IDENTIFY -> 2 + | STATUS_UPDATE -> 3 + | VOICE_STATE_UPDATE -> 4 + | RESUME -> 6 + | RECONNECT -> 7 + | REQUEST_GUILD_MEMBERS -> 8 + | INVALID_SESSION -> 9 + | HELLO -> 10 + | HEARTBEAT_ACK -> 11 + +let from_int = function + | 0 -> DISPATCH + | 1 -> HEARTBEAT + | 2 -> IDENTIFY + | 3 -> STATUS_UPDATE + | 4 -> VOICE_STATE_UPDATE + | 6 -> RESUME + | 7 -> RECONNECT + | 8 -> REQUEST_GUILD_MEMBERS + | 9 -> INVALID_SESSION + | 10 -> HELLO + | 11 -> HEARTBEAT_ACK + | op -> raise Invalid_Opcode op + +let to_string = function + | DISPATCH -> "DISPATCH" + | HEARTBEAT -> "HEARTBEAT" + | IDENTIFY -> "IDENTIFY" + | STATUS_UPDATE -> "STATUS_UPDATE" + | VOICE_STATE_UPDATE -> "VOICE_STATE_UPDATE" + | RESUME -> "RESUME" + | RECONNECT -> "RECONNECT" + | REQUEST_GUILD_MEMBERS -> "REQUEST_GUILD_MEMBER" + | INVALID_SESSION -> "INVALID_SESSION" + | HELLO -> "HELLO" + | HEARTBEAT_ACK -> "HEARTBEAT_ACK"
\ No newline at end of file diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml new file mode 100644 index 0000000..3b9cf8d --- /dev/null +++ b/lib/client/sharder/shard.ml @@ -0,0 +1,85 @@ +type t = { + conn = Conn.t; + id: int; + hb_interval: int; + session_id: string; + seq: int; +} + +let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~conn ~id () = + { conn; id; hb_interval; session_id; seq; } + +let compare s1 s2 = + Pervasives.compare s1.id s2.id + +let push_frame ~payload shard = + payload + |> Conn.push_frame ~conn:shard.conn + +let process_frame ~frame shard = + let json = frame |> Yojson.Basic.from_string in + match json with + | `Assoc [("s", s); ("d", d); ("t", t); ("op", op);] -> begin + match op |> Opcode.from_int with + | DISPATCH -> dispatch t d (* Need to write the dispatcher and other ops *) + | HEARTBEAT -> () + | IDENTIFY -> () + | STATUS_UPDATE -> () + | VOICE_STATE_UPDATE -> () + | RESUME -> () + | RECONNECT -> () + | REQUEST_GUILD_MEMBERS -> () + | INVALID_SESSION -> () + | HELLO -> () + | HEARTBEAT_ACK -> () + |> ignore; + { shard with seq = s; } + end + | _ -> shard + +let wrap_payload d op = + `Assoc [ + ("op", `Int op); + ("d", d) + ] + +let create_frame content = + Frame.create ~content () + +let identify ?(threshold=250) ~total ~token shard = + let p = wrap_payload (`Assoc [ + ("token", `String token); + ("properties", `Assoc [ + ("$os", `String Sys.os_type); + ("$browser", `String "animus"); + ("$device", `String "animus"); + ]); + ("large_threshold", `Int threshold); + ("shard", `List [`Int shard.id; `Int total]); + ]) (IDENTIFY |> Opcode.to_int) in + push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + +let resume ~token shard = + let p = wrap_payload (`Assoc [ + ("token", `String token); + ("session_id", `String shard.session_id); + ("seq", `Int shard.seq); + ]) (RESUME |> Opcode.to_int) in + push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + +let heartbeat shard = + let p = wrap_payload (`Int shard.seq) (HEARTBEAT |> Opcode.to_int) in + push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + +let connect ~options ~uri ~id ~total ~token () = + let url = uri |> Uri.to_string in + let ip = Ipaddr.V4 Ipaddr.V4.any in + Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *) + >|= fun (recv, send) -> + let shard = init send id in + heartbeat shard >>= (fun _ -> + identify shard total token + ) |> ignore; (* This feels really hacky *) + let handle_frame = process_frame shard in + let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in + shard
\ No newline at end of file diff --git a/lib/client/sharder/shard_manager.ml b/lib/client/sharder/shard_manager.ml new file mode 100644 index 0000000..68b3d13 --- /dev/null +++ b/lib/client/sharder/shard_manager.ml @@ -0,0 +1,31 @@ +open Lwt.Infix +open Websocket + +module ShardSet = Set.Make(Shard) + +type t = { + shards: Shard.t ShardSet.t; + gateway_url: Uri.t; + token: string; +} + +let create_shard ?(options=[]) manager = + let id = (ShardSet.cardinal manager.shards) + 1 in + Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token () + >|= fun shard -> + ShardSet.add shard manager.shards + +let update_shard shard manager = + match ShardSet.mem shard manager.shards with + | true -> ShardSet.add shard manager.shards + | false -> manager.shards + +let heartbeat shard manager = + Shard.heartbeat shard + +let identify shard manager = + let total = ShardSet.cardinal manager.shards in + Shard.identify total manager.token shard + +let resume shard manager = + Shard.resume manager.token shard
\ No newline at end of file |