From 67099798319d59fa5dc06f43802cc411ac33426f Mon Sep 17 00:00:00 2001 From: Rapptz Date: Fri, 30 Dec 2016 22:56:47 -0500 Subject: Move GuildChannel over to abc module. --- discord/abc.py | 301 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 264 insertions(+), 37 deletions(-) (limited to 'discord/abc.py') diff --git a/discord/abc.py b/discord/abc.py index 81bb4105..e2ae12e2 100644 --- a/discord/abc.py +++ b/discord/abc.py @@ -29,16 +29,13 @@ import io import os import asyncio +from collections import namedtuple + from .message import Message from .iterators import LogsFromIterator from .context_managers import Typing from .errors import ClientException, NoMoreMessages -import discord.message -import discord.iterators -import discord.context_managers -import discord.errors - class Snowflake(metaclass=abc.ABCMeta): __slots__ = () @@ -89,38 +86,6 @@ class User(metaclass=abc.ABCMeta): return True return NotImplemented -class GuildChannel(metaclass=abc.ABCMeta): - __slots__ = () - - @property - @abc.abstractmethod - def mention(self): - raise NotImplementedError - - @abc.abstractmethod - def overwrites_for(self, obj): - raise NotImplementedError - - @abc.abstractmethod - def permissions_for(self, user): - raise NotImplementedError - - @classmethod - def __subclasshook__(cls, C): - if cls is GuildChannel: - if Snowflake.__subclasshook__(C) is NotImplemented: - return NotImplemented - - mro = C.__mro__ - for attr in ('name', 'guild', 'overwrites_for', 'permissions_for', 'mention'): - for base in mro: - if attr in base.__dict__: - break - else: - return NotImplemented - return True - return NotImplemented - class PrivateChannel(metaclass=abc.ABCMeta): __slots__ = () @@ -137,6 +102,268 @@ class PrivateChannel(metaclass=abc.ABCMeta): return NotImplemented return NotImplemented +_Overwrites = namedtuple('_Overwrites', 'id allow deny type') + +class GuildChannel: + __slots__ = () + + def __str__(self): + return self.name + + @asyncio.coroutine + def _move(self, position): + if position < 0: + raise InvalidArgument('Channel position cannot be less than 0.') + + http = self._state.http + url = '{0}/{1.guild.id}/channels'.format(http.GUILDS, self) + channels = [c for c in self.guild.channels if isinstance(c, type(self))] + + if position >= len(channels): + raise InvalidArgument('Channel position cannot be greater than {}'.format(len(channels) - 1)) + + channels.sort(key=lambda c: c.position) + + try: + # remove ourselves from the channel list + channels.remove(self) + except ValueError: + # not there somehow lol + return + else: + # add ourselves at our designated position + channels.insert(position, self) + + payload = [{'id': c.id, 'position': index } for index, c in enumerate(channels)] + yield from http.patch(url, json=payload, bucket='move_channel') + + def _fill_overwrites(self, data): + self._overwrites = [] + everyone_index = 0 + everyone_id = self.guild.id + + for index, overridden in enumerate(data.get('permission_overwrites', [])): + overridden_id = int(overridden.pop('id')) + self._overwrites.append(_Overwrites(id=overridden_id, **overridden)) + + if overridden['type'] == 'member': + continue + + if overridden_id == everyone_id: + # the @everyone role is not guaranteed to be the first one + # in the list of permission overwrites, however the permission + # resolution code kind of requires that it is the first one in + # the list since it is special. So we need the index so we can + # swap it to be the first one. + everyone_index = index + + # do the swap + tmp = self._overwrites + if tmp: + tmp[everyone_index], tmp[0] = tmp[0], tmp[everyone_index] + + @property + def changed_roles(self): + """Returns a list of :class:`Roles` that have been overridden from + their default values in the :attr:`Guild.roles` attribute.""" + ret = [] + for overwrite in filter(lambda o: o.type == 'role', self._overwrites): + role = discord.utils.get(self.guild.roles, id=overwrite.id) + if role is None: + continue + + role = copy.copy(role) + role.permissions.handle_overwrite(overwrite.allow, overwrite.deny) + ret.append(role) + return ret + + @property + def is_default(self): + """bool : Indicates if this is the default channel for the :class:`Guild` it belongs to.""" + return self.guild.id == self.id + + @property + def mention(self): + """str : The string that allows you to mention the channel.""" + return '<#{0.id}>'.format(self) + + @property + def created_at(self): + """Returns the channel's creation time in UTC.""" + return discord.utils.snowflake_time(self.id) + + def overwrites_for(self, obj): + """Returns the channel-specific overwrites for a member or a role. + + Parameters + ----------- + obj + The :class:`Role` or :class:`Member` or :class:`Object` denoting + whose overwrite to get. + + Returns + --------- + :class:`PermissionOverwrite` + The permission overwrites for this object. + """ + + if isinstance(obj, Member): + predicate = lambda p: p.type == 'member' + elif isinstance(obj, Role): + predicate = lambda p: p.type == 'role' + else: + predicate = lambda p: True + + for overwrite in filter(predicate, self._overwrites): + if overwrite.id == obj.id: + allow = Permissions(overwrite.allow) + deny = Permissions(overwrite.deny) + return PermissionOverwrite.from_pair(allow, deny) + + return PermissionOverwrite() + + @property + def overwrites(self): + """Returns all of the channel's overwrites. + + This is returned as a list of two-element tuples containing the target, + which can be either a :class:`Role` or a :class:`Member` and the overwrite + as the second element as a :class:`PermissionOverwrite`. + + Returns + -------- + List[Tuple[Union[:class:`Role`, :class:`Member`], :class:`PermissionOverwrite`]]: + The channel's permission overwrites. + """ + ret = [] + for ow in self._permission_overwrites: + allow = Permissions(ow.allow) + deny = Permissions(ow.deny) + overwrite = PermissionOverwrite.from_pair(allow, deny) + + if ow.type == 'role': + # accidentally quadratic + target = discord.utils.find(lambda r: r.id == ow.id, self.server.roles) + elif ow.type == 'member': + target = self.server.get_member(ow.id) + + ret.append((target, overwrite)) + return ret + + def permissions_for(self, member): + """Handles permission resolution for the current :class:`Member`. + + This function takes into consideration the following cases: + + - Guild owner + - Guild roles + - Channel overrides + - Member overrides + - Whether the channel is the default channel. + + Parameters + ---------- + member : :class:`Member` + The member to resolve permissions for. + + Returns + ------- + :class:`Permissions` + The resolved permissions for the member. + """ + + # The current cases can be explained as: + # Guild owner get all permissions -- no questions asked. Otherwise... + # The @everyone role gets the first application. + # After that, the applied roles that the user has in the channel + # (or otherwise) are then OR'd together. + # After the role permissions are resolved, the member permissions + # have to take into effect. + # After all that is done.. you have to do the following: + + # If manage permissions is True, then all permissions are set to + # True. If the channel is the default channel then everyone gets + # read permissions regardless. + + # The operation first takes into consideration the denied + # and then the allowed. + + if member.id == self.guild.owner.id: + return Permissions.all() + + default = self.guild.default_role + base = Permissions(default.permissions.value) + + # Apply guild roles that the member has. + for role in member.roles: + base.value |= role.permissions.value + + # Guild-wide Administrator -> True for everything + # Bypass all channel-specific overrides + if base.administrator: + return Permissions.all() + + member_role_ids = set(map(lambda r: r.id, member.roles)) + denies = 0 + allows = 0 + + # Apply channel specific role permission overwrites + for overwrite in self._overwrites: + if overwrite.type == 'role' and overwrite.id in member_role_ids: + denies |= overwrite.deny + allows |= overwrite.allow + + base.handle_overwrite(allow=allows, deny=denies) + + # Apply member specific permission overwrites + for overwrite in self._overwrites: + if overwrite.type == 'member' and overwrite.id == member.id: + base.handle_overwrite(allow=overwrite.allow, deny=overwrite.deny) + break + + # default channels can always be read + if self.is_default: + base.read_messages = True + + # if you can't send a message in a channel then you can't have certain + # permissions as well + if not base.send_messages: + base.send_tts_messages = False + base.mention_everyone = False + base.embed_links = False + base.attach_files = False + + # if you can't read a channel then you have no permissions there + if not base.read_messages: + denied = Permissions.all_channel() + base.value &= ~denied.value + + # text channels do not have voice related permissions + if isinstance(self, TextChannel): + denied = Permissions.voice() + base.value &= ~denied.value + + return base + + @asyncio.coroutine + def delete(self): + """|coro| + + Deletes the channel. + + You must have Manage Channel permission to use this. + + Raises + ------- + Forbidden + You do not have proper permissions to delete the channel. + NotFound + The channel was not found or was already deleted. + HTTPException + Deleting the channel failed. + """ + yield from self._state.http.delete_channel(self.id) + class MessageChannel(metaclass=abc.ABCMeta): __slots__ = () -- cgit v1.2.3