aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLyn Breedlove <[email protected]>2018-10-26 16:17:23 -0600
committerLyn Breedlove <[email protected]>2018-10-26 16:17:23 -0600
commit9d7cac5da110250539d1ae139cb159425b0e67b1 (patch)
tree90f92553ac205f466aabb4c1129212c2660248d2 /lib
parenta few (possibly bad) updates (diff)
downloaddisml-9d7cac5da110250539d1ae139cb159425b0e67b1.tar.xz
disml-9d7cac5da110250539d1ae139cb159425b0e67b1.zip
More random stuff with sharding
Diffstat (limited to 'lib')
-rw-r--r--lib/client/shardManager.ml148
-rw-r--r--lib/client/sharder/conn.ml5
-rw-r--r--lib/client/sharder/opcode.ml54
-rw-r--r--lib/client/sharder/shard.ml85
-rw-r--r--lib/client/sharder/shard_manager.ml31
5 files changed, 175 insertions, 148 deletions
diff --git a/lib/client/shardManager.ml b/lib/client/shardManager.ml
deleted file mode 100644
index 502b367..0000000
--- a/lib/client/shardManager.ml
+++ /dev/null
@@ -1,148 +0,0 @@
-open Lwt.Infix
-open Websocket
-
-module Opcode = struct
- type t =
- | DISPATCH
- | HEARTBEAT
- | IDENTIFY
- | STATUS_UPDATE
- | VOICE_STATE_UPDATE
- | RESUME
- | RECONNECT
- | REQUEST_GUILD_MEMBERS
- | INVALID_SESSION
- | HELLO
- | HEARTBEAT_ACK
-
- let to_int = function
- | DISPATCH -> 0
- | HEARTBEAT -> 1
- | IDENTIFY -> 2
- | STATUS_UPDATE -> 3
- | VOICE_STATE_UPDATE -> 4
- | RESUME -> 6
- | RECONNECT -> 7
- | REQUEST_GUILD_MEMBERS -> 8
- | INVALID_SESSION -> 9
- | HELLO -> 10
- | HEARTBEAT_ACK -> 11
-
- let to_string = function
- | DISPATCH -> "DISPATCH"
- | HEARTBEAT -> "HEARTBEAT"
- | IDENTIFY -> "IDENTIFY"
- | STATUS_UPDATE -> "STATUS_UPDATE"
- | VOICE_STATE_UPDATE -> "VOICE_STATE_UPDATE"
- | RESUME -> "RESUME"
- | RECONNECT -> "RECONNECT"
- | REQUEST_GUILD_MEMBERS -> "REQUEST_GUILD_MEMBER"
- | INVALID_SESSION -> "INVALID_SESSION"
- | HELLO -> "HELLO"
- | HEARTBEAT_ACK -> "HEARTBEAT_ACK"
-end
-
-module Shard = struct
- type t = {
- send: (Frame.t -> unit Lwt.t);
- id: int;
- hb_interval: int;
- session_id: string;
- seq: int;
- }
-
- let init ?(hb_interval=42500) ?(session_id="") ?(seq=0)
- send id =
- { send; recv; id; hb_interval; session_id; seq; }
-
- let compare s1 s2 =
- Pervasives.compare s1.id s2.id
-
- let send payload shard =
- payload
- |> shard.send
-
- let process_frame shard frame =
- (* Add processor *)
-
- let wrap_payload d op =
- `Assoc [
- ("op", `Int op);
- ("d", d)
- ]
-
- let create_frame content =
- Frame.create ~content ()
-
- let identify ?(threshold=250) total token shard =
- let p = wrap_payload (`Assoc [
- ("token", `String token);
- ("properties", `Assoc [
- ("$os", `String Sys.os_type);
- ("$browser", `String "animus");
- ("$device", `String "animus");
- ]);
- ("large_threshold", `Int threshold);
- ("shard", `List [`Int shard.id; `Int total]);
- ]) (Opcode.to_int IDENTIFY) in
- let p = create_frame (Yojson.Basic.to_string p) in
- send p shard
-
- let resume token shard =
- let p = wrap_payload (`Assoc [
- ("token", `String token);
- ("session_id", `String shard.session_id);
- ("seq", `Int shard.seq);
- ]) (Opcode.to_int RESUME) in
- let p = create_frame (Yojson.Basic.to_string p) in
- send p shard
-
- let heartbeat shard =
- let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in
- let p = create_frame (Yojson.Basic.to_string p) in
- send p shard
-
- (* Use options *)
- let connect ~options ~uri ~id ~total ~token () =
- let url = uri |> Uri.to_string in
- let ip = Ipaddr.V4 Ipaddr.V4.any in
- Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *)
- >|= fun (recv, send) ->
- let shard = init send id in
- heartbeat shard >>= (fun _ ->
- identify shard total token
- ) |> ignore; (* This feels really hacky *)
- let handle_frame = process_frame shard in
- let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in
- recv_loop (); (* This is a bad way to do this. We should abstract the shard.send away and use Lwt.choose *)
- shard
-end
-
-module ShardSet = Set.Make(Shard)
-
-type t = {
- shards: Shard.t ShardSet.t;
- gateway_url: Uri.t;
- token: string;
-}
-
-let create_shard ?(options=[]) manager =
- let id = (ShardSet.cardinal manager.shards) + 1 in
- Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token ()
- >|= fun shard ->
- ShardSet.add shard manager.shards
-
-let update_shard shard manager =
- match ShardSet.mem shard manager.shards with
- | true -> ShardSet.add shard manager.shards
- | false -> manager.shards
-
-let heartbeat shard manager =
- Shard.heartbeat shard
-
-let identify shard manager =
- let total = ShardSet.cardinal manager.shards in
- Shard.identify total manager.token shard
-
-let resume shard manager =
- Shard.resume manager.token shard \ No newline at end of file
diff --git a/lib/client/sharder/conn.ml b/lib/client/sharder/conn.ml
new file mode 100644
index 0000000..396c170
--- /dev/null
+++ b/lib/client/sharder/conn.ml
@@ -0,0 +1,5 @@
+type t = {
+}
+
+let push_frame ~conn frame =
+ () \ No newline at end of file
diff --git a/lib/client/sharder/opcode.ml b/lib/client/sharder/opcode.ml
new file mode 100644
index 0000000..679f45e
--- /dev/null
+++ b/lib/client/sharder/opcode.ml
@@ -0,0 +1,54 @@
+type t =
+ | DISPATCH
+ | HEARTBEAT
+ | IDENTIFY
+ | STATUS_UPDATE
+ | VOICE_STATE_UPDATE
+ | RESUME
+ | RECONNECT
+ | REQUEST_GUILD_MEMBERS
+ | INVALID_SESSION
+ | HELLO
+ | HEARTBEAT_ACK
+
+exception Invalid_Opcode of int
+
+let to_int = function
+ | DISPATCH -> 0
+ | HEARTBEAT -> 1
+ | IDENTIFY -> 2
+ | STATUS_UPDATE -> 3
+ | VOICE_STATE_UPDATE -> 4
+ | RESUME -> 6
+ | RECONNECT -> 7
+ | REQUEST_GUILD_MEMBERS -> 8
+ | INVALID_SESSION -> 9
+ | HELLO -> 10
+ | HEARTBEAT_ACK -> 11
+
+let from_int = function
+ | 0 -> DISPATCH
+ | 1 -> HEARTBEAT
+ | 2 -> IDENTIFY
+ | 3 -> STATUS_UPDATE
+ | 4 -> VOICE_STATE_UPDATE
+ | 6 -> RESUME
+ | 7 -> RECONNECT
+ | 8 -> REQUEST_GUILD_MEMBERS
+ | 9 -> INVALID_SESSION
+ | 10 -> HELLO
+ | 11 -> HEARTBEAT_ACK
+ | op -> raise Invalid_Opcode op
+
+let to_string = function
+ | DISPATCH -> "DISPATCH"
+ | HEARTBEAT -> "HEARTBEAT"
+ | IDENTIFY -> "IDENTIFY"
+ | STATUS_UPDATE -> "STATUS_UPDATE"
+ | VOICE_STATE_UPDATE -> "VOICE_STATE_UPDATE"
+ | RESUME -> "RESUME"
+ | RECONNECT -> "RECONNECT"
+ | REQUEST_GUILD_MEMBERS -> "REQUEST_GUILD_MEMBER"
+ | INVALID_SESSION -> "INVALID_SESSION"
+ | HELLO -> "HELLO"
+ | HEARTBEAT_ACK -> "HEARTBEAT_ACK" \ No newline at end of file
diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml
new file mode 100644
index 0000000..3b9cf8d
--- /dev/null
+++ b/lib/client/sharder/shard.ml
@@ -0,0 +1,85 @@
+type t = {
+ conn = Conn.t;
+ id: int;
+ hb_interval: int;
+ session_id: string;
+ seq: int;
+}
+
+let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~conn ~id () =
+ { conn; id; hb_interval; session_id; seq; }
+
+let compare s1 s2 =
+ Pervasives.compare s1.id s2.id
+
+let push_frame ~payload shard =
+ payload
+ |> Conn.push_frame ~conn:shard.conn
+
+let process_frame ~frame shard =
+ let json = frame |> Yojson.Basic.from_string in
+ match json with
+ | `Assoc [("s", s); ("d", d); ("t", t); ("op", op);] -> begin
+ match op |> Opcode.from_int with
+ | DISPATCH -> dispatch t d (* Need to write the dispatcher and other ops *)
+ | HEARTBEAT -> ()
+ | IDENTIFY -> ()
+ | STATUS_UPDATE -> ()
+ | VOICE_STATE_UPDATE -> ()
+ | RESUME -> ()
+ | RECONNECT -> ()
+ | REQUEST_GUILD_MEMBERS -> ()
+ | INVALID_SESSION -> ()
+ | HELLO -> ()
+ | HEARTBEAT_ACK -> ()
+ |> ignore;
+ { shard with seq = s; }
+ end
+ | _ -> shard
+
+let wrap_payload d op =
+ `Assoc [
+ ("op", `Int op);
+ ("d", d)
+ ]
+
+let create_frame content =
+ Frame.create ~content ()
+
+let identify ?(threshold=250) ~total ~token shard =
+ let p = wrap_payload (`Assoc [
+ ("token", `String token);
+ ("properties", `Assoc [
+ ("$os", `String Sys.os_type);
+ ("$browser", `String "animus");
+ ("$device", `String "animus");
+ ]);
+ ("large_threshold", `Int threshold);
+ ("shard", `List [`Int shard.id; `Int total]);
+ ]) (IDENTIFY |> Opcode.to_int) in
+ push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+
+let resume ~token shard =
+ let p = wrap_payload (`Assoc [
+ ("token", `String token);
+ ("session_id", `String shard.session_id);
+ ("seq", `Int shard.seq);
+ ]) (RESUME |> Opcode.to_int) in
+ push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+
+let heartbeat shard =
+ let p = wrap_payload (`Int shard.seq) (HEARTBEAT |> Opcode.to_int) in
+ push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard
+
+let connect ~options ~uri ~id ~total ~token () =
+ let url = uri |> Uri.to_string in
+ let ip = Ipaddr.V4 Ipaddr.V4.any in
+ Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *)
+ >|= fun (recv, send) ->
+ let shard = init send id in
+ heartbeat shard >>= (fun _ ->
+ identify shard total token
+ ) |> ignore; (* This feels really hacky *)
+ let handle_frame = process_frame shard in
+ let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in
+ shard \ No newline at end of file
diff --git a/lib/client/sharder/shard_manager.ml b/lib/client/sharder/shard_manager.ml
new file mode 100644
index 0000000..68b3d13
--- /dev/null
+++ b/lib/client/sharder/shard_manager.ml
@@ -0,0 +1,31 @@
+open Lwt.Infix
+open Websocket
+
+module ShardSet = Set.Make(Shard)
+
+type t = {
+ shards: Shard.t ShardSet.t;
+ gateway_url: Uri.t;
+ token: string;
+}
+
+let create_shard ?(options=[]) manager =
+ let id = (ShardSet.cardinal manager.shards) + 1 in
+ Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token ()
+ >|= fun shard ->
+ ShardSet.add shard manager.shards
+
+let update_shard shard manager =
+ match ShardSet.mem shard manager.shards with
+ | true -> ShardSet.add shard manager.shards
+ | false -> manager.shards
+
+let heartbeat shard manager =
+ Shard.heartbeat shard
+
+let identify shard manager =
+ let total = ShardSet.cardinal manager.shards in
+ Shard.identify total manager.token shard
+
+let resume shard manager =
+ Shard.resume manager.token shard \ No newline at end of file