# Source code for dccd.sources.bybit

"""Bybit source adapter — OHLC full history, trades recent only (60), order book."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import bybit_interval
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["BybitSource"]

logger = logging.getLogger(__name__)

_BASE = "https://api.bybit.com/v5/market"


class BybitSource(OHLCHistory, OHLCLive, TradesLive, OrderBookSnapshotREST, OrderBookLive):
    """Bybit source adapter (spot).

    - **Backfill**: OHLC (full), order-book snapshot. **No trades** (see Notes).
    - **Stream**: OHLC, trades, order book.

    Notes
    -----
    Bybit spot exposes only the ~60 most recent trades and no history, so
    ``TradesHistory`` is deliberately **not implemented** — a trades backfill
    raises :class:`~dccd.domain.errors.NoCapability` rather than returning a
    misleading recent slice. Live trades are still available via the stream.

    See Also
    --------
    dccd.Client : the public facade.

    Examples
    --------
    >>> from dccd.sources.bybit import BybitSource
    >>> BybitSource().capability_for(DataType.TRADES, 'rest', 'historical') is None
    True
    """

    exchange = "bybit"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Store the HTTP client, creating a default one when none is injected."""
        # ``is not None`` rather than ``or``: an injected client must be kept
        # even if it happens to evaluate as falsy.
        self._http = http if http is not None else AsyncHTTPClient()

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                spans=[
                    60, 180, 300, 900, 1800, 3600, 7200, 14400,
                    21600, 43200, 86400, 604800, 2592000,
                ],
            ),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="rest",
                mode="historical",
                max_depth=200,
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=200),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string."""
        return f"{s.base}{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Returns an empty list when *span* has no Bybit interval or when the API
        answers with a non-zero ``retCode`` (the error is logged). Bars are
        returned in ascending timestamp order.
        """
        interval = bybit_interval(span)
        if not interval:
            return []

        params: dict[str, Any] = {
            "category": "spot",
            "symbol": self.render_symbol(symbol),
            "interval": interval,
            # The request window is sent in milliseconds.
            "start": start_ns // 1_000_000,
            "end": end_ns // 1_000_000,
            "limit": min(limit, 1000),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/kline", params)

        if data.get("retCode") != 0:
            logger.error("Bybit kline error: %s", data.get("retMsg"))
            return []

        # ``or`` guards against an explicit null ``result`` / ``list``.
        rows = (data.get("result") or {}).get("list") or []
        # Bybit documents the kline list as sorted in reverse by start time;
        # this adapter declares forward paging, so hand bars back oldest-first.
        rows = sorted(rows, key=lambda row: int(row[0]))

        return [
            OHLCBar(
                ts=int(row[0]) * 1_000_000,  # ms -> ns
                open=float(row[1]),
                high=float(row[2]),
                low=float(row[3]),
                close=float(row[4]),
                volume=float(row[5]),
                quote_volume=float(row[6]) if len(row) > 6 else None,
            )
            for row in rows
        ]

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels.

        *depth* is capped at 200. A non-zero ``retCode`` is logged; the
        snapshot is still built from whatever ``result`` contains, so an API
        error yields an empty book stamped with the local clock.
        """
        params = {
            "category": "spot",
            "symbol": self.render_symbol(symbol),
            "limit": min(depth, 200),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/orderbook", params)

        # A missing ``retCode`` is treated as success so that payloads carrying
        # only ``result`` behave exactly as before.
        if data.get("retCode", 0) != 0:
            logger.error("Bybit orderbook error: %s", data.get("retMsg"))

        book = data.get("result") or {}
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in book.get("b") or []]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in book.get("a") or []]

        ts_ms = book.get("ts")
        if ts_ms is None:
            # No exchange timestamp available: fall back to local wall-clock time.
            ts_ms = int(time.time() * 1000)
        return OrderBookSnapshot(ts=int(ts_ms) * 1_000_000, bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        When *span* has no Bybit interval the stream falls back to interval
        ``"1"`` and a warning is logged.
        """
        interval = bybit_interval(span)
        if not interval:
            logger.warning(
                "Bybit: no kline interval for span %s; streaming interval '1' instead", span
            )
            interval = "1"
        ws = _BybitWS(self.render_symbol(symbol), "kline", interval)
        return ws.stream_ohlc()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        ws = _BybitWS(self.render_symbol(symbol), "publicTrade")
        return ws.stream_trades()

    def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots/deltas over WebSocket."""
        # NOTE(review): *depth* is placed verbatim into the topic name. Bybit
        # presumably accepts only a fixed set of depth levels per category —
        # confirm against the API docs and validate or snap the value if so.
        ws = _BybitWS(self.render_symbol(symbol), "orderbook", str(depth))
        return ws.stream_orderbook()
class _BybitWS(WebSocketBase):
    """WebSocket client bound to a single Bybit public spot topic.

    The topic is ``<topic>.<param>.<symbol>`` when *param* is non-empty and
    ``<topic>.<symbol>`` otherwise; it is subscribed on every (re)connect.
    """

    def __init__(self, symbol: str, topic: str, param: str = "") -> None:
        super().__init__("wss://stream.bybit.com/v5/public/spot")
        self._symbol = symbol
        self._topic = topic
        self._param = param

    async def on_connect(self, ws: Any) -> None:
        """Send the subscription message after each (re)connect."""
        if self._param:
            full_topic = f"{self._topic}.{self._param}.{self._symbol}"
        else:
            full_topic = f"{self._topic}.{self._symbol}"
        await ws.send(json.dumps({"op": "subscribe", "args": [full_topic]}))

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
        """Parse a raw WebSocket frame into domain records.

        Intentionally yields nothing: the ``stream_*`` methods below decode
        frames themselves.
        """
        return
        yield  # unreachable; its presence makes this an async generator

    @staticmethod
    def _decode_frame(raw: str | bytes) -> dict[str, Any] | None:
        """Return the decoded frame if it is a JSON object with a ``data`` key, else ``None``."""
        try:
            msg = json.loads(raw)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError: one bad frame
            # should not terminate the whole stream.
            logger.debug("Bybit WS: skipping non-JSON frame")
            return None
        if isinstance(msg, dict) and "data" in msg:
            return msg
        # Frames without a payload (e.g. the reply to the subscribe request).
        return None

    @staticmethod
    def _apply_levels(state: dict[float, float], levels: list[Any]) -> None:
        """Merge ``[price, size]`` pairs into *state*; a size of 0 removes the level."""
        for level in levels:
            price, qty = float(level[0]), float(level[1])
            if qty == 0:
                state.pop(price, None)
            else:
                state[price] = qty

    async def stream_ohlc(self) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket."""
        async for raw in self.stream_raw():
            msg = self._decode_frame(raw)
            if msg is None:
                continue
            for k in msg["data"]:
                yield OHLCBar(
                    ts=int(k["start"]) * 1_000_000,  # ms -> ns
                    open=float(k["open"]),
                    high=float(k["high"]),
                    low=float(k["low"]),
                    close=float(k["close"]),
                    volume=float(k["volume"]),
                    # Mirrors the REST path, which fills quote_volume as well.
                    quote_volume=float(k["turnover"]) if "turnover" in k else None,
                )

    async def stream_trades(self) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        async for raw in self.stream_raw():
            msg = self._decode_frame(raw)
            if msg is None:
                continue
            for t in msg["data"]:
                yield Trade(
                    ts=int(t["T"]) * 1_000_000,  # ms -> ns
                    price=float(t["p"]),
                    amount=float(t["v"]),
                    # Missing, null or empty side becomes None.
                    side=(t.get("S") or "").lower() or None,
                    tid=t.get("i"),
                )

    async def stream_orderbook(self) -> AsyncIterator[OrderBookSnapshot]:
        """Stream the order book, reconstructing full state from snapshot + deltas.

        Bybit sends one ``snapshot`` then ``delta`` frames (a level with size 0
        is a removal). Yielding the raw delta levels would surface a
        meaningless/crossed best bid-ask, so we keep the full book and emit it
        sorted on every frame.
        """
        state_bids: dict[float, float] = {}
        state_asks: dict[float, float] = {}

        async for raw in self.stream_raw():
            msg = self._decode_frame(raw)
            if msg is None:
                continue
            d = msg["data"]

            is_snap = msg.get("type") == "snapshot"
            if is_snap:
                # A snapshot replaces the local book entirely.
                state_bids.clear()
                state_asks.clear()
            self._apply_levels(state_bids, d.get("b") or [])
            self._apply_levels(state_asks, d.get("a") or [])

            # Bids best-first (highest price), asks best-first (lowest price).
            bids = [
                OrderBookLevel(price=p, amount=q)
                for p, q in sorted(state_bids.items(), reverse=True)
            ]
            asks = [OrderBookLevel(price=p, amount=q) for p, q in sorted(state_asks.items())]

            # Bybit's orderbook frames carry ``ts`` at the top level of the
            # message, not inside ``data``; look there first so the exchange
            # timestamp is used instead of always falling back to local time.
            ts_ms = msg.get("ts", d.get("ts"))
            if ts_ms is None:
                ts_ms = int(time.time() * 1000)

            yield OrderBookSnapshot(
                ts=int(ts_ms) * 1_000_000,
                bids=bids,
                asks=asks,
                is_snapshot=is_snap,
            )