Source code for dccd.sources.base

"""Source protocols — fine-grained per (data_type × mode)."""

from __future__ import annotations

from collections.abc import AsyncIterator

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType

# Public names exported by ``from dccd.sources.base import *``: the ``Source``
# base class followed by the REST ("History"/"REST") and WebSocket ("Live")
# capability classes defined in this module.
__all__ = [
    "Source",
    "OHLCHistory",
    "TradesHistory",
    "OrderBookSnapshotREST",
    "OHLCLive",
    "TradesLive",
    "OrderBookLive",
]


class Source:
    """Common base class mixed into every source adapter.

    A concrete adapter subclasses ``Source`` together with one or more of the
    capability classes. It advertises what it supports through
    :meth:`capabilities` and turns canonical symbols into the exchange's own
    notation through :meth:`render_symbol`.
    """

    exchange: str = ""

    def capabilities(self) -> list[Capability]:
        """Return the capabilities this adapter declares (none by default)."""
        return []

    def render_symbol(self, s: Symbol) -> str:
        """Return the exchange-specific string for the canonical symbol *s*.

        The base implementation is simply ``str(s)``.
        """
        return str(s)

    def capability_for(
        self,
        data_type: DataType,
        transport: str,
        mode: str,
    ) -> Capability | None:
        """Look up the declared :class:`Capability` matching all three keys.

        Returns the first entry of :meth:`capabilities` whose ``data_type``,
        ``transport`` and ``mode`` all compare equal to the arguments, or
        ``None`` when no declared capability matches.
        """
        # Lazy generator: scanning stops at the first match, exactly like an
        # early ``return`` from a loop would.
        matching = (
            declared
            for declared in self.capabilities()
            if declared.data_type == data_type
            and declared.transport == transport
            and declared.mode == mode
        )
        return next(matching, None)
class OHLCHistory(Source):
    """Capability: historical OHLC bars can be fetched page by page via REST."""

    async def fetch_ohlc_page(
        self, symbol: Symbol, span: int, start_ns: int, end_ns: int, limit: int
    ) -> list[OHLCBar]:
        """Return at most *limit* bars of *span* seconds within ``[start_ns, end_ns]``.

        This base version is a stub and always raises
        :class:`NotImplementedError`; adapters provide the real request.
        """
        raise NotImplementedError
class TradesHistory(Source):
    """Capability: historical trades can be fetched page by page via REST.

    Cursor contract
    ---------------
    :meth:`fetch_trades_page` returns a ``(trades, next_cursor)`` pair. The
    cursor is an opaque string whose format is chosen by the adapter; it is
    used to continue inside the ``[start_ns, end_ns)`` window.

    * First call: pass ``cursor=None``. The adapter anchors on ``start_ns``
      (or on ``end_ns`` if it pages backward).
    * ``next_cursor`` is ``None``: the window is exhausted. Either the adapter
      returned a short/last page, or the next item would fall outside the
      window.
    * ``next_cursor`` is not ``None``: the paginator should call again.

    With this contract the generic paginator can drain a window completely,
    which fixes the capped-single-page data loss that affected every liquid
    pair, without per-exchange chunking in the application layer.
    """

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch a single page of trades and return ``(trades, next_cursor)``.

        Pass ``cursor=None`` for the first page; a returned ``next_cursor`` of
        ``None`` means there is nothing left to fetch in the window. This base
        version is a stub and always raises :class:`NotImplementedError`.
        """
        raise NotImplementedError
class OrderBookSnapshotREST(Source):
    """Capability: an order-book snapshot can be fetched via REST."""

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Return a current order-book snapshot with up to *depth* levels.

        This base version is a stub and always raises
        :class:`NotImplementedError`.
        """
        raise NotImplementedError
class OHLCLive(Source):
    """Capability: live OHLC bars can be streamed via WebSocket."""

    def stream_ohlc(
        self,
        symbol: Symbol,
        span: int,
    ) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars of *span* seconds over WebSocket.

        This base version is a stub: calling it raises
        :class:`NotImplementedError` immediately instead of returning an
        iterator.
        """
        raise NotImplementedError
class TradesLive(Source):
    """Capability: live trades can be streamed via WebSocket."""

    def stream_trades(
        self,
        symbol: Symbol,
    ) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket.

        This base version is a stub: calling it raises
        :class:`NotImplementedError` immediately instead of returning an
        iterator.
        """
        raise NotImplementedError
class OrderBookLive(Source):
    """Capability: live order-book snapshots/deltas can be streamed via WebSocket."""

    def stream_orderbook(
        self,
        symbol: Symbol,
        depth: int,
    ) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots/deltas over WebSocket.

        This base version is a stub: calling it raises
        :class:`NotImplementedError` immediately instead of returning an
        iterator.
        """
        raise NotImplementedError