Transport
The async I/O building blocks every adapter shares: an HTTP client with retry/backoff, a reconnecting WebSocket base, a token-bucket rate limiter, and the generic paginators.
Pagination
OHLC is paged in fixed time windows, each sized to one request
(span × max_per_request) and snapped to the bar. Trades are paged by an
opaque per-adapter cursor that drains the whole window; this fixes the
data loss that occurred on liquid pairs when only a single capped page was
fetched. The application binds the adapter’s fetch_*_page method in a closure
and hands it to the paginator.
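The window arithmetic described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `NS`, `snap_down`, and `ohlc_windows` are hypothetical names, and treating each window's end as exclusive is an assumption.

```python
NS = 1_000_000_000  # nanoseconds per second (assumed constant name)


def snap_down(ts_ns: int, step_s: int) -> int:
    """Round a nanosecond timestamp down to a multiple of step_s seconds."""
    step_ns = step_s * NS
    return ts_ns - ts_ns % step_ns


def ohlc_windows(start_ns: int, end_ns: int, span_s: int, max_per_request: int):
    """Yield (window_start_ns, window_end_ns) pairs covering the range."""
    window_ns = span_s * max_per_request * NS  # one request's worth of bars
    cursor = snap_down(start_ns, span_s)       # snap to the bar, not the window
    while cursor < end_ns:
        yield cursor, min(cursor + window_ns, end_ns)
        cursor += window_ns


# 1-hour bars, 300 bars per request, a 700-hour range that starts mid-bar.
windows = list(ohlc_windows(90 * 60 * NS, 700 * 3600 * NS, 3600, 300))
```

With these inputs the start is pulled back only to the previous hour boundary, and the range is covered by three requests of at most 300 bars each.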
Generic paginator — forward (start→end) and backward (cursor-based).
The Paginator drives a source’s fetch_*_page methods. Adapters expose a fetch
function with signature fetch(start_ns, end_ns, limit) -> list[T]; the
window size is derived from the source’s declared Capability. This eliminates
per-exchange chunking code (generalises the Coinbase-300 fix to all adapters).
Caller contract: wrap the adapter’s bound method in a closure that closes
over symbol (and span for OHLC) before passing it here. For example:
    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    async for bar in paginate_ohlc(_fetch, cap, start_ns, end_ns, span):
        ...
- async paginate_backward(fetch_page, start_ns, end_ns, max_per_request, *, emit_progress=None)
Paginate backward using opaque cursors.
- Parameters:
- fetch_page : async callable
Signature: await fetch_page(cursor, max_per_request) -> (items, next_cursor). cursor is None on the first call.
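To make the fetch_page contract concrete, here is a minimal sketch of a cursor-following loop against a stand-in data source. `drain_backward` and the in-memory `_TRADES` list are hypothetical, and using a `next_cursor` of `None` to signal exhaustion is an assumed convention rather than documented behaviour.

```python
import asyncio

# Hypothetical data source: trade ids 10..1, served newest first.
_TRADES = list(range(10, 0, -1))


async def fetch_page(cursor, max_per_request):
    """Return (items, next_cursor); cursor is None on the first call."""
    offset = 0 if cursor is None else cursor
    items = _TRADES[offset:offset + max_per_request]
    next_offset = offset + len(items)
    # Assumed convention: next_cursor is None once the source is exhausted.
    next_cursor = next_offset if next_offset < len(_TRADES) else None
    return items, next_cursor


async def drain_backward(fetch_page, max_per_request):
    """Follow the opaque cursor until no further pages are reported."""
    cursor = None
    while True:
        items, cursor = await fetch_page(cursor, max_per_request)
        for item in items:
            yield item
        if cursor is None or not items:
            break


async def collect():
    return [t async for t in drain_backward(fetch_page, 4)]


result = asyncio.run(collect())
```

The paginator never inspects the cursor; only the adapter knows whether it is an offset, an id, or a timestamp.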
- async paginate_forward(fetch_page, start_ns, end_ns, window_s, max_per_request, *, align_s=None, emit_progress=None)
Paginate forward from start_ns to end_ns in fixed time windows.
- Parameters:
- fetch_page : async callable
Signature: await fetch_page(window_start_ns, window_end_ns, max_per_request) -> list[T]. Must already have symbol (and span for OHLC) bound via a closure.
- start_ns, end_ns : int
Time range in nanoseconds.
- window_s : int
Window duration in seconds. The paginator advances by window_s * NS on each step regardless of how many items the page returned.
- max_per_request : int
Passed as limit to each fetch_page call.
- align_s : int or None
Granularity to snap start_ns to (e.g. the candle span). Defaults to window_s. Snapping to the window can pull the start back by up to a whole window (e.g. ~41 days for 1h candles), fetching data the caller never asked for; pass the bar span instead so the requested start is honoured.
- emit_progress : callable or None
Called with (windows_done, windows_total) after each page.
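The parameters above can be illustrated with a simplified stand-in for the forward loop. `forward_sketch` and `fake_fetch` are hypothetical; the sketch only mirrors the documented behaviour (fixed step, start snapped to align_s, progress after each page) and makes no claim about the real implementation's edge cases.

```python
import asyncio

NS = 1_000_000_000  # nanoseconds per second


async def forward_sketch(fetch_page, start_ns, end_ns, window_s,
                         max_per_request, *, align_s=None, emit_progress=None):
    step_ns = window_s * NS
    align_ns = (align_s if align_s is not None else window_s) * NS
    cursor = start_ns - start_ns % align_ns       # snap the start down
    total = -(-(end_ns - cursor) // step_ns)      # integer ceiling division
    done = 0
    while cursor < end_ns:
        window_end = min(cursor + step_ns, end_ns)
        for item in await fetch_page(cursor, window_end, max_per_request):
            yield item
        cursor += step_ns   # advance by the window regardless of page size
        done += 1
        if emit_progress is not None:
            emit_progress(done, total)


async def fake_fetch(ws, we, limit):
    """Hypothetical source: one 60-second bar timestamp per bar in [ws, we)."""
    bar_ns = 60 * NS
    first = -(-ws // bar_ns) * bar_ns
    return list(range(first, we, bar_ns))[:limit]


progress = []


async def collect():
    gen = forward_sketch(fake_fetch, 90 * NS, 600 * NS, 180, 3,
                         align_s=60,
                         emit_progress=lambda d, t: progress.append((d, t)))
    return [bar async for bar in gen]


bars = asyncio.run(collect())
```

Here the requested start of 90 s is snapped to the 60 s bar boundary. Leaving align_s unset would snap it to 0 s, the start of the enclosing 180 s window.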
- async paginate_ohlc(fetch_page, cap, start_ns, end_ns, span, *, emit_progress=None)
Paginate OHLC bars forward using declared Capability.
- Parameters:
- fetch_page : async callable
Must be a closure with symbol and span already bound: fetch_page(start_ns, end_ns, limit) -> list[OHLCBar].
- cap : Capability
Source capability (provides max_per_request and page_direction).
- start_ns, end_ns : int
Time range in nanoseconds.
- span : int
Candle interval in seconds, used to compute the page window.
- async paginate_trades(fetch_page, cap, start_ns, end_ns, *, emit_progress=None, max_pages=1000000)
Paginate trades by cursor, draining the [start_ns, end_ns] window.
Unlike OHLC (fixed-size time windows), trades are far denser than any single page: a one-day window on a liquid pair holds millions of trades but a page is capped at cap.max_per_request. Advancing by a fixed time window (the previous design) silently dropped everything past the first page. This paginator instead follows the adapter’s opaque cursor until the window is exhausted.
- Parameters:
- fetch_page : async callable
Closure with symbol bound: fetch_page(start_ns, end_ns, limit, cursor) -> (items, next_cursor). cursor is None on the first call.
- cap : Capability
Source capability (provides max_per_request).
- start_ns, end_ns : int
Inclusive time range in nanoseconds. Items outside it are filtered out.
- emit_progress : callable or None
Called with (pages_done, -1) after each page (total is unknown).
- max_pages : int
Hard safety cap on the number of pages, to bound a misbehaving cursor.
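A simplified sketch of the cursor drain, the inclusive range filter, and the max_pages guard follows. `trades_sketch` is hypothetical and takes max_per_request directly instead of a Capability; representing a trade as a `(timestamp_ns, payload)` tuple and ending on a `None` cursor are assumptions made for the example.

```python
import asyncio


async def trades_sketch(fetch_page, max_per_request, start_ns, end_ns, *,
                        emit_progress=None, max_pages=1_000_000):
    cursor = None
    for pages_done in range(1, max_pages + 1):
        items, cursor = await fetch_page(start_ns, end_ns, max_per_request, cursor)
        for ts_ns, payload in items:
            if start_ns <= ts_ns <= end_ns:    # inclusive range filter
                yield ts_ns, payload
        if emit_progress is not None:
            emit_progress(pages_done, -1)      # total is unknown
        if cursor is None or not items:        # assumed end-of-data signal
            break


_ALL = [(ts, f"trade-{ts}") for ts in range(10)]


async def fake_fetch(start_ns, end_ns, limit, cursor):
    """Hypothetical source that pages by offset and ignores the time range."""
    offset = cursor or 0
    items = _ALL[offset:offset + limit]
    nxt = offset + len(items)
    return items, (nxt if nxt < len(_ALL) else None)


async def stuck_fetch(start_ns, end_ns, limit, cursor):
    """Misbehaving source whose cursor never terminates."""
    return [(start_ns, "dup")], "same-cursor"


progress = []


async def collect():
    good = [t async for t in trades_sketch(
        fake_fetch, 4, 2, 7,
        emit_progress=lambda d, t: progress.append((d, t)))]
    capped = [t async for t in trades_sketch(stuck_fetch, 4, 2, 7, max_pages=3)]
    return good, capped


good, capped = asyncio.run(collect())
```

The first run drains three pages and keeps only the trades inside the window; the second shows max_pages stopping a cursor that would otherwise loop forever.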
HTTP client
Shared by every adapter and reference-counted, so concurrent operations on the same exchange don’t close it out from under each other.
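The reference-counting idea can be sketched as below. `SharedClient`, `acquire`, and `release` are hypothetical names chosen for illustration; the documentation does not say how the real client tracks its users.

```python
import asyncio


class SharedClient:
    """Hypothetical reference-counted wrapper around a closeable resource."""

    def __init__(self):
        self._refs = 0
        self.closed = True
        self.open_count = 0

    async def acquire(self):
        if self._refs == 0:          # first user opens the resource
            self.closed = False
            self.open_count += 1
        self._refs += 1
        return self

    async def release(self):
        self._refs -= 1
        if self._refs == 0:          # last user closes it
            self.closed = True


async def operation(client, delay, seen):
    await client.acquire()
    try:
        await asyncio.sleep(delay)
        seen.append(client.closed)   # should still be open while a reference is held
    finally:
        await client.release()


async def main():
    client = SharedClient()
    seen = []
    await asyncio.gather(operation(client, 0.01, seen),
                         operation(client, 0.03, seen))
    return client, seen


client, seen = asyncio.run(main())
```

The faster operation releases first, but the slower one still finds the client open because the count has not yet reached zero.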
- class AsyncHTTPClient(max_retries=5, backoff_base=2.0, timeout=30.0, headers=None)
Thin wrapper around httpx.AsyncClient with retry/backoff.
- Parameters:
- max_retries : int
Number of retry attempts on transient errors (5xx, network errors).
- backoff_base : float
Exponential backoff base in seconds.
- timeout : float
Request timeout in seconds.
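How max_retries and backoff_base might interact is sketched below without any HTTP dependency. `with_retries` and `TransientError` are hypothetical, and the delay schedule `backoff_base ** attempt` is an assumption; the real client may scale or jitter its delays differently.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a 5xx response or a network error."""


async def with_retries(send, *, max_retries=5, backoff_base=2.0,
                       sleep=asyncio.sleep):
    """Call send(), retrying transient failures with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except TransientError:
            if attempt == max_retries:
                raise                              # retries exhausted
            await sleep(backoff_base ** attempt)   # assumed schedule: 1, 2, 4, ...


delays = []
attempts = []


async def fake_sleep(seconds):
    delays.append(seconds)    # record the delay instead of waiting


async def flaky_send():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("503")
    return "ok"


result = asyncio.run(with_retries(flaky_send, sleep=fake_sleep))
```

Injecting the sleep function keeps the example fast and makes the delay schedule observable.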
WebSocket base
- class WebSocketBase(url)
Base async WebSocket client with exponential reconnect.
Subclasses override on_connect to send subscription messages and parse_message to yield domain records from raw frames.
For adapters that need a raw-frame async generator (e.g. to maintain local order-book state across messages), use stream_raw instead.
- Parameters:
- url : str
WebSocket endpoint URL.
- async stream()
Yield parsed domain records, reconnecting on errors.
Delegates to parse_message for frame parsing.
- async stream_raw()
Yield raw WebSocket frames, reconnecting with exponential backoff.
Use this in adapters where parse_message is not convenient (e.g. stateful order-book reconstruction that spans multiple frames).
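The relationship between stream, stream_raw, and parse_message can be sketched with a fake connection in place of a real WebSocket. `ReconnectingStream`, its `connect` argument, and the backoff limits are hypothetical; ending the stream on a clean close is an assumption made so the example terminates.

```python
import asyncio


class ReconnectingStream:
    """Hypothetical sketch of a reconnecting frame source."""

    def __init__(self, connect, *, backoff_base=0.01, max_backoff=0.1):
        self._connect = connect
        self._backoff_base = backoff_base
        self._max_backoff = max_backoff

    def parse_message(self, frame):
        """Override to turn one raw frame into zero or more records."""
        yield frame

    async def stream_raw(self):
        failures = 0
        while True:
            try:
                async for frame in self._connect():
                    failures = 0      # a healthy frame resets the backoff
                    yield frame
                return                # clean close: stop (assumption)
            except ConnectionError:
                delay = min(self._backoff_base * 2 ** failures, self._max_backoff)
                failures += 1
                await asyncio.sleep(delay)

    async def stream(self):
        async for frame in self.stream_raw():
            for record in self.parse_message(frame):
                yield record


attempts = []


async def fake_connect():
    """Drops after one frame on the first attempt, then succeeds."""
    attempts.append(1)
    if len(attempts) == 1:
        yield "a"
        raise ConnectionError("dropped")
    yield "b"
    yield "c"


class Upper(ReconnectingStream):
    def parse_message(self, frame):
        yield frame.upper()


async def collect():
    return [r async for r in Upper(fake_connect).stream()]


records = asyncio.run(collect())
```

A stateful adapter would iterate stream_raw directly and keep its own order-book state between frames instead of overriding parse_message.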