From 43ad34645cd2dd18553a5e647a951963b91f322e Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Tue, 29 Jan 2019 13:38:27 -0700 Subject: A few improvements to how compression is handled internally --- lib/sharder.ml | 55 ++++++++++++++++--------------------------------------- 1 file changed, 16 insertions(+), 39 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 4b6411d..d8470fd 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -6,31 +6,9 @@ open Websocket_async exception Invalid_Payload exception Failure_to_Establish_Heartbeat exception Inflate_error of Zlib_inflate.error -exception Deflate_error of Zlib_deflate.error let window = Window.create ~witness:B.bytes -let compress ?(level=4) data = - let in_buf = Bytes.create 0xFFFF in - let out_buf = Bytes.create 0xFFFF in - let pos = ref 0 in - let res = Buffer.create (String.length data) in - Zlib_deflate.bytes in_buf out_buf - (fun in_buf opt -> - let n = match opt with - | Some max -> min max (min 0xFFFF (String.length data - !pos)) - | None -> min 0xFFFF (String.length data - !pos) - in - Caml.Bytes.blit_string data !pos in_buf 0 n; - pos := !pos + n; - n) - (fun out_buf len -> - Buffer.add_subbytes res out_buf 0 len; 0xFFFF) - (Zlib_deflate.default ~witness:B.bytes level) - |> function - | Ok _ -> Buffer.contents res - | Error exn -> raise (Deflate_error exn) - let decompress data = let in_buf = Bytes.create 0xFFFF in let out_buf = Bytes.create 0xFFFF in @@ -55,14 +33,15 @@ let try_decompress data = try decompress data module Shard = struct type shard = { + compress: bool; + id: int * int; hb_interval: Time.Span.t Ivar.t; hb_stopper: unit Ivar.t; - seq: int; - session: string option; pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; ready: unit Ivar.t; + seq: int; + session: string option; url: string; - id: int * int; _internal: Reader.t * Writer.t; } @@ -73,13 +52,13 @@ module Shard = struct let identify_lock = Mvar.create () let _ = Mvar.set identify_lock () - let parse (frame:[`Ok of Frame.t | `Eof]) = + let parse ~compress (frame:[`Ok of Frame.t | `Eof]) = match frame with | `Ok s -> begin let open Frame.Opcode in match s.opcode with | Text | Binary -> - let s = if !Client_options.compress then try_decompress s.content + let s = if compress then try_decompress s.content else s.content in (* Logs.debug (fun m -> m "%s" s); *) Some (Yojson.Safe.from_string s) @@ -87,8 +66,7 @@ module Shard = struct end | `Eof -> None - let push_frame ?flate ?payload ~ev shard = - let flate = Option.value ~default:!Client_options.compress flate in + let push_frame ?payload ~ev shard = let content = match payload with | None -> "" | Some p -> @@ -97,8 +75,6 @@ module Shard = struct ("d", p); ] in - let content = if flate then compress content - else content in let (_, write) = shard.pipe in Pipe.write write @@ Frame.create ~content () >>| fun () -> @@ -109,7 +85,7 @@ module Shard = struct | 0 -> return shard | i -> Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq)); - push_frame ~flate:false ~payload:(`Int i) ~ev:HEARTBEAT shard + push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard let dispatch ~payload shard = let module J = Yojson.Safe.Util in @@ -182,11 +158,11 @@ module Shard = struct ("$device", `String "dis.ml"); ("$browser", `String "dis.ml") ]); - ("compress", `Bool !Client_options.compress); + ("compress", `Bool shard.compress); ("large_threshold", `Int !Client_options.large_threshold); ("shard", `List shards); ] in - push_frame ~flate:false ~payload ~ev:IDENTIFY shard + push_frame ~payload ~ev:IDENTIFY shard >>| fun s -> s end | Some s -> @@ -195,7 +171,7 @@ module Shard = struct ("session_id", `String s); ("seq", `Int shard.seq) ] in - push_frame ~flate:false ~payload ~ev:RESUME shard + push_frame ~payload ~ev:RESUME shard let handle_frame ~f shard = let module J = Yojson.Safe.Util in @@ -250,7 +226,7 @@ module Shard = struct uri) - let create ~url ~shards () = + let create ~url ~shards ?(compress=true) () = let open Core in let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in let extra_headers = Http.Base.process_request_headers () in @@ -286,6 +262,7 @@ module Shard = struct id = shards; session = None; url; + compress; _internal = (net_to_ws, ws_to_net); } in @@ -319,7 +296,7 @@ type t = { shards: (Shard.shard Shard.t) list; } -let start ?count () = +let start ?count ?compress () = let module J = Yojson.Safe.Util in Http.get_gateway_bot () >>= fun data -> let data = match data with @@ -336,7 +313,7 @@ let start ?count () = let (read, _) = t.state.pipe in Pipe.read read >>= fun frame -> - (match Shard.parse frame with + (match Shard.parse ~compress:t.state.compress frame with | Some f -> begin Shard.handle_frame ~f t.state >>| fun s -> t.state <- s; t @@ -352,7 +329,7 @@ let start ?count () = match l with | (id, total) when id >= total -> return a | (id, total) -> - Shard.create ~url ~shards:(id, total) () + Shard.create ~url ~shards:(id, total) ?compress () >>= fun shard -> let t = Shard.{ state = shard; } in let _ = Ivar.read t.state.hb_interval >>> fun hb -> -- cgit v1.2.3