Source code for dccd.continuous_dl.okx

#!/usr/bin/env python3
# coding: utf-8

""" Objects and functions to download data from OKX exchange (WebSocket).

.. currentmodule:: dccd.continuous_dl.okx

High level API
--------------

.. autofunction:: get_data_okx
.. autofunction:: get_orderbook_okx
.. autofunction:: get_trades_okx

Low level API
-------------

.. autoclass:: dccd.continuous_dl.okx.DownloadOKXData
   :members: set_process_data, set_saver
   :special-members: __call__
   :show-inheritance:

"""

# Built-in packages
import logging
import time

# Third party packages
# Local packages
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.tools.io import IODataBase

# Public names of this module: the downloader class and the three
# high-level helper functions listed in the module docstring.
__all__ = [
    'DownloadOKXData', 'get_data_okx', 'get_orderbook_okx', 'get_trades_okx',
]

# WebSocket endpoint that ``DownloadOKXData`` hands to
# ``ContinuousDownloader`` as the host to connect to.
_OKX_WS_URL = 'wss://ws.okx.com:8443/ws/v5/public'

# Candle spans (in seconds) accepted by ``_okx_ws_interval``, mapped to the
# suffix that is appended to ``candle`` to build the channel name.
_OKX_WS_INTERVALS: dict[int, str] = {
    60: '1m', 300: '5m', 900: '15m', 1800: '30m',
    3600: '1H', 7200: '2H', 14400: '4H', 21600: '6H', 43200: '12H',
    86400: '1D', 604800: '1W',
}


def _okx_ws_interval(span: int) -> str:
    """ Convert a span in seconds to an OKX WebSocket candle channel suffix.

    Parameters
    ----------
    span : int
        Candle length in seconds; must be a key of ``_OKX_WS_INTERVALS``.

    Returns
    -------
    str
        The matching suffix (e.g. ``'1m'`` for 60, ``'1H'`` for 3600).

    Raises
    ------
    ValueError
        If `span` is not one of the supported spans. The message lists the
        supported values.

    """
    try:
        return _OKX_WS_INTERVALS[span]
    except KeyError:
        # The KeyError only repeats the span that the message below already
        # quotes, so it is suppressed to keep the traceback focused on the
        # ValueError the caller actually has to handle.
        raise ValueError(
            f"Unsupported OKX WebSocket interval: {span}s. "
            f"Supported: {list(_OKX_WS_INTERVALS)}"
        ) from None


def _parser_trades(data: list[dict]) -> list[dict]:
    """ Convert the payload of an OKX trades push into plain trade records.

    Parameters
    ----------
    data : list of dict
        The ``data`` field of an OKX trades push message. Every element
        must provide the keys ``tradeId``, ``ts``, ``px``, ``sz`` and
        ``side``.

    Returns
    -------
    list of dict
        One dict per input element, in the same order, with keys ``tid``
        (int), ``timestamp`` (``ts`` divided by 1000), ``price`` (float),
        ``amount`` (float) and ``type`` (the ``side`` value, unchanged).

    """
    trades: list[dict] = []
    for raw in data:
        record = {
            'tid': int(raw['tradeId']),
            # ``ts`` is divided by 1000 (milliseconds -> seconds).
            'timestamp': int(raw['ts']) / 1000,
            'price': float(raw['px']),
            'amount': float(raw['sz']),
            'type': raw['side'],
        }
        trades.append(record)

    return trades


def _parser_book(data: list[dict]) -> dict[str, float]:
    """ Flatten the payload of an OKX books push into one price -> size map.

    Parameters
    ----------
    data : list of dict
        The ``data`` field of an OKX books push message. Each element may
        hold ``bids`` and ``asks`` lists whose entries start with
        ``[price, size, ...]``; a missing side is treated as empty.

    Returns
    -------
    dict
        Keys are the price strings as received for bids, and the price
        string prefixed with ``'-'`` for asks. Values are the size as a
        positive float for bids and as a negated float for asks. When the
        same key appears several times the last occurrence wins.

    """
    levels: dict[str, float] = {}
    for snapshot in data:
        # Bids are stored before asks for every snapshot.
        levels.update(
            (entry[0], float(entry[1]))
            for entry in snapshot.get('bids', [])
        )
        levels.update(
            ('-' + entry[0], -float(entry[1]))
            for entry in snapshot.get('asks', [])
        )

    return levels


def _parser_kline(data: list[list]) -> list[dict]:
    """ Convert the payload of an OKX candle push into OHLCV records.

    Parameters
    ----------
    data : list of list
        The ``data`` field of an OKX candle push message. Each row starts
        with ``[ts_ms, open, high, low, close, vol, ...]``; anything after
        the sixth column is ignored.

    Returns
    -------
    list of dict
        One dict per row, in the same order, with keys ``timestamp``
        (first column divided by 1000), ``open``, ``high``, ``low``,
        ``close`` and ``volume`` (all floats).

    """
    float_columns = ('open', 'high', 'low', 'close', 'volume')

    candles: list[dict] = []
    for row in data:
        candle = {'timestamp': int(row[0]) / 1000}
        # Columns 1..5 hold the float fields, in the order listed above.
        for position, field in enumerate(float_columns, start=1):
            candle[field] = float(row[position])
        candles.append(candle)

    return candles


class DownloadOKXData(ContinuousDownloader):
    """ Download data continuously from OKX via WebSocket v5.

    Parameters
    ----------
    pair : str
        Trading pair in OKX format (e.g. 'BTC-USDT').
    time_step : int, optional
        Seconds between data snapshots, default is 60.
    until : int, optional
        Seconds to run or stop timestamp, default is 3600. A value greater
        than the current time is taken as a stop timestamp and converted to
        the remaining number of seconds; None is replaced by 0.
    span : int, optional
        Candle interval in seconds; if given, subscribes to the candle
        channel in addition to trades and book. Default is None.
    checkpoint_dir : str, optional
        Passed unchanged to ``ContinuousDownloader``. Default is None.

    Attributes
    ----------
    host : str
        WebSocket URL.
    is_connect : bool
        True if connected.
    ts : int
        Snapshot interval in seconds.
    until : int
        Stop timestamp.
    pair : str
        Trading pair given at construction.

    Methods
    -------
    set_process_data
    set_saver
    __call__

    """

    def __init__(self, pair: str = 'BTC-USDT', time_step: int = 60,
                 until: int | None = 3600, span: int | None = None,
                 checkpoint_dir: str | None = None) -> None:
        """ Initialize object. """
        if until is None:
            until = 0
        elif until > time.time():
            # A value in the future is a stop timestamp: turn it into the
            # number of seconds left from now.
            until -= int(time.time())

        self.pair = pair

        # NOTE(review): OKX documents ``books50-l2-tbt`` as a login/VIP-gated
        # channel - confirm it can be subscribed on the public endpoint.
        args: list[dict] = [
            {'channel': 'trades', 'instId': pair},
            {'channel': 'books50-l2-tbt', 'instId': pair},
        ]
        if span is not None:
            # NOTE(review): confirm that candle channels are served by the
            # endpoint in ``_OKX_WS_URL``.
            args.append({
                'channel': f'candle{_okx_ws_interval(span)}',
                'instId': pair,
            })

        ContinuousDownloader.__init__(
            self, _OKX_WS_URL, time_step=time_step, STOP=until,
            checkpoint_dir=checkpoint_dir,
            subs={'op': 'subscribe', 'args': args},
        )
        self._parser_data = {
            'trades': self.parser_trades,
            'book': self.parser_book,
            'kline': self.parser_kline,
        }
        self.logger = logging.getLogger(__name__)
        self._load_checkpoint()

    async def on_message(self, msg: dict) -> None:
        """ Dispatch incoming OKX WebSocket push messages.

        Messages are routed on ``msg['arg']['channel']``: ``'trades'``,
        anything starting with ``'books'``, or anything starting with
        ``'candle'``. Messages for any other channel, or without a ``data``
        field, are ignored.

        Parameters
        ----------
        msg : dict
            OKX push message with ``arg`` and ``data`` fields.

        """
        channel = msg.get('arg', {}).get('channel', '')

        data = msg.get('data')
        if data is None:
            # Event frames such as the subscription acknowledgement repeat
            # ``arg`` (channel included) but carry no ``data``; indexing
            # ``msg['data']`` on them would raise KeyError.
            return

        if channel == 'trades':
            self.parser_trades(data)
        elif channel.startswith('books'):
            # The book parser receives the whole message, not only ``data``.
            self.parser_book(msg)
        elif channel.startswith('candle'):
            self.parser_kline(data)

    def parser_trades(self, data: list[dict]) -> None:
        """ Parse and store a trades push message.

        Parameters
        ----------
        data : list of dict
            The ``data`` field from the OKX trades push message.

        """
        self._push_trades(_parser_trades(data))

    def parser_book(self, msg: dict) -> None:
        """ Parse and update the order book from a books push message.

        Only the ``data`` field is read (a missing field is treated as an
        empty list); the ``action`` field is not inspected here.

        Parameters
        ----------
        msg : dict
            Full OKX books push message (contains ``action`` and ``data``).

        """
        self._push_book_updates(_parser_book(msg.get('data', [])))

    def parser_kline(self, data: list[list]) -> None:
        """ Parse and store a candle push message.

        Each parsed candle is passed to ``_raw_parser`` one at a time.

        Parameters
        ----------
        data : list of list
            The ``data`` field from the OKX candle push message.

        """
        for candle in _parser_kline(data):
            self._raw_parser(candle)
def get_trades_okx(path: str, pair: str = 'BTC-USDT', time_step: int = 60,
                   until: int = 3600, form: str = 'csv') -> None:
    """ Download trades data from OKX via WebSocket.

    Builds a ``DownloadOKXData`` object, attaches an ``IODataBase`` saver
    for trades and then calls the downloader.

    Parameters
    ----------
    path : str
        Path to save data.
    pair : str, optional
        Trading pair in OKX format (e.g. 'BTC-USDT'), default is
        'BTC-USDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds or stop timestamp, default is 3600.
    form : str, optional
        Save format ('csv', 'parquet', etc.), default is 'csv'.

    """
    okx_dl = DownloadOKXData(pair=pair, time_step=time_step, until=until)

    trades_saver = IODataBase(path, method=form)
    okx_dl.set_trades_saver(trades_saver)

    okx_dl(pair=pair)
def get_orderbook_okx(path: str, pair: str = 'BTC-USDT', time_step: int = 60,
                      until: int = 3600, form: str = 'csv') -> None:
    """ Download order book data from OKX via WebSocket.

    Builds a ``DownloadOKXData`` object, attaches an ``IODataBase`` saver
    for the order book and then calls the downloader.

    Parameters
    ----------
    path : str
        Path to save data.
    pair : str, optional
        Trading pair in OKX format (e.g. 'BTC-USDT'), default is
        'BTC-USDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds or stop timestamp, default is 3600.
    form : str, optional
        Save format ('csv', 'parquet', etc.), default is 'csv'.

    """
    okx_dl = DownloadOKXData(pair=pair, time_step=time_step, until=until)

    book_saver = IODataBase(path, method=form)
    okx_dl.set_book_saver(book_saver)

    okx_dl(pair=pair)
def get_data_okx(path: str, pair: str = 'BTC-USDT', time_step: int = 60,
                 until: int = 3600, form: str = 'csv') -> None:
    """ Download order book and trades data from OKX via WebSocket.

    Builds a single ``DownloadOKXData`` object, attaches one ``IODataBase``
    saver for trades and one for the order book, then calls the downloader.

    Parameters
    ----------
    path : str
        Root path; trades saved under ``<path>/trades/``, book under
        ``<path>/book/``.
    pair : str, optional
        Trading pair in OKX format (e.g. 'BTC-USDT'), default is
        'BTC-USDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds or stop timestamp, default is 3600.
    form : str, optional
        Save format ('csv', 'parquet', etc.), default is 'csv'.

    """
    okx_dl = DownloadOKXData(pair=pair, time_step=time_step, until=until)

    # Trades saver first, then the book saver, each rooted in its own
    # sub-directory of ``path``.
    trades_saver = IODataBase(f'{path}/trades', method=form)
    okx_dl.set_trades_saver(trades_saver)

    book_saver = IODataBase(f'{path}/book', method=form)
    okx_dl.set_book_saver(book_saver)

    okx_dl(pair=pair)