Source code for dccd.sources.okx

"""OKX source adapter — OHLC (candles + history-candles), trades, order book."""

from __future__ import annotations

import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any

from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import okx_interval
from dccd.domain.types import DataType
from dccd.sources.base import (
    OHLCHistory,
    OHLCLive,
    OrderBookLive,
    OrderBookSnapshotREST,
    TradesHistory,
    TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase

__all__ = ["OKXSource"]

logger = logging.getLogger(__name__)

_BASE = "https://www.okx.com/api/v5/market"
_WS_BASE = "wss://ws.okx.com:8443/ws/v5/public"


class OKXSource(
    OHLCHistory,
    TradesHistory,
    OrderBookSnapshotREST,
    OHLCLive,
    TradesLive,
    OrderBookLive,
):
    """OKX adapter — uses history-candles for deep OHLC, history-trades for deep trades.

    REST requests are issued through the injected ``AsyncHTTPClient``; live
    streams are delegated to ``_OKXWS``.  Timestamps exchanged with OKX are
    strings holding milliseconds; every record produced here carries
    nanoseconds (``ms * 1_000_000``).
    """

    exchange = "okx"

    # Largest page size sent to the paginated REST endpoints.
    _PAGE_LIMIT = 100
    # Largest order-book depth requested from the REST ``books`` endpoint.
    _MAX_BOOK_DEPTH = 400

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Create the adapter, using *http* if given or a fresh ``AsyncHTTPClient``."""
        self._http = http or AsyncHTTPClient()

    def capabilities(self) -> list[Capability]:
        """Return the capabilities advertised by this adapter (REST and WebSocket)."""
        return [
            Capability(
                data_type=DataType.OHLC,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=100,
                page_direction="backward",
                spans=[60, 180, 300, 900, 1800, 3600, 7200, 14400, 21600, 43200, 86400, 604800, 2592000],
            ),
            Capability(
                data_type=DataType.TRADES,
                transport="rest",
                mode="historical",
                history="full",
                max_per_request=100,
                page_direction="backward",
            ),
            Capability(
                data_type=DataType.ORDERBOOK,
                transport="rest",
                mode="historical",
                max_depth=400,
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            Capability(data_type=DataType.ORDERBOOK, transport="ws", mode="live", max_depth=400),
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Return the OKX instrument id for *s*, i.e. ``"<base>-<quote>"``."""
        return f"{s.base}-{s.quote}"

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of candles from ``history-candles``.

        ``start_ns`` / ``end_ns`` are converted to milliseconds and sent as the
        ``before`` / ``after`` pagination parameters.  At most 100 bars are
        requested per call.

        Returns an empty list when ``okx_interval`` has no bar name for *span*;
        a warning is logged in that case so the empty result is not mistaken
        for "no data in this window".
        """
        bar = okx_interval(span)
        if not bar:
            logger.warning("OKX: unsupported OHLC span %s; returning no bars", span)
            return []

        params: dict[str, Any] = {
            "instId": self.render_symbol(symbol),
            "bar": bar,
            "before": str(start_ns // 1_000_000),
            "after": str(end_ns // 1_000_000),
            "limit": min(limit, self._PAGE_LIMIT),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/history-candles", params)

        # ``or []`` also covers an explicit ``"data": null`` payload.
        return [
            OHLCBar(
                ts=int(e[0]) * 1_000_000,
                open=float(e[1]),
                high=float(e[2]),
                low=float(e[3]),
                close=float(e[4]),
                volume=float(e[5]),
                quote_volume=float(e[6]) if len(e) > 6 else None,
            )
            for e in (data.get("data") or [])
        ]

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch one page of trades (cursor = OKX ``after`` ts in ms).

        OKX ``history-trades`` (type=2) returns records *earlier than* the
        ``after`` timestamp, newest first. We page **backward** from
        ``end_ns`` until the oldest item drops below ``start_ns`` or a short
        page arrives.

        Returns ``(trades, next_cursor)``.  ``next_cursor`` is ``None`` when
        paging is finished.  Trades older than ``start_ns`` (which can appear
        on the final page, since the endpoint has no lower bound) are dropped
        from the returned list.

        NOTE(review): the cursor is the oldest row's millisecond timestamp and
        ``after`` is exclusive, so trades sharing that exact millisecond but
        not included in this page would be skipped by the next request.
        """
        page_size = min(limit, self._PAGE_LIMIT)
        after = cursor if cursor is not None else str(end_ns // 1_000_000)
        params: dict[str, Any] = {
            "instId": self.render_symbol(symbol),
            "limit": page_size,
            "after": after,
            "type": "2",
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/history-trades", params)

        rows = data.get("data") or []
        trades: list[Trade] = []
        for e in rows:
            ts_ns = int(e["ts"]) * 1_000_000
            if ts_ns < start_ns:
                # Outside the requested window; only the lower bound needs a
                # client-side check because ``after`` bounds the upper side.
                continue
            trades.append(
                Trade(
                    ts=ts_ns,
                    price=float(e["px"]),
                    amount=float(e["sz"]),
                    side="buy" if e.get("side") == "buy" else "sell",
                    tid=str(e.get("tradeId", "")),
                )
            )

        # An empty or short page means there is nothing older to fetch.
        if not rows or len(rows) < page_size:
            return trades, None

        # Rows arrive newest first, so the last row is the oldest one.
        oldest_ts_ms = int(rows[-1]["ts"])
        next_cursor = str(oldest_ts_ms) if oldest_ts_ms > start_ns // 1_000_000 else None
        return trades, next_cursor

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a REST order-book snapshot with at most ``min(depth, 400)`` levels per side.

        If the response carries no book, an empty snapshot is returned; if the
        book has no ``ts`` field, the local wall-clock time is used instead.
        """
        params = {
            "instId": self.render_symbol(symbol),
            "sz": min(depth, self._MAX_BOOK_DEPTH),
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/books", params)

        book = (data.get("data") or [{}])[0]
        bids = [OrderBookLevel(price=float(b[0]), amount=float(b[1])) for b in book.get("bids", [])]
        asks = [OrderBookLevel(price=float(a[0]), amount=float(a[1])) for a in book.get("asks", [])]
        ts_ms = int(book.get("ts", int(time.time() * 1000)))
        return OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks)

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live candles for *symbol* from the ``candle<bar>`` channel.

        When ``okx_interval`` has no bar name for *span* the stream falls back
        to ``1m`` candles; a warning is logged so the substitution is visible.
        """
        bar = okx_interval(span)
        if not bar:
            logger.warning("OKX: unsupported OHLC span %s; falling back to 1m candles", span)
            bar = "1m"
        ws = _OKXWS(self.render_symbol(symbol), "candle" + bar, "ohlc")
        return ws.stream()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades for *symbol* from the ``trades`` channel."""
        ws = _OKXWS(self.render_symbol(symbol), "trades", "trades")
        return ws.stream()

    def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book data for *symbol* from the ``books`` channel.

        ``depth`` is accepted for interface compatibility but is not forwarded
        to the subscription.
        """
        ws = _OKXWS(self.render_symbol(symbol), "books", "books")
        return ws.stream()
class _OKXWS(WebSocketBase):
    """Single-channel OKX WebSocket subscription that parses pushes into records.

    ``mode`` selects the parser: ``"ohlc"`` yields ``OHLCBar``, ``"trades"``
    yields ``Trade`` and ``"books"`` yields ``OrderBookSnapshot``.
    """

    # OKX serves candlestick channels from the "business" endpoint rather
    # than the public one; subscribing to them on /public is rejected.
    _BUSINESS_URL = "wss://ws.okx.com:8443/ws/v5/business"

    def __init__(self, instId: str, channel: str, mode: str) -> None:
        """Bind the subscription to one instrument/channel and pick the endpoint."""
        super().__init__(self._BUSINESS_URL if mode == "ohlc" else _WS_BASE)
        self._instId = instId
        self._channel = channel
        self._mode = mode
        # Local order-book state (price -> amount) used in "books" mode to
        # merge incremental updates into full snapshots.
        self._bids: dict[float, float] = {}
        self._asks: dict[float, float] = {}

    async def on_connect(self, ws: Any) -> None:
        """Send the subscribe request for the configured channel and instrument."""
        await ws.send(json.dumps({
            "op": "subscribe",
            "args": [{"channel": self._channel, "instId": self._instId}],
        }))

    @staticmethod
    def _apply_levels(side: dict[float, float], levels: Any) -> None:
        """Merge raw ``[price, size, ...]`` levels into *side*; size 0 removes the level."""
        for level in levels:
            price = float(level[0])
            amount = float(level[1])
            if amount == 0:
                side.pop(price, None)
            else:
                side[price] = amount

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
        """Decode one WebSocket frame and yield the records it contains.

        Frames without a ``data`` field (subscription acknowledgements,
        errors) yield nothing; error events are logged.
        """
        # Keep-alive reply is plain text, not JSON — presumably only seen if
        # the transport sends text "ping" frames; harmless otherwise.
        if raw in ("pong", b"pong"):
            return

        data = json.loads(raw)
        if "data" not in data:
            if data.get("event") == "error":
                logger.warning(
                    "OKX WS error on %s/%s: %s %s",
                    self._channel,
                    self._instId,
                    data.get("code"),
                    data.get("msg"),
                )
            return

        if self._mode == "ohlc":
            for e in data["data"]:
                yield OHLCBar(
                    ts=int(e[0]) * 1_000_000,
                    open=float(e[1]),
                    high=float(e[2]),
                    low=float(e[3]),
                    close=float(e[4]),
                    volume=float(e[5]),
                    # Same field the REST path reads, so live and historical
                    # bars carry the same columns.
                    quote_volume=float(e[6]) if len(e) > 6 else None,
                )
        elif self._mode == "trades":
            for e in data["data"]:
                yield Trade(
                    ts=int(e["ts"]) * 1_000_000,
                    price=float(e["px"]),
                    amount=float(e["sz"]),
                    # Normalised exactly like the REST path so ``side`` is
                    # never None.
                    side="buy" if e.get("side") == "buy" else "sell",
                    tid=str(e.get("tradeId", "")),
                )
        elif self._mode == "books":
            # The "books" channel pushes one full snapshot followed by
            # incremental updates; anything that is not an update (including
            # the fresh snapshot after a reconnect) resets the local book.
            is_update = data.get("action") == "update"
            for snap in data["data"]:
                if not is_update:
                    self._bids.clear()
                    self._asks.clear()
                self._apply_levels(self._bids, snap.get("bids", []))
                self._apply_levels(self._asks, snap.get("asks", []))

                # Best prices first: bids descending, asks ascending.
                bids = [
                    OrderBookLevel(price=price, amount=amount)
                    for price, amount in sorted(self._bids.items(), reverse=True)
                ]
                asks = [
                    OrderBookLevel(price=price, amount=amount)
                    for price, amount in sorted(self._asks.items())
                ]
                ts_ms = int(snap.get("ts", int(time.time() * 1000)))
                yield OrderBookSnapshot(ts=ts_ms * 1_000_000, bids=bids, asks=asks)