1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
open Lwt.Infix
open Websocket
(** Settings used to open a shard connection. *)
type data = {
  shards : int list;
  (** Shard identifiers, sent as the [shard] entry when identifying. *)
  token : string;
  (** Token sent in the identify payload. *)
  url : string;
  (** Base gateway URL; query parameters are appended on connect. *)
}
(** A single gateway shard: one websocket connection plus the heartbeat
    timer and sequence/session state tracked for it. *)
module Shard = struct
  (** State of one shard connection. The record is updated in place
      (see the [mutable] fields), so every holder of a shard observes the
      same heartbeat timer and sequence number. *)
  type t = {
    mutable hb : Lwt_engine.event option;
    (** Running heartbeat timer, if one has been started. *)
    mutable seq : int;
    (** Last sequence number received; [0] means none yet. *)
    session : string option;
    (** Session id to resume with, if any. *)
    token : string;
    shard : int list;
    send : Frame.t -> unit Lwt.t;
    recv : unit -> Frame.t Lwt.t;
  }

  (** [parse frame] decodes the content of [frame] as JSON. On a JSON
      syntax error the message is printed and an empty JSON string is
      returned, which [handle_frame] ignores. *)
  let parse (frame : Frame.t) =
    match Yojson.Basic.from_string frame.content with
    | json -> json
    | exception Yojson.Json_error err ->
      print_endline err;
      `String ""

  (** [encode term] serialises [term] and wraps it in a websocket frame. *)
  let encode term =
    Frame.create ~content:(Yojson.Basic.to_string term) ()

  (** [push_frame ?payload shard ev] sends an envelope with the opcode of
      [ev] as [op] and [payload] as [d] over the connection of [shard].

      When [payload] is omitted, [d] is sent as JSON null; previously an
      empty frame without any opcode was sent in that case.

      The send is not awaited. A failed send is reported on stderr instead
      of being silently discarded.

      @return [shard], unchanged, so calls can be chained. *)
  let push_frame ?payload shard (ev : Opcode.t) =
    print_endline ("Pushing frame. OP: " ^ Opcode.to_string ev);
    let d = match payload with
      | None -> `Null
      | Some p -> p
    in
    let content = Yojson.Basic.to_string (`Assoc [
      ("op", `Int (Opcode.to_int ev));
      ("d", d);
    ]) in
    Lwt.on_failure
      (shard.send (Frame.create ~content ()))
      (fun exn ->
        prerr_endline ("Failed to send frame: " ^ Printexc.to_string exn));
    shard

  (** [heartbeat_payload shard] is the [d] value of a heartbeat: the last
      sequence number seen, or JSON null when none has been received. *)
  let heartbeat_payload shard =
    match shard.seq with
    | 0 -> `Null
    | i -> `Int i

  (** [initialize shard data] reacts to a HELLO payload [data]: it starts
      the heartbeat timer if none is running yet, then identifies (no
      session) or resumes (existing session).

      @return [shard] itself, with [hb] set in place.
      @raise Not_found if [data] has no [heartbeat_interval] entry.
      @raise Yojson.Basic.Util.Type_error if [data] is not an object or
      the interval is not an integer. *)
  let initialize shard data =
    print_endline "Initializing...";
    begin match shard.hb with
      | Some _ -> ()
      | None ->
        let hb_interval_ms =
          Yojson.Basic.Util.to_assoc data
          |> List.assoc "heartbeat_interval"
          |> Yojson.Basic.Util.to_int
        in
        let hb =
          Lwt_engine.on_timer
            (* The gateway reports the interval in milliseconds, while
               the timer delay is expressed in seconds. *)
            (Float.of_int hb_interval_ms /. 1000.)
            true
            (fun _ev ->
              (* Read the sequence number when the timer fires, so each
                 heartbeat carries the latest value. push_frame adds the
                 op/d envelope itself, so only the value is passed. *)
              ignore
                (push_frame ~payload:(heartbeat_payload shard) shard HEARTBEAT))
        in
        (* Mutate in place: copying the record would leave the timer
           callback looking at a stale shard. *)
        shard.hb <- Some hb
    end;
    match shard.session with
    | None ->
      let payload = `Assoc [
        ("token", `String shard.token);
        ("properties", `Assoc [
          ("$os", `String Sys.os_type);
          ("$device", `String "animus");
          ("$browser", `String "animus")
        ]);
        ("compress", `Bool false); (* TODO add compression handling*)
        ("large_threshold", `Int 250);
        ("shard", `List (List.map (fun i -> `Int i) shard.shard))
      ] in
      push_frame ~payload shard IDENTIFY
    | Some s ->
      let payload = `Assoc [
        ("token", `String shard.token);
        ("session_id", `String s);
        ("seq", `Int shard.seq)
      ] in
      (* NOTE(review): this resumes with the RECONNECT opcode, which
         handle_frame treats as an inbound op 7. The gateway protocol
         probably expects a dedicated resume opcode here - confirm
         against Opcode. *)
      push_frame ~payload shard RECONNECT

  (** [handle_frame shard term] prints the decoded frame [term], records
      its sequence number (the [s] entry) when present, and dispatches on
      its opcode. Anything that is not an object with an integer [op] is
      logged or ignored rather than raising, so one malformed frame does
      not end the receive loop.

      @return the shard to use for the next frame. *)
  let handle_frame shard (term : Yojson.Basic.json) =
    Yojson.Basic.pretty_print Format.std_formatter term;
    (* Flush through the same formatter so the output stays ordered. *)
    Format.pp_print_newline Format.std_formatter ();
    match term with
    | `Assoc fields ->
      (match List.assoc_opt "s" fields with
       | Some (`Int s) -> shard.seq <- s
       | Some _ | None -> ());
      (match List.assoc_opt "op" fields with
       | Some (`Int n) ->
         (match Opcode.from_int n with
          | DISPATCH -> print_endline "OP 0"; shard (* TODO dispatch *)
          | HEARTBEAT ->
            (* The server asked for a heartbeat: answer with the current
               sequence number instead of an empty frame. *)
            push_frame ~payload:(heartbeat_payload shard) shard HEARTBEAT
          | RECONNECT -> print_endline "OP 7"; shard (* TODO reconnect *)
          | INVALID_SESSION ->
            print_endline "OP 9"; shard (* TODO invalid session *)
          | HELLO ->
            (match List.assoc_opt "d" fields with
             | Some data -> initialize shard data
             | None -> print_endline "HELLO frame without payload"; shard)
          | _opcode -> print_endline "no match"; shard)
       | Some _ | None ->
         print_endline "Frame without an integer op";
         shard)
    | _ -> shard

  (** [create data] connects to the gateway at [data.url] and processes
      incoming frames until receiving fails. The returned promise never
      resolves normally; it is rejected with the error that ended the
      receive loop. The heartbeat timer is stopped when that happens. *)
  let create data =
    let uri = Uri.of_string (data.url ^ "?v=7&encoding=json") in
    (* Resolve using the https scheme of the same URI. *)
    let http_uri = Uri.with_scheme uri (Some "https") in
    let headers = Http.Base.process_request_headers () in
    Resolver_lwt.resolve_uri ~uri:http_uri Resolver_lwt_unix.system
    >>= fun endp ->
    let ctx = Conduit_lwt_unix.default_ctx in
    Conduit_lwt_unix.endp_to_client ~ctx endp
    >>= fun client ->
    Websocket_lwt_unix.with_connection ~extra_headers:headers client uri
    >>= fun (recv, send) ->
    let shard = {
      send;
      recv;
      hb = None;
      seq = 0;
      shard = data.shards;
      session = None;
      token = data.token;
    } in
    let rec recv_forever shard =
      recv () >>= fun frame ->
      recv_forever (handle_frame shard (parse frame))
    in
    Lwt.finalize
      (fun () -> recv_forever shard)
      (fun () ->
        (* Do not leave the timer firing on a dead connection. *)
        (match shard.hb with
         | Some ev ->
           Lwt_engine.stop_event ev;
           shard.hb <- None
         | None -> ());
        Lwt.return_unit)
end
(** A group of gateway shards. *)
type t = {
  shards : Shard.t list;
}
|