Source code for redbot.core.commands.commands

"""Module for command helpers and classes.

This module contains extended classes and functions which are intended to
be used instead of those from the `discord.ext.commands` module.
"""
from __future__ import annotations

import inspect
import re
import functools
import weakref
from typing import (
    Awaitable,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
    MutableMapping,
    TYPE_CHECKING,
)

import discord
from discord.ext.commands import (
    BadArgument,
    CommandError,
    CheckFailure,
    DisabledCommand,
    command as dpy_command_deco,
    Command as DPYCommand,
    Cog as DPYCog,
    CogMeta as DPYCogMeta,
    Group as DPYGroup,
    Greedy,
)

from .errors import ConversionFailure
from .requires import PermState, PrivilegeLevel, Requires, PermStateAllowedStates
from ..i18n import Translator

if TYPE_CHECKING:
    # circular import avoidance
    from .context import Context


__all__ = [
    "Cog",
    "CogMixin",
    "CogCommandMixin",
    "CogGroupMixin",
    "Command",
    "Group",
    "GroupMixin",
    "command",
    "group",
    "RESERVED_COMMAND_NAMES",
]

#: The following names are reserved for various reasons
RESERVED_COMMAND_NAMES = (
    "cancel",  # reserved due to use in ``redbot.core.utils.MessagePredicate``
)

_ = Translator("commands.commands", __file__)
DisablerDictType = MutableMapping[discord.Guild, Callable[["Context"], Awaitable[bool]]]


class CogCommandMixin:
    """A mixin for cogs and commands."""

    @property
    def help(self) -> str:
        """To be defined by subclasses"""
        ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if isinstance(self, Command):
            decorated = self.callback
        else:
            decorated = self
        self.requires: Requires = Requires(
            privilege_level=getattr(
                decorated, "__requires_privilege_level__", PrivilegeLevel.NONE
            ),
            user_perms=getattr(decorated, "__requires_user_perms__", {}),
            bot_perms=getattr(decorated, "__requires_bot_perms__", {}),
            checks=getattr(decorated, "__requires_checks__", []),
        )

    def format_text_for_context(self, ctx: "Context", text: str) -> str:
        """
        This formats text based on values in context

        The steps are (currently, roughly) the following:

            - substitute ``[p]`` with ``ctx.clean_prefix``
            - substitute ``[botname]`` with ``ctx.me.display_name``

        More steps may be added at a later time.

        Cog creators should only override this if they want
        help text to be modified, and may also want to
        look at `format_help_for_context` and (for commands only)
        ``format_shortdoc_for_context``

        Parameters
        ----------
        ctx: Context
        text: str

        Returns
        -------
        str
            text which has had some portions replaced based on context
        """
        formatting_pattern = re.compile(r"\[p\]|\[botname\]")

        def replacement(m: re.Match) -> str:
            s = m.group(0)
            if s == "[p]":
                return ctx.clean_prefix
            if s == "[botname]":
                return ctx.me.display_name
            # We shouldnt get here:
            return s

        return formatting_pattern.sub(replacement, text)

    def format_help_for_context(self, ctx: "Context") -> str:
        """
        This formats the help string based on values in context

        The steps are (currently, roughly) the following:

            - get the localized help
            - substitute ``[p]`` with ``ctx.clean_prefix``
            - substitute ``[botname]`` with ``ctx.me.display_name``

        More steps may be added at a later time.

        Cog creators may override this in their own command classes
        as long as the method signature stays the same.

        Parameters
        ----------
        ctx: Context

        Returns
        -------
        str
            Localized help with some formatting
        """

        help_str = self.help
        if not help_str:
            # Short circuit out on an empty help string
            return help_str

        return self.format_text_for_context(ctx, help_str)

    def allow_for(self, model_id: Union[int, str], guild_id: int) -> None:
        """Actively allow this command for the given model.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID to allow this cog or command in. For global
            rules, use ``0``.

        """
        self.requires.set_rule(model_id, PermState.ACTIVE_ALLOW, guild_id=guild_id)

    def deny_to(self, model_id: Union[int, str], guild_id: int) -> None:
        """Actively deny this command to the given model.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID to deny this cog or command in. For global
            rules, use ``0``.

        """
        cur_rule = self.requires.get_rule(model_id, guild_id=guild_id)
        if cur_rule is PermState.PASSIVE_ALLOW:
            self.requires.set_rule(model_id, PermState.CAUTIOUS_ALLOW, guild_id=guild_id)
        else:
            self.requires.set_rule(model_id, PermState.ACTIVE_DENY, guild_id=guild_id)

    def clear_rule_for(
        self, model_id: Union[int, str], guild_id: int
    ) -> Tuple[PermState, PermState]:
        """Clear the rule which is currently set for this model.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID. For global rules, use ``0``.

        """
        cur_rule = self.requires.get_rule(model_id, guild_id=guild_id)
        if cur_rule is PermState.ACTIVE_ALLOW:
            new_rule = PermState.NORMAL
        elif cur_rule is PermState.ACTIVE_DENY:
            new_rule = PermState.NORMAL
        elif cur_rule is PermState.CAUTIOUS_ALLOW:
            new_rule = PermState.PASSIVE_ALLOW
        else:
            return cur_rule, cur_rule
        self.requires.set_rule(model_id, new_rule, guild_id=guild_id)
        return cur_rule, new_rule

    def set_default_rule(self, rule: Optional[bool], guild_id: int) -> None:
        """Set the default rule for this cog or command.

        Parameters
        ----------
        rule : Optional[bool]
            The rule to set as default. If ``True`` for allow,
            ``False`` for deny and ``None`` for normal.
        guild_id : int
            The guild to set the default rule in. When ``0``, this will
            set the global default rule.

        """
        if rule is None:
            self.clear_rule_for(Requires.DEFAULT, guild_id=guild_id)
        elif rule is True:
            self.allow_for(Requires.DEFAULT, guild_id=guild_id)
        elif rule is False:
            self.deny_to(Requires.DEFAULT, guild_id=guild_id)


[docs]class Command(CogCommandMixin, DPYCommand): """Command class for Red. This should not be created directly, and instead via the decorator. This class inherits from `discord.ext.commands.Command`. The attributes listed below are simply additions to the ones listed with that class. Attributes ---------- checks : List[`coroutine function`] A list of check predicates which cannot be overridden, unlike `Requires.checks`. translator : Translator A translator for this command's help docstring. ignore_optional_for_conversion : bool A value which can be set to not have discord.py's argument parsing behavior for ``typing.Optional`` (type used will be of the inner type instead) """ def __call__(self, *args, **kwargs): if self.cog: # We need to inject cog as self here return self.callback(self.cog, *args, **kwargs) else: return self.callback(*args, **kwargs) def __init__(self, *args, **kwargs): self.ignore_optional_for_conversion = kwargs.pop("ignore_optional_for_conversion", False) super().__init__(*args, **kwargs) self._help_override = kwargs.pop("help_override", None) self.translator = kwargs.pop("i18n", None) if self.parent is None: for name in (self.name, *self.aliases): if name in RESERVED_COMMAND_NAMES: raise RuntimeError( f"The name `{name}` cannot be set as a command name. It is reserved for internal use." ) if len(self.qualified_name) > 60: raise RuntimeError( f"This command ({self.qualified_name}) has an excessively long qualified name, " "and will not be added to the bot to prevent breaking tools and menus. (limit 60)" ) def _ensure_assignment_on_copy(self, other): super()._ensure_assignment_on_copy(other) # Red specific other.requires = self.requires other.ignore_optional_for_conversion = self.ignore_optional_for_conversion return other @property def callback(self): return self._callback @callback.setter def callback(self, function): """ Below should be mostly the same as discord.py Currently, we modify behavior for - functools.partial support - typing.Optional behavior change as an option """ self._callback = function if isinstance(function, functools.partial): self.module = function.func.__module__ globals_ = function.func.__globals__ else: self.module = function.__module__ globals_ = function.__globals__ signature = inspect.signature(function) self.params = signature.parameters.copy() # PEP-563 allows postponing evaluation of annotations with a __future__ # import. When postponed, Parameter.annotation will be a string and must # be replaced with the real value for the converters to work later on for key, value in self.params.items(): if isinstance(value.annotation, str): self.params[key] = value = value.replace( annotation=eval(value.annotation, globals_) ) # fail early for when someone passes an unparameterized Greedy type if value.annotation is Greedy: raise TypeError("Unparameterized Greedy[...] is disallowed in signature.") if not self.ignore_optional_for_conversion: continue # reduces indentation compared to alternative try: vtype = value.annotation.__origin__ if vtype is Union: _NoneType = type if TYPE_CHECKING else type(None) args = value.annotation.__args__ if _NoneType in args: args = tuple(a for a in args if a is not _NoneType) if len(args) == 1: # can't have a union of 1 or 0 items # 1 prevents this from becoming 0 # we need to prevent 2 become 1 # (Don't change that to becoming, it's intentional :musical_note:) self.params[key] = value = value.replace(annotation=args[0]) else: # and mypy wretches at the correct Union[args] temp_type = type if TYPE_CHECKING else Union[args] self.params[key] = value = value.replace(annotation=temp_type) except AttributeError: continue @property def help(self): """Help string for this command. If the :code:`help` kwarg was passed into the decorator, it will default to that. If not, it will attempt to translate the docstring of the command's callback function. """ if self._help_override is not None: return self._help_override if self.translator is None: translator = getattr(self.cog, "__translator__", lambda s: s) else: translator = self.translator command_doc = self.callback.__doc__ if command_doc is None: return "" return inspect.cleandoc(translator(command_doc)) @help.setter def help(self, value): # We don't want our help property to be overwritten, namely by super() pass @property def parents(self) -> List["Group"]: """List[commands.Group] : Returns all parent commands of this command. This is sorted by the length of :attr:`.qualified_name` from highest to lowest. If the command has no parents, this will be an empty list. """ cmd = self.parent entries = [] while cmd is not None: entries.append(cmd) cmd = cmd.parent return sorted(entries, key=lambda x: len(x.qualified_name), reverse=True) # noinspection PyMethodOverriding
[docs] async def can_run( self, ctx: "Context", *, check_all_parents: bool = False, change_permission_state: bool = False, ) -> bool: """Check if this command can be run in the given context. This function first checks if the command can be run using discord.py's method `discord.ext.commands.Command.can_run`, then will return the result of `Requires.verify`. Keyword Arguments ----------------- check_all_parents : bool If ``True``, this will check permissions for all of this command's parents and its cog as well as the command itself. Defaults to ``False``. change_permission_state : bool Whether or not the permission state should be changed as a result of this call. For most cases this should be ``False``. Defaults to ``False``. """ ret = await super().can_run(ctx) if ret is False: return False # This is so contexts invoking other commands can be checked with # this command as well original_command = ctx.command original_state = ctx.permission_state ctx.command = self if check_all_parents is True: # Since we're starting from the beginning, we should reset the state to normal ctx.permission_state = PermState.NORMAL for parent in reversed(self.parents): try: result = await parent.can_run(ctx, change_permission_state=True) except CommandError: result = False if result is False: return False if self.parent is None and self.cog is not None: # For top-level commands, we need to check the cog's requires too ret = await self.cog.requires.verify(ctx) if ret is False: return False try: return await self.requires.verify(ctx) finally: ctx.command = original_command if not change_permission_state: ctx.permission_state = original_state
async def prepare(self, ctx): ctx.command = self if not self.enabled: raise DisabledCommand(f"{self.name} command is disabled") if not await self.can_run(ctx, change_permission_state=True): raise CheckFailure(f"The check functions for command {self.qualified_name} failed.") if self.cooldown_after_parsing: await self._parse_arguments(ctx) self._prepare_cooldowns(ctx) else: self._prepare_cooldowns(ctx) await self._parse_arguments(ctx) if self._max_concurrency is not None: await self._max_concurrency.acquire(ctx) await self.call_before_hooks(ctx)
[docs] async def do_conversion( self, ctx: "Context", converter, argument: str, param: inspect.Parameter ): """Convert an argument according to its type annotation. Raises ------ ConversionFailure If doing the conversion failed. Returns ------- Any The converted argument. """ # Let's not worry about all of this junk if it's just a str converter if converter is str: return argument try: return await super().do_conversion(ctx, converter, argument, param) except BadArgument as exc: raise ConversionFailure(converter, argument, param, *exc.args) from exc except ValueError as exc: # Some common converters need special treatment... if converter in (int, float): message = _('"{argument}" is not a number.').format(argument=argument) raise ConversionFailure(converter, argument, param, message) from exc # We should expose anything which might be a bug in the converter raise exc
[docs] async def can_see(self, ctx: "Context"): """Check if this command is visible in the given context. In short, this will verify whether the user can run the command, and also whether the command is hidden or not. Parameters ---------- ctx : `Context` The invocation context to check with. Returns ------- bool ``True`` if this command is visible in the given context. """ for cmd in (self, *self.parents): if cmd.hidden: return False try: can_run = await self.can_run( ctx, check_all_parents=True, change_permission_state=False ) except (CheckFailure, DisabledCommand): return False else: if can_run is False: return False return True
[docs] def disable_in(self, guild: discord.Guild) -> bool: """Disable this command in the given guild. Parameters ---------- guild : discord.Guild The guild to disable the command in. Returns ------- bool ``True`` if the command wasn't already disabled. """ disabler = get_command_disabler(guild) if disabler in self.checks: return False else: self.checks.append(disabler) return True
[docs] def enable_in(self, guild: discord.Guild) -> bool: """Enable this command in the given guild. Parameters ---------- guild : discord.Guild The guild to enable the command in. Returns ------- bool ``True`` if the command wasn't already enabled. """ disabler = get_command_disabler(guild) try: self.checks.remove(disabler) except ValueError: return False else: return True
[docs] def allow_for(self, model_id: Union[int, str], guild_id: int) -> None: super().allow_for(model_id, guild_id=guild_id) parents = self.parents if self.cog is not None: parents.append(self.cog) for parent in parents: cur_rule = parent.requires.get_rule(model_id, guild_id=guild_id) if cur_rule is PermState.NORMAL: parent.requires.set_rule(model_id, PermState.PASSIVE_ALLOW, guild_id=guild_id) elif cur_rule is PermState.ACTIVE_DENY: parent.requires.set_rule(model_id, PermState.CAUTIOUS_ALLOW, guild_id=guild_id)
[docs] def clear_rule_for( self, model_id: Union[int, str], guild_id: int ) -> Tuple[PermState, PermState]: old_rule, new_rule = super().clear_rule_for(model_id, guild_id=guild_id) if old_rule is PermState.ACTIVE_ALLOW: parents = self.parents if self.cog is not None: parents.append(self.cog) for parent in parents: should_continue = parent.reevaluate_rules_for(model_id, guild_id=guild_id)[1] if not should_continue: break return old_rule, new_rule
[docs] def error(self, coro): """ A decorator that registers a coroutine as a local error handler. A local error handler is an :func:`.on_command_error` event limited to a single command. The on_command_error event is still dispatched for commands with a dedicated error handler. Red's global error handler will ignore commands with a registered error handler. To have red handle specific errors with the default behavior, call ``Red.on_command_error`` with ``unhandled_by_cog`` set to True. Due to how discord.py wraps exceptions, the exception you are expecting here is likely in ``error.original`` despite that the normal event handler for bot wide command error handling has no such wrapping. For example: .. code-block:: python @a_command.error async def a_command_error_handler(self, ctx, error): if isinstance(error.original, MyErrrorType): self.log_exception(error.original) else: await ctx.bot.on_command_error(ctx, error.original, unhandled_by_cog=True) Parameters ----------- coro : :term:`coroutine function` The coroutine to register as the local error handler. Raises ------- discord.ClientException The coroutine is not actually a coroutine. """ return super().error(coro)
[docs] def format_shortdoc_for_context(self, ctx: "Context") -> str: """ This formats the short version of the help string based on values in context See ``format_text_for_context`` for the actual implementation details Cog creators may override this in their own command classes as long as the method signature stays the same. Parameters ---------- ctx: Context Returns ------- str Localized help with some formatting """ sh = self.short_doc return self.format_text_for_context(ctx, sh) if sh else sh
class GroupMixin(discord.ext.commands.GroupMixin): """Mixin for `Group` and `Red` classes. This class inherits from :class:`discord.ext.commands.GroupMixin`. """ def command(self, *args, **kwargs): """A shortcut decorator that invokes :func:`.command` and adds it to the internal command list via :meth:`~.GroupMixin.add_command`. """ def decorator(func): kwargs.setdefault("parent", self) result = command(*args, **kwargs)(func) self.add_command(result) return result return decorator def group(self, *args, **kwargs): """A shortcut decorator that invokes :func:`.group` and adds it to the internal command list via :meth:`~.GroupMixin.add_command`. """ def decorator(func): kwargs.setdefault("parent", self) result = group(*args, **kwargs)(func) self.add_command(result) return result return decorator class CogGroupMixin: requires: Requires def reevaluate_rules_for( self, model_id: Union[str, int], guild_id: int = 0 ) -> Tuple[PermState, bool]: """Re-evaluate a rule by checking subcommand rules. This is called when a subcommand is no longer actively allowed. Parameters ---------- model_id : Union[int, str] Must be an `int` if supplying an ID. `str` is only valid for "default". guild_id : int The guild ID. For global rules, use ``0``. Returns ------- Tuple[PermState, bool] A 2-tuple containing the new rule and a bool indicating whether or not the rule was changed as a result of this call. """ cur_rule = self.requires.get_rule(model_id, guild_id=guild_id) if cur_rule not in (PermState.NORMAL, PermState.ACTIVE_ALLOW, PermState.ACTIVE_DENY): # The above three states are unaffected by subcommand rules # Remaining states can be changed if there exists no actively-allowed # subcommand (this includes subcommands multiple levels below) all_commands: Dict[str, Command] = getattr(self, "all_commands", {}) if any( cmd.requires.get_rule(model_id, guild_id=guild_id) in PermStateAllowedStates for cmd in all_commands.values() ): return cur_rule, False elif cur_rule is PermState.PASSIVE_ALLOW: self.requires.set_rule(model_id, PermState.NORMAL, guild_id=guild_id) return PermState.NORMAL, True elif cur_rule is PermState.CAUTIOUS_ALLOW: self.requires.set_rule(model_id, PermState.ACTIVE_DENY, guild_id=guild_id) return PermState.ACTIVE_DENY, True # Default return value return cur_rule, False
[docs]class Group(GroupMixin, Command, CogGroupMixin, DPYGroup): """Group command class for Red. This class inherits from `Command`, with :class:`GroupMixin` and `discord.ext.commands.Group` mixed in. """ def __init__(self, *args, **kwargs): self.autohelp = kwargs.pop("autohelp", True) super().__init__(*args, **kwargs) async def invoke(self, ctx: "Context"): # we skip prepare in some cases to avoid some things # We still always want this part of the behavior though ctx.command = self # Our re-ordered behavior below. view = ctx.view previous = view.index view.skip_ws() trigger = view.get_word() if trigger: ctx.subcommand_passed = trigger ctx.invoked_subcommand = self.all_commands.get(trigger, None) view.index = previous view.previous = previous if ctx.invoked_subcommand is None or self == ctx.invoked_subcommand: if self.autohelp and not self.invoke_without_command: if not await self.can_run(ctx, change_permission_state=True): raise CheckFailure() await ctx.send_help() elif self.invoke_without_command: # So invoke_without_command when a subcommand of this group is invoked # will skip the the invokation of *this* command. However, because of # how our permissions system works, we don't want it to skip the checks # as well. if not await self.can_run(ctx, change_permission_state=True): raise CheckFailure() # this is actually why we don't prepare earlier. await super().invoke(ctx)
class CogMixin(CogGroupMixin, CogCommandMixin): """Mixin class for a cog, intended for use with discord.py's cog class""" @property def help(self): doc = self.__doc__ translator = getattr(self, "__translator__", lambda s: s) if doc: return inspect.cleandoc(translator(doc)) async def can_run(self, ctx: "Context", **kwargs) -> bool: """ This really just exists to allow easy use with other methods using can_run on commands and groups such as help formatters. kwargs used in that won't apply here as they don't make sense to, but will be swallowed silently for a compatible signature for ease of use. Parameters ---------- ctx : `Context` The invocation context to check with. Returns ------- bool ``True`` if this cog is usable in the given context. """ try: can_run = await self.requires.verify(ctx) except CommandError: return False return can_run async def can_see(self, ctx: "Context") -> bool: """Check if this cog is visible in the given context. In short, this will verify whether the user is allowed to access the cog by permissions. This has an identical signature to the one used by commands, and groups, but needs a different underlying mechanism. Parameters ---------- ctx : `Context` The invocation context to check with. Returns ------- bool ``True`` if this cog is visible in the given context. """ return await self.can_run(ctx) class Cog(CogMixin, DPYCog, metaclass=DPYCogMeta): """ Red's Cog base class This includes a metaclass from discord.py """ __cog_commands__: Tuple[Command] @property def all_commands(self) -> Dict[str, Command]: """ This does not have identical behavior to Group.all_commands but should return what you expect """ return {cmd.name: cmd for cmd in self.__cog_commands__}
[docs]def command(name=None, cls=Command, **attrs): """A decorator which transforms an async function into a `Command`. Same interface as `discord.ext.commands.command`. """ attrs["help_override"] = attrs.pop("help", None) return dpy_command_deco(name, cls, **attrs)
[docs]def group(name=None, cls=Group, **attrs): """A decorator which transforms an async function into a `Group`. Same interface as `discord.ext.commands.group`. """ return dpy_command_deco(name, cls, **attrs)
__command_disablers: DisablerDictType = weakref.WeakValueDictionary() def get_command_disabler(guild: discord.Guild) -> Callable[["Context"], Awaitable[bool]]: """Get the command disabler for a guild. A command disabler is a simple check predicate which returns ``False`` if the context is within the given guild. """ try: return __command_disablers[guild.id] except KeyError: async def disabler(ctx: "Context") -> bool: if ctx.guild is not None and ctx.guild.id == guild.id: raise DisabledCommand() return True __command_disablers[guild.id] = disabler return disabler # This is intentionally left out of `__all__` as it is not intended for general use class _AlwaysAvailableCommand(Command): """ This should be used only for informational commands which should not be disabled or removed These commands cannot belong to a cog. These commands do not respect most forms of checks, and should only be used with that in mind. This particular class is not supported for 3rd party use """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.cog is not None: raise TypeError("This command may not be added to a cog") async def can_run(self, ctx, *args, **kwargs) -> bool: return not ctx.author.bot