"""Module for command helpers and classes.
This module contains extended classes and functions which are intended to
be used instead of those from the `discord.ext.commands` module.
"""
from __future__ import annotations
import inspect
import re
import functools
import weakref
from typing import (
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
Union,
MutableMapping,
TYPE_CHECKING,
)
import discord
from discord.ext.commands import (
BadArgument,
CommandError,
CheckFailure,
DisabledCommand,
command as dpy_command_deco,
Command as DPYCommand,
Cog as DPYCog,
CogMeta as DPYCogMeta,
Group as DPYGroup,
Greedy,
)
from .errors import ConversionFailure
from .requires import PermState, PrivilegeLevel, Requires, PermStateAllowedStates
from ..i18n import Translator
if TYPE_CHECKING:
# circular import avoidance
from .context import Context
# Names exported as this module's public API.
__all__ = [
    "Cog",
    "CogMixin",
    "CogCommandMixin",
    "CogGroupMixin",
    "Command",
    "Group",
    "GroupMixin",
    "command",
    "group",
    "RESERVED_COMMAND_NAMES",
]

#: The following names are reserved for various reasons
RESERVED_COMMAND_NAMES = (
    "cancel",  # reserved due to use in ``redbot.core.utils.MessagePredicate``
)

_ = Translator("commands.commands", __file__)

# Type of the per-guild disabler cache. The keys are integer guild IDs
# (``guild.id``), which is how ``get_command_disabler`` indexes the mapping;
# the values are the async check predicates registered for those guilds.
DisablerDictType = MutableMapping[int, Callable[["Context"], Awaitable[bool]]]
class CogCommandMixin:
    """Behaviour shared by cogs and commands.

    Builds the ``requires`` object during initialisation and offers helpers
    for formatting help text and for editing permission rules.
    """

    @property
    def help(self) -> str:
        """Help text; subclasses are expected to provide it."""
        ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # For commands, the ``__requires_*__`` attributes are looked up on the
        # callback; for anything else they are looked up on the object itself.
        source = self.callback if isinstance(self, Command) else self
        privilege_level = getattr(source, "__requires_privilege_level__", PrivilegeLevel.NONE)
        user_perms = getattr(source, "__requires_user_perms__", {})
        bot_perms = getattr(source, "__requires_bot_perms__", {})
        checks = getattr(source, "__requires_checks__", [])
        self.requires: Requires = Requires(
            privilege_level=privilege_level,
            user_perms=user_perms,
            bot_perms=bot_perms,
            checks=checks,
        )

    def format_text_for_context(self, ctx: "Context", text: str) -> str:
        """
        Substitute context-dependent placeholders in ``text``.

        Currently the following replacements are made:

            - ``[p]`` becomes ``ctx.clean_prefix``
            - ``[botname]`` becomes ``ctx.me.display_name``

        More steps may be added at a later time.

        Cog creators should only override this if they want
        help text to be modified, and may also want to
        look at `format_help_for_context` and (for commands only)
        ``format_shortdoc_for_context``

        Parameters
        ----------
        ctx: Context
        text: str

        Returns
        -------
        str
            text which has had some portions replaced based on context
        """

        def substitute(match: re.Match) -> str:
            token = match.group(0)
            if token == "[p]":
                return ctx.clean_prefix
            if token == "[botname]":
                return ctx.me.display_name
            # The pattern only matches the two tokens above, so this is a fallback.
            return token

        return re.sub(r"\[p\]|\[botname\]", substitute, text)

    def format_help_for_context(self, ctx: "Context") -> str:
        """
        Return this object's help string formatted for ``ctx``.

        The value of ``self.help`` is passed through
        `format_text_for_context`; an empty (falsy) help value is returned
        unchanged without any formatting.

        Cog creators may override this in their own command classes
        as long as the method signature stays the same.

        Parameters
        ----------
        ctx: Context

        Returns
        -------
        str
            Localized help with some formatting
        """
        raw_help = self.help
        # Nothing to substitute in an empty help string.
        return self.format_text_for_context(ctx, raw_help) if raw_help else raw_help

    def allow_for(self, model_id: Union[int, str], guild_id: int) -> None:
        """Actively allow this command for the given model.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID to allow this cog or command in. For global
            rules, use ``0``.
        """
        self.requires.set_rule(model_id, PermState.ACTIVE_ALLOW, guild_id=guild_id)

    def deny_to(self, model_id: Union[int, str], guild_id: int) -> None:
        """Actively deny this command to the given model.

        A rule that is currently ``PASSIVE_ALLOW`` becomes ``CAUTIOUS_ALLOW``;
        any other rule becomes ``ACTIVE_DENY``.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID to deny this cog or command in. For global
            rules, use ``0``.
        """
        current = self.requires.get_rule(model_id, guild_id=guild_id)
        new_state = (
            PermState.CAUTIOUS_ALLOW
            if current is PermState.PASSIVE_ALLOW
            else PermState.ACTIVE_DENY
        )
        self.requires.set_rule(model_id, new_state, guild_id=guild_id)

    def clear_rule_for(
        self, model_id: Union[int, str], guild_id: int
    ) -> Tuple[PermState, PermState]:
        """Clear the rule which is currently set for this model.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID. For global rules, use ``0``.

        Returns
        -------
        Tuple[PermState, PermState]
            The rule before the call and the rule after it. Both are the
            same when the existing rule is not one that gets cleared.
        """
        previous = self.requires.get_rule(model_id, guild_id=guild_id)
        if previous is PermState.ACTIVE_ALLOW or previous is PermState.ACTIVE_DENY:
            replacement = PermState.NORMAL
        elif previous is PermState.CAUTIOUS_ALLOW:
            replacement = PermState.PASSIVE_ALLOW
        else:
            # Nothing to clear for the remaining states.
            return previous, previous
        self.requires.set_rule(model_id, replacement, guild_id=guild_id)
        return previous, replacement

    def set_default_rule(self, rule: Optional[bool], guild_id: int) -> None:
        """Set the default rule for this cog or command.

        Parameters
        ----------
        rule : Optional[bool]
            The rule to set as default. If ``True`` for allow,
            ``False`` for deny and ``None`` for normal.
        guild_id : int
            The guild to set the default rule in. When ``0``, this will
            set the global default rule.
        """
        if rule is None:
            self.clear_rule_for(Requires.DEFAULT, guild_id=guild_id)
            return
        if rule is True:
            self.allow_for(Requires.DEFAULT, guild_id=guild_id)
            return
        if rule is False:
            self.deny_to(Requires.DEFAULT, guild_id=guild_id)
class Command(CogCommandMixin, DPYCommand):
    """Command class for Red.

    This should not be created directly, and instead via the decorator.

    This class inherits from `discord.ext.commands.Command`. The
    attributes listed below are simply additions to the ones listed
    with that class.

    Attributes
    ----------
    checks : List[`coroutine function`]
        A list of check predicates which cannot be overridden, unlike
        `Requires.checks`.
    translator : Translator
        A translator for this command's help docstring.
    ignore_optional_for_conversion : bool
        A value which can be set to not have discord.py's
        argument parsing behavior for ``typing.Optional``
        (type used will be of the inner type instead)
    """

    def __call__(self, *args, **kwargs):
        """Call the callback directly, passing the cog as ``self`` when the command has one."""
        if self.cog:
            # We need to inject cog as self here
            return self.callback(self.cog, *args, **kwargs)
        else:
            return self.callback(*args, **kwargs)

    def __init__(self, *args, **kwargs):
        """Initialise the command and validate its name.

        The ``ignore_optional_for_conversion``, ``help_override`` and ``i18n``
        keyword arguments are consumed here.

        Raises
        ------
        RuntimeError
            If a top-level command uses a name or alias listed in
            `RESERVED_COMMAND_NAMES`, or if the qualified name is longer
            than 60 characters.
        """
        # Must be set before super().__init__, since the callback setter reads it.
        self.ignore_optional_for_conversion = kwargs.pop("ignore_optional_for_conversion", False)
        super().__init__(*args, **kwargs)
        self._help_override = kwargs.pop("help_override", None)
        self.translator = kwargs.pop("i18n", None)
        if self.parent is None:
            for name in (self.name, *self.aliases):
                if name in RESERVED_COMMAND_NAMES:
                    raise RuntimeError(
                        f"The name `{name}` cannot be set as a command name. It is reserved for internal use."
                    )
        if len(self.qualified_name) > 60:
            raise RuntimeError(
                f"This command ({self.qualified_name}) has an excessively long qualified name, "
                "and will not be added to the bot to prevent breaking tools and menus. (limit 60)"
            )

    def _ensure_assignment_on_copy(self, other):
        """Carry Red-specific attributes over to ``other`` when this command is copied."""
        super()._ensure_assignment_on_copy(other)

        # Red specific
        other.requires = self.requires
        other.ignore_optional_for_conversion = self.ignore_optional_for_conversion
        return other

    @property
    def callback(self):
        """The function this command wraps."""
        return self._callback

    @callback.setter
    def callback(self, function):
        """
        Below should be mostly the same as discord.py

        Currently, we modify behavior for

            - functools.partial support
            - typing.Optional behavior change as an option
        """
        self._callback = function
        if isinstance(function, functools.partial):
            # A partial has no __module__/__globals__ of its own; use the wrapped function's.
            self.module = function.func.__module__
            globals_ = function.func.__globals__
        else:
            self.module = function.__module__
            globals_ = function.__globals__

        signature = inspect.signature(function)
        self.params = signature.parameters.copy()

        # PEP-563 allows postponing evaluation of annotations with a __future__
        # import. When postponed, Parameter.annotation will be a string and must
        # be replaced with the real value for the converters to work later on
        for key, value in self.params.items():
            if isinstance(value.annotation, str):
                # NOTE: eval() is applied to annotation strings taken from the
                # callback's own signature, using the callback's module globals.
                self.params[key] = value = value.replace(
                    annotation=eval(value.annotation, globals_)
                )

            # fail early for when someone passes an unparameterized Greedy type
            if value.annotation is Greedy:
                raise TypeError("Unparameterized Greedy[...] is disallowed in signature.")

            if not self.ignore_optional_for_conversion:
                continue  # reduces indentation compared to alternative

            try:
                vtype = value.annotation.__origin__
                if vtype is Union:
                    _NoneType = type if TYPE_CHECKING else type(None)
                    args = value.annotation.__args__
                    if _NoneType in args:
                        # Strip NoneType so the annotation becomes the inner type(s) only.
                        args = tuple(a for a in args if a is not _NoneType)
                        if len(args) == 1:
                            # can't have a union of 1 or 0 items
                            # 1 prevents this from becoming 0
                            # we need to prevent 2 become 1
                            # (Don't change that to becoming, it's intentional :musical_note:)
                            self.params[key] = value = value.replace(annotation=args[0])
                        else:
                            # and mypy wretches at the correct Union[args]
                            temp_type = type if TYPE_CHECKING else Union[args]
                            self.params[key] = value = value.replace(annotation=temp_type)
            except AttributeError:
                # The annotation has no __origin__, so it is not a typing generic.
                continue

    @property
    def help(self):
        """Help string for this command.

        If the :code:`help` kwarg was passed into the decorator, it will
        default to that. If not, it will attempt to translate the docstring
        of the command's callback function.
        """
        if self._help_override is not None:
            return self._help_override
        if self.translator is None:
            # Fall back to the cog's translator, or to an identity function.
            translator = getattr(self.cog, "__translator__", lambda s: s)
        else:
            translator = self.translator
        command_doc = self.callback.__doc__
        if command_doc is None:
            return ""
        return inspect.cleandoc(translator(command_doc))

    @help.setter
    def help(self, value):
        # We don't want our help property to be overwritten, namely by super()
        pass

    @property
    def parents(self) -> List["Group"]:
        """List[commands.Group] : Returns all parent commands of this command.

        This is sorted by the length of :attr:`.qualified_name` from highest to lowest.
        If the command has no parents, this will be an empty list.
        """
        cmd = self.parent
        entries = []
        while cmd is not None:
            entries.append(cmd)
            cmd = cmd.parent
        return sorted(entries, key=lambda x: len(x.qualified_name), reverse=True)

    # noinspection PyMethodOverriding
    async def can_run(
        self,
        ctx: "Context",
        *,
        check_all_parents: bool = False,
        change_permission_state: bool = False,
    ) -> bool:
        """Check if this command can be run in the given context.

        This function first checks if the command can be run using
        discord.py's method `discord.ext.commands.Command.can_run`,
        then will return the result of `Requires.verify`.

        While the checks run, ``ctx.command`` is temporarily set to this
        command. It is restored before returning on every path, and
        ``ctx.permission_state`` is restored as well unless
        ``change_permission_state`` is ``True``.

        Keyword Arguments
        -----------------
        check_all_parents : bool
            If ``True``, this will check permissions for all of this
            command's parents and its cog as well as the command
            itself. Defaults to ``False``.
        change_permission_state : bool
            Whether or not the permission state should be changed as
            a result of this call. For most cases this should be
            ``False``. Defaults to ``False``.

        Returns
        -------
        bool
            ``False`` if any of the checks failed, otherwise the result
            of `Requires.verify` for this command.
        """
        ret = await super().can_run(ctx)
        if ret is False:
            return False

        # This is so contexts invoking other commands can be checked with
        # this command as well
        original_command = ctx.command
        original_state = ctx.permission_state
        ctx.command = self

        # Everything that runs with the swapped-in context lives inside the
        # try block, so early returns and exceptions restore it as well.
        try:
            if check_all_parents is True:
                # Since we're starting from the beginning, we should reset the state to normal
                ctx.permission_state = PermState.NORMAL
                for parent in reversed(self.parents):
                    try:
                        result = await parent.can_run(ctx, change_permission_state=True)
                    except CommandError:
                        result = False

                    if result is False:
                        return False

            if self.parent is None and self.cog is not None:
                # For top-level commands, we need to check the cog's requires too
                ret = await self.cog.requires.verify(ctx)
                if ret is False:
                    return False

            return await self.requires.verify(ctx)
        finally:
            ctx.command = original_command
            if not change_permission_state:
                ctx.permission_state = original_state

    async def prepare(self, ctx):
        """Run the pre-invocation steps for this command.

        Sets ``ctx.command``, rejects disabled commands, runs the checks,
        parses arguments and prepares cooldowns (in the order selected by
        ``cooldown_after_parsing``), acquires the max-concurrency slot when
        one is configured, and finally calls the before-invoke hooks.

        Raises
        ------
        DisabledCommand
            If the command is not enabled.
        CheckFailure
            If `can_run` returned a falsy value.
        """
        ctx.command = self

        if not self.enabled:
            raise DisabledCommand(f"{self.name} command is disabled")

        if not await self.can_run(ctx, change_permission_state=True):
            raise CheckFailure(f"The check functions for command {self.qualified_name} failed.")

        if self.cooldown_after_parsing:
            await self._parse_arguments(ctx)
            self._prepare_cooldowns(ctx)
        else:
            self._prepare_cooldowns(ctx)
            await self._parse_arguments(ctx)
        if self._max_concurrency is not None:
            await self._max_concurrency.acquire(ctx)

        await self.call_before_hooks(ctx)

    async def do_conversion(
        self, ctx: "Context", converter, argument: str, param: inspect.Parameter
    ):
        """Convert an argument according to its type annotation.

        Raises
        ------
        ConversionFailure
            If doing the conversion failed.

        Returns
        -------
        Any
            The converted argument.
        """
        # Let's not worry about all of this junk if it's just a str converter
        if converter is str:
            return argument

        try:
            return await super().do_conversion(ctx, converter, argument, param)
        except BadArgument as exc:
            raise ConversionFailure(converter, argument, param, *exc.args) from exc
        except ValueError as exc:
            # Some common converters need special treatment...
            if converter in (int, float):
                message = _('"{argument}" is not a number.').format(argument=argument)
                raise ConversionFailure(converter, argument, param, message) from exc

            # We should expose anything which might be a bug in the converter
            raise exc

    async def can_see(self, ctx: "Context"):
        """Check if this command is visible in the given context.

        In short, this will verify whether the user can run the
        command, and also whether the command is hidden or not.

        Parameters
        ----------
        ctx : `Context`
            The invocation context to check with.

        Returns
        -------
        bool
            ``True`` if this command is visible in the given context.
        """
        # A command is invisible if it, or any of its parents, is hidden.
        for cmd in (self, *self.parents):
            if cmd.hidden:
                return False

        try:
            can_run = await self.can_run(
                ctx, check_all_parents=True, change_permission_state=False
            )
        except (CheckFailure, DisabledCommand):
            return False
        else:
            if can_run is False:
                return False

        return True

    def disable_in(self, guild: discord.Guild) -> bool:
        """Disable this command in the given guild.

        Parameters
        ----------
        guild : discord.Guild
            The guild to disable the command in.

        Returns
        -------
        bool
            ``True`` if the command wasn't already disabled.
        """
        disabler = get_command_disabler(guild)
        if disabler in self.checks:
            return False
        else:
            self.checks.append(disabler)
            return True

    def enable_in(self, guild: discord.Guild) -> bool:
        """Enable this command in the given guild.

        Parameters
        ----------
        guild : discord.Guild
            The guild to enable the command in.

        Returns
        -------
        bool
            ``True`` if the command wasn't already enabled.
        """
        disabler = get_command_disabler(guild)
        try:
            self.checks.remove(disabler)
        except ValueError:
            # The disabler was not among the checks, so nothing was disabled.
            return False
        else:
            return True

    def allow_for(self, model_id: Union[int, str], guild_id: int) -> None:
        """Actively allow this command for the given model.

        In addition to setting this command's own rule, the rule on each
        parent command and on the cog (when there is one) is adjusted:
        ``NORMAL`` becomes ``PASSIVE_ALLOW`` and ``ACTIVE_DENY`` becomes
        ``CAUTIOUS_ALLOW``.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID to allow this command in. For global
            rules, use ``0``.
        """
        super().allow_for(model_id, guild_id=guild_id)
        # ``self.parents`` builds a fresh list on each access, so appending is safe.
        parents = self.parents
        if self.cog is not None:
            parents.append(self.cog)
        for parent in parents:
            cur_rule = parent.requires.get_rule(model_id, guild_id=guild_id)
            if cur_rule is PermState.NORMAL:
                parent.requires.set_rule(model_id, PermState.PASSIVE_ALLOW, guild_id=guild_id)
            elif cur_rule is PermState.ACTIVE_DENY:
                parent.requires.set_rule(model_id, PermState.CAUTIOUS_ALLOW, guild_id=guild_id)

    def clear_rule_for(
        self, model_id: Union[int, str], guild_id: int
    ) -> Tuple[PermState, PermState]:
        """Clear the rule which is currently set for this model.

        When the cleared rule was ``ACTIVE_ALLOW``, the parents (and the cog,
        when there is one) have ``reevaluate_rules_for`` called on them in
        turn, stopping at the first one whose rule did not change.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID. For global rules, use ``0``.

        Returns
        -------
        Tuple[PermState, PermState]
            The rule before the call and the rule after it.
        """
        old_rule, new_rule = super().clear_rule_for(model_id, guild_id=guild_id)
        if old_rule is PermState.ACTIVE_ALLOW:
            parents = self.parents
            if self.cog is not None:
                parents.append(self.cog)
            for parent in parents:
                should_continue = parent.reevaluate_rules_for(model_id, guild_id=guild_id)[1]
                if not should_continue:
                    break
        return old_rule, new_rule

    def error(self, coro):
        """
        A decorator that registers a coroutine as a local error handler.

        A local error handler is an :func:`.on_command_error` event limited to
        a single command.

        The on_command_error event is still dispatched
        for commands with a dedicated error handler.

        Red's global error handler will ignore commands with a registered error handler.

        To have red handle specific errors with the default behavior,
        call ``Red.on_command_error`` with ``unhandled_by_cog`` set to True.

        Due to how discord.py wraps exceptions, the exception you are expecting here
        is likely in ``error.original`` despite that the normal event handler for bot
        wide command error handling has no such wrapping.

        For example:

            .. code-block:: python

                @a_command.error
                async def a_command_error_handler(self, ctx, error):

                    if isinstance(error.original, MyErrorType):
                        self.log_exception(error.original)
                    else:
                        await ctx.bot.on_command_error(ctx, error.original, unhandled_by_cog=True)

        Parameters
        -----------
        coro : :term:`coroutine function`
            The coroutine to register as the local error handler.

        Raises
        -------
        discord.ClientException
            The coroutine is not actually a coroutine.
        """
        return super().error(coro)

    def format_shortdoc_for_context(self, ctx: "Context") -> str:
        """
        This formats the short version of the help
        string based on values in context

        See ``format_text_for_context`` for the actual implementation details

        Cog creators may override this in their own command classes
        as long as the method signature stays the same.

        Parameters
        ----------
        ctx: Context

        Returns
        -------
        str
            Localized help with some formatting
        """
        sh = self.short_doc
        # An empty short doc is returned unchanged.
        return self.format_text_for_context(ctx, sh) if sh else sh
class GroupMixin(discord.ext.commands.GroupMixin):
    """Mixin for `Group` and `Red` classes.

    This class inherits from :class:`discord.ext.commands.GroupMixin`.
    """

    def command(self, *args, **kwargs):
        """A shortcut decorator that invokes :func:`.command` and adds it to
        the internal command list via :meth:`~.GroupMixin.add_command`.
        """

        def register(func):
            # The parent defaults to this object unless the caller supplied one.
            kwargs.setdefault("parent", self)
            new_command = command(*args, **kwargs)(func)
            self.add_command(new_command)
            return new_command

        return register

    def group(self, *args, **kwargs):
        """A shortcut decorator that invokes :func:`.group` and adds it to
        the internal command list via :meth:`~.GroupMixin.add_command`.
        """

        def register(func):
            # The parent defaults to this object unless the caller supplied one.
            kwargs.setdefault("parent", self)
            new_group = group(*args, **kwargs)(func)
            self.add_command(new_group)
            return new_group

        return register
class CogGroupMixin:
    """Mixin for objects that contain commands and carry a ``requires`` attribute."""

    requires: Requires

    def reevaluate_rules_for(
        self, model_id: Union[str, int], guild_id: int = 0
    ) -> Tuple[PermState, bool]:
        """Re-evaluate a rule by checking subcommand rules.

        This is called when a subcommand is no longer actively allowed.

        Parameters
        ----------
        model_id : Union[int, str]
            Must be an `int` if supplying an ID. `str` is only valid
            for "default".
        guild_id : int
            The guild ID. For global rules, use ``0``.

        Returns
        -------
        Tuple[PermState, bool]
            A 2-tuple containing the new rule and a bool indicating
            whether or not the rule was changed as a result of this
            call.
        """
        current = self.requires.get_rule(model_id, guild_id=guild_id)

        # These three states are unaffected by subcommand rules.
        if current in (PermState.NORMAL, PermState.ACTIVE_ALLOW, PermState.ACTIVE_DENY):
            return current, False

        # Remaining states can be changed if there exists no actively-allowed
        # subcommand (this includes subcommands multiple levels below)
        subcommands: Dict[str, Command] = getattr(self, "all_commands", {})
        for cmd in subcommands.values():
            if cmd.requires.get_rule(model_id, guild_id=guild_id) in PermStateAllowedStates:
                return current, False

        if current is PermState.PASSIVE_ALLOW:
            downgraded = PermState.NORMAL
        elif current is PermState.CAUTIOUS_ALLOW:
            downgraded = PermState.ACTIVE_DENY
        else:
            # Any other state is left untouched.
            return current, False

        self.requires.set_rule(model_id, downgraded, guild_id=guild_id)
        return downgraded, True
class Group(GroupMixin, Command, CogGroupMixin, DPYGroup):
    """Group command class for Red.

    This class inherits from `Command`, with :class:`GroupMixin` and
    `discord.ext.commands.Group` mixed in.
    """

    def __init__(self, *args, **kwargs):
        # ``autohelp`` is consumed here and not forwarded to the parent classes.
        self.autohelp = kwargs.pop("autohelp", True)
        super().__init__(*args, **kwargs)

    async def invoke(self, ctx: "Context"):
        """Invoke this group, running the permission checks that would otherwise be skipped."""
        # we skip prepare in some cases to avoid some things
        # We still always want this part of the behavior though
        ctx.command = self

        # Peek at the next word to find out which subcommand (if any) was passed.
        view = ctx.view
        start_index = view.index
        view.skip_ws()
        word = view.get_word()
        if word:
            ctx.subcommand_passed = word
            ctx.invoked_subcommand = self.all_commands.get(word, None)
        # Rewind the view so the parent implementation parses from the same place.
        view.index = start_index
        view.previous = start_index

        no_subcommand = ctx.invoked_subcommand is None or self == ctx.invoked_subcommand
        if no_subcommand:
            needs_check = self.autohelp and not self.invoke_without_command
        else:
            # So invoke_without_command when a subcommand of this group is invoked
            # will skip the invocation of *this* command. However, because of
            # how our permissions system works, we don't want it to skip the checks
            # as well.
            needs_check = self.invoke_without_command

        if needs_check:
            if not await self.can_run(ctx, change_permission_state=True):
                raise CheckFailure()
            if no_subcommand:
                await ctx.send_help()

        # this is actually why we don't prepare earlier.
        await super().invoke(ctx)
class CogMixin(CogGroupMixin, CogCommandMixin):
    """Mixin class for a cog, intended for use with discord.py's cog class"""

    @property
    def help(self):
        """The class docstring, translated and cleaned; ``None`` when there is no docstring."""
        docstring = self.__doc__
        # Use the cog's translator when it has one, otherwise leave the text as-is.
        translate = getattr(self, "__translator__", lambda s: s)
        if not docstring:
            return None
        return inspect.cleandoc(translate(docstring))

    async def can_run(self, ctx: "Context", **kwargs) -> bool:
        """
        This really just exists to allow easy use with other methods using can_run
        on commands and groups such as help formatters.

        kwargs used in that won't apply here as they don't make sense to,
        but will be swallowed silently for a compatible signature for ease of use.

        Parameters
        ----------
        ctx : `Context`
            The invocation context to check with.

        Returns
        -------
        bool
            ``True`` if this cog is usable in the given context.
        """
        try:
            return await self.requires.verify(ctx)
        except CommandError:
            # A failed verification counts as "cannot run" instead of propagating.
            return False

    async def can_see(self, ctx: "Context") -> bool:
        """Check if this cog is visible in the given context.

        In short, this will verify whether
        the user is allowed to access the cog by permissions.

        This has an identical signature to the one used by commands, and groups,
        but needs a different underlying mechanism.

        Parameters
        ----------
        ctx : `Context`
            The invocation context to check with.

        Returns
        -------
        bool
            ``True`` if this cog is visible in the given context.
        """
        visible = await self.can_run(ctx)
        return visible
class Cog(CogMixin, DPYCog, metaclass=DPYCogMeta):
    """
    Red's Cog base class

    This includes a metaclass from discord.py
    """

    __cog_commands__: Tuple[Command]

    @property
    def all_commands(self) -> Dict[str, Command]:
        """
        Mapping of command name to command, built from ``__cog_commands__``.

        This does not have identical behavior to
        Group.all_commands but should return what you expect
        """
        by_name: Dict[str, Command] = {}
        for cmd in self.__cog_commands__:
            by_name[cmd.name] = cmd
        return by_name
def command(name=None, cls=Command, **attrs):
    """A decorator which transforms an async function into a `Command`.

    Same interface as `discord.ext.commands.command`.

    The ``help`` keyword argument is passed on to the command class as
    ``help_override`` (``None`` when no ``help`` was given).
    """
    help_text = attrs.pop("help", None)
    attrs["help_override"] = help_text
    return dpy_command_deco(name, cls, **attrs)
def group(name=None, cls=Group, **attrs):
    """A decorator which transforms an async function into a `Group`.

    Same interface as `discord.ext.commands.group`.

    A ``help`` keyword argument, when given, is passed on to the command
    class as ``help_override``. Without this the value would be lost,
    because the ``help`` property of `Command` ignores assignment and only
    honours ``help_override``.
    """
    help_text = attrs.pop("help", None)
    if help_text is not None:
        attrs["help_override"] = help_text
    return dpy_command_deco(name, cls, **attrs)
# Weak cache of per-guild disabler checks, keyed by guild ID. Values are held
# weakly, so an entry lives only while something else (such as a command's
# ``checks`` list) still references the disabler.
__command_disablers: DisablerDictType = weakref.WeakValueDictionary()


def get_command_disabler(guild: discord.Guild) -> Callable[["Context"], Awaitable[bool]]:
    """Get the command disabler for a guild.

    A command disabler is a check predicate which raises
    `DisabledCommand` if the context is within the given guild, and
    returns ``True`` otherwise. The same predicate object is returned for
    a guild ID for as long as it is still cached.

    Parameters
    ----------
    guild : discord.Guild
        The guild to get the disabler for. Only its ``id`` is used.

    Returns
    -------
    Callable[[Context], Awaitable[bool]]
        The disabler check for the guild.
    """
    # Capture just the ID so the predicate does not keep the guild object alive.
    guild_id = guild.id

    cached = __command_disablers.get(guild_id)
    if cached is not None:
        return cached

    async def disabler(ctx: "Context") -> bool:
        if ctx.guild is not None and ctx.guild.id == guild_id:
            raise DisabledCommand()
        return True

    __command_disablers[guild_id] = disabler
    return disabler
# This is intentionally left out of `__all__` as it is not intended for general use
class _AlwaysAvailableCommand(Command):
    """
    This should be used only for informational commands
    which should not be disabled or removed

    These commands cannot belong to a cog.

    These commands do not respect most forms of checks, and
    should only be used with that in mind.

    This particular class is not supported for 3rd party use
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        belongs_to_cog = self.cog is not None
        if belongs_to_cog:
            raise TypeError("This command may not be added to a cog")

    async def can_run(self, ctx, *args, **kwargs) -> bool:
        """Allow the command for every author that is not a bot; other arguments are ignored."""
        author_is_bot = ctx.author.bot
        return not author_is_bot