Source code for interactions.api.http.request

import asyncio
import traceback
from asyncio import AbstractEventLoop, Lock, get_event_loop, get_running_loop, new_event_loop
from contextlib import suppress
from json import dumps
from logging import Logger
from sys import version_info
from typing import Any, Dict, Optional
from urllib.parse import quote

from aiohttp import ClientSession
from aiohttp import __version__ as http_version

from ...api.error import LibraryException
from ...base import __version__, get_logger
from .limiter import Limiter
from .route import Route

__all__ = ("_Request",)
log: Logger = get_logger("http")


class _Request:
    """
    A class representing how HTTP requests are sent/read.

    :ivar str token: The current application token.
    :ivar AbstractEventLoop _loop: The current coroutine event loop.
    :ivar Dict[str, Limiter] ratelimits: The current per-route rate limiters from the API.
    :ivar Dict[str, str] buckets: The current endpoint to shared_bucket cache from the API.
    :ivar dict _headers: The current headers for an HTTP request.
    :ivar ClientSession _session: The current session for making requests.
    :ivar Limiter _global_lock: The global rate limiter.
    """

    __slots__ = (
        "token",
        "_loop",
        "ratelimits",
        "buckets",
        "_headers",
        "_session",
        "_global_lock",
    )

    token: str
    _loop: AbstractEventLoop
    ratelimits: Dict[str, Limiter]  # bucket: Limiter
    buckets: Dict[str, str]  # endpoint: shared_bucket
    _headers: dict
    _global_lock: Limiter

    def __init__(self, token: str) -> None:
        """
        :param token: The application token used for authorizing.
        :type token: str
        """
        self.token = token
        try:
            self._loop = get_event_loop() if version_info < (3, 10) else get_running_loop()
        except RuntimeError:
            self._loop = new_event_loop()
        self.ratelimits = {}
        self.buckets = {}
        self._headers = {
            "Authorization": f"Bot {self.token}",
            "User-Agent": f"DiscordBot (https://github.com/interactions-py/library {__version__}) "
            f"Python/{version_info[0]}.{version_info[1]} "
            f"aiohttp/{http_version}",
        }
        self._session = ClientSession()
        self._global_lock = (
            Limiter(lock=Lock(loop=self._loop)) if version_info < (3, 10) else Limiter(lock=Lock())
        )

    def _check_session(self) -> None:
        """Ensures that we have a valid connection session."""
        if self._session.closed:
            self._session = ClientSession()

    async def _check_lock(self) -> None:
        """Checks the global lock for its current state."""
        if self._global_lock.lock.locked():
            log.warning("The HTTP client is still globally locked, waiting for it to clear.")
            await self._global_lock.lock.acquire()
            self._global_lock.reset_after = 0
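
    # The Limiter helpers stored in ``ratelimits`` and ``_global_lock`` are used in this
    # class only through three members: their asyncio ``lock``, their ``reset_after``
    # value, and ``release_lock()``, which releases the lock if it is currently held
    # (see .limiter for the actual implementation).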

    async def request(self, route: Route, **kwargs) -> Optional[Any]:
        r"""
        Sends a request to the Discord API.

        :param route: The HTTP route to request.
        :type route: Route
        :param \**kwargs?: Optional keyword-only arguments to pass as information in the request.
        :type \**kwargs?: dict
        :return: The contents of the request if any.
        :rtype: Optional[Any]
        """
        # sourcery skip: low-code-quality
        kwargs["headers"] = {**self._headers, **kwargs.get("headers", {})}

        if kwargs.get("json"):
            kwargs["headers"]["Content-Type"] = "application/json"
        if reason := kwargs.pop("reason", None):
            kwargs["headers"]["X-Audit-Log-Reason"] = quote(reason, safe="/ ")

        # Huge credit and thanks to LordOfPolls for the lock/retry logic.
        bucket = route.get_bucket(
            self.buckets.get(route.endpoint)
        )  # string returning path OR prioritised hash bucket metadata.
        # The idea is that it's regulated by the priority of Discord's bucket header and not just self-computation.

        # This implementation is based on JDA's bucket implementation, which we heavily use in favour of allowing
        # routes and other resources to be exhausted first on a separate lock call before hitting global limits.
        if self.ratelimits.get(bucket):
            _limiter: Limiter = self.ratelimits.get(bucket)

            if _limiter.lock.locked():
                if (
                    _limiter.reset_after != 0
                ):  # Just saying 0 seconds isn't helpful, so this is suppressed.
                    log.warning(
                        f"The current bucket is still under a rate limit. Calling later in {_limiter.reset_after} seconds."
                    )
                self._loop.call_later(_limiter.reset_after, _limiter.release_lock)
                _limiter.reset_after = 0
        else:
            self.ratelimits[bucket] = (
                Limiter(lock=Lock(loop=self._loop))
                if version_info < (3, 10)
                else Limiter(lock=Lock())
            )
            _limiter: Limiter = self.ratelimits.get(bucket)

        await _limiter.lock.acquire()  # _limiter is the per shared bucket/route endpoint

        # Implement retry logic. The common retry count seems to be 5, so this is hardcoded, for the most part.
        for tries in range(5):  # 3, 5? 5 seems to be common
            try:
                self._check_session()
                await self._check_lock()

                async with self._session.request(
                    route.method, route.__api__ + route.path, **kwargs
                ) as response:
                    if response.content_type == "application/json":
                        data = await response.json()
                        if isinstance(data, dict):
                            message: Optional[str] = data.get("message")
                            code: int = data.get("code", response.status)
                        else:
                            message, code = None, response.status
                    else:
                        data, message, code = None, None, response.status

                    reset_after: float = float(
                        response.headers.get(
                            "X-RateLimit-Reset-After", response.headers.get("Retry-After", "0.0")
                        )
                    )
                    remaining: str = response.headers.get("X-RateLimit-Remaining")
                    _bucket: str = response.headers.get("X-RateLimit-Bucket")
                    is_global: bool = response.headers.get("X-RateLimit-Global", False)

                    log.debug(f"{route.method}: {route.__api__ + route.path}: {kwargs}")

                    if _bucket is not None:
                        self.buckets[route.endpoint] = _bucket  # real-time replacement/update/add if needed.

                    if isinstance(data, dict) and (
                        data.get("errors") or (code and code not in {429, 31001} and message)
                    ):
                        log.debug(
                            f"RETURN {response.status}: {dumps(data, indent=4, sort_keys=True)}"
                        )  # This "redundant" debug line is for debug use and tracing back the error codes.

                        raise LibraryException(message=message, code=code, severity=40, data=data)
                    elif isinstance(data, dict) and code == 0 and message:
                        log.debug(
                            f"RETURN {response.status}: {dumps(data, indent=4, sort_keys=True)}"
                        )  # This "redundant" debug line is for debug use and tracing back the error codes.

                        raise LibraryException(
                            message=f"'{message}'. Make sure that your token is set properly.",
                            severity=50,
                        )

                    if code in {429, 31001}:
                        hours = int(reset_after // 3600)
                        minutes = int((reset_after % 3600) // 60)
                        seconds = int(reset_after % 60)
                        log.warning(
                            "(429/31001) The Bot has encountered a rate-limit. Resuming future requests after "
                            f"{f'{hours} hours ' if hours else ''}"
                            f"{f'{minutes} minutes ' if minutes else ''}"
                            f"{f'{seconds} seconds ' if seconds else ''}"
                        )

                        if is_global:
                            self._global_lock.reset_after = reset_after
                            self._loop.call_later(
                                self._global_lock.reset_after, self._global_lock.lock.release
                            )
                        else:
                            _limiter.reset_after = reset_after
                            await asyncio.sleep(_limiter.reset_after)
                            continue

                    if remaining is not None and int(remaining) == 0:
                        log.warning(
                            f"The HTTP client has exhausted a per-route ratelimit. Locking route for {reset_after} seconds."
                        )
                        self._loop.call_later(reset_after, _limiter.release_lock)

                    log.debug(f"RETURN {response.status}: {dumps(data, indent=4, sort_keys=True)}")

                    _limiter.release_lock()  # checks if it's locked, then releases upon success.
                    return data

            # These account for general/specific exceptions. (Windows...)
            except OSError as e:
                if tries < 4 and e.errno in (54, 10054):
                    await asyncio.sleep(2 * tries + 1)
                    continue
                with suppress(RuntimeError):
                    _limiter.lock.release()
                raise

            # For generic exceptions we give a traceback for debug reasons.
            except Exception as e:
                with suppress(RuntimeError):
                    _limiter.lock.release()
                if isinstance(e, LibraryException):
                    raise
                log.error("".join(traceback.format_exception(type(e), e, e.__traceback__)))
                break

    async def close(self) -> None:
        """Closes the current session."""
        await self._session.close()
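
# A minimal usage sketch, relying only on the names defined above (asyncio, Route,
# _Request). It assumes a valid bot token and that a Route is built from an HTTP method
# plus an endpoint path (e.g. "/users/@me"); the token placeholder and the main coroutine
# below are illustrative and not part of the upstream module.
if __name__ == "__main__":

    async def main() -> None:
        # Construct the client inside a running loop so __init__ binds to it.
        http = _Request("YOUR_BOT_TOKEN")  # placeholder token, not a real credential
        try:
            # Assumed Route signature: Route(method, path).
            me = await http.request(Route("GET", "/users/@me"))
            print(me)
        finally:
            await http.close()

    asyncio.run(main())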