aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--discord/__init__.py2
-rw-r--r--discord/abc.py69
-rw-r--r--discord/channel.py8
-rw-r--r--discord/client.py8
-rw-r--r--discord/gateway.py78
-rw-r--r--discord/opus.py19
-rw-r--r--discord/player.py248
-rw-r--r--discord/shard.py5
-rw-r--r--discord/state.py10
-rw-r--r--discord/voice_client.py664
-rw-r--r--docs/api.rst15
11 files changed, 599 insertions, 527 deletions
diff --git a/discord/__init__.py b/discord/__init__.py
index 6f0b735e..fe3e9790 100644
--- a/discord/__init__.py
+++ b/discord/__init__.py
@@ -40,6 +40,8 @@ from .enums import *
from collections import namedtuple
from .embeds import Embed
from .shard import AutoShardedClient
+from .player import *
+from .voice_client import VoiceClient
import logging
diff --git a/discord/abc.py b/discord/abc.py
index 9c94c5dc..07d72093 100644
--- a/discord/abc.py
+++ b/discord/abc.py
@@ -25,19 +25,18 @@ DEALINGS IN THE SOFTWARE.
"""
import abc
-import io
-import os
import asyncio
from collections import namedtuple
from .iterators import HistoryIterator
from .context_managers import Typing
-from .errors import InvalidArgument
+from .errors import InvalidArgument, ClientException
from .permissions import PermissionOverwrite, Permissions
from .role import Role
from .invite import Invite
from .file import File
+from .voice_client import VoiceClient
from . import utils, compat
class _Undefined:
@@ -783,3 +782,67 @@ class Messageable(metaclass=abc.ABCMeta):
counter += 1
"""
return HistoryIterator(self, limit=limit, before=before, after=after, around=around, reverse=reverse)
+
+
+class Callable(metaclass=abc.ABCMeta):
+ __slots__ = ()
+
+ @abc.abstractmethod
+ def _get_voice_client_key(self):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _get_voice_state_pair(self):
+ raise NotImplementedError
+
+ @asyncio.coroutine
+ def connect(self, *, timeout=10.0, reconnect=True):
+ """|coro|
+
+ Connects to voice and creates a :class:`VoiceClient` to establish
+ your connection to the voice server.
+
+ Parameters
+ -----------
+ timeout: float
+ The timeout in seconds to wait for the
+ initial handshake to be completed.
+ reconnect: bool
+ Whether the bot should automatically attempt
+ a reconnect if a part of the handshake fails
+ or the gateway goes down.
+
+ Raises
+ -------
+ asyncio.TimeoutError
+ Could not connect to the voice channel in time.
+ ClientException
+ You are already connected to a voice channel.
+ OpusNotLoaded
+ The opus library has not been loaded.
+
+ Returns
+ -------
+ :class:`VoiceClient`
+ A voice client that is fully connected to the voice server.
+ """
+ key_id, key_name = self._get_voice_client_key()
+ state = self._state
+
+ if state._get_voice_client(key_id):
+ raise ClientException('Already connected to a voice channel.')
+
+ voice = VoiceClient(state=state, timeout=timeout, channel=self)
+
+ try:
+ yield from voice.connect(reconnect=reconnect)
+ except asyncio.TimeoutError as e:
+ try:
+ yield from voice.disconnect()
+ except:
+ # we don't care if disconnect failed because connection failed
+ pass
+ raise e # re-raise
+
+ state._add_voice_client(key_id, voice)
+ return voice
diff --git a/discord/channel.py b/discord/channel.py
index 67259f44..6478c47b 100644
--- a/discord/channel.py
+++ b/discord/channel.py
@@ -290,7 +290,7 @@ class TextChannel(discord.abc.Messageable, discord.abc.GuildChannel, Hashable):
count += 1
ret.append(msg)
-class VoiceChannel(discord.abc.GuildChannel, Hashable):
+class VoiceChannel(discord.abc.Callable, discord.abc.GuildChannel, Hashable):
"""Represents a Discord guild voice channel.
Supported Operations:
@@ -335,6 +335,12 @@ class VoiceChannel(discord.abc.GuildChannel, Hashable):
def __repr__(self):
return '<VoiceChannel id={0.id} name={0.name!r} position={0.position}>'.format(self)
+ def _get_voice_client_key(self):
+ return self.guild.id, 'guild_id'
+
+ def _get_voice_state_pair(self):
+ return self.guild.id, self.id
+
def _update(self, guild, data):
self.guild = guild
self.name = data['name']
diff --git a/discord/client.py b/discord/client.py
index dca01a4e..c8a81d33 100644
--- a/discord/client.py
+++ b/discord/client.py
@@ -31,6 +31,7 @@ from .errors import *
from .permissions import Permissions, PermissionOverwrite
from .enums import ChannelType, Status
from .gateway import *
+from .voice_client import VoiceClient
from .emoji import Emoji
from .http import HTTPClient
from .state import ConnectionState
@@ -119,10 +120,11 @@ class Client:
self.connection.shard_count = self.shard_count
self._closed = asyncio.Event(loop=self.loop)
self._ready = asyncio.Event(loop=self.loop)
+ self.connection._get_websocket = lambda g: self.ws
- # if VoiceClient.warn_nacl:
- # VoiceClient.warn_nacl = False
- # log.warning("PyNaCl is not installed, voice will NOT be supported")
+ if VoiceClient.warn_nacl:
+ VoiceClient.warn_nacl = False
+ log.warning("PyNaCl is not installed, voice will NOT be supported")
# internals
diff --git a/discord/gateway.py b/discord/gateway.py
index 9a6991e8..f8e50061 100644
--- a/discord/gateway.py
+++ b/discord/gateway.py
@@ -109,7 +109,6 @@ class VoiceKeepAliveHandler(KeepAliveHandler):
self.msg = 'Keeping voice websocket alive with timestamp {0[d]}'
def get_payload(self):
- self.ack()
return {
'op': self.ws.HEARTBEAT,
'd': int(time.time() * 1000)
@@ -481,12 +480,9 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol):
}
}
+ log.debug('Updating our voice state to %s.', payload)
yield from self.send_as_json(payload)
- # we're leaving a voice channel so remove it from the client list
- if channel_id is None:
- self._connection._remove_voice_client(guild_id)
-
@asyncio.coroutine
def close_connection(self, force=False):
if self._keep_alive:
@@ -511,6 +507,14 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
Receive only. Gives you the secret key required for voice.
SPEAKING
Send only. Notifies the client if you are currently speaking.
+ HEARTBEAT_ACK
+ Receive only. Tells you your heartbeat has been acknowledged.
+ RESUME
+ Sent only. Tells the client to resume its session.
+ HELLO
+ Receive only. Tells you that your websocket connection was acknowledged.
+ INVALIDATE_SESSION
+ Sent only. Tells you that your RESUME request has failed and to re-IDENTIFY.
"""
IDENTIFY = 0
@@ -519,6 +523,10 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
HEARTBEAT = 3
SESSION_DESCRIPTION = 4
SPEAKING = 5
+ HEARTBEAT_ACK = 6
+ RESUME = 7
+ HELLO = 8
+ INVALIDATE_SESSION = 9
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -527,28 +535,50 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
@asyncio.coroutine
def send_as_json(self, data):
+ log.debug('Sending voice websocket frame: %s.', data)
yield from self.send(utils.to_json(data))
+ @asyncio.coroutine
+ def resume(self):
+ state = self._connection
+ payload = {
+ 'op': self.RESUME,
+ 'd': {
+ 'token': state.token,
+ 'server_id': str(state.server_id),
+ 'session_id': state.session_id
+ }
+ }
+ yield from self.send_as_json(payload)
+
+ @asyncio.coroutine
+ def identify(self):
+ state = self._connection
+ payload = {
+ 'op': self.IDENTIFY,
+ 'd': {
+ 'server_id': str(state.server_id),
+ 'user_id': str(state.user.id),
+ 'session_id': state.session_id,
+ 'token': state.token
+ }
+ }
+ yield from self.send_as_json(payload)
+
@classmethod
@asyncio.coroutine
- def from_client(cls, client):
+ def from_client(cls, client, *, resume=False):
"""Creates a voice websocket for the :class:`VoiceClient`."""
- gateway = 'wss://' + client.endpoint
+ gateway = 'wss://' + client.endpoint + '/?v=3'
ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls)
ws.gateway = gateway
ws._connection = client
- identify = {
- 'op': cls.IDENTIFY,
- 'd': {
- 'guild_id': client.guild_id,
- 'user_id': client.user.id,
- 'session_id': client.session_id,
- 'token': client.token
- }
- }
+ if resume:
+ yield from ws.resume()
+ else:
+ yield from ws.identify()
- yield from ws.send_as_json(identify)
return ws
@asyncio.coroutine
@@ -566,7 +596,6 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
}
yield from self.send_as_json(payload)
- log.debug('Selected protocol as {}'.format(payload))
@asyncio.coroutine
def speak(self, is_speaking=True):
@@ -579,12 +608,11 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
}
yield from self.send_as_json(payload)
- log.debug('Voice speaking now set to {}'.format(is_speaking))
@asyncio.coroutine
def received_message(self, msg):
log.debug('Voice websocket frame received: {}'.format(msg))
- op = msg.get('op')
+ op = msg['op']
data = msg.get('d')
if op == self.READY:
@@ -592,14 +620,20 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol):
self._keep_alive = VoiceKeepAliveHandler(ws=self, interval=interval)
self._keep_alive.start()
yield from self.initial_connection(data)
+ elif op == self.HEARTBEAT_ACK:
+ self._keep_alive.ack()
+ elif op == self.INVALIDATE_SESSION:
+ log.info('Voice RESUME failed.')
+ yield from self.identify()
elif op == self.SESSION_DESCRIPTION:
yield from self.load_secret_key(data)
@asyncio.coroutine
def initial_connection(self, data):
state = self._connection
- state.ssrc = data.get('ssrc')
- state.voice_port = data.get('port')
+ state.ssrc = data['ssrc']
+ state.voice_port = data['port']
+
packet = bytearray(70)
struct.pack_into('>I', packet, 0, state.ssrc)
state.socket.sendto(packet, (state.endpoint_ip, state.voice_port))
diff --git a/discord/opus.py b/discord/opus.py
index 911501c1..fcf27a72 100644
--- a/discord/opus.py
+++ b/discord/opus.py
@@ -183,15 +183,16 @@ signal_ctl = {
}
class Encoder:
- def __init__(self, sampling, channels, application=APPLICATION_AUDIO):
- self.sampling_rate = sampling
- self.channels = channels
- self.application = application
+ SAMPLING_RATE = 48000
+ CHANNELS = 2
+ FRAME_LENGTH = 20
+ SAMPLE_SIZE = 4 # (bit_rate / 8) * CHANNELS (bit_rate == 16)
+ SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH)
+
+ FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE
- self.frame_length = 20
- self.sample_size = 2 * self.channels # (bit_rate / 8) but bit_rate == 16
- self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length)
- self.frame_size = self.samples_per_frame * self.sample_size
+ def __init__(self, application=APPLICATION_AUDIO):
+ self.application = application
if not is_loaded():
raise OpusNotLoaded()
@@ -210,7 +211,7 @@ class Encoder:
def _create_state(self):
ret = ctypes.c_int()
- result = _lib.opus_encoder_create(self.sampling_rate, self.channels, self.application, ctypes.byref(ret))
+ result = _lib.opus_encoder_create(self.SAMPLING_RATE, self.CHANNELS, self.application, ctypes.byref(ret))
if ret.value != 0:
log.info('error has happened in state creation')
diff --git a/discord/player.py b/discord/player.py
new file mode 100644
index 00000000..adda3584
--- /dev/null
+++ b/discord/player.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+
+"""
+The MIT License (MIT)
+
+Copyright (c) 2015-2017 Rapptz
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+"""
+
+import threading
+import subprocess
+import shlex
+import time
+
+from .errors import ClientException
+from .opus import Encoder as OpusEncoder
+
+__all__ = [ 'AudioSource', 'PCMAudio', 'FFmpegPCMAudio' ]
+
+class AudioSource:
+ """Represents an audio stream.
+
+ The audio stream can be Opus encoded or not, however if the audio stream
+ is not Opus encoded then the audio format must be 16-bit 48KHz stereo PCM.
+
+ .. warning::
+
+ The audio source reads are done in a separate thread.
+ """
+
+ def read(self):
+ """Reads 20ms worth of audio.
+
+ Subclasses must implement this.
+
+ If the audio is complete, then returning an empty *bytes-like* object
+ to signal this is the way to do so.
+
+ If :meth:`is_opus` method returns ``True``, then it must return
+ 20ms worth of Opus encoded audio. Otherwise, it must be 20ms
+ worth of 16-bit 48KHz stereo PCM, which is about 3,840 bytes
+ per frame (20ms worth of audio).
+
+ Returns
+ --------
+ bytes
+ A bytes like object that represents the PCM or Opus data.
+ """
+ raise NotImplementedError
+
+ def is_opus(self):
+ """Checks if the audio source is already encoded in Opus.
+
+ Defaults to ``False``.
+ """
+ return False
+
+ def cleanup(self):
+ """Called when clean-up is needed to be done.
+
+ Useful for clearing buffer data or processes after
+ it is done playing audio.
+ """
+ pass
+
+class PCMAudio(AudioSource):
+ """Represents raw 16-bit 48KHz stereo PCM audio source.
+
+ Attributes
+ -----------
+ stream: file-like object
+ A file-like object that reads byte data representing raw PCM.
+ """
+ def __init__(self, stream):
+ self.stream = stream
+
+ def read(self):
+ return self.stream.read(OpusEncoder.FRAME_SIZE)
+
+class FFmpegPCMAudio(AudioSource):
+ """An audio source from FFmpeg (or AVConv).
+
+ This launches a sub-process to a specific input file given.
+
+ .. warning::
+
+ You must have the ffmpeg or avconv executable in your path environment
+ variable in order for this to work.
+
+ Parameters
+ ------------
+ source: Union[str, BinaryIO]
+ The input that ffmpeg will take and convert to PCM bytes.
+ If ``pipe`` is True then this is a file-like object that is
+ passed to the stdin of ffmpeg.
+ executable: str
+ The executable name (and path) to use. Defaults to ``ffmpeg``.
+ pipe: bool
+ If true, denotes that ``source`` parameter will be passed
+ to the stdin of ffmpeg. Defaults to ``False``.
+ stderr: Optional[BinaryIO]
+ A file-like object to pass to the Popen constructor.
+ Could also be an instance of ``subprocess.PIPE``.
+ options: Optional[str]
+ Extra command line arguments to pass to ffmpeg after the ``-i`` flag.
+ before_options: Optional[str]
+ Extra command line arguments to pass to ffmpeg before the ``-i`` flag.
+
+ Raises
+ --------
+ ClientException
+ The subprocess failed to be created.
+ """
+
+ def __init__(self, source, *, executable='ffmpeg', pipe=False, stderr=None, before_options=None, options=None):
+ stdin = None if not pipe else source
+
+ args = [executable]
+
+ if isinstance(before_options, str):
+ args.extend(shlex.split(before_options))
+
+ args.append('-i')
+ args.append('-' if pipe else shlex.quote(source))
+ args.extend(('-f', 's16le', '-ar', '48000', '-ac', '2', '-loglevel', 'warning'))
+
+ if isinstance(options, str):
+ args.extend(shlex.split(options))
+
+ args.append('pipe:1')
+
+ try:
+ self._process = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
+ self._stdout = self._process.stdout
+ except FileNotFoundError:
+ raise ClientException(executable + ' was not found.') from None
+ except subprocess.SubprocessError as e:
+ raise ClientException('Popen failed: {0.__class__.__name__}: {0}'.format(e)) from e
+
+ def read(self):
+ return self._stdout.read(OpusEncoder.FRAME_SIZE)
+
+ def cleanup(self):
+ proc = self._process
+ proc.kill()
+ if proc.poll() is None:
+ proc.communicate()
+
+class AudioPlayer(threading.Thread):
+ DELAY = OpusEncoder.FRAME_LENGTH / 1000.0
+
+ def __init__(self, source, client, *, after=None):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.source = source
+ self.client = client
+ self.after = after
+
+ self._end = threading.Event()
+ self._resumed = threading.Event()
+ self._resumed.set() # we are not paused
+ self._current_error = None
+ self._connected = client._connected
+
+ if after is not None and not callable(after):
+ raise TypeError('Expected a callable for the "after" parameter.')
+
+ def _do_run(self):
+ self.loops = 0
+ self._start = time.time()
+ is_opus = self.source.is_opus()
+
+ # getattr lookup speed ups
+ play_audio = self.client.send_audio_packet
+
+ while not self._end.is_set():
+ # are we paused?
+ if not self._resumed.is_set():
+ # wait until we aren't
+ self._resumed.wait()
+
+ # are we disconnected from voice?
+ if not self._connected.is_set():
+ # wait until we are connected
+ self._connected.wait()
+ # reset our internal data
+ self.loops = 0
+ self._start = time.time()
+
+ self.loops += 1
+ data = self.source.read()
+
+ if not data:
+ self.stop()
+ break
+
+ play_audio(data, encode=not is_opus)
+ next_time = self._start + self.DELAY * self.loops
+ delay = max(0, self.DELAY + (next_time - time.time()))
+ time.sleep(delay)
+
+ def run(self):
+ try:
+ self._do_run()
+ except Exception as e:
+ self._current_error = e
+ self.stop()
+ finally:
+ self._call_after()
+ self.source.cleanup()
+
+ def _call_after(self):
+ if self.after is not None:
+ try:
+ self.after(self._current_error)
+ except:
+ pass
+
+ def stop(self):
+ self._end.set()
+
+ def pause(self):
+ self._resumed.clear()
+
+ def resume(self):
+ self.loops = 0
+ self._start = time.time()
+ self._resumed.set()
+
+ def is_playing(self):
+ return self._resumed.is_set() and not self._end.is_set()
diff --git a/discord/shard.py b/discord/shard.py
index 62bd9a3c..d3d54ab5 100644
--- a/discord/shard.py
+++ b/discord/shard.py
@@ -110,6 +110,11 @@ class AutoShardedClient(Client):
# the key is the shard_id
self.shards = {}
+ def _get_websocket(guild_id):
+ i = (guild_id >> 22) % self.shard_count
+ return self.shards[i].ws
+
+ self.connection._get_websocket = _get_websocket
self._still_sharding = True
@asyncio.coroutine
diff --git a/discord/state.py b/discord/state.py
index 8d0757dd..10f6d16e 100644
--- a/discord/state.py
+++ b/discord/state.py
@@ -688,6 +688,16 @@ class ConnectionState:
if call is not None:
call._update_voice_state(data)
+ def parse_voice_server_update(self, data):
+ try:
+ key_id = int(data['guild_id'])
+ except KeyError:
+ key_id = int(data['channel_id'])
+
+ vc = self._get_voice_client(key_id)
+ if vc is not None and vc.is_connected():
+ compat.create_task(vc._switch_regions())
+
def parse_typing_start(self, data):
channel = self.get_channel(int(data['channel_id']))
if channel is not None:
diff --git a/discord/voice_client.py b/discord/voice_client.py
index 75763794..89f5ab0a 100644
--- a/discord/voice_client.py
+++ b/discord/voice_client.py
@@ -29,9 +29,9 @@ DEALINGS IN THE SOFTWARE.
- Our main web socket (mWS) sends opcode 4 with a guild ID and channel ID.
- The mWS receives VOICE_STATE_UPDATE and VOICE_SERVER_UPDATE.
- We pull the session_id from VOICE_STATE_UPDATE.
-- We pull the token, endpoint and guild_id from VOICE_SERVER_UPDATE.
+- We pull the token, endpoint and server_id from VOICE_SERVER_UPDATE.
- Then we initiate the voice web socket (vWS) pointing to the endpoint.
-- We send opcode 0 with the user_id, guild_id, session_id and token using the vWS.
+- We send opcode 0 with the user_id, server_id, session_id and token using the vWS.
- The vWS sends back opcode 2 with an ssrc, port, modes(array) and hearbeat_interval.
- We send a UDP discovery packet to endpoint:port and receive our IP and our port in LE.
- Then we send our IP and port via vWS with opcode 1.
@@ -40,18 +40,10 @@ DEALINGS IN THE SOFTWARE.
"""
import asyncio
-import websockets
import socket
-import json, time
import logging
import struct
import threading
-import subprocess
-import shlex
-import functools
-import datetime
-import audioop
-import inspect
log = logging.getLogger(__name__)
@@ -62,123 +54,10 @@ except ImportError:
has_nacl = False
from . import opus
+from .backoff import ExponentialBackoff
from .gateway import *
-from .errors import ClientException, InvalidArgument, ConnectionClosed
-
-class StreamPlayer(threading.Thread):
- def __init__(self, stream, encoder, connected, player, after, **kwargs):
- threading.Thread.__init__(self, **kwargs)
- self.daemon = True
- self.buff = stream
- self.frame_size = encoder.frame_size
- self.player = player
- self._end = threading.Event()
- self._resumed = threading.Event()
- self._resumed.set() # we are not paused
- self._connected = connected
- self.after = after
- self.delay = encoder.frame_length / 1000.0
- self._volume = 1.0
- self._current_error = None
-
- if after is not None and not callable(after):
- raise TypeError('Expected a callable for the "after" parameter.')
-
- def _do_run(self):
- self.loops = 0
- self._start = time.time()
- while not self._end.is_set():
- # are we paused?
- if not self._resumed.is_set():
- # wait until we aren't
- self._resumed.wait()
-
- if not self._connected.is_set():
- self.stop()
- break
-
- self.loops += 1
- data = self.buff.read(self.frame_size)
-
- if self._volume != 1.0:
- data = audioop.mul(data, 2, min(self._volume, 2.0))
-
- if len(data) != self.frame_size:
- self.stop()
- break
-
- self.player(data)
- next_time = self._start + self.delay * self.loops
- delay = max(0, self.delay + (next_time - time.time()))
- time.sleep(delay)
-
- def run(self):
- try:
- self._do_run()
- except Exception as e:
- self._current_error = e
- self.stop()
- finally:
- self._call_after()
-
- def _call_after(self):
- if self.after is not None:
- try:
- arg_count = len(inspect.signature(self.after).parameters)
- except:
- # if this ended up happening, a mistake was made.
- arg_count = 0
-
- try:
- if arg_count == 0:
- self.after()
- else:
- self.after(self)
- except:
- pass
-
- def stop(self):
- self._end.set()
-
- @property
- def error(self):
- return self._current_error
-
- @property
- def volume(self):
- return self._volume
-
- @volume.setter
- def volume(self, value):
- self._volume = max(value, 0.0)
-
- def pause(self):
- self._resumed.clear()
-
- def resume(self):
- self.loops = 0
- self._start = time.time()
- self._resumed.set()
-
- def is_playing(self):
- return self._resumed.is_set() and not self.is_done()
-
- def is_done(self):
- return not self._connected.is_set() or self._end.is_set()
-
-class ProcessPlayer(StreamPlayer):
- def __init__(self, process, client, after, **kwargs):
- super().__init__(process.stdout, client.encoder,
- client._connected, client.play_audio, after, **kwargs)
- self.process = process
-
- def run(self):
- super().run()
-
- self.process.kill()
- if self.process.poll() is None:
- self.process.communicate()
-
+from .errors import ClientException, ConnectionClosed
+from .player import AudioPlayer, AudioSource
class VoiceClient:
"""Represents a Discord voice connection.
@@ -196,45 +75,46 @@ class VoiceClient:
Attributes
-----------
- session_id : str
+ session_id: str
The voice connection session ID.
- token : str
+ token: str
The voice connection token.
- user : :class:`User`
- The user connected to voice.
- endpoint : str
+ endpoint: str
The endpoint we are connecting to.
- channel : :class:`Channel`
+ channel: :class:`Channel`
The voice channel connected to.
- guild : :class:`Guild`
- The guild the voice channel is connected to.
- Shorthand for ``channel.guild``.
loop
The event loop that the voice client is running on.
"""
- def __init__(self, user, main_ws, session_id, channel, data, loop):
+ def __init__(self, state, timeout, channel):
if not has_nacl:
raise RuntimeError("PyNaCl library needed in order to use voice")
- self.user = user
- self.main_ws = main_ws
self.channel = channel
- self.session_id = session_id
- self.loop = loop
- self._connected = asyncio.Event(loop=self.loop)
- self.token = data.get('token')
- self.guild_id = data.get('guild_id')
- self.endpoint = data.get('endpoint')
+ self.main_ws = None
+ self.timeout = timeout
+ self.loop = state.loop
+ self._state = state
+ # this will be used in the AudioPlayer thread
+ self._connected = threading.Event()
+ self._connections = 0
self.sequence = 0
self.timestamp = 0
- self.encoder = opus.Encoder(48000, 2)
- log.info('created opus encoder with {0.__dict__}'.format(self.encoder))
+ self._runner = None
+ self._player = None
+ self.encoder = opus.Encoder()
warn_nacl = not has_nacl
@property
def guild(self):
- return self.channel.guild
+ """Optional[:class:`Guild`]: The guild we're connected to, if applicable."""
+ return getattr(self.channel, 'guild', None)
+
+ @property
+ def user(self):
+ """:class:`ClientUser`: The user connected to voice (i.e. ourselves)."""
+ return self._state.user
def checked_add(self, attr, value, limit):
val = getattr(self, attr)
@@ -246,56 +126,127 @@ class VoiceClient:
# connection related
@asyncio.coroutine
- def connect(self):
- log.info('voice connection is connecting...')
- self.endpoint = self.endpoint.replace(':80', '')
+ def start_handshake(self):
+ log.info('Starting voice handshake...')
+
+ key_id, key_name = self.channel._get_voice_client_key()
+ guild_id, channel_id = self.channel._get_voice_state_pair()
+ state = self._state
+ self.main_ws = ws = state._get_websocket(guild_id)
+ self._connections += 1
+
+ def session_id_found(data):
+ user_id = data.get('user_id', 0)
+ _guild_id = data.get(key_name)
+ return int(user_id) == state.self_id and int(_guild_id) == key_id
+
+ # register the futures for waiting
+ session_id_future = ws.wait_for('VOICE_STATE_UPDATE', session_id_found)
+ voice_data_future = ws.wait_for('VOICE_SERVER_UPDATE', lambda d: int(d.get(key_name, 0)) == key_id)
+
+ # request joining
+ yield from ws.voice_state(guild_id, channel_id)
+
+ try:
+ session_id_data = yield from asyncio.wait_for(session_id_future, timeout=self.timeout, loop=self.loop)
+ data = yield from asyncio.wait_for(voice_data_future, timeout=self.timeout, loop=state.loop)
+ except asyncio.TimeoutError as e:
+ yield from ws.voice_state(guild_id, None, self_mute=True)
+ raise e
+
+ self.session_id = session_id_data.get('session_id')
+ self.server_id = data.get(key_name)
+ self.token = data.get('token')
+ self.endpoint = data.get('endpoint', '').replace(':80', '')
self.endpoint_ip = socket.gethostbyname(self.endpoint)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setblocking(False)
- log.info('Voice endpoint found {0.endpoint} (IP: {0.endpoint_ip})'.format(self))
+ log.info('Voice handshake complete. Endpoint found %s (IP: %s)', self.endpoint, self.endpoint_ip)
- self.ws = yield from DiscordVoiceWebSocket.from_client(self)
- while not self._connected.is_set():
- yield from self.ws.poll_event()
- if hasattr(self, 'secret_key'):
- # we have a secret key, so we don't need to poll
- # websocket events anymore
- self._connected.set()
- break
+ @asyncio.coroutine
+ def terminate_handshake(self, *, remove=False):
+ guild_id, _ = self.channel._get_voice_state_pair()
+ yield from self.main_ws.voice_state(guild_id, None, self_mute=True)
- self.loop.create_task(self.poll_voice_ws())
+ if remove:
+ key_id, _ = self.channel._get_voice_client_key()
+ self._state._remove_voice_client(key_id)
@asyncio.coroutine
- def poll_voice_ws(self):
- """|coro|
- Reads from the voice websocket while connected.
- """
- while self._connected.is_set():
+ def _switch_regions(self):
+ # just reconnect when we're requested to switch voice regions
+ # signal the reconnect loop
+ yield from self.ws.close(1006)
+
+ @asyncio.coroutine
+ def connect(self, *, reconnect=True, _tries=0, do_handshake=True):
+ log.info('Connecting to voice...')
+ try:
+ del self.secret_key
+ except AttributeError:
+ pass
+
+ if do_handshake:
+ yield from self.start_handshake()
+
+ try:
+ self.ws = yield from DiscordVoiceWebSocket.from_client(self)
+ self._connected.clear()
+ while not hasattr(self, 'secret_key'):
+ yield from self.ws.poll_event()
+ self._connected.set()
+ except (ConnectionClosed, asyncio.TimeoutError):
+ if reconnect and _tries < 5:
+ log.exception('Failed to connect to voice... Retrying...')
+ yield from asyncio.sleep(1 + _tries * 2.0, loop=self.loop)
+ yield from self.terminate_handshake()
+ yield from self.connect(reconnect=reconnect, _tries=_tries + 1)
+ else:
+ raise
+
+ if self._runner is None:
+ self._runner = self.loop.create_task(self.poll_voice_ws(reconnect))
+
+ @asyncio.coroutine
+ def poll_voice_ws(self, reconnect):
+ backoff = ExponentialBackoff()
+ fmt = 'Disconnected from voice... Reconnecting in {:.2f}s.'
+ while True:
try:
yield from self.ws.poll_event()
- except ConnectionClosed as e:
- if e.code == 1000:
- break
- else:
- raise
+ except (ConnectionClosed, asyncio.TimeoutError) as e:
+ if isinstance(e, ConnectionClosed):
+ if e.code == 1000:
+ yield from self.disconnect()
+ break
+
+ if not reconnect:
+ yield from self.disconnect()
+ raise e
+
+ retry = backoff.delay()
+ log.exception(fmt.format(retry))
+ self._connected.clear()
+ yield from asyncio.sleep(retry, loop=self.loop)
+ yield from self.terminate_handshake()
+ yield from self.connect(reconnect=True)
@asyncio.coroutine
def disconnect(self):
"""|coro|
Disconnects all connections to the voice client.
-
- In order to reconnect, you must create another voice client
- using :meth:`Client.join_voice_channel`.
"""
if not self._connected.is_set():
return
+ self.stop()
self._connected.clear()
+
try:
yield from self.ws.close()
- yield from self.main_ws.voice_state(self.guild_id, None, self_mute=True)
+ yield from self.terminate_handshake(remove=True)
finally:
self.socket.close()
@@ -305,28 +256,16 @@ class VoiceClient:
Moves you to a different voice channel.
- .. warning::
-
- :class:`Object` instances do not work with this function.
-
Parameters
-----------
- channel : :class:`Channel`
+ channel: :class:`abc.Snowflake`
The channel to move to. Must be a voice channel.
-
- Raises
- -------
- InvalidArgument
- Not a voice channel.
"""
-
- if str(getattr(channel, 'type', 'text')) != 'voice':
- raise InvalidArgument('Must be a voice channel.')
-
- yield from self.main_ws.voice_state(self.guild_id, channel.id)
+ guild_id, _ = self.channel._get_voice_state_pair()
+ yield from self.main_ws.voice_state(guild_id, channel.id)
def is_connected(self):
- """bool : Indicates if the voice client is connected to voice."""
+ """bool: Indicates if the voice client is connected to voice."""
return self._connected.is_set()
# audio related
@@ -349,328 +288,75 @@ class VoiceClient:
# Encrypt and return the data
return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext
- def create_ffmpeg_player(self, filename, *, use_avconv=False, pipe=False, stderr=None, options=None, before_options=None, headers=None, after=None):
- """Creates a stream player for ffmpeg that launches in a separate thread to play
- audio.
-
- The ffmpeg player launches a subprocess of ``ffmpeg`` to a specific
- filename and then plays that file.
-
- You must have the ffmpeg or avconv executable in your path environment variable
- in order for this to work.
-
- The operations that can be done on the player are the same as those in
- :meth:`create_stream_player`.
-
- Examples
- ----------
-
- Basic usage: ::
-
- voice = yield from client.join_voice_channel(channel)
- player = voice.create_ffmpeg_player('cool.mp3')
- player.start()
-
- Parameters
- -----------
- filename
- The filename that ffmpeg will take and convert to PCM bytes.
- If ``pipe`` is True then this is a file-like object that is
- passed to the stdin of ``ffmpeg``.
- use_avconv: bool
- Use ``avconv`` instead of ``ffmpeg``.
- pipe : bool
- If true, denotes that ``filename`` parameter will be passed
- to the stdin of ffmpeg.
- stderr
- A file-like object or ``subprocess.PIPE`` to pass to the Popen
- constructor.
- options : str
- Extra command line flags to pass to ``ffmpeg`` after the ``-i`` flag.
- before_options : str
- Command line flags to pass to ``ffmpeg`` before the ``-i`` flag.
- headers: dict
- HTTP headers dictionary to pass to ``-headers`` command line option
- after : callable
- The finalizer that is called after the stream is done being
- played. All exceptions the finalizer throws are silently discarded.
-
- Raises
- -------
- ClientException
- Popen failed to due to an error in ``ffmpeg`` or ``avconv``.
-
- Returns
- --------
- StreamPlayer
- A stream player with specific operations.
- See :meth:`create_stream_player`.
- """
- command = 'ffmpeg' if not use_avconv else 'avconv'
- input_name = '-' if pipe else shlex.quote(filename)
- before_args = ""
- if isinstance(headers, dict):
- for key, value in headers.items():
- before_args += "{}: {}\r\n".format(key, value)
- before_args = ' -headers ' + shlex.quote(before_args)
-
- if isinstance(before_options, str):
- before_args += ' ' + before_options
-
- cmd = command + '{} -i {} -f s16le -ar {} -ac {} -loglevel warning'
- cmd = cmd.format(before_args, input_name, self.encoder.sampling_rate, self.encoder.channels)
-
- if isinstance(options, str):
- cmd = cmd + ' ' + options
-
- cmd += ' pipe:1'
-
- stdin = None if not pipe else filename
- args = shlex.split(cmd)
- try:
- p = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
- return ProcessPlayer(p, self, after)
- except FileNotFoundError as e:
- raise ClientException('ffmpeg/avconv was not found in your PATH environment variable') from e
- except subprocess.SubprocessError as e:
- raise ClientException('Popen failed: {0.__name__} {1}'.format(type(e), str(e))) from e
-
+ def play(self, source, *, after=None):
+ """Plays an :class:`AudioSource`.
- @asyncio.coroutine
- def create_ytdl_player(self, url, *, ytdl_options=None, **kwargs):
- """|coro|
-
- Creates a stream player for youtube or other services that launches
- in a separate thread to play the audio.
-
- The player uses the ``youtube_dl`` python library to get the information
- required to get audio from the URL. Since this uses an external library,
- you must install it yourself. You can do so by calling
- ``pip install youtube_dl``.
-
- You must have the ffmpeg or avconv executable in your path environment
- variable in order for this to work.
-
- The operations that can be done on the player are the same as those in
- :meth:`create_stream_player`. The player has been augmented and enhanced
- to have some info extracted from the URL. If youtube-dl fails to extract
- the information then the attribute is ``None``. The ``yt``, ``url``, and
- ``download_url`` attributes are always available.
-
- +---------------------+---------------------------------------------------------+
- | Operation | Description |
- +=====================+=========================================================+
- | player.yt | The `YoutubeDL <ytdl>` instance. |
- +---------------------+---------------------------------------------------------+
- | player.url | The URL that is currently playing. |
- +---------------------+---------------------------------------------------------+
- | player.download_url | The URL that is currently being downloaded to ffmpeg. |
- +---------------------+---------------------------------------------------------+
- | player.title | The title of the audio stream. |
- +---------------------+---------------------------------------------------------+
- | player.description | The description of the audio stream. |
- +---------------------+---------------------------------------------------------+
- | player.uploader | The uploader of the audio stream. |
- +---------------------+---------------------------------------------------------+
- | player.upload_date | A datetime.date object of when the stream was uploaded. |
- +---------------------+---------------------------------------------------------+
- | player.duration | The duration of the audio in seconds. |
- +---------------------+---------------------------------------------------------+
- | player.likes | How many likes the audio stream has. |
- +---------------------+---------------------------------------------------------+
- | player.dislikes | How many dislikes the audio stream has. |
- +---------------------+---------------------------------------------------------+
- | player.is_live | Checks if the audio stream is currently livestreaming. |
- +---------------------+---------------------------------------------------------+
- | player.views | How many views the audio stream has. |
- +---------------------+---------------------------------------------------------+
-
- .. _ytdl: https://github.com/rg3/youtube-dl/blob/master/youtube_dl/YoutubeDL.py#L128-L278
-
- Examples
- ----------
+ The finalizer, ``after`` is called after the source has been exhausted
+ or an error occurred.
- Basic usage: ::
-
- voice = await client.join_voice_channel(channel)
- player = await voice.create_ytdl_player('https://www.youtube.com/watch?v=d62TYemN6MQ')
- player.start()
+ If an error happens while the audio player is running, the exception is
+ caught and the audio player is then stopped.
Parameters
-----------
- url : str
- The URL that ``youtube_dl`` will take and download audio to pass
- to ``ffmpeg`` or ``avconv`` to convert to PCM bytes.
- ytdl_options : dict
- A dictionary of options to pass into the ``YoutubeDL`` instance.
- See `the documentation <ytdl>`_ for more details.
- \*\*kwargs
- The rest of the keyword arguments are forwarded to
- :func:`create_ffmpeg_player`.
+ source: :class:`AudioSource`
+ The audio source we're reading from.
+ after
+ The finalizer that is called after the stream is exhausted.
+ All exceptions it throws are silently discarded. This function
+ must have a single parameter, ``error``, that denotes an
+ optional exception that was raised during playing.
Raises
-------
ClientException
- Popen failure from either ``ffmpeg``/``avconv``.
-
- Returns
- --------
- StreamPlayer
- An augmented StreamPlayer that uses ffmpeg.
- See :meth:`create_stream_player` for base operations.
+ Already playing audio or not connected.
+ TypeError
+ source is not a :class:`AudioSource` or after is not a callable.
"""
- import youtube_dl
-
- use_avconv = kwargs.get('use_avconv', False)
- opts = {
- 'format': 'webm[abr>0]/bestaudio/best',
- 'prefer_ffmpeg': not use_avconv
- }
-
- if ytdl_options is not None and isinstance(ytdl_options, dict):
- opts.update(ytdl_options)
-
- ydl = youtube_dl.YoutubeDL(opts)
- func = functools.partial(ydl.extract_info, url, download=False)
- info = yield from self.loop.run_in_executor(None, func)
- if "entries" in info:
- info = info['entries'][0]
-
- log.info('playing URL {}'.format(url))
- download_url = info['url']
- player = self.create_ffmpeg_player(download_url, **kwargs)
-
- # set the dynamic attributes from the info extraction
- player.download_url = download_url
- player.url = url
- player.yt = ydl
- player.views = info.get('view_count')
- player.is_live = bool(info.get('is_live'))
- player.likes = info.get('like_count')
- player.dislikes = info.get('dislike_count')
- player.duration = info.get('duration')
- player.uploader = info.get('uploader')
-
- is_twitch = 'twitch' in url
- if is_twitch:
- # twitch has 'title' and 'description' sort of mixed up.
- player.title = info.get('description')
- player.description = None
- else:
- player.title = info.get('title')
- player.description = info.get('description')
- # upload date handling
- date = info.get('upload_date')
- if date:
- try:
- date = datetime.datetime.strptime(date, '%Y%M%d').date()
- except ValueError:
- date = None
+ if not self._connected:
+ raise ClientException('Not connected to voice.')
- player.upload_date = date
- return player
+ if self.is_playing():
+ raise ClientException('Already playing audio.')
- def encoder_options(self, *, sample_rate, channels=2):
- """Sets the encoder options for the OpusEncoder.
+ if not isinstance(source, AudioSource):
+ raise TypeError('source must an AudioSource not {0.__class__.__name__}'.format(source))
- Calling this after you create a stream player
- via :meth:`create_ffmpeg_player` or :meth:`create_stream_player`
- has no effect.
+ self._player = AudioPlayer(source, self, after=after)
+ self._player.start()
- Parameters
- ----------
- sample_rate : int
- Sets the sample rate of the OpusEncoder. The unit is in Hz.
- channels : int
- Sets the number of channels for the OpusEncoder.
- 2 for stereo, 1 for mono.
+ def is_playing(self):
+ """Indicates if we're currently playing audio."""
+ return self._player is not None and self._player.is_playing()
- Raises
- -------
- InvalidArgument
- The values provided are invalid.
- """
- if sample_rate not in (8000, 12000, 16000, 24000, 48000):
- raise InvalidArgument('Sample rate out of range. Valid: [8000, 12000, 16000, 24000, 48000]')
- if channels not in (1, 2):
- raise InvalidArgument('Channels must be either 1 or 2.')
-
- self.encoder = opus.Encoder(sample_rate, channels)
- log.info('created opus encoder with {0.__dict__}'.format(self.encoder))
-
- def create_stream_player(self, stream, *, after=None):
- """Creates a stream player that launches in a separate thread to
- play audio.
-
- The stream player assumes that ``stream.read`` is a valid function
- that returns a *bytes-like* object.
-
- The finalizer, ``after`` is called after the stream has been exhausted
- or an error occurred (see below).
-
- The following operations are valid on the ``StreamPlayer`` object:
-
- +---------------------+-----------------------------------------------------+
- | Operation | Description |
- +=====================+=====================================================+
- | player.start() | Starts the audio stream. |
- +---------------------+-----------------------------------------------------+
- | player.stop() | Stops the audio stream. |
- +---------------------+-----------------------------------------------------+
- | player.is_done() | Returns a bool indicating if the stream is done. |
- +---------------------+-----------------------------------------------------+
- | player.is_playing() | Returns a bool indicating if the stream is playing. |
- +---------------------+-----------------------------------------------------+
- | player.pause() | Pauses the audio stream. |
- +---------------------+-----------------------------------------------------+
- | player.resume() | Resumes the audio stream. |
- +---------------------+-----------------------------------------------------+
- | player.volume | Allows you to set the volume of the stream. 1.0 is |
- | | equivalent to 100% and 0.0 is equal to 0%. The |
- | | maximum the volume can be set to is 2.0 for 200%. |
- +---------------------+-----------------------------------------------------+
- | player.error | The exception that stopped the player. If no error |
- | | happened, then this returns None. |
- +---------------------+-----------------------------------------------------+
-
- The stream must have the same sampling rate as the encoder and the same
- number of channels. The defaults are 48000 Hz and 2 channels. You
- could change the encoder options by using :meth:`encoder_options`
- but this must be called **before** this function.
-
- If an error happens while the player is running, the exception is caught and
- the player is then stopped. The caught exception could then be retrieved
- via ``player.error``\. When the player is stopped in this matter, the
- finalizer under ``after`` is called.
+ def stop(self):
+ """Stops playing audio."""
+ if self._player:
+ self._player.stop()
+ self._player = None
- Parameters
- -----------
- stream
- The stream object to read from.
- after
- The finalizer that is called after the stream is exhausted.
- All exceptions it throws are silently discarded. This function
- can have either no parameters or a single parameter taking in the
- current player.
+ def pause(self):
+ """Pauses the audio playing."""
+ if self._player:
+ self._player.pause()
- Returns
- --------
- StreamPlayer
- A stream player with the operations noted above.
- """
- return StreamPlayer(stream, self.encoder, self._connected, self.play_audio, after)
+ def resume(self):
+ """Resumes the audio playing."""
+ if self._player:
+ self._player.resume()
- def play_audio(self, data, *, encode=True):
+ def send_audio_packet(self, data, *, encode=True):
"""Sends an audio packet composed of the data.
You must be connected to play audio.
Parameters
----------
- data : bytes
+ data: bytes
The *bytes-like object* denoting PCM or Opus voice data.
- encode : bool
+ encode: bool
Indicates if ``data`` should be encoded into Opus.
Raises
@@ -683,13 +369,13 @@ class VoiceClient:
self.checked_add('sequence', 1, 65535)
if encode:
- encoded_data = self.encoder.encode(data, self.encoder.samples_per_frame)
+ encoded_data = self.encoder.encode(data, self.encoder.SAMPLES_PER_FRAME)
else:
encoded_data = data
packet = self._get_voice_packet(encoded_data)
try:
- sent = self.socket.sendto(packet, (self.endpoint_ip, self.voice_port))
+ self.socket.sendto(packet, (self.endpoint_ip, self.voice_port))
except BlockingIOError:
log.warning('A packet has been dropped (seq: {0.sequence}, timestamp: {0.timestamp})'.format(self))
- self.checked_add('timestamp', self.encoder.samples_per_frame, 4294967295)
+ self.checked_add('timestamp', self.encoder.SAMPLES_PER_FRAME, 4294967295)
diff --git a/docs/api.rst b/docs/api.rst
index 09f5be77..2177a270 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -40,6 +40,21 @@ Client
.. autoclass:: AutoShardedClient
:members:
+Voice
+------
+
+.. autoclass:: VoiceClient
+ :members:
+
+.. autoclass:: AudioSource
+ :members:
+
+.. autoclass:: PCMAudio
+ :members:
+
+.. autoclass:: FFmpegPCMAudio
+ :members:
+
Opus Library
~~~~~~~~~~~~~