import asyncio
from datetime import datetime
from logging import Logger
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from ..api.error import LibraryException
from ..api.models.channel import Channel, Thread
from ..api.models.flags import MessageFlags, Permissions
from ..api.models.guild import Guild
from ..api.models.member import Member
from ..api.models.message import Attachment, Embed, File, Message, MessageReference
from ..api.models.misc import AllowedMentions, Snowflake
from ..api.models.user import User
from ..base import get_logger
from ..utils.attrs_utils import ClientSerializerMixin, convert_int, define, field
from ..utils.missing import MISSING
from .enums import ComponentType, InteractionCallbackType, InteractionType, Locale
from .models.command import Choice
from .models.component import ActionRow, Button, Modal, SelectMenu, _build_components
from .models.misc import InteractionData
if TYPE_CHECKING:
from .bot import Client, Extension
from .models.command import Command
log: Logger = get_logger("context")
__all__ = (
"_Context",
"CommandContext",
"ComponentContext",
)
@define()
class _Context(ClientSerializerMixin):
"""
The base class of "context" for dispatched event data
from the gateway. The premise of having this class is so
that the user-facing API is able to allow developers to
easily access information presented from any event in
a "contextualized" sense.
"""
message: Optional[Message] = field(converter=Message, default=None, add_client=True)
member: Optional[Member] = field(default=None, converter=Member, add_client=True)
user: User = field(converter=User, default=None, add_client=True)
id: Snowflake = field(converter=Snowflake)
application_id: Snowflake = field(converter=Snowflake)
type: InteractionType = field(converter=InteractionType)
callback: Optional[InteractionCallbackType] = field(
converter=InteractionCallbackType, default=None
)
data: InteractionData = field(converter=InteractionData)
version: int = field()
token: str = field()
guild_id: Snowflake = field(converter=Snowflake)
channel_id: Snowflake = field(converter=Snowflake)
responded: bool = field(default=False)
deferred: bool = field(default=False)
app_permissions: Permissions = field(converter=convert_int(Permissions), default=None)
locale: Optional[Locale] = field(converter=Locale, default=None)
guild_locale: Optional[Locale] = field(converter=Locale, default=None)
def __attrs_post_init__(self) -> None:
if self.member and self.guild_id:
self.member._extras["guild_id"] = self.guild_id
if self.user is None:
self.user = self.member.user if self.member else None
if self.member and not self.member.user and self.user:
self.member.user = self.user
@property
def deferred_ephemeral(self) -> bool:
"""
.. versionadded:: 4.4.0
Returns whether the current interaction was deferred ephemerally.
"""
return bool(
self.message.flags & MessageFlags.EPHEMERAL
and self.message.flags & MessageFlags.LOADING
)
@property
def created_at(self) -> datetime:
"""
.. versionadded:: 4.4.0
Returns when the interaction was created.
"""
return self.id.timestamp
@property
def author(self) -> Optional[Member]:
"""
Returns the author/member that invoked the interaction.
"""
return self.member
@property
def channel(self) -> Optional[Channel]:
"""
.. versionadded:: 4.1.0
.. versionchanged:: 4.4.0
Channel now returns ``None`` instead of ``MISSING`` if it is not found to avoid confusion
Returns the current channel, if cached.
"""
return self._client.cache[Channel].get(self.channel_id, None) or self._client.cache[
Thread
].get(self.channel_id, None)
@property
def guild(self) -> Optional[Guild]:
"""
.. versionadded:: 4.1.0
.. versionchanged:: 4.4.0
Guild now returns ``None`` instead of ``MISSING`` if it is not found to avoid confusion
Returns the current guild, if cached.
"""
return self._client.cache[Guild].get(self.guild_id, None)
async def get_channel(self) -> Channel:
"""
.. versionadded:: 4.1.0
This gets the channel the context was invoked in. If the channel is not cached, an HTTP request is made.
:return: The channel as object
:rtype: Channel
"""
if channel := self.channel:
await asyncio.sleep(0)
return channel
res = await self._client.get_channel(int(self.channel_id))
return Channel(**res, _client=self._client)
async def get_guild(self) -> Guild:
"""
.. versionadded:: 4.1.0
This gets the guild the context was invoked in. If the guild is not cached, an HTTP request is made.
:return: The guild as object
:rtype: Guild
"""
if guild := self.guild:
await asyncio.sleep(0)
return guild
res = await self._client.get_guild(int(self.guild_id))
return Guild(**res, _client=self._client)
async def send(
self,
content: Optional[str] = MISSING,
*,
tts: Optional[bool] = MISSING,
attachments: Optional[List[Attachment]] = MISSING,
files: Optional[Union[File, List[File]]] = MISSING,
embeds: Optional[Union[Embed, List[Embed]]] = MISSING,
allowed_mentions: Optional[Union[AllowedMentions, dict]] = MISSING,
components: Optional[
Union[ActionRow, Button, SelectMenu, List[ActionRow], List[Button], List[SelectMenu]]
] = MISSING,
ephemeral: Optional[bool] = False,
suppress_embeds: bool = False,
) -> Tuple[dict, List[File]]:
"""
This allows the invocation state described in the "context"
to send an interaction response.
:param Optional[str] content: The contents of the message as a string or string-converted value.
:param Optional[bool] tts: Whether the message utilizes the text-to-speech Discord programme or not.
:param Optional[List[Attachment]] attachments: The attachments to attach to the message. Needs to be uploaded to the CDN first
:param Optional[Union[File, List[File]]] files:
.. versionadded:: 4.4.0
The files to attach to the message.
:param Optional[Union[Embed, List[Embed]]] embeds: An embed, or list of embeds for the message.
:param Optional[Union[AllowedMentions, dict]] allowed_mentions: The allowed mentions for the message.
:param Optional[Union[ActionRow, Button, SelectMenu, List[Union[ActionRow, Button, SelectMenu]]]] components: A component, or list of components for the message.
:param Optional[bool] ephemeral: Whether the response is hidden or not.
:param Optional[bool] suppress_embeds: Whether embeds are not shown in the message.
:return: The sent message.
"""
if (
content is MISSING
and self.message
and self.callback == InteractionCallbackType.DEFERRED_UPDATE_MESSAGE
):
_content = self.message.content
else:
_content: str = "" if content is MISSING else content
_tts: bool = False if tts is MISSING else tts
if (
embeds is MISSING
and self.message
and self.callback == InteractionCallbackType.DEFERRED_UPDATE_MESSAGE
):
embeds = self.message.embeds
_embeds: list = (
[]
if not embeds or embeds is MISSING
else ([embed._json for embed in embeds] if isinstance(embeds, list) else [embeds._json])
)
_allowed_mentions: dict = (
{}
if allowed_mentions is MISSING
else allowed_mentions._json
if isinstance(allowed_mentions, AllowedMentions)
else allowed_mentions
)
if components is not MISSING and components:
# components could be not missing but an empty list
_components = _build_components(components=components)
elif (
components is MISSING
and self.message
and self.callback == InteractionCallbackType.DEFERRED_UPDATE_MESSAGE
):
if isinstance(self.message.components, list):
_components = self.message.components
else:
_components = [self.message.components]
else:
_components = []
_flags = MessageFlags.EPHEMERAL if ephemeral else MessageFlags(0)
if suppress_embeds:
_flags |= MessageFlags.SUPPRESS_EMBEDS
_attachments = [] if attachments is MISSING else [a._json for a in attachments]
if not files or files is MISSING:
_files = []
elif isinstance(files, list):
_files = [file._json_payload(id) for id, file in enumerate(files)]
else:
_files = [files._json_payload(0)]
files = [files]
_files.extend(_attachments)
return (
dict(
content=_content,
tts=_tts,
embeds=_embeds,
allowed_mentions=_allowed_mentions,
components=_components,
attachments=_files,
flags=_flags.value,
),
files,
)
async def edit(
self,
content: Optional[str] = MISSING,
*,
tts: Optional[bool] = MISSING,
attachments: Optional[List[Attachment]] = MISSING,
files: Optional[Union[File, List[File]]] = MISSING,
embeds: Optional[Union[Embed, List[Embed]]] = MISSING,
allowed_mentions: Optional[Union[AllowedMentions, dict]] = MISSING,
message_reference: Optional[MessageReference] = MISSING,
components: Optional[
Union[ActionRow, Button, SelectMenu, List[ActionRow], List[Button], List[SelectMenu]]
] = MISSING,
) -> Tuple[dict, List[File]]: # sourcery skip: low-code-quality
"""
This allows the invocation state described in the "context"
to send an interaction response.
:param Optional[str] content: The contents of the message as a string or string-converted value.
:param Optional[bool] tts: Whether the message utilizes the text-to-speech Discord programme or not.
:param Optional[List[Attachment]] attachments: The attachments to attach to the message. Needs to be uploaded to the CDN first
:param Optional[Union[File, List[File]]] files:
.. versionadded:: 4.4.0
The files to attach to the message.
:param Optional[Union[Embed, List[Embed]]] embeds: An embed, or list of embeds for the message.
:param Optional[Union[AllowedMentions, dict]] allowed_mentions: The allowed mentions for the message.
:param Optional[MessageReference] message_reference: Include to make your message a reply.
:param Optional[Union[ActionRow, Button, SelectMenu, List[Union[ActionRow, Button, SelectMenu]]]] components: A component, or list of components for the message.
:return: The edited message.
"""
payload = {}
if self.message.content is not None or content is not MISSING:
_content: str = self.message.content if content is MISSING else content
payload["content"] = _content
_tts: bool = False if tts is MISSING else tts
payload["tts"] = _tts
if self.message.embeds is not None or embeds is not MISSING:
if embeds is MISSING:
embeds = self.message.embeds
_embeds: list = (
([embed._json for embed in embeds] if isinstance(embeds, list) else [embeds._json])
if embeds
else []
)
payload["embeds"] = _embeds
if self.message.attachments is not None or attachments is not MISSING:
if attachments is MISSING:
attachments = self.message.attachments
_attachments: list = (
(
[attachment._json for attachment in attachments]
if isinstance(attachments, list)
else [attachments._json]
)
if attachments
else []
)
if not files or files is MISSING:
_files = []
elif isinstance(files, list):
_files = [file._json_payload(id) for id, file in enumerate(files)]
else:
_files = [files._json_payload(0)]
files = [files]
_files.extend(_attachments)
payload["attachments"] = _files
_allowed_mentions: dict = (
{}
if allowed_mentions is MISSING
else allowed_mentions._json
if isinstance(allowed_mentions, AllowedMentions)
else allowed_mentions
)
_message_reference: dict = {} if message_reference is MISSING else message_reference._json
payload["allowed_mentions"] = _allowed_mentions
payload["message_reference"] = _message_reference
if self.message.components is not None or components is not MISSING:
if components is MISSING:
_components = _build_components(components=self.message.components)
elif not components:
_components = []
else:
_components = _build_components(components=components)
payload["components"] = _components
return payload, files
async def popup(self, modal: Modal) -> dict:
"""
This "pops up" a modal to present information back to the
user.
:param Modal modal: The components you wish to show.
"""
payload: dict = {
"type": InteractionCallbackType.MODAL.value,
"data": {
"title": modal.title,
"components": modal._json.get("components"),
"custom_id": modal.custom_id,
},
}
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data=payload,
)
self.responded = True
return payload
async def has_permissions(
self, *permissions: Union[int, Permissions], operator: str = "and"
) -> bool:
r"""
.. versionadded:: 4.3.2
Returns whether the author of the interaction has the permissions in the given context.
:param Union[int, Permissions] \*permissions: The list of permissions
:param Optional[str] operator: The operator to use to calculate permissions. Possible values: ``and``, ``or``. Defaults to ``and``.
:return: Whether the author has the permissions
:rtype: bool
"""
if operator == "and":
return all(perm in self.author.permissions for perm in permissions)
else:
return any(perm in self.author.permissions for perm in permissions)
[docs]@define()
class CommandContext(_Context):
"""
A derivation of context
designed specifically for application command data.
:ivar Snowflake id: The ID of the interaction.
:ivar Snowflake application_id: The application ID of the interaction.
:ivar InteractionType type: The type of interaction.
:ivar InteractionData data: The application command data.
:ivar Optional[Union[Message, Member, User]] target: The target selected if this interaction is invoked as a context menu.
:ivar str token: The token of the interaction response.
:ivar Snowflake guild_id: The ID of the current guild.
:ivar Snowflake channel_id: The ID of the current channel.
:ivar User user: The user data model.
:ivar bool responded: Whether an original response was made or not.
:ivar bool deferred: Whether the response was deferred or not.
:ivar Optional[Locale] locale: The selected language of the user invoking the interaction.
:ivar Optional[Locale] guild_locale: The guild's preferred language, if invoked in a guild.
:ivar str app_permissions: Bitwise set of permissions the bot has within the channel the interaction was sent from.
:ivar Client client:
.. versionadded:: 4.3.0
The client instance that the command belongs to.
:ivar Command command:
.. versionadded:: 4.3.0
The command object that is being invoked.
:ivar Extension extension:
.. versionadded:: 4.3.0
The extension the command belongs to.
"""
target: Optional[Union[Message, Member, User]] = field(default=None)
client: "Client" = field(default=None, init=False)
command: "Command" = field(default=None, init=False)
extension: "Extension" = field(default=None, init=False)
def __attrs_post_init__(self) -> None:
super().__attrs_post_init__()
if self.data.target_id:
target = self.data.target_id
if self.data.type == 2:
self.target = (
self.data.resolved.members[target]
if self.guild_id and str(self.data.target_id) in self.data.resolved.members
else self.data.resolved.users[target]
)
# member id would have potential to exist, and therefore have target def priority.
elif self.data.type == 3:
self.target = self.data.resolved.messages[target]
self.target._client = self._client
[docs] async def edit(
self, content: Optional[str] = MISSING, **kwargs
) -> Message: # sourcery skip: low-code-quality
payload, files = await super().edit(content, **kwargs)
msg = None
if self.deferred:
if (
hasattr(self.message, "id")
and self.message.id is not None
and self.message.flags != 64
):
try:
res = await self._client.edit_message(
int(self.channel_id), int(self.message.id), payload=payload, files=files
)
except LibraryException as e:
if e.code in {10015, 10018}:
log.warning(f"You can't edit hidden messages." f"({e.message}).")
else:
# if its not ephemeral or some other thing.
raise e from e
else:
self.message = msg = Message(**res, _client=self._client)
else:
try:
res = await self._client.edit_interaction_response(
token=self.token,
application_id=str(self.application_id),
data=payload,
files=files,
message_id=self.message.id
if self.message and self.message.flags != 64
else "@original",
)
except LibraryException as e:
if e.code in {10015, 10018}:
log.warning(f"You can't edit hidden messages." f"({e.message}).")
else:
# if its not ephemeral or some other thing.
raise e from e
else:
self.message = msg = Message(**res, _client=self._client)
else:
try:
res = await self._client.edit_interaction_response(
token=self.token,
application_id=str(self.application_id),
data=payload,
files=files,
)
except LibraryException as e:
if e.code in {10015, 10018}:
log.warning(f"You can't edit hidden messages." f"({e.message}).")
else:
# if its not ephemeral or some other thing.
raise e from e
else:
self.message = msg = Message(**res, _client=self._client)
return msg if msg is not None else Message(**payload, _client=self._client)
[docs] async def defer(self, ephemeral: Optional[bool] = False) -> Message:
"""
.. versionchanged:: 4.4.0
Now returns the created message object
This "defers" an interaction response, allowing up
to a 15-minute delay between invocation and responding.
:param Optional[bool] ephemeral: Whether the deferred state is hidden or not.
:return: The deferred message
:rtype: Message
"""
if not self.responded:
self.deferred = True
_ephemeral: int = MessageFlags.EPHEMERAL.value if ephemeral else 0
self.callback = InteractionCallbackType.DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data={"type": self.callback.value, "data": {"flags": _ephemeral}},
)
try:
_msg = await self._client.get_original_interaction_response(
self.token, str(self.application_id)
)
except LibraryException:
pass
else:
self.message = Message(**_msg, _client=self._client)
self.responded = True
return self.message
[docs] async def send(self, content: Optional[str] = MISSING, **kwargs) -> Message:
payload, files = await super().send(content, **kwargs)
if not self.deferred:
self.callback = InteractionCallbackType.CHANNEL_MESSAGE_WITH_SOURCE
_payload: dict = {"type": self.callback.value, "data": payload}
msg = None
if self.responded:
res = await self._client._post_followup(
data=payload,
files=files,
token=self.token,
application_id=str(self.application_id),
)
self.message = msg = Message(**res, _client=self._client)
else:
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data=_payload,
files=files,
)
try:
_msg = await self._client.get_original_interaction_response(
self.token, str(self.application_id)
)
except LibraryException:
pass
else:
self.message = msg = Message(**_msg, _client=self._client)
self.responded = True
if msg is not None:
return msg
return Message(
**payload,
_client=self._client,
author={"_client": self._client, "id": None, "username": None, "discriminator": None},
)
[docs] async def delete(self) -> None:
"""
This deletes the interaction response of a message sent by
the contextually derived information from this class.
.. note::
Doing this will proceed in the context message no longer
being present.
"""
if self.responded and self.message is not None:
await self._client.delete_interaction_response(
application_id=str(self.application_id),
token=self.token,
message_id=int(self.message.id),
)
else:
await self._client.delete_interaction_response(
application_id=str(self.application_id), token=self.token
)
self.message = None
[docs] async def populate(self, choices: Union[Choice, List[Choice]]) -> List[Choice]:
"""
This "populates" the list of choices that the client-end
user will be able to select from in the autocomplete field.
.. warning::
Only a maximum of ``25`` choices may be presented
within an autocomplete option.
:param Union[Choice, List[Choice]] choices: The choices you wish to present.
:return: The list of choices you've given.
:rtype: List[Choice]
"""
_choices: Union[list, None] = []
if not choices or (isinstance(choices, list) and len(choices) == 0):
_choices = None
elif isinstance(choices, Choice):
_choices.append(choices._json)
elif isinstance(choices, list) and all(isinstance(choice, Choice) for choice in choices):
_choices = [choice._json for choice in choices]
elif all(
isinstance(choice, dict) and all(isinstance(x, str) for x in choice)
for choice in choices
):
_choices = list(choices)
else:
raise LibraryException(6, message="Autocomplete choice items must be of type Choice")
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data={
"type": InteractionCallbackType.APPLICATION_COMMAND_AUTOCOMPLETE_RESULT.value,
"data": {"choices": _choices},
},
)
return _choices
[docs]@define()
class ComponentContext(_Context):
"""
A derivation of context
designed specifically for component data.
:ivar Snowflake id: The ID of the interaction.
:ivar Snowflake application_id: The application ID of the interaction.
:ivar InteractionType type: The type of interaction.
:ivar InteractionData data: The application command data.
:ivar str token: The token of the interaction response.
:ivar Snowflake guild_id: The ID of the current guild.
:ivar Snowflake channel_id: The ID of the current channel.
:ivar Optional[Message] message: The message data model.
:ivar User user: The user data model.
:ivar bool responded: Whether an original response was made or not.
:ivar bool deferred: Whether the response was deferred or not.
:ivar Optional[Locale] locale: The selected language of the user invoking the interaction.
:ivar Optional[Locale] guild_locale: The guild's preferred language, if invoked in a guild.
:ivar str app_permissions: Bitwise set of permissions the bot has within the channel the interaction was sent from.
"""
[docs] async def edit(self, content: Optional[str] = MISSING, **kwargs) -> Message:
payload, files = await super().edit(content, **kwargs)
msg = None
if not self.deferred:
self.callback = InteractionCallbackType.UPDATE_MESSAGE
await self._client.create_interaction_response(
data={"type": self.callback.value, "data": payload},
files=files,
token=self.token,
application_id=int(self.id),
)
try:
_msg = await self._client.get_original_interaction_response(
self.token, str(self.application_id)
)
except LibraryException:
pass
else:
self.message = msg = Message(**_msg, _client=self._client)
self.responded = True
elif self.callback != InteractionCallbackType.DEFERRED_UPDATE_MESSAGE:
await self._client._post_followup(
data=payload,
files=files,
token=self.token,
application_id=str(self.application_id),
)
else:
res = await self._client.edit_interaction_response(
data=payload,
files=files,
token=self.token,
application_id=str(self.application_id),
)
self.responded = True
self.message = msg = Message(**res, _client=self._client)
return msg if msg is not None else Message(**payload, _client=self._client)
[docs] async def send(self, content: Optional[str] = MISSING, **kwargs) -> Message:
payload, files = await super().send(content, **kwargs)
if not self.deferred:
self.callback = InteractionCallbackType.CHANNEL_MESSAGE_WITH_SOURCE
_payload: dict = {"type": self.callback.value, "data": payload}
msg = None
if self.responded:
res = await self._client._post_followup(
data=payload,
files=files,
token=self.token,
application_id=str(self.application_id),
)
self.message = msg = Message(**res, _client=self._client)
else:
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data=_payload,
files=files,
)
try:
_msg = await self._client.get_original_interaction_response(
self.token, str(self.application_id)
)
except LibraryException:
pass
else:
self.message = msg = Message(**_msg, _client=self._client)
self.responded = True
return msg if msg is not None else Message(**payload, _client=self._client)
[docs] async def defer(
self, ephemeral: Optional[bool] = False, edit_origin: Optional[bool] = False
) -> Message:
"""
.. versionchanged:: 4.4.0
Now returns the created message object
This "defers" a component response, allowing up
to a 15-minute delay between invocation and responding.
:param Optional[bool] ephemeral: Whether the deferred state is hidden or not.
:param Optional[bool] edit_origin: Whether you want to edit the original message or send a followup message
:return: The deferred message
:rtype: Message
"""
if not self.responded:
self.deferred = True
_ephemeral: int = MessageFlags.EPHEMERAL.value if bool(ephemeral) else 0
# ephemeral doesn't change callback typings. just data json
if edit_origin:
self.callback = InteractionCallbackType.DEFERRED_UPDATE_MESSAGE
else:
self.callback = InteractionCallbackType.DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE
await self._client.create_interaction_response(
token=self.token,
application_id=int(self.id),
data={"type": self.callback.value, "data": {"flags": _ephemeral}},
)
try:
_msg = await self._client.get_original_interaction_response(
self.token, str(self.application_id)
)
except LibraryException:
pass
else:
self.message = Message(**_msg, _client=self._client)
self.responded = True
return self.message
[docs] async def disable_all_components(
self, respond_to_interaction: Optional[bool] = True, **other_kwargs: Optional[dict]
) -> Message:
r"""
.. versionadded:: 4.3.2
Disables all components of the message.
:param Optional[bool] respond_to_interaction: Whether the components should be disabled in an interaction response, default True
:param Optional[dict] \**other_kwargs: Additional keyword-arguments to pass to the edit method. This only works when this method is used as interaction response and takes the same arguments as :func:`ComponentContext.edit()`
:return: The modified Message
:rtype: Message
"""
if not respond_to_interaction:
return await self.message.disable_all_components()
for components in self.message.components:
for component in components.components:
component.disabled = True
if other_kwargs.get("components"):
raise LibraryException(
12, "You must not specify the `components` argument in this method."
)
other_kwargs["components"] = self.message.components
return await self.edit(**other_kwargs)
@property
def custom_id(self) -> Optional[str]:
"""
The custom ID of the interacted component.
:rtype: Optional[str]
"""
return self.data.custom_id
@property
def label(self) -> Optional[str]:
"""
The label of the interacted button.
:rtype: Optional[str]
"""
if self.data.component_type != ComponentType.BUTTON:
return
if self.message is None:
return
if self.message.components is None:
return
for action_row in self.message.components:
for component in action_row.components:
if component.custom_id == self.custom_id:
return component.label