Transport
The async I/O building blocks every adapter shares: an HTTP client with retry/backoff, a reconnecting WebSocket base, a token-bucket rate limiter, and the generic paginators.
Pagination
OHLC is paged by fixed time windows, each sized to what one request can
return (span × max_per_request) and snapped to the bar. Trades are paged by an
opaque per-adapter cursor that drains the whole window, which fixes the data
loss that occurred on liquid pairs when each window was capped at a single
page. The application binds the adapter’s fetch_*_page in a closure and hands
it to the paginator.
Generic paginator — forward (start→end) and backward (cursor-based).
The Paginator drives a source’s fetch_*_page methods. Adapters expose a fetch
function with signature fetch(start_ns, end_ns, limit) -> list[T]; the
window size is derived from the source’s declared Capability. This eliminates
per-exchange chunking code (generalises the Coinbase-300 fix to all adapters).
Caller contract: wrap the adapter’s bound method in a closure that closes
over symbol (and span for OHLC) before passing it here. For example:
    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    async for bar in paginate_ohlc(_fetch, cap, start_ns, end_ns, span):
        ...
- async paginate_backward(fetch_page, start_ns, end_ns, max_per_request, *, emit_progress=None)
Paginate backward using opaque cursors.
- Parameters:
- fetch_page : async callable
Signature: await fetch_page(cursor, max_per_request) -> (items, next_cursor). cursor is None on the first call.
- async paginate_forward(fetch_page, start_ns, end_ns, window_s, max_per_request, *, align_s=None, emit_progress=None)
Paginate forward from start_ns to end_ns in fixed time windows.
- Parameters:
- fetch_page : async callable
Signature: await fetch_page(window_start_ns, window_end_ns, max_per_request) -> list[T]. Must already have symbol (and span for OHLC) bound via a closure.
- start_ns, end_ns : int
Time range in nanoseconds.
- window_s : int
Window duration in seconds. The paginator advances by window_s * NS on each step regardless of how many items the page returned.
- max_per_request : int
Passed as limit to each fetch_page call.
- align_s : int or None
Granularity to snap start_ns to (e.g. the candle span). Defaults to window_s. Snapping to the window can pull the start back by up to a whole window (e.g. ~41 days for 1h candles at 1000 bars per request), fetching data the caller never asked for. Pass the bar span instead so the requested start is honoured.
- emit_progress : callable or None
Called with (windows_done, windows_total) after each page.
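The effect of align_s is easiest to see on concrete window boundaries. The following is a minimal sketch, not the library's implementation: forward_windows is a hypothetical helper, and clamping the last window to end_ns is an assumption the documentation above does not state.

```python
NS = 1_000_000_000  # nanoseconds per second


def forward_windows(start_ns, end_ns, window_s, align_s=None):
    """Return (window_start_ns, window_end_ns) pairs covering the range.

    Hypothetical helper for illustration only.
    """
    # Fall back to the window size when no finer alignment is given.
    align_ns = (align_s if align_s is not None else window_s) * NS
    step_ns = window_s * NS
    # Snap the start down to the alignment granularity.
    cursor = (start_ns // align_ns) * align_ns
    windows = []
    while cursor < end_ns:
        # Assumption: the final window is clamped to end_ns.
        windows.append((cursor, min(cursor + step_ns, end_ns)))
        # Advance by a fixed step, regardless of how many items came back.
        cursor += step_ns
    return windows


# 1h candles, 1000 bars per request: one window spans 3_600_000 s.
start = 3_609_000 * NS
end = start + 2 * 3_600_000 * NS
snapped_to_bar = forward_windows(start, end, 3_600_000, align_s=3600)
snapped_to_window = forward_windows(start, end, 3_600_000)
```

With align_s set to the candle span, the first window starts at most one bar before the requested start. With the default, it starts at the previous window boundary, which here is 9000 seconds earlier.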
- async paginate_ohlc(fetch_page, cap, start_ns, end_ns, span, *, emit_progress=None)
Paginate OHLC bars forward using declared Capability.
- Parameters:
- fetch_page : async callable
Must be a closure with symbol and span already bound: fetch_page(start_ns, end_ns, limit) -> list[OHLCBar].
- cap : Capability
Source capability (provides max_per_request and page_direction).
- start_ns, end_ns : int
Time range in nanoseconds.
- span : int
Candle interval in seconds, used to compute the page window.
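To illustrate how the page window follows from span and the declared capability, here is a hedged sketch. The Capability dataclass, FakeAdapter, ohlc_window_s and collect_bars are all hypothetical stand-ins written for this example; the real Capability may carry more fields, and the real paginator is an async generator rather than a list-returning coroutine.

```python
import asyncio
from dataclasses import dataclass

NS = 1_000_000_000  # nanoseconds per second


@dataclass(frozen=True)
class Capability:
    """Hypothetical stand-in; the real Capability may carry more fields."""
    max_per_request: int
    page_direction: str = "forward"


def ohlc_window_s(cap, span):
    """Seconds one request can cover: span x max_per_request."""
    return span * cap.max_per_request


class FakeAdapter:
    """Illustrative adapter that returns one synthetic bar per span."""

    async def fetch_ohlc_page(self, symbol, span, start_ns, end_ns, limit):
        step = span * NS
        first = -(-start_ns // step) * step  # round up to a bar boundary
        return [(symbol, t) for t in range(first, end_ns, step)][:limit]


async def collect_bars(adapter, symbol, span, cap, start_ns, end_ns):
    # Bind symbol and span so the paging loop only sees (start, end, limit).
    async def _fetch(s_ns, e_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, s_ns, e_ns, limit)

    window_ns = ohlc_window_s(cap, span) * NS
    cursor = (start_ns // (span * NS)) * (span * NS)  # snap to the bar
    bars = []
    while cursor < end_ns:
        page_end = min(cursor + window_ns, end_ns)
        bars.extend(await _fetch(cursor, page_end, cap.max_per_request))
        cursor += window_ns
    return bars


# 1000 one-minute bars through a source capped at 300 bars per request.
bars = asyncio.run(
    collect_bars(FakeAdapter(), "BTC-USD", 60, Capability(300), 0, 1000 * 60 * NS)
)
```

Because each window is exactly as wide as one full page, no page is truncated by the limit, and the 1000 bars arrive in four requests. For 1h candles at 1000 bars per request the same formula gives 3,600,000 seconds, about 41.7 days.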
- async paginate_trades(fetch_page, cap, start_ns, end_ns, *, emit_progress=None, max_pages=1000000)
Paginate trades by cursor, draining the [start_ns, end_ns] window.
Unlike OHLC (fixed-size time windows), trades are far denser than any single page: a one-day window on a liquid pair can hold millions of trades, but a page is capped at cap.max_per_request. Advancing by a fixed time window (the previous design) silently dropped everything past the first page. This paginator instead follows the adapter’s opaque cursor until the window is exhausted.
- Parameters:
- fetch_page : async callable
Closure with symbol bound: fetch_page(start_ns, end_ns, limit, cursor) -> (items, next_cursor). cursor is None on the first call.
- cap : Capability
Source capability (provides max_per_request).
- start_ns, end_ns : int
Inclusive time range in nanoseconds. Items outside it are filtered out.
- emit_progress : callable or None
Called with (pages_done, -1) after each page (total is unknown).
- max_pages : int
Hard safety cap on the number of pages, to bound a misbehaving cursor.
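The cursor-draining loop described above can be sketched roughly as follows. This is not the library's code: drain_trades and make_fake_source are hypothetical, trades are modelled as dicts with a ts_ns key, and treating a next_cursor of None as "window exhausted" is an assumption.

```python
import asyncio


async def drain_trades(fetch_page, max_per_request, start_ns, end_ns,
                       max_pages=1_000_000):
    """Follow the cursor until the source is exhausted (hypothetical helper)."""
    collected = []
    cursor = None  # None on the first call, as in the documented contract
    for _ in range(max_pages):  # hard cap bounds a misbehaving cursor
        items, cursor = await fetch_page(start_ns, end_ns, max_per_request, cursor)
        # Keep only items inside the inclusive [start_ns, end_ns] range.
        collected.extend(t for t in items if start_ns <= t["ts_ns"] <= end_ns)
        # Assumption: a None cursor (or an empty page) means nothing is left.
        if cursor is None or not items:
            break
    return collected


def make_fake_source(trades):
    """Illustrative source; its cursor is an offset the caller never inspects."""
    async def fetch_page(start_ns, end_ns, limit, cursor):
        offset = 0 if cursor is None else cursor
        page = trades[offset:offset + limit]
        next_cursor = offset + limit if offset + limit < len(trades) else None
        return page, next_cursor
    return fetch_page


source = make_fake_source([{"ts_ns": i} for i in range(25)])
all_in_range = asyncio.run(drain_trades(source, 10, 5, 20))
first_page_only = asyncio.run(drain_trades(source, 10, 5, 20, max_pages=1))
```

The second call mimics the old single-page behaviour: with only one page allowed, everything after the first ten trades is lost, which is the failure mode the cursor loop avoids.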
HTTP client
Shared by every adapter and reference-counted, so concurrent operations on the same exchange don’t close it out from under each other.
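One way to picture the reference counting is a holder that opens the client for the first user and closes it after the last. The sketch below is an assumption about the general technique, not the library's mechanism; SharedClient and its open_client and close_client callbacks are hypothetical names.

```python
import asyncio


class SharedClient:
    """Hypothetical reference-counted holder, for illustration only."""

    def __init__(self, open_client, close_client):
        self._open = open_client
        self._close = close_client
        self._client = None
        self._refs = 0
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            if self._refs == 0:
                # First user opens the underlying client.
                self._client = await self._open()
            self._refs += 1
            return self._client

    async def __aexit__(self, *exc_info):
        async with self._lock:
            self._refs -= 1
            if self._refs == 0:
                # Last user closes it; earlier exits leave it open.
                await self._close(self._client)
                self._client = None


events = []


async def open_client():
    events.append("open")
    return object()


async def close_client(client):
    events.append("close")


async def main():
    shared = SharedClient(open_client, close_client)

    async def job():
        async with shared as client:
            await asyncio.sleep(0)  # let the other job overlap
            return client

    first, second = await asyncio.gather(job(), job())
    return first is second


same_client = asyncio.run(main())
```

Two overlapping jobs see the same client object, and it is opened and closed exactly once.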
- class AsyncHTTPClient(max_retries=5, backoff_base=2.0, timeout=30.0, headers=None, exchange=None, limiter=None)
Thin wrapper around httpx.AsyncClient with retry/backoff.
- Parameters:
- max_retries : int
Number of retry attempts on transient errors (5xx, network errors).
- backoff_base : float
Exponential backoff base in seconds.
- timeout : float
Request timeout in seconds.
- exchange : str, optional
Exchange name used to key the proactive rate limiter. When set together with limiter, every get awaits a token before the request, smoothing bursts (e.g. “run all jobs”) to the exchange’s published rate. None (default) disables proactive throttling.
- limiter : RateLimiter, optional
Shared per-exchange limiter (typically dccd.transport.ratelimit.shared_limiter). Only consulted when exchange is also set.
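As a rough picture of how max_retries and backoff_base might interact, consider the sketch below. The delay formula backoff_base ** attempt is an assumption (the documentation only says "exponential backoff base in seconds"), and get_with_retry, TransientError and the injected send and sleep callables are hypothetical; no real httpx call is made.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical marker for retryable failures (5xx, network errors)."""


async def get_with_retry(send, max_retries=5, backoff_base=2.0,
                         sleep=asyncio.sleep):
    """Call send() until it succeeds or the retries are used up."""
    attempt = 0
    while True:
        try:
            return await send()
        except TransientError:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            # Assumed schedule: 1 s, 2 s, 4 s, ... for backoff_base=2.0.
            await sleep(backoff_base ** attempt)
            attempt += 1


delays = []


async def record_sleep(seconds):
    delays.append(seconds)  # record instead of actually waiting


outcomes = [TransientError(), TransientError(), "ok"]


async def flaky_send():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = asyncio.run(get_with_retry(flaky_send, sleep=record_sleep))
```

Injecting sleep keeps the example fast and makes the delay schedule observable; a production client would also honour Retry-After on 429 responses, which this sketch omits.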
WebSocket base
- class WebSocketBase(url)
Base async WebSocket client with exponential reconnect.
Subclasses override on_connect to send subscription messages and parse_message to yield domain records from raw frames.
For adapters that need a raw-frame async generator (e.g. to maintain local order-book state across messages), use stream_raw instead.
- Parameters:
- url : str
WebSocket endpoint URL.
- async stream()
Yield parsed domain records, reconnecting on errors.
Delegates to parse_message for frame parsing.
- async stream_raw()
Yield raw WebSocket frames, reconnecting with exponential backoff.
Use this in adapters where parse_message is not convenient (e.g. stateful order-book reconstruction that spans multiple frames).
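The reconnect behaviour can be illustrated with a generic async generator. This is a sketch of the technique under stated assumptions, not WebSocketBase itself: stream_with_reconnect, the connect factory, the base_delay and max_delay values, resetting the delay after a successful frame, and stopping on a clean close are all choices made for the example.

```python
import asyncio


async def stream_with_reconnect(connect, *, base_delay=1.0, max_delay=30.0,
                                sleep=asyncio.sleep):
    """Yield frames from connect(), reconnecting with exponential backoff."""
    delay = base_delay
    while True:
        try:
            async for frame in connect():
                delay = base_delay  # a good frame resets the backoff
                yield frame
        except ConnectionError:
            await sleep(delay)
            delay = min(delay * 2, max_delay)  # double, up to a ceiling
        else:
            return  # assumption: a clean close ends the stream


# Scripted sessions: a frame then a drop, an immediate drop, then two frames.
scripts = [["a", ConnectionError()], [ConnectionError()], ["b", "c"]]
delays = []


async def record_sleep(seconds):
    delays.append(seconds)


def fake_connect():
    script = scripts.pop(0)

    async def session():
        for item in script:
            if isinstance(item, Exception):
                raise item
            yield item

    return session()


async def collect():
    return [f async for f in stream_with_reconnect(fake_connect, sleep=record_sleep)]


frames = asyncio.run(collect())
```

The consumer sees one uninterrupted sequence of frames; the two dropped connections only show up as backoff delays of 1 and 2 seconds.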
Rate limiter
Rate limiting is proactive: a process-wide limiter keyed by exchange is
awaited before every outbound REST request, so concurrent operations on the
same exchange (e.g. “Run all” over many jobs) share one bucket and stay under
the exchange’s published rate. Conservative per-exchange defaults are built in
(e.g. Kraken 1 req/s, Coinbase 3 req/s); reactive 429/Retry-After
handling in the HTTP client remains as a backstop. The HTTP connection pool is
held open for the whole paginated operation — one TLS session per backfill,
not one per page.
- class RateLimiter(rates=None, *, clock=None, sleep=None)
Per-exchange rate limiter holding one TokenBucket each.
- Parameters:
- rates : dict, optional
Map of exchange name → requests per second, merged over the conservative defaults. Unknown exchanges fall back to _FALLBACK_RATE.
- clock : callable, optional
Monotonic time source forwarded to each bucket (test seam).
- sleep : callable, optional
Async sleep forwarded to each bucket (test seam).
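A token bucket with injectable clock and sleep might look like the sketch below. It is an illustration of the technique only: this TokenBucket is written for the example and its constructor, the capacity of one token, and the acquire method are assumptions rather than the library's API.

```python
import asyncio
import time


class TokenBucket:
    """Illustrative token bucket; not the library's TokenBucket."""

    def __init__(self, rate, capacity=1.0, *, clock=time.monotonic,
                 sleep=asyncio.sleep):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self._clock = clock
        self._sleep = sleep
        self._tokens = capacity
        self._last = clock()

    async def acquire(self):
        while True:
            now = self._clock()
            # Refill in proportion to elapsed time, up to capacity.
            elapsed = now - self._last
            self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
            self._last = now
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return
            # Wait just long enough for the missing fraction of a token.
            await self._sleep((1.0 - self._tokens) / self.rate)


class FakeTime:
    """Test seam: a clock that only moves when sleep is called."""

    def __init__(self):
        self.now = 0.0
        self.slept = []

    def clock(self):
        return self.now

    async def sleep(self, seconds):
        self.slept.append(seconds)
        self.now += seconds


async def three_requests(fake):
    bucket = TokenBucket(rate=2.0, clock=fake.clock, sleep=fake.sleep)
    for _ in range(3):
        await bucket.acquire()


fake = FakeTime()
asyncio.run(three_requests(fake))
```

At 2 requests per second the first request passes immediately and each later one waits half a second. The fake clock shows why the clock and sleep parameters are useful as test seams: the schedule can be asserted without real waiting.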
- shared_limiter
Return the process-wide RateLimiter shared by all adapters.
One instance per process means concurrent operations on the same exchange share a bucket and are throttled together. Wired in by adapters’ default HTTP client construction.
- Returns:
- RateLimiter