Transport

The async I/O building blocks every adapter shares: an HTTP client with retry/backoff, a reconnecting WebSocket base, a token-bucket rate limiter, and the generic paginators.

Pagination

OHLC is paged by fixed time windows sized to one request (span × max_per_request), snapped to the bar. Trades are paged by an opaque per-adapter cursor that drains the whole window; this fixes the data loss that occurred on liquid pairs when only a single capped page was fetched. The application binds the adapter’s fetch_*_page in a closure and hands it to the paginator.

Generic paginator — forward (start→end) and backward (cursor-based).

The Paginator drives a source’s fetch_*_page methods. Adapters expose a fetch function with signature fetch(start_ns, end_ns, limit) -> list[T]; the window size is derived from the source’s declared Capability. This eliminates per-exchange chunking code (generalises the Coinbase-300 fix to all adapters).

Caller contract: wrap the adapter’s bound method in a closure that closes over symbol (and span for OHLC) before passing it here. For example:

async def _fetch(start_ns, end_ns, limit):
    return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

async for bar in paginate_ohlc(_fetch, cap, start_ns, end_ns, span):
    ...

async paginate_backward(fetch_page, start_ns, end_ns, max_per_request, *, emit_progress=None)

Paginate backward using opaque cursors.

Parameters:
fetch_page : async callable

Signature: await fetch_page(cursor, max_per_request) -> (items, next_cursor). cursor is None on the first call.

async paginate_forward(fetch_page, start_ns, end_ns, window_s, max_per_request, *, align_s=None, emit_progress=None)

Paginate forward from start_ns to end_ns in fixed time windows.

Parameters:
fetch_page : async callable

Signature: await fetch_page(window_start_ns, window_end_ns, max_per_request) -> list[T]. Must already have symbol (and span for OHLC) bound via a closure.

start_ns, end_ns : int

Time range in nanoseconds.

window_s : int

Window duration in seconds. The paginator advances by window_s * NS on each step regardless of how many items the page returned.

max_per_request : int

Passed as limit to each fetch_page call.

align_s : int or None

Granularity to snap start_ns to (e.g. the candle span). Defaults to window_s. Snapping to the window can pull the start back by up to a whole window (e.g. ~41 days for 1h candles) and fetch data the caller never asked for, so pass the bar span here to honour the requested start.

emit_progress : callable or None

Called with (windows_done, windows_total) after each page.
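
The effect of align_s is easiest to see with the window boundaries written out. The following is a minimal sketch of the stepping described above, not the library's implementation: forward_windows is a hypothetical helper, NS is assumed to be the number of nanoseconds per second, and clamping the last window to end_ns is an assumption.

```python
NS = 1_000_000_000  # nanoseconds per second (assumed value of the NS constant)


def forward_windows(start_ns, end_ns, window_s, align_s=None):
    """Return (window_start_ns, window_end_ns) pairs that cover the range.

    Hypothetical helper: snaps the start down to the alignment grid, then
    advances by a fixed window_s * NS on every step.
    """
    align_ns = (align_s if align_s is not None else window_s) * NS
    step_ns = window_s * NS
    cursor = (start_ns // align_ns) * align_ns  # snap down to the grid
    windows = []
    while cursor < end_ns:
        # Assumption: the final window is clamped to end_ns.
        windows.append((cursor, min(cursor + step_ns, end_ns)))
        cursor += step_ns
    return windows


HOUR = 3600
window_s = 1000 * HOUR                   # 1h candles, 1000 bars per request
start_ns = (5 * HOUR + 1800) * NS        # 05:30 after the epoch
end_ns = start_ns + 10 * HOUR * NS

bar_aligned = forward_windows(start_ns, end_ns, window_s, align_s=HOUR)
window_aligned = forward_windows(start_ns, end_ns, window_s)
```

With align_s set to the candle span, the first window starts at 05:00, the bar that contains the requested start. With the default, the first window starts at the epoch, five and a half hours earlier than requested.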

async paginate_ohlc(fetch_page, cap, start_ns, end_ns, span, *, emit_progress=None)

Paginate OHLC bars forward using declared Capability.

Parameters:
fetch_page : async callable

Must be a closure with symbol and span already bound: fetch_page(start_ns, end_ns, limit) -> list[OHLCBar].

cap : Capability

Source capability (provides max_per_request and page_direction).

start_ns, end_ns : int

Time range in nanoseconds.

span : int

Candle interval in seconds — used to compute the page window.
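
As a worked example of the window arithmetic, the sketch below computes how much history one request covers and how many pages a range needs. Both helper names are hypothetical, the cap of 1000 bars per request is an illustrative value, and the page count ignores bar alignment.

```python
import math


def ohlc_window_s(span_s: int, max_per_request: int) -> int:
    """Seconds of history one request can cover: span × max_per_request."""
    return span_s * max_per_request


def pages_needed(start_s: int, end_s: int, span_s: int, max_per_request: int) -> int:
    """Number of fixed windows needed to cover [start_s, end_s)."""
    return math.ceil((end_s - start_s) / ohlc_window_s(span_s, max_per_request))


HOUR = 3600
DAY = 24 * HOUR

window = ohlc_window_s(HOUR, 1000)   # 3_600_000 seconds
window_days = window / DAY           # roughly 41.7 days
year_pages = pages_needed(0, 365 * DAY, HOUR, 1000)  # 8760 bars at 1000 per page
```

One year of 1h candles is 8760 bars, so with a cap of 1000 the range needs nine requests.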

async paginate_trades(fetch_page, cap, start_ns, end_ns, *, emit_progress=None, max_pages=1000000)

Paginate trades by cursor, draining the [start_ns, end_ns] window.

Unlike OHLC bars, which fit fixed-size time windows, trades are far denser than any single page: a one-day window on a liquid pair can hold millions of trades, but a page is capped at cap.max_per_request. Advancing by a fixed time window (the previous design) silently dropped everything past the first page. This paginator instead follows the adapter’s opaque cursor until the window is exhausted.

Parameters:
fetch_page : async callable

Closure with symbol bound: fetch_page(start_ns, end_ns, limit, cursor) -> (items, next_cursor). cursor is None on the first call.

cap : Capability

Source capability (provides max_per_request).

start_ns, end_ns : int

Inclusive time range in nanoseconds. Items outside it are filtered out.

emit_progress : callable or None

Called with (pages_done, -1) after each page (total is unknown).

max_pages : int

Hard safety cap on the number of pages, to bound a misbehaving cursor.
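
The cursor-draining loop can be sketched as follows. This is an illustration of the contract described above, not the library's code: drain_trades and fake_fetch_page are hypothetical, trades are reduced to bare timestamps, and stopping when next_cursor is None or the page is empty is an assumption. The fake adapter uses a list offset as its cursor; real cursors are opaque.

```python
import asyncio

# Hypothetical trade timestamps (ns), oldest first: 100, 110, ..., 190.
_TRADES = list(range(100, 200, 10))


async def fake_fetch_page(start_ns, end_ns, limit, cursor):
    """Stand-in adapter page fetch. Ignores the time range on purpose,
    so the paginator's own filtering is visible."""
    offset = 0 if cursor is None else cursor
    page = _TRADES[offset:offset + limit]
    next_offset = offset + len(page)
    next_cursor = next_offset if next_offset < len(_TRADES) else None
    return page, next_cursor


async def drain_trades(fetch_page, max_per_request, start_ns, end_ns,
                       *, emit_progress=None, max_pages=1_000_000):
    """Follow the cursor until it runs out, yielding items inside the range."""
    cursor = None
    for pages_done in range(1, max_pages + 1):  # hard cap on page count
        items, cursor = await fetch_page(start_ns, end_ns, max_per_request, cursor)
        for ts in items:
            if start_ns <= ts <= end_ns:        # inclusive range filter
                yield ts
        if emit_progress is not None:
            emit_progress(pages_done, -1)       # total is unknown
        if cursor is None or not items:         # assumed stopping rule
            break


async def _demo():
    progress = []
    trades = [
        ts async for ts in drain_trades(
            fake_fetch_page, 3, 120, 170,
            emit_progress=lambda done, total: progress.append((done, total)),
        )
    ]
    return trades, progress


trades, progress = asyncio.run(_demo())
```

Here the ten fake trades arrive in four pages of at most three items, and only the six timestamps inside the inclusive range 120 to 170 are yielded.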

HTTP client

Shared by every adapter and reference-counted, so concurrent operations on the same exchange don’t close it out from under each other.
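
Reference counting of a shared resource can be pictured with the sketch below. It is an assumption about the general technique, not a description of how AsyncHTTPClient exposes it: RefCountedResource is a hypothetical class, and the use of an async context manager is a choice made for the example.

```python
import asyncio


class RefCountedResource:
    """Hypothetical wrapper: the resource closes only when the last user leaves."""

    def __init__(self):
        self._refs = 0
        self.is_open = False
        self.close_count = 0

    async def __aenter__(self):
        if self._refs == 0:
            self.is_open = True   # a real client would open its connection pool here
        self._refs += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._refs -= 1
        if self._refs == 0:
            self.is_open = False  # last user out closes the resource
            self.close_count += 1


async def _demo():
    shared = RefCountedResource()
    observed = []

    async def short_job():
        async with shared:
            await asyncio.sleep(0)

    async def long_job():
        async with shared:
            await asyncio.sleep(0.01)
            observed.append(shared.is_open)  # checked after short_job has exited

    await asyncio.gather(short_job(), long_job())
    return observed, shared.is_open, shared.close_count


observed, open_after, close_count = asyncio.run(_demo())
```

The long job still sees an open resource after the short job has finished, and the resource is closed exactly once, when the count returns to zero.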

class AsyncHTTPClient(max_retries=5, backoff_base=2.0, timeout=30.0, headers=None)

Thin wrapper around httpx.AsyncClient with retry/backoff.

Parameters:
max_retries : int

Number of retry attempts on transient errors (5xx, network errors).

backoff_base : float

Exponential backoff base in seconds.

timeout : float

Request timeout in seconds.

async get(url, params=None)

Perform a GET request with retry/backoff. Returns parsed JSON.

exception HTTPError(status, url, body='')

Raised when an HTTP request fails after all retries.
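
A retry loop with exponential backoff might look like the sketch below. It is self-contained and does not use httpx: get_with_retry and TransientError are hypothetical stand-ins, and the delay schedule backoff_base ** attempt is an assumption, since the reference only states that backoff_base is the exponential base in seconds. The sleep function is injectable so the example runs without waiting.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a 5xx response or a network error."""


async def get_with_retry(do_request, max_retries=5, backoff_base=2.0,
                         sleep=asyncio.sleep):
    """Call do_request, retrying transient failures with growing delays."""
    attempt = 0
    while True:
        try:
            return await do_request()
        except TransientError:
            if attempt >= max_retries:
                raise  # a real client would raise its own HTTPError here
            # Assumed schedule: 1, 2, 4, ... seconds for backoff_base=2.0.
            await sleep(backoff_base ** attempt)
            attempt += 1


async def _demo():
    calls = {"n": 0}
    delays = []

    async def flaky():
        calls["n"] += 1
        if calls["n"] <= 2:          # fail twice, then succeed
            raise TransientError("503")
        return {"ok": True}

    async def fake_sleep(seconds):
        delays.append(seconds)       # record the delay instead of waiting

    body = await get_with_retry(flaky, sleep=fake_sleep)
    return body, delays


body, delays = asyncio.run(_demo())
```

Two failures produce two recorded delays before the third attempt succeeds.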

WebSocket base

class WebSocketBase(url)

Base async WebSocket client with exponential reconnect.

Subclasses override on_connect to send subscription messages and parse_message to yield domain records from raw frames.

For adapters that need a raw-frame async generator (e.g. to maintain local order-book state across messages), use stream_raw instead.

Parameters:
url : str

WebSocket endpoint URL.

async on_connect(ws)

Called once after each (re)connect. Override to send subscriptions.

async parse_message(raw)

Parse a raw frame and yield domain records. Override in subclass.

stop()

Request graceful shutdown.

async stream()

Yield parsed domain records, reconnecting on errors.

Delegates to parse_message for frame parsing.

async stream_raw()

Yield raw WebSocket frames, reconnecting with exponential backoff.

Use this in adapters where parse_message is not convenient (e.g. stateful order-book reconstruction that spans multiple frames).
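
The subclassing pattern (override on_connect to subscribe, override parse_message to yield records) can be sketched offline as follows. StubWebSocketBase is a hypothetical stand-in that replays canned frames and never reconnects, so the example runs without a network; the message format, the send method on the socket object, and the TradeFeed class are all assumptions made for illustration.

```python
import asyncio
import json


class StubWebSocketBase:
    """Minimal stand-in for the base class: replays frames, no real socket."""

    def __init__(self, url, frames):
        self.url = url
        self._frames = frames
        self.sent = []

    async def send(self, message):
        self.sent.append(message)        # record what a subclass would transmit

    async def on_connect(self, ws):
        pass                             # subclasses send subscriptions here

    async def parse_message(self, raw):
        return
        yield                            # unreachable; marks this as an async generator

    async def stream(self):
        await self.on_connect(self)      # the stub passes itself as the "socket"
        for raw in self._frames:
            async for record in self.parse_message(raw):
                yield record


class TradeFeed(StubWebSocketBase):
    async def on_connect(self, ws):
        await ws.send(json.dumps({"op": "subscribe", "channel": "trades"}))

    async def parse_message(self, raw):
        msg = json.loads(raw)
        for trade in msg.get("trades", []):   # frames without trades yield nothing
            yield (trade["price"], trade["size"])


async def _demo():
    frames = [
        json.dumps({"type": "heartbeat"}),
        json.dumps({"trades": [{"price": 100.5, "size": 2}, {"price": 101.0, "size": 1}]}),
    ]
    feed = TradeFeed("wss://example.invalid/ws", frames)
    records = [record async for record in feed.stream()]
    return records, feed.sent


records, sent = asyncio.run(_demo())
```

Because parse_message is an async generator, one frame can produce zero, one, or many records, which is why the heartbeat frame contributes nothing to the output.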

Rate limiter

class RateLimiter(rates=None)

Global per-exchange rate limiter.

Parameters:
rates : dict

Map of exchange name → requests per second.

async acquire(exchange)

Wait until a token is available for exchange, then consume it.
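
A token bucket of the kind named in the overview can be sketched as below. This is a generic illustration under stated assumptions, not the RateLimiter implementation: TokenBucket is a hypothetical class, the bucket capacity of one token is an assumption, and the clock and sleep functions are injectable so the demo is deterministic. The exchange name is a placeholder.

```python
import asyncio
import time


class TokenBucket:
    """Hypothetical single-exchange bucket refilled at `rate` tokens per second."""

    def __init__(self, rate, capacity=1.0, clock=time.monotonic, sleep=asyncio.sleep):
        self.rate = rate
        self.capacity = capacity
        self._tokens = capacity
        self._clock = clock
        self._sleep = sleep
        self._last = clock()
        self._lock = asyncio.Lock()   # serialises waiters

    async def acquire(self):
        async with self._lock:
            while True:
                now = self._clock()
                # Refill in proportion to elapsed time, never above capacity.
                self._tokens = min(self.capacity,
                                   self._tokens + (now - self._last) * self.rate)
                self._last = now
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return
                # Sleep just long enough for one full token to accumulate.
                await self._sleep((1.0 - self._tokens) / self.rate)


async def _demo():
    now = [0.0]
    waits = []

    async def fake_sleep(seconds):
        waits.append(seconds)
        now[0] += seconds             # advance the fake clock instead of waiting

    # One bucket per exchange, mirroring a map of exchange name to requests per second.
    buckets = {"example_exchange": TokenBucket(2.0, clock=lambda: now[0], sleep=fake_sleep)}
    for _ in range(3):
        await buckets["example_exchange"].acquire()
    return waits


waits = asyncio.run(_demo())
```

At two requests per second the first acquire passes immediately and each later one waits half a second, which is the spacing the configured rate implies.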