Source code for dccd.continuous_dl.bitmex

#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-08-07 11:16:51
# @Last modified by: ArthurBernard
# @Last modified time: 2019-09-11 08:46:12

""" Objects and functions to download data from Bitmex exchange.

.. currentmodule:: dccd.continuous_dl.bitmex

These functions and objects allow you to continuously download data and update
your database.

High level API
--------------

.. autofunction:: get_data_bitmex
.. autofunction:: get_orderbook_bitmex
.. autofunction:: get_trades_bitmex

Low level API
-------------

.. autoclass:: dccd.continuous_dl.bitmex.DownloadBitmexData
   :members: set_process_data, set_saver
   :special-members: __call__
   :show-inheritance:

"""

# Built-in packages
import time
from datetime import datetime as dt
from typing import Any

from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.process_data import set_marketdepth, set_trades

# Third party packages
# Local packages
from dccd.tools.io import IODataBase

# Public names exported by ``from dccd.continuous_dl.bitmex import *``.
__all__ = [
    'DownloadBitmexData',
    'get_data_bitmex',
    'get_orderbook_bitmex',
    'get_trades_bitmex',
]

# =========================================================================== #
#                              Parser functions                               #
# =========================================================================== #


def _parser_trades(tData: dict[str, Any], i: int = 0) -> dict[str, Any]:
    """Parse a single trade entry from a Bitmex WebSocket message.

    Parameters
    ----------
    tData : dict
        Raw trade dict from the Bitmex 'trade' table action.
        Expected keys: ``'timestamp'``, ``'price'``, ``'size'``, ``'side'``.
    i : int, optional
        Index used to make the ``tid`` unique within the same millisecond,
        default is 0.

    Returns
    -------
    dict
        Normalised trade with keys: ``'tid'``, ``'timestamp'``,
        ``'price'``, ``'amount'``, ``'type'``.

    """
    t = dt.strptime(tData['timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()

    return {
        'tid': int(t * 1000 + i),
        'timestamp': int(t * 1000),
        'price': tData['price'],
        'amount': tData['size'],
        'type': tData['side'].lower(),
    }


def _parser_book(tData: dict[str, Any]) -> dict[str, Any] | int:
    """Parse a single order-book entry from a Bitmex WebSocket message.

    Parameters
    ----------
    tData : dict
        Raw order dict from the Bitmex 'orderBookL2' table action.
        Expected keys: ``'side'``, and optionally ``'price'`` and ``'size'``.

    Returns
    -------
    dict or int
        If ``'price'`` is present: ``{'amount': signed_size, 'price': price}``.
        Otherwise: signed size as int (positive for Buy, negative for Sell).

    """
    if tData['side'] == 'Buy':
        s = 1
    else:
        s = -1

    if 'price' in tData.keys():
        return {'amount': s * tData['size'], 'price': tData['price']}

    return s * tData['size']


# =========================================================================== #
#                              Download objects                               #
# =========================================================================== #


class DownloadBitmexData(ContinuousDownloader):
    """ Basis object to download data from a stream websocket client API.

    Parameters
    ----------
    time_step : int or None, optional
        Number of seconds between two snapshots of data, minimum is 1, default
        is 60 (one minute). Each ``time_step`` seconds data will be processed
        and pushed to the database. Pass ``None`` to receive data tick-by-tick
        without periodic aggregation.
    until : int or None, optional
        Number of seconds before stopping, or a future Unix timestamp at which
        to stop. ``None`` is converted to ``0``. Default is ``3600`` (one
        hour).

    Attributes
    ----------
    host : str
        Address of host to connect.
    conn_par : dict
        Parameters of websocket connection.
    ws : websockets.client.WebSocketClientProtocol
        Connection with the websocket client.
    is_connect : bool
        True if is connected, False otherwise.
    ts : int
        Number of second between two snapshots of data.
    t : int
        Current timestamp but rounded by `ts`.
    until : int
        Timestamp to stop to download data.
    d : dict
        Local copy of the order book, keyed by Bitmex order id; values are
        ``{'amount': signed_size, 'price': price}`` dicts.
    start : bool
        True once an order book ``partial`` snapshot has been received.

    Methods
    -------
    set_process_data
    set_saver
    __call__

    """

    def __init__(self, time_step: int = 60, until: int | None = 3600) -> None:
        """ Initialize object.

        Parameters
        ----------
        time_step : int or None, optional
            Snapshot interval in seconds. Default is ``60``.
        until : int or None, optional
            Seconds to run, or a future Unix timestamp to stop at.
            ``None`` is passed to the base class as ``STOP=0``.
            Default is ``3600``.

        """
        stop: int
        if until is None:
            stop = 0
        elif until > time.time():
            # A future Unix timestamp: convert it to a remaining duration.
            stop = until - int(time.time())
        else:
            stop = until

        ContinuousDownloader.__init__(self, 'bitmex', time_step=time_step,
                                      STOP=stop)
        # Map of channel name -> parser, presumably looked up by
        # ``get_parser`` (defined in the base class).
        self._parser_data: dict[str, Any] = {
            'orderBookL2_25': self.parser_book,
            'trade': self.parser_trades,
        }
        self.start = False
        self._load_checkpoint()

    def parser_book(self, data: dict[str, Any]) -> None:
        """ Parse and maintain a local copy of the order book.

        Handles ``partial`` (snapshot), ``insert``, ``update``, and
        ``delete`` actions from the Bitmex WebSocket feed, then stores the
        current ``{price: signed_amount}`` book in the slot of the current
        timestep.

        Parameters
        ----------
        data : dict
            Order book message from the WebSocket API. Must contain
            ``'action'`` and ``'data'`` keys.

        """
        action = data['action']

        if action == 'partial':
            # A partial is a full image of the subscribed table: drop every
            # level we currently hold (e.g. left over from a previous
            # connection or a restored checkpoint) so no stale price levels
            # survive in the reconstructed book.
            self.d.clear()

        for d in data['data']:
            if action == 'partial':
                self.d[d['id']] = _parser_book(d)
                self.start = True

            elif not self.start:
                # Incremental actions are meaningless before a snapshot.
                self.logger.info("Waiting data")
                continue

            elif action == 'delete':
                self.d.pop(d['id'])

            elif action == 'insert':
                self.d[d['id']] = _parser_book(d)

            elif action == 'update':
                parsed = _parser_book(d)
                if isinstance(parsed, dict):
                    # The update carries a price too: refresh both fields
                    # instead of storing the whole dict as the amount.
                    self.d[d['id']].update(parsed)
                else:
                    self.d[d['id']]['amount'] = parsed

            else:
                self.logger.error('Unknown action {}: {}'.format(action, data))

        self._data.setdefault(self.t, {'trades': [], 'book': {}})['book'] = {
            v['price']: v['amount'] for v in self.d.values()
        }

    def parser_trades(self, data: dict[str, Any]) -> None:
        """ Parse trade data and accumulate records for the current timestep.

        Parameters
        ----------
        data : dict
            Trade message from the WebSocket API. Must contain a ``'data'``
            key with a list of trade records.

        """
        slot = self._data.setdefault(self.t, {'trades': [], 'book': {}})
        for i, d in enumerate(data['data']):
            slot['trades'].append(_parser_trades(d, i))

    def _restore_book_state(self, state: dict[int, Any]) -> None:  # type: ignore[override]
        """ Restore the local order book from a checkpointed state.

        Keys are cast back to ``int`` order ids (presumably because the
        checkpoint serialisation stores them as strings; TODO confirm).

        """
        self.d = {int(k): v for k, v in state.items()}

    async def on_message(self, data: dict[str, Any] | list[Any]) -> None:
        """ Route an incoming websocket message to the appropriate parser.

        Dict messages with an ``'action'`` key go to the parser selected in
        :meth:`__call__`. Other dicts are logged at info level, and
        non-dict payloads are logged as errors.

        """
        if isinstance(data, dict):
            if 'action' not in data.keys():
                self.logger.info('No action: {}'.format(data))

            else:
                self.parser(data)

        else:
            self.logger.error('Not recognizing: {}'.format(data))

    def __call__(self, *args: str) -> 'DownloadBitmexData':
        """ Open a websocket connection and save/update the database.

        Run asynchronously two loops to get data from Bitmex websocket and
        save/update the database.

        Parameters
        ----------
        *args : str
            Positional arguments joined with ``':'`` and passed as the
            ``args`` subscribe parameter. The first element should be the
            channel name (e.g. ``'orderBookL2_25'`` or ``'trade'``) followed
            by any instrument symbol (e.g. ``'XBTUSD'``).

        Warnings
        --------
        '_raw' option not yet working for Bitmex.

        References
        ----------
        .. [1] https://www.bitmex.com/api/

        """
        self.parser = self.get_parser(args[0])
        self.logger.info('Try connect WS and set {} stream.'.format(args[0]))

        return super().__call__(args=':'.join(args))  # type: ignore[return-value]
# =========================================================================== #
#                            High level functions                             #
# =========================================================================== #
[docs] def get_data_bitmex(process_func: Any, *args: str, time_step: int = 60, until: int | None = None, path: str | None = None, save_method: str = 'dataframe', io_params: dict[str, Any] = {}, **kwargs: Any) -> None: """ Download data from Bitmex exchange and update the database. Parameters ---------- process_func : callable Function to process and clean data before saving. Must accept ``data`` as its first argument plus optional keyword arguments; see :mod:`dccd.process_data` for examples. *args : str Channel and optional instrument, e.g. ``'trade', 'XBTUSD'``. Passed directly to :meth:`DownloadBitmexData.__call__`. time_step : int, optional Number of seconds between snapshots, default ``60`` (1 minute). until : int, optional Seconds to run, or a future Unix timestamp to stop at. ``None`` or ``0`` means run indefinitely. path : str, optional Directory for the database. Defaults to ``'database/bitmex/{channel}'``. save_method : {'DataFrame', 'SQLite', 'CSV', 'Excel', 'PostgreSQL',\ 'Oracle', 'MSSQL', 'MySQL'}, optional Storage format for :class:`~dccd.tools.io.IODataBase`, default ``'dataframe'``. io_params : dict, optional Extra keyword arguments forwarded to the :class:`~dccd.tools.io.IODataBase` callable. **kwargs Additional keyword arguments forwarded to the websocket connector. Warnings -------- '_raw' option not yet working for Bitmex. See Also -------- process_data : helper functions to transform raw payloads. tools.io.IODataBase : persistence layer. References ---------- .. [1] https://www.bitmex.com/api/ """ if path is None: path = 'database/bitmex/{}'.format(args[0]) saver = IODataBase(path, method=save_method) downloader = DownloadBitmexData(time_step=time_step, until=until) downloader.set_process_data(process_func) downloader.set_saver(saver, **io_params) downloader(*args)
def get_orderbook_bitmex(*args: str, time_step: int = 60,
                         until: int | None = None, path: str | None = None,
                         save_method: str = 'dataframe',
                         io_params: dict[str, Any] | None = None) -> None:
    """ Download reconstructed order book from Bitmex exchange.

    Thin wrapper around :func:`get_data_bitmex` that uses
    :func:`dccd.process_data.set_marketdepth` as processing function.

    Parameters
    ----------
    *args : str
        Channel and optional instrument, e.g. ``'orderBookL2_25', 'XBTUSD'``.
    time_step : int, optional
        Number of seconds between snapshots, default ``60`` (1 minute).
    until : int, optional
        Seconds to run, or a future Unix timestamp to stop at. ``None`` or
        ``0`` means run indefinitely.
    path : str, optional
        Directory for the database. Defaults to
        ``'database/bitmex/{channel}'``.
    save_method : str, optional
        Storage format for :class:`~dccd.tools.io.IODataBase`, default
        ``'dataframe'``.
    io_params : dict, optional
        Extra keyword arguments forwarded to the
        :class:`~dccd.tools.io.IODataBase` callable. Default is no extra
        arguments.

    """
    # A fresh dict per call instead of a shared mutable default.
    get_data_bitmex(set_marketdepth, *args, time_step=time_step, until=until,
                    path=path, save_method=save_method,
                    io_params={} if io_params is None else io_params)
def get_trades_bitmex(*args: str, time_step: int = 60,
                      until: int | None = None, path: str | None = None,
                      save_method: str = 'dataframe',
                      io_params: dict[str, Any] | None = None) -> None:
    """ Download trades tick by tick from Bitmex exchange.

    Thin wrapper around :func:`get_data_bitmex` that uses
    :func:`dccd.process_data.set_trades` as processing function.

    Parameters
    ----------
    *args : str
        Channel and optional instrument, e.g. ``'trade', 'XBTUSD'``.
    time_step : int, optional
        Number of seconds between snapshots, default ``60`` (1 minute).
    until : int, optional
        Seconds to run, or a future Unix timestamp to stop at. ``None`` or
        ``0`` means run indefinitely.
    path : str, optional
        Directory for the database. Defaults to
        ``'database/bitmex/{channel}'``.
    save_method : str, optional
        Storage format for :class:`~dccd.tools.io.IODataBase`, default
        ``'dataframe'``.
    io_params : dict, optional
        Extra keyword arguments forwarded to the
        :class:`~dccd.tools.io.IODataBase` callable. Default is no extra
        arguments.

    """
    # A fresh dict per call instead of a shared mutable default.
    get_data_bitmex(set_trades, *args, time_step=time_step, until=until,
                    path=path, save_method=save_method,
                    io_params={} if io_params is None else io_params)