Source code for dccd.sources.bybit

"""Bybit source adapter — OHLC full history, trades recent only (60), order book."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import bybit_interval
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["BybitSource"]

logger = logging.getLogger(__name__)

_BASE = "https://api.bybit.com/v5/market"


[docs] class BybitSource(OHLCHistory, OHLCLive, TradesLive, OrderBookSnapshotREST, OrderBookLive): """Bybit source adapter (spot). - **Backfill**: OHLC (full), order-book snapshot. **No trades** (see Notes). - **Stream**: OHLC, trades, order book. Notes ----- Bybit spot exposes only the ~60 most recent trades and no history, so ``TradesHistory`` is deliberately **not implemented** — a trades backfill raises :class:`~dccd.domain.errors.NoCapability` rather than returning a misleading recent slice. Live trades are still available via the stream. See Also -------- dccd.Client : the public facade. Examples -------- >>> from dccd.sources.bybit import BybitSource >>> BybitSource().capability_for(DataType.TRADES, 'rest', 'historical') is None True """ exchange = "bybit" def __init__(self, http: AsyncHTTPClient | None = None) -> None: self._http = http or AsyncHTTPClient()
[docs] def capabilities(self) -> list[Capability]: """Declared capabilities, one per (data type × transport × mode).""" return [ Capability( data_type=DataType.OHLC, transport="rest", mode="historical", history="full", max_per_request=1000, page_direction="forward", spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 43200, 86400, 604800, 2592000], ), Capability( data_type=DataType.ORDERBOOK, transport="rest", mode="historical", max_depth=200, ), Capability(data_type=DataType.OHLC, transport="ws", mode="live"), Capability(data_type=DataType.TRADES, transport="ws", mode="live"), Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=1000, depths=[1, 50, 200, 1000]), ]
[docs] def render_symbol(self, s: Symbol) -> str: """Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string.""" return f"{s.base}{s.quote}"
[docs] async def fetch_ohlc_page( self, symbol: Symbol, span: int, start_ns: int, end_ns: int, limit: int, ) -> list[OHLCBar]: """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).""" interval = bybit_interval(span) if not interval: return [] params: dict[str, Any] = { "category": "spot", "symbol": self.render_symbol(symbol), "interval": interval, "start": start_ns // 1_000_000, "end": end_ns // 1_000_000, "limit": min(limit, 1000), } async with self._http as client: data = await client.get(f"{_BASE}/kline", params) if data.get("retCode") != 0: logger.error("Bybit kline error: %s", data.get("retMsg")) return [] bars = [] for e in data.get("result", {}).get("list", []): bars.append(OHLCBar( ts=int(e[0]) * 1_000_000, open=float(e[1]), high=float(e[2]), low=float(e[3]), close=float(e[4]), volume=float(e[5]), quote_volume=float(e[6]) if len(e) > 6 else None, )) return bars
[docs] async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot: """Fetch a current order-book snapshot up to *depth* levels.""" params = {"category": "spot", "symbol": self.render_symbol(symbol), "limit": min(depth, 200)} async with self._http as client: data = await client.get(f"{_BASE}/orderbook", params) book = data.get("result", {}) bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in book.get("b", [])] asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in book.get("a", [])] ts_ms = book.get("ts", int(time.time() * 1000)) return OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks)
[docs] def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]: """Stream live OHLC bars over WebSocket.""" ws = _BybitWS(self.render_symbol(symbol), "kline", bybit_interval(span) or "1") return ws.stream_ohlc()
[docs] def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]: """Stream live trades over WebSocket.""" ws = _BybitWS(self.render_symbol(symbol), "publicTrade") return ws.stream_trades()
[docs] def stream_orderbook( self, symbol: Symbol, depth: int, *, min_interval: float = 0.0, ) -> AsyncIterator[OrderBookSnapshot]: """Stream live order-book snapshots/deltas over WebSocket.""" ws = _BybitWS(self.render_symbol(symbol), "orderbook", str(depth)) return ws.stream_orderbook(depth=depth, min_interval=min_interval)
class _BybitWS(WebSocketBase): def __init__(self, symbol: str, topic: str, param: str = "") -> None: super().__init__("wss://stream.bybit.com/v5/public/spot") self._symbol = symbol self._topic = topic self._param = param async def on_connect(self, ws: Any) -> None: """Send the subscription message after each (re)connect.""" full_topic = f"{self._topic}.{self._param}.{self._symbol}" if self._param else f"{self._topic}.{self._symbol}" await ws.send(json.dumps({"op": "subscribe", "args": [full_topic]})) def _check_sub_ack(self, data: dict[str, Any]) -> None: """Raise on a rejected subscription instead of silently filtering it.""" if data.get("op") == "subscribe" and data.get("success") is False: raise RuntimeError( f"bybit {self._topic} subscription rejected for " f"{self._symbol}: {data.get('ret_msg', 'unknown error')}" ) async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]: """Parse a raw WebSocket frame into domain records.""" return yield async def stream_ohlc(self) -> AsyncIterator[OHLCBar]: """Stream live OHLC bars over WebSocket.""" async for raw in self.stream_raw(): data = json.loads(raw) self._check_sub_ack(data) if "data" not in data: continue for k in data["data"]: yield OHLCBar( ts=int(k["start"]) * 1_000_000, open=float(k["open"]), high=float(k["high"]), low=float(k["low"]), close=float(k["close"]), volume=float(k["volume"]), ) async def stream_trades(self) -> AsyncIterator[Trade]: """Stream live trades over WebSocket.""" async for raw in self.stream_raw(): data = json.loads(raw) self._check_sub_ack(data) if "data" not in data: continue for t in data["data"]: yield Trade( ts=int(t["T"]) * 1_000_000, price=float(t["p"]), amount=float(t["v"]), side=t.get("S", "").lower() or None, tid=t.get("i"), ) async def stream_orderbook( self, *, depth: int = 50, min_interval: float = 0.0 ) -> AsyncIterator[OrderBookSnapshot]: """Stream the order book, reconstructing full state from snapshot + deltas. Bybit sends one ``snapshot`` then ``delta`` frames (a level with size 0 is a removal). Delta frames are applied to cheap dict state on every WS frame; pydantic objects are only constructed when a capture is due (controlled by *min_interval*), eliminating per-frame CPU burn. At emit time both sides are sorted, truncated to *depth*, and the dicts are pruned to those same top-N levels. Bybit WS says the client truncates after applying updates; pruning at emit bounds stale-level retention to at most one interval. All emitted snapshots carry ``is_snapshot=True`` (full reconstructed state). """ state_bids: dict[float, float] = {} state_asks: dict[float, float] = {} last_emit: float = -float("inf") # ensure the first frame always emits async for raw in self.stream_raw(): data = json.loads(raw) self._check_sub_ack(data) if "data" not in data: continue d = data["data"] is_snap = data.get("type") == "snapshot" if is_snap: state_bids = {float(b[0]): float(b[1]) for b in d.get("b", [])} state_asks = {float(a[0]): float(a[1]) for a in d.get("a", [])} else: for b in d.get("b", []): price, qty = float(b[0]), float(b[1]) if qty == 0: state_bids.pop(price, None) else: state_bids[price] = qty for a in d.get("a", []): price, qty = float(a[0]), float(a[1]) if qty == 0: state_asks.pop(price, None) else: state_asks[price] = qty # Throttle check: skip pydantic construction for frames that # won't be saved (min_interval=0.0 preserves legacy per-frame). now = time.monotonic() if now - last_emit < min_interval: continue last_emit = now # Sort + truncate to subscribed depth; prune dicts to match so # stale levels beyond depth are discarded at most one interval later. sorted_bids = sorted(state_bids.items(), reverse=True)[:depth] sorted_asks = sorted(state_asks.items())[:depth] state_bids = dict(sorted_bids) state_asks = dict(sorted_asks) bids = [OrderBookLevel(price=p, amount=q) for p, q in sorted_bids] asks = [OrderBookLevel(price=p, amount=q) for p, q in sorted_asks] ts_ms = d.get("ts", int(time.time() * 1000)) yield OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks, is_snapshot=True)