aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRapptz <[email protected]>2020-09-04 08:09:41 -0400
committerRapptz <[email protected]>2020-09-23 03:21:16 -0400
commit930761e058597b8fd8a5aaf7c747ca78b324bbc9 (patch)
treede467405524d48228d7804f5ce1fda71c6f69a45
parentAdd more close codes that can't be handled for reconnecting. (diff)
downloaddiscord.py-930761e058597b8fd8a5aaf7c747ca78b324bbc9.tar.xz
discord.py-930761e058597b8fd8a5aaf7c747ca78b324bbc9.zip
Rewrite chunking to work with intents.
This slows down chunking significantly for bots in a large number of guilds since it goes down from 75 guilds/request to 1 guild/request. However the logic was rewritten to fire the chunking request immediately after receiving the GUILD_CREATE rather than waiting for all the guilds in the ready stream before doing it.
-rw-r--r--discord/gateway.py2
-rw-r--r--discord/guild.py24
-rw-r--r--discord/state.py239
3 files changed, 91 insertions, 174 deletions
diff --git a/discord/gateway.py b/discord/gateway.py
index e7274520..ce64c501 100644
--- a/discord/gateway.py
+++ b/discord/gateway.py
@@ -370,7 +370,7 @@ class DiscordWebSocket:
}
if state._intents is not None:
- payload['d']['intents'] = state._intents
+ payload['d']['intents'] = state._intents.value
await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify)
await self.send_as_json(payload)
diff --git a/discord/guild.py b/discord/guild.py
index 0bf94a28..c15b78cf 100644
--- a/discord/guild.py
+++ b/discord/guild.py
@@ -2045,11 +2045,6 @@ class Guild(Hashable):
This is a websocket operation and can be slow.
- .. warning::
-
- Most bots do not need to use this. It's mainly a helper
- for bots who have disabled ``guild_subscriptions``.
-
.. versionadded:: 1.3
Parameters
@@ -2059,7 +2054,7 @@ class Guild(Hashable):
requests all members.
limit: :class:`int`
The maximum number of members to send back. This must be
- a number between 1 and 1000.
+ a number between 1 and 100.
cache: :class:`bool`
Whether to cache the members internally. This makes operations
such as :meth:`get_member` work for those that matched.
@@ -2073,19 +2068,26 @@ class Guild(Hashable):
-------
asyncio.TimeoutError
The query timed out waiting for the members.
+ ValueError
+ Invalid parameters were passed to the function
Returns
--------
List[:class:`Member`]
The list of members that have matched the query.
"""
- if user_ids is not None and query is not None:
- raise TypeError('Cannot pass both query and user_ids')
- if user_ids is None and query is None:
- raise TypeError('Must pass either query or user_ids')
+ if query is None:
+ if query == '':
+ raise ValueError('Cannot pass empty query string.')
+
+ if user_ids is None:
+ raise ValueError('Must pass either query or user_ids')
+
+ if user_ids is not None and query is not None:
+ raise ValueError('Cannot pass both query and user_ids')
- limit = limit or 5
+ limit = min(100, limit or 5)
return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache)
async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False):
diff --git a/discord/state.py b/discord/state.py
index 61f66457..bcbc70b0 100644
--- a/discord/state.py
+++ b/discord/state.py
@@ -25,7 +25,7 @@ DEALINGS IN THE SOFTWARE.
"""
import asyncio
-from collections import deque, namedtuple, OrderedDict
+from collections import deque, OrderedDict
import copy
import datetime
import itertools
@@ -49,20 +49,22 @@ from .channel import *
from .raw_models import *
from .member import Member
from .role import Role
-from .enums import ChannelType, try_enum, Status, Enum
+from .enums import ChannelType, try_enum, Status
from . import utils
from .flags import Intents
from .embeds import Embed
from .object import Object
from .invite import Invite
-class ListenerType(Enum):
- chunk = 0
- query_members = 1
+class ChunkRequest:
+ __slots__ = ('guild_id', 'nonce', 'future')
+
+ def __init__(self, guild_id, future):
+ self.guild_id = guild_id
+ self.nonce = os.urandom(16).hex()
+ self.future = future
-Listener = namedtuple('Listener', ('type', 'future', 'predicate'))
log = logging.getLogger(__name__)
-ReadyState = namedtuple('ReadyState', ('launch', 'guilds'))
async def logging_coroutine(coroutine, *, info):
try:
@@ -100,7 +102,7 @@ class ConnectionState:
self.allowed_mentions = allowed_mentions
# Only disable cache if both fetch_offline and guild_subscriptions are off.
self._cache_members = (self._fetch_offline or self.guild_subscriptions)
- self._listeners = []
+ self._chunk_requests = []
activity = options.get('activity', None)
if activity:
@@ -120,7 +122,9 @@ class ConnectionState:
if intents is not None:
if not isinstance(intents, Intents):
raise TypeError('intents parameter must be Intent not %r' % type(intents))
- intents = intents.value
+
+ if not intents.members and self._fetch_offline:
+ raise ValueError('Intents.members has be enabled to fetch offline members.')
self._activity = activity
self._status = status
@@ -152,34 +156,20 @@ class ConnectionState:
# to reconnect loops which cause mass allocations and deallocations.
gc.collect()
- def get_nonce(self):
- return os.urandom(16).hex()
-
- def process_listeners(self, listener_type, argument, result):
+ def process_chunk_requests(self, guild_id, nonce, members):
removed = []
- for i, listener in enumerate(self._listeners):
- if listener.type != listener_type:
- continue
-
- future = listener.future
+ for i, request in enumerate(self._chunk_requests):
+ future = request.future
if future.cancelled():
removed.append(i)
continue
- try:
- passed = listener.predicate(argument)
- except Exception as exc:
- future.set_exception(exc)
+ if request.guild_id == guild_id and request.nonce == nonce:
+ future.set_result(members)
removed.append(i)
- else:
- if passed:
- future.set_result(result)
- removed.append(i)
- if listener.type == ListenerType.chunk:
- break
for index in reversed(removed):
- del self._listeners[index]
+ del self._chunk_requests[index]
def call_handlers(self, key, *args, **kwargs):
try:
@@ -313,10 +303,6 @@ class ConnectionState:
self._add_guild(guild)
return guild
- def chunks_needed(self, guild):
- for _ in range(math.ceil(guild._member_count / 1000)):
- yield self.receive_chunk(guild.id)
-
def _get_guild_channel(self, data):
channel_id = int(data['channel_id'])
try:
@@ -333,43 +319,20 @@ class ConnectionState:
ws = self._get_websocket(guild_id) # This is ignored upstream
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
- async def request_offline_members(self, guilds):
- # get all the chunks
- chunks = []
- for guild in guilds:
- chunks.extend(self.chunks_needed(guild))
-
- # we only want to request ~75 guilds per chunk request.
- splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
- for split in splits:
- await self.chunker([g.id for g in split])
-
- # wait for the chunks
- if chunks:
- try:
- await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
- except asyncio.TimeoutError:
- log.warning('Somehow timed out waiting for chunks.')
- else:
- log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
-
async def query_members(self, guild, query, limit, user_ids, cache):
guild_id = guild.id
ws = self._get_websocket(guild_id)
if ws is None:
raise RuntimeError('Somehow do not have a websocket for this guild_id')
- # Limits over 1000 cannot be supported since
- # the main use case for this is guild_subscriptions being disabled
- # and they don't receive GUILD_MEMBER events which make computing
- # member_count impossible. The only way to fix it is by limiting
- # the limit parameter to 1 to 1000.
- nonce = self.get_nonce()
- future = self.receive_member_query(guild_id, nonce)
+ future = self.loop.create_future()
+ request = ChunkRequest(guild.id, future)
+ self._chunk_requests.append(request)
+
try:
# start the query operation
- await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=nonce)
- members = await asyncio.wait_for(future, timeout=5.0)
+ await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce)
+ members = await asyncio.wait_for(future, timeout=30.0)
if cache:
for member in members:
@@ -382,29 +345,26 @@ class ConnectionState:
async def _delay_ready(self):
try:
- launch = self._ready_state.launch
-
# only real bots wait for GUILD_CREATE streaming
if self.is_bot:
while True:
# this snippet of code is basically waiting N seconds
# until the last GUILD_CREATE was sent
try:
- await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
+ guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
except asyncio.TimeoutError:
break
else:
- launch.clear()
-
- guilds = next(zip(*self._ready_state.guilds), [])
- if self._fetch_offline:
- await self.request_offline_members(guilds)
-
- for guild, unavailable in self._ready_state.guilds:
- if unavailable is False:
- self.dispatch('guild_available', guild)
- else:
- self.dispatch('guild_join', guild)
+ try:
+ if self._fetch_offline:
+ await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
+ except asyncio.TimeoutError:
+ log.info('Timed out waiting for chunks while launching ready event.')
+ finally:
+ if guild.unavailable is False:
+ self.dispatch('guild_available', guild)
+ else:
+ self.dispatch('guild_join', guild)
# remove the state
try:
@@ -429,16 +389,13 @@ class ConnectionState:
if self._ready_task is not None:
self._ready_task.cancel()
- self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
+ self._ready_state = asyncio.Queue()
self.clear()
self.user = user = ClientUser(state=self, data=data['user'])
self._users[user.id] = user
- guilds = self._ready_state.guilds
for guild_data in data['guilds']:
- guild = self._add_guild_from_data(guild_data)
- if (not self.is_bot and not guild.unavailable) or guild.large:
- guilds.append((guild, guild.unavailable))
+ self._add_guild_from_data(guild_data)
for relationship in data.get('relationships', []):
try:
@@ -772,14 +729,18 @@ class ConnectionState:
return self._add_guild_from_data(data)
+ async def chunk_guild(self, guild):
+ future = self.loop.create_future()
+ request = ChunkRequest(guild.id, future)
+ self._chunk_requests.append(request)
+ await self.chunker(guild.id, nonce=request.nonce)
+ await request.future
+
async def _chunk_and_dispatch(self, guild, unavailable):
- chunks = list(self.chunks_needed(guild))
- await self.chunker(guild.id)
- if chunks:
- try:
- await utils.sane_wait_for(chunks, timeout=len(chunks))
- except asyncio.TimeoutError:
- log.info('Somehow timed out waiting for chunks.')
+ try:
+ await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
+ except asyncio.TimeoutError:
+ log.info('Somehow timed out waiting for chunks.')
if unavailable is False:
self.dispatch('guild_available', guild)
@@ -794,25 +755,17 @@ class ConnectionState:
guild = self._get_create_guild(data)
+ try:
+ # Notify the on_ready state, if any, that this guild is complete.
+ self._ready_state.put_nowait(guild)
+ except AttributeError:
+ pass
+ else:
+ # If we're waiting for the event, put the rest on hold
+ return
+
# check if it requires chunking
if guild.large:
- if unavailable is False:
- # check if we're waiting for 'useful' READY
- # and if we are, we don't want to dispatch any
- # event such as guild_join or guild_available
- # because we're still in the 'READY' phase. Or
- # so we say.
- try:
- state = self._ready_state
- state.launch.set()
- state.guilds.append((guild, unavailable))
- except AttributeError:
- # the _ready_state attribute is only there during
- # processing of useful READY.
- pass
- else:
- return
-
# since we're not waiting for 'useful' READY we'll just
# do the chunk request here if wanted
if self._fetch_offline:
@@ -929,8 +882,8 @@ class ConnectionState:
if existing is None or existing.joined_at is None:
guild._add_member(member)
- self.process_listeners(ListenerType.chunk, guild, len(members))
- self.process_listeners(ListenerType.query_members, (guild_id, data.get('nonce')), members)
+ if data.get('chunk_index', 0) + 1 == data.get('chunk_count'):
+ self.process_chunk_requests(guild_id, data.get('nonce'), members)
def parse_guild_integrations_update(self, data):
guild = self._get_guild(int(data['guild_id']))
@@ -1054,21 +1007,6 @@ class ConnectionState:
def create_message(self, *, channel, data):
return Message(state=self, channel=channel, data=data)
- def receive_chunk(self, guild_id):
- future = self.loop.create_future()
- listener = Listener(ListenerType.chunk, future, lambda s: s.id == guild_id)
- self._listeners.append(listener)
- return future
-
- def receive_member_query(self, guild_id, nonce):
- def predicate(args, *, guild_id=guild_id, nonce=nonce):
- return args == (guild_id, nonce)
-
- future = self.loop.create_future()
- listener = Listener(ListenerType.query_members, future, predicate)
- self._listeners.append(listener)
- return future
-
class AutoShardedConnectionState(ConnectionState):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -1091,51 +1029,31 @@ class AutoShardedConnectionState(ConnectionState):
ws = self._get_websocket(guild_id, shard_id=shard_id)
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
- async def request_offline_members(self, guilds, *, shard_id):
- # get all the chunks
- chunks = []
- for guild in guilds:
- chunks.extend(self.chunks_needed(guild))
-
- # we only want to request ~75 guilds per chunk request.
- splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
- for split in splits:
- await self.chunker([g.id for g in split], shard_id=shard_id)
-
- # wait for the chunks
- if chunks:
- try:
- await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
- except asyncio.TimeoutError:
- log.info('Somehow timed out waiting for chunks.')
- else:
- log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
-
async def _delay_ready(self):
await self.shards_launched.wait()
- launch = self._ready_state.launch
+ processed = []
while True:
# this snippet of code is basically waiting N seconds
# until the last GUILD_CREATE was sent
try:
- await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
+ guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
except asyncio.TimeoutError:
break
else:
- launch.clear()
-
- guilds = sorted(self._ready_state.guilds, key=lambda g: g[0].shard_id)
-
- for shard_id, sub_guilds_info in itertools.groupby(guilds, key=lambda g: g[0].shard_id):
- sub_guilds, sub_available = zip(*sub_guilds_info)
- if self._fetch_offline:
- await self.request_offline_members(sub_guilds, shard_id=shard_id)
+ try:
+ if self._fetch_offline:
+ await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
+ except asyncio.TimeoutError:
+ log.info('Timed out waiting for chunks while launching ready event.')
+ finally:
+ processed.append(guild)
+ if guild.unavailable is False:
+ self.dispatch('guild_available', guild)
+ else:
+ self.dispatch('guild_join', guild)
- for guild, unavailable in zip(sub_guilds, sub_available):
- if unavailable is False:
- self.dispatch('guild_available', guild)
- else:
- self.dispatch('guild_join', guild)
+ guilds = sorted(processed, key=lambda g: g.shard_id)
+ for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id):
self.dispatch('shard_ready', shard_id)
# remove the state
@@ -1155,16 +1073,13 @@ class AutoShardedConnectionState(ConnectionState):
def parse_ready(self, data):
if not hasattr(self, '_ready_state'):
- self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
+ self._ready_state = asyncio.Queue()
self.user = user = ClientUser(state=self, data=data['user'])
self._users[user.id] = user
- guilds = self._ready_state.guilds
for guild_data in data['guilds']:
- guild = self._add_guild_from_data(guild_data)
- if guild.large:
- guilds.append((guild, guild.unavailable))
+ self._add_guild_from_data(guild_data)
if self._messages:
self._update_message_references()