aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAdelyn Breelove <[email protected]>2019-01-29 13:38:27 -0700
committerAdelyn Breelove <[email protected]>2019-01-29 13:38:27 -0700
commit43ad34645cd2dd18553a5e647a951963b91f322e (patch)
treee91364ae70b9b009d60fadb4d6a1dfe9a82b73e7 /lib
parentCompression!!! (diff)
downloaddisml-43ad34645cd2dd18553a5e647a951963b91f322e.tar.xz
disml-43ad34645cd2dd18553a5e647a951963b91f322e.zip
A few improvements to how compression is handled internally
Diffstat (limited to 'lib')
-rw-r--r--lib/client.ml5
-rw-r--r--lib/client_options.ml3
-rw-r--r--lib/client_options.mli3
-rw-r--r--lib/sharder.ml55
-rw-r--r--lib/sharder.mli9
5 files changed, 26 insertions, 49 deletions
diff --git a/lib/client.ml b/lib/client.ml
index dd69910..8241046 100644
--- a/lib/client.ml
+++ b/lib/client.ml
@@ -6,11 +6,10 @@ type t = {
sharder: Sharder.t;
}
-let start ?count ?(compress=false) ?(large=250) token =
+let start ?count ?compress ?(large=250) token =
Client_options.token := token;
- Client_options.compress := compress;
Client_options.large_threshold := large;
- Sharder.start ?count ()
+ Sharder.start ?count ?compress ()
>>| fun sharder ->
{ sharder; }
diff --git a/lib/client_options.ml b/lib/client_options.ml
index 6a0d261..0b52006 100644
--- a/lib/client_options.ml
+++ b/lib/client_options.ml
@@ -1,3 +1,2 @@
let token = ref ""
-let large_threshold = ref 250
-let compress = ref false \ No newline at end of file
+let large_threshold = ref 250 \ No newline at end of file
diff --git a/lib/client_options.mli b/lib/client_options.mli
index 455ed58..42d3eee 100644
--- a/lib/client_options.mli
+++ b/lib/client_options.mli
@@ -1,4 +1,3 @@
(** Token that is set when using {!Client.start} *)
val token : string ref
-val large_threshold : int ref
-val compress : bool ref \ No newline at end of file
+val large_threshold : int ref \ No newline at end of file
diff --git a/lib/sharder.ml b/lib/sharder.ml
index 4b6411d..d8470fd 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -6,31 +6,9 @@ open Websocket_async
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
exception Inflate_error of Zlib_inflate.error
-exception Deflate_error of Zlib_deflate.error
let window = Window.create ~witness:B.bytes
-let compress ?(level=4) data =
- let in_buf = Bytes.create 0xFFFF in
- let out_buf = Bytes.create 0xFFFF in
- let pos = ref 0 in
- let res = Buffer.create (String.length data) in
- Zlib_deflate.bytes in_buf out_buf
- (fun in_buf opt ->
- let n = match opt with
- | Some max -> min max (min 0xFFFF (String.length data - !pos))
- | None -> min 0xFFFF (String.length data - !pos)
- in
- Caml.Bytes.blit_string data !pos in_buf 0 n;
- pos := !pos + n;
- n)
- (fun out_buf len ->
- Buffer.add_subbytes res out_buf 0 len; 0xFFFF)
- (Zlib_deflate.default ~witness:B.bytes level)
- |> function
- | Ok _ -> Buffer.contents res
- | Error exn -> raise (Deflate_error exn)
-
let decompress data =
let in_buf = Bytes.create 0xFFFF in
let out_buf = Bytes.create 0xFFFF in
@@ -55,14 +33,15 @@ let try_decompress data = try decompress data
module Shard = struct
type shard = {
+ compress: bool;
+ id: int * int;
hb_interval: Time.Span.t Ivar.t;
hb_stopper: unit Ivar.t;
- seq: int;
- session: string option;
pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t;
ready: unit Ivar.t;
+ seq: int;
+ session: string option;
url: string;
- id: int * int;
_internal: Reader.t * Writer.t;
}
@@ -73,13 +52,13 @@ module Shard = struct
let identify_lock = Mvar.create ()
let _ = Mvar.set identify_lock ()
- let parse (frame:[`Ok of Frame.t | `Eof]) =
+ let parse ~compress (frame:[`Ok of Frame.t | `Eof]) =
match frame with
| `Ok s -> begin
let open Frame.Opcode in
match s.opcode with
| Text | Binary ->
- let s = if !Client_options.compress then try_decompress s.content
+ let s = if compress then try_decompress s.content
else s.content in
(* Logs.debug (fun m -> m "%s" s); *)
Some (Yojson.Safe.from_string s)
@@ -87,8 +66,7 @@ module Shard = struct
end
| `Eof -> None
- let push_frame ?flate ?payload ~ev shard =
- let flate = Option.value ~default:!Client_options.compress flate in
+ let push_frame ?payload ~ev shard =
let content = match payload with
| None -> ""
| Some p ->
@@ -97,8 +75,6 @@ module Shard = struct
("d", p);
]
in
- let content = if flate then compress content
- else content in
let (_, write) = shard.pipe in
Pipe.write write @@ Frame.create ~content ()
>>| fun () ->
@@ -109,7 +85,7 @@ module Shard = struct
| 0 -> return shard
| i ->
Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
- push_frame ~flate:false ~payload:(`Int i) ~ev:HEARTBEAT shard
+ push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
let dispatch ~payload shard =
let module J = Yojson.Safe.Util in
@@ -182,11 +158,11 @@ module Shard = struct
("$device", `String "dis.ml");
("$browser", `String "dis.ml")
]);
- ("compress", `Bool !Client_options.compress);
+ ("compress", `Bool shard.compress);
("large_threshold", `Int !Client_options.large_threshold);
("shard", `List shards);
] in
- push_frame ~flate:false ~payload ~ev:IDENTIFY shard
+ push_frame ~payload ~ev:IDENTIFY shard
>>| fun s -> s
end
| Some s ->
@@ -195,7 +171,7 @@ module Shard = struct
("session_id", `String s);
("seq", `Int shard.seq)
] in
- push_frame ~flate:false ~payload ~ev:RESUME shard
+ push_frame ~payload ~ev:RESUME shard
let handle_frame ~f shard =
let module J = Yojson.Safe.Util in
@@ -250,7 +226,7 @@ module Shard = struct
uri)
- let create ~url ~shards () =
+ let create ~url ~shards ?(compress=true) () =
let open Core in
let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in
let extra_headers = Http.Base.process_request_headers () in
@@ -286,6 +262,7 @@ module Shard = struct
id = shards;
session = None;
url;
+ compress;
_internal = (net_to_ws, ws_to_net);
}
in
@@ -319,7 +296,7 @@ type t = {
shards: (Shard.shard Shard.t) list;
}
-let start ?count () =
+let start ?count ?compress () =
let module J = Yojson.Safe.Util in
Http.get_gateway_bot () >>= fun data ->
let data = match data with
@@ -336,7 +313,7 @@ let start ?count () =
let (read, _) = t.state.pipe in
Pipe.read read
>>= fun frame ->
- (match Shard.parse frame with
+ (match Shard.parse ~compress:t.state.compress frame with
| Some f -> begin
Shard.handle_frame ~f t.state
>>| fun s -> t.state <- s; t
@@ -352,7 +329,7 @@ let start ?count () =
match l with
| (id, total) when id >= total -> return a
| (id, total) ->
- Shard.create ~url ~shards:(id, total) ()
+ Shard.create ~url ~shards:(id, total) ?compress ()
>>= fun shard ->
let t = Shard.{ state = shard; } in
let _ = Ivar.read t.state.hb_interval >>> fun hb ->
diff --git a/lib/sharder.mli b/lib/sharder.mli
index 5017a86..f5002ee 100644
--- a/lib/sharder.mli
+++ b/lib/sharder.mli
@@ -12,6 +12,7 @@ type t
(** Start the Sharder. This is called by {!Client.start}. *)
val start :
?count:int ->
+ ?compress:bool ->
unit ->
t Deferred.t
@@ -19,14 +20,15 @@ val start :
module Shard : sig
(** Representation of the state of a shard. *)
type shard = {
+ compress: bool; (** Whether to compress payloads. *)
+ id: int * int; (** A tuple as expected by Discord. First element is the current shard index, second element is the total shard count. *)
hb_interval: Time.Span.t Ivar.t; (** Time span between heartbeats, wrapped in an Ivar. *)
hb_stopper: unit Ivar.t; (** Stops the heartbeat sequencer when filled. *)
- seq: int; (** Current sequence number *)
- session: string option; (** Session id, if one exists. *)
pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; (** Raw frame IO pipe used for websocket communications. *)
ready: unit Ivar.t; (** A simple Ivar indicating if the shard has received READY. *)
+ seq: int; (** Current sequence number *)
+ session: string option; (** Session id, if one exists. *)
url: string; (** The websocket URL in use. *)
- id: int * int; (** A tuple as expected by Discord. First element is the current shard index, second element is the total shard count. *)
_internal: Reader.t * Writer.t;
}
@@ -58,6 +60,7 @@ module Shard : sig
val create :
url:string ->
shards:int * int ->
+ ?compress:bool ->
unit ->
shard Deferred.t