diff options
| author | Mishio595 <[email protected]> | 2018-11-29 06:16:23 -0700 |
|---|---|---|
| committer | Mishio595 <[email protected]> | 2018-11-29 06:16:23 -0700 |
| commit | 77f522a5f3fd74749e7a2cd4c849e520f2b6ba89 (patch) | |
| tree | d9bb7e7be811ba84c4c527fcd11054ab475ee54b /lib | |
| parent | More models, some dispatch rework starting (diff) | |
| download | disml-77f522a5f3fd74749e7a2cd4c849e520f2b6ba89.tar.xz disml-77f522a5f3fd74749e7a2cd4c849e520f2b6ba89.zip | |
Some sharding work, reconnect is mostly working
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/dispatch.ml | 152 | ||||
| -rw-r--r-- | lib/sharder.ml | 161 |
2 files changed, 136 insertions, 177 deletions
diff --git a/lib/dispatch.ml b/lib/dispatch.ml index c6d717b..43ffe1f 100644 --- a/lib/dispatch.ml +++ b/lib/dispatch.ml @@ -1,116 +1,38 @@ -open Async +(* open Async *) -type t = { - hello: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - ready: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - resumed: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - invalid_session: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - channel_create: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t; - channel_update: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t; - channel_delete: Channel.t Pipe.Reader.t * Channel.t Pipe.Writer.t; - channel_pins_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - guild_create: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t; - guild_update: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t; - guild_delete: Guild.t Pipe.Reader.t * Guild.t Pipe.Writer.t; - guild_ban_add: Ban.t Pipe.Reader.t * Ban.t Pipe.Writer.t; - guild_ban_remove: Ban.t Pipe.Reader.t * Ban.t Pipe.Writer.t; - guild_emojis_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - guild_integrations_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - guild_member_add: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t; - guild_member_remove: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t; - guild_member_update: Member.t Pipe.Reader.t * Member.t Pipe.Writer.t; - guild_members_chunk: (Member.t list) Pipe.Reader.t * (Member.t list) Pipe.Writer.t; - guild_role_create: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t; - guild_role_update: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t; - guild_role_delete: (Role.t * Guild.t) Pipe.Reader.t * (Role.t * Guild.t) Pipe.Writer.t; - message_create: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t; - message_update: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t; - message_delete: Message.t Pipe.Reader.t * Message.t Pipe.Writer.t; - message_bulk_delete: (Message.t list) Pipe.Reader.t * (Message.t list) Pipe.Writer.t; - message_reaction_add: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t; - message_reaction_remove: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t; - message_reaction_remove_all: (Message.t * Reaction.t) Pipe.Reader.t * (Message.t * Reaction.t) Pipe.Writer.t; - presence_update: Presence.t Pipe.Reader.t * Presence.t Pipe.Writer.t; - typing_start: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - user_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - voice_state_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - voice_server_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; - webhooks_update: Yojson.Basic.json Pipe.Reader.t * Yojson.Basic.json Pipe.Writer.t; -} - -let dispatcher = - { - hello = Pipe.create (); - ready = Pipe.create (); - resumed = Pipe.create (); - invalid_session = Pipe.create (); - channel_create = Pipe.create (); - channel_update = Pipe.create (); - channel_delete = Pipe.create (); - channel_pins_update = Pipe.create (); - guild_create = Pipe.create (); - guild_update = Pipe.create (); - guild_delete = Pipe.create (); - guild_ban_add = Pipe.create (); - guild_ban_remove = Pipe.create (); - guild_emojis_update = Pipe.create (); - guild_integrations_update = Pipe.create (); - guild_member_add = Pipe.create (); - guild_member_remove = Pipe.create (); - guild_member_update = Pipe.create (); - guild_members_chunk = Pipe.create (); - guild_role_create = Pipe.create (); - guild_role_update = Pipe.create (); - guild_role_delete = Pipe.create (); - message_create = Pipe.create (); - message_update = Pipe.create (); - message_delete = Pipe.create (); - message_bulk_delete = Pipe.create (); - message_reaction_add = Pipe.create (); - message_reaction_remove = Pipe.create (); - message_reaction_remove_all = Pipe.create (); - presence_update = Pipe.create (); - typing_start = Pipe.create (); - user_update = Pipe.create (); - voice_state_update = Pipe.create (); - voice_server_update = Pipe.create (); - webhooks_update = Pipe.create (); - } - -(* let write (ev:Event.t) = - let (read, _) = match ev with - | HELLO -> dispatcher.hello - | READY -> dispatcher.ready - | RESUMED -> dispatcher.resumed - | INVALID_SESSION -> dispatcher.invalid_session - | CHANNEL_CREATE -> dispatcher.channel_create - | CHANNEL_UPDATE -> dispatcher.channel_update - | CHANNEL_DELETE -> dispatcher.channel_delete - | CHANNEL_PINS_UPDATE -> dispatcher.channel_pins_update - | GUILD_CREATE -> dispatcher.guild_create - | GUILD_UPDATE -> dispatcher.guild_update - | GUILD_DELETE -> dispatcher.guild_delete - | GUILD_BAN_ADD -> dispatcher.guild_ban_add - | GUILD_BAN_REMOVE -> dispatcher.guild_ban_remove - | GUILD_EMOJIS_UPDATE -> dispatcher.guild_emojis_update - | GUILD_INTEGRATIONS_UPDATE -> dispatcher.guild_integrations_update - | GUILD_MEMBER_ADD -> dispatcher.guild_member_ad - | GUILD_MEMBER_REMOVE -> dispatcher.guild_member_remove - | GUILD_MEMBER_UPDATE -> dispatcher.guild_member_update - | GUILD_MEMBERS_CHUNK -> dispatcher.guild_members_chunk - | GUILD_ROLE_CREATE -> dispatcher.guild_role_create - | GUILD_ROLE_UPDATE -> dispatcher.guild_role_updatE - | GUILD_ROLE_DELETE -> dispatcher.guild_role_delete - | MESSAGE_CREATE -> dispatcher.message_create - | MESSAGE_UPDATE -> dispatcher.message_update - | MESSAGE_DELETE -> dispatcher.message_delete - | MESSAGE_BULK_DELETE -> dispatcher.message_bulk_delete - | MESSAGE_REACTION_ADD -> dispatcher.message_reaction_add - | MESSAGE_REACTION_REMOVE -> dispatcher.message_reaction_remove - | MESSAGE_REACTION_REMOVE_ALL -> dispatcher.message_reaction_remove_all - | PRESENCE_UPDATE -> dispatcher.presence_update - | TYPING_START -> dispatcher.typing_start - | USER_UPDATE -> dispatcher.user_update - | VOICE_STATE_UPDATE -> dispatcher.voice_state_update - | VOICE_SERVER_UPDATE -> dispatcher.voice_server_update - | WEBHOOKS_UPDATE -> dispatcher.webhooks_update *)
\ No newline at end of file +type dispatch_event = +| HELLO of Yojson.Basic.json +| READY of Yojson.Basic.json +| RESUMED of Yojson.Basic.json +| INVALID_SESSION of Yojson.Basic.json +| CHANNEL_CREATE of Channel.t +| CHANNEL_UPDATE of Channel.t +| CHANNEL_DELETE of Channel.t +| CHANNEL_PINS_UPDATE of Yojson.Basic.json +| GUILD_CREATE of Guild.t +| GUILD_UPDATE of Guild.t +| GUILD_DELETE of Guild.t +| GUILD_BAN_ADD of Ban.t +| GUILD_BAN_REMOVE of Ban.t +| GUILD_EMOJIS_UPDATE of Yojson.Basic.json +| GUILD_INTEGRATIONS_UPDATE of Yojson.Basic.json +| GUILD_MEMBER_ADD of Member.t +| GUILD_MEMBER_REMOVE of Member.t +| GUILD_MEMBER_UPDATE of Member.t +| GUILD_MEMBERS_CHUNK of Member.t list +| GUILD_ROLE_CREATE of Role.t * Guild.t +| GUILD_ROLE_UPDATE of Role.t * Guild.t +| GUILD_ROLE_DELETE of Role.t * Guild.t +| MESSAGE_CREATE of Message.t +| MESSAGE_UPDATE of Message.t +| MESSAGE_DELETE of Message.t +| MESSAGE_BULK_DELETE of Message.t list +| MESSAGE_REACTION_ADD of Message.t * Reaction.t +| MESSAGE_REACTION_REMOVE of Message.t * Reaction.t +| MESSAGE_REACTION_REMOVE_ALL of Message.t * Reaction.t list +| PRESENCE_UPDATE of Presence.t +| TYPING_START of Yojson.Basic.json +| USER_UPDATE of Yojson.Basic.json +| VOICE_STATE_UPDATE of Yojson.Basic.json +| VOICE_SERVER_UPDATE of Yojson.Basic.json +| WEBHOOKS_UPDATE of Yojson.Basic.json diff --git a/lib/sharder.ml b/lib/sharder.ml index defcfe1..74870bd 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -3,25 +3,39 @@ open Core open Websocket_async exception Invalid_Payload +exception Failure_to_Establish_Heartbeat module Shard = struct - type t = { - mutable hb: unit Ivar.t option; - mutable seq: int; - mutable session: string option; - token: string; - shard: int * int; - write: Frame.t Pipe.Writer.t; - read: Frame.t Pipe.Reader.t; + type shard = { + hb: unit Ivar.t option; + seq: int; + session: string option; + pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; ready: unit Ivar.t; + token: string; + url: string; + id: int * int; + } + + type 'a t = { + mutable shard: 'a; + mutable binds: ('a -> unit) list; } let identify_lock = Mutex.create () + let bind ~f t = + t.binds <- f :: t.binds + let parse (frame:[`Ok of Frame.t | `Eof]) = match frame with - | `Ok s -> Yojson.Basic.from_string s.content (* TODO Handler non-text frames *) - | `Eof -> raise Invalid_Payload (* TODO This needs to go into reconnect code, or stop using client_ez and handle frames manually *) + | `Ok s -> begin + let open Frame.Opcode in + match s.opcode with + | Text -> Some (Yojson.Basic.from_string s.content) + | _ -> None + end + | `Eof -> None let push_frame ?payload ~ev shard = print_endline @@ "Pushing frame. OP: " ^ Opcode.to_string @@ ev; @@ -33,33 +47,31 @@ module Shard = struct ("d", p); ] in - Pipe.write shard.write @@ Frame.create ~content () + let (_, write) = shard.pipe in + Pipe.write_if_open write @@ Frame.create ~content () >>| fun () -> shard let heartbeat shard = - let seq = match shard.seq with + let payload = match shard.seq with | 0 -> `Null | i -> `Int i in - let payload = `Assoc [ - ("op", `Int 1); - ("d", seq); - ] in push_frame ~payload ~ev:HEARTBEAT shard let dispatch ~payload shard = let module J = Yojson.Basic.Util in let seq = J.(member "s" payload |> to_int) in - shard.seq <- seq; let t = J.(member "t" payload |> to_string) in let data = J.member "d" payload in + let session = J.(member "session_id" data |> to_string_option) in if t = "READY" then begin Ivar.fill_if_empty shard.ready (); - let session = J.(member "session_id" data |> to_string) in - shard.session <- Some session end; - return shard + return { shard with + seq = seq; + session = session; + } let set_status ~status shard = let payload = match status with @@ -97,27 +109,30 @@ module Shard = struct Ivar.read shard.ready >>= fun _ -> push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard - let initialize ~data shard = + let initialize ?data shard = let module J = Yojson.Basic.Util in let hb = match shard.hb with | None -> begin - let hb_interval = J.(member "heartbeat_interval" data |> to_int) in - let finished = Ivar.create () in - Clock.every' - ~continue_on_error:true - ~finished - (Core.Time.Span.create ~ms:hb_interval ()) - (fun () -> heartbeat shard >>= fun _ -> return ()); - finished + match data with + | Some data -> + let hb_interval = J.(member "heartbeat_interval" data |> to_int) in + let finished = Ivar.create () in + Clock.every' + ~continue_on_error:true + ~finished + (Core.Time.Span.create ~ms:hb_interval ()) + (fun () -> heartbeat shard >>= fun _ -> return ()); + finished + | None -> raise Failure_to_Establish_Heartbeat end | Some s -> s in - shard.hb <- Some hb; - Mutex.lock identify_lock; - let (cur, max) = shard.shard in + let shard = { shard with hb = Some hb; } in + let (cur, max) = shard.id in let shards = [`Int cur; `Int max] in match shard.session with - | None -> + | None -> begin + Mutex.lock identify_lock; let payload = `Assoc [ ("token", `String shard.token); ("properties", `Assoc [ @@ -130,6 +145,12 @@ module Shard = struct ("shard", `List shards); ] in push_frame ~payload ~ev:IDENTIFY shard + >>| fun s -> begin + Clock.after (Core.Time.Span.create ~sec:5 ()) + >>> (fun _ -> Mutex.unlock identify_lock); + s + end + end | Some s -> let payload = `Assoc [ ("token", `String shard.token); @@ -137,11 +158,6 @@ module Shard = struct ("seq", `Int shard.seq) ] in push_frame ~payload ~ev:RESUME shard - >>| fun s -> - Clock.after (Core.Time.Span.create ~sec:5 ()) - >>| (fun _ -> Mutex.unlock identify_lock) - |> ignore; - s let handle_frame ~f shard = let module J = Yojson.Basic.Util in @@ -151,15 +167,21 @@ module Shard = struct match op with | DISPATCH -> dispatch ~payload:f shard | HEARTBEAT -> heartbeat shard - | RECONNECT -> print_endline "OP 7"; return shard (* TODO reconnect *) - | INVALID_SESSION -> print_endline "OP 9"; return shard (* TODO invalid session *) + | INVALID_SESSION -> begin + if J.(member "d" f |> to_bool) then + initialize shard + else begin + initialize { shard with session = None; } + end + end + | RECONNECT -> initialize shard | HELLO -> initialize ~data:(J.member "d" f) shard | HEARTBEAT_ACK -> return shard | opcode -> print_endline @@ "Invalid Opcode: " ^ Opcode.to_string opcode; return shard - let create ~url ~shards ~token () = + let rec create ~url ~shards ~token () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = Http.Base.process_request_headers () in @@ -184,26 +206,36 @@ module Shard = struct uri >>> ignore; Ivar.read initialized >>| fun () -> - let rec ev_loop ~reader shard = - Pipe.read reader + let rec ev_loop t = + let (read, _) = t.shard.pipe in + Pipe.read read >>= fun frame -> - handle_frame ~f:(parse frame) shard - >>= fun shard -> - ev_loop ~reader shard + (match parse frame with + | Some f -> begin + handle_frame ~f t.shard + >>| fun shard -> + t.shard <- shard; + t + end + | None -> recreate t.shard) + >>= fun t -> + List.iter ~f:(fun f -> f t.shard) t.binds; + ev_loop t in let shard = { - read; - write; + pipe = (read, write); ready = Ivar.create (); hb = None; seq = 0; - shard = shards; + id = shards; session = None; - token = token; + token; + url; } in - ev_loop ~reader:read shard |> ignore; - shard + let t = { shard; binds = []; } in + ev_loop t >>> ignore; + t in match Unix.getaddrinfo host (string_of_int port) [] with | [] -> failwithf "DNS resolution failed for %s" host () @@ -220,10 +252,17 @@ module Shard = struct `TCP (h, p) in Conduit_async.V2.connect addr >>= tcp_fun + and recreate shard = + print_endline "Reconnecting..."; + (match shard.hb with + | Some hb -> Ivar.fill_if_empty hb () + | None -> () + ); + create ~url:(shard.url) ~shards:(shard.id) ~token:(shard.token) () end type t = { - shards: Shard.t list; + mutable shards: (Shard.shard Shard.t) list; } let start ?count token = @@ -246,21 +285,19 @@ let start ?count token = in gen_shards shard_list [] >>| fun shards -> - { - shards; - } + { shards; } let set_status sharder status = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status ~status shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status t.shard ) sharder.shards let set_status_with sharder f = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.set_status ~status:(f shard) shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.set_status ~status:(f t.shard) t.shard ) sharder.shards let request_guild_members ~guild ?query ?limit sharder = - Deferred.all @@ List.map ~f:(fun shard -> - Shard.request_guild_members ~guild ?query ?limit shard + Deferred.all @@ List.map ~f:(fun t -> + Shard.request_guild_members ~guild ?query ?limit t.shard ) sharder.shards |