"""Bybit source adapter — OHLC full history, trades recent only (60), order book."""
from __future__ import annotations
import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import bybit_interval
from dccd.domain.types import DataType
from dccd.sources.base import (
OHLCHistory,
OHLCLive,
OrderBookLive,
OrderBookSnapshotREST,
TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase
__all__ = ["BybitSource"]
logger = logging.getLogger(__name__)
_BASE = "https://api.bybit.com/v5/market"
[docs]
class BybitSource(OHLCHistory, OHLCLive, TradesLive, OrderBookSnapshotREST, OrderBookLive):
"""Bybit source adapter (spot).
- **Backfill**: OHLC (full), order-book snapshot. **No trades** (see Notes).
- **Stream**: OHLC, trades, order book.
Notes
-----
Bybit spot exposes only the ~60 most recent trades and no history, so
``TradesHistory`` is deliberately **not implemented** — a trades backfill
raises :class:`~dccd.domain.errors.NoCapability` rather than returning a
misleading recent slice. Live trades are still available via the stream.
See Also
--------
dccd.Client : the public facade.
Examples
--------
>>> from dccd.sources.bybit import BybitSource
>>> BybitSource().capability_for(DataType.TRADES, 'rest', 'historical') is None
True
"""
exchange = "bybit"
def __init__(self, http: AsyncHTTPClient | None = None) -> None:
self._http = http or AsyncHTTPClient()
[docs]
def capabilities(self) -> list[Capability]:
"""Declared capabilities, one per (data type × transport × mode)."""
return [
Capability(
data_type=DataType.OHLC, transport="rest", mode="historical",
history="full", max_per_request=1000, page_direction="forward",
spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 43200, 86400, 604800, 2592000],
),
Capability(
data_type=DataType.ORDERBOOK, transport="rest", mode="historical",
max_depth=200,
),
Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=1000, depths=[1, 50, 200, 1000]),
]
[docs]
def render_symbol(self, s: Symbol) -> str:
"""Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string."""
return f"{s.base}{s.quote}"
[docs]
async def fetch_ohlc_page(
self,
symbol: Symbol,
span: int,
start_ns: int,
end_ns: int,
limit: int,
) -> list[OHLCBar]:
"""Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`)."""
interval = bybit_interval(span)
if not interval:
return []
params: dict[str, Any] = {
"category": "spot",
"symbol": self.render_symbol(symbol),
"interval": interval,
"start": start_ns // 1_000_000,
"end": end_ns // 1_000_000,
"limit": min(limit, 1000),
}
async with self._http as client:
data = await client.get(f"{_BASE}/kline", params)
if data.get("retCode") != 0:
logger.error("Bybit kline error: %s", data.get("retMsg"))
return []
bars = []
for e in data.get("result", {}).get("list", []):
bars.append(OHLCBar(
ts=int(e[0]) * 1_000_000,
open=float(e[1]),
high=float(e[2]),
low=float(e[3]),
close=float(e[4]),
volume=float(e[5]),
quote_volume=float(e[6]) if len(e) > 6 else None,
))
return bars
[docs]
async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
"""Fetch a current order-book snapshot up to *depth* levels."""
params = {"category": "spot", "symbol": self.render_symbol(symbol), "limit": min(depth, 200)}
async with self._http as client:
data = await client.get(f"{_BASE}/orderbook", params)
book = data.get("result", {})
bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in book.get("b", [])]
asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in book.get("a", [])]
ts_ms = book.get("ts", int(time.time() * 1000))
return OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks)
[docs]
def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
"""Stream live OHLC bars over WebSocket."""
ws = _BybitWS(self.render_symbol(symbol), "kline", bybit_interval(span) or "1")
return ws.stream_ohlc()
[docs]
def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
"""Stream live trades over WebSocket."""
ws = _BybitWS(self.render_symbol(symbol), "publicTrade")
return ws.stream_trades()
[docs]
def stream_orderbook(
self,
symbol: Symbol,
depth: int,
*,
min_interval: float = 0.0,
) -> AsyncIterator[OrderBookSnapshot]:
"""Stream live order-book snapshots/deltas over WebSocket."""
ws = _BybitWS(self.render_symbol(symbol), "orderbook", str(depth))
return ws.stream_orderbook(depth=depth, min_interval=min_interval)
class _BybitWS(WebSocketBase):
def __init__(self, symbol: str, topic: str, param: str = "") -> None:
super().__init__("wss://stream.bybit.com/v5/public/spot")
self._symbol = symbol
self._topic = topic
self._param = param
async def on_connect(self, ws: Any) -> None:
"""Send the subscription message after each (re)connect."""
full_topic = f"{self._topic}.{self._param}.{self._symbol}" if self._param else f"{self._topic}.{self._symbol}"
await ws.send(json.dumps({"op": "subscribe", "args": [full_topic]}))
def _check_sub_ack(self, data: dict[str, Any]) -> None:
"""Raise on a rejected subscription instead of silently filtering it."""
if data.get("op") == "subscribe" and data.get("success") is False:
raise RuntimeError(
f"bybit {self._topic} subscription rejected for "
f"{self._symbol}: {data.get('ret_msg', 'unknown error')}"
)
async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
"""Parse a raw WebSocket frame into domain records."""
return
yield
async def stream_ohlc(self) -> AsyncIterator[OHLCBar]:
"""Stream live OHLC bars over WebSocket."""
async for raw in self.stream_raw():
data = json.loads(raw)
self._check_sub_ack(data)
if "data" not in data:
continue
for k in data["data"]:
yield OHLCBar(
ts=int(k["start"]) * 1_000_000,
open=float(k["open"]),
high=float(k["high"]),
low=float(k["low"]),
close=float(k["close"]),
volume=float(k["volume"]),
)
async def stream_trades(self) -> AsyncIterator[Trade]:
"""Stream live trades over WebSocket."""
async for raw in self.stream_raw():
data = json.loads(raw)
self._check_sub_ack(data)
if "data" not in data:
continue
for t in data["data"]:
yield Trade(
ts=int(t["T"]) * 1_000_000,
price=float(t["p"]),
amount=float(t["v"]),
side=t.get("S", "").lower() or None,
tid=t.get("i"),
)
async def stream_orderbook(
self, *, depth: int = 50, min_interval: float = 0.0
) -> AsyncIterator[OrderBookSnapshot]:
"""Stream the order book, reconstructing full state from snapshot + deltas.
Bybit sends one ``snapshot`` then ``delta`` frames (a level with size 0
is a removal). Delta frames are applied to cheap dict state on every WS
frame; pydantic objects are only constructed when a capture is due
(controlled by *min_interval*), eliminating per-frame CPU burn.
At emit time both sides are sorted, truncated to *depth*, and the dicts
are pruned to those same top-N levels. Bybit WS says the client truncates
after applying updates; pruning at emit bounds stale-level retention to
at most one interval.
All emitted snapshots carry ``is_snapshot=True`` (full reconstructed state).
"""
state_bids: dict[float, float] = {}
state_asks: dict[float, float] = {}
last_emit: float = -float("inf") # ensure the first frame always emits
async for raw in self.stream_raw():
data = json.loads(raw)
self._check_sub_ack(data)
if "data" not in data:
continue
d = data["data"]
is_snap = data.get("type") == "snapshot"
if is_snap:
state_bids = {float(b[0]): float(b[1]) for b in d.get("b", [])}
state_asks = {float(a[0]): float(a[1]) for a in d.get("a", [])}
else:
for b in d.get("b", []):
price, qty = float(b[0]), float(b[1])
if qty == 0:
state_bids.pop(price, None)
else:
state_bids[price] = qty
for a in d.get("a", []):
price, qty = float(a[0]), float(a[1])
if qty == 0:
state_asks.pop(price, None)
else:
state_asks[price] = qty
# Throttle check: skip pydantic construction for frames that
# won't be saved (min_interval=0.0 preserves legacy per-frame).
now = time.monotonic()
if now - last_emit < min_interval:
continue
last_emit = now
# Sort + truncate to subscribed depth; prune dicts to match so
# stale levels beyond depth are discarded at most one interval later.
sorted_bids = sorted(state_bids.items(), reverse=True)[:depth]
sorted_asks = sorted(state_asks.items())[:depth]
state_bids = dict(sorted_bids)
state_asks = dict(sorted_asks)
bids = [OrderBookLevel(price=p, amount=q) for p, q in sorted_bids]
asks = [OrderBookLevel(price=p, amount=q) for p, q in sorted_asks]
ts_ms = d.get("ts", int(time.time() * 1000))
yield OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks, is_snapshot=True)