aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/sharder.ml46
1 files changed, 22 insertions, 24 deletions
diff --git a/lib/sharder.ml b/lib/sharder.ml
index 74870bd..27b3b41 100644
--- a/lib/sharder.ml
+++ b/lib/sharder.ml
@@ -204,25 +204,9 @@ module Shard = struct
~net_to_ws
~ws_to_net
uri
- >>> ignore;
+ >>> ignore; (* TODO this needs to error check and retry with backoff *)
Ivar.read initialized >>| fun () ->
- let rec ev_loop t =
- let (read, _) = t.shard.pipe in
- Pipe.read read
- >>= fun frame ->
- (match parse frame with
- | Some f -> begin
- handle_frame ~f t.shard
- >>| fun shard ->
- t.shard <- shard;
- t
- end
- | None -> recreate t.shard)
- >>= fun t ->
- List.iter ~f:(fun f -> f t.shard) t.binds;
- ev_loop t
- in
- let shard = {
+ {
pipe = (read, write);
ready = Ivar.create ();
hb = None;
@@ -232,10 +216,6 @@ module Shard = struct
token;
url;
}
- in
- let t = { shard; binds = []; } in
- ev_loop t >>> ignore;
- t
in
match Unix.getaddrinfo host (string_of_int port) [] with
| [] -> failwithf "DNS resolution failed for %s" host ()
@@ -262,7 +242,7 @@ module Shard = struct
end
type t = {
- mutable shards: (Shard.shard Shard.t) list;
+ shards: (Shard.shard Shard.t) list;
}
let start ?count token =
@@ -280,7 +260,25 @@ let start ?count token =
| (id, total) ->
Shard.create ~url ~shards:(id, total) ~token ()
>>= fun shard ->
- let a = shard :: a in
+ let rec ev_loop t =
+ let (read, _) = t.shard.pipe in
+ Pipe.read read
+ >>= fun frame ->
+ (match parse frame with
+ | Some f -> begin
+ handle_frame ~f t.shard
+ >>| fun shard ->
+ t.shard <- shard;
+ t
+ end
+ | None -> recreate t.shard)
+ >>= fun t ->
+ List.iter ~f:(fun f -> f t.shard) t.binds;
+ ev_loop t
+ in
+ let t = { shard; binds = []; } in
+ ev_loop t >>> ignore;
+ let a = t :: a in
gen_shards (id+1, total) a
in
gen_shards shard_list []