# Source code for dccd.sources.binance

"""Binance source adapter — OHLC, trades, order book (REST + WS)."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import (
    FundingRate,
    OHLCBar,
    OpenInterest,
    OrderBookLevel,
    OrderBookSnapshot,
    Trade,
)
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import binance_interval, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
    FundingHistory,
    OHLCHistory,
    OHLCLive,
    OpenInterestHistory,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesHistory,
    TradesLive,
    default_http_client,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["BinanceSource"]

logger = logging.getLogger(__name__)

# --- Endpoint roots -------------------------------------------------------
_BASE_REST = "https://api.binance.com/api/v3"  # spot REST
# NOTE(review): not referenced by the stream classes in this module, which
# connect to ``wss://stream.binance.com:9443/ws/<stream>`` directly.
_BASE_WS = "wss://stream.binance.com:9443/stream"
_BASE_FAPI = "https://fapi.binance.com/fapi/v1"  # USDS-M futures REST
_BASE_FDATA = "https://fapi.binance.com/futures/data"  # USDS-M futures statistics

# Symbol.market -> ``contractType`` query value for ``continuousKlines``.
_CONTRACT_TYPE = dict(
    perp="PERPETUAL",
    quarter="CURRENT_QUARTER",
    next_quarter="NEXT_QUARTER",
)

# ``period`` values accepted by ``openInterestHist``. This is narrower than the
# set :func:`~dccd.domain.timeutils.binance_interval` can produce (no 1m, 3m,
# 8h, ...), so spans must be checked against it before a request is built.
_OI_PERIODS = frozenset(
    ("5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "1d")
)

# ``openInterestHist`` only serves a rolling 30-day window. A ``startTime`` at
# or past that boundary is rejected with HTTP 400 (code -1130, "parameter
# 'startTime' is invalid.") rather than trimmed (verified 2026-07-04). Callers
# therefore compute the floor at request time and add a safety margin, so that
# clock skew, rate-limit waits and retries cannot push a request over the edge.
_OI_WINDOW_MS = 30 * 24 * 60 * 60 * 1_000  # 30 days, in ms
_OI_FLOOR_MARGIN_MS = 5 * 60 * 1_000  # 5 minutes, in ms


def _parse_ohlc_page(data: list[Any]) -> list[OHLCBar]:
    """Parse a raw Binance klines response into :class:`~dccd.domain.records.OHLCBar` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is
        ``[open_time_ms, open, high, low, close, volume, close_time_ms,
        quote_vol, trades, ...]``.
    """
    return [
        OHLCBar(
            ts=int(e[0]) * 1_000_000,
            open=float(e[1]),
            high=float(e[2]),
            low=float(e[3]),
            close=float(e[4]),
            volume=float(e[5]),
            quote_volume=float(e[7]),
            trades=int(e[8]),
        )
        for e in data
    ]


def _parse_aggtrades_page(data: list[Any], end_ms: int | None = None) -> tuple[list[Trade], str | None]:
    """Parse a raw Binance aggTrades response into :class:`~dccd.domain.records.Trade` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is a dict with ``a`` (agg id),
        ``p`` (price), ``q`` (qty), ``T`` (timestamp ms), ``m`` (is maker).
    end_ms:
        Upper bound in milliseconds; used to decide whether to return a cursor.
    """
    if not data:
        return [], None
    trades = [
        Trade(
            ts=int(e["T"]) * 1_000_000,
            price=float(e["p"]),
            amount=float(e["q"]),
            side="sell" if e["m"] else "buy",
            tid=str(e["a"]),
        )
        for e in data
    ]
    last_ts_ms = int(data[-1]["T"])
    next_cursor = (
        str(int(data[-1]["a"]) + 1)
        if end_ms is not None and len(data) > 0 and last_ts_ms < end_ms
        else None
    )
    return trades, next_cursor


def _parse_funding_page(data: list[Any], limit: int) -> tuple[list[FundingRate], str | None]:
    """Parse a raw Binance ``fundingRate`` response into :class:`~dccd.domain.records.FundingRate` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is a dict with ``fundingTime``
        (ms), ``fundingRate`` (str), and ``markPrice`` (str, may be empty or
        absent). The response is ascending by ``fundingTime``.
    limit:
        The *limit* passed to the request — used to detect a full page (the
        signal that more data may follow).
    """
    if not data:
        return [], None
    rates = [
        FundingRate(
            ts=int(e["fundingTime"]) * 1_000_000,
            rate=float(e["fundingRate"]),
            mark_price=float(e["markPrice"]) if e.get("markPrice") else None,
        )
        for e in data
    ]
    next_cursor = str(int(data[-1]["fundingTime"]) + 1) if len(data) == limit else None
    return rates, next_cursor


def _parse_oi_page(
    data: list[Any], span: int, end_ms: int,
) -> tuple[list[OpenInterest], str | None]:
    """Parse a raw Binance ``openInterestHist`` response into :class:`~dccd.domain.records.OpenInterest` records.

    Parameters
    ----------
    data:
        The parsed JSON list — each element is a dict with ``timestamp`` (ms),
        ``sumOpenInterest`` (str, base asset) and ``sumOpenInterestValue``
        (str, quote asset). The response is ascending by ``timestamp``.
    span:
        Observation cadence in seconds — the next cursor is the last
        timestamp advanced by one span.
    end_ms:
        The **global** window end in milliseconds. Because each request's
        window is bounded to the page capacity (see ``fetch_oi_page``), page
        fullness is *not* a reliable continuation signal — a boundary page can
        be short while plenty of window remains. The walk continues as long as
        the next slot still falls inside the global window.
    """
    if not data:
        return [], None
    oi = [
        OpenInterest(
            ts=int(e["timestamp"]) * 1_000_000,
            open_interest=float(e["sumOpenInterest"]),
            open_interest_value=float(e["sumOpenInterestValue"]),
        )
        for e in data
    ]
    next_ms = int(data[-1]["timestamp"]) + span * 1000
    next_cursor = str(next_ms) if next_ms <= end_ms else None
    return oi, next_cursor


class BinanceSource(
    OHLCHistory,
    TradesHistory,
    OrderBookSnapshotREST,
    FundingHistory,
    OpenInterestHistory,
    OHLCLive,
    TradesLive,
    OrderBookLive,
):
    """Binance source adapter (spot + USDS-M futures OHLC, funding, open interest).

    The reference adapter: full historical depth and every live channel.

    - **Backfill**: OHLC (``klines``, 1 000/req), trades (``aggTrades``,
      cursor-paginated by ``fromId``), order-book snapshot (``depth``,
      ≤ 5 000), realized funding rate (USDS-M ``fundingRate``, ``perp``
      market only), and open-interest statistics (USDS-M ``openInterestHist``,
      ``perp`` only). Binance serves open interest for the **last 30 days**
      only, hence ``history="recent"`` + ``recent_window_s``. Run a recurring
      job to accumulate that history forward.
    - **Stream**: OHLC (``kline``), trades (``aggTrade``), order book (partial
      book depth ``depth<N>@100ms``, N ∈ {5, 10, 20}).
    - **Derivative OHLC**: a non-spot :attr:`~dccd.domain.symbol.Symbol.market`
      (``perp``, ``quarter``, ``next_quarter``) routes ``fetch_ohlc_page`` to
      the USDS-M futures ``continuousKlines`` endpoint instead of spot
      ``klines``. No live futures channels are declared; the WS capabilities
      stay spot-only.

    Adapters are not used directly. The engine resolves them from the
    registry; drive them through :class:`dccd.Client` or the CLI.

    See Also
    --------
    dccd.Client : the public facade.
    dccd.sources.registry.SourceRegistry : adapter resolution.

    Examples
    --------
    >>> from dccd.sources.binance import BinanceSource
    >>> sorted({c.data_type.value for c in BinanceSource().capabilities()})
    ['funding', 'ohlc', 'open_interest', 'orderbook', 'trades']
    """

    exchange = "binance"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Create the adapter.

        Parameters
        ----------
        http:
            HTTP client to use. When ``None``, a default client for
            :attr:`exchange` is built, and ``_owned_http`` records that the
            client was created here.
        """
        self._http = http or default_http_client(self.exchange)
        self._owned_http = http is None

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 28800, 43200, 86400, 259200, 604800, 2592000],
                markets=["spot", "perp", "quarter", "next_quarter"],
            ),
            Capability(
                data_type=DataType.TRADES,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
            ),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="rest",
                mode="historical",
                max_per_request=1,
                max_depth=5000,
            ),
            Capability(
                data_type=DataType.FUNDING,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                markets=["perp"],
            ),
            Capability(
                data_type=DataType.OPEN_INTEREST,
                transport="rest",
                mode="historical",
                history="recent",
                recent_window_s=30 * 86400,
                max_per_request=500,
                page_direction="forward",
                markets=["perp"],
                spans=[300, 900, 1800, 3600, 7200, 14400, 21600, 43200, 86400],
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=20, depths=[5, 10, 20]),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Binance format: BTCUSDT (no separator)."""
        return f"{s.base}{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Spot symbols use ``/api/v3/klines``, where *limit* is clamped to 1 000.
        Any other market uses USDS-M ``continuousKlines``, where *limit* is
        clamped to 1 500. Returns ``[]`` without a request when *span* has no
        Binance interval.
        """
        interval = binance_interval(span)
        if not interval:
            return []
        if symbol.market != "spot":
            # USDS-M futures (continuous contract): auto-rolled quarterly series
            # or perpetual OHLC. The 12-field kline layout matches spot, so
            # ``_parse_ohlc_page`` is reused unchanged. This shares the
            # "binance" rate-limit bucket (conservative, since fapi has its own
            # separate limits) and the shared reference-counted HTTP client.
            fapi_params: dict[str, Any] = {
                "pair": self.render_symbol(symbol),
                "contractType": _CONTRACT_TYPE[symbol.market],
                "interval": interval,
                "startTime": start_ns // 1_000_000,
                "endTime": end_ns // 1_000_000,
                "limit": min(limit, 1500),
            }
            async with self._http as client:
                data = await client.get(f"{_BASE_FAPI}/continuousKlines", fapi_params)
            return _parse_ohlc_page(data)
        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "interval": interval,
            "startTime": start_ns // 1_000_000,
            "endTime": end_ns // 1_000_000,
            # Spot klines allow at most 1 000 rows (the declared max_per_request).
            "limit": min(limit, 1000),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/klines", params)
        return _parse_ohlc_page(data)

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch one page of aggregate trades (cursor = ``fromId``).

        The first call (``cursor=None``) is bounded by time from ``start_ns``
        to ``end_ns``; later calls follow the ``fromId`` cursor. *limit* is
        clamped to 1 000, the ``aggTrades`` maximum. The next cursor is the
        last aggregate-trade id + 1. It is returned only when the page is full
        (relative to the clamped limit) and the last trade is still before the
        window end.
        """
        sym = self.render_symbol(symbol)
        end_ms = end_ns // 1_000_000
        # Full-page detection must use the limit actually sent. Comparing
        # against an oversized caller limit would treat a capped full page as
        # short and end the cursor walk early.
        limit_eff = min(limit, 1000)
        if cursor is None:
            params: dict[str, Any] = {
                "symbol": sym,
                "startTime": start_ns // 1_000_000,
                "endTime": end_ms,
                "limit": limit_eff,
            }
        else:
            params = {"symbol": sym, "fromId": int(cursor), "limit": limit_eff}
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/aggTrades", params)
        return _parse_aggtrades_page(data, end_ms=end_ms if len(data) >= limit_eff else None)

    async def fetch_funding_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[FundingRate], str | None]:
        """Fetch one page of realized funding rates (cursor = next ``startTime`` in ms).

        The first call (``cursor=None``) is bounded by time from ``start_ns``.
        Later calls advance ``startTime`` to the last event's ``fundingTime``
        + 1 ms. *limit* is clamped to 1 000. This is USDS-M futures only;
        ``fundingRate`` has no spot equivalent.
        """
        limit_eff = min(limit, 1000)
        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "startTime": int(cursor) if cursor else start_ns // 1_000_000,
            "endTime": end_ns // 1_000_000,
            "limit": limit_eff,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE_FAPI}/fundingRate", params)
        return _parse_funding_page(data, limit_eff)

    async def fetch_oi_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[OpenInterest], str | None]:
        """Fetch one page of open-interest statistics (cursor = next ``startTime`` in ms).

        Uses USDS-M ``openInterestHist``, which returns ascending pages. It is
        walked forward like :meth:`fetch_funding_page`: the first call
        (``cursor=None``) starts at ``start_ns``, and each page advances
        ``startTime`` to the last observation's timestamp + one *span*.

        Two quirks of the real endpoint (verified 2026-07-04) shape the
        request:

        - Binance serves only the **last 30 days** (declared as
          ``history="recent"``). It returns HTTP 400 for a ``startTime`` at or
          beyond the rolling boundary, so ``startTime`` is floored to the
          window. The floor is re-evaluated at request time, with a safety
          margin.
        - When the requested window holds more than ``limit`` observations,
          Binance returns the **latest** ones, not the earliest. Each
          request's ``endTime`` is therefore capped at the page capacity
          (``limit`` slots), and the continuation signal is *window left*, not
          page fullness.

        Because requests cover one capped sub-window at a time, an empty
        response does not end the walk while the global window extends past
        that sub-window. In that case ``([], cursor)`` is returned, with the
        cursor pointing just past the sub-window.

        Returns ``([], None)`` without a request when *span* has no supported
        ``period`` mapping (only {5m, 15m, 30m, 1h, 2h, 4h, 6h, 12h, 1d}) or
        when the whole remaining window predates the 30-day floor.
        """
        period = binance_interval(span)
        if period is None or period not in _OI_PERIODS:
            logger.warning("No Binance openInterestHist period for span=%d", span)
            return [], None
        limit_eff = min(limit, 500)
        span_ms = span * 1000
        end_ms = end_ns // 1_000_000
        start_ms = int(cursor) if cursor else start_ns // 1_000_000
        floor_ms = int(time.time() * 1000) - _OI_WINDOW_MS + _OI_FLOOR_MARGIN_MS
        start_ms = max(start_ms, floor_ms)
        if start_ms > end_ms:
            return [], None
        # From an *aligned* start, ``limit`` slots span (limit-1)×span ms
        # inclusive. A wider window would silently drop its oldest
        # observations. An unaligned start covers fewer slots, which is still
        # safe.
        end_req_ms = min(end_ms, start_ms + (limit_eff - 1) * span_ms)
        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "period": period,
            "limit": limit_eff,
            "startTime": start_ms,
            "endTime": end_req_ms,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE_FDATA}/openInterestHist", params)
        records, next_cursor = _parse_oi_page(data, span, end_ms)
        if not records and end_req_ms < end_ms:
            # An empty sub-window (e.g. a stretch with no observations that is
            # longer than one page) says nothing about what lies after it.
            # Resume just past it instead of ending the walk. The cursor
            # strictly increases, so the walk still terminates.
            next_cursor = str(end_req_ms + 1)
        return records, next_cursor

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels (capped at 5 000).

        The snapshot ``ts`` is the local wall-clock time taken after the
        response is parsed.
        """
        params = {"symbol": self.render_symbol(symbol), "limit": min(depth, 5000)}
        async with self._http as client:
            data = await client.get(f"{_BASE_REST}/depth", params)
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
        return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        If *span* has no Binance kline interval, the stream falls back to
        ``1m`` bars and a warning is logged.
        """
        interval = binance_interval(span)
        if not interval:
            # Kept as a fallback for backward compatibility, but no longer
            # silent: the emitted bars will not match the requested span.
            logger.warning(
                "No Binance kline interval for span=%d; streaming 1m bars instead", span
            )
            interval = "1m"
        pair = self.render_symbol(symbol).lower()
        ws = _BinanceKlineWS(pair, interval)
        return ws.stream()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live aggregate trades over WebSocket."""
        pair = self.render_symbol(symbol).lower()
        ws = _BinanceTradeWS(pair)
        return ws.stream()

    def stream_orderbook(
        self,
        symbol: Symbol,
        depth: int,
        *,
        min_interval: float = 0.0,
    ) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots over WebSocket.

        Uses Binance's *partial book depth* stream, which pushes a fully
        sorted top-N snapshot. *depth* is rounded up to the nearest supported
        N in {5, 10, 20}, and anything above 10 maps to 20. *min_interval*
        throttles emitted snapshots (seconds; ``0.0`` = every frame).
        """
        pair = self.render_symbol(symbol).lower()
        levels = 5 if depth <= 5 else 10 if depth <= 10 else 20
        ws = _BinanceDepthWS(pair, levels)
        return ws.stream(min_interval=min_interval)
class _BinanceKlineWS(WebSocketBase):
    """Single-symbol kline stream (``<pair>@kline_<interval>``)."""

    def __init__(self, pair: str, interval: str) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@kline_{interval}")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[OHLCBar]:
        """Parse a raw frame into an :class:`OHLCBar`. Frames whose ``e`` is not ``kline`` yield nothing.

        The candle-closed flag (``k["x"]``) is not inspected, so every kline
        frame is emitted. NOTE(review): Binance presumably also pushes updates
        for the still-forming candle; confirm that consumers expect those.
        """
        data = json.loads(raw)
        if data.get("e") != "kline":
            return
        k = data["k"]
        yield OHLCBar(
            ts=int(k["t"]) * 1_000_000,  # kline open time, ms -> ns
            open=float(k["o"]),
            high=float(k["h"]),
            low=float(k["l"]),
            close=float(k["c"]),
            volume=float(k["v"]),
            quote_volume=float(k["q"]),
            trades=int(k["n"]),
        )


class _BinanceTradeWS(WebSocketBase):
    """Single-symbol aggregate-trade stream (``<pair>@aggTrade``)."""

    def __init__(self, pair: str) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@aggTrade")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Trade]:
        """Parse a raw frame into a :class:`Trade`. Frames whose ``e`` is not ``aggTrade`` yield nothing."""
        data = json.loads(raw)
        if data.get("e") != "aggTrade":
            return
        yield Trade(
            ts=int(data["T"]) * 1_000_000,  # ms -> ns
            price=float(data["p"]),
            amount=float(data["q"]),
            side="sell" if data["m"] else "buy",
            tid=str(data["a"]),
        )


class _BinanceDepthWS(WebSocketBase):
    """Partial Book Depth stream (``<pair>@depth<levels>@100ms``).

    This stream pushes a fully sorted top-N snapshot every 100 ms (``bids``
    and ``asks`` both best-first). It deliberately avoids ``@depth`` (the
    *diff* stream), whose unsorted partial updates make best bid/ask
    meaningless and are often crossed.
    """

    def __init__(self, pair: str, levels: int = 20) -> None:
        super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@depth{levels}@100ms")

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[OrderBookSnapshot]:
        """Parse a raw frame into an :class:`OrderBookSnapshot`.

        Frames lacking ``bids`` or ``asks`` yield nothing. The snapshot
        ``ts`` is the local wall-clock time at parse, not an exchange
        timestamp.
        """
        data = json.loads(raw)
        if "bids" not in data or "asks" not in data:
            return
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
        yield OrderBookSnapshot(
            ts=s_to_ns(time.time()),
            bids=bids,
            asks=asks,
            is_snapshot=True,
        )

    async def stream(self, min_interval: float = 0.0) -> AsyncIterator[OrderBookSnapshot]:
        """Yield parsed order-book snapshots, throttled to *min_interval* seconds.

        The throttle is checked on the raw frame **before** ``parse_message``
        runs, so frames that would be dropped are never JSON-decoded or turned
        into records. Only frames that actually produce a record reset the
        throttle clock. ``min_interval=0.0`` emits every frame (the legacy
        behaviour).
        """
        last_emit: float = -float("inf")  # the first frame always passes
        async for raw in self.stream_raw():
            # Throttle on arrival time alone; no decoding happens here.
            if time.monotonic() - last_emit < min_interval:
                continue
            async for record in self.parse_message(raw):
                last_emit = time.monotonic()
                yield record