diff options
Diffstat (limited to 'discord/player.py')
| -rw-r--r-- | discord/player.py | 167 |
1 files changed, 105 insertions, 62 deletions
diff --git a/discord/player.py b/discord/player.py index 2ea5308c..1e08faf2 100644 --- a/discord/player.py +++ b/discord/player.py @@ -21,6 +21,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +from __future__ import annotations import threading import traceback @@ -33,12 +34,23 @@ import time import json import sys import re +import io + +from typing import Any, Callable, Generic, IO, Optional, TYPE_CHECKING, Tuple, Type, TypeVar, Union from .errors import ClientException from .opus import Encoder as OpusEncoder from .oggparse import OggStream +from .utils import MISSING + +if TYPE_CHECKING: + from .voice_client import VoiceClient + -log = logging.getLogger(__name__) +AT = TypeVar('AT', bound='AudioSource') +FT = TypeVar('FT', bound='FFmpegOpusAudio') + +log: logging.Logger = logging.getLogger(__name__) __all__ = ( 'AudioSource', @@ -49,6 +61,8 @@ __all__ = ( 'PCMVolumeTransformer', ) +CREATE_NO_WINDOW: int + if sys.platform != 'win32': CREATE_NO_WINDOW = 0 else: @@ -65,7 +79,7 @@ class AudioSource: The audio source reads are done in a separate thread. """ - def read(self): + def read(self) -> bytes: """Reads 20ms worth of audio. Subclasses must implement this. @@ -85,11 +99,11 @@ class AudioSource: """ raise NotImplementedError - def is_opus(self): + def is_opus(self) -> bool: """Checks if the audio source is already encoded in Opus.""" return False - def cleanup(self): + def cleanup(self) -> None: """Called when clean-up is needed to be done. Useful for clearing buffer data or processes after @@ -97,7 +111,7 @@ class AudioSource: """ pass - def __del__(self): + def __del__(self) -> None: self.cleanup() class PCMAudio(AudioSource): @@ -108,10 +122,10 @@ class PCMAudio(AudioSource): stream: :term:`py:file object` A file-like object that reads byte data representing raw PCM. """ - def __init__(self, stream): - self.stream = stream + def __init__(self, stream: io.BufferedIOBase) -> None: + self.stream: io.BufferedIOBase = stream - def read(self): + def read(self) -> bytes: ret = self.stream.read(OpusEncoder.FRAME_SIZE) if len(ret) != OpusEncoder.FRAME_SIZE: return b'' @@ -126,17 +140,15 @@ class FFmpegAudio(AudioSource): .. versionadded:: 1.3 """ - def __init__(self, source, *, executable='ffmpeg', args, **subprocess_kwargs): - self._process = self._stdout = None - + def __init__(self, source: str, *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any): args = [executable, *args] kwargs = {'stdout': subprocess.PIPE} kwargs.update(subprocess_kwargs) - self._process = self._spawn_process(args, **kwargs) - self._stdout = self._process.stdout + self._process: subprocess.Popen = self._spawn_process(args, **kwargs) + self._stdout: IO[bytes] = self._process.stdout # type: ignore - def _spawn_process(self, args, **subprocess_kwargs): + def _spawn_process(self, args: Any, **subprocess_kwargs: Any) -> subprocess.Popen: process = None try: process = subprocess.Popen(args, creationflags=CREATE_NO_WINDOW, **subprocess_kwargs) @@ -148,9 +160,9 @@ class FFmpegAudio(AudioSource): else: return process - def cleanup(self): + def cleanup(self) -> None: proc = self._process - if proc is None: + if proc is MISSING: return log.info('Preparing to terminate ffmpeg process %s.', proc.pid) @@ -167,7 +179,7 @@ class FFmpegAudio(AudioSource): else: log.info('ffmpeg process %s successfully terminated with return code of %s.', proc.pid, proc.returncode) - self._process = self._stdout = None + self._process = self._stdout = MISSING class FFmpegPCMAudio(FFmpegAudio): """An audio source from FFmpeg (or AVConv). @@ -204,7 +216,16 @@ class FFmpegPCMAudio(FFmpegAudio): The subprocess failed to be created. """ - def __init__(self, source, *, executable='ffmpeg', pipe=False, stderr=None, before_options=None, options=None): + def __init__( + self, + source: str, + *, + executable: str = 'ffmpeg', + pipe: bool = False, + stderr: Optional[IO[str]] = None, + before_options: Optional[str] = None, + options: Optional[str] = None + ) -> None: args = [] subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} @@ -222,13 +243,13 @@ class FFmpegPCMAudio(FFmpegAudio): super().__init__(source, executable=executable, args=args, **subprocess_kwargs) - def read(self): + def read(self) -> bytes: ret = self._stdout.read(OpusEncoder.FRAME_SIZE) if len(ret) != OpusEncoder.FRAME_SIZE: return b'' return ret - def is_opus(self): + def is_opus(self) -> bool: return False class FFmpegOpusAudio(FFmpegAudio): @@ -292,8 +313,18 @@ class FFmpegOpusAudio(FFmpegAudio): The subprocess failed to be created. """ - def __init__(self, source, *, bitrate=128, codec=None, executable='ffmpeg', - pipe=False, stderr=None, before_options=None, options=None): + def __init__( + self, + source: str, + *, + bitrate: int = 128, + codec: Optional[str] = None, + executable: str = 'ffmpeg', + pipe=False, + stderr=None, + before_options=None, + options=None, + ) -> None: args = [] subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} @@ -323,7 +354,13 @@ class FFmpegOpusAudio(FFmpegAudio): self._packet_iter = OggStream(self._stdout).iter_packets() @classmethod - async def from_probe(cls, source, *, method=None, **kwargs): + async def from_probe( + cls: Type[FT], + source: str, + *, + method: Optional[Union[str, Callable[[str, str], Tuple[Optional[str], Optional[int]]]]] = None, + **kwargs: Any, + ) -> FT: """|coro| A factory method that creates a :class:`FFmpegOpusAudio` after probing @@ -382,10 +419,16 @@ class FFmpegOpusAudio(FFmpegAudio): executable = kwargs.get('executable') codec, bitrate = await cls.probe(source, method=method, executable=executable) - return cls(source, bitrate=bitrate, codec=codec, **kwargs) + return cls(source, bitrate=bitrate, codec=codec, **kwargs) # type: ignore @classmethod - async def probe(cls, source, *, method=None, executable=None): + async def probe( + cls, + source: str, + *, + method: Optional[Union[str, Callable[[str, str], Tuple[Optional[str], Optional[int]]]]] = None, + executable: Optional[str] = None, + ) -> Tuple[Optional[str], Optional[int]]: """|coro| Probes the input source for bitrate and codec information. @@ -408,7 +451,7 @@ class FFmpegOpusAudio(FFmpegAudio): Returns --------- - Tuple[Optional[:class:`str`], Optional[:class:`int`]] + Optional[Tuple[Optional[:class:`str`], Optional[:class:`int`]]] A 2-tuple with the codec and bitrate of the input source. """ @@ -434,15 +477,15 @@ class FFmpegOpusAudio(FFmpegAudio): codec = bitrate = None loop = asyncio.get_event_loop() try: - codec, bitrate = await loop.run_in_executor(None, lambda: probefunc(source, executable)) + codec, bitrate = await loop.run_in_executor(None, lambda: probefunc(source, executable)) # type: ignore except Exception: if not fallback: log.exception("Probe '%s' using '%s' failed", method, executable) - return + return # type: ignore log.exception("Probe '%s' using '%s' failed, trying fallback", method, executable) try: - codec, bitrate = await loop.run_in_executor(None, lambda: fallback(source, executable)) + codec, bitrate = await loop.run_in_executor(None, lambda: fallback(source, executable)) # type: ignore except Exception: log.exception("Fallback probe using '%s' failed", executable) else: @@ -453,7 +496,7 @@ class FFmpegOpusAudio(FFmpegAudio): return codec, bitrate @staticmethod - def _probe_codec_native(source, executable='ffmpeg'): + def _probe_codec_native(source, executable: str = 'ffmpeg') -> Tuple[Optional[str], Optional[int]]: exe = executable[:2] + 'probe' if executable in ('ffmpeg', 'avconv') else executable args = [exe, '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'a:0', source] output = subprocess.check_output(args, timeout=20) @@ -465,12 +508,12 @@ class FFmpegOpusAudio(FFmpegAudio): codec = streamdata.get('codec_name') bitrate = int(streamdata.get('bit_rate', 0)) - bitrate = max(round(bitrate/1000, 0), 512) + bitrate = max(round(bitrate/1000), 512) return codec, bitrate @staticmethod - def _probe_codec_fallback(source, executable='ffmpeg'): + def _probe_codec_fallback(source, executable: str = 'ffmpeg') -> Tuple[Optional[str], Optional[int]]: args = [executable, '-hide_banner', '-i', source] proc = subprocess.Popen(args, creationflags=CREATE_NO_WINDOW, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out, _ = proc.communicate(timeout=20) @@ -487,13 +530,13 @@ class FFmpegOpusAudio(FFmpegAudio): return codec, bitrate - def read(self): + def read(self) -> bytes: return next(self._packet_iter, b'') - def is_opus(self): + def is_opus(self) -> bool: return True -class PCMVolumeTransformer(AudioSource): +class PCMVolumeTransformer(AudioSource, Generic[AT]): """Transforms a previous :class:`AudioSource` to have volume controls. This does not work on audio sources that have :meth:`AudioSource.is_opus` @@ -515,53 +558,53 @@ class PCMVolumeTransformer(AudioSource): The audio source is opus encoded. """ - def __init__(self, original, volume=1.0): + def __init__(self, original: AT, volume: float = 1.0): if not isinstance(original, AudioSource): raise TypeError(f'expected AudioSource not {original.__class__.__name__}.') if original.is_opus(): raise ClientException('AudioSource must not be Opus encoded.') - self.original = original + self.original: AT = original self.volume = volume @property - def volume(self): + def volume(self) -> float: """Retrieves or sets the volume as a floating point percentage (e.g. ``1.0`` for 100%).""" return self._volume @volume.setter - def volume(self, value): + def volume(self, value: float) -> None: self._volume = max(value, 0.0) - def cleanup(self): + def cleanup(self) -> None: self.original.cleanup() - def read(self): + def read(self) -> bytes: ret = self.original.read() return audioop.mul(ret, 2, min(self._volume, 2.0)) class AudioPlayer(threading.Thread): - DELAY = OpusEncoder.FRAME_LENGTH / 1000.0 + DELAY: float = OpusEncoder.FRAME_LENGTH / 1000.0 - def __init__(self, source, client, *, after=None): + def __init__(self, source: AudioSource, client: VoiceClient, *, after=None): threading.Thread.__init__(self) - self.daemon = True - self.source = source - self.client = client - self.after = after + self.daemon: bool = True + self.source: AudioSource = source + self.client: VoiceClient = client + self.after: Optional[Callable[[Optional[Exception]], Any]] = after - self._end = threading.Event() - self._resumed = threading.Event() + self._end: threading.Event = threading.Event() + self._resumed: threading.Event = threading.Event() self._resumed.set() # we are not paused - self._current_error = None - self._connected = client._connected - self._lock = threading.Lock() + self._current_error: Optional[Exception] = None + self._connected: threading.Event = client._connected + self._lock: threading.Lock = threading.Lock() if after is not None and not callable(after): raise TypeError('Expected a callable for the "after" parameter.') - def _do_run(self): + def _do_run(self) -> None: self.loops = 0 self._start = time.perf_counter() @@ -596,7 +639,7 @@ class AudioPlayer(threading.Thread): delay = max(0, self.DELAY + (next_time - time.perf_counter())) time.sleep(delay) - def run(self): + def run(self) -> None: try: self._do_run() except Exception as exc: @@ -606,7 +649,7 @@ class AudioPlayer(threading.Thread): self.source.cleanup() self._call_after() - def _call_after(self): + def _call_after(self) -> None: error = self._current_error if self.after is not None: @@ -622,36 +665,36 @@ class AudioPlayer(threading.Thread): print(msg, file=sys.stderr) traceback.print_exception(type(error), error, error.__traceback__) - def stop(self): + def stop(self) -> None: self._end.set() self._resumed.set() self._speak(False) - def pause(self, *, update_speaking=True): + def pause(self, *, update_speaking: bool = True) -> None: self._resumed.clear() if update_speaking: self._speak(False) - def resume(self, *, update_speaking=True): + def resume(self, *, update_speaking: bool = True) -> None: self.loops = 0 self._start = time.perf_counter() self._resumed.set() if update_speaking: self._speak(True) - def is_playing(self): + def is_playing(self) -> bool: return self._resumed.is_set() and not self._end.is_set() - def is_paused(self): + def is_paused(self) -> bool: return not self._end.is_set() and not self._resumed.is_set() - def _set_source(self, source): + def _set_source(self, source: AudioSource) -> None: with self._lock: self.pause(update_speaking=False) self.source = source self.resume(update_speaking=False) - def _speak(self, speaking): + def _speak(self, speaking: bool) -> None: try: asyncio.run_coroutine_threadsafe(self.client.ws.speak(speaking), self.client.loop) except Exception as e: |