diff options
| author | Mishio595 <[email protected]> | 2018-10-29 06:25:22 -0600 |
|---|---|---|
| committer | Mishio595 <[email protected]> | 2018-10-29 06:25:22 -0600 |
| commit | 5a1aae149fafa2848c2cd89a3c75992653380201 (patch) | |
| tree | d6c2567ab3854aa6fb51aa4a2d715cf42f76ff52 /lib | |
| parent | More random stuff with sharding (diff) | |
| download | disml-5a1aae149fafa2848c2cd89a3c75992653380201.tar.xz disml-5a1aae149fafa2848c2cd89a3c75992653380201.zip | |
Moving to LWT branch
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/client/sharder/conn.ml | 5 | ||||
| -rw-r--r-- | lib/client/sharder/opcode.ml | 2 | ||||
| -rw-r--r-- | lib/client/sharder/shard.ml | 62 | ||||
| -rw-r--r-- | lib/client/sharder/shardManager.ml | 32 | ||||
| -rw-r--r-- | lib/client/sharder/shard_manager.ml | 31 |
5 files changed, 62 insertions, 70 deletions
diff --git a/lib/client/sharder/conn.ml b/lib/client/sharder/conn.ml deleted file mode 100644 index 396c170..0000000 --- a/lib/client/sharder/conn.ml +++ /dev/null @@ -1,5 +0,0 @@ -type t = { -} - -let push_frame ~conn frame = - ()
\ No newline at end of file diff --git a/lib/client/sharder/opcode.ml b/lib/client/sharder/opcode.ml index 679f45e..2462d05 100644 --- a/lib/client/sharder/opcode.ml +++ b/lib/client/sharder/opcode.ml @@ -38,7 +38,7 @@ let from_int = function | 9 -> INVALID_SESSION | 10 -> HELLO | 11 -> HEARTBEAT_ACK - | op -> raise Invalid_Opcode op + | op -> raise (Invalid_Opcode op) let to_string = function | DISPATCH -> "DISPATCH" diff --git a/lib/client/sharder/shard.ml b/lib/client/sharder/shard.ml index 3b9cf8d..7be4aad 100644 --- a/lib/client/sharder/shard.ml +++ b/lib/client/sharder/shard.ml @@ -1,27 +1,28 @@ +open Lwt.Infix +open Websocket + type t = { - conn = Conn.t; + send: (Frame.t -> unit Lwt.t); id: int; + total_shards: int; hb_interval: int; session_id: string; seq: int; + token: string; } -let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~conn ~id () = - { conn; id; hb_interval; session_id; seq; } - -let compare s1 s2 = - Pervasives.compare s1.id s2.id +let init ?(hb_interval=42500) ?(session_id="") ?(seq=0) ~send ~id ~total_shards ~token () = + { send; id; total_shards; hb_interval; session_id; seq; token; } -let push_frame ~payload shard = - payload - |> Conn.push_frame ~conn:shard.conn +let push_frame shard frame = + shard.send frame -let process_frame ~frame shard = +let process_frame shard frame = let json = frame |> Yojson.Basic.from_string in match json with - | `Assoc [("s", s); ("d", d); ("t", t); ("op", op);] -> begin + | `Assoc [("s", `Int _s); ("d", _d); ("t", `String _t); ("op", `Int op);] -> begin match op |> Opcode.from_int with - | DISPATCH -> dispatch t d (* Need to write the dispatcher and other ops *) + | DISPATCH -> () (* dispatch t d Need to write the dispatcher and other ops *) | HEARTBEAT -> () | IDENTIFY -> () | STATUS_UPDATE -> () @@ -33,8 +34,8 @@ let process_frame ~frame shard = | HELLO -> () | HEARTBEAT_ACK -> () |> ignore; - { shard with seq = s; } - end + (* { shard with seq = s; } *) + end | _ -> shard let wrap_payload d op = @@ -46,40 +47,35 @@ let wrap_payload d op = let create_frame content = Frame.create ~content () -let identify ?(threshold=250) ~total ~token shard = +let identify ?(threshold=250) shard= let p = wrap_payload (`Assoc [ - ("token", `String token); + ("token", `String shard.token); ("properties", `Assoc [ ("$os", `String Sys.os_type); ("$browser", `String "animus"); ("$device", `String "animus"); ]); ("large_threshold", `Int threshold); - ("shard", `List [`Int shard.id; `Int total]); - ]) (IDENTIFY |> Opcode.to_int) in - push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + ("shard", `List [`Int shard.id; `Int shard.total_shards]); + ]) (Opcode.to_int IDENTIFY) in + push_frame shard (Yojson.Basic.to_string p |> create_frame) -let resume ~token shard = +let resume shard = let p = wrap_payload (`Assoc [ - ("token", `String token); + ("token", `String shard.token); ("session_id", `String shard.session_id); ("seq", `Int shard.seq); - ]) (RESUME |> Opcode.to_int) in - push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + ]) (Opcode.to_int RESUME) in + push_frame shard (Yojson.Basic.to_string p |> create_frame) let heartbeat shard = - let p = wrap_payload (`Int shard.seq) (HEARTBEAT |> Opcode.to_int) in - push_frame ~frame:(Yojson.Basic.to_string p |> create_frame) shard + let p = wrap_payload (`Int shard.seq) (Opcode.to_int HEARTBEAT) in + push_frame shard (Yojson.Basic.to_string p |> create_frame) -let connect ~options ~uri ~id ~total ~token () = +let connect ~_options ~uri ~id ~total ~token () = let url = uri |> Uri.to_string in let ip = Ipaddr.V4 Ipaddr.V4.any in + let client = Websocket.Connected_Client.create Websocket_lwt.with_connection (`TLS (`Hostname url, `IP ip, `Port 443)) uri (* Maybe use upgrade_connection? *) >|= fun (recv, send) -> - let shard = init send id in - heartbeat shard >>= (fun _ -> - identify shard total token - ) |> ignore; (* This feels really hacky *) - let handle_frame = process_frame shard in - let rec recv_loop () = recv () >>= handle_frame >>= recv_loop () in - shard
\ No newline at end of file + init ~send ~id ~token ~total_shards:total ()
\ No newline at end of file diff --git a/lib/client/sharder/shardManager.ml b/lib/client/sharder/shardManager.ml new file mode 100644 index 0000000..c5919f3 --- /dev/null +++ b/lib/client/sharder/shardManager.ml @@ -0,0 +1,32 @@ +open Lwt.Infix + +module ShardSet = Set.Make(struct + type t = Shard.t + let compare (s1:t) (s2:t) = Pervasives.compare s1.id s2.id +end) + +type t = { + shards: ShardSet.t; + gateway_url: Uri.t; + token: string; +} + +let create_shard ?(options=[]) manager = + let id = (ShardSet.cardinal manager.shards) + 1 in + Shard.connect ~_options:options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token () + >|= fun shard -> + ShardSet.add shard manager.shards + +let update_shard manager shard = + match ShardSet.mem shard manager.shards with + | true -> ShardSet.add shard manager.shards + | false -> manager.shards + +let heartbeat _manager shard = + Shard.heartbeat shard + +let identify _manager shard = + Shard.identify shard + +let resume _manager shard = + Shard.resume shard
\ No newline at end of file diff --git a/lib/client/sharder/shard_manager.ml b/lib/client/sharder/shard_manager.ml deleted file mode 100644 index 68b3d13..0000000 --- a/lib/client/sharder/shard_manager.ml +++ /dev/null @@ -1,31 +0,0 @@ -open Lwt.Infix -open Websocket - -module ShardSet = Set.Make(Shard) - -type t = { - shards: Shard.t ShardSet.t; - gateway_url: Uri.t; - token: string; -} - -let create_shard ?(options=[]) manager = - let id = (ShardSet.cardinal manager.shards) + 1 in - Shard.connect ~options ~uri:manager.gateway_url ~id ~total:(ShardSet.cardinal manager.shards) ~token:manager.token () - >|= fun shard -> - ShardSet.add shard manager.shards - -let update_shard shard manager = - match ShardSet.mem shard manager.shards with - | true -> ShardSet.add shard manager.shards - | false -> manager.shards - -let heartbeat shard manager = - Shard.heartbeat shard - -let identify shard manager = - let total = ShardSet.cardinal manager.shards in - Shard.identify total manager.token shard - -let resume shard manager = - Shard.resume manager.token shard
\ No newline at end of file |