Source code for dccd.histo_dl.okx

#!/usr/bin/env python3
# coding: utf-8

""" Objects to download historical data from OKX exchange.

.. currentmodule:: dccd.histo_dl.okx

.. autoclass:: FromOKX
   :members: import_data, save, get_data, import_trades, save_trades, import_orderbook, save_orderbook
   :show-inheritance:

"""

from __future__ import annotations

# Import built-in packages
from typing import Any

# Import third-party packages
# Import local packages
from dccd.histo_dl.exchange import ImportDataCryptoCurrencies

__all__ = ['FromOKX']

_OKX_INTERVALS = {
    60: '1m', 300: '5m', 900: '15m', 1800: '30m',
    3600: '1H', 7200: '2H', 14400: '4H', 21600: '6H', 43200: '12H',
    86400: '1D', 604800: '1W',
}


def okx_interval(span):
    """ Return the OKX bar string for the given span in seconds.

    Parameters
    ----------
    span : int
        Interval in seconds.

    Returns
    -------
    str
        OKX bar identifier.

    Examples
    --------
    >>> okx_interval(3600)
    '1H'

    >>> okx_interval(86400)
    '1D'

    """
    interval = _OKX_INTERVALS.get(span)
    if interval is None:
        raise ValueError(f"Unsupported OKX interval: {span}s")
    return interval


[docs] class FromOKX(ImportDataCryptoCurrencies): """ Class to import crypto-currencies data from the OKX exchange. Parameters ---------- path : str Root directory for data files. crypto : str Crypto-currency symbol, e.g. ``'BTC'``. span : int or str Candle interval in seconds (minimum 60) or a label such as ``'hourly'`` or ``'1h'``. fiat : str, optional Quote currency. Default is ``'USDT'``. The pair is formatted with a hyphen separator (e.g. ``'BTC'`` + ``'USDT'`` → ``'BTC-USDT'``). form : str, optional Legacy parameter — ignored. Storage is always Parquet via :class:`~dccd.storage.DataStore`. tz : str, optional Timezone for date parsing: ``'local'`` (default), ``'UTC'``, or any IANA timezone name. See Also -------- FromBinance, FromCoinbase, FromKraken, FromBybit Notes ----- Uses the OKX v5 REST API [1]_. References ---------- .. [1] https://www.okx.com/docs-v5/en/#rest-api-market-data-get-candlesticks Attributes ---------- pair : str Instrument ID (e.g. 'BTC-USDT'). start, end : int Timestamps bounding the download. span : int Seconds between observations. full_path : str Directory managed by :class:`~dccd.storage.DataStore` — ``{path}/okx/ohlc/{pair}/{span}/``. Methods ------- import_data save get_data import_trades save_trades import_orderbook save_orderbook """
[docs] @staticmethod def format_pair(crypto: str, fiat: str) -> str: """ Return the OKX pair symbol for *crypto* and *fiat*. Parameters ---------- crypto, fiat : str Asset symbols (e.g. ``'BTC'``, ``'USDT'``). Returns ------- str Dash-separated pair (e.g. ``'BTC-USDT'``). """ return crypto + '-' + fiat
def __init__(self, path, crypto, span, fiat='USDT', form='xlsx', tz='local'): ImportDataCryptoCurrencies.__init__( self, path, crypto, span, 'OKX', fiat, form, tz=tz ) self.pair = self.format_pair(crypto, fiat) def _import_data(self, start: int | str = 'last', end: int | str = 'now') -> list[dict[str, Any]]: self.start, self.end = self._set_time(start, end) param = { 'instId': self.pair, 'bar': okx_interval(self.span), 'before': self.start * 1000 - 1, # -1 ms: OKX `before` is exclusive 'after': self.end * 1000, 'limit': 300, } r = self._fetch('https://www.okx.com/api/v5/market/history-candles', param) body = r.json() if body.get('code', '0') != '0': raise RuntimeError(f"OKX API error {body['code']}: {body.get('msg', body)}") text = body['data'] text.reverse() data = [{ 'date': float(e[0]) / 1000, 'open': float(e[1]), 'high': float(e[2]), 'low': float(e[3]), 'close': float(e[4]), 'volume': float(e[5]), 'quoteVolume': float(e[7]), } for e in text] return data def _import_trades(self, start: int, end: int) -> list[dict[str, Any]]: r = self._fetch( 'https://www.okx.com/api/v5/market/trades', {'instId': self.pair, 'limit': 500}, ) return [{ 'tid': int(e['tradeId']), 'timestamp': float(e['ts']) / 1000, 'price': float(e['px']), 'amount': float(e['sz']), 'type': e['side'], } for e in r.json()['data']] def _import_orderbook(self, depth: int = 50) -> list[dict[str, Any]]: r = self._fetch( 'https://www.okx.com/api/v5/market/books', {'instId': self.pair, 'sz': depth}, ) book = r.json()['data'][0] result = [] for bid in book['bids']: count = int(bid[3]) if bid[3] else None result.append({'side': 'bid', 'price': bid[0], 'amount': float(bid[1]), 'count': count}) for ask in book['asks']: count = int(ask[3]) if ask[3] else None result.append({'side': 'ask', 'price': ask[0], 'amount': float(ask[1]), 'count': count}) return result
[docs] def import_data(self, start: int | str = 'last', end: int | str = 'now') -> ImportDataCryptoCurrencies: """ Download data from OKX for a specific time interval. Parameters ---------- start : int or str Timestamp of the first observation or date 'yyyy-mm-dd hh:mm:ss'. end : int or str Timestamp of the last observation or date 'yyyy-mm-dd hh:mm:ss'. Returns ------- data : pl.DataFrame OHLCV data sorted and cleaned. """ data = self._import_data(start=start, end=end) return self._sort_data(data)