aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMishio595 <[email protected]>2018-11-29 06:16:23 -0700
committerMishio595 <[email protected]>2018-11-29 06:16:23 -0700
commit77f522a5f3fd74749e7a2cd4c849e520f2b6ba89 (patch)
treed9bb7e7be811ba84c4c527fcd11054ab475ee54b /lib
parentMore models, some dispatch rework starting (diff)
downloaddisml-77f522a5f3fd74749e7a2cd4c849e520f2b6ba89.tar.xz
disml-77f522a5f3fd74749e7a2cd4c849e520f2b6ba89.zip
Some sharding work, reconnect is mostly working
Diffstat (limited to 'lib')
-rw-r--r--lib/dispatch.ml152
-rw-r--r--lib/sharder.ml161
2 files changed, 136 insertions, 177 deletions
diff --git a/lib/dispatch.ml b/lib/dispatch.ml
index c6d717b..43ffe1f 100644
--- a/lib/dispatch.ml
+++ b/lib/dispatch.ml
@@ -1,116 +1,38 @@
-open Async
+(* open Async *)
-type t = {
- hello: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- ready: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- resumed: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- invalid_session: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- channel_create: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t;
- channel_update: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t;
- channel_delete: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t;
- channel_pins_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- guild_create: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t;
- guild_update: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t;
- guild_delete: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t;
- guild_ban_add: Ban.t Pipe.Reader.t * Ban.t Pipe.Writer.t;
- guild_ban_remove: Ban.t Pipe.Reader.t * Ban.t Pipe.Writer.t;
- guild_emojis_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- guild_integrations_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- guild_member_add: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t;
- guild_member_remove: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t;
- guild_member_update: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t;
- guild_members_chunk: (Member.t list) Pipe.Reader.t * (Member.t list) Pipe.Writer.t;
- guild_role_create: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t;
- guild_role_update: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t;
- guild_role_delete: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t;
- message_create: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t;
- message_update: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t;
- message_delete: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t;
- message_bulk_delete: (Message.t list) Pipe.Reader.t * (Message.t list) Pipe.Writer.t;
- message_reaction_add: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t;
- message_reaction_remove: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t;
- message_reaction_remove_all: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t;
- presence_update: Presence.t Pipe.Reader.t * Presence.t Pipe.Writer.t;
- typing_start: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- user_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- voice_state_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- voice_server_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
- webhooks_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t;
-}
-
-let dispatcher =
- {
- hello = Pipe.create ();
- ready = Pipe.create ();
- resumed = Pipe.create ();
- invalid_session = Pipe.create ();
- channel_create = Pipe.create ();
- channel_update = Pipe.create ();
- channel_delete = Pipe.create ();
- channel_pins_update = Pipe.create ();
- guild_create = Pipe.create ();
- guild_update = Pipe.create ();
- guild_delete = Pipe.create ();
- guild_ban_add = Pipe.create ();
- guild_ban_remove = Pipe.create ();
- guild_emojis_update = Pipe.create ();
- guild_integrations_update = Pipe.create ();
- guild_member_add = Pipe.create ();
- guild_member_remove = Pipe.create ();
- guild_member_update = Pipe.create ();
- guild_members_chunk = Pipe.create ();
- guild_role_create = Pipe.create ();
- guild_role_update = Pipe.create ();
- guild_role_delete = Pipe.create ();
- message_create = Pipe.create ();
- message_update = Pipe.create ();
- message_delete = Pipe.create ();
- message_bulk_delete = Pipe.create ();
- message_reaction_add = Pipe.create ();
- message_reaction_remove = Pipe.create ();
- message_reaction_remove_all = Pipe.create ();
- presence_update = Pipe.create ();
- typing_start = Pipe.create ();
- user_update = Pipe.create ();
- voice_state_update = Pipe.create ();
- voice_server_update = Pipe.create ();
- webhooks_update = Pipe.create ();
- }
-
-(* let write (ev:Event.t) =
- let (read, _) = match ev with
- | HELLO -> dispatcher.hello
- | READY -> dispatcher.ready
- | RESUMED -> dispatcher.resumed
- | INVALID_SESSION -> dispatcher.invalid_session
- | CHANNEL_CREATE -> dispatcher.channel_create
- | CHANNEL_UPDATE -> dispatcher.channel_update
- | CHANNEL_DELETE -> dispatcher.channel_delete
- | CHANNEL_PINS_UPDATE -> dispatcher.channel_pins_update
- | GUILD_CREATE -> dispatcher.guild_create
- | GUILD_UPDATE -> dispatcher.guild_update
- | GUILD_DELETE -> dispatcher.guild_delete
- | GUILD_BAN_ADD -> dispatcher.guild_ban_add
- | GUILD_BAN_REMOVE -> dispatcher.guild_ban_remove
- | GUILD_EMOJIS_UPDATE -> dispatcher.guild_emojis_update
- | GUILD_INTEGRATIONS_UPDATE -> dispatcher.guild_integrations_update
- | GUILD_MEMBER_ADD -> dispatcher.guild_member_ad
- | GUILD_MEMBER_REMOVE -> dispatcher.guild_member_remove
- | GUILD_MEMBER_UPDATE -> dispatcher.guild_member_update
- | GUILD_MEMBERS_CHUNK -> dispatcher.guild_members_chunk
- | GUILD_ROLE_CREATE -> dispatcher.guild_role_create
- | GUILD_ROLE_UPDATE -> dispatcher.guild_role_updatE
- | GUILD_ROLE_DELETE -> dispatcher.guild_role_delete
- | MESSAGE_CREATE -> dispatcher.message_create
- | MESSAGE_UPDATE -> dispatcher.message_update
- | MESSAGE_DELETE -> dispatcher.message_delete
- | MESSAGE_BULK_DELETE -> dispatcher.message_bulk_delete
- | MESSAGE_REACTION_ADD -> dispatcher.message_reaction_add
- | MESSAGE_REACTION_REMOVE -> dispatcher.message_reaction_remove
- | MESSAGE_REACTION_REMOVE_ALL -> dispatcher.message_reaction_remove_all
- | PRESENCE_UPDATE -> dispatcher.presence_update
- | TYPING_START -> dispatcher.typing_start
- | USER_UPDATE -> dispatcher.user_update
- | VOICE_STATE_UPDATE -> dispatcher.voice_state_update
- | VOICE_SERVER_UPDATE -> dispatcher.voice_server_update
- | WEBHOOKS_UPDATE -> dispatcher.webhooks_update *) \ No newline at end of file
+type dispatch_event =
+| HELLO of Yojson.Basic.json
+| READY of Yojson.Basic.json
+| RESUMED of Yojson.Basic.json
+| INVALID_SESSION of Yojson.Basic.json
+| CHANNEL_CREATE of Channel.t
+| CHANNEL_UPDATE of Channel.t
+| CHANNEL_DELETE of Channel.t
+| CHANNEL_PINS_UPDATE of Yojson.Basic.json
+| GUILD_CREATE of Guild.t
+| GUILD_UPDATE of Guild.t
+| GUILD_DELETE of Guild.t
+| GUILD_BAN_ADD of Ban.t
+| GUILD_BAN_REMOVE of Ban.t
+| GUILD_EMOJIS_UPDATE of Yojson.Basic.json
+| GUILD_INTEGRATIONS_UPDATE of Yojson.Basic.json
+| GUILD_MEMBER_ADD of Member.t
+| GUILD_MEMBER_REMOVE of Member.t
+| GUILD_MEMBER_UPDATE of Member.t
+| GUILD_MEMBERS_CHUNK of Member.t list
+| GUILD_ROLE_CREATE of Role.t * Guild.t
+| GUILD_ROLE_UPDATE of Role.t * Guild.t
+| GUILD_ROLE_DELETE of Role.t * Guild.t
+| MESSAGE_CREATE of Message.t
+| MESSAGE_UPDATE of Message.t
+| MESSAGE_DELETE of Message.t
+| MESSAGE_BULK_DELETE of Message.t list
+| MESSAGE_REACTION_ADD of Message.t * Reaction.t
+| MESSAGE_REACTION_REMOVE of Message.t * Reaction.t
+| MESSAGE_REACTION_REMOVE_ALL of Message.t * Reaction.t list
+| PRESENCE_UPDATE of Presence.t
+| TYPING_START of Yojson.Basic.json
+| USER_UPDATE of Yojson.Basic.json
+| VOICE_STATE_UPDATE of Yojson.Basic.json
+| VOICE_SERVER_UPDATE of Yojson.Basic.json
+| WEBHOOKS_UPDATE of Yojson.Basic.json
diff --git a/lib/sharder.ml b/lib/sharder.ml
index defcfe1..74870bd 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -3,25 +3,39 @@ open Core
open Websocket_async
exception Invalid_Payload
+exception Failure_to_Establish_Heartbeat
module Shard = struct
- type t = {
- mutable hb: unit Ivar.t option;
- mutable seq: int;
- mutable session: string option;
- token: string;
- shard: int * int;
- write: Frame.t Pipe.Writer.t;
- read: Frame.t Pipe.Reader.t;
+ type shard = {
+ hb: unit Ivar.t option;
+ seq: int;
+ session: string option;
+ pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t;
ready: unit Ivar.t;
+ token: string;
+ url: string;
+ id: int * int;
+ }
+
+ type 'a t = {
+ mutable shard: 'a;
+ mutable binds: ('a -> unit) list;
}
let identify_lock = Mutex.create ()
+ let bind ~f t =
+ t.binds <- f :: t.binds
+
let parse (frame:[`Ok of Frame.t | `Eof]) =
match frame with
- | `Ok s -> Yojson.Basic.from_string s.content (* TODO Handler non-text frames *)
- | `Eof -> raise Invalid_Payload (* TODO This needs to go into reconnect code, or stop using client_ez and handle frames manually *)
+ | `Ok s -> begin
+ let open Frame.Opcode in
+ match s.opcode with
+ | Text -> Some (Yojson.Basic.from_string s.content)
+ | _ -> None
+ end
+ | `Eof -> None
let push_frame ?payload ~ev shard =
print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev;
@@ -33,33 +47,31 @@ module Shard = struct
("d", p);
]
in
- Pipe.write shard.write @@ Frame.create ~content ()
+ let (_, write) = shard.pipe in
+ Pipe.write_if_open write @@ Frame.create ~content ()
>>| fun () ->
shard
let heartbeat shard =
- let seq = match shard.seq with
+ let payload = match shard.seq with
| 0 -> `Null
| i -> `Int i
in
- let payload = `Assoc [
- ("op", `Int 1);
- ("d", seq);
- ] in
push_frame ~payload ~ev:HEARTBEAT shard
let dispatch ~payload shard =
let module J = Yojson.Basic.Util in
let seq = J.(member "s" payload |> to_int) in
- shard.seq <- seq;
let t = J.(member "t" payload |> to_string) in
let data = J.member "d" payload in
+ let session = J.(member "session_id" data |> to_string_option) in
if t = "READY" then begin
Ivar.fill_if_empty shard.ready ();
- let session = J.(member "session_id" data |> to_string) in
- shard.session <- Some session
end;
- return shard
+ return { shard with
+ seq = seq;
+ session = session;
+ }
let set_status ~status shard =
let payload = match status with
@@ -97,27 +109,30 @@ module Shard = struct
Ivar.read shard.ready >>= fun _ ->
push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard
- let initialize ~data shard =
+ let initialize ?data shard =
let module J = Yojson.Basic.Util in
let hb = match shard.hb with
| None -> begin
- let hb_interval = J.(member "heartbeat_interval" data |> to_int) in
- let finished = Ivar.create () in
- Clock.every'
- ~continue_on_error:true
- ~finished
- (Core.Time.Span.create ~ms:hb_interval ())
- (fun () -> heartbeat shard >>= fun _ -> return ());
- finished
+ match data with
+ | Some data ->
+ let hb_interval = J.(member "heartbeat_interval" data |> to_int) in
+ let finished = Ivar.create () in
+ Clock.every'
+ ~continue_on_error:true
+ ~finished
+ (Core.Time.Span.create ~ms:hb_interval ())
+ (fun () -> heartbeat shard >>= fun _ -> return ());
+ finished
+ | None -> raise Failure_to_Establish_Heartbeat
end
| Some s -> s
in
- shard.hb <- Some hb;
- Mutex.lock identify_lock;
- let (cur, max) = shard.shard in
+ let shard = { shard with hb = Some hb; } in
+ let (cur, max) = shard.id in
let shards = [`Int cur; `Int max] in
match shard.session with
- | None ->
+ | None -> begin
+ Mutex.lock identify_lock;
let payload = `Assoc [
("token", `String shard.token);
("properties", `Assoc [
@@ -130,6 +145,12 @@ module Shard = struct
("shard", `List shards);
] in
push_frame ~payload ~ev:IDENTIFY shard
+ >>| fun s -> begin
+ Clock.after (Core.Time.Span.create ~sec:5 ())
+ >>> (fun _ -> Mutex.unlock identify_lock);
+ s
+ end
+ end
| Some s ->
let payload = `Assoc [
("token", `String shard.token);
@@ -137,11 +158,6 @@ module Shard = struct
("seq", `Int shard.seq)
] in
push_frame ~payload ~ev:RESUME shard
- >>| fun s ->
- Clock.after (Core.Time.Span.create ~sec:5 ())
- >>| (fun _ -> Mutex.unlock identify_lock)
- |> ignore;
- s
let handle_frame ~f shard =
let module J = Yojson.Basic.Util in
@@ -151,15 +167,21 @@ module Shard = struct
match op with
| DISPATCH -> dispatch ~payload:f shard
| HEARTBEAT -> heartbeat shard
- | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *)
- | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *)
+ | INVALID_SESSION -> begin
+ if J.(member "d" f |> to_bool) then
+ initialize shard
+ else begin
+ initialize { shard with session = None; }
+ end
+ end
+ | RECONNECT -> initialize shard
| HELLO -> initialize ~data:(J.member "d" f) shard
| HEARTBEAT_ACK -> return shard
| opcode ->
print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode;
return shard
- let create ~url ~shards ~token () =
+ let rec create ~url ~shards ~token () =
let open Core in
let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in
let extra_headers = Http.Base.process_request_headers () in
@@ -184,26 +206,36 @@ module Shard = struct
uri
>>> ignore;
Ivar.read initialized >>| fun () ->
- let rec ev_loop ~reader shard =
- Pipe.read reader
+ let rec ev_loop t =
+ let (read, _) = t.shard.pipe in
+ Pipe.read read
>>= fun frame ->
- handle_frame ~f:(parse frame) shard
- >>= fun shard ->
- ev_loop ~reader shard
+ (match parse frame with
+ | Some f -> begin
+ handle_frame ~f t.shard
+ >>| fun shard ->
+ t.shard <- shard;
+ t
+ end
+ | None -> recreate t.shard)
+ >>= fun t ->
+ List.iter ~f:(fun f -> f t.shard) t.binds;
+ ev_loop t
in
let shard = {
- read;
- write;
+ pipe = (read, write);
ready = Ivar.create ();
hb = None;
seq = 0;
- shard = shards;
+ id = shards;
session = None;
- token = token;
+ token;
+ url;
}
in
- ev_loop ~reader:read shard |> ignore;
- shard
+ let t = { shard; binds = []; } in
+ ev_loop t >>> ignore;
+ t
in
match Unix.getaddrinfo host (string_of_int port) [] with
| [] -> failwithf "DNS resolution failed for %s" host ()
@@ -220,10 +252,17 @@ module Shard = struct
`TCP (h, p)
in
Conduit_async.V2.connect addr >>= tcp_fun
+ and recreate shard =
+ print_endline "Reconnecting...";
+ (match shard.hb with
+ | Some hb -> Ivar.fill_if_empty hb ()
+ | None -> ()
+ );
+ create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) ()
end
type t = {
- shards: Shard.t list;
+ mutable shards: (Shard.shard Shard.t) list;
}
let start ?count token =
@@ -246,21 +285,19 @@ let start ?count token =
in
gen_shards shard_list []
>>| fun shards ->
- {
- shards;
- }
+ { shards; }
let set_status sharder status =
- Deferred.all @@ List.map ~f:(fun shard ->
- Shard.set_status ~status shard
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.set_status ~status t.shard
) sharder.shards
let set_status_with sharder f =
- Deferred.all @@ List.map ~f:(fun shard ->
- Shard.set_status ~status:(f shard) shard
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.set_status ~status:(f t.shard) t.shard
) sharder.shards
let request_guild_members ~guild ?query ?limit sharder =
- Deferred.all @@ List.map ~f:(fun shard ->
- Shard.request_guild_members ~guild ?query ?limit shard
+ Deferred.all @@ List.map ~f:(fun t ->
+ Shard.request_guild_members ~guild ?query ?limit t.shard
) sharder.shards