Source code for dccd.sources.base
"""Source protocols — fine-grained per (data_type × mode)."""
from __future__ import annotations
from collections.abc import AsyncIterator
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType
__all__ = [
"Source",
"OHLCHistory",
"TradesHistory",
"OrderBookSnapshotREST",
"OHLCLive",
"TradesLive",
"OrderBookLive",
]
[docs]
class Source:
"""Base mixin for all source adapters.
Adapters inherit from ``Source`` and one or more capability protocols.
They declare capabilities and render exchange-specific symbol strings.
"""
exchange: str = ""
[docs]
def capabilities(self) -> list[Capability]:
"""Return list of declared capabilities."""
return []
[docs]
def render_symbol(self, s: Symbol) -> str:
"""Render a canonical Symbol to the exchange-specific string."""
return str(s)
[docs]
def capability_for(
self,
data_type: DataType,
transport: str,
mode: str,
) -> Capability | None:
"""Return the declared :class:`Capability` for this combination, or None."""
for cap in self.capabilities():
if (
cap.data_type == data_type
and cap.transport == transport
and cap.mode == mode
):
return cap
return None
[docs]
class OHLCHistory(Source):
"""Protocol: can fetch historical OHLC pages via REST."""
[docs]
async def fetch_ohlc_page(
self,
symbol: Symbol,
span: int,
start_ns: int,
end_ns: int,
limit: int,
) -> list[OHLCBar]:
"""Fetch up to *limit* OHLC bars of *span* seconds in ``[start_ns, end_ns]``."""
raise NotImplementedError
[docs]
class TradesHistory(Source):
"""Protocol: can fetch historical trade pages via REST.
Cursor contract: ``fetch_trades_page`` returns ``(trades, next_cursor)``.
The *cursor* is an opaque, adapter-defined string used to continue inside
the ``[start_ns, end_ns)`` window:
- ``cursor=None`` on the first call — anchor on ``start_ns`` (or ``end_ns``
for adapters that page backward).
- ``next_cursor`` is ``None`` when the window is exhausted (the adapter
returned a short/last page, or the next item would fall outside the
window). Returning a non-``None`` cursor tells the paginator to call again.
This lets the generic paginator drain a window completely — fixing the
capped-single-page data loss that affected every liquid pair — without
per-exchange chunking in the application layer.
"""
[docs]
async def fetch_trades_page(
self,
symbol: Symbol,
start_ns: int,
end_ns: int,
limit: int,
cursor: str | None = None,
) -> tuple[list[Trade], str | None]:
"""Fetch one page of trades; return ``(trades, next_cursor)`` (see class)."""
raise NotImplementedError
[docs]
class OrderBookSnapshotREST(Source):
"""Protocol: can fetch an order book snapshot via REST."""
[docs]
async def fetch_orderbook(
self,
symbol: Symbol,
depth: int,
) -> OrderBookSnapshot:
"""Fetch a current order-book snapshot up to *depth* levels."""
raise NotImplementedError
[docs]
class OHLCLive(Source):
"""Protocol: can stream live OHLC bars via WebSocket."""
[docs]
def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
"""Yield live OHLC bars of *span* seconds over WebSocket."""
raise NotImplementedError
[docs]
class TradesLive(Source):
"""Protocol: can stream live trades via WebSocket."""
[docs]
def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
"""Yield live trades over WebSocket."""
raise NotImplementedError
[docs]
class OrderBookLive(Source):
"""Protocol: can stream live order book snapshots/deltas via WebSocket."""
[docs]
def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
"""Yield live order-book snapshots/deltas over WebSocket."""
raise NotImplementedError