Source code for dccd.sources.bybit

"""Bybit source adapter — OHLC full history, trades recent only (60), order book, funding, open interest."""

from __future__ import annotations

# Built-in
import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

# Local
from dccd.domain.capability import Capability
from dccd.domain.records import (
    FundingRate,
    OHLCBar,
    OpenInterest,
    OrderBookLevel,
    OrderBookSnapshot,
    Trade,
)
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import bybit_interval, bybit_oi_interval
from dccd.domain.types import DataType
from dccd.sources.base import (
    FundingHistory,
    OHLCHistory,
    OHLCLive,
    OpenInterestHistory,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesLive,
    default_http_client,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["BybitSource"]

logger = logging.getLogger(__name__)

_BASE = "https://api.bybit.com/v5/market"


class BybitSource(
    OHLCHistory,
    OHLCLive,
    TradesLive,
    OrderBookSnapshotREST,
    OrderBookLive,
    FundingHistory,
    OpenInterestHistory,
):
    """Bybit source adapter (spot + USDT-perpetual funding/open interest).

    - **Backfill**: OHLC (full), order-book snapshot, realized funding
      (``linear`` ``perp`` market only), open interest (``linear`` ``perp``,
      full history back to symbol launch). **No trades** (see Notes).
    - **Stream**: OHLC, trades, order book.

    Notes
    -----
    Bybit spot exposes only the ~60 most recent trades and no history, so
    ``TradesHistory`` is deliberately **not implemented** — a trades backfill
    raises :class:`~dccd.domain.errors.NoCapability` rather than returning a
    misleading recent slice. Live trades are still available via the stream.

    Funding history (``GET /v5/market/funding/history``) has two quirks:
    Bybit rejects a request carrying ``startTime`` without ``endTime`` (both
    must always be sent), and pages come back **newest-first** — the adapter
    walks backward, using each page's oldest timestamp minus one millisecond
    as the next page's ``endTime``.

    Open interest (``GET /v5/market/open-interest``) is span-typed (``5min``
    to ``1d`` buckets via :func:`~dccd.domain.timeutils.bybit_oi_interval`)
    and, unlike funding, exposes a **real** ``nextPageCursor`` — it is passed
    through unchanged as the opaque cursor instead of being reconstructed
    from timestamps.

    See Also
    --------
    dccd.Client : the public facade.

    Examples
    --------
    >>> from dccd.domain.types import DataType
    >>> from dccd.sources.bybit import BybitSource
    >>> BybitSource().capability_for(DataType.TRADES, 'rest', 'historical') is None
    True

    """

    exchange = "bybit"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Use *http* when given, else the default client for this exchange."""
        self._http = http or default_http_client(self.exchange)

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=1000,
                page_direction="forward",
                spans=[
                    60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600,
                    43200, 86400, 604800, 2592000,
                ],
            ),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="rest",
                mode="historical",
                max_depth=200,
            ),
            Capability(
                data_type=DataType.FUNDING,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=200,
                page_direction="backward",
                markets=["perp"],
            ),
            Capability(
                data_type=DataType.OPEN_INTEREST,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=200,
                page_direction="backward",
                markets=["perp"],
                spans=[300, 900, 1800, 3600, 14400, 86400],
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="ws",
                mode="live",
                max_depth=1000,
                depths=[1, 50, 200, 1000],
            ),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string.

        The base and quote are concatenated with no separator.
        """
        return f"{s.base}{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Parameters
        ----------
        symbol : Symbol
            Instrument to query (``spot`` category).
        span : int
            Bar span, mapped to a Bybit interval by
            :func:`~dccd.domain.timeutils.bybit_interval`.
        start_ns, end_ns : int
            Window bounds; sent to Bybit divided by 1 000 000 (milliseconds).
        limit : int
            Requested page size, capped at 1000.

        Returns
        -------
        list[OHLCBar]
            Bars in the order the API returned them. Empty when *span* has no
            interval mapping or when Bybit reports a non-zero ``retCode``
            (the error is logged).

        """
        interval = bybit_interval(span)
        if not interval:
            return []
        params: dict[str, Any] = {
            "category": "spot",
            "symbol": self.render_symbol(symbol),
            "interval": interval,
            "start": start_ns // 1_000_000,
            "end": end_ns // 1_000_000,
            "limit": min(limit, 1000),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/kline", params)
        if data.get("retCode") != 0:
            logger.error("Bybit kline error: %s", data.get("retMsg"))
            return []
        # NOTE(review): Bybit documents the kline list as sorted newest-first;
        # rows are passed through in API order — confirm the pager in
        # dccd.sources.base does not rely on ascending order.
        return [
            OHLCBar(
                ts=int(row[0]) * 1_000_000,
                open=float(row[1]),
                high=float(row[2]),
                low=float(row[3]),
                close=float(row[4]),
                volume=float(row[5]),
                # The 7th column is optional in the parsed row.
                quote_volume=float(row[6]) if len(row) > 6 else None,
            )
            for row in data.get("result", {}).get("list", [])
        ]

    async def fetch_funding_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[FundingRate], str | None]:
        """Fetch one page of realized funding rates, walking backward from ``end_ns``.

        Bybit's ``funding/history`` endpoint rejects a request that carries
        ``startTime`` without ``endTime`` — both are always sent. Pages are
        **newest-first**, so *cursor* (when set) is the previous page's oldest
        timestamp minus 1 ms, reused as this call's ``endTime``; ``startTime``
        stays pinned to ``start_ns`` throughout the walk. ``linear`` category
        only — this endpoint has no spot equivalent.

        Returns
        -------
        tuple[list[FundingRate], str | None]
            The page's rates and the cursor for the next (older) page. The
            cursor is ``None`` when the page was not full, when the oldest row
            already reached ``start_ns``, when the page is empty, or when
            Bybit reports a non-zero ``retCode`` (the error is logged).

        """
        limit_eff = min(limit, 200)
        start_ms = start_ns // 1_000_000
        params: dict[str, Any] = {
            "category": "linear",
            "symbol": self.render_symbol(symbol),
            "startTime": start_ms,
            "endTime": int(cursor) if cursor else end_ns // 1_000_000,
            "limit": limit_eff,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/funding/history", params)
        if data.get("retCode") != 0:
            logger.error("Bybit funding error: %s", data.get("retMsg"))
            return [], None
        items = data.get("result", {}).get("list", [])
        if not items:
            return [], None
        rates = [
            FundingRate(
                ts=int(item["fundingRateTimestamp"]) * 1_000_000,
                rate=float(item["fundingRate"]),
            )
            for item in items
        ]
        # Newest-first pages: the last row is the oldest one on this page.
        oldest_ms = int(items[-1]["fundingRateTimestamp"])
        # A short page or an oldest row at/before the window start means the
        # backward walk is finished.
        has_more = len(items) == limit_eff and oldest_ms > start_ms
        next_cursor = str(oldest_ms - 1) if has_more else None
        return rates, next_cursor

    async def fetch_oi_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[OpenInterest], str | None]:
        """Fetch one page of open interest, following Bybit's real cursor.

        Unlike ``funding/history``, ``open-interest`` returns a genuine
        ``result.nextPageCursor`` — it is passed through unchanged rather than
        reconstructed from timestamps. ``linear`` category only (no spot
        equivalent). Returns ``([], None)`` when *span* has no
        :func:`~dccd.domain.timeutils.bybit_oi_interval` mapping, and also
        when Bybit reports a non-zero ``retCode`` (the error is logged).
        """
        interval = bybit_oi_interval(span)
        if interval is None:
            return [], None
        params: dict[str, Any] = {
            "category": "linear",
            "symbol": self.render_symbol(symbol),
            "intervalTime": interval,
            "startTime": start_ns // 1_000_000,
            "endTime": end_ns // 1_000_000,
            "limit": min(limit, 200),
        }
        if cursor:
            params["cursor"] = cursor
        async with self._http as client:
            data = await client.get(f"{_BASE}/open-interest", params)
        if data.get("retCode") != 0:
            logger.error("Bybit open-interest error: %s", data.get("retMsg"))
            return [], None
        result = data.get("result", {})
        oi = [
            OpenInterest(
                ts=int(item["timestamp"]) * 1_000_000,
                open_interest=float(item["openInterest"]),
            )
            for item in result.get("list", [])
        ]
        # An empty-string cursor is normalised to None.
        next_cursor = result.get("nextPageCursor") or None
        return oi, next_cursor

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels.

        *depth* is capped at 200. When Bybit reports a non-zero ``retCode``
        the error is logged (matching the other REST fetchers) and a snapshot
        with whatever levels the payload holds — normally none — is returned,
        stamped with the local wall clock if the payload has no ``ts``.
        """
        params = {
            "category": "spot",
            "symbol": self.render_symbol(symbol),
            "limit": min(depth, 200),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/orderbook", params)
        if data.get("retCode") != 0:
            # Previously an API error was indistinguishable from an empty book.
            logger.error("Bybit orderbook error: %s", data.get("retMsg"))
        book = data.get("result") or {}
        bids = [
            OrderBookLevel(price=float(level[0]), amount=float(level[1]))
            for level in book.get("b", [])
        ]
        asks = [
            OrderBookLevel(price=float(level[0]), amount=float(level[1]))
            for level in book.get("a", [])
        ]
        ts_ms = int(book.get("ts", int(time.time() * 1000)))
        return OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        When *span* has no Bybit interval mapping the subscription falls back
        to interval ``"1"``; a warning is logged so the mismatch between the
        requested span and the delivered bars is visible.
        """
        interval = bybit_interval(span)
        if not interval:
            logger.warning(
                "Bybit has no kline interval for span=%s; falling back to interval '1'",
                span,
            )
            interval = "1"
        ws = _BybitWS(self.render_symbol(symbol), "kline", interval)
        return ws.stream_ohlc()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        ws = _BybitWS(self.render_symbol(symbol), "publicTrade")
        return ws.stream_trades()

    def stream_orderbook(
        self,
        symbol: Symbol,
        depth: int,
        *,
        min_interval: float = 0.0,
    ) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots/deltas over WebSocket.

        *depth* is used both as the subscription topic parameter and as the
        truncation depth of emitted snapshots; *min_interval* is forwarded to
        the WebSocket stream as its emit throttle.
        """
        ws = _BybitWS(self.render_symbol(symbol), "orderbook", str(depth))
        return ws.stream_orderbook(depth=depth, min_interval=min_interval)
class _BybitWS(WebSocketBase):
    """Single-topic WebSocket client for Bybit's public v5 spot stream."""

    def __init__(self, symbol: str, topic: str, param: str = "") -> None:
        """Remember the subscription parts; the topic is built in :meth:`on_connect`."""
        super().__init__("wss://stream.bybit.com/v5/public/spot")
        self._symbol = symbol
        self._topic = topic
        self._param = param

    async def on_connect(self, ws: Any) -> None:
        """Send the subscription message after each (re)connect."""
        if self._param:
            full_topic = f"{self._topic}.{self._param}.{self._symbol}"
        else:
            full_topic = f"{self._topic}.{self._symbol}"
        await ws.send(json.dumps({"op": "subscribe", "args": [full_topic]}))

    def _check_sub_ack(self, data: dict[str, Any]) -> None:
        """Raise on a rejected subscription instead of silently filtering it.

        Raises
        ------
        RuntimeError
            If *data* is a ``subscribe`` acknowledgement with
            ``success`` set to ``False``.

        """
        if data.get("op") == "subscribe" and data.get("success") is False:
            raise RuntimeError(
                f"bybit {self._topic} subscription rejected for "
                f"{self._symbol}: {data.get('ret_msg', 'unknown error')}"
            )

    @staticmethod
    def _apply_delta(side: dict[float, float], levels: list[Any]) -> None:
        """Apply ``[price, size]`` delta rows to one book side in place (size 0 removes)."""
        for level in levels:
            price, qty = float(level[0]), float(level[1])
            if qty == 0:
                side.pop(price, None)
            else:
                side[price] = qty

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
        """Parse a raw WebSocket frame into domain records.

        Yields nothing: the ``stream_*`` methods of this class decode frames
        themselves from :meth:`stream_raw`.
        """
        return
        yield  # unreachable; its presence makes this an async generator

    async def stream_ohlc(self) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        Frames without a ``data`` key are skipped after the subscription
        acknowledgement check. ``quote_volume`` is filled from ``turnover``
        when the frame carries it, matching the REST bars.
        """
        async for raw in self.stream_raw():
            data = json.loads(raw)
            self._check_sub_ack(data)
            if "data" not in data:
                continue
            for k in data["data"]:
                yield OHLCBar(
                    ts=int(k["start"]) * 1_000_000,
                    open=float(k["open"]),
                    high=float(k["high"]),
                    low=float(k["low"]),
                    close=float(k["close"]),
                    volume=float(k["volume"]),
                    quote_volume=float(k["turnover"]) if "turnover" in k else None,
                )

    async def stream_trades(self) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket.

        Frames without a ``data`` key are skipped after the subscription
        acknowledgement check. ``side`` is lower-cased, or ``None`` when the
        frame has no (or an empty) ``S`` field.
        """
        async for raw in self.stream_raw():
            data = json.loads(raw)
            self._check_sub_ack(data)
            if "data" not in data:
                continue
            for t in data["data"]:
                yield Trade(
                    ts=int(t["T"]) * 1_000_000,
                    price=float(t["p"]),
                    amount=float(t["v"]),
                    side=t.get("S", "").lower() or None,
                    tid=t.get("i"),
                )

    async def stream_orderbook(
        self, *, depth: int = 50, min_interval: float = 0.0
    ) -> AsyncIterator[OrderBookSnapshot]:
        """Stream the order book, reconstructing full state from snapshot + deltas.

        Bybit sends one ``snapshot`` then ``delta`` frames (a level with size 0
        is a removal). Delta frames are applied to cheap dict state on every
        WS frame; pydantic objects are only constructed when a capture is due
        (controlled by *min_interval*), eliminating per-frame CPU burn.

        At emit time both sides are sorted, truncated to *depth*, and the
        dicts are pruned to those same top-N levels. Bybit WS says the client
        truncates after applying updates; pruning at emit bounds stale-level
        retention to at most one interval.

        The emitted timestamp is the frame's ``ts`` taken from the message
        envelope (where Bybit places it for public order-book frames), then
        from the ``data`` payload, and only then from the local wall clock.

        All emitted snapshots carry ``is_snapshot=True`` (full reconstructed
        state).
        """
        state_bids: dict[float, float] = {}
        state_asks: dict[float, float] = {}
        last_emit: float = -float("inf")  # ensure the first frame always emits

        async for raw in self.stream_raw():
            data = json.loads(raw)
            self._check_sub_ack(data)
            if "data" not in data:
                continue
            d = data["data"]

            if data.get("type") == "snapshot":
                # A snapshot replaces the whole book.
                state_bids = {float(b[0]): float(b[1]) for b in d.get("b", [])}
                state_asks = {float(a[0]): float(a[1]) for a in d.get("a", [])}
            else:
                self._apply_delta(state_bids, d.get("b", []))
                self._apply_delta(state_asks, d.get("a", []))

            # Throttle check: skip pydantic construction for frames that
            # won't be saved (min_interval=0.0 preserves legacy per-frame).
            now = time.monotonic()
            if now - last_emit < min_interval:
                continue
            last_emit = now

            # Sort + truncate to subscribed depth; prune dicts to match so
            # stale levels beyond depth are discarded at most one interval later.
            sorted_bids = sorted(state_bids.items(), reverse=True)[:depth]
            sorted_asks = sorted(state_asks.items())[:depth]
            state_bids = dict(sorted_bids)
            state_asks = dict(sorted_asks)

            bids = [OrderBookLevel(price=p, amount=q) for p, q in sorted_bids]
            asks = [OrderBookLevel(price=p, amount=q) for p, q in sorted_asks]

            # Envelope first: reading only the payload never found the
            # exchange timestamp and always fell back to local time.
            ts_ms = data.get("ts")
            if ts_ms is None:
                ts_ms = d.get("ts")
            if ts_ms is None:
                ts_ms = int(time.time() * 1000)
            yield OrderBookSnapshot(
                ts=int(ts_ms) * 1_000_000,
                bids=bids,
                asks=asks,
                is_snapshot=True,
            )