Source code for dccd.transport.ratelimit

"""Token-bucket rate limiter, one bucket per exchange.

A single process-wide :class:`RateLimiter` (see :func:`shared_limiter`) is shared
by every adapter so that *concurrent* operations on the same exchange — e.g. a
"run all jobs" burst, or a scheduled backfill overlapping a manual one — draw
from one bucket and are serialised to the exchange's published rate, instead of
each adapter firing at the full rate independently. This is *proactive*
throttling; :class:`~dccd.transport.http.AsyncHTTPClient` keeps its *reactive*
429/Retry-After handling as a backstop.

Default rates are deliberately conservative (public, unauthenticated REST):

============  =========  ====================================================
exchange      req/s      source
============  =========  ====================================================
binance       10.0       weight-based, 1200 weight/min; klines weight 2 → high
coinbase       3.0       public endpoints 3 req/s (docs.cdp.coinbase.com)
kraken         1.0       public endpoints ~1 req/s (support.kraken.com)
bybit         10.0       public market data ~10 req/s
okx            8.0       history-candles 20 req/2s = 10/s; kept under for margin
bitfinex       1.0       public REST 10-90 req/min → ~1 req/s conservative
bitmex         0.5       unauthenticated ~30 req/min → 0.5 req/s
============  =========  ====================================================
"""

from __future__ import annotations

import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable

__all__ = ["RateLimiter", "shared_limiter"]

_DEFAULT_RATES: dict[str, float] = {
    "binance": 10.0,
    "coinbase": 3.0,
    "kraken": 1.0,
    "bybit": 10.0,
    "okx": 8.0,
    "bitfinex": 1.0,
    "bitmex": 0.5,
}

# Rate used for any exchange not listed in ``_DEFAULT_RATES``.
_FALLBACK_RATE = 3.0


class TokenBucket:
    """Single token-bucket for one exchange.

    Parameters
    ----------
    rate : float
        Sustained requests per second (also the burst capacity).
    clock : callable, optional
        Monotonic time source (seconds). Injectable for tests; defaults to
        :func:`time.monotonic`.
    sleep : callable, optional
        Async sleep coroutine factory. Injectable for tests; defaults to
        :func:`asyncio.sleep`.
    """

    def __init__(
        self,
        rate: float,
        *,
        clock: Callable[[], float] | None = None,
        sleep: Callable[[float], Awaitable[None]] | None = None,
    ) -> None:
        self._rate = rate
        self._tokens = rate
        self._clock = clock or time.monotonic
        self._sleep = sleep or asyncio.sleep
        self._last = self._clock()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a token is available, then consume it."""
        async with self._lock:
            now = self._clock()
            elapsed = now - self._last
            self._tokens = min(self._rate, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1.0:
                wait = (1.0 - self._tokens) / self._rate
                await self._sleep(wait)
                self._last = self._clock()
                self._tokens = 0.0
            else:
                self._tokens -= 1.0


[docs] class RateLimiter: """Per-exchange rate limiter holding one :class:`TokenBucket` each. Parameters ---------- rates : dict, optional Map of exchange name → requests per second, merged over the conservative defaults. Unknown exchanges fall back to ``_FALLBACK_RATE``. clock : callable, optional Monotonic time source forwarded to each bucket (test seam). sleep : callable, optional Async sleep forwarded to each bucket (test seam). """ def __init__( self, rates: dict[str, float] | None = None, *, clock: Callable[[], float] | None = None, sleep: Callable[[float], Awaitable[None]] | None = None, ) -> None: self._rates = {**_DEFAULT_RATES, **(rates or {})} self._clock = clock self._sleep = sleep self._buckets: dict[str, TokenBucket] = {} def _bucket(self, exchange: str) -> TokenBucket: if exchange not in self._buckets: rate = self._rates.get(exchange, _FALLBACK_RATE) self._buckets[exchange] = TokenBucket( rate, clock=self._clock, sleep=self._sleep ) return self._buckets[exchange]
[docs] async def acquire(self, exchange: str) -> None: """Wait until a token is available for *exchange*, then consume it.""" await self._bucket(exchange).acquire()
@asynccontextmanager async def __call__(self, exchange: str) -> AsyncIterator[None]: await self.acquire(exchange) yield
_shared_limiter = RateLimiter() def shared_limiter() -> RateLimiter: """Return the process-wide :class:`RateLimiter` shared by all adapters. One instance per process means concurrent operations on the same exchange share a bucket and are throttled together. Wired in by adapters' default HTTP client construction. Returns ------- RateLimiter """ return _shared_limiter