# Source code for dccd.sources.binance

"""Binance source adapter — OHLC, trades, order book (REST + WS)."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import binance_interval, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesHistory,
    TradesLive,
    default_http_client,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

# Only the adapter class is public; parsers and WS helpers are private.
__all__ = ["BinanceSource"]

# Root of the spot REST API; endpoint paths (``/klines``, ``/aggTrades``,
# ``/depth``) are appended to it.
_BASE_REST: str = "https://api.binance.com/api/v3"
# Combined-stream WebSocket endpoint.
# NOTE(review): not referenced by the code visible in this module — confirm
# whether it is still needed.
_BASE_WS: str = "wss://stream.binance.com:9443/stream"

logger = logging.getLogger(__name__)


def _parse_ohlc_page(data: list[Any]) -> list[OHLCBar]:
    """Parse a raw Binance klines response into :class:`~dccd.domain.records.OHLCBar` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is
        ``[open_time_ms, open, high, low, close, volume, close_time_ms,
        quote_vol, trades, ...]``.
    """
    return [
        OHLCBar(
            ts=int(e[0]) * 1_000_000,
            open=float(e[1]),
            high=float(e[2]),
            low=float(e[3]),
            close=float(e[4]),
            volume=float(e[5]),
            quote_volume=float(e[7]),
            trades=int(e[8]),
        )
        for e in data
    ]


def _parse_aggtrades_page(data: list[Any], end_ms: int | None = None) -> tuple[list[Trade], str | None]:
    """Parse a raw Binance aggTrades response into :class:`~dccd.domain.records.Trade` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is a dict with ``a`` (agg id),
        ``p`` (price), ``q`` (qty), ``T`` (timestamp ms), ``m`` (is maker).
    end_ms:
        Upper bound in milliseconds; used to decide whether to return a cursor.
    """
    if not data:
        return [], None
    trades = [
        Trade(
            ts=int(e["T"]) * 1_000_000,
            price=float(e["p"]),
            amount=float(e["q"]),
            side="sell" if e["m"] else "buy",
            tid=str(e["a"]),
        )
        for e in data
    ]
    last_ts_ms = int(data[-1]["T"])
    next_cursor = (
        str(int(data[-1]["a"]) + 1)
        if end_ms is not None and len(data) > 0 and last_ts_ms < end_ms
        else None
    )
    return trades, next_cursor


class BinanceSource(
    OHLCHistory,
    TradesHistory,
    OrderBookSnapshotREST,
    OHLCLive,
    TradesLive,
    OrderBookLive,
):
    """Binance source adapter (spot).

    The reference adapter — full historical depth and every live channel.

    - **Backfill**: OHLC (``klines``, 1 000/req), trades (``aggTrades``,
      cursor-paginated by ``fromId``), order-book snapshot (``depth``,
      ≤ 5 000).
    - **Stream**: OHLC (``kline``), trades (``aggTrade``), order book
      (``depth``).

    Adapters are not used directly — they are resolved by the engine from
    the registry. Drive them through :class:`dccd.Client` or the CLI.

    See Also
    --------
    dccd.Client : the public facade.
    dccd.sources.registry.SourceRegistry : adapter resolution.

    Examples
    --------
    >>> from dccd.sources.binance import BinanceSource
    >>> sorted({c.data_type.value for c in BinanceSource().capabilities()})
    ['ohlc', 'orderbook', 'trades']
    """

    exchange = "binance"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Create the adapter.

        Parameters
        ----------
        http:
            Optional injected HTTP client. When omitted, one is built with
            ``default_http_client(self.exchange)``.
        """
        self._http = http or default_http_client(self.exchange)
        # Records whether the client was created here (no ``http`` supplied)
        # rather than injected by the caller.
        self._owned_http = http is None

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 28800, 43200, 86400, 259200, 604800, 2592000],
            ),
            Capability(
                data_type=DataType.TRADES,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
            ),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="rest",
                mode="historical",
                max_per_request=1,
                max_depth=5000,
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=20, depths=[5, 10, 20]),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Binance format: BTCUSDT (no separator)."""
        return f"{s.base}{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Returns an empty list, and logs a warning, when *span* has no
        matching Binance kline interval.
        """
        interval = binance_interval(span)
        if not interval:
            # Previously a silent ``[]``; make the unsupported span visible.
            logger.warning("binance: no kline interval for span=%ss; returning no bars", span)
            return []
        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "interval": interval,
            "startTime": start_ns // 1_000_000,
            "endTime": end_ns // 1_000_000,
            "limit": limit,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/klines", params)
        return _parse_ohlc_page(data)

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch one page of aggregate trades (cursor = ``fromId``).

        First call (``cursor=None``) is time-bounded on ``start_ns``;
        subsequent calls follow the ``fromId`` cursor. The next cursor is the
        last aggregate-trade id + 1, returned only while the page is full and
        the last trade is still inside the window.
        """
        sym = self.render_symbol(symbol)
        end_ms = end_ns // 1_000_000
        if cursor is None:
            # NOTE(review): Binance's aggTrades docs have historically
            # required startTime..endTime < 1 hour when both are sent —
            # confirm the engine's windows respect this.
            params: dict[str, Any] = {
                "symbol": sym,
                "startTime": start_ns // 1_000_000,
                "endTime": end_ms,
                "limit": limit,
            }
        else:
            params = {"symbol": sym, "fromId": int(cursor), "limit": limit}
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/aggTrades", params)
        # Only a full page can have more data behind it; passing
        # ``end_ms=None`` tells the parser not to emit a cursor.
        return _parse_aggtrades_page(data, end_ms=end_ms if len(data) >= limit else None)

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels (capped at 5000).

        The snapshot is timestamped with the local wall-clock time at parse.
        """
        params = {"symbol": self.render_symbol(symbol), "limit": min(depth, 5000)}
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/depth", params)
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
        return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        If *span* has no Binance kline interval, 1-minute bars are streamed
        instead and a warning is logged.
        """
        interval = binance_interval(span)
        if not interval:
            # Keep the existing 1m fallback, but don't let it pass silently:
            # the bars will not have the requested span.
            logger.warning("binance: no kline interval for span=%ss; streaming 1m bars instead", span)
            interval = "1m"
        pair = self.render_symbol(symbol).lower()
        ws = _BinanceKlineWS(pair, interval)
        return ws.stream()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        pair = self.render_symbol(symbol).lower()
        ws = _BinanceTradeWS(pair)
        return ws.stream()

    def stream_orderbook(
        self,
        symbol: Symbol,
        depth: int,
        *,
        min_interval: float = 0.0,
    ) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots over WebSocket.

        Uses Binance's *partial book depth* stream, which pushes a fully
        sorted top-N snapshot — N is clamped to the supported {5, 10, 20}.
        """
        pair = self.render_symbol(symbol).lower()
        levels = 5 if depth <= 5 else 10 if depth <= 10 else 20
        ws = _BinanceDepthWS(pair, levels)
        return ws.stream(min_interval=min_interval)
class _BinanceKlineWS(WebSocketBase):
    """Single ``<pair>@kline_<interval>`` WebSocket stream yielding OHLC bars."""

    def __init__(self, pair: str, interval: str) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@kline_{interval}")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[OHLCBar]:
        """Parse a raw WebSocket frame into domain records.

        Yields one bar per frame whose ``e`` field is ``"kline"``; any other
        frame yields nothing.
        """
        data = json.loads(raw)
        if data.get("e") != "kline":
            return
        k = data["k"]
        # NOTE(review): the bar-closed flag is not checked, so updates of a
        # still-open bar are presumably yielded repeatedly with the same
        # ``ts`` — confirm downstream upserts/de-duplicates.
        yield OHLCBar(
            ts=int(k["t"]) * 1_000_000,  # ms -> ns
            open=float(k["o"]),
            high=float(k["h"]),
            low=float(k["l"]),
            close=float(k["c"]),
            volume=float(k["v"]),
            quote_volume=float(k["q"]),
            trades=int(k["n"]),
        )


class _BinanceTradeWS(WebSocketBase):
    """Single ``<pair>@aggTrade`` WebSocket stream yielding trades."""

    def __init__(self, pair: str) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@aggTrade")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Trade]:
        """Parse a raw WebSocket frame into domain records.

        Yields one trade per frame whose ``e`` field is ``"aggTrade"``; any
        other frame yields nothing. A truthy ``m`` maps to ``"sell"``.
        """
        data = json.loads(raw)
        if data.get("e") != "aggTrade":
            return
        yield Trade(
            ts=int(data["T"]) * 1_000_000,  # ms -> ns
            price=float(data["p"]),
            amount=float(data["q"]),
            side="sell" if data["m"] else "buy",
            tid=str(data["a"]),
        )


class _BinanceDepthWS(WebSocketBase):
    # Partial Book Depth Stream: a fully sorted top-N snapshot every 100ms
    # (``bids`` best-first, ``asks`` best-first). NOT ``@depth`` (the *diff*
    # stream), whose unsorted partial updates make best bid/ask meaningless and
    # often crossed.

    def __init__(self, pair: str, levels: int = 20) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@depth{levels}@100ms")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[OrderBookSnapshot]:
        """Parse a raw WebSocket frame into domain records.

        Frames lacking either ``bids`` or ``asks`` yield nothing. Each
        snapshot is timestamped with the local wall-clock time at parse.
        """
        data = json.loads(raw)
        if "bids" not in data or "asks" not in data:
            return
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
        yield OrderBookSnapshot(
            ts=s_to_ns(time.time()),
            bids=bids,
            asks=asks,
            is_snapshot=True,
        )

    async def stream(self, min_interval: float = 0.0) -> AsyncIterator[OrderBookSnapshot]:
        """Yield parsed order-book snapshots, throttled to *min_interval* seconds.

        The throttle is checked on the raw frame **before** ``parse_message``
        runs, so frames that will be discarded are neither JSON-decoded nor
        turned into records. ``min_interval=0.0`` preserves the legacy
        per-frame behaviour.
        """
        last_emit: float = -float("inf")  # first frame always emits
        async for raw in self.stream_raw():
            # Throttle purely on arrival time — dropped frames skip parsing
            # entirely (no JSON decoding happens here).
            now = time.monotonic()
            if now - last_emit < min_interval:
                continue
            async for record in self.parse_message(raw):
                last_emit = time.monotonic()
                yield record