"""Generic paginator — forward (start→end) and backward (cursor-based).
The Paginator drives a source's fetch_*_page methods. Adapters expose a fetch
function with signature ``fetch(start_ns, end_ns, limit) -> list[T]``; the
window size is derived from the source's declared Capability. This eliminates
per-exchange chunking code (generalises the Coinbase-300 fix to all adapters).
**Caller contract**: wrap the adapter's bound method in a closure that closes
over ``symbol`` (and ``span`` for OHLC) before passing it here. For example::

    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    async for bar in paginate_ohlc(_fetch, cap, start_ns, end_ns, span):
        ...
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Any, TypeVar
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, Trade
from dccd.domain.timeutils import NS, align_ns
__all__ = ["paginate_forward", "paginate_backward", "paginate_ohlc", "paginate_trades"]
logger = logging.getLogger(__name__)
T = TypeVar("T")
async def paginate_forward(
    fetch_page: Callable[[int, int, int], Coroutine[Any, Any, list[T]]],
    start_ns: int,
    end_ns: int,
    window_s: int,
    max_per_request: int,
    *,
    align_s: int | None = None,
    emit_progress: Callable[[int, int], None] | None = None,
) -> AsyncIterator[T]:
    """Paginate forward from *start_ns* to *end_ns* in fixed time windows.

    Parameters
    ----------
    fetch_page : async callable
        Signature: ``await fetch_page(window_start_ns, window_end_ns, max_per_request) -> list[T]``.
        Must already have ``symbol`` (and ``span`` for OHLC) bound via a closure.
    start_ns, end_ns : int
        Time range in nanoseconds.
    window_s : int
        Window duration in seconds; must be strictly positive. The paginator
        advances by ``window_s * NS`` on each step regardless of how many
        items the page returned (the last window is clipped to *end_ns*).
    max_per_request : int
        Passed as *limit* to each ``fetch_page`` call.
    align_s : int or None
        Granularity to snap *start_ns* to (e.g. the candle span). Defaults to
        ``window_s``; snapping to the *window* would pull the start back by up
        to a whole window (e.g. ~41 days for 1h candles), fetching data the
        caller never asked for. Snap to the bar instead so the requested start
        is honoured. Snapping is only applied when the granularity is at
        least 60 seconds; below that *start_ns* is used as given.
    emit_progress : callable or None
        Called with ``(windows_done, windows_total)`` after each page.

    Raises
    ------
    ValueError
        If *window_s* is not strictly positive. Because this is an async
        generator, the error surfaces when iteration starts.
    """
    # Validate before any arithmetic: a zero window would divide by zero when
    # counting windows, and a negative one would walk ``cur`` backwards forever
    # while hammering the adapter.
    if window_s <= 0:
        raise ValueError(
            f"window_s must be a positive number of seconds, got {window_s!r}"
        )

    window_ns = window_s * NS
    snap = window_s if align_s is None else align_s
    # Sub-minute granularities are deliberately left unaligned.
    cur = align_ns(start_ns, snap) if snap >= 60 else start_ns

    # Ceiling division, counted from the (possibly snapped) start; at least 1
    # so progress consumers never see a zero total.
    total_windows = max(1, (end_ns - cur + window_ns - 1) // window_ns)
    done = 0
    while cur < end_ns:
        chunk_end = min(cur + window_ns, end_ns)
        items = await fetch_page(cur, chunk_end, max_per_request)
        for item in items:
            yield item
        cur = chunk_end
        done += 1
        if emit_progress:
            emit_progress(done, total_windows)
async def paginate_backward(
    fetch_page: Callable[[str | None, int], Coroutine[Any, Any, tuple[list[T], str | None]]],
    start_ns: int,
    end_ns: int,
    max_per_request: int,
    *,
    emit_progress: Callable[[int, int], None] | None = None,
) -> AsyncIterator[T]:
    """Paginate backward using opaque cursors.

    Pages are requested until the adapter reports no further cursor, the
    cursor stops advancing, or the last item of a page is older than
    *start_ns*.

    Parameters
    ----------
    fetch_page : async callable
        Signature: ``await fetch_page(cursor, max_per_request) -> (items, next_cursor)``.
        *cursor* is ``None`` on the first call.
    start_ns, end_ns : int
        Inclusive time range in nanoseconds; items whose timestamp falls
        outside it are dropped.
    max_per_request : int
        Passed as the page-size limit to each ``fetch_page`` call.
    emit_progress : callable or None
        Called with ``(pages_done, -1)`` after each page (total is unknown).
    """
    cursor: str | None = None
    page = 0
    while True:
        items, next_cursor = await fetch_page(cursor, max_per_request)
        for item in items:
            if start_ns <= _get_ts(item) <= end_ns:
                yield item
        page += 1
        if emit_progress:
            emit_progress(page, -1)
        # Stop when the adapter has no more pages, or when it hands back the
        # cursor we just used: re-requesting the same page would loop forever.
        if next_cursor is None or next_cursor == cursor:
            break
        # NOTE(review): this early stop assumes each page is ordered
        # newest-first, so the last item is the oldest — confirm per adapter.
        if items and _get_ts(items[-1]) < start_ns:
            break
        cursor = next_cursor


def _get_ts(item: Any) -> int:
    """Return ``item.ts``, or ``0`` when the item has no ``ts`` attribute."""
    return getattr(item, "ts", 0)
[docs]
async def paginate_ohlc(
    fetch_page: Callable[[int, int, int], Coroutine[Any, Any, list[OHLCBar]]],
    cap: Capability,
    start_ns: int,
    end_ns: int,
    span: int,
    *,
    emit_progress: Callable[[int, int], None] | None = None,
) -> AsyncIterator[OHLCBar]:
    """Stream OHLC bars between *start_ns* and *end_ns* via ``paginate_forward``.

    The page window is sized from the source's declared capability so that
    one request corresponds to one full page of candles.

    Parameters
    ----------
    fetch_page : async callable
        Closure with ``symbol`` and ``span`` already bound:
        ``fetch_page(start_ns, end_ns, limit) -> list[OHLCBar]``.
    cap : Capability
        Source capability. Only ``max_per_request`` is read here; a falsy
        value falls back to 1000.
    start_ns, end_ns : int
        Time range in nanoseconds.
    span : int
        Candle interval in seconds — used to compute the page window and as
        the alignment granularity for the start.
    emit_progress : callable or None
        Forwarded unchanged to ``paginate_forward``.
    """
    page_limit = cap.max_per_request or 1000
    # One window covers ``page_limit`` candles, i.e. exactly one page. The
    # start is aligned to the candle (``align_s=span``), not to the window, so
    # the requested start is not dragged back by up to a whole window.
    bars = paginate_forward(
        fetch_page,
        start_ns,
        end_ns,
        span * page_limit,
        page_limit,
        align_s=span,
        emit_progress=emit_progress,
    )
    async for candle in bars:
        yield candle
[docs]
async def paginate_trades(
    fetch_page: Callable[
        [int, int, int, str | None],
        Coroutine[Any, Any, tuple[list[Trade], str | None]],
    ],
    cap: Capability,
    start_ns: int,
    end_ns: int,
    *,
    emit_progress: Callable[[int, int], None] | None = None,
    max_pages: int = 1_000_000,
) -> AsyncIterator[Trade]:
    """Paginate trades by **cursor**, draining the ``[start_ns, end_ns]`` window.

    Unlike OHLC (fixed-size time windows), trades are far denser than any single
    page: a one-day window on a liquid pair holds millions of trades but a page
    is capped at ``cap.max_per_request``. Advancing by a fixed time window —
    the previous design — silently dropped everything past the first page. This
    paginator instead follows the adapter's opaque cursor until the window is
    exhausted.

    Parameters
    ----------
    fetch_page : async callable
        Closure with ``symbol`` bound:
        ``fetch_page(start_ns, end_ns, limit, cursor) -> (items, next_cursor)``.
        ``cursor`` is ``None`` on the first call.
    cap : Capability
        Source capability (provides ``max_per_request``; a falsy value falls
        back to 1000).
    start_ns, end_ns : int
        Inclusive time range in nanoseconds. Items outside it are filtered out.
    emit_progress : callable or None
        Called with ``(pages_done, -1)`` after each page (total is unknown).
    max_pages : int
        Hard safety cap on the number of pages, to bound a misbehaving cursor.
        Reaching it logs a warning, because the stream then ends with the
        cursor still open and the result may be incomplete.
    """
    max_per = cap.max_per_request or 1000
    cursor: str | None = None
    pages = 0
    while pages < max_pages:
        items, next_cursor = await fetch_page(start_ns, end_ns, max_per, cursor)
        out_of_window = False
        for item in items:
            ts = _get_ts(item)
            # NOTE(review): stopping at the first item past end_ns assumes
            # pages are ordered oldest-first — confirm per adapter.
            if ts > end_ns:
                out_of_window = True
                break
            if ts >= start_ns:
                yield item
        pages += 1
        if emit_progress:
            emit_progress(pages, -1)
        # Done when we ran past the window, the adapter has no more pages, or
        # the cursor failed to advance (re-requesting would loop forever).
        if out_of_window or next_cursor is None or next_cursor == cursor:
            break
        cursor = next_cursor
    else:
        # Only reached when the loop ended because of the page cap, not via
        # ``break``: make the truncation visible instead of ending silently.
        logger.warning(
            "paginate_trades stopped at max_pages=%s with the cursor still open; "
            "results for [%s, %s] may be incomplete",
            max_pages,
            start_ns,
            end_ns,
        )