(* path: root/lib/client/sharder.ml
   blob: df5767ed5ca2a206c042278736fc2322237b8316 *)
open Lwt.Infix
open Websocket

(** Connection settings consumed by [Shard.create]. *)
type data = {
    (* Stored in the shard record and sent as the "shard" entry of the
       IDENTIFY payload. *)
    shards: int list;
    (* Sent as the "token" entry of the IDENTIFY payload and of the payload
       used when a session id is already known. *)
    token: string;
    (* Base websocket URL; [Shard.create] appends "?v=7&encoding=json" to it
       before connecting. *)
    url: string;
}

module Shard = struct
    (** State of one gateway connection. *)
    type t = {
        (* Repeating heartbeat timer; [None] until [initialize] creates it. *)
        mutable hb: Lwt_engine.event option;
        (* Sequence number reported in heartbeats; the value 0 is treated as
           "no sequence yet" and serialised as JSON null. *)
        mutable seq: int;
        (* Known session id, if any: [initialize] sends IDENTIFY when this is
           [None] and the session/seq payload when it is [Some _]. *)
        session: string option;
        (* Token included in the payloads built by [initialize]. *)
        token: string;
        (* Integers sent as the "shard" entry of the IDENTIFY payload. *)
        shard: int list;
        (* Writes one frame to the websocket (obtained from
           [Websocket_lwt_unix.with_connection] in [create]). *)
        send: Frame.t -> unit Lwt.t;
        (* Reads the next frame from the websocket (same origin as [send]). *)
        recv: unit -> Frame.t Lwt.t;
    }

    (** [parse frame] decodes the text content of [frame] as JSON.

        When the content is not valid JSON, the parser message is printed on
        standard output and the placeholder value [`String ""] is returned
        instead of raising. *)
    let parse (frame : Frame.t) =
        match Yojson.Basic.from_string frame.content with
        | json -> json
        | exception Yojson.Json_error msg ->
            print_endline msg;
            `String ""

    (** [encode term] serialises the JSON value [term] to a string and wraps
        that string in a frame built by [Frame.create] with its defaults. *)
    let encode term =
        Frame.create ~content:(Yojson.Basic.to_string term) ()

    (** [push_frame ?payload shard ev] sends a gateway message with opcode
        [ev] through [shard.send] and returns [shard] unchanged.

        The frame content is the JSON object
        [{"op": <integer value of ev>, "d": <payload>}]. When [payload] is
        omitted, "d" is JSON null, so the frame is always a well-formed
        message rather than an empty frame.

        The send is fire-and-forget: this function does not wait for it to
        complete. If the send promise is rejected, the exception is reported
        on standard error instead of being silently dropped. *)
    let push_frame ?payload shard (ev : Opcode.t) =
        let op_name = Opcode.to_string ev in
        print_endline ("Pushing frame. OP: " ^ op_name);
        let d = match payload with
        | Some p -> p
        | None -> `Null
        in
        let content = Yojson.Basic.to_string (`Assoc [
            ("op", `Int (Opcode.to_int ev));
            ("d", d);
        ]) in
        let frame = Frame.create ~content () in
        (* Keep the non-blocking behaviour, but attach a failure handler so a
           broken connection is visible in the logs. *)
        Lwt.on_failure (shard.send frame) (fun exn ->
            prerr_endline
                ("Failed to push frame. OP: " ^ op_name ^ ": "
                 ^ Printexc.to_string exn));
        shard

    (** [initialize shard data] handles the "d" payload of a HELLO message.

        If [shard] has no heartbeat timer yet, one is started from the
        "heartbeat_interval" field of [data] and stored in [shard.hb]
        (in place, so the timer callback and the returned record are the same
        value and the callback always sees the current [shard.seq]).
        Afterwards a payload is pushed: IDENTIFY when [shard.session] is
        [None], otherwise the token/session_id/seq payload.

        Returns [shard].

        @raise Not_found if a timer must be created and [data] has no
        "heartbeat_interval" field.
        @raise Yojson.Basic.Util.Type_error if a timer must be created and
        [data] is not an object or the interval is not an integer. *)
    let initialize shard data =
        print_endline "Initializing...";
        (match shard.hb with
        | Some _ -> ()
        | None ->
            let interval_ms =
                data
                |> Yojson.Basic.Util.to_assoc
                |> List.assoc "heartbeat_interval"
                |> Yojson.Basic.Util.to_int
            in
            (* Build the heartbeat body on every tick so it carries the
               sequence number current at that time. [push_frame] adds the
               {"op", "d"} envelope itself, so only the "d" value is passed. *)
            let beat _ev =
                let payload = match shard.seq with
                | 0 -> `Null
                | i -> `Int i
                in
                push_frame ~payload shard HEARTBEAT |> ignore
            in
            (* The gateway reports the interval in milliseconds, while
               [Lwt_engine.on_timer] expects a delay in seconds. *)
            let delay = Float.of_int interval_ms /. 1000. in
            shard.hb <- Some (Lwt_engine.on_timer delay true beat));
        match shard.session with
        | None ->
            let payload = `Assoc [
                ("token", `String shard.token);
                ("properties", `Assoc [
                    ("$os", `String Sys.os_type);
                    ("$device", `String "animus");
                    ("$browser", `String "animus")
                ]);
                ("compress", `Bool false); (* TODO add compression handling *)
                ("large_threshold", `Int 250);
                ("shard", `List (List.map (fun i -> `Int i) shard.shard))
            ] in
            push_frame ~payload shard IDENTIFY
        | Some s ->
            (* NOTE(review): this payload has the shape of a gateway Resume
               message, yet it is sent with the RECONNECT opcode. Confirm
               which constructor Opcode provides for resuming a session. *)
            let payload = `Assoc [
                ("token", `String shard.token);
                ("session_id", `String s);
                ("seq", `Int shard.seq)
            ] in
            push_frame ~payload shard RECONNECT



    (** [handle_frame shard term] processes one decoded gateway message and
        returns the shard to use for the next message.

        The message is dumped to standard output first. If it is a JSON
        object, an integer "s" field (the sequence number) is recorded in
        [shard.seq], and the "op" field selects the action:
        - HEARTBEAT: reply immediately with a heartbeat carrying the current
          sequence number (null when it is 0);
        - HELLO: hand the "d" field to [initialize];
        - DISPATCH, RECONNECT, INVALID_SESSION: only logged for now;
        - anything else: logged as "no match".
        Values that are not JSON objects are ignored.

        @raise Not_found if the object has no "op" field, or a HELLO has no
        "d" field.
        @raise Yojson.Basic.Util.Type_error if "op" is not an integer. *)
    let handle_frame shard (term : Yojson.Basic.json) =
        Yojson.Basic.pretty_print Format.std_formatter term;
        (* End the line through Format as well: this flushes the formatter,
           so the dump is emitted before any later direct write to stdout. *)
        Format.pp_print_newline Format.std_formatter ();
        match term with
        | `Assoc fields -> begin
            (* Track the latest sequence number for heartbeats and resumes. *)
            (match List.assoc_opt "s" fields with
            | Some (`Int s) -> shard.seq <- s
            | Some _ | None -> ());
            let op = List.assoc "op" fields
            |> Yojson.Basic.Util.to_int
            |> Opcode.from_int
            in
            match op with
            | DISPATCH -> print_endline "OP 0"; shard (* TODO dispatch *)
            | HEARTBEAT ->
                (* The gateway asked for an immediate heartbeat; answer with
                   the same body the periodic heartbeat uses. *)
                let payload = match shard.seq with
                | 0 -> `Null
                | i -> `Int i
                in
                push_frame ~payload shard HEARTBEAT
            | RECONNECT -> print_endline "OP 7"; shard (* TODO reconnect *)
            | INVALID_SESSION -> print_endline "OP 9"; shard (* TODO invalid session *)
            | HELLO ->
                let data = List.assoc "d" fields in
                initialize shard data
            | _opcode -> print_endline "no match"; shard
        end
        | _ -> shard

    (** [create data] opens a websocket to [data.url] (with
        "?v=7&encoding=json" appended), builds a fresh shard around the
        connection and then processes incoming frames forever.

        The host is resolved through the same URI with its scheme replaced by
        "https". Each received frame is decoded with [parse] and passed to
        [handle_frame]; the shard it returns is used for the next frame.

        The returned promise never resolves normally. When the receive loop
        fails (for example because the connection dropped or a frame could
        not be handled), the heartbeat timer of the most recent shard is
        stopped before the failure is propagated, so no timer keeps firing on
        a dead connection. *)
    let create data =
        let uri = (data.url ^ "?v=7&encoding=json") |> Uri.of_string in
        let http_uri = Uri.with_scheme uri (Some "https") in
        let headers = Http.Base.process_request_headers () in
        Resolver_lwt.resolve_uri ~uri:http_uri Resolver_lwt_unix.system >>= fun endp ->
        let ctx = Conduit_lwt_unix.default_ctx in
        Conduit_lwt_unix.endp_to_client ~ctx endp >>= fun client ->
        Websocket_lwt_unix.with_connection
            ~extra_headers:headers
            client
            uri
        >>= fun (recv, send) ->
        let shard = {
            send;
            recv;
            hb = None;
            seq = 0;
            shard = data.shards;
            session = None;
            token = data.token;
        } in
        (* Remember the shard currently in use so the cleanup below can find
           the timer even if a handler returned a new record. *)
        let latest = ref shard in
        let rec recv_forever shard =
            latest := shard;
            recv () >>= fun frame ->
            recv_forever (handle_frame shard (parse frame))
        in
        let stop_heartbeat () =
            (match (!latest).hb with
            | Some ev -> Lwt_engine.stop_event ev
            | None -> ());
            Lwt.return_unit
        in
        Lwt.finalize (fun () -> recv_forever shard) stop_heartbeat
end

(** Top-level sharder value: a collection of [Shard.t] connections.
    Nothing in the code visible here constructs or consumes it yet. *)
type t = {
    (* The shard connections held by this value. *)
    shards: Shard.t list;
}