import contextlib
import logging
import re
import sys
from asyncio import AbstractEventLoop, CancelledError, get_event_loop, iscoroutinefunction, wait_for
from functools import wraps
from importlib import import_module
from importlib.util import resolve_name
from inspect import getmembers, isawaitable
from types import ModuleType
from typing import Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Tuple, Union
from ..api import WebSocketClient as WSClient
from ..api.cache import Cache
from ..api.error import LibraryException
from ..api.http.client import HTTPClient
from ..api.models.channel import Channel
from ..api.models.flags import Intents, Permissions
from ..api.models.guild import Guild
from ..api.models.member import Member
from ..api.models.message import Message
from ..api.models.misc import Image, Snowflake
from ..api.models.presence import ClientPresence
from ..api.models.role import Role
from ..api.models.team import Application
from ..api.models.user import User
from ..base import get_logger
from ..utils.attrs_utils import convert_list
from ..utils.missing import MISSING
from .context import CommandContext, ComponentContext
from .decor import component as _component
from .enums import ApplicationCommandType, Locale, OptionType
from .models.command import ApplicationCommand, Choice, Command, Option
from .models.component import ActionRow, Button, Modal, SelectMenu
log: logging.Logger = get_logger("client")
__all__ = (
"Client",
"Extension",
"extension_listener",
"extension_command",
"extension_component",
"extension_modal",
"extension_autocomplete",
"extension_user_command",
"extension_message_command",
)
[docs]class Client:
"""
A class representing the client connection to Discord's gateway and API via. WebSocket and HTTP.
:param str token: The token of the application for authentication and connection.
:param Optional[Intents] intents: Allows specific control of permissions the application has when connected. In order to use multiple intents, the ``|`` operator is recommended. Defaults to :attr:`.Intents.DEFAULT`.
:param Optional[List[Tuple[int]]] shards: Dictates and controls the shards that the application connects under.
:param Optional[ClientPresence] presence: Sets an RPC-like presence on the application when connected to the Gateway.
:param Optional[Union[int, Guild, List[int], List[Guild]]] default_scope:
.. versionadded:: 4.3.0
Sets the default scope of all commands.
:param Optional[bool] disable_sync: Controls whether synchronization in the user-facing API should be automatic or not.
:param Optional[Union[bool, logging.DEBUG, logging.INFO, logging.NOTSET, logging.WARNING, logging.ERROR, logging.CRITICAL]] logging:
.. versionadded:: 4.3.2
Set to ``True`` to enable debug logging or set to a log level to use a specific level
:ivar Application me: The application representation of the client.
"""
def __init__(
self,
token: str,
cache_limits: Optional[Dict[type, int]] = None,
intents: Intents = Intents.DEFAULT,
shards: Optional[List[Tuple[int]]] = None,
default_scope: Optional[Union[int, Snowflake, List[Union[int, Snowflake]]]] = None,
presence: Optional[ClientPresence] = None,
_logging: Union[bool, int] = None,
disable_sync: bool = False,
**kwargs,
) -> None:
self._loop: AbstractEventLoop = get_event_loop()
self._http: Union[str, HTTPClient] = token
self._intents: Intents = intents
self._shards: List[Tuple[int]] = shards or []
self._commands: List[Command] = []
self._default_scope = default_scope
self._presence = presence
self._token = token
self._extensions = {}
self._scopes = set()
self.__command_coroutines = []
self.__global_commands = {}
self.__guild_commands = {}
self.me: Optional[Application] = None
self.__id_autocomplete = {}
if self._default_scope:
if not isinstance(self._default_scope, list):
self._default_scope = [self._default_scope]
if any(isinstance(scope, Guild) for scope in self._default_scope):
self._default_scope = [
(scope.id if isinstance(scope, Guild) else scope)
for scope in self._default_scope
]
self._default_scope = convert_list(int)(self._default_scope)
if cache_limits is None:
# Messages have the most explosive growth, but more limits can be added as needed
cache_limits = {
Message: 1000, # Most users won't need to cache many messages
}
self._cache: Cache = Cache(cache_limits)
self._websocket: WSClient = WSClient(
token=token,
cache=self._cache,
intents=self._intents,
shards=self._shards,
presence=self._presence,
)
if _logging := kwargs.get("logging", _logging):
# thx i0 for posting this on the retux Discord
if _logging is True:
_logging = logging.DEBUG
_format = (
"%(asctime)s [%(levelname)s] - .%(funcName)s(): %(message)s"
if _logging == logging.DEBUG
else "%(asctime)s [%(levelname)s] - %(message)s"
)
logging.basicConfig(format=_format, level=_logging)
if disable_sync:
self._automate_sync = False
log.warning(
"Automatic synchronization has been disabled. Interactions may need to be manually synchronized."
)
else:
self._automate_sync = True
[docs] async def modify_nick_in_guild(
self, guild_id: Union[int, str, Snowflake, Guild], new_nick: Optional[str] = MISSING
) -> Member:
"""
.. versionadded:: 4.4.0
Sets a new nick in the specified guild.
:param Union[int, str, Snowflake, Guild] guild_id: The ID of the guild to modify the nick in
:param Optional[str] new_nick: The new nick to assign
"""
if not self._http or isinstance(self._http, str):
raise LibraryException(
code=13, message="You cannot use this method until the bot has started!"
)
if new_nick is MISSING:
raise LibraryException(code=12, message="new nick name must either a string or `None`")
_id = int(guild_id.id) if isinstance(guild_id, Guild) else int(guild_id)
return Member(
**await self._http.modify_self_nick_in_guild(_id, new_nick), _client=self._http
)
@property
def guilds(self) -> List[Guild]:
"""
.. versionadded:: 4.2.0
Returns a list of guilds the bot is in.
"""
return list(self._http.cache[Guild].values.values())
@property
def latency(self) -> float:
"""
.. versionadded:: 4.2.0
Returns the connection latency in milliseconds.
"""
return self._websocket.latency * 1000
[docs] def start(self) -> None:
"""Starts the client session."""
try:
self._loop.run_until_complete(self._ready())
except (CancelledError, Exception) as e:
self._loop.run_until_complete(self._logout())
raise e from e
except KeyboardInterrupt:
log.error("KeyboardInterrupt detected, shutting down the bot.")
finally:
self._loop.run_until_complete(self._logout())
async def __register_id_autocomplete(self) -> None: # TODO: make this use ID and not name
for key in self.__id_autocomplete.keys():
if isinstance(key, str): # compatibility with the decorator from the Command obj
for _ in self.__id_autocomplete[key]:
# _ contains {"coro" : coro, "name": <name_as_string>}
self.event(
_["coro"],
name=f"autocomplete_{key}_{_['name']}",
)
else:
_command_obj = self._find_command(key)
_command: str = _command_obj.name
for _ in self.__id_autocomplete[key]:
# _ contains {"coro" : coro, "name": <name_as_string>}
self.event(
_["coro"],
name=f"autocomplete_{_command}_{_['name']}",
)
@staticmethod
async def __compare_sync(
data: dict, pool: List[dict]
) -> Tuple[bool, dict]: # sourcery no-metrics
"""
Compares an application command during the synchronization process.
:param dict data: The application command to compare.
:param List[dict] pool: The "pool" or list of commands to compare from.
:return: Whether the command has changed or not.
:rtype: bool
"""
# sourcery skip: none-compare
attrs: List[str] = [
name
for name in ApplicationCommand.__slots__
if not name.startswith("_")
and not name.endswith("id")
and name not in {"version", "default_permission"}
]
option_attrs: List[str] = [name for name in Option.__slots__ if not name.startswith("_")]
choice_attrs: List[str] = [name for name in Choice.__slots__ if not name.startswith("_")]
log.info(f"Current attributes to compare: {', '.join(attrs)}.")
clean: bool = True
_command: dict = {}
def __check_options(command, data):
# sourcery skip: low-code-quality, none-compare
# sourcery no-metrics
_command_option_names = [option["name"] for option in command.get("options")]
_data_option_names = [option["name"] for option in data.get("options")]
if any(option not in _command_option_names for option in _data_option_names) or len(
_data_option_names
) != len(_command_option_names):
return False
for option in command.get("options"):
for _option in data.get("options"):
if _option["name"] == option["name"]:
for option_attr in option_attrs:
if (
option.get(option_attr)
and not _option.get(option_attr)
or not option.get(option_attr)
and _option.get(option_attr)
):
return False
elif option_attr == "choices":
if not option.get("choices") or not _option.get("choices"):
continue
_option_choice_names = [
choice["name"] for choice in option.get("choices")
]
_data_choice_names = [
choice["name"] for choice in _option.get("choices")
]
if any(
_ not in _option_choice_names for _ in _data_choice_names
) or len(_data_choice_names) != len(_option_choice_names):
return False
for choice in option.get("choices"):
for _choice in _option.get("choices"):
if choice["name"] == _choice["name"]:
for choice_attr in choice_attrs:
if (
choice.get(choice_attr)
and not _choice.get(choice_attr)
or not choice.get(choice_attr)
and _choice.get(choice_attr)
):
return False
elif choice.get(choice_attr) != _choice.get(
choice_attr
):
return False
else:
continue
for i, __name in enumerate(_option_choice_names):
if _data_choice_names[i] != __name:
return False
elif option_attr == "required":
if (
option.get(option_attr) == None # noqa: E711
and _option.get(option_attr) == False # noqa: E712
):
# API not including if False
continue
elif option_attr == "options":
if not option.get(option_attr) and not _option.get("options"):
continue
_clean = __check_options(option, _option)
if not _clean:
return _clean
elif option.get(option_attr) != _option.get(option_attr):
return False
else:
continue
return next(
(
False
for i, __name in enumerate(_command_option_names)
if _data_option_names[i] != __name
),
True,
)
for command in pool:
if command["name"] == data["name"]:
_command = command
# in case it continues looping
if not command.get("options"):
command["options"] = []
# this will ensure that the option will be an emtpy list, since discord returns `None`
# when no options are present, but they're in the data as `[]`
if command.get("guild_id") and not isinstance(command.get("guild_id"), int):
if isinstance(command.get("guild_id"), list):
command["guild_id"] = [int(_) for _ in command["guild_id"]]
else:
command["guild_id"] = int(command["guild_id"])
# ensure that IDs are present as integers since discord returns strings.
for attr in attrs:
if attr == "options":
if (
not command.get("options")
and data.get("options")
or command.get("options")
and not data.get("options")
):
clean = False
return clean, _command
elif command.get("options") and data.get("options"):
clean = __check_options(command, data)
if not clean:
return clean, _command
else:
continue
elif attr.endswith("localizations"):
if command.get(attr, None) is None and data.get(attr) == {}:
# This is an API/Version difference.
continue
elif (
attr == "dm_permission"
and data.get(attr) == True # noqa: E712
and command.get(attr) == None # noqa: E711
):
# idk, it encountered me and synced unintentionally
continue
# elif data.get(attr, None) and command.get(attr) == data.get(attr):
elif command.get(attr, None) == data.get(attr, None):
# hasattr checks `dict.attr` not `dict[attr]`
continue
clean = False
break
return clean, _command
async def _ready(self) -> None:
"""
Prepares the client with an internal "ready" check to ensure
that all conditions have been met in a chronological order:
.. code-block::
CLIENT START
|___ GATEWAY
| |___ READY
| |___ DISPATCH
|___ SYNCHRONIZE
| |___ CACHE
|___ DETECT DECORATOR
| |___ BUILD MODEL
| |___ SYNCHRONIZE
| |___ CALLBACK
LOOP
"""
if isinstance(self._http, str):
self._http = HTTPClient(self._http, self._cache)
data = await self._http.get_current_bot_information()
self.me = Application(**data, _client=self._http)
ready: bool = False
try:
if self.me.flags is not None:
# This can be None.
if (
self._intents.GUILD_PRESENCES in self._intents
and self.me.flags.GATEWAY_PRESENCE not in self.me.flags
and self.me.flags.GATEWAY_PRESENCE_LIMITED not in self.me.flags
):
raise RuntimeError("Client not authorised for the GUILD_PRESENCES intent.")
if (
self._intents.GUILD_MEMBERS in self._intents
and self.me.flags.GATEWAY_GUILD_MEMBERS not in self.me.flags
and self.me.flags.GATEWAY_GUILD_MEMBERS_LIMITED not in self.me.flags
):
raise RuntimeError("Client not authorised for the GUILD_MEMBERS intent.")
if (
self._intents.GUILD_MESSAGES in self._intents
and self.me.flags.GATEWAY_MESSAGE_CONTENT not in self.me.flags
and self.me.flags.GATEWAY_MESSAGE_CONTENT_LIMITED not in self.me.flags
):
log.critical("Client not authorised for the MESSAGE_CONTENT intent.")
elif self._intents.value != Intents.DEFAULT.value:
raise RuntimeError("Client not authorised for any privileged intents.")
self.__resolve_commands()
if self._automate_sync:
await self.__sync()
else:
await self.__get_all_commands()
await self.__register_id_autocomplete()
ready = True
except Exception:
log.exception("Could not prepare the client:")
finally:
if ready:
log.debug("Client is now ready.")
await self._login()
async def _stop(self) -> None:
"""Stops the websocket connection gracefully."""
log.debug("Shutting down the client....")
self._websocket.ready.clear() # Clears ready state.
self._websocket._closing_lock.set() # Toggles the "ready-to-shutdown" state for the bot.
# And subsequently, the processes will close itself.
await self._http._req._session.close() # Closes the HTTP session associated with the client.
async def _login(self) -> None:
"""Makes a login with the Discord API."""
try:
await self._websocket.run()
except Exception:
log.exception("Websocket have raised an exception, closing.")
if self._websocket._closing_lock.is_set():
# signal for closing.
try:
if self._websocket._task is not None:
self._websocket.__heartbeat_event.set()
try:
# Wait for the keep-alive handler to finish so we can discard it gracefully
await self._websocket._task
finally:
self._websocket._task = None
finally: # then the overall WS client
if self._websocket._client is not None:
# This needs to be properly closed
try:
await self._websocket._client.close(code=1000)
finally:
self._websocket._client = None
[docs] async def wait_until_ready(self) -> None:
"""
.. versionadded:: 4.2.0
Helper method that waits until the websocket is ready.
"""
await self._websocket.wait_until_ready()
async def _get_all_guilds(self) -> List[dict]:
"""
Gets all guilds that the bot is present in.
:return: List of guilds
:rtype: List[dict]
"""
_after = None
_all: list = []
res = await self._http.get_self_guilds(limit=200)
while len(res) >= 200:
_all.extend(res)
_after = int(res[-1]["id"])
res = await self._http.get_self_guilds(
after=_after,
)
_all.extend(res)
return _all
async def __get_all_commands(self) -> None:
# this method is just copied from the sync method
# I expect this to be changed in the sync rework
# until then this will deliver a cache if sync is off to make autocomplete work bug-free
# but even with sync off, we should cache all commands here always
_guilds = await self._get_all_guilds()
_guild_ids = [int(_["id"]) for _ in _guilds]
self._scopes.update(_guild_ids)
_cmds = await self._http.get_application_commands(
application_id=self.me.id, with_localizations=True
)
for command in _cmds:
if command.get("code"):
# Error exists.
raise LibraryException(command["code"], message=f'{command["message"]} |')
self.__global_commands = {"commands": _cmds, "clean": True}
# TODO: add to cache (later)
# responsible for checking if a command is in the cache but not a coro -> allowing removal
for _id in _guild_ids:
try:
_cmds = await self._http.get_application_commands(
application_id=self.me.id, guild_id=_id, with_localizations=True
)
except LibraryException as e:
if int(e.code) != 50001:
raise LibraryException(code=e.code, message=e.message) from e
log.warning(
f"Your bot is missing access to guild with corresponding id {_id}! "
"Syncing commands will not be possible until it is invited with "
"`application.commands` scope!"
)
continue
for command in _cmds:
if command.get("code"):
# Error exists.
raise LibraryException(command["code"], message=f'{command["message"]} |')
self.__guild_commands[_id] = {"commands": _cmds, "clean": True}
def __resolve_commands(self) -> None: # sourcery skip: low-code-quality
"""
Resolves all commands to the command coroutines.
.. warning::
This is an internal method. Do not call it unless you know what you are doing!
"""
for cmd in self._commands:
if cmd.coro.__qualname__ in [_cmd.__qualname__ for _cmd in self.__command_coroutines]:
continue
cmd.listener = self._websocket._dispatch
if cmd.default_scope and self._default_scope:
cmd.scope = (
cmd.scope.extend(self._default_scope)
if isinstance(cmd.scope, list)
else self._default_scope
)
data: Union[dict, List[dict]] = cmd.full_data
coro = cmd.dispatcher
self.__check_command(
command=ApplicationCommand(**(data[0] if isinstance(data, list) else data)),
coro=coro,
)
if cmd.autocompletions:
self.__id_autocomplete.update(cmd.autocompletions)
coro = coro.__func__ if hasattr(coro, "__func__") else coro
coro._command_data = data
coro._name = cmd.name
coro._converters = cmd.converters
if (data["name"] if isinstance(data, dict) else data[0]["name"]) not in (
(
c._command_data["name"]
if isinstance(c._command_data, dict)
else c._command_data[0]["name"]
)
for c in self.__command_coroutines
):
self.__command_coroutines.append(coro)
if cmd.scope not in (MISSING, None):
if isinstance(cmd.scope, List):
[self._scopes.add(_ if isinstance(_, int) else _.id) for _ in cmd.scope]
else:
self._scopes.add(cmd.scope if isinstance(cmd.scope, int) else cmd.scope.id)
self.event(coro, name=f"command_{cmd.name}")
async def __sync(self) -> None: # sourcery no-metrics
"""
Synchronizes all commands to the API.
.. warning::
This is an internal method. Do not call it unless you know what you are doing!
"""
# sourcery skip: low-code-quality
log.debug("starting command sync")
_guilds = await self._get_all_guilds()
_guild_ids = [int(_["id"]) for _ in _guilds]
self._scopes.update(_guild_ids)
_cmds = await self._http.get_application_commands(
application_id=self.me.id, with_localizations=True
)
for command in _cmds:
if command.get("code"):
# Error exists.
raise LibraryException(command["code"], message=f'{command["message"]} |')
self.__global_commands = {"commands": _cmds, "clean": True}
# TODO: add to cache (later)
__check_global_commands: List[str] = [cmd["name"] for cmd in _cmds]
__check_guild_commands: Dict[int, List[str]] = {}
__blocked_guilds: set = set()
# responsible for checking if a command is in the cache but not a coro -> allowing removal
for _id in _guild_ids.copy():
try:
_cmds = await self._http.get_application_commands(
application_id=self.me.id, guild_id=_id, with_localizations=True
)
except LibraryException as e:
if int(e.code) != 50001:
raise LibraryException(code=e.code, message=e.message) from e
log.warning(
f"Your bot is missing access to guild with corresponding id {_id}! "
"Adding commands will not be possible until it is invited with "
"`application.commands` scope!"
)
__blocked_guilds.add(_id)
_guild_ids.remove(_id)
continue
self.__guild_commands[_id] = {"commands": _cmds, "clean": True}
__check_guild_commands[_id] = [cmd["name"] for cmd in _cmds] if _cmds else []
for coro in self.__command_coroutines:
if hasattr(coro, "_command_data"): # just so IDE knows it exists
if isinstance(coro._command_data, list):
_guild_command: dict
for _guild_command in coro._command_data:
_guild_id = int(_guild_command.get("guild_id"))
if _guild_id in __blocked_guilds:
log.fatal(f"Cannot sync commands on guild with id {_guild_id}!")
raise LibraryException(50001, message="Missing Access |")
if _guild_command["name"] not in __check_guild_commands[_guild_id]:
self.__guild_commands[_guild_id]["clean"] = False
self.__guild_commands[_guild_id]["commands"].append(_guild_command)
else:
clean, _command = await self.__compare_sync(
_guild_command, self.__guild_commands[_guild_id]["commands"]
)
if not clean:
self.__guild_commands[_guild_id]["clean"] = False
# _pos = self.__guild_commands[_guild_id]["commands"].index(_command)
# self.__guild_commands[_guild_id]["commands"][_pos] = _guild_command
for _pos, _dict in enumerate(
self.__guild_commands[_guild_id]["commands"]
):
if _dict["name"] == _command["name"]:
self.__guild_commands[_guild_id]["commands"][
_pos
] = _guild_command
break
if __check_guild_commands[_guild_id]:
del __check_guild_commands[_guild_id][
__check_guild_commands[_guild_id].index(_guild_command["name"])
]
elif coro._command_data["name"] in __check_global_commands: # noqa
clean, _command = await self.__compare_sync(
coro._command_data, self.__global_commands["commands"]
)
if not clean:
self.__global_commands["clean"] = False
# _pos = self.__global_commands["commands"].index(_command)
# self.__global_commands["commands"][_pos] = coro._command_data
for _pos, _dict in enumerate(self.__global_commands["commands"]):
if _dict["name"] == _command["name"]:
self.__global_commands["commands"][_pos] = coro._command_data
break
if __check_global_commands:
del __check_global_commands[
__check_global_commands.index(coro._command_data["name"]) # noqa
]
else:
self.__global_commands["clean"] = False
self.__global_commands["commands"].append(coro._command_data)
if not self.__command_coroutines:
if self.__global_commands["commands"]:
self.__global_commands["clean"] = False
self.__global_commands["commands"] = []
__check_global_commands = []
for _id in _guild_ids:
if self.__guild_commands[_id]["commands"]:
__check_guild_commands[_id] = []
self.__guild_commands[_id]["clean"] = False
self.__guild_commands[_id]["commands"] = []
if __check_global_commands:
# names are present but not found in registered global command coroutines. Deleting.
self.__global_commands["clean"] = False
for name in __check_global_commands:
_pos = self.__global_commands["commands"].index(
[_ for _ in self.__global_commands["commands"] if _["name"] == name][0]
)
del self.__global_commands["commands"][_pos]
for _id in _guild_ids:
if __check_guild_commands[_id]:
self.__guild_commands[_id]["clean"] = False
for name in __check_guild_commands[_id]:
_pos = self.__guild_commands[_id]["commands"].index(
[_ for _ in self.__guild_commands[_id]["commands"] if _["name"] == name][0]
)
del self.__guild_commands[_id]["commands"][_pos]
if not self.__global_commands["clean"] or any(
not self.__guild_commands[_id]["clean"] for _id in _guild_ids
):
if not self.__global_commands["clean"]:
res = await self._http.overwrite_application_command(
application_id=int(self.me.id), data=self.__global_commands["commands"]
)
self.__global_commands["clean"] = True
self.__global_commands["commands"] = res
for _id in _guild_ids:
if not self.__guild_commands[_id]["clean"]:
res = await self._http.overwrite_application_command(
application_id=int(self.me.id),
data=self.__guild_commands[_id]["commands"],
guild_id=_id,
)
self.__guild_commands[_id]["clean"] = True
self.__guild_commands[_id]["commands"] = res
[docs] def event(
self, coro: Optional[Callable[..., Coroutine]] = MISSING, *, name: Optional[str] = MISSING
) -> Callable[..., Any]:
"""
A decorator for listening to events dispatched from the
Gateway.
Documentation on how to listen to specific events can be found :ref:`here<events:Event Documentation>`.
:param Optional[Callable[..., Coroutine]] coro: The coroutine of the event.
:param Optional[str] name: The name of the event. If not given, this defaults to the coroutine's name.
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Optional[Callable[..., Coroutine]]):
self._websocket._dispatch.register(
coro, name=name if name is not MISSING else coro.__name__
)
return coro
if coro is not MISSING:
self._websocket._dispatch.register(
coro, name=name if name is not MISSING else coro.__name__
)
return coro
return decorator
[docs] async def change_presence(self, presence: ClientPresence) -> None:
"""
.. versionadded:: 4.2.0
A method that changes the current client's presence on runtime.
.. note::
There is a ratelimit to using this method (5 per minute).
As there's no gateway ratelimiter yet, breaking this ratelimit
will force your bot to disconnect.
:param ClientPresence presence: The presence to change the bot to on identify.
"""
await self._websocket._update_presence(presence)
def __check_command(
self,
command: ApplicationCommand,
coro: Callable[..., Coroutine],
regex: str = r"^[a-z0-9_-]{1,32}$",
) -> None: # sourcery no-metrics
"""
Checks if a command is valid.
"""
reg = re.compile(regex)
_options_names: List[str] = []
_sub_groups_present: bool = False
_sub_cmds_present: bool = False
def __check_sub_group(_sub_group: Option):
nonlocal _sub_groups_present
_sub_groups_present = True
if _sub_group.name is MISSING:
raise LibraryException(11, message="Sub command groups must have a name.")
__indent = 4
log.debug(
f"{' ' * __indent}checking sub command group '{_sub_group.name}' of command '{command.name}'"
)
if not re.fullmatch(reg, _sub_group.name):
raise LibraryException(
11,
message=f"The sub command group name does not match the regex for valid names ('{regex}')",
)
elif _sub_group.description is MISSING and not _sub_group.description:
raise LibraryException(11, message="A description is required.")
elif len(_sub_group.description) > 100:
raise LibraryException(11, message="Descriptions must be less than 100 characters.")
if not _sub_group.options:
raise LibraryException(11, message="sub command groups must have subcommands!")
if len(_sub_group.options) > 25:
raise LibraryException(
11, message="A sub command group cannot contain more than 25 sub commands!"
)
for _sub_command in _sub_group.options:
__check_sub_command(_sub_command, _sub_group)
def __check_sub_command(_sub_command: Option, _sub_group: Option = MISSING):
nonlocal _sub_cmds_present
_sub_cmds_present = True
if _sub_command.name is MISSING:
raise LibraryException(11, message="sub commands must have a name!")
if _sub_group is not MISSING:
__indent = 8
log.debug(
f"{' ' * __indent}checking sub command '{_sub_command.name}' of group '{_sub_group.name}'"
)
else:
__indent = 4
log.debug(
f"{' ' * __indent}checking sub command '{_sub_command.name}' of command '{command.name}'"
)
if not re.fullmatch(reg, _sub_command.name):
raise LibraryException(
11,
message=f"The sub command name does not match the regex for valid names ('{reg}')",
)
elif _sub_command.description is MISSING or not _sub_command.description:
raise LibraryException(11, message="A description is required.")
elif len(_sub_command.description) > 100:
raise LibraryException(11, message="Descriptions must be less than 100 characters.")
if _sub_command.options is not MISSING and _sub_command.options:
if len(_sub_command.options) > 25:
raise LibraryException(
11, message="Your sub command must have less than 25 options."
)
_sub_opt_names = []
for _opt in _sub_command.options:
__check_options(_opt, _sub_opt_names, _sub_command)
del _sub_opt_names
def __check_options(_option: Option, _names: list, _sub_command: Option = MISSING):
nonlocal _options_names
if getattr(_option, "autocomplete", False) and getattr(_option, "choices", False):
log.warning("Autocomplete may not be set to true if choices are present.")
if _option.name is MISSING:
raise LibraryException(11, message="Options must have a name.")
if _sub_command is not MISSING:
__indent = 12 if _sub_groups_present else 8
log.debug(
f"{' ' * __indent}checking option '{_option.name}' of sub command '{_sub_command.name}'"
)
else:
__indent = 4
log.debug(
f"{' ' * __indent}checking option '{_option.name}' of command '{command.name}'"
)
_options_names.append(_option.name)
if not re.fullmatch(reg, _option.name):
raise LibraryException(
11,
message=f"The option name ('{_option.name}') does not match the regex for valid names ('{regex}').",
)
if _option.description is MISSING or not _option.description:
raise LibraryException(
11,
message="A description is required.",
)
elif len(_option.description) > 100:
raise LibraryException(
11,
message="Descriptions must be less than 100 characters.",
)
if _option.name in _names:
raise LibraryException(
11, message="You must not have two options with the same name in a command!"
)
_names.append(_option.name)
def __check_coro():
__indent = 4
log.debug(f"{' ' * __indent}Checking coroutine: '{coro.__name__}'")
_ismethod = hasattr(coro, "__func__")
if not len(coro.__code__.co_varnames) ^ (
_ismethod and len(coro.__code__.co_varnames) == 1
):
raise LibraryException(
11, message="Your command needs at least one argument to return context."
)
elif "kwargs" in coro.__code__.co_varnames:
return
elif _sub_cmds_present and len(coro.__code__.co_varnames) < (3 if _ismethod else 2):
raise LibraryException(
11, message="Your command needs one argument for the sub_command."
)
elif _sub_groups_present and len(coro.__code__.co_varnames) < (4 if _ismethod else 3):
raise LibraryException(
11,
message="Your command needs one argument for the sub_command and one for the sub_command_group.",
)
add: int = (
1 + abs(_sub_cmds_present) + abs(_sub_groups_present) + 1 if _ismethod else +0
)
if len(coro.__code__.co_varnames) - add < len(set(_options_names)):
log.debug(
"Coroutine is missing arguments for options:"
f" {[_arg for _arg in _options_names if _arg not in coro.__code__.co_varnames]}"
)
raise LibraryException(
11, message="You need one argument for every option name in your command!"
)
if command.name is MISSING:
raise LibraryException(11, message="Your command must have a name.")
else:
log.debug(f"checking command '{command.name}':")
if (
not re.fullmatch(reg, command.name)
and command.type == ApplicationCommandType.CHAT_INPUT
):
raise LibraryException(
11,
message=f"Your command name ('{command.name}') does not match the regex for valid names ('{regex}').",
)
elif command.type == ApplicationCommandType.CHAT_INPUT and (
command.description is MISSING or not command.description
):
raise LibraryException(11, message="A description is required.")
elif command.type != ApplicationCommandType.CHAT_INPUT and (
command.description is not MISSING and command.description
):
raise LibraryException(11, message="Only chat-input commands can have a description.")
elif command.description is not MISSING and len(command.description) > 100:
raise LibraryException(11, message="Descriptions must be less than 100 characters.")
if command.options and command.options is not MISSING:
if len(command.options) > 25:
raise LibraryException(11, message="Your command must have less than 25 options.")
if command.type != ApplicationCommandType.CHAT_INPUT:
raise LibraryException(
11, message="Only CHAT_INPUT commands can have options/sub-commands!"
)
_opt_names = []
for _option in command.options:
if _option.type == OptionType.SUB_COMMAND_GROUP:
__check_sub_group(_option)
elif _option.type == OptionType.SUB_COMMAND:
__check_sub_command(_option)
else:
__check_options(_option, _opt_names)
del _opt_names
__check_coro()
[docs] def command(
self,
*,
type: Optional[Union[int, ApplicationCommandType]] = ApplicationCommandType.CHAT_INPUT,
name: Optional[str] = MISSING,
description: Optional[str] = MISSING,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
options: Optional[
Union[Dict[str, Any], List[Dict[str, Any]], Option, List[Option]]
] = MISSING,
name_localizations: Optional[Dict[Union[str, Locale], str]] = MISSING,
description_localizations: Optional[Dict[Union[str, Locale], str]] = MISSING,
default_member_permissions: Optional[Union[int, Permissions]] = MISSING,
dm_permission: Optional[bool] = MISSING,
default_scope: bool = True,
) -> Callable[[Callable[..., Coroutine]], Command]:
"""
A decorator for registering an application command to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a chat-input command:
.. code-block:: python
@bot.command(name="command-name", description="this is a command.")
async def command_name(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
To properly utilise the ``default_member_permissions`` kwarg, it requires OR'ing the permission values, similar to instantiating the client with Intents.
For example:
.. code-block:: python
@bot.command(name="kick", description="Kick a user.", default_member_permissions=interactions.Permissions.BAN_MEMBERS | interactions.Permissions.KICK_MEMBERS)
async def kick(ctx, user: interactions.Member):
...
Another example below for instance is an admin-only command:
.. code-block:: python
@bot.command(name="sudo", description="this is an admin-only command.", default_member_permissions=interactions.Permissions.ADMINISTRATOR)
async def sudo(ctx):
...
.. note::
If ``default_member_permissions`` is not given, this will default to anyone that is able to use the command.
:param Optional[Union[str, int, ApplicationCommandType]] type: The type of application command. Defaults to :attr:`.ApplicationCommandType.CHAT_INPUT`.
:param Optional[str] name: The name of the application command. This *is* required but kept optional to follow kwarg rules.
:param Optional[str] description: The description of the application command. This should be left blank if you are not using ``CHAT_INPUT``.
:param Optional[Union[int, Guild, List[int], List[Guild]]] scope: The "scope"/applicable guilds the application command applies to.
:param Optional[Union[Dict[str, Any], List[Dict[str, Any]], Option, List[Option]]] options: The "arguments"/options of an application command. This should be left blank if you are not using ``CHAT_INPUT``.
:param Optional[Dict[Union[str, Locale], str]] name_localizations:
.. versionadded:: 4.2.0
The dictionary of localization for the ``name`` field. This enforces the same restrictions as the ``name`` field.
:param Optional[Dict[Union[str, Locale], str]] description_localizations:
.. versionadded:: 4.2.0
The dictionary of localization for the ``description`` field. This enforces the same restrictions as the ``description`` field.
:param Optional[Union[int, Permissions]] default_member_permissions: The permissions bit value of :class:`.Permissions`. If not given, defaults to :attr:`.Permissions.USE_APPLICATION_COMMANDS`
:param Optional[bool] dm_permission: The application permissions if executed in a Direct Message. Defaults to ``True``.
:param Optional[bool] default_scope:
.. versionadded:: 4.3.0
Whether the scope of the command is the default scope set in the client. Defaults to ``True``.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Command]
"""
def decorator(coro: Callable[..., Coroutine]) -> Command:
cmd = Command(
coro=coro,
type=type,
name=name,
description=description,
options=options,
scope=scope,
default_member_permissions=default_member_permissions,
dm_permission=dm_permission,
name_localizations=name_localizations,
description_localizations=description_localizations,
default_scope=default_scope,
)
cmd.client = self
self._commands.append(cmd)
return cmd
return decorator
[docs] def message_command(
self,
*,
name: Optional[str] = MISSING,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
name_localizations: Optional[Dict[Union[str, Locale], Any]] = MISSING,
default_member_permissions: Optional[Union[int, Permissions]] = MISSING,
dm_permission: Optional[bool] = MISSING,
default_scope: bool = True,
) -> Callable[[Callable[..., Coroutine]], Command]:
"""
A decorator for registering a message context menu to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a message context menu:
.. code-block:: python
@bot.message_command(name="Context menu name")
async def context_menu_name(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
:param Optional[str] name: The name of the application command.
:param Optional[Union[int, Guild, List[int], List[Guild]]] scope: The "scope"/applicable guilds the application command applies to. Defaults to ``None``.
:param Optional[Dict[Union[str, Locale], str]] name_localizations:
.. versionadded:: 4.2.0
The dictionary of localization for the ``name`` field. This enforces the same restrictions as the ``name`` field.
:param Optional[Union[int, Permissions]] default_member_permissions: The permissions bit value of :class:`.Permissions`. If not given, defaults to :attr:`.Permissions.USE_APPLICATION_COMMANDS`
:param Optional[bool] dm_permission: The application permissions if executed in a Direct Message. Defaults to ``True``.
:param Optional[bool] default_scope:
.. versionadded:: 4.3.0
Whether the scope of the command is the default scope set in the client. Defaults to ``True``.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Command]
"""
def decorator(coro: Callable[..., Coroutine]) -> Command:
return self.command(
type=ApplicationCommandType.MESSAGE,
name=name,
scope=scope,
default_member_permissions=default_member_permissions,
dm_permission=dm_permission,
name_localizations=name_localizations,
default_scope=default_scope,
)(coro)
return decorator
[docs] def user_command(
self,
*,
name: Optional[str] = MISSING,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
name_localizations: Optional[Dict[Union[str, Locale], Any]] = MISSING,
default_member_permissions: Optional[Union[int, Permissions]] = MISSING,
dm_permission: Optional[bool] = MISSING,
default_scope: bool = True,
) -> Callable[[Callable[..., Coroutine]], Command]:
"""
A decorator for registering a user context menu to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a user context menu:
.. code-block:: python
@bot.user_command(name="Context menu name")
async def context_menu_name(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
:param Optional[str] name: The name of the application command.
:param Optional[Union[int, Guild, List[int], List[Guild]]] scope: The "scope"/applicable guilds the application command applies to. Defaults to ``None``.
:param Optional[Dict[Union[str, Locale], str]] name_localizations:
.. versionadded:: 4.2.0
The dictionary of localization for the ``name`` field. This enforces the same restrictions as the ``name`` field.
:param Optional[Union[int, Permissions]] default_member_permissions:
The permissions bit value of :class:`.Permissions`. If not given, defaults to :attr:`.Permissions.USE_APPLICATION_COMMANDS`
:param Optional[bool] dm_permission: The application permissions if executed in a Direct Message. Defaults to ``True``.
:param Optional[bool] default_scope:
.. versionadded:: 4.3.0
Whether the scope of the command is the default scope set in the client. Defaults to ``True``.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Command]
"""
def decorator(coro: Callable[..., Coroutine]) -> Command:
return self.command(
type=ApplicationCommandType.USER,
name=name,
scope=scope,
default_member_permissions=default_member_permissions,
dm_permission=dm_permission,
name_localizations=name_localizations,
default_scope=default_scope,
)(coro)
return decorator
[docs] def component(
self, component: Union[str, Button, SelectMenu]
) -> Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]:
"""
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving components.
The structure for a component callback:
.. code-block:: python
# Method 1
@bot.component(interactions.Button(
style=interactions.ButtonStyle.PRIMARY,
label="click me!",
custom_id="click_me_button",
))
async def button_response(ctx):
...
# Method 2
@bot.component("custom_id")
async def button_response(ctx):
...
The context of the component callback decorator inherits the same
as of the command decorator.
:param Union[str, Button, SelectMenu] component: The component you wish to callback for.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]
"""
def decorator(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
payload: str = (
_component(component).custom_id
if isinstance(component, (Button, SelectMenu))
else component
)
return self.event(coro, name=f"component_{payload}")
return decorator
def _find_command(self, command: Union[str, int]) -> ApplicationCommand:
"""
Iterates over `commands` and returns an :class:`ApplicationCommand` if it matches the name from `command`
:param Union[str, int] command: The name or ID of the command to match
:return: An ApplicationCommand model
:rtype: ApplicationCommand
"""
key = "name" if isinstance(command, str) else "id"
_command_obj = next(
(
ApplicationCommand(**_command)
for _command in self.__global_commands["commands"]
if str(_command[key]) == str(command)
),
None,
)
if not _command_obj:
for scope in self._scopes:
_command_obj = next(
(
ApplicationCommand(**_command)
for _command in self.__guild_commands[scope]["commands"]
if str(_command[key]) == str(command)
),
None,
)
if _command_obj:
break
if not _command_obj or (hasattr(_command_obj, "id") and not _command_obj.id):
raise LibraryException(
6,
message="The command does not exist. Make sure to define"
+ " your autocomplete callback after your commands",
)
else:
return _command_obj
[docs] def autocomplete(
self, command: Union[ApplicationCommand, int, str, Snowflake], name: str
) -> Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]:
"""
.. versionadded:: 4.0.2
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving autocompletion fields.
The structure for an autocomplete callback:
.. code-block:: python
@bot.autocomplete(command="command_name", name="option_name")
async def autocomplete_choice_list(ctx, user_input: str = ""):
await ctx.populate([
interactions.Choice(...),
interactions.Choice(...),
...
])
:param Union[ApplicationCommand, int, str, Snowflake] command: The command, command ID, or command name with the option.
:param str name: The name of the option to autocomplete.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]
"""
if isinstance(command, ApplicationCommand):
_command: str = command.name
elif isinstance(command, str):
_command: str = command
elif isinstance(command, (int, Snowflake)):
_command: Union[Snowflake, int] = int(command)
else:
raise LibraryException(
message="You can only insert strings, integers and ApplicationCommands here!",
code=12,
)
def decorator(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
if isinstance(_command, (int, Snowflake)):
curr_autocomplete = self.__id_autocomplete.get(_command, [])
curr_autocomplete.append({"coro": coro, "name": name})
self.__id_autocomplete[_command] = curr_autocomplete
return coro
return self.event(coro, name=f"autocomplete_{_command}_{name}")
return decorator
[docs] def modal(
self, modal: Union[Modal, str]
) -> Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]:
"""
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving modals.
The structure for a modal callback:
.. code-block:: python
@bot.modal(interactions.Modal(
interactions.TextInput(
style=interactions.TextStyleType.PARAGRAPH,
custom_id="how_was_your_day_field",
label="How has your day been?",
placeholder="Well, so far...",
),
))
async def modal_response(ctx, how_was_your_day_field: str):
...
The context of the modal callback decorator inherits the same
as of the component decorator.
:param Union[Modal, str] modal: The modal or custom_id of modal you wish to callback for.
:return: A callable response.
:rtype: Callable[[Callable[..., Coroutine]], Callable[..., Coroutine]]
"""
def decorator(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
payload: str = modal.custom_id if isinstance(modal, Modal) else modal
return self.event(coro, name=f"modal_{payload}")
return decorator
[docs] def load(
self, name: str, package: Optional[str] = None, *args, **kwargs
) -> Optional["Extension"]:
r"""
.. versionadded:: 4.1.0
"Loads" an extension off of the current client by adding a new class
which is imported from the library.
:param str name: The name of the extension.
:param Optional[str] package: The package of the extension.
:param tuple \*args: Optional arguments to pass to the extension
:param dict \**kwargs: Optional keyword-only arguments to pass to the extension.
:return: The loaded extension.
:rtype: Optional[Extension]
"""
_name: str = resolve_name(name, package)
if _name in self._extensions:
log.error(f"Extension {name} has already been loaded. Skipping.")
return
module = import_module(
name, package
) # should be a module, because Extensions just need to be __init__-ed
try:
setup = getattr(module, "setup")
extension = setup(self, *args, **kwargs)
except Exception as error:
del sys.modules[name]
log.error(f"Could not load {name}: {error}. Skipping.")
raise error from error
else:
log.debug(f"Loaded extension {name}.")
self._extensions[_name] = module
del sys.modules[name]
return extension
[docs] def remove(
self, name: str, remove_commands: bool = True, package: Optional[str] = None
) -> None:
"""
.. versionadded:: 4.1.0
Removes an extension out of the current client from an import resolve.
:param str name: The name of the extension.
:param Optional[bool] remove_commands: Whether to remove commands before reloading. Defaults to ``True``.
:param Optional[str] package: The package of the extension.
"""
try:
_name: str = resolve_name(name, package)
except AttributeError:
_name = name
extension = self._extensions.get(_name)
if _name not in self._extensions:
log.error(f"Extension {name} has not been loaded before. Skipping.")
return
if isinstance(extension, ModuleType): # loaded as a module
for ext_name, ext in getmembers(
extension, lambda x: isinstance(x, type) and issubclass(x, Extension)
):
if ext_name != "Extension":
_extension = self._extensions.get(ext_name)
with contextlib.suppress(AttributeError):
self._loop.create_task(
_extension.teardown(remove_commands=remove_commands)
) # made for Extension, usable by others
else:
with contextlib.suppress(AttributeError):
self._loop.create_task(
extension.teardown(remove_commands=remove_commands)
) # made for Extension, usable by others
del self._extensions[_name]
log.debug(f"Removed extension {name}.")
[docs] def reload(
self,
name: str,
package: Optional[str] = None,
remove_commands: bool = True,
*args,
**kwargs,
) -> Optional["Extension"]:
r"""
.. versionadded:: 4.1.0
"Reloads" an extension off of current client from an import resolve.
.. warning::
This will remove and re-add application commands, counting towards your daily application
command creation limit, as long as you have the ``remove_commands`` argument set to ``True``, which it is by
default.
:param str name: The name of the extension
:param Optional[str] package: The package of the extension
:param Optional[bool] remove_commands: Whether to remove commands before reloading. Defaults to True
:param tuple \*args: Optional arguments to pass to the extension
:param dict \**kwargs: Optional keyword-only arguments to pass to the extension.
:return: The reloaded extension.
:rtype: Optional[Extension]
"""
_name: str = resolve_name(name, package)
extension = self._extensions.get(_name)
if extension is None:
log.warning(f"Extension {name} could not be reloaded because it was never loaded.")
return self.load(name, package)
self.remove(name, package=package, remove_commands=remove_commands)
return self.load(name, package, *args, **kwargs)
[docs] def get_extension(self, name: str) -> Optional[Union[ModuleType, "Extension"]]:
"""
.. versionadded:: 4.2.0
Get an extension based on its name.
:param str name: Name of the extension.
:return: The found extension.
:rtype: Optional[Union[ModuleType, Extension]]
"""
return self._extensions.get(name)
[docs] async def modify(
self,
username: Optional[str] = MISSING,
avatar: Optional[Image] = MISSING,
) -> User:
"""
.. versionadded:: 4.2.0
Modify the bot user account settings.
:param Optional[str] username: The new username of the bot
:param Optional[Image] avatar: The new avatar of the bot
:return: The modified User object
:rtype: User
"""
if not self._http or isinstance(self._http, str):
raise LibraryException(
code=13, message="You cannot use this method until the bot has started!"
)
payload: dict = {}
if avatar is not MISSING:
payload["avatar"] = avatar.data
if username is not MISSING:
payload["username"] = username
data: dict = await self._http.modify_self(payload=payload)
return User(**data)
[docs] async def request_guild_members(
self,
guild_id: Union[Guild, Snowflake, int, str],
limit: Optional[int] = MISSING,
query: Optional[str] = MISSING,
presences: Optional[bool] = MISSING,
user_ids: Optional[Union[Snowflake, List[Snowflake]]] = MISSING,
nonce: Optional[str] = MISSING,
) -> None:
"""
.. versionadded:: 4.3.2
Requests guild members via websocket.
:param Union[Guild, Snowflake, int, str] guild_id: ID of the guild to get members for.
:param Optional[int] limit: Maximum number of members to send matching the 'query' parameter. Required when specifying 'query'.
:param Optional[str] query: String that username starts with.
:param Optional[bool] presences: Used to specify if we want the presences of the matched members.
:param Optional[Union[Snowflake, List[Snowflake]]] user_ids: Used to specify which users you wish to fetch.
:param Optional[str] nonce: Nonce to identify the Guild Members Chunk response.
"""
await self._websocket.request_guild_members(
guild_id=int(guild_id.id) if isinstance(guild_id, Guild) else int(guild_id),
limit=limit if limit is not MISSING else 0,
query=query if query is not MISSING else None,
presences=presences if presences is not MISSING else None,
user_ids=user_ids if user_ids is not MISSING else None,
nonce=nonce if nonce is not MISSING else None,
)
async def _logout(self) -> None:
await self._websocket.close()
await self._http._req.close()
[docs] async def wait_for(
self,
name: str,
check: Optional[Callable[..., Union[bool, Awaitable[bool]]]] = None,
timeout: Optional[float] = None,
) -> Any:
"""
.. versionadded:: 4.4.0
Waits for an event once, and returns the result.
Unlike event decorators, this is not persistent, and can be used to only proceed in a command once an event happens.
:param str name: The event to wait for
:param Optional[Callable[..., Union[bool, Awaitable[bool]]]] check: A function or coroutine to call, which should return a truthy value if the data should be returned
:param float timeout: How long to wait for the event before raising an error
:return: The value of the dispatched event
:rtype: Any
"""
while True:
fut = self._websocket._dispatch.add(name=name)
try:
# asyncio's wait_for
res: tuple = await wait_for(fut, timeout=timeout)
except TimeoutError:
with contextlib.suppress(ValueError):
self._websocket._dispatch.extra_events[name].remove(fut)
raise
if not check:
break
checked = check(*res)
if isawaitable(checked):
checked = await checked
if checked:
break
else:
# The check failed, so try again next time
log.info(f"A check failed waiting for the {name} event")
if res:
return res[0] if len(res) == 1 else res
[docs] async def wait_for_component(
self,
components: Union[
Union[str, Button, SelectMenu],
List[Union[str, Button, SelectMenu]],
] = None,
messages: Union[Message, int, List[Union[Message, int]]] = None,
check: Optional[Callable[[ComponentContext], Union[bool, Awaitable[bool]]]] = None,
timeout: Optional[float] = None,
) -> ComponentContext:
"""
.. versionadded:: 4.4.0
Waits for a component to be interacted with, and returns the resulting context.
.. note::
If you are waiting for a select menu, you can find the selected values in ``ctx.data.values``.
Another possibility is using the :meth:`.Client.wait_for_select` method.
:param Union[str, Button, SelectMenu, List[Union[str, Button, SelectMenu]]] components: The component(s) to wait for
:param Union[Message, int, List[Union[Message, int]]] messages: The message(s) to check for
:param Optional[Callable[[ComponentContext], Union[bool, Awaitable[bool]]]] check: A function or coroutine to call, which should return a truthy value if the data should be returned
:param float timeout: How long to wait for the event before raising an error
:return: The ComponentContext of the dispatched event
:rtype: ComponentContext
"""
custom_ids: List[str] = []
messages_ids: List[int] = []
if components:
if isinstance(components, list):
for component in components:
if isinstance(component, (Button, SelectMenu)):
custom_ids.append(component.custom_id)
elif isinstance(component, ActionRow):
custom_ids.extend([c.custom_id for c in component.components])
elif isinstance(component, list):
for c in component:
if isinstance(c, (Button, SelectMenu)):
custom_ids.append(c.custom_id)
elif isinstance(c, ActionRow):
custom_ids.extend([b.custom_id for b in c.components])
elif isinstance(c, str):
custom_ids.append(c)
elif isinstance(component, str):
custom_ids.append(component)
elif isinstance(components, (Button, SelectMenu)):
custom_ids.append(components.custom_id)
elif isinstance(components, ActionRow):
custom_ids.extend([c.custom_id for c in components.components]) # noqa
elif isinstance(components, str):
custom_ids.append(components)
if messages:
if isinstance(messages, Message):
messages_ids.append(int(messages.id))
elif isinstance(messages, list):
for message in messages:
if isinstance(message, Message):
messages_ids.append(int(message.id))
else:
messages_ids.append(int(message))
else: # account for plain ints, string, or Snowflakes
messages_ids.append(int(messages))
def _check(ctx: ComponentContext) -> bool:
if custom_ids and ctx.data.custom_id not in custom_ids:
return False
if messages_ids and int(ctx.message.id) not in messages_ids:
return False
return check(ctx) if check else True
return await self.wait_for("on_component", check=_check, timeout=timeout)
[docs] async def wait_for_select(
self,
components: Union[
Union[str, SelectMenu],
List[Union[str, SelectMenu]],
] = None,
messages: Union[Message, int, List[Union[Message, int]]] = None,
check: Optional[Callable[[ComponentContext], Union[bool, Awaitable[bool]]]] = None,
timeout: Optional[float] = None,
) -> Tuple[ComponentContext, List[Union[str, Member, User, Role, Channel]]]:
"""
.. versionadded:: 4.4.0
Waits for a select menu to be interacted with, and returns the resulting context and a list of the selected values.
The method can be used like this:
.. code-block:: python
ctx, values = await bot.wait_for_select(custom_id)
In this case ``ctx`` will be your normal context and ``values`` will be a list of :class:`str`, :class:`.Member`, :class:`.User`, :class:`.Channel` or :class:`.Role` objects,
depending on which select type you received.
:param Union[str, SelectMenu, List[Union[str, SelectMenu]]] components: The component(s) to wait for
:param Union[Message, int, List[Union[Message, int]]] messages: The message(s) to check for
:param Optional[Callable[[ComponentContext], Union[bool, Awaitable[bool]]]] check: A function or coroutine to call, which should return a truthy value if the data should be returned
:param float timeout: How long to wait for the event before raising an error
:return: The ComponentContext and list of selections of the dispatched event
:rtype: Tuple[ComponentContext, Union[List[str], List[Member], List[User], List[Channel], List[Role]]]
"""
def _check(_ctx: ComponentContext) -> bool:
if _ctx.data.component_type.value not in {4, 5, 6, 7, 8}:
return False
return check(_ctx) if check else True
ctx: ComponentContext = await self.wait_for_component(
components, messages, check=_check, timeout=timeout
)
if ctx.data.component_type == 4:
return ctx, ctx.data.values
_list = [] # temp storage for items
_data = self._websocket._WebSocketClient__select_option_type_context(
ctx, ctx.data.component_type.value
) # resolved.
for value in ctx.data.values:
_list.append(_data[value])
return ctx, _list
[docs] async def wait_for_modal(
self,
modals: Union[Modal, str, List[Union[Modal, str]]],
check: Optional[Callable[[CommandContext], Union[bool, Awaitable[bool]]]] = None,
timeout: Optional[float] = None,
) -> Tuple[CommandContext, List[str]]:
"""
.. versionadded:: 4.4.0
Waits for a modal to be interacted with, and returns the resulting context and submitted data.
.. note::
This function returns both the context of the modal and the data the user input.
The recommended way to use it is to do:
``modal_ctx, fields = await bot.wait_for_modal(...)``
Alternatively, to get the fields immediately, you can do:
``modal_ctx, (field1, field2, ...) = await bot.wait_for_modal(...)``
:param Union[Modal, str, List[Modal, str]] modals: The modal(s) to wait for
:param Optional[Callable[[CommandContext], Union[bool, Awaitable[bool]]]] check: A function or coroutine to call, which should return a truthy value if the data should be returned
:param Optional[float] timeout: How long to wait for the event before raising an error
:return: The context of the modal, followed by the data the user inputted
:rtype: tuple[CommandContext, list[str]]
"""
ids: List[str] = []
if isinstance(modals, Modal):
ids = [str(modals.custom_id)]
elif isinstance(modals, str):
ids = [modals]
elif isinstance(modals, list):
for modal in modals:
if isinstance(modal, Modal):
ids.append(str(modal.custom_id))
elif isinstance(modal, str):
modals.append(modal)
if not all(isinstance(id, str) for id in ids):
raise TypeError("No modals were passed!")
def _check(ctx: CommandContext):
if ids and ctx.data.custom_id not in ids:
return False
return check(ctx) if check else True
ctx: CommandContext = await self.wait_for("on_modal", check=_check, timeout=timeout)
# Ed requested that it returns a result similar to the decorator
fields: List[str] = []
for actionrow in ctx.data.components: # discord is weird with this
if actionrow.components:
data = actionrow.components[0].value
fields.append(data)
return ctx, fields
[docs] async def get_self_user(self) -> User:
"""
.. versionadded:: 4.4.0
Gets the bot's user information.
"""
return User(**await self._http.get_self(), _client=self._http)
[docs]class Extension:
"""
.. versionadded:: 4.1.0
A class that allows you to represent "extensions" of your code, or
essentially cogs that can be ran independent of the root file in
an object-oriented structure.
The structure of an extension:
.. code-block:: python
class CoolCode(interactions.Extension):
def __init__(self, client):
self.client = client
@extension_user_command(
name="User command in cog",
)
async def cog_user_cmd(self, ctx):
...
def setup(client):
CoolCode(client)
"""
client: Client
def __new__(cls, client: Client, *args, **kwargs) -> "Extension":
# sourcery skip: low-code-quality
self = super().__new__(cls)
self.client = client
self._commands = {}
self._listeners = {}
# This gets every coroutine in a way that we can easily change them
# cls
for name, func in getmembers(self, predicate=iscoroutinefunction):
# TODO we can make these all share the same list, might make it easier to load/unload
if hasattr(func, "__listener_name__"): # set by extension_listener
all_listener_names: List[str] = func.__listener_name__
for listener_name in all_listener_names:
func = client.event(
func, name=listener_name
) # capture the return value for friendlier ext-ing
listeners = self._listeners.get(listener_name, [])
listeners.append(func)
self._listeners[listener_name] = listeners
if hasattr(func, "__component_data__"):
all_component_data: List[Tuple[tuple, dict]] = func.__component_data__
for args, kwargs in all_component_data:
func = client.component(*args, **kwargs)(func)
component = kwargs.get("component") or args[0]
comp_name = (
_component(component).custom_id
if isinstance(component, (Button, SelectMenu))
else component
)
comp_name = f"component_{comp_name}"
listeners = self._listeners.get(comp_name, [])
listeners.append(func)
self._listeners[comp_name] = listeners
if hasattr(func, "__autocomplete_data__"):
all_args_kwargs = func.__autocomplete_data__
for args, kwargs in all_args_kwargs:
func = client.autocomplete(*args, **kwargs)(func)
_command = kwargs.get("command") or args[0]
name = kwargs.get("name") or args[1]
_command: Union[Snowflake, int] = (
_command.id if isinstance(_command, ApplicationCommand) else _command
)
auto_name = f"autocomplete_{_command}_{name}"
listeners = self._listeners.get(auto_name, [])
listeners.append(func)
self._listeners[auto_name] = listeners
if hasattr(func, "__modal_data__"):
all_modal_data: List[Tuple[tuple, dict]] = func.__modal_data__
for args, kwargs in all_modal_data:
func = client.modal(*args, **kwargs)(func)
modal = kwargs.get("modal") or args[0]
_modal_id: str = modal.custom_id if isinstance(modal, Modal) else modal
modal_name = f"modal_{_modal_id}"
listeners = self._listeners.get(modal_name, [])
listeners.append(func)
self._listeners[modal_name] = listeners
for _, cmd in getmembers(self, predicate=lambda command: isinstance(command, Command)):
cmd: Command
if cmd.name in {_cmd.name for _cmd in self.client._commands}:
continue
cmd.extension = self
cmd.client = self.client
self.client._commands.append(cmd)
commands = self._commands.get(cmd.name, [])
coro = cmd.dispatcher
coro = coro.__func__ if hasattr(coro, "__func__") else coro
commands.append(coro)
self._commands[f"command_{cmd.name}"] = commands
client._extensions[cls.__name__] = self
self.client._Client__resolve_commands() # noqa
if client._websocket.ready.is_set() and client._automate_sync:
client._loop.create_task(client._Client__sync()) # noqa
return self
async def teardown(self, remove_commands: bool = True):
for event, funcs in self._listeners.items():
for func in funcs:
self.client._websocket._dispatch.events[event].remove(func)
for cmd, funcs in self._commands.items():
_cmd: str = cmd.split("_", 1)[1]
for _coro in self.client._Client__command_coroutines:
if _coro._name == _cmd:
self.client._Client__command_coroutines.remove(_coro) # noqa
break
for _command in self.client._commands:
if _command.name == _cmd:
self.client._commands.remove(_command)
break
for i in range(len(funcs)):
self.client._websocket._dispatch.events[cmd].pop(i) # noqa
if self.client._automate_sync and remove_commands:
await self.client._Client__sync() # noqa
[docs]@wraps(Client.command)
def extension_command(**kwargs) -> Callable[[Callable[..., Coroutine]], Command]:
def decorator(coro) -> Command:
cmd = Command(coro=coro, **kwargs)
coro.__command_data__ = cmd
return cmd
return decorator
[docs]@wraps(Client.event)
def extension_listener(func: Optional[Coroutine] = None, name: Optional[str] = None):
def decorator(func: Coroutine):
if not hasattr(func, "__listener_name__"):
func.__listener_name__ = []
func.__listener_name__.append(name or func.__name__)
return func
if func:
# allows omitting `()` on `@listener`
return decorator(func)
return decorator
[docs]@wraps(Client.component)
def extension_component(*args, **kwargs):
def decorator(func):
if not hasattr(func, "__component_data__"):
func.__component_data__ = []
func.__component_data__.append((args, kwargs))
return func
return decorator
[docs]@wraps(Client.autocomplete)
def extension_autocomplete(*args, **kwargs):
def decorator(func):
if not hasattr(func, "__autocomplete_data__"):
func.__autocomplete_data__ = []
func.__autocomplete_data__.append((args, kwargs))
return func
return decorator
[docs]@wraps(Client.modal)
def extension_modal(*args, **kwargs):
def decorator(func):
if not hasattr(func, "__modal_data__"):
func.__modal_data__ = []
func.__modal_data__.append((args, kwargs))
return func
return decorator
[docs]@wraps(Client.message_command)
def extension_message_command(**kwargs) -> Callable[[Callable[..., Coroutine]], Command]:
def decorator(func) -> Command:
kwargs["type"] = ApplicationCommandType.MESSAGE
return extension_command(**kwargs)(func)
return decorator
[docs]@wraps(Client.user_command)
def extension_user_command(**kwargs) -> Callable[[Callable[..., Coroutine]], Command]:
def decorator(func) -> Command:
kwargs["type"] = ApplicationCommandType.USER
return extension_command(**kwargs)(func)
return decorator