"""Binance source adapter — OHLC, trades, order book (REST + WS)."""
from __future__ import annotations
import json
import logging
from collections.abc import AsyncIterator
from typing import Any
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import binance_interval, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
OHLCHistory,
OHLCLive,
OrderBookLive,
OrderBookSnapshotREST,
TradesHistory,
TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase
__all__ = ["BinanceSource"]
logger = logging.getLogger(__name__)
_BASE_REST = "https://api.binance.com/api/v3"
_BASE_WS = "wss://stream.binance.com:9443/stream"
[docs]
class BinanceSource(
OHLCHistory,
TradesHistory,
OrderBookSnapshotREST,
OHLCLive,
TradesLive,
OrderBookLive,
):
"""Binance adapter for OHLC, trades and order book.
Supports: REST historical (klines 1000/req, aggTrades 1000/req, depth 5000),
WebSocket live (kline, aggTrade, depth).
"""
exchange = "binance"
def __init__(self, http: AsyncHTTPClient | None = None) -> None:
self._http = http or AsyncHTTPClient()
self._owned_http = http is None
def capabilities(self) -> list[Capability]:
return [
Capability(
data_type=DataType.OHLC, transport="rest", mode="historical",
history="full", max_per_request=1000, page_direction="forward",
spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 28800,
43200, 86400, 259200, 604800, 2592000],
),
Capability(
data_type=DataType.TRADES, transport="rest", mode="historical",
history="full", max_per_request=1000, page_direction="forward",
),
Capability(
data_type=DataType.ORDERBOOK, transport="rest", mode="historical",
max_per_request=1, max_depth=5000,
),
Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=5000),
]
[docs]
def render_symbol(self, s: Symbol) -> str:
"""Binance format: BTCUSDT (no separator)."""
return f"{s.base}{s.quote}"
async def fetch_ohlc_page(
self,
symbol: Symbol,
span: int,
start_ns: int,
end_ns: int,
limit: int,
) -> list[OHLCBar]:
interval = binance_interval(span)
if not interval:
return []
params: dict[str, Any] = {
"symbol": self.render_symbol(symbol),
"interval": interval,
"startTime": start_ns // 1_000_000,
"endTime": end_ns // 1_000_000,
"limit": limit,
}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/klines", params)
return [
OHLCBar(
ts=int(e[0]) * 1_000_000,
open=float(e[1]),
high=float(e[2]),
low=float(e[3]),
close=float(e[4]),
volume=float(e[5]),
quote_volume=float(e[7]),
trades=int(e[8]),
)
for e in data
]
[docs]
async def fetch_trades_page(
self,
symbol: Symbol,
start_ns: int,
end_ns: int,
limit: int,
cursor: str | None = None,
) -> tuple[list[Trade], str | None]:
"""Fetch one page of aggregate trades (cursor = ``fromId``).
First call (``cursor=None``) is time-bounded on ``start_ns``; subsequent
calls follow the ``fromId`` cursor. The next cursor is the last
aggregate-trade id + 1, returned only while the page is full and the
last trade is still inside the window.
"""
sym = self.render_symbol(symbol)
end_ms = end_ns // 1_000_000
if cursor is None:
params: dict[str, Any] = {
"symbol": sym,
"startTime": start_ns // 1_000_000,
"endTime": end_ms,
"limit": limit,
}
else:
params = {"symbol": sym, "fromId": int(cursor), "limit": limit}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/aggTrades", params)
if not data:
return [], None
trades = [
Trade(
ts=int(e["T"]) * 1_000_000,
price=float(e["p"]),
amount=float(e["q"]),
side="sell" if e["m"] else "buy",
tid=str(e["a"]),
)
for e in data
]
last_ts_ms = int(data[-1]["T"])
next_cursor = (
str(int(data[-1]["a"]) + 1)
if len(data) >= limit and last_ts_ms < end_ms
else None
)
return trades, next_cursor
async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
params = {"symbol": self.render_symbol(symbol), "limit": min(depth, 5000)}
async with self._http as client:
data = await client.get(f"{_BASE_REST}/depth", params)
bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data["bids"]]
asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data["asks"]]
import time
return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)
def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
interval = binance_interval(span) or "1m"
pair = self.render_symbol(symbol).lower()
ws = _BinanceKlineWS(pair, interval)
return ws.stream()
def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
pair = self.render_symbol(symbol).lower()
ws = _BinanceTradeWS(pair)
return ws.stream()
def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
pair = self.render_symbol(symbol).lower()
ws = _BinanceDepthWS(pair)
return ws.stream()
class _BinanceKlineWS(WebSocketBase):
def __init__(self, pair: str, interval: str) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@kline_{interval}")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[OHLCBar]:
data = json.loads(raw)
if data.get("e") != "kline":
return
k = data["k"]
yield OHLCBar(
ts=int(k["t"]) * 1_000_000,
open=float(k["o"]),
high=float(k["h"]),
low=float(k["l"]),
close=float(k["c"]),
volume=float(k["v"]),
quote_volume=float(k["q"]),
trades=int(k["n"]),
)
class _BinanceTradeWS(WebSocketBase):
def __init__(self, pair: str) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@aggTrade")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[Trade]:
data = json.loads(raw)
if data.get("e") != "aggTrade":
return
yield Trade(
ts=int(data["T"]) * 1_000_000,
price=float(data["p"]),
amount=float(data["q"]),
side="sell" if data["m"] else "buy",
tid=str(data["a"]),
)
class _BinanceDepthWS(WebSocketBase):
def __init__(self, pair: str) -> None:
super().__init__(f"wss://stream.binance.com:9443/ws/{pair}@depth@100ms")
async def parse_message(self, raw: str | bytes) -> AsyncIterator[OrderBookSnapshot]:
import time
data = json.loads(raw)
bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in data.get("b", [])]
asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in data.get("a", [])]
yield OrderBookSnapshot(
ts=s_to_ns(time.time()),
bids=bids,
asks=asks,
is_snapshot=False,
)