From ae0f5c08bb4fb8868ca47030557a161993f24c8f Mon Sep 17 00:00:00 2001 From: Adelyn Breedlove Date: Tue, 5 Feb 2019 21:42:34 +0000 Subject: Sharder fixes --- lib/sharder.ml | 96 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 51 insertions(+), 45 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index b613399..8570a08 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -46,6 +46,7 @@ module Shard = struct type 'a t = { mutable state: 'a; + mutable stopped: bool; } let identify_lock = Mvar.create () @@ -56,13 +57,13 @@ module Shard = struct | `Ok s -> begin let open Frame.Opcode in match s.opcode with - | Text -> Some (Yojson.Safe.from_string s.content) + | Text -> Ok (Yojson.Safe.from_string s.content) | Binary -> - if compress then Some (decompress s.content |> Yojson.Safe.from_string) - else None - | _ -> None + if compress then Ok (decompress s.content |> Yojson.Safe.from_string) + else Error "Failed to decompress" + | _ -> Error "Unexpected opcode" end - | `Eof -> None + | `Eof -> Error "EOF" let push_frame ?payload ~ev shard = let content = match payload with @@ -74,7 +75,7 @@ module Shard = struct ] in let (_, write) = shard.pipe in - Pipe.write write @@ Frame.create ~content () + Pipe.write_if_open write @@ Frame.create ~content () >>| fun () -> shard @@ -142,7 +143,7 @@ module Shard = struct let initialize ?data shard = let module J = Yojson.Safe.Util in let _ = match data with - | Some data -> Ivar.fill shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ()) + | Some data -> Ivar.fill_if_empty shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ()) | None -> raise Failure_to_Establish_Heartbeat in let shards = [`Int (fst shard.id); `Int (snd shard.id)] in @@ -279,17 +280,15 @@ module Shard = struct in Conduit_async.V2.connect addr >>= tcp_fun - let shutdown_clean shard = - Logs.debug (fun m -> m "Performing clean shutdown. Shard [%d, %d]" (fst shard.id) (snd shard.id)); - Pipe.write_if_open (snd shard.pipe) (Frame.create ~opcode:Frame.Opcode.Close ~final:true ()) - >>= fun _ -> - Ivar.fill shard.hb_stopper (); - Writer.close ~force_close:(Deferred.never ()) (snd shard._internal) - - let recreate shard = - shutdown_clean shard >>= fun () -> - Logs.debug (fun m -> m "Relaunching shard [%d, %d]" (fst shard.id) (snd shard.id)); - create ~url:(shard.url) ~shards:(shard.id) () + let shutdown ?(clean=false) ?(restart=true) t = + let _ = clean in + if not restart then t.stopped <- true; + Logs.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" (fst t.state.id) (snd t.state.id)); + Pipe.write_if_open (snd t.state.pipe) (Frame.close 1001) + >>= fun () -> + Ivar.fill_if_empty t.state.hb_stopper (); + Pipe.close_read (fst t.state.pipe); + Writer.close (snd t.state._internal) end type t = { @@ -309,37 +308,44 @@ let start ?count ?compress ?large_threshold () = | None -> J.(member "shards" data |> to_int) in let shard_list = (0, count) in + Logs.info (fun m -> m "Connecting to %s" url); let rec ev_loop (t:Shard.shard Shard.t) = - let (read, _) = t.state.pipe in - Pipe.read read - >>= fun frame -> - (match Shard.parse ~compress:t.state.compress frame with - | Some f -> begin - Shard.handle_frame ~f t.state - >>| fun s -> t.state <- s; t - end - | None -> begin - Logs.warn (fun m -> m "Websocket unexpectedly closed. Restarting..."); - Shard.recreate t.state - >>| fun s -> t.state <- s; t - end) - >>= fun t -> - ev_loop t + let step (t:Shard.shard Shard.t) = + Pipe.read (fst t.state.pipe) >>= fun frame -> + begin match Shard.parse ~compress:t.state.compress frame with + | Ok f -> + Shard.handle_frame ~f t.state >>| fun s -> + t.state <- s + | Error e -> + Logs.warn (fun m -> m "Websocket closed. Reason: %s" e); + Deferred.never () + end >>| fun () -> t + in + if t.stopped then return () + else step t >>= ev_loop in - Logs.info (fun m -> m "Connecting to %s" url); let rec gen_shards l a = match l with | (id, total) when id >= total -> return a | (id, total) -> - Shard.create ~url ~shards:(id, total) ?compress ?large_threshold () - >>= fun shard -> - let t = Shard.{ state = shard; } in - let _ = Ivar.read t.state.hb_interval >>> fun hb -> - Clock.every' - ~stop:(Ivar.read t.state.hb_stopper) - ~continue_on_error:true - hb (fun () -> Shard.heartbeat t.state >>| ignore) in - ev_loop t >>> (fun _ -> Logs.err (fun m -> m "Event loop unexpectedly exited.")); + let wrap ?(reuse:Shard.shard Shard.t option) state = match reuse with + | Some t -> t.state <- state; return t + | None -> return Shard.{ state; stopped = false } in + let create () = + Shard.create ~url ~shards:(id, total) ?compress ?large_threshold () + in + let rec bind (t:Shard.shard Shard.t) = + let _ = Ivar.read t.state.hb_interval >>> fun hb -> + Clock.every' + ~stop:(Ivar.read t.state.hb_stopper) + ~continue_on_error:true + hb (fun () -> Shard.heartbeat t.state >>| ignore) in + ev_loop t >>> ignore; + Pipe.closed (fst t.state.pipe) >>= (fun () -> + create () >>= wrap ~reuse:t >>= bind) >>> ignore; + return t + in + create () >>= wrap >>= bind >>= fun t -> gen_shards (id+1, total) (t :: a) in gen_shards shard_list [] @@ -364,7 +370,7 @@ let request_guild_members ?query ?limit ~guild sharder = Shard.request_guild_members ~guild ?query ?limit t.state ) sharder.shards -let shutdown_all sharder = +let shutdown_all ?restart sharder = Deferred.all @@ List.map ~f:(fun t -> - Shard.shutdown_clean t.state + Shard.shutdown ~clean:true ?restart t ) sharder.shards \ No newline at end of file -- cgit v1.2.3