From 193cd616263306c5ac21dc8b93dfd91a335a0c8b Mon Sep 17 00:00:00 2001 From: Adelyn Breelove Date: Tue, 29 Jan 2019 13:27:18 -0700 Subject: Compression!!! --- lib/sharder.ml | 85 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 12 deletions(-) (limited to 'lib/sharder.ml') diff --git a/lib/sharder.ml b/lib/sharder.ml index 65d137e..4b6411d 100644 --- a/lib/sharder.ml +++ b/lib/sharder.ml @@ -1,13 +1,62 @@ open Async open Core +open Decompress open Websocket_async exception Invalid_Payload exception Failure_to_Establish_Heartbeat +exception Inflate_error of Zlib_inflate.error +exception Deflate_error of Zlib_deflate.error + +let window = Window.create ~witness:B.bytes + +let compress ?(level=4) data = + let in_buf = Bytes.create 0xFFFF in + let out_buf = Bytes.create 0xFFFF in + let pos = ref 0 in + let res = Buffer.create (String.length data) in + Zlib_deflate.bytes in_buf out_buf + (fun in_buf opt -> + let n = match opt with + | Some max -> min max (min 0xFFFF (String.length data - !pos)) + | None -> min 0xFFFF (String.length data - !pos) + in + Caml.Bytes.blit_string data !pos in_buf 0 n; + pos := !pos + n; + n) + (fun out_buf len -> + Buffer.add_subbytes res out_buf 0 len; 0xFFFF) + (Zlib_deflate.default ~witness:B.bytes level) + |> function + | Ok _ -> Buffer.contents res + | Error exn -> raise (Deflate_error exn) + +let decompress data = + let in_buf = Bytes.create 0xFFFF in + let out_buf = Bytes.create 0xFFFF in + let window = Window.reset window in + let pos = ref 0 in + let res = Buffer.create (String.length data) in + Zlib_inflate.bytes in_buf out_buf + (fun in_buf -> + let n = min 0xFFFF (String.length data - !pos) in + Caml.Bytes.blit_string data !pos in_buf 0 n; + pos := !pos + n; + n) + (fun out_buf len -> + Buffer.add_subbytes res out_buf 0 len; 0xFFFF) + (Zlib_inflate.default ~witness:B.bytes window) + |> function + | Ok _ -> Buffer.contents res + | Error exn -> raise (Inflate_error exn) + +let try_decompress data = try decompress data + with Inflate_error _ -> data module Shard = struct type shard = { hb_interval: Time.Span.t Ivar.t; + hb_stopper: unit Ivar.t; seq: int; session: string option; pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; @@ -29,12 +78,17 @@ module Shard = struct | `Ok s -> begin let open Frame.Opcode in match s.opcode with - | Text -> Some (Yojson.Safe.from_string s.content) + | Text | Binary -> + let s = if !Client_options.compress then try_decompress s.content + else s.content in + (* Logs.debug (fun m -> m "%s" s); *) + Some (Yojson.Safe.from_string s) | _ -> None end | `Eof -> None - let push_frame ?payload ~ev shard = + let push_frame ?flate ?payload ~ev shard = + let flate = Option.value ~default:!Client_options.compress flate in let content = match payload with | None -> "" | Some p -> @@ -43,18 +97,19 @@ module Shard = struct ("d", p); ] in + let content = if flate then compress content + else content in let (_, write) = shard.pipe in Pipe.write write @@ Frame.create ~content () >>| fun () -> shard let heartbeat shard = - Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq)); - let payload = match shard.seq with - | 0 -> `Null - | i -> `Int i - in - push_frame ~payload ~ev:HEARTBEAT shard + match shard.seq with + | 0 -> return shard + | i -> + Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq)); + push_frame ~flate:false ~payload:(`Int i) ~ev:HEARTBEAT shard let dispatch ~payload shard = let module J = Yojson.Safe.Util in @@ -127,11 +182,11 @@ module Shard = struct ("$device", `String "dis.ml"); ("$browser", `String "dis.ml") ]); - ("compress", `Bool !Client_options.compress); (* TODO add compression handling*) + ("compress", `Bool !Client_options.compress); ("large_threshold", `Int !Client_options.large_threshold); ("shard", `List shards); ] in - push_frame ~payload ~ev:IDENTIFY shard + push_frame ~flate:false ~payload ~ev:IDENTIFY shard >>| fun s -> s end | Some s -> @@ -140,7 +195,7 @@ module Shard = struct ("session_id", `String s); ("seq", `Int shard.seq) ] in - push_frame ~payload ~ev:RESUME shard + push_frame ~flate:false ~payload ~ev:RESUME shard let handle_frame ~f shard = let module J = Yojson.Safe.Util in @@ -226,6 +281,7 @@ module Shard = struct pipe = (read, write); ready = Ivar.create (); hb_interval = Ivar.create (); + hb_stopper = Ivar.create (); seq = 0; id = shards; session = None; @@ -251,6 +307,7 @@ module Shard = struct let shutdown_clean shard = let (_,w) = shard._internal in + Ivar.fill shard.hb_stopper (); Writer.close w let recreate shard = @@ -298,7 +355,11 @@ let start ?count () = Shard.create ~url ~shards:(id, total) () >>= fun shard -> let t = Shard.{ state = shard; } in - let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in + let _ = Ivar.read t.state.hb_interval >>> fun hb -> + Clock.every' + ~stop:(Ivar.read t.state.hb_stopper) + ~continue_on_error:true + hb (fun () -> Shard.heartbeat t.state >>| ignore) in ev_loop t >>> ignore; gen_shards (id+1, total) (t :: a) in -- cgit v1.2.3