aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAdelyn Breelove <[email protected]>2019-01-29 13:27:18 -0700
committerAdelyn Breelove <[email protected]>2019-01-29 13:27:18 -0700
commit193cd616263306c5ac21dc8b93dfd91a335a0c8b (patch)
tree0d6d14661f2ae869d8dc87babf2a01d99b73f003 /lib
parentAdd client options (diff)
downloaddisml-193cd616263306c5ac21dc8b93dfd91a335a0c8b.tar.xz
disml-193cd616263306c5ac21dc8b93dfd91a335a0c8b.zip
Compression!!!
Diffstat (limited to 'lib')
-rw-r--r--lib/client.mli2
-rw-r--r--lib/dune2
-rw-r--r--lib/sharder.ml85
-rw-r--r--lib/sharder.mli1
4 files changed, 76 insertions, 14 deletions
diff --git a/lib/client.mli b/lib/client.mli
index 7ece537..346ce1a 100644
--- a/lib/client.mli
+++ b/lib/client.mli
@@ -22,7 +22,7 @@ type t = {
Scheduler.go_main ~main ()
]}
@param ?count Optional amount of shards to launch. Defaults to autosharding.
- @param ?compress Whether to use compression over the gateway. {b NOT CURRENTLY SUPPORTED. DO NOT SET TO TRUE. }
+ @param ?compress Whether to use compression over the gateway. }
@param ?large Large threshold for guilds. Default is 250.
@param string The token used for authentication.
@return A deferred client object.
diff --git a/lib/dune b/lib/dune
index 466c939..e190d0c 100644
--- a/lib/dune
+++ b/lib/dune
@@ -20,7 +20,7 @@
event_models
client client_options disml dispatch endpoints event http impl opcode rl s sharder
)
- (libraries checkseum.ocaml core async_ssl cohttp-async decompress logs yojson websocket-async ppx_deriving_yojson.runtime)
+ (libraries checkseum.c core async_ssl cohttp-async decompress logs yojson websocket-async ppx_deriving_yojson.runtime)
(preprocess (pps ppx_jane ppx_deriving_yojson))
)
diff --git a/lib/sharder.ml b/lib/sharder.ml
index 65d137e..4b6411d 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -1,13 +1,62 @@
open Async
open Core
+open Decompress
open Websocket_async
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
+exception Inflate_error of Zlib_inflate.error
+exception Deflate_error of Zlib_deflate.error
+
+let window = Window.create ~witness:B.bytes
+
+let compress ?(level=4) data =
+ let in_buf = Bytes.create 0xFFFF in
+ let out_buf = Bytes.create 0xFFFF in
+ let pos = ref 0 in
+ let res = Buffer.create (String.length data) in
+ Zlib_deflate.bytes in_buf out_buf
+ (fun in_buf opt ->
+ let n = match opt with
+ | Some max -> min max (min 0xFFFF (String.length data - !pos))
+ | None -> min 0xFFFF (String.length data - !pos)
+ in
+ Caml.Bytes.blit_string data !pos in_buf 0 n;
+ pos := !pos + n;
+ n)
+ (fun out_buf len ->
+ Buffer.add_subbytes res out_buf 0 len; 0xFFFF)
+ (Zlib_deflate.default ~witness:B.bytes level)
+ |> function
+ | Ok _ -> Buffer.contents res
+ | Error exn -> raise (Deflate_error exn)
+
+let decompress data =
+ let in_buf = Bytes.create 0xFFFF in
+ let out_buf = Bytes.create 0xFFFF in
+ let window = Window.reset window in
+ let pos = ref 0 in
+ let res = Buffer.create (String.length data) in
+ Zlib_inflate.bytes in_buf out_buf
+ (fun in_buf ->
+ let n = min 0xFFFF (String.length data - !pos) in
+ Caml.Bytes.blit_string data !pos in_buf 0 n;
+ pos := !pos + n;
+ n)
+ (fun out_buf len ->
+ Buffer.add_subbytes res out_buf 0 len; 0xFFFF)
+ (Zlib_inflate.default ~witness:B.bytes window)
+ |> function
+ | Ok _ -> Buffer.contents res
+ | Error exn -> raise (Inflate_error exn)
+
+let try_decompress data = try decompress data
+ with Inflate_error _ -> data
module Shard = struct
type shard = {
hb_interval: Time.Span.t Ivar.t;
+ hb_stopper: unit Ivar.t;
seq: int;
session: string option;
pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t;
@@ -29,12 +78,17 @@ module Shard = struct
| `Ok s -> begin
let open Frame.Opcode in
match s.opcode with
- | Text -> Some (Yojson.Safe.from_string s.content)
+ | Text | Binary ->
+ let s = if !Client_options.compress then try_decompress s.content
+ else s.content in
+ (* Logs.debug (fun m -> m "%s" s); *)
+ Some (Yojson.Safe.from_string s)
| _ -> None
end
| `Eof -> None
- let push_frame ?payload ~ev shard =
+ let push_frame ?flate ?payload ~ev shard =
+ let flate = Option.value ~default:!Client_options.compress flate in
let content = match payload with
| None -> ""
| Some p ->
@@ -43,18 +97,19 @@ module Shard = struct
("d", p);
]
in
+ let content = if flate then compress content
+ else content in
let (_, write) = shard.pipe in
Pipe.write write @@ Frame.create ~content ()
>>| fun () ->
shard
let heartbeat shard =
- Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
- let payload = match shard.seq with
- | 0 -> `Null
- | i -> `Int i
- in
- push_frame ~payload ~ev:HEARTBEAT shard
+ match shard.seq with
+ | 0 -> return shard
+ | i ->
+ Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
+ push_frame ~flate:false ~payload:(`Int i) ~ev:HEARTBEAT shard
let dispatch ~payload shard =
let module J = Yojson.Safe.Util in
@@ -127,11 +182,11 @@ module Shard = struct
("$device", `String "dis.ml");
("$browser", `String "dis.ml")
]);
- ("compress", `Bool !Client_options.compress); (* TODO add compression handling*)
+ ("compress", `Bool !Client_options.compress);
("large_threshold", `Int !Client_options.large_threshold);
("shard", `List shards);
] in
- push_frame ~payload ~ev:IDENTIFY shard
+ push_frame ~flate:false ~payload ~ev:IDENTIFY shard
>>| fun s -> s
end
| Some s ->
@@ -140,7 +195,7 @@ module Shard = struct
("session_id", `String s);
("seq", `Int shard.seq)
] in
- push_frame ~payload ~ev:RESUME shard
+ push_frame ~flate:false ~payload ~ev:RESUME shard
let handle_frame ~f shard =
let module J = Yojson.Safe.Util in
@@ -226,6 +281,7 @@ module Shard = struct
pipe = (read, write);
ready = Ivar.create ();
hb_interval = Ivar.create ();
+ hb_stopper = Ivar.create ();
seq = 0;
id = shards;
session = None;
@@ -251,6 +307,7 @@ module Shard = struct
let shutdown_clean shard =
let (_,w) = shard._internal in
+ Ivar.fill shard.hb_stopper ();
Writer.close w
let recreate shard =
@@ -298,7 +355,11 @@ let start ?count () =
Shard.create ~url ~shards:(id, total) ()
>>= fun shard ->
let t = Shard.{ state = shard; } in
- let _ = Ivar.read t.state.hb_interval >>> fun hb -> Clock.every ~continue_on_error:true hb (fun () -> Shard.heartbeat t.state >>> ignore) in
+ let _ = Ivar.read t.state.hb_interval >>> fun hb ->
+ Clock.every'
+ ~stop:(Ivar.read t.state.hb_stopper)
+ ~continue_on_error:true
+ hb (fun () -> Shard.heartbeat t.state >>| ignore) in
ev_loop t >>> ignore;
gen_shards (id+1, total) (t :: a)
in
diff --git a/lib/sharder.mli b/lib/sharder.mli
index 92c4178..5017a86 100644
--- a/lib/sharder.mli
+++ b/lib/sharder.mli
@@ -20,6 +20,7 @@ module Shard : sig
(** Representation of the state of a shard. *)
type shard = {
hb_interval: Time.Span.t Ivar.t; (** Time span between heartbeats, wrapped in an Ivar. *)
+ hb_stopper: unit Ivar.t; (** Stops the heartbeat sequencer when filled. *)
seq: int; (** Current sequence number *)
session: string option; (** Session id, if one exists. *)
pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; (** Raw frame IO pipe used for websocket communications. *)