diff options
| author | Rapptz <[email protected]> | 2018-06-10 18:09:14 -0400 |
|---|---|---|
| committer | Rapptz <[email protected]> | 2018-06-10 18:10:00 -0400 |
| commit | f25091efe1281aebe70189c61f9cac405b21a72f (patch) | |
| tree | d0d13dad1a89de9f45845a36ea475098b7a0b494 /discord/gateway.py | |
| parent | Add Message.jump_to_url (diff) | |
| download | discord.py-f25091efe1281aebe70189c61f9cac405b21a72f.tar.xz discord.py-f25091efe1281aebe70189c61f9cac405b21a72f.zip | |
Drop support for Python 3.4 and make minimum version 3.5.2.
Diffstat (limited to 'discord/gateway.py')
| -rw-r--r-- | discord/gateway.py | 152 |
1 files changed, 65 insertions, 87 deletions
diff --git a/discord/gateway.py b/discord/gateway.py index 4e100753..d463ff73 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -71,7 +71,7 @@ class KeepAliveHandler(threading.Thread): if self._last_ack + self.heartbeat_timeout < time.monotonic(): log.warn("Shard ID %s has stopped responding to the gateway. Closing and restarting." % self.shard_id) coro = self.ws.close(4000) - f = compat.run_coroutine_threadsafe(coro, loop=self.ws.loop) + f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop) try: f.result() @@ -84,7 +84,7 @@ class KeepAliveHandler(threading.Thread): data = self.get_payload() log.debug(self.msg, data['d']) coro = self.ws.send_as_json(data) - f = compat.run_coroutine_threadsafe(coro, loop=self.ws.loop) + f = asyncio.run_coroutine_threadsafe(coro, loop=self.ws.loop) try: # block until sending is complete f.result() @@ -190,14 +190,13 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): self._buffer = bytearray() @classmethod - @asyncio.coroutine - def from_client(cls, client, *, shard_id=None, session=None, sequence=None, resume=False): + async def from_client(cls, client, *, shard_id=None, session=None, sequence=None, resume=False): """Creates a main websocket for Discord from a :class:`Client`. This is for internal use only. """ - gateway = yield from client.http.get_gateway() - ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls) + gateway = await client.http.get_gateway() + ws = await websockets.connect(gateway, loop=client.loop, klass=cls) # dynamically add attributes needed ws.token = client.http.token @@ -215,19 +214,19 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): log.info('Created websocket connected to %s', gateway) # poll event for OP Hello - yield from ws.poll_event() + await ws.poll_event() if not resume: - yield from ws.identify() + await ws.identify() return ws - yield from ws.resume() + await ws.resume() try: - yield from ws.ensure_open() + await ws.ensure_open() except websockets.exceptions.ConnectionClosed: # ws got closed so let's just do a regular IDENTIFY connect. log.info('RESUME failed (the websocket decided to close) for Shard ID %s. Retrying.', shard_id) - return (yield from cls.from_client(client, shard_id=shard_id)) + return (await cls.from_client(client, shard_id=shard_id)) else: return ws @@ -251,13 +250,12 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): A future to wait for. """ - future = compat.create_future(self.loop) + future = self.loop.create_future() entry = EventListener(event=event, predicate=predicate, result=result, future=future) self._dispatch_listeners.append(entry) return future - @asyncio.coroutine - def identify(self): + async def identify(self): """Sends the IDENTIFY packet.""" payload = { 'op': self.IDENTIFY, @@ -291,11 +289,10 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): 'afk': False } - yield from self.send_as_json(payload) + await self.send_as_json(payload) log.info('Shard ID %s has sent the IDENTIFY payload.', self.shard_id) - @asyncio.coroutine - def resume(self): + async def resume(self): """Sends the RESUME packet.""" payload = { 'op': self.RESUME, @@ -306,11 +303,10 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): } } - yield from self.send_as_json(payload) + await self.send_as_json(payload) log.info('Shard ID %s has sent the RESUME payload.', self.shard_id) - @asyncio.coroutine - def received_message(self, msg): + async def received_message(self, msg): self._dispatch('socket_raw_receive', msg) if isinstance(msg, bytes): @@ -342,7 +338,7 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): # so we terminate our connection and raise an # internal exception signalling to reconnect. log.info('Received RECONNECT opcode.') - yield from self.close() + await self.close() raise ResumeWebSocket(self.shard_id) if op == self.HEARTBEAT_ACK: @@ -351,27 +347,27 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): if op == self.HEARTBEAT: beat = self._keep_alive.get_payload() - yield from self.send_as_json(beat) + await self.send_as_json(beat) return if op == self.HELLO: interval = data['heartbeat_interval'] / 1000.0 self._keep_alive = KeepAliveHandler(ws=self, interval=interval, shard_id=self.shard_id) # send a heartbeat immediately - yield from self.send_as_json(self._keep_alive.get_payload()) + await self.send_as_json(self._keep_alive.get_payload()) self._keep_alive.start() return if op == self.INVALIDATE_SESSION: if data == True: - yield from asyncio.sleep(5.0, loop=self.loop) - yield from self.close() + await asyncio.sleep(5.0, loop=self.loop) + await self.close() raise ResumeWebSocket(self.shard_id) self.sequence = None self.session_id = None log.info('Shard ID %s session has been invalidated.' % self.shard_id) - yield from self.identify() + await self.identify() return if op != self.DISPATCH: @@ -435,8 +431,7 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): def _can_handle_close(self, code): return code not in (1000, 4004, 4010, 4011) - @asyncio.coroutine - def poll_event(self): + async def poll_event(self): """Polls for a DISPATCH event and handles the general gateway loop. Raises @@ -445,8 +440,8 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): The websocket connection was terminated for unhandled reasons. """ try: - msg = yield from self.recv() - yield from self.received_message(msg) + msg = await self.recv() + await self.received_message(msg) except websockets.exceptions.ConnectionClosed as e: if self._can_handle_close(e.code): log.info('Websocket closed with %s (%s), attempting a reconnect.', e.code, e.reason) @@ -455,21 +450,18 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): log.info('Websocket closed with %s (%s), cannot reconnect.', e.code, e.reason) raise ConnectionClosed(e, shard_id=self.shard_id) from e - @asyncio.coroutine - def send(self, data): + async def send(self, data): self._dispatch('socket_raw_send', data) - yield from super().send(data) + await super().send(data) - @asyncio.coroutine - def send_as_json(self, data): + async def send_as_json(self, data): try: - yield from super().send(utils.to_json(data)) + await super().send(utils.to_json(data)) except websockets.exceptions.ConnectionClosed as e: if not self._can_handle_close(e.code): raise ConnectionClosed(e, shard_id=self.shard_id) from e - @asyncio.coroutine - def change_presence(self, *, activity=None, status=None, afk=False, since=0.0): + async def change_presence(self, *, activity=None, status=None, afk=False, since=0.0): if activity is not None: if not isinstance(activity, _ActivityTag): raise InvalidArgument('activity must be one of Game, Streaming, or Activity.') @@ -490,18 +482,16 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): sent = utils.to_json(payload) log.debug('Sending "%s" to change status', sent) - yield from self.send(sent) + await self.send(sent) - @asyncio.coroutine - def request_sync(self, guild_ids): + async def request_sync(self, guild_ids): payload = { 'op': self.GUILD_SYNC, 'd': list(guild_ids) } - yield from self.send_as_json(payload) + await self.send_as_json(payload) - @asyncio.coroutine - def voice_state(self, guild_id, channel_id, self_mute=False, self_deaf=False): + async def voice_state(self, guild_id, channel_id, self_mute=False, self_deaf=False): payload = { 'op': self.VOICE_STATE, 'd': { @@ -513,14 +503,13 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): } log.debug('Updating our voice state to %s.', payload) - yield from self.send_as_json(payload) + await self.send_as_json(payload) - @asyncio.coroutine - def close_connection(self, *args, **kwargs): + async def close_connection(self, *args, **kwargs): if self._keep_alive: self._keep_alive.stop() - yield from super().close_connection(*args, **kwargs) + await super().close_connection(*args, **kwargs) class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): """Implements the websocket protocol for handling voice connections. @@ -565,13 +554,11 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): self.max_size = None self._keep_alive = None - @asyncio.coroutine - def send_as_json(self, data): + async def send_as_json(self, data): log.debug('Sending voice websocket frame: %s.', data) - yield from self.send(utils.to_json(data)) + await self.send(utils.to_json(data)) - @asyncio.coroutine - def resume(self): + async def resume(self): state = self._connection payload = { 'op': self.RESUME, @@ -581,10 +568,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): 'session_id': state.session_id } } - yield from self.send_as_json(payload) + await self.send_as_json(payload) - @asyncio.coroutine - def identify(self): + async def identify(self): state = self._connection payload = { 'op': self.IDENTIFY, @@ -595,27 +581,25 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): 'token': state.token } } - yield from self.send_as_json(payload) + await self.send_as_json(payload) @classmethod - @asyncio.coroutine - def from_client(cls, client, *, resume=False): + async def from_client(cls, client, *, resume=False): """Creates a voice websocket for the :class:`VoiceClient`.""" gateway = 'wss://' + client.endpoint + '/?v=3' - ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls) + ws = await websockets.connect(gateway, loop=client.loop, klass=cls) ws.gateway = gateway ws._connection = client ws._max_heartbeat_timeout = 60.0 if resume: - yield from ws.resume() + await ws.resume() else: - yield from ws.identify() + await ws.identify() return ws - @asyncio.coroutine - def select_protocol(self, ip, port): + async def select_protocol(self, ip, port): payload = { 'op': self.SELECT_PROTOCOL, 'd': { @@ -628,10 +612,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): } } - yield from self.send_as_json(payload) + await self.send_as_json(payload) - @asyncio.coroutine - def speak(self, is_speaking=True): + async def speak(self, is_speaking=True): payload = { 'op': self.SPEAKING, 'd': { @@ -640,10 +623,9 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): } } - yield from self.send_as_json(payload) + await self.send_as_json(payload) - @asyncio.coroutine - def received_message(self, msg): + async def received_message(self, msg): log.debug('Voice websocket frame received: %s', msg) op = msg['op'] data = msg.get('d') @@ -652,17 +634,16 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): interval = data['heartbeat_interval'] / 1000.0 self._keep_alive = VoiceKeepAliveHandler(ws=self, interval=interval) self._keep_alive.start() - yield from self.initial_connection(data) + await self.initial_connection(data) elif op == self.HEARTBEAT_ACK: self._keep_alive.ack() elif op == self.INVALIDATE_SESSION: log.info('Voice RESUME failed.') - yield from self.identify() + await self.identify() elif op == self.SESSION_DESCRIPTION: - yield from self.load_secret_key(data) + await self.load_secret_key(data) - @asyncio.coroutine - def initial_connection(self, data): + async def initial_connection(self, data): state = self._connection state.ssrc = data['ssrc'] state.voice_port = data['port'] @@ -670,7 +651,7 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): packet = bytearray(70) struct.pack_into('>I', packet, 0, state.ssrc) state.socket.sendto(packet, (state.endpoint_ip, state.voice_port)) - recv = yield from self.loop.sock_recv(state.socket, 70) + recv = await self.loop.sock_recv(state.socket, 70) log.debug('received packet in initial_connection: %s', recv) # the ip is ascii starting at the 4th byte and ending at the first null @@ -683,28 +664,25 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): state.port = struct.unpack_from('<H', recv, len(recv) - 2)[0] log.debug('detected ip: %s port: %s', state.ip, state.port) - yield from self.select_protocol(state.ip, state.port) + await self.select_protocol(state.ip, state.port) log.info('selected the voice protocol for use') - @asyncio.coroutine - def load_secret_key(self, data): + async def load_secret_key(self, data): log.info('received secret key for voice connection') self._connection.secret_key = data.get('secret_key') - yield from self.speak() + await self.speak() - @asyncio.coroutine - def poll_event(self): + async def poll_event(self): try: - msg = yield from asyncio.wait_for(self.recv(), timeout=30.0, loop=self.loop) - yield from self.received_message(json.loads(msg)) + msg = await asyncio.wait_for(self.recv(), timeout=30.0, loop=self.loop) + await self.received_message(json.loads(msg)) except websockets.exceptions.ConnectionClosed as e: raise ConnectionClosed(e, shard_id=None) from e - @asyncio.coroutine - def close_connection(self, *args, **kwargs): + async def close_connection(self, *args, **kwargs): if self._keep_alive: self._keep_alive.stop() - yield from super().close_connection(*args, **kwargs) + await super().close_connection(*args, **kwargs) |