aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRapptz <[email protected]>2020-09-05 13:48:02 -0400
committerRapptz <[email protected]>2020-09-23 03:21:17 -0400
commit92e1816114a840e74348635b40d91f9dc7c8157d (patch)
tree10fc7c5894879bbc33fea6fc37ca6f367f339a27
parentUse a lock for the gateway rate limiter. (diff)
downloaddiscord.py-92e1816114a840e74348635b40d91f9dc7c8157d.tar.xz
discord.py-92e1816114a840e74348635b40d91f9dc7c8157d.zip
Maximize concurrency when chunking on AutoSharded clients
-rw-r--r--discord/state.py66
1 files changed, 45 insertions, 21 deletions
diff --git a/discord/state.py b/discord/state.py
index b4b72d3a..7d4acbb9 100644
--- a/discord/state.py
+++ b/discord/state.py
@@ -347,6 +347,7 @@ class ConnectionState:
try:
# only real bots wait for GUILD_CREATE streaming
if self.is_bot:
+ states = []
while True:
# this snippet of code is basically waiting N seconds
# until the last GUILD_CREATE was sent
@@ -355,17 +356,26 @@ class ConnectionState:
except asyncio.TimeoutError:
break
else:
- try:
- if self._fetch_offline:
- await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
- except asyncio.TimeoutError:
- log.info('Timed out waiting for chunks while launching ready event.')
- finally:
+ if self._fetch_offline:
+ future = await self.chunk_guild(guild, wait=False)
+ states.append((guild, future))
+ else:
if guild.unavailable is False:
self.dispatch('guild_available', guild)
else:
self.dispatch('guild_join', guild)
+ for guild, future in states:
+ try:
+ await asyncio.wait_for(future, timeout=5.0)
+ except asyncio.TimeoutError:
+ log.warning('Shard ID %s timed out waiting for chunks for guild_id %s.', guild.shard_id, guild.id)
+
+ if guild.unavailable is False:
+ self.dispatch('guild_available', guild)
+ else:
+ self.dispatch('guild_join', guild)
+
# remove the state
try:
del self._ready_state
@@ -733,12 +743,14 @@ class ConnectionState:
return self._add_guild_from_data(data)
- async def chunk_guild(self, guild):
+ async def chunk_guild(self, guild, *, wait=True):
future = self.loop.create_future()
request = ChunkRequest(guild.id, future)
self._chunk_requests.append(request)
await self.chunker(guild.id, nonce=request.nonce)
- await request.future
+ if wait:
+ await request.future
+ return request.future
async def _chunk_and_dispatch(self, guild, unavailable):
try:
@@ -1041,20 +1053,32 @@ class AutoShardedConnectionState(ConnectionState):
except asyncio.TimeoutError:
break
else:
- try:
- if self._fetch_offline:
- await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
- except asyncio.TimeoutError:
- log.info('Timed out waiting for chunks while launching ready event.')
- finally:
- processed.append(guild)
- if guild.unavailable is False:
- self.dispatch('guild_available', guild)
- else:
- self.dispatch('guild_join', guild)
+ if self._fetch_offline:
+ # Chunk the guild in the background while we wait for GUILD_CREATE streaming
+ future = asyncio.ensure_future(self.chunk_guild(guild))
+ else:
+ future = self.loop.create_future()
+ future.set_result(True)
+
+ processed.append((guild, future))
+
+ guilds = sorted(processed, key=lambda g: g[0].shard_id)
+ for shard_id, info in itertools.groupby(guilds, key=lambda g: g[0].shard_id):
+ children, futures = zip(*info)
+ # 110 reqs/minute w/ 1 req/guild plus some buffer
+ timeout = 61 * (len(children) / 110)
+ try:
+ await utils.sane_wait_for(futures, timeout=timeout)
+ except asyncio.TimeoutError:
+ log.warning('Shard ID %s failed to wait for chunks (timeout=%.2f) for %d guilds', self.shard_id,
+ timeout,
+ len(guilds))
+ for guild in children:
+ if guild.unavailable is False:
+ self.dispatch('guild_available', guild)
+ else:
+ self.dispatch('guild_join', guild)
- guilds = sorted(processed, key=lambda g: g.shard_id)
- for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id):
self.dispatch('shard_ready', shard_id)
# remove the state