Transport

The async I/O building blocks every adapter shares: an HTTP client with retry/backoff, a reconnecting WebSocket base, a token-bucket rate limiter, and the generic paginators.

Pagination

OHLC is paged by fixed time windows sized to one request (span × max_per_request), snapped to the bar. Trades are paged by an opaque per-adapter cursor that drains the whole window — this is what fixes the capped-single-page data loss on liquid pairs. The application binds the adapter’s fetch_*_page in a closure and hands it to the paginator.

Generic paginator — forward (start→end) and backward (cursor-based).

The Paginator drives a source’s fetch_*_page methods. Adapters expose a fetch function with signature fetch(start_ns, end_ns, limit) -> list[T]; the window size is derived from the source’s declared Capability. This eliminates per-exchange chunking code (generalises the Coinbase-300 fix to all adapters).

Caller contract: wrap the adapter’s bound method in a closure that closes over symbol (and span for OHLC) before passing it here. For example:

async def _fetch(start_ns, end_ns, limit):
    return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
async for bar in paginate_ohlc(_fetch, cap, start_ns, end_ns, span):
    ...
async paginate_backward(fetch_page, start_ns, end_ns, max_per_request, *, emit_progress=None)[source]

Paginate backward using opaque cursors.

Parameters:
fetch_pageasync callable

Signature: await fetch_page(cursor, max_per_request) -> (items, next_cursor). cursor is None on the first call.

async paginate_forward(fetch_page, start_ns, end_ns, window_s, max_per_request, *, align_s=None, emit_progress=None)[source]

Paginate forward from start_ns to end_ns in fixed time windows.

Parameters:
fetch_pageasync callable

Signature: await fetch_page(window_start_ns, window_end_ns, max_per_request) -> list[T]. Must already have symbol (and span for OHLC) bound via a closure.

start_ns, end_nsint

Time range in nanoseconds.

window_sint

Window duration in seconds. The paginator advances by window_s * NS on each step regardless of how many items the page returned.

max_per_requestint

Passed as limit to each fetch_page call.

align_sint or None

Granularity to snap start_ns to (e.g. the candle span). Defaults to window_s; snapping to the window would pull the start back by up to a whole window (e.g. ~41 days for 1h candles), fetching data the caller never asked for. Snap to the bar instead so the requested start is honoured.

emit_progresscallable or None

Called with (windows_done, windows_total) after each page.

async paginate_ohlc(fetch_page, cap, start_ns, end_ns, span, *, emit_progress=None)[source]

Paginate OHLC bars forward using declared Capability.

Parameters:
fetch_pageasync callable

Must be a closure with symbol and span already bound: fetch_page(start_ns, end_ns, limit) -> list[OHLCBar].

capCapability

Source capability (provides max_per_request and page_direction).

start_ns, end_nsint

Time range in nanoseconds.

spanint

Candle interval in seconds — used to compute the page window.

async paginate_trades(fetch_page, cap, start_ns, end_ns, *, emit_progress=None, max_pages=1000000)[source]

Paginate trades by cursor, draining the [start_ns, end_ns] window.

Unlike OHLC (fixed-size time windows), trades are far denser than any single page: a one-day window on a liquid pair holds millions of trades but a page is capped at cap.max_per_request. Advancing by a fixed time window — the previous design — silently dropped everything past the first page. This paginator instead follows the adapter’s opaque cursor until the window is exhausted.

Parameters:
fetch_pageasync callable

Closure with symbol bound: fetch_page(start_ns, end_ns, limit, cursor) -> (items, next_cursor). cursor is None on the first call.

capCapability

Source capability (provides max_per_request).

start_ns, end_nsint

Inclusive time range in nanoseconds. Items outside it are filtered out.

emit_progresscallable or None

Called with (pages_done, -1) after each page (total is unknown).

max_pagesint

Hard safety cap on the number of pages, to bound a misbehaving cursor.

HTTP client

Shared by every adapter and reference-counted, so concurrent operations on the same exchange don’t close it out from under each other.

class AsyncHTTPClient(max_retries=5, backoff_base=2.0, timeout=30.0, headers=None, exchange=None, limiter=None)[source]

Thin wrapper around httpx.AsyncClient with retry/backoff.

Parameters:
max_retriesint

Number of retry attempts on transient errors (5xx, network errors).

backoff_basefloat

Exponential backoff base in seconds.

timeoutfloat

Request timeout in seconds.

exchangestr, optional

Exchange name used to key the proactive rate limiter. When set together with limiter, every get awaits a token before the request, smoothing bursts (e.g. “run all jobs”) to the exchange’s published rate. None (default) disables proactive throttling.

limiterRateLimiter, optional

Shared per-exchange limiter (typically dccd.transport.ratelimit.shared_limiter). Only consulted when exchange is also set.

async get(url, params=None)[source]

Perform a GET request with retry/backoff. Returns parsed JSON.

exception HTTPError(status, url, body='')[source]

Raised when an HTTP request fails after all retries.

WebSocket base

class WebSocketBase(url)[source]

Base async WebSocket client with exponential reconnect.

Subclasses override on_connect to send subscription messages and parse_message to yield domain records from raw frames.

For adapters that need a raw-frame async generator (e.g. to maintain local order-book state across messages), use stream_raw instead.

Parameters:
urlstr

WebSocket endpoint URL.

async on_connect(ws)[source]

Called once after each (re)connect. Override to send subscriptions.

async parse_message(raw)[source]

Parse a raw frame and yield domain records. Override in subclass.

stop()[source]

Request graceful shutdown.

async stream()[source]

Yield parsed domain records, reconnecting on errors.

Delegates to parse_message for frame parsing.

async stream_raw()[source]

Yield raw WebSocket frames, reconnecting with exponential backoff.

Use this in adapters where parse_message is not convenient (e.g. stateful order-book reconstruction that spans multiple frames).

Rate limiter

Rate limiting is proactive: a process-wide limiter keyed by exchange is awaited before every outbound REST request, so concurrent operations on the same exchange (e.g. “Run all” over many jobs) share one bucket and stay under the exchange’s published rate. Conservative per-exchange defaults are built in (e.g. Kraken 1 req/s, Coinbase 3 req/s); reactive 429/Retry-After handling in the HTTP client remains as a backstop. The HTTP connection pool is held open for the whole paginated operation — one TLS session per backfill, not one per page.

class RateLimiter(rates=None, *, clock=None, sleep=None)[source]

Per-exchange rate limiter holding one TokenBucket each.

Parameters:
ratesdict, optional

Map of exchange name → requests per second, merged over the conservative defaults. Unknown exchanges fall back to _FALLBACK_RATE.

clockcallable, optional

Monotonic time source forwarded to each bucket (test seam).

sleepcallable, optional

Async sleep forwarded to each bucket (test seam).

async acquire(exchange)[source]

Wait until a token is available for exchange, then consume it.

shared_limiter()[source]

Return the process-wide RateLimiter shared by all adapters.

One instance per process means concurrent operations on the same exchange share a bucket and are throttled together. Wired in by adapters’ default HTTP client construction.

Returns:
RateLimiter