# Source code for dccd.sources.bitmex

"""BitMEX source adapter — bucketed OHLC (1m/5m/1h/1d), trades, order book."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import NS, s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesHistory,
    TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

# Only the adapter class is public; the WebSocket helper class is private.
__all__ = ["BitMEXSource"]

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Root of the BitMEX REST API; endpoint paths such as ``/trade/bucketed``,
# ``/trade`` and ``/orderBook/L2`` are appended to it.
_BASE = "https://www.bitmex.com/api/v1"
# Realtime WebSocket endpoint handed to the WebSocket base class.
_WS_URL = "wss://www.bitmex.com/realtime"

# Supported candle spans: span in seconds -> BitMEX bin-size label. The label
# is sent as the REST ``binSize`` parameter and appended to ``tradeBin`` to
# form the streaming topic name.
_BITMEX_BINS = {60: "1m", 300: "5m", 3600: "1h", 86400: "1d"}


class BitMEXSource(
    OHLCHistory,
    TradesHistory,
    OrderBookSnapshotREST,
    OHLCLive,
    TradesLive,
    OrderBookLive,
):
    """BitMEX source adapter.

    - **Backfill**: OHLC (bucketed — **1m / 5m / 1h / 1d only**), trades
      (full), order-book snapshot (``orderBook/L2``).
    - **Stream**: OHLC, trades, order book.

    Notes
    -----
    BitMEX buckets candles, so only the four spans above are available.
    An unsupported span makes :meth:`stream_ohlc` raise ``ValueError`` and
    makes :meth:`fetch_ohlc_page` return an empty page. BTC is rendered
    ``XBT`` (e.g. ``XBTUSD``).

    See Also
    --------
    dccd.Client : the public facade.

    Examples
    --------
    >>> from dccd.sources.bitmex import BitMEXSource
    >>> sorted(BitMEXSource().capability_for(DataType.OHLC, 'rest', 'historical').spans)
    [60, 300, 3600, 86400]
    """

    exchange = "bitmex"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Create the adapter.

        Parameters
        ----------
        http
            HTTP client used for REST calls. A fresh ``AsyncHTTPClient`` is
            created when omitted.
        """
        # Explicit ``None`` check so a caller-supplied client is never
        # discarded just because it happens to evaluate as falsy.
        self._http = AsyncHTTPClient() if http is None else http

    @staticmethod
    def _parse_ts_ns(ts_str: Any) -> int | None:
        """Convert a BitMEX ISO-8601 timestamp to epoch nanoseconds.

        Returns ``None`` when the value is missing or malformed so callers
        can skip the row instead of failing the whole page.
        """
        from datetime import datetime

        try:
            # ``fromisoformat`` only accepts a trailing "Z" on newer Pythons,
            # so spell the UTC offset out explicitly.
            dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
            return s_to_ns(dt.timestamp())
        except (AttributeError, TypeError, ValueError, OverflowError, OSError):
            return None

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                spans=list(_BITMEX_BINS),
            ),
            Capability(
                data_type=DataType.TRADES,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
            ),
            Capability(data_type=DataType.ORDERBOOK, transport="rest", mode="historical"),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=10),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string.

        BitMEX spells bitcoin ``XBT``; every other base asset is passed
        through unchanged and concatenated with the quote (``XBTUSD``).
        """
        base = "XBT" if s.base == "BTC" else s.base
        return f"{base}{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Parameters
        ----------
        symbol
            Canonical symbol; rendered with :meth:`render_symbol`.
        span
            Bar width in seconds. Must be a key of ``_BITMEX_BINS``.
        start_ns, end_ns
            Window bounds in epoch nanoseconds. ``start_ns`` is sent to the
            exchange as ``startTime``; ``end_ns`` is enforced locally.
        limit
            Maximum number of bars requested, capped at 1 000.

        Returns
        -------
        list[OHLCBar]
            Bars in the order returned by the exchange (ascending, since
            ``reverse`` is ``False``), stopping at the first bar whose
            timestamp is greater than ``end_ns``. Empty when *span* is not
            supported or the response is not a list.
        """
        from datetime import datetime, timezone

        bin_size = _BITMEX_BINS.get(span)
        if bin_size is None:
            return []

        start_iso = datetime.fromtimestamp(start_ns / NS, tz=timezone.utc).isoformat()
        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "binSize": bin_size,
            "startTime": start_iso,
            "count": min(limit, 1000),
            "reverse": False,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/trade/bucketed", params)

        bars: list[OHLCBar] = []
        for row in data if isinstance(data, list) else []:
            # NOTE(review): BitMEX documents bucket timestamps as the *end* of
            # the interval — confirm this matches the OHLCBar.ts convention.
            ts_ns = self._parse_ts_ns(row.get("timestamp", ""))
            if ts_ns is None:
                continue
            if ts_ns > end_ns:
                # Rows are ascending, so everything after this is out of window.
                break
            bars.append(OHLCBar(
                ts=ts_ns,
                open=float(row.get("open") or 0),
                high=float(row.get("high") or 0),
                low=float(row.get("low") or 0),
                close=float(row.get("close") or 0),
                volume=float(row.get("volume") or 0),
            ))
        return bars

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch one page of trades (cursor = ISO ``startTime``, forward).

        BitMEX returns up to 1 000 trades ascending from ``startTime``. On a
        full page we advance the cursor to the last timestamp + 1 ms to avoid
        re-fetching the boundary trade, and continue until the window is
        drained.

        Parameters
        ----------
        symbol
            Canonical symbol; rendered with :meth:`render_symbol`.
        start_ns
            Window start in epoch nanoseconds; used only when *cursor* is
            ``None``.
        end_ns
            Window end in epoch nanoseconds; paging stops once the last trade
            of a page reaches it. Trades are not filtered against it here.
        limit
            Maximum number of trades requested, capped at 1 000.
        cursor
            ISO timestamp returned by the previous call, or ``None`` for the
            first page.

        Returns
        -------
        tuple[list[Trade], str | None]
            The parsed trades and the cursor for the next page, or ``None``
            when the page was short, nothing could be parsed, or the window
            end was reached.

        Notes
        -----
        NOTE(review): if a full page ends in the middle of a group of trades
        sharing one timestamp, the + 1 ms step presumably skips the rest of
        that group — verify against the exchange's timestamp granularity.
        """
        from datetime import datetime, timedelta, timezone

        page_count = min(limit, 1000)
        if cursor is None:
            start_iso = datetime.fromtimestamp(start_ns / NS, tz=timezone.utc).isoformat()
        else:
            start_iso = cursor

        params: dict[str, Any] = {
            "symbol": self.render_symbol(symbol),
            "startTime": start_iso,
            "count": page_count,
            "reverse": False,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/trade", params)

        rows = data if isinstance(data, list) else []
        trades: list[Trade] = []
        last_ts_ns: int | None = None
        for row in rows:
            ts_ns = self._parse_ts_ns(row.get("timestamp", ""))
            if ts_ns is None:
                continue
            last_ts_ns = ts_ns
            trades.append(Trade(
                ts=ts_ns,
                price=float(row.get("price") or 0),
                amount=float(row.get("size") or 0),
                side="buy" if row.get("side") == "Buy" else "sell",
                tid=str(row.get("trdMatchID", "")),
            ))

        # A short page, an unparseable page, or reaching the window end all
        # mean there is nothing further to request.
        if len(rows) < page_count or last_ts_ns is None or last_ts_ns >= end_ns:
            return trades, None

        next_dt = datetime.fromtimestamp(last_ts_ns / NS, tz=timezone.utc) + timedelta(milliseconds=1)
        return trades, next_dt.isoformat()

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels.

        Parameters
        ----------
        symbol
            Canonical symbol; rendered with :meth:`render_symbol`.
        depth
            Forwarded as the ``depth`` query parameter of ``orderBook/L2``.

        Returns
        -------
        OrderBookSnapshot
            Stamped with the local wall clock. Both sides are ordered
            best-first (bids by descending price, asks by ascending price).
            Rows whose side is not ``"Buy"`` are treated as asks.
        """
        params = {"symbol": self.render_symbol(symbol), "depth": depth}
        async with self._http as client:
            data = await client.get(f"{_BASE}/orderBook/L2", params)

        bid_rows: list[tuple[float, float]] = []
        ask_rows: list[tuple[float, float]] = []
        for row in data if isinstance(data, list) else []:
            # ``or 0`` (rather than a ``get`` default) also covers JSON nulls,
            # matching the other parsers in this module.
            level = (float(row.get("price") or 0), float(row.get("size") or 0))
            (bid_rows if row.get("side") == "Buy" else ask_rows).append(level)

        # orderBook/L2 lists rows by descending price, which leaves asks
        # worst-first; normalise both sides to best-first.
        bid_rows.sort(key=lambda lv: lv[0], reverse=True)
        ask_rows.sort(key=lambda lv: lv[0])

        bids = [OrderBookLevel(price=p, amount=a) for p, a in bid_rows]
        asks = [OrderBookLevel(price=p, amount=a) for p, a in ask_rows]
        return OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        Raises
        ------
        ValueError
            If *span* is not one of the bucketed spans in ``_BITMEX_BINS``.
            Falling back to another bin would silently deliver bars of the
            wrong width.
        """
        bin_size = _BITMEX_BINS.get(span)
        if bin_size is None:
            raise ValueError(
                f"BitMEX does not support a {span}s OHLC span; "
                f"supported spans: {sorted(_BITMEX_BINS)}"
            )
        ws = _BitMEXWS(self.render_symbol(symbol), f"tradeBin{bin_size}", "ohlc")
        return ws.stream()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        ws = _BitMEXWS(self.render_symbol(symbol), "trade", "trades")
        return ws.stream()

    def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots over WebSocket.

        Uses the ``orderBook10`` topic — a full top-10 snapshot on every
        update — rather than ``orderBookL2_25``, whose id-keyed
        insert/update/delete deltas carry no price on updates and can't
        yield a correct best bid-ask. *depth* is accepted for interface
        compatibility but not used: the topic always carries ten levels.
        """
        ws = _BitMEXWS(self.render_symbol(symbol), "orderBook10", "book")
        return ws.stream()
class _BitMEXWS(WebSocketBase):
    """Single-topic BitMEX WebSocket subscription.

    Subscribes to ``<topic>:<symbol>`` on every (re)connect and turns each
    data frame into domain records according to *mode* (``"ohlc"``,
    ``"trades"`` or ``"book"``). Any other mode yields nothing.
    """

    def __init__(self, symbol: str, topic: str, mode: str) -> None:
        super().__init__(_WS_URL)
        self._symbol = symbol
        self._topic = topic
        self._mode = mode

    async def on_connect(self, ws: Any) -> None:
        """Send the subscription message after each (re)connect."""
        subscription = {"op": "subscribe", "args": [f"{self._topic}:{self._symbol}"]}
        await ws.send(json.dumps(subscription))

    @staticmethod
    def _ts_ns(ts_str: Any) -> int | None:
        """Convert an ISO-8601 timestamp to epoch nanoseconds, or ``None`` if unparseable."""
        from datetime import datetime

        try:
            # Spell out the UTC offset; a bare "Z" is not accepted by
            # ``fromisoformat`` on every Python version.
            dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
            return s_to_ns(dt.timestamp())
        except (AttributeError, TypeError, ValueError, OverflowError, OSError):
            return None

    def _bars(self, rows: Any):
        """Yield one ``OHLCBar`` per row that carries a parseable timestamp."""
        for row in rows:
            ts_ns = self._ts_ns(row.get("timestamp", ""))
            if ts_ns is None:
                continue
            yield OHLCBar(
                ts=ts_ns,
                open=float(row.get("open") or 0),
                high=float(row.get("high") or 0),
                low=float(row.get("low") or 0),
                close=float(row.get("close") or 0),
                volume=float(row.get("volume") or 0),
            )

    def _trades(self, rows: Any):
        """Yield one ``Trade`` per row that carries a parseable timestamp."""
        for row in rows:
            ts_ns = self._ts_ns(row.get("timestamp", ""))
            if ts_ns is None:
                continue
            yield Trade(
                ts=ts_ns,
                price=float(row.get("price") or 0),
                amount=float(row.get("size") or 0),
                side="buy" if row.get("side") == "Buy" else "sell",
                tid=str(row.get("trdMatchID", "")),
            )

    def _books(self, rows: Any):
        """Yield one full ``OrderBookSnapshot`` per non-empty row."""
        # orderBook10: each frame carries the full top-10 as `bids`/`asks`
        # arrays of [price, size] (bids best-first, asks best-first).
        for row in rows:
            bids = [
                OrderBookLevel(price=float(lv[0]), amount=float(lv[1]))
                for lv in row.get("bids") or []
            ]
            asks = [
                OrderBookLevel(price=float(lv[0]), amount=float(lv[1]))
                for lv in row.get("asks") or []
            ]
            if bids or asks:
                # Stamped with the local wall clock, not the exchange time.
                yield OrderBookSnapshot(ts=s_to_ns(time.time()), bids=bids, asks=asks, is_snapshot=True)

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
        """Parse a raw WebSocket frame into domain records.

        Frames that are not JSON objects, or that carry no ``data`` payload
        (welcome banners, subscription acknowledgements, plain-text
        heartbeat replies), produce no records. Server ``error`` replies are
        logged so a rejected subscription does not go unnoticed.
        """
        try:
            msg = json.loads(raw)
        except ValueError:
            # Not JSON (e.g. a plain-text "pong"); there is nothing to parse.
            logger.debug("Ignoring non-JSON BitMEX frame on %s:%s", self._topic, self._symbol)
            return
        if not isinstance(msg, dict):
            return
        if "error" in msg:
            logger.warning(
                "BitMEX WebSocket error on %s:%s: %s", self._topic, self._symbol, msg["error"]
            )

        rows = msg.get("data")
        if not rows:
            return

        if self._mode == "ohlc":
            records = self._bars(rows)
        elif self._mode == "trades":
            records = self._trades(rows)
        elif self._mode == "book":
            records = self._books(rows)
        else:
            return

        for record in records:
            yield record