aboutsummaryrefslogtreecommitdiff
path: root/lib/gateway
diff options
context:
space:
mode:
authorAdelyn Breedlove <[email protected]>2019-03-02 18:30:46 -0700
committerAdelyn Breedlove <[email protected]>2019-03-02 18:30:46 -0700
commit6163027a715b31d87e1f8e4fde8f7f3b4db2bc42 (patch)
treed84b6f956fd43d8a3bb2dff3a1bde9e27dcc1cc2 /lib/gateway
parentStyle improvements who dis (diff)
downloaddisml-6163027a715b31d87e1f8e4fde8f7f3b4db2bc42.tar.xz
disml-6163027a715b31d87e1f8e4fde8f7f3b4db2bc42.zip
Initial Lwt changes. Successfully compiles
Diffstat (limited to 'lib/gateway')
-rw-r--r--lib/gateway/dispatch.ml69
-rw-r--r--lib/gateway/dispatch.mli68
-rw-r--r--lib/gateway/event.ml163
-rw-r--r--lib/gateway/event.mli4
-rw-r--r--lib/gateway/opcode.ml25
-rw-r--r--lib/gateway/opcode.mli1
-rw-r--r--lib/gateway/sharder.ml289
-rw-r--r--lib/gateway/sharder.mli51
8 files changed, 316 insertions, 354 deletions
diff --git a/lib/gateway/dispatch.ml b/lib/gateway/dispatch.ml
index 6776ff2..593896c 100644
--- a/lib/gateway/dispatch.ml
+++ b/lib/gateway/dispatch.ml
@@ -1,37 +1,36 @@
-open Core
open Event_models
-let ready = ref (fun (_:Ready.t) -> ())
-let resumed = ref (fun (_:Resumed.t) -> ())
-let channel_create = ref (fun (_:ChannelCreate.t) -> ())
-let channel_update = ref (fun (_:ChannelUpdate.t) -> ())
-let channel_delete = ref (fun (_:ChannelDelete.t) -> ())
-let channel_pins_update = ref (fun (_:ChannelPinsUpdate.t) -> ())
-let guild_create = ref (fun (_:GuildCreate.t) -> ())
-let guild_update = ref (fun (_:GuildUpdate.t) -> ())
-let guild_delete = ref (fun (_:GuildDelete.t) -> ())
-let member_ban = ref (fun (_:GuildBanAdd.t) -> ())
-let member_unban = ref (fun (_:GuildBanRemove.t) -> ())
-let guild_emojis_update = ref (fun (_:GuildEmojisUpdate.t) -> ())
-(* let integrations_update = ref (fun (_:Yojson.Safe.t) -> ()) *)
-let member_join = ref (fun (_:GuildMemberAdd.t) -> ())
-let member_leave = ref (fun (_:GuildMemberRemove.t) -> ())
-let member_update = ref (fun (_:GuildMemberUpdate.t) -> ())
-let members_chunk = ref (fun (_:GuildMembersChunk.t) -> ())
-let role_create = ref (fun (_:GuildRoleCreate.t) -> ())
-let role_update = ref (fun (_:GuildRoleUpdate.t) -> ())
-let role_delete = ref (fun (_:GuildRoleDelete.t) -> ())
-let message_create = ref (fun (_:MessageCreate.t) -> ())
-let message_update = ref (fun (_:MessageUpdate.t) -> ())
-let message_delete = ref (fun (_:MessageDelete.t) -> ())
-let message_delete_bulk = ref (fun (_:MessageDeleteBulk.t) -> ())
-let reaction_add = ref (fun (_:ReactionAdd.t) -> ())
-let reaction_remove = ref (fun (_:ReactionRemove.t) -> ())
-let reaction_remove_all = ref (fun (_:ReactionRemoveAll.t) -> ())
-let presence_update = ref (fun (_:PresenceUpdate.t) -> ())
-let typing_start = ref (fun (_:TypingStart.t) -> ())
-let user_update = ref (fun (_:UserUpdate.t) -> ())
-(* let voice_state_update = ref (fun (_:Yojson.Safe.t) -> ()) *)
-(* let voice_server_update = ref (fun (_:Yojson.Safe.t) -> ()) *)
-let webhook_update = ref (fun (_:WebhookUpdate.t) -> ())
-let unknown = ref (fun (_:Unknown.t) -> ()) \ No newline at end of file
+let ready = ref (fun (_:Ready.t) -> Lwt.return_unit)
+let resumed = ref (fun (_:Resumed.t) -> Lwt.return_unit)
+let channel_create = ref (fun (_:ChannelCreate.t) -> Lwt.return_unit)
+let channel_update = ref (fun (_:ChannelUpdate.t) -> Lwt.return_unit)
+let channel_delete = ref (fun (_:ChannelDelete.t) -> Lwt.return_unit)
+let channel_pins_update = ref (fun (_:ChannelPinsUpdate.t) -> Lwt.return_unit)
+let guild_create = ref (fun (_:GuildCreate.t) -> Lwt.return_unit)
+let guild_update = ref (fun (_:GuildUpdate.t) -> Lwt.return_unit)
+let guild_delete = ref (fun (_:GuildDelete.t) -> Lwt.return_unit)
+let member_ban = ref (fun (_:GuildBanAdd.t) -> Lwt.return_unit)
+let member_unban = ref (fun (_:GuildBanRemove.t) -> Lwt.return_unit)
+let guild_emojis_update = ref (fun (_:GuildEmojisUpdate.t) -> Lwt.return_unit)
+(* let integrations_update = ref (fun (_:Yojson.Safe.t) -> Lwt.return_unit) *)
+let member_join = ref (fun (_:GuildMemberAdd.t) -> Lwt.return_unit)
+let member_leave = ref (fun (_:GuildMemberRemove.t) -> Lwt.return_unit)
+let member_update = ref (fun (_:GuildMemberUpdate.t) -> Lwt.return_unit)
+let members_chunk = ref (fun (_:GuildMembersChunk.t) -> Lwt.return_unit)
+let role_create = ref (fun (_:GuildRoleCreate.t) -> Lwt.return_unit)
+let role_update = ref (fun (_:GuildRoleUpdate.t) -> Lwt.return_unit)
+let role_delete = ref (fun (_:GuildRoleDelete.t) -> Lwt.return_unit)
+let message_create = ref (fun (_:MessageCreate.t) -> Lwt.return_unit)
+let message_update = ref (fun (_:MessageUpdate.t) -> Lwt.return_unit)
+let message_delete = ref (fun (_:MessageDelete.t) -> Lwt.return_unit)
+let message_delete_bulk = ref (fun (_:MessageDeleteBulk.t) -> Lwt.return_unit)
+let reaction_add = ref (fun (_:ReactionAdd.t) -> Lwt.return_unit)
+let reaction_remove = ref (fun (_:ReactionRemove.t) -> Lwt.return_unit)
+let reaction_remove_all = ref (fun (_:ReactionRemoveAll.t) -> Lwt.return_unit)
+let presence_update = ref (fun (_:PresenceUpdate.t) -> Lwt.return_unit)
+let typing_start = ref (fun (_:TypingStart.t) -> Lwt.return_unit)
+let user_update = ref (fun (_:UserUpdate.t) -> Lwt.return_unit)
+(* let voice_state_update = ref (fun (_:Yojson.Safe.t) -> Lwt.return_unit) *)
+(* let voice_server_update = ref (fun (_:Yojson.Safe.t) -> Lwt.return_unit) *)
+let webhook_update = ref (fun (_:WebhookUpdate.t) -> Lwt.return_unit)
+let unknown = ref (fun (_:Unknown.t) -> Lwt.return_unit) \ No newline at end of file
diff --git a/lib/gateway/dispatch.mli b/lib/gateway/dispatch.mli
index 89905a6..1ec1271 100644
--- a/lib/gateway/dispatch.mli
+++ b/lib/gateway/dispatch.mli
@@ -20,101 +20,101 @@ Client.message_create := check_command
open Event_models
(** Dispatched when each shard receives READY from discord after identifying on the gateway. Other event dispatch is received after this. *)
-val ready : (Ready.t -> unit) ref
+val ready : (Ready.t -> unit Lwt.t) ref
(** Dispatched when successfully reconnecting to the gateway. *)
-val resumed : (Resumed.t -> unit) ref
+val resumed : (Resumed.t -> unit Lwt.t) ref
(** Dispatched when a channel is created which is visible to the bot. *)
-val channel_create : (ChannelCreate.t -> unit) ref
+val channel_create : (ChannelCreate.t -> unit Lwt.t) ref
(** Dispatched when a channel visible to the bot is changed. *)
-val channel_update : (ChannelUpdate.t -> unit) ref
+val channel_update : (ChannelUpdate.t -> unit Lwt.t) ref
(** Dispatched when a channel visible to the bot is deleted. *)
-val channel_delete : (ChannelDelete.t -> unit) ref
+val channel_delete : (ChannelDelete.t -> unit Lwt.t) ref
(** Dispatched when messages are pinned or unpinned from a a channel. *)
-val channel_pins_update : (ChannelPinsUpdate.t -> unit) ref
+val channel_pins_update : (ChannelPinsUpdate.t -> unit Lwt.t) ref
(** Dispatched when the bot joins a guild, and during startup. *)
-val guild_create : (GuildCreate.t -> unit) ref
+val guild_create : (GuildCreate.t -> unit Lwt.t) ref
(** Dispatched when a guild the bot is in is edited. *)
-val guild_update : (GuildUpdate.t -> unit) ref
+val guild_update : (GuildUpdate.t -> unit Lwt.t) ref
(** Dispatched when the bot is removed from a guild. *)
-val guild_delete : (GuildDelete.t -> unit) ref
+val guild_delete : (GuildDelete.t -> unit Lwt.t) ref
(** Dispatched when a member is banned. *)
-val member_ban : (GuildBanAdd.t -> unit) ref
+val member_ban : (GuildBanAdd.t -> unit Lwt.t) ref
(** Dispatched when a member is unbanned. *)
-val member_unban : (GuildBanRemove.t -> unit) ref
+val member_unban : (GuildBanRemove.t -> unit Lwt.t) ref
(** Dispatched when emojis are added or removed from a guild. *)
-val guild_emojis_update : (GuildEmojisUpdate.t -> unit) ref
+val guild_emojis_update : (GuildEmojisUpdate.t -> unit Lwt.t) ref
(** Dispatched when a guild's integrations are updated. *)
-(* val integrations_update : (Yojson.Safe.t -> unit) ref *)
+(* val integrations_update : (Yojson.Safe.t -> unit Lwt.t) ref *)
(** Dispatched when a member joins a guild. *)
-val member_join : (GuildMemberAdd.t -> unit) ref
+val member_join : (GuildMemberAdd.t -> unit Lwt.t) ref
(** Dispatched when a member leaves a guild. Is Dispatched alongside {!Client.member_ban} when a user is banned. *)
-val member_leave : (GuildMemberRemove.t -> unit) ref
+val member_leave : (GuildMemberRemove.t -> unit Lwt.t) ref
(** Dispatched when a member object is updated. *)
-val member_update : (GuildMemberUpdate.t -> unit) ref
+val member_update : (GuildMemberUpdate.t -> unit Lwt.t) ref
(** Dispatched when requesting guild members through {!Client.request_guild_members} *)
-val members_chunk : (GuildMembersChunk.t -> unit) ref
+val members_chunk : (GuildMembersChunk.t -> unit Lwt.t) ref
(** Dispatched when a role is created. *)
-val role_create : (GuildRoleCreate.t -> unit) ref
+val role_create : (GuildRoleCreate.t -> unit Lwt.t) ref
(** Dispatched when a role is edited. *)
-val role_update : (GuildRoleUpdate.t -> unit) ref
+val role_update : (GuildRoleUpdate.t -> unit Lwt.t) ref
(** Dispatched when a role is deleted. *)
-val role_delete : (GuildRoleDelete.t -> unit) ref
+val role_delete : (GuildRoleDelete.t -> unit Lwt.t) ref
(** Dispatched when a message is sent. *)
-val message_create : (MessageCreate.t -> unit) ref
+val message_create : (MessageCreate.t -> unit Lwt.t) ref
(** Dispatched when a message is edited. This does not necessarily mean the content changed. *)
-val message_update : (MessageUpdate.t -> unit) ref
+val message_update : (MessageUpdate.t -> unit Lwt.t) ref
(** Dispatched when a message is deleted. *)
-val message_delete : (MessageDelete.t -> unit) ref
+val message_delete : (MessageDelete.t -> unit Lwt.t) ref
(** Dispatched when messages are bulk deleted. *)
-val message_delete_bulk : (MessageDeleteBulk.t -> unit) ref
+val message_delete_bulk : (MessageDeleteBulk.t -> unit Lwt.t) ref
(** Dispatched when a rection is added to a message. *)
-val reaction_add : (ReactionAdd.t -> unit) ref
+val reaction_add : (ReactionAdd.t -> unit Lwt.t) ref
(** Dispatched when a reaction is removed from a message. *)
-val reaction_remove : (ReactionRemove.t -> unit) ref
+val reaction_remove : (ReactionRemove.t -> unit Lwt.t) ref
(** Dispatched when all reactions are cleared from a message. *)
-val reaction_remove_all : (ReactionRemoveAll.t -> unit) ref
+val reaction_remove_all : (ReactionRemoveAll.t -> unit Lwt.t) ref
(** Dispatched when a user updates their presence. *)
-val presence_update : (PresenceUpdate.t -> unit) ref
+val presence_update : (PresenceUpdate.t -> unit Lwt.t) ref
(** Dispatched when a typing indicator is displayed. *)
-val typing_start : (TypingStart.t -> unit) ref
+val typing_start : (TypingStart.t -> unit Lwt.t) ref
(** Dispatched when the current user is updated. You most likely want {!Client.member_update} or {!Client.presence_update} instead. *)
-val user_update : (UserUpdate.t -> unit) ref
+val user_update : (UserUpdate.t -> unit Lwt.t) ref
(** Dispatched when a webhook is updated. *)
-val webhook_update : (WebhookUpdate.t -> unit) ref
+val webhook_update : (WebhookUpdate.t -> unit Lwt.t) ref
(** Dispatched as a fallback for unknown events. *)
-val unknown : (Unknown.t -> unit) ref
+val unknown : (Unknown.t -> unit Lwt.t) ref
(**/**)
-(* val voice_state_update : (Yojson.Safe.t -> unit) ref *)
-(* val voice_server_update : (Yojson.Safe.t -> unit) ref *)
+(* val voice_state_update : (Yojson.Safe.t -> unit Lwt.t) ref *)
+(* val voice_server_update : (Yojson.Safe.t -> unit Lwt.t) ref *)
diff --git a/lib/gateway/event.ml b/lib/gateway/event.ml
index 88dd50d..af9861d 100644
--- a/lib/gateway/event.ml
+++ b/lib/gateway/event.ml
@@ -1,5 +1,4 @@
-open Async
-open Core
+open Lwt.Infix
open Event_models
type t =
@@ -74,103 +73,137 @@ let event_of_yojson ~contents = function
| "WEBHOOK_UPDATE" -> WEBHOOK_UPDATE WebhookUpdate.(deserialize contents)
| s -> UNKNOWN Unknown.(deserialize s contents)
-let dispatch ev =
+let dispatch cache ev =
match ev with
| READY d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> Ready.update_cache cache d);
- !Dispatch.ready d
+ let cache = Ready.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.ready d);
+ cache
| RESUMED d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> Resumed.update_cache cache d);
- !Dispatch.resumed d
+ let cache = Resumed.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.resumed d);
+ cache
| CHANNEL_CREATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ChannelCreate.update_cache cache d);
- !Dispatch.channel_create d
+ let cache = ChannelCreate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.channel_create d);
+ cache
| CHANNEL_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ChannelUpdate.update_cache cache d);
- !Dispatch.channel_update d
+ let cache = ChannelDelete.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.channel_update d);
+ cache
| CHANNEL_DELETE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ChannelDelete.update_cache cache d);
- !Dispatch.channel_delete d
+ let cache = ChannelDelete.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.channel_delete d);
+ cache
| CHANNEL_PINS_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ChannelPinsUpdate.update_cache cache d);
- !Dispatch.channel_pins_update d
+ let cache = ChannelPinsUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.channel_pins_update d);
+ cache
| GUILD_CREATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildCreate.update_cache cache d);
- !Dispatch.guild_create d
+ let cache = GuildCreate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.guild_create d);
+ cache
| GUILD_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildUpdate.update_cache cache d);
- !Dispatch.guild_update d
+ let cache = GuildUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.guild_update d);
+ cache
| GUILD_DELETE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildDelete.update_cache cache d);
- !Dispatch.guild_delete d
+ let cache = GuildDelete.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.guild_delete d);
+ cache
| GUILD_BAN_ADD d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildBanAdd.update_cache cache d);
- !Dispatch.member_ban d
+ let cache = GuildBanAdd.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.member_ban d);
+ cache
| GUILD_BAN_REMOVE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildBanRemove.update_cache cache d);
- !Dispatch.member_unban d
+ let cache = GuildBanRemove.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.member_unban d);
+ cache
| GUILD_EMOJIS_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildEmojisUpdate.update_cache cache d);
- !Dispatch.guild_emojis_update d
+ let cache = GuildEmojisUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.guild_emojis_update d);
+ cache
(* | GUILD_INTEGRATIONS_UPDATE d -> !Dispatch.integrations_update d *)
| GUILD_MEMBER_ADD d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildMemberAdd.update_cache cache d);
- !Dispatch.member_join d
+ let cache = GuildMemberAdd.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.member_join d);
+ cache
| GUILD_MEMBER_REMOVE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildMemberRemove.update_cache cache d);
- !Dispatch.member_leave d
+ let cache = GuildMemberRemove.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.member_leave d);
+ cache
| GUILD_MEMBER_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildMemberUpdate.update_cache cache d);
- !Dispatch.member_update d
+ let cache = GuildMemberUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.member_update d);
+ cache
| GUILD_MEMBERS_CHUNK d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildMembersChunk.update_cache cache d);
- !Dispatch.members_chunk d
+ let cache = GuildMembersChunk.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.members_chunk d);
+ cache
| GUILD_ROLE_CREATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildRoleCreate.update_cache cache d);
- !Dispatch.role_create d
+ let cache = GuildRoleCreate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.role_create d);
+ cache
| GUILD_ROLE_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildRoleUpdate.update_cache cache d);
- !Dispatch.role_update d
+ let cache = GuildRoleUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.role_update d);
+ cache
| GUILD_ROLE_DELETE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> GuildRoleDelete.update_cache cache d);
- !Dispatch.role_delete d
+ let cache = GuildRoleDelete.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.role_delete d);
+ cache
| MESSAGE_CREATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> MessageCreate.update_cache cache d);
- !Dispatch.message_create d
+ let cache = MessageCreate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.message_create d);
+ cache
| MESSAGE_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> MessageUpdate.update_cache cache d);
- !Dispatch.message_update d
+ let cache = MessageUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.message_update d);
+ cache
| MESSAGE_DELETE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> MessageDelete.update_cache cache d);
- !Dispatch.message_delete d
+ let cache = MessageDelete.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.message_delete d);
+ cache
| MESSAGE_DELETE_BULK d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> MessageDeleteBulk.update_cache cache d);
- !Dispatch.message_delete_bulk d
+ let cache = MessageDeleteBulk.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.message_delete_bulk d);
+ cache
| REACTION_ADD d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ReactionAdd.update_cache cache d);
- !Dispatch.reaction_add d
+ let cache = ReactionAdd.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.reaction_add d);
+ cache
| REACTION_REMOVE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ReactionRemove.update_cache cache d);
- !Dispatch.reaction_remove d
+ let cache = ReactionRemove.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.reaction_remove d);
+ cache
| REACTION_REMOVE_ALL d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> ReactionRemoveAll.update_cache cache d);
- !Dispatch.reaction_remove_all d
+ let cache = ReactionRemoveAll.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.reaction_remove_all d);
+ cache
| PRESENCE_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> PresenceUpdate.update_cache cache d);
- !Dispatch.presence_update d
+ let cache = PresenceUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.presence_update d);
+ cache
| TYPING_START d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> TypingStart.update_cache cache d);
- !Dispatch.typing_start d
+ let cache = TypingStart.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.typing_start d);
+ cache
| USER_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> UserUpdate.update_cache cache d);
- !Dispatch.user_update d
+ let cache = UserUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.user_update d);
+ cache
(* | VOICE_STATE_UPDATE d -> !Dispatch.voice_state_update d *)
(* | VOICE_SERVER_UPDATE d -> !Dispatch.voice_server_update d *)
| WEBHOOK_UPDATE d ->
- Mvar.update_exn Cache.cache ~f:(fun cache -> WebhookUpdate.update_cache cache d);
- !Dispatch.webhook_update d
- | UNKNOWN d -> !Dispatch.unknown d
+ let cache = WebhookUpdate.update_cache cache d in
+ Lwt.async (fun () -> !Dispatch.webhook_update d);
+ cache
+ | UNKNOWN d ->
+ Lwt.async (fun () -> !Dispatch.unknown d);
+ cache
let handle_event ~ev contents =
+ Lwt_mvar.take Cache.cache >>= fun cache ->
event_of_yojson ~contents ev
- |> dispatch \ No newline at end of file
+ |> dispatch cache
+ |> Lwt_mvar.put Cache.cache \ No newline at end of file
diff --git a/lib/gateway/event.mli b/lib/gateway/event.mli
index 4db3c84..1817c52 100644
--- a/lib/gateway/event.mli
+++ b/lib/gateway/event.mli
@@ -43,7 +43,7 @@ type t =
val event_of_yojson : contents:Yojson.Safe.t -> string -> t
(** Sends the event to the registered handler. *)
-val dispatch : t -> unit
+val dispatch : Cache.t -> t -> Cache.t
(** Wrapper to other functions. This is called from the shards. *)
-val handle_event : ev:string -> Yojson.Safe.t -> unit \ No newline at end of file
+val handle_event : ev:string -> Yojson.Safe.t -> unit Lwt.t \ No newline at end of file
diff --git a/lib/gateway/opcode.ml b/lib/gateway/opcode.ml
index e2f44aa..f7b8fd7 100644
--- a/lib/gateway/opcode.ml
+++ b/lib/gateway/opcode.ml
@@ -1,17 +1,16 @@
-open Core
-
type t =
- | DISPATCH
- | HEARTBEAT
- | IDENTIFY
- | STATUS_UPDATE
- | VOICE_STATE_UPDATE
- | RESUME
- | RECONNECT
- | REQUEST_GUILD_MEMBERS
- | INVALID_SESSION
- | HELLO
- | HEARTBEAT_ACK
+| DISPATCH
+| HEARTBEAT
+| IDENTIFY
+| STATUS_UPDATE
+| VOICE_STATE_UPDATE
+| RESUME
+| RECONNECT
+| REQUEST_GUILD_MEMBERS
+| INVALID_SESSION
+| HELLO
+| HEARTBEAT_ACK
+[@@deriving sexp]
exception Invalid_Opcode of int
diff --git a/lib/gateway/opcode.mli b/lib/gateway/opcode.mli
index 9fa5b96..5d1a35c 100644
--- a/lib/gateway/opcode.mli
+++ b/lib/gateway/opcode.mli
@@ -13,6 +13,7 @@ type t =
| INVALID_SESSION
| HELLO
| HEARTBEAT_ACK
+[@@deriving sexp]
(** Raised when receiving an invalid opcode. This should never occur. *)
exception Invalid_Opcode of int
diff --git a/lib/gateway/sharder.ml b/lib/gateway/sharder.ml
index ba865a9..9c98979 100644
--- a/lib/gateway/sharder.ml
+++ b/lib/gateway/sharder.ml
@@ -1,7 +1,7 @@
-open Async
-open Core
+open Lwt.Infix
open Decompress
-open Websocket_async
+open Websocket
+open Websocket_lwt
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
@@ -32,16 +32,17 @@ let decompress src =
module Shard = struct
type shard =
{ compress: bool
- ; id: int * int
- ; hb_interval: Time.Span.t Ivar.t
- ; hb_stopper: unit Ivar.t
+ ; hb_interval: int Lwt.t * int Lwt.u
+ ; hb_stopper: unit Lwt.t * unit Lwt.u
+ ; id: int
; large_threshold: int
- ; pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t
- ; ready: unit Ivar.t
+ ; ready: unit Lwt.t * unit Lwt.u
+ ; recv: Frame.t Lwt_stream.t
+ ; send: (Frame.t -> unit Lwt.t)
; seq: int
; session: string option
+ ; shard_count: int
; url: string
- ; _internal: Reader.t * Writer.t
}
type 'a t =
@@ -50,44 +51,38 @@ module Shard = struct
; mutable can_resume: bool
}
- let identify_lock = Mvar.create ()
- let _ = Mvar.set identify_lock ()
+ let identify_lock = Lwt_mvar.create ()
- let parse ~compress (frame:[`Ok of Frame.t | `Eof]) =
- match frame with
- | `Ok s -> begin
- let open Frame.Opcode in
- match s.opcode with
- | Text -> `Ok (Yojson.Safe.from_string s.content)
- | Binary ->
- if compress then `Ok (decompress s.content |> Yojson.Safe.from_string)
- else `Error "Failed to decompress"
- | Close -> `Close s
- | op ->
- let op = Frame.Opcode.to_string op in
- `Error ("Unexpected opcode " ^ op)
- end
- | `Eof -> `Eof
+ let parse ~compress (frame:Frame.t) =
+ let open Frame.Opcode in
+ match frame.opcode with
+ | Text -> `Ok (Yojson.Safe.from_string frame.content)
+ | Binary ->
+ if compress then `Ok (decompress frame.content |> Yojson.Safe.from_string)
+ else `Error "Failed to decompress"
+ | Close -> `Close frame
+ | op ->
+ let op = Frame.Opcode.to_string op in
+ `Error ("Unexpected opcode " ^ op)
let push_frame ?payload ~ev shard =
let content = match payload with
| None -> ""
| Some p ->
- Yojson.Safe.to_string @@ `Assoc [
- "op", `Int (Opcode.to_int ev);
- "d", p;
+ Yojson.Safe.to_string @@ `Assoc
+ [ "op", `Int (Opcode.to_int ev)
+ ; "d", p
]
in
- let (_, write) = shard.pipe in
- Pipe.write_if_open write @@ Frame.create ~content ()
- >>| fun () ->
+ Frame.create ~content ()
+ |> shard.send >|= fun () ->
shard
let heartbeat shard =
match shard.seq with
- | 0 -> return shard
+ | 0 -> Lwt.return shard
| i ->
- Logs.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" (fst shard.id) (snd shard.id) (shard.seq));
+ Logs_lwt.debug (fun m -> m "Heartbeating - Shard: [%d, %d] - Seq: %d" shard.id shard.shard_count shard.seq) >>= fun () ->
push_frame ~payload:(`Int i) ~ev:HEARTBEAT shard
let dispatch ~payload shard =
@@ -96,20 +91,20 @@ module Shard = struct
let t = J.(member "t" payload |> to_string) in
let data = J.member "d" payload in
let session = if t = "READY" then begin
- Ivar.fill_if_empty shard.ready ();
- Clock.after (Core.Time.Span.create ~sec:5 ())
- >>> (fun _ -> Mvar.put identify_lock () >>> ignore);
+ Lwt.wakeup_later (snd shard.ready) ();
+ (* TODO figure out action after time in Lwt *)
+ (* Clock.after (Core.Time.Span.create ~sec:5 ())
+ >>> (fun _ -> Lwt_mvar.put identify_lock () >>> ignore); *)
J.(member "session_id" data |> to_string_option)
end else shard.session in
- Event.handle_event ~ev:t data;
- return
+ Event.handle_event ~ev:t data >|= fun () ->
{ shard with seq = seq
; session = session
}
- let set_status ?(status="online") ?(kind=0) ?name ?since ?url shard =
- let since = Option.(since >>| (fun v -> `Int v) |> value ~default:`Null) in
- let url = Option.(url >>| (fun v -> `String v) |> value ~default:`Null) in
+ let set_status ?(status="online") ?(kind=0) ?name ?since ?url shard =
+ let since = Option.(map since ~f:(fun v -> `Int v) |> value ~default:`Null) in
+ let url = Option.(map url ~f:(fun v -> `String v) |> value ~default:`Null) in
let game = match name with
| Some name -> `Assoc
[ "name", `String name
@@ -125,30 +120,30 @@ module Shard = struct
; "game", game
]
in
- Ivar.read shard.ready >>= fun _ ->
+ fst shard.ready >>= fun _ ->
push_frame ~payload ~ev:STATUS_UPDATE shard
let request_guild_members ?(query="") ?(limit=0) ~guild shard =
let payload = `Assoc
- [ "guild_id", `String (Int.to_string guild)
+ [ "guild_id", `String (string_of_int guild)
; "query", `String query
; "limit", `Int limit
]
in
- Ivar.read shard.ready >>= fun _ ->
+ fst shard.ready >>= fun _ ->
push_frame ~payload ~ev:REQUEST_GUILD_MEMBERS shard
let initialize ?data shard =
let module J = Yojson.Safe.Util in
let _ = match data with
- | Some data -> Ivar.fill_if_empty shard.hb_interval (Time.Span.create ~ms:J.(member "heartbeat_interval" data |> to_int) ())
+ | Some data -> Lwt.wakeup_later (snd shard.hb_interval) J.(member "heartbeat_interval" data |> to_int)
| None -> raise Failure_to_Establish_Heartbeat
in
- let shards = [`Int (fst shard.id); `Int (snd shard.id)] in
+ let shards = [`Int shard.id; `Int shard.shard_count] in
match shard.session with
| None -> begin
- Mvar.take identify_lock >>= fun () ->
- Logs.debug (fun m -> m "Identifying shard [%d, %d]" (fst shard.id) (snd shard.id));
+ Lwt_mvar.take identify_lock >>= fun () ->
+ Logs_lwt.debug (fun m -> m "Identifying shard [%d, %d]" shard.id shard.shard_count) >>= fun () ->
let payload = `Assoc
[ "token", `String !Client_options.token
; "properties", `Assoc
@@ -162,7 +157,6 @@ module Shard = struct
]
in
push_frame ~payload ~ev:IDENTIFY shard
- >>| fun s -> s
end
| Some s ->
let payload = `Assoc
@@ -180,117 +174,50 @@ module Shard = struct
| DISPATCH -> dispatch ~payload:f shard
| HEARTBEAT -> heartbeat shard
| INVALID_SESSION -> begin
- Logs.err (fun m -> m "Invalid Session on Shard [%d, %d]: %s" (fst shard.id) (snd shard.id) (Yojson.Safe.pretty_to_string f));
- if J.(member "d" f |> to_bool) then
- initialize shard
- else begin
- initialize { shard with session = None; }
- end
+ Logs_lwt.warn (fun m -> m "Invalid Session on Shard [%d, %d]: %s" shard.id shard.shard_count (Yojson.Safe.pretty_to_string f)) >>= fun () ->
+ if J.(member "d" f |> to_bool) then initialize shard
+ else initialize { shard with session = None; }
end
| RECONNECT -> initialize shard
| HELLO -> initialize ~data:(J.member "d" f) shard
- | HEARTBEAT_ACK -> return shard
+ | HEARTBEAT_ACK -> Lwt.return shard
| opcode ->
- Logs.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode));
- return shard
-
- let rec make_client
- ~initialized
- ~extra_headers
- ~app_to_ws
- ~ws_to_app
- ~net_to_ws
- ~ws_to_net
- ?(ms=500)
- uri =
- client
- ~initialized
- ~extra_headers
- ~app_to_ws
- ~ws_to_app
- ~net_to_ws
- ~ws_to_net
- uri
- >>> fun res ->
- match res with
- | Ok () -> ()
- | Error _ ->
- let backoff = Time.Span.create ~ms () in
- Clock.after backoff >>> (fun () ->
- make_client
- ~initialized
- ~extra_headers
- ~app_to_ws
- ~ws_to_app
- ~net_to_ws
- ~ws_to_net
- ~ms:(min 60_000 (ms * 2))
- uri)
+ Logs_lwt.warn (fun m -> m "Invalid Opcode: %s" (Opcode.to_string opcode)) >|= fun () ->
+ shard
+ let make_client ?extra_headers uri =
+ let uri = Uri.with_scheme uri (Some "https") in
+ Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system >>= fun endp ->
+ Conduit_lwt_unix.(
+ endp_to_client ~ctx:default_ctx endp >>= fun client ->
+ with_connection ?extra_headers ~ctx:default_ctx client uri)
let create ~url ~shards ?(compress=true) ?(large_threshold=100) () =
- let open Core in
- let uri = (url ^ "?v=6&encoding=json") |> Uri.of_string in
+ let uri = Uri.(with_query' (of_string url) ["encoding", "json"; "v", "6"]) in
let extra_headers = Http.Base.process_request_headers () in
- let host = Option.value_exn ~message:"no host in uri" Uri.(host uri) in
- let port =
- match Uri.port uri, Uri_services.tcp_port_of_uri uri with
- | Some p, _ -> p
- | None, Some p -> p
- | _ -> 443 in
- let scheme = Option.value_exn ~message:"no scheme in uri" Uri.(scheme uri) in
- let tcp_fun (net_to_ws, ws_to_net) =
- let (app_to_ws, write) = Pipe.create () in
- let (read, ws_to_app) = Pipe.create () in
- let initialized = Ivar.create () in
- make_client
- ~initialized
- ~extra_headers
- ~app_to_ws
- ~ws_to_app
- ~net_to_ws
- ~ws_to_net
- uri;
- Ivar.read initialized >>| fun () ->
- { pipe = (read, write)
- ; ready = Ivar.create ()
- ; hb_interval = Ivar.create ()
- ; hb_stopper = Ivar.create ()
- ; seq = 0
- ; id = shards
- ; session = None
- ; url
- ; large_threshold
- ; compress
- ; _internal = (net_to_ws, ws_to_net)
- }
- in
- match Unix.getaddrinfo host (string_of_int port) [] with
- | [] -> failwithf "DNS resolution failed for %s" host ()
- | { ai_addr; _ } :: _ ->
- let addr =
- match scheme, ai_addr with
- | _, ADDR_UNIX path -> `Unix_domain_socket path
- | "https", ADDR_INET (h, p)
- | "wss", ADDR_INET (h, p) ->
- let h = Ipaddr_unix.of_inet_addr h in
- `OpenSSL (h, p, Conduit_async.V2.Ssl.Config.create ())
- | _, ADDR_INET (h, p) ->
- let h = Ipaddr_unix.of_inet_addr h in
- `TCP (h, p)
- in
- Conduit_async.V2.connect addr >>= tcp_fun
+ make_client ~extra_headers uri >|= fun (recv, send) ->
+ let recv = mk_frame_stream recv in
+ { compress
+ ; hb_interval = Lwt.wait ()
+ ; hb_stopper = Lwt.wait ()
+ ; id = fst shards
+ ; large_threshold
+ ; ready = Lwt.wait ()
+ ; recv
+ ; send
+ ; seq = 0
+ ; session = None
+ ; shard_count = snd shards
+ ; url
+ }
let shutdown ?(clean=false) ?(restart=true) t =
let _ = clean in
t.can_resume <- restart;
t.stopped <- true;
- Logs.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" (fst t.state.id) (snd t.state.id));
- Pipe.write_if_open (snd t.state.pipe) (Frame.close 1001)
- >>= fun () ->
- Ivar.fill_if_empty t.state.hb_stopper ();
- Pipe.close_read (fst t.state.pipe);
- Writer.close (snd t.state._internal)
+ Logs_lwt.debug (fun m -> m "Performing shutdown. Shard [%d, %d]" t.state.id t.state.shard_count) >>= fun () ->
+ t.state.send (Frame.close 1001) >|= fun () ->
+ Lwt.wakeup_later (snd t.state.hb_stopper) ()
end
type t = { shards: (Shard.shard Shard.t) list }
@@ -300,7 +227,7 @@ let start ?count ?compress ?large_threshold () =
Http.get_gateway_bot () >>= fun data ->
let data = match data with
| Ok d -> d
- | Error e -> Error.raise e
+ | Error e -> Base.Error.(of_string e |> raise)
in
let url = J.(member "url" data |> to_string) in
let count = match count with
@@ -311,36 +238,36 @@ let start ?count ?compress ?large_threshold () =
Logs.info (fun m -> m "Connecting to %s" url);
let rec ev_loop (t:Shard.shard Shard.t) =
let step (t:Shard.shard Shard.t) =
- Pipe.read (fst t.state.pipe) >>= fun frame ->
+ Lwt_stream.get t.state.recv >>= function None -> Lwt.return t | Some frame ->
begin match Shard.parse ~compress:t.state.compress frame with
| `Ok f ->
- Shard.handle_frame ~f t.state >>| fun s ->
+ Shard.handle_frame ~f t.state >|= fun s ->
t.state <- s
| `Close c ->
- Logs.warn (fun m -> m "Close frame received. %s" (Frame.show c));
+ Logs_lwt.warn (fun m -> m "Close frame received. %s" (Frame.show c)) >>= fun () ->
Shard.shutdown t
| `Error e ->
- Logs.warn (fun m -> m "Websocket soft error: %s" e);
- return ()
+ Logs_lwt.warn (fun m -> m "Websocket soft error: %s" e) >>= fun () ->
+ Lwt.return_unit
| `Eof ->
- Logs.warn (fun m -> m "Websocket closed unexpectedly");
+ Logs_lwt.warn (fun m -> m "Websocket closed unexpectedly") >>= fun () ->
Shard.shutdown t
- end >>| fun () -> t
+ end >|= fun () -> t
in
- if t.stopped then return ()
+ if t.stopped then Lwt.return_unit
else step t >>= ev_loop
in
let rec gen_shards l a =
match l with
- | (id, total) when id >= total -> return a
+ | (id, total) when id >= total -> Lwt.return a
| (id, total) ->
let wrap ?(reuse:Shard.shard Shard.t option) state = match reuse with
| Some t ->
t.state <- state;
t.stopped <- false;
- return t
+ Lwt.return t
| None ->
- return Shard.{ state
+ Lwt.return Shard.{ state
; stopped = false
; can_resume = true
}
@@ -349,34 +276,38 @@ let start ?count ?compress ?large_threshold () =
Shard.create ~url ~shards:(id, total) ?compress ?large_threshold ()
in
let rec bind (t:Shard.shard Shard.t) =
- let _ = Ivar.read t.state.hb_interval >>> fun hb ->
- Clock.every'
- ~stop:(Ivar.read t.state.hb_stopper)
- ~continue_on_error:true
- hb (fun () -> Shard.heartbeat t.state >>| ignore) in
- ev_loop t >>> (fun () -> Logs.debug (fun m -> m "Event loop stopped."));
- Pipe.closed (fst t.state.pipe) >>> (fun () -> if t.can_resume then
- create () >>= wrap ~reuse:t >>= bind >>> ignore);
- return t
+ Lwt.async (fun () ->
+ fst t.state.hb_interval >|= fun _hb -> ()
+ (* TODO figure out clocks in Lwt *)
+ );
+ Lwt.async (fun () -> ev_loop t >>= fun () -> Logs_lwt.debug (fun m -> m "Event loop stopped."));
+ (* TODO figure out how to bind to closed websocket *)
+ Lwt.async (fun () -> Lwt_stream.closed t.state.recv >>= fun () ->
+ if t.can_resume then create () >>= wrap ~reuse:t >>= bind >|= ignore
+ else Lwt.return_unit);
+ Lwt.return t
in
create () >>= wrap >>= bind >>= fun t ->
gen_shards (id+1, total) (t :: a)
in
gen_shards shard_list []
- >>| fun shards ->
+ >|= fun shards ->
{ shards }
let set_status ?status ?kind ?name ?since ?url sharder =
- Deferred.all @@ List.map ~f:(fun t ->
- Shard.set_status ?status ?kind ?name ?since ?url t.state
- ) sharder.shards
+ List.map (fun (t:Shard.shard Shard.t) ->
+ Shard.set_status ?status ?kind ?name ?since ?url t.state >|= ignore)
+ sharder.shards
+ |> Lwt.join
let request_guild_members ?query ?limit ~guild sharder =
- Deferred.all @@ List.map ~f:(fun t ->
- Shard.request_guild_members ~guild ?query ?limit t.state
- ) sharder.shards
+ List.map (fun (t:Shard.shard Shard.t) ->
+ Shard.request_guild_members ~guild ?query ?limit t.state >|= ignore)
+ sharder.shards
+ |> Lwt.join
let shutdown_all ?restart sharder =
- Deferred.all @@ List.map ~f:(fun t ->
- Shard.shutdown ~clean:true ?restart t
- ) sharder.shards
+ List.map (fun t ->
+ Shard.shutdown ~clean:true ?restart t)
+ sharder.shards
+ |> Lwt.join
diff --git a/lib/gateway/sharder.mli b/lib/gateway/sharder.mli
index 6249d4d..20186f8 100644
--- a/lib/gateway/sharder.mli
+++ b/lib/gateway/sharder.mli
@@ -1,8 +1,6 @@
(** Internal sharding manager. Most of this is accessed through {!Client}. *)
-open Core
-open Async
-open Websocket_async
+open Websocket
exception Invalid_Payload
exception Failure_to_Establish_Heartbeat
@@ -15,23 +13,24 @@ val start :
?compress:bool ->
?large_threshold:int ->
unit ->
- t Deferred.t
+ t Lwt.t
(** Module representing a single shard. *)
module Shard : sig
(** Representation of the state of a shard. *)
- type shard = {
- compress: bool; (** Whether to compress payloads. *)
- id: int * int; (** A tuple as expected by Discord. First element is the current shard index, second element is the total shard count. *)
- hb_interval: Time.Span.t Ivar.t; (** Time span between heartbeats, wrapped in an Ivar. *)
- hb_stopper: unit Ivar.t; (** Stops the heartbeat sequencer when filled. *)
- large_threshold: int; (** Minimum number of members needed for a guild to be considered large. *)
- pipe: Frame.t Pipe.Reader.t * Frame.t Pipe.Writer.t; (** Raw frame IO pipe used for websocket communications. *)
- ready: unit Ivar.t; (** A simple Ivar indicating if the shard has received READY. *)
- seq: int; (** Current sequence number *)
- session: string option; (** Session id, if one exists. *)
- url: string; (** The websocket URL in use. *)
- _internal: Reader.t * Writer.t;
+ type shard =
+ { compress: bool (** Whether to compress payloads. *)
+ ; hb_interval: int Lwt.t * int Lwt.u (** Time between heartbeats. Not known until HELLO is received. *)
+ ; hb_stopper: unit Lwt.t * unit Lwt.u (** Stops the heartbeat sequencing when filled *)
+ ; id: int (** ID of the current shard. Must be less than shard_count. *)
+ ; large_threshold: int (** Minimum number of members needed for a guild to be considered large. *)
+ ; ready: unit Lwt.t * unit Lwt.u (** A simple promise indicating if the shard has received READY. *)
+ ; recv: Frame.t Lwt_stream.t (** Receiver function for the websocket. *)
+ ; send: (Frame.t -> unit Lwt.t) (** Sender function for the websocket. *)
+ ; seq: int (** Current sequence number for the session. *)
+ ; session: string option (** Current session ID *)
+ ; shard_count: int (** Total number of shards. *)
+ ; url: string (** The websocket URL. *)
}
(** Wrapper around an internal state, used to wrap {!shard}. *)
@@ -44,7 +43,7 @@ module Shard : sig
(** Send a heartbeat to Discord. This is handled automatically. *)
val heartbeat :
shard ->
- shard Deferred.t
+ shard Lwt.t
(** Set the status of the shard. *)
val set_status :
@@ -54,7 +53,7 @@ module Shard : sig
?since:int ->
?url:string ->
shard ->
- shard Deferred.t
+ shard Lwt.t
(** Request guild members for the shard's guild. Causes dispatch of multiple {{!Dispatch.members_chunk}member chunk} events. *)
val request_guild_members :
@@ -62,7 +61,7 @@ module Shard : sig
?limit:int ->
guild:Snowflake.t ->
shard ->
- shard Deferred.t
+ shard Lwt.t
(** Create a new shard *)
val create :
@@ -71,13 +70,13 @@ module Shard : sig
?compress:bool ->
?large_threshold:int ->
unit ->
- shard Deferred.t
+ shard Lwt.t
val shutdown :
?clean:bool ->
?restart:bool ->
shard t ->
- unit Deferred.t
+ unit Lwt.t
end
(** Calls {!Shard.set_status} for each shard registered with the sharder. *)
@@ -88,7 +87,7 @@ val set_status :
?since:int ->
?url:string ->
t ->
- Shard.shard list Deferred.t
+ unit Lwt.t
(** Calls {!Shard.request_guild_members} for each shard registered with the sharder. *)
val request_guild_members :
@@ -96,9 +95,9 @@ val request_guild_members :
?limit:int ->
guild:Snowflake.t ->
t ->
- Shard.shard list Deferred.t
+ unit Lwt.t
val shutdown_all :
- ?restart:bool ->
- t ->
- unit list Deferred.t
+ ?restart:bool ->
+ t ->
+ unit Lwt.t