"""Binance source adapter — OHLC, trades, order book (REST + WS)."""
from __future__ import annotations
import json
import logging
from collections.abc import AsyncIterator
from typing import Any
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import binance_interval, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
OHLCHistory,
OHLCLive,
OrderBookLive,
OrderBookSnapshotREST,
TradesHistory,
TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase
__all__ = ["BinanceSource"]
logger = logging.getLogger(__name__)
_BASE_REST = "https://api.binance.com/api/v3"
_BASE_WS = "wss://stream.binance.com:9443/stream"
[docs]
class BinanceSource(
OHLCHistory,
TradesHistory,
OrderBookSnapshotREST,
OHLCLive,
TradesLive,
OrderBookLive,
):
"""Binance source adapter (spot).
The reference adapter — full historical depth and every live channel.
- **Backfill**: OHLC (``klines``, 1 000/req), trades (``aggTrades``,
cursor-paginated by ``fromId``), order-book snapshot (``depth``, ≤ 5 000).
- **Stream**: OHLC (``kline``), trades (``aggTrade``), order book (``depth``).
Adapters are not used directly — they are resolved by the engine from the
registry. Drive them through :class:`dccd.Client` or the CLI.
See Also
--------
dccd.Client : the public facade.
dccd.sources.registry.SourceRegistry : adapter resolution.
Examples
--------
>>> from dccd.sources.binance import BinanceSource
>>> sorted({c.data_type.value for c in BinanceSource().capabilities()})
['ohlc', 'orderbook', 'trades']
"""
exchange = "binance"
def __init__(self, http: AsyncHTTPClient | None = None) -> None:
self._http = http or AsyncHTTPClient()
self._owned_http = http is None
[docs]
def capabilities(self) -> list[Capability]:
"""Declared capabilities, one per (data type × transport × mode)."""
return [
Capability(
data_type=DataType.OHLC, transport="rest", mode="historical",
history="full", max_per_request=1000, page_direction="forward",
spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 28800,
43200, 86400, 259200, 604800, 2592000],
),
Capability(
data_type=DataType.TRADES, transport="rest", mode="historical",
history="full", max_per_request=1000, page_direction="forward",
),
Capability(
data_type=DataType.ORDERBOOK, transport="rest", mode="historical",
max_per_request=1, max_depth=5000,
),
Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=20),
]
[docs]
def render_symbol(self, s: Symbol) -> str:
"""Binance format: BTCUSDT (no separator)."""
return f"{s.base}{s.quote}"
[docs]
async def fetch_ohlc_page(
self,
symbol: Symbol,
span: int,
start_ns: int,
end_ns: int,
limit: int,
) -> list[OHLCBar]:
"""Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`)."""
interval = binance_interval(span)
if not interval:
return []
params: dict[str, Any] = {
"symbol": self.render_symbol(symbol),
"interval": interval,
"startTime": start_ns // 1_000_000,
"endTime": end_ns // 1_000_000,
"limit": limit,
}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/klines", params)
return [
OHLCBar(
ts=int(e[0]) * 1_000_000,
open=float(e[1]),
high=float(e[2]),
low=float(e[3]),
close=float(e[4]),
volume=float(e[5]),
quote_volume=float(e[7]),
trades=int(e[8]),
)
for e in data
]
[docs]
async def fetch_trades_page(
self,
symbol: Symbol,
start_ns: int,
end_ns: int,
limit: int,
cursor: str | None = None,
) -> tuple[list[Trade], str | None]:
"""Fetch one page of aggregate trades (cursor = ``fromId``).
First call (``cursor=None``) is time-bounded on ``start_ns``; subsequent
calls follow the ``fromId`` cursor. The next cursor is the last
aggregate-trade id + 1, returned only while the page is full and the
last trade is still inside the window.
"""
sym = self.render_symbol(symbol)
end_ms = end_ns // 1_000_000
if cursor is None:
params: dict[str, Any] = {
"symbol": sym,
"startTime": start_ns // 1_000_000,
"endTime": end_ms,
"limit": limit,
}
else:
params = {"symbol": sym, "fromId": int(cursor), "limit": limit}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/aggTrades", params)
if not data:
return [], None
trades = [
Trade(
ts=int(e["T"]) * 1_000_000,
price=float(e["p"]),
amount=float(e["q"]),
side="sell" if e["m"] else "buy",
tid=str(e["a"]),
)
for e in data
]
last_ts_ms = int(data[-1]["T"])
next_cursor = (
str(int(data[-1]["a"]) + 1)
if len(data) >= limit and last_ts_ms < end_ms
else None
)
return trades, next_cursor
[docs]
async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
"""Fetch a current order-book snapshot up to *depth* levels."""
params = {"symbol": self.render_symbol(symbol), "limit": min(depth, 5000)}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/depth", params)
bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
import time
return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)
[docs]
def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
"""Stream live OHLC bars over WebSocket."""
interval = binance_interval(span) or "1m"
pair = self.render_symbol(symbol).lower()
ws = _BinanceKlineWS(pair, interval)
return ws.stream()
[docs]
def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
"""Stream live trades over WebSocket."""
pair = self.render_symbol(symbol).lower()
ws = _BinanceTradeWS(pair)
return ws.stream()
[docs]
def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
"""Stream live order-book snapshots over WebSocket.
Uses Binance's *partial book depth* stream, which pushes a fully sorted
top-N snapshot — N is clamped to the supported {5, 10, 20}.
"""
pair = self.render_symbol(symbol).lower()
levels = 5 if depth <= 5 else 10 if depth <= 10 else 20
ws = _BinanceDepthWS(pair, levels)
return ws.stream()
class _BinanceKlineWS(WebSocketBase):
def __init__(self, pair: str, interval: str) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@kline_{interval}")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[OHLCBar]:
"""Parse a raw WebSocket frame into domain records."""
data = json.loads(raw)
if data.get("e") != "kline":
return
k = data["k"]
yield OHLCBar(
ts=int(k["t"]) * 1_000_000,
open=float(k["o"]),
high=float(k["h"]),
low=float(k["l"]),
close=float(k["c"]),
volume=float(k["v"]),
quote_volume=float(k["q"]),
trades=int(k["n"]),
)
class _BinanceTradeWS(WebSocketBase):
def __init__(self, pair: str) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@aggTrade")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[Trade]:
"""Parse a raw WebSocket frame into domain records."""
data = json.loads(raw)
if data.get("e") != "aggTrade":
return
yield Trade(
ts=int(data["T"]) * 1_000_000,
price=float(data["p"]),
amount=float(data["q"]),
side="sell" if data["m"] else "buy",
tid=str(data["a"]),
)
class _BinanceDepthWS(WebSocketBase):
# Partial Book Depth Stream: a fully sorted top-N snapshot every 100ms
# (``bids`` best-first, ``asks`` best-first). NOT ``@depth`` (the *diff*
# stream), whose unsorted partial updates make best bid/ask meaningless and
# often crossed.
def __init__(self, pair: str, levels: int = 20) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@depth{levels}@100ms")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[OrderBookSnapshot]:
"""Parse a raw WebSocket frame into domain records."""
import time
data = json.loads(raw)
if "bids" not in data or "asks" not in data:
return
bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
yield OrderBookSnapshot(
ts=s_to_ns(time.time()),
bids=bids,
asks=asks,
is_snapshot=True,
)