Source code for dccd.sources.coinbase

"""Coinbase source adapter — OHLC (300/req), trades, order book (REST + WS)."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import NS, coinbase_granularity, ns_to_s, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesHistory,
    TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["CoinbaseSource"]

logger = logging.getLogger(__name__)

_BASE = "https://api.exchange.coinbase.com"
_COINBASE_SPANS = [60, 300, 900, 3600, 21600, 86400]


[docs] class CoinbaseSource( OHLCHistory, TradesHistory, OrderBookSnapshotREST, OHLCLive, TradesLive, OrderBookLive, ): """Coinbase source adapter. - **Backfill**: OHLC (full, 300 candles/req — windowed automatically), trades (recent only — see Notes), order-book snapshot (level 2). - **Stream**: trades only. Notes ----- Coinbase paginates trades through ``CB-AFTER`` response *headers*, which the JSON-only transport does not expose, so trades backfill returns a single recent page (declared ``history="recent"``). Live OHLC / order book are not implemented and are not declared as capabilities. See Also -------- dccd.Client : the public facade. Examples -------- >>> from dccd.sources.coinbase import CoinbaseSource >>> CoinbaseSource().capability_for(DataType.OHLC, 'rest', 'historical').max_per_request 300 """ exchange = "coinbase" def __init__(self, http: AsyncHTTPClient | None = None) -> None: self._http = http or AsyncHTTPClient()
[docs] def capabilities(self) -> list[Capability]: """Declared capabilities, one per (data type × transport × mode).""" return [ Capability( data_type=DataType.OHLC, transport="rest", mode="historical", history="full", max_per_request=300, page_direction="forward", spans=_COINBASE_SPANS, ), Capability( data_type=DataType.TRADES, transport="rest", mode="historical", history="recent", max_per_request=100, page_direction="backward", ), Capability( data_type=DataType.ORDERBOOK, transport="rest", mode="historical", max_depth=50, ), # Only the trades WS channel is implemented. OHLC/orderbook live # are intentionally NOT declared (no honest implementation yet) so # the engine rejects them instead of running an empty stream. Capability(data_type=DataType.TRADES, transport="ws", mode="live"), ]
[docs] def render_symbol(self, s: Symbol) -> str: """Coinbase format: BTC-USD.""" return f"{s.base}-{s.quote}"
[docs] async def fetch_ohlc_page( self, symbol: Symbol, span: int, start_ns: int, end_ns: int, limit: int, ) -> list[OHLCBar]: """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).""" gran = coinbase_granularity(span) if not gran: logger.warning("Coinbase does not support span=%d", span) return [] pair = self.render_symbol(symbol) start_dt = datetime.fromtimestamp(ns_to_s(start_ns), tz=timezone.utc).isoformat() end_dt = datetime.fromtimestamp(ns_to_s(end_ns), tz=timezone.utc).isoformat() params: dict[str, Any] = { "start": start_dt, "end": end_dt, "granularity": gran, } async with self._http as client: data = await client.get(f"{_BASE}/products/{pair}/candles", params) if not isinstance(data, list): logger.error("Coinbase candles unexpected response: %r", data) return [] return [ OHLCBar( ts=int(e[0]) * NS, open=float(e[3]), high=float(e[2]), low=float(e[1]), close=float(e[4]), volume=float(e[5]), # Coinbase candles carry no quote volume; close×volume would be # a fabricated approximation, so leave it null (see fidelity # matrix in the docs). quote_volume=None, ) for e in data if isinstance(e, (list, tuple)) and len(e) >= 6 ]
[docs] async def fetch_trades_page( self, symbol: Symbol, start_ns: int, end_ns: int, limit: int, cursor: str | None = None, ) -> tuple[list[Trade], str | None]: """Fetch the most recent trades (``history="recent"``). Coinbase Exchange paginates trades through ``CB-AFTER``/``CB-BEFORE`` *response headers*, which the current JSON-only transport does not expose. We therefore return a single recent page and never a cursor — deep history is genuinely unavailable here (declared via the ``history="recent"`` capability, enforced by the backfill engine). """ pair = self.render_symbol(symbol) async with self._http as client: data = await client.get( f"{_BASE}/products/{pair}/trades", {"limit": min(limit, 100)}, ) result = [] for e in (data if isinstance(data, list) else []): try: ts_str = e.get("time", "") ts_dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) ts_ns = s_to_ns(ts_dt.timestamp()) if not (start_ns <= ts_ns <= end_ns): continue result.append(Trade( ts=ts_ns, price=float(e["price"]), amount=float(e["size"]), side=e.get("side"), tid=str(e.get("trade_id", "")), )) except Exception: continue return result, None
[docs] async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot: """Fetch a current order-book snapshot up to *depth* levels.""" pair = self.render_symbol(symbol) async with self._http as client: data = await client.get(f"{_BASE}/products/{pair}/book", {"level": 2}) bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1]), count=int(b[2]) if len(b) > 2 else None) for b in data.get("bids", [])] asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1]), count=int(a[2]) if len(a) > 2 else None) for a in data.get("asks", [])] return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)
[docs] def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]: """Stream live trades over WebSocket.""" ws = _CoinbaseWS(self.render_symbol(symbol)) return ws.stream_trades()
[docs] def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]: """Stream live OHLC bars over WebSocket.""" raise NotImplementedError("Coinbase live OHLC stream is not implemented")
[docs] def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]: """Stream live order-book snapshots/deltas over WebSocket.""" raise NotImplementedError("Coinbase live order book stream is not implemented")
class _CoinbaseWS(WebSocketBase): def __init__(self, pair: str) -> None: super().__init__("wss://advanced-trade-ws.coinbase.com") self._pair = pair async def on_connect(self, ws: Any) -> None: """Send the subscription message after each (re)connect.""" await ws.send(json.dumps({ "type": "subscribe", "product_ids": [self._pair], "channel": "market_trades", })) async def parse_message(self, raw: str | bytes) -> AsyncIterator[Trade]: """Parse a raw WebSocket frame into domain records.""" data = json.loads(raw) for event in data.get("events", []): for t in event.get("trades", []): try: ts_str = t.get("time", "") ts_dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) yield Trade( ts=s_to_ns(ts_dt.timestamp()), price=float(t["price"]), amount=float(t["size"]), side=t.get("side", "").lower() or None, tid=str(t.get("trade_id", "")), ) except Exception: continue async def stream_trades(self) -> AsyncIterator[Trade]: """Stream live trades over WebSocket.""" async for item in self.stream(): yield item