aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdelyn Breedlove <[email protected]>2019-02-05 21:42:34 +0000
committerAdelyn Breedlove <[email protected]>2019-02-05 21:42:34 +0000
commitdbb2b74e90c09e15143c880f3d273fc606c899dc (patch)
tree11e2cdb06393890ade23438f9b714dbdbe81aa63
parentMove large_threshold to shard storage, as it isn't needed elsewhere (diff)
parentSharder fixes (diff)
downloaddisml-dbb2b74e90c09e15143c880f3d273fc606c899dc.tar.xz
disml-dbb2b74e90c09e15143c880f3d273fc606c899dc.zip
Merge branch 'sharder_fixes' into 'master'
Sharder fixes See merge request Mishio595/disml!17
-rw-r--r--bin/bot.ml3
-rw-r--r--lib/event.ml4
-rw-r--r--lib/sharder.ml96
-rw-r--r--lib/sharder.mli12
4 files changed, 66 insertions, 49 deletions
diff --git a/bin/bot.ml b/bin/bot.ml
index 734cf74..bfc8505 100644
--- a/bin/bot.ml
+++ b/bin/bot.ml
@@ -93,6 +93,9 @@ let check_command Event.MessageCreate.{message} =
| Ok msg -> Message.reply message (Printf.sprintf "```lisp\n%s```" (Message.sexp_of_t msg |> Sexp.to_string_hum)) >>> ignore
| _ -> ()
end
+ | "!shutdown" ->
+ Ivar.read client >>> fun client ->
+ Sharder.shutdown_all client.sharder >>> ignore
| _ -> ()
(* Example logs setup *)
diff --git a/lib/event.ml b/lib/event.ml
index 65bb845..2e02249 100644
--- a/lib/event.ml
+++ b/lib/event.ml
@@ -37,7 +37,7 @@ type t =
| WEBHOOK_UPDATE of WebhookUpdate.t
| UNKNOWN of Unknown.t
-let event_of_yojson ~contents t = match t with
+let event_of_yojson ~contents = function
| "READY" -> READY Ready.(deserialize contents)
| "RESUMED" -> RESUMED Resumed.(deserialize contents)
| "CHANNEL_CREATE" -> CHANNEL_CREATE ChannelCreate.(deserialize contents)
@@ -73,7 +73,7 @@ let event_of_yojson ~contents t = match t with
| "WEBHOOK_UPDATE" -> WEBHOOK_UPDATE WebhookUpdate.(deserialize contents)
| s -> UNKNOWN Unknown.(deserialize s contents)
-let dispatch ev = match ev with
+let dispatch = function
| READY d -> !Dispatch.ready d
| RESUMED d -> !Dispatch.resumed d
| CHANNEL_CREATE d -> !Dispatch.channel_create d
diff --git a/lib/sharder.ml b/lib/sharder.ml
index b613399..8570a08 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -46,6 +46,7 @@ module Shard = struct
type 'a t = {
mutable state: 'a;
+ mutable stopped: bool;
}
let identify_lock = Mvar.create ()
@@ -56,13 +57,13 @@ module Shard = struct
| `Ok s -> begin
let open Frame.Opcode in
match s.opcode with
- | Text -> Some (Yojson.Safe.from_string s.content)
+ | Text -> Ok (Yojson.Safe.from_string s.content)
| Binary ->
- if compress then Some (decompress s.content |> Yojson.Safe.from_string)
- else None
- | _ -> None
+ if compress then Ok (decompress s.content |> Yojson.Safe.from_string)
+ else Error "Failed to decompress"
+ | _ -> Error "Unexpected opcode"
end
- | `Eof -> None
+ | `Eof -> Error "EOF"
let push_frame ?payload ~ev shard =
let content = match payload with
@@ -74,7 +75,7 @@ module Shard = struct
]
in
let (_, write) = shard.pipe in
- Pipe.write write @@ Frame.create ~content ()
+ Pipe.write_if_open write @@ Frame.create ~content ()
>>| fun () ->
shard
@@ -142,7 +143,7 @@ module Shard = struct
let initialize ?data shard =
let module J = Yojson.Safe.Util in
let _ = match data with
- | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ())
+ | Some data -> Ivar.fill_if_empty shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ())
| None -> raise Failure_to_Establish_Heartbeat
in
let shards = [`Int (fst shard.id); `Int (snd shard.id)] in
@@ -279,17 +280,15 @@ module Shard = struct
in
Conduit_async.V2.connect addr >>= tcp_fun
- let shutdown_clean shard =
- Logs.debug (fun m -> m "Performing clean shutdown. Shard [%d, %d]" (fst shard.id) (snd shard.id));
- Pipe.write_if_open (snd shard.pipe) (Frame.create ~opcode:Frame.Opcode.Close ~final:true ())
- >>= fun _ ->
- Ivar.fill shard.hb_stopper ();
- Writer.close ~force_close:(Deferred.never ()) (snd shard._internal)
-
- let recreate shard =
- shutdown_clean shard >>= fun () ->
- Logs.debug (fun m -> m "Relaunching shard [%d, %d]" (fst shard.id) (snd shard.id));
- create ~url:(shard.url) ~shards:(shard.id) ()
+ let shutdown ?(clean=false) ?(restart=true) t =
+ let _ = clean in
+ if not restart then t.stopped <- true;
+ Logs.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" (fst t.state.id) (snd t.state.id));
+ Pipe.write_if_open (snd t.state.pipe) (Frame.close 1001)
+ >>= fun () ->
+ Ivar.fill_if_empty t.state.hb_stopper ();
+ Pipe.close_read (fst t.state.pipe);
+ Writer.close (snd t.state._internal)
end
type t = {
@@ -309,37 +308,44 @@ let start ?count ?compress ?large_threshold () =
| None -> J.(member "shards" data |> to_int)
in
let shard_list = (0, count) in
+ Logs.info (fun m -> m "Connecting to %s" url);
let rec ev_loop (t:Shard.shard Shard.t) =
- let (read, _) = t.state.pipe in
- Pipe.read read
- >>= fun frame ->
- (match Shard.parse ~compress:t.state.compress frame with
- | Some f -> begin
- Shard.handle_frame ~f t.state
- >>| fun s -> t.state <- s; t
- end
- | None -> begin
- Logs.warn (fun m -> m "Websocket unexpectedly closed. Restarting...");
- Shard.recreate t.state
- >>| fun s -> t.state <- s; t
- end)
- >>= fun t ->
- ev_loop t
+ let step (t:Shard.shard Shard.t) =
+ Pipe.read (fst t.state.pipe) >>= fun frame ->
+ begin match Shard.parse ~compress:t.state.compress frame with
+ | Ok f ->
+ Shard.handle_frame ~f t.state >>| fun s ->
+ t.state <- s
+ | Error e ->
+ Logs.warn (fun m -> m "Websocket closed. Reason: %s" e);
+ Deferred.never ()
+ end >>| fun () -> t
+ in
+ if t.stopped then return ()
+ else step t >>= ev_loop
in
- Logs.info (fun m -> m "Connecting to %s" url);
let rec gen_shards l a =
match l with
| (id, total) when id >= total -> return a
| (id, total) ->
- Shard.create ~url ~shards:(id, total) ?compress ?large_threshold ()
- >>= fun shard ->
- let t = Shard.{ state = shard; } in
- let _ = Ivar.read t.state.hb_interval >>> fun hb ->
- Clock.every'
- ~stop:(Ivar.read t.state.hb_stopper)
- ~continue_on_error:true
- hb (fun () -> Shard.heartbeat t.state >>| ignore) in
- ev_loop t >>> (fun _ -> Logs.err (fun m -> m "Event loop unexpectedly exited."));
+ let wrap ?(reuse:Shard.shard Shard.t option) state = match reuse with
+ | Some t -> t.state <- state; return t
+ | None -> return Shard.{ state; stopped = false } in
+ let create () =
+ Shard.create ~url ~shards:(id, total) ?compress ?large_threshold ()
+ in
+ let rec bind (t:Shard.shard Shard.t) =
+ let _ = Ivar.read t.state.hb_interval >>> fun hb ->
+ Clock.every'
+ ~stop:(Ivar.read t.state.hb_stopper)
+ ~continue_on_error:true
+ hb (fun () -> Shard.heartbeat t.state >>| ignore) in
+ ev_loop t >>> ignore;
+ Pipe.closed (fst t.state.pipe) >>= (fun () ->
+ create () >>= wrap ~reuse:t >>= bind) >>> ignore;
+ return t
+ in
+ create () >>= wrap >>= bind >>= fun t ->
gen_shards (id+1, total) (t :: a)
in
gen_shards shard_list []
@@ -364,7 +370,7 @@ let request_guild_members ?query ?limit ~guild sharder =
Shard.request_guild_members ~guild ?query ?limit t.state
) sharder.shards
-let shutdown_all sharder =
+let shutdown_all ?restart sharder =
Deferred.all @@ List.map ~f:(fun t ->
- Shard.shutdown_clean t.state
+ Shard.shutdown ~clean:true ?restart t
) sharder.shards \ No newline at end of file
diff --git a/lib/sharder.mli b/lib/sharder.mli
index e00ad14..7c9c90d 100644
--- a/lib/sharder.mli
+++ b/lib/sharder.mli
@@ -37,6 +37,7 @@ module Shard : sig
(** Wrapper around an internal state, used to wrap {!shard}. *)
type 'a t = {
mutable state: 'a;
+ mutable stopped: bool;
}
(** Send a heartbeat to Discord. This is handled automatically. *)
@@ -67,7 +68,11 @@ module Shard : sig
unit ->
shard Deferred.t
- val shutdown_clean : shard -> unit Deferred.t
+ val shutdown :
+ ?clean:bool ->
+ ?restart:bool ->
+ shard t ->
+ unit Deferred.t
end
(** Calls {!Shard.set_status} for each shard registered with the sharder. *)
@@ -90,4 +95,7 @@ val request_guild_members :
t ->
Shard.shard list Deferred.t
-val shutdown_all : t -> unit list Deferred.t \ No newline at end of file
+val shutdown_all :
+ ?restart:bool ->
+ t ->
+ unit list Deferred.t