Source code for dccd.continuous_dl.bybit

#!/usr/bin/env python3
# coding: utf-8

""" Objects and functions to download data from Bybit exchange (WebSocket).

.. currentmodule:: dccd.continuous_dl.bybit

High level API
--------------

.. autofunction:: get_data_bybit
.. autofunction:: get_orderbook_bybit
.. autofunction:: get_trades_bybit

Low level API
-------------

.. autoclass:: dccd.continuous_dl.bybit.DownloadBybitData
   :members: set_process_data, set_saver
   :special-members: __call__
   :show-inheritance:

"""

# Built-in packages
import logging
import time

# Third party packages
# Local packages
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.tools.io import IODataBase

__all__ = [
    'DownloadBybitData', 'get_data_bybit', 'get_orderbook_bybit',
    'get_trades_bybit',
]

_BYBIT_WS_URL = 'wss://stream.bybit.com/v5/public/spot'


def _parser_trades(msg):
    """Parse a publicTrade message from Bybit WebSocket v5.

    Parameters
    ----------
    msg : dict
        Raw message with a ``'data'`` list of trade dicts.
        Each trade dict contains: ``'i'`` (id), ``'T'`` (timestamp ms),
        ``'p'`` (price), ``'v'`` (volume), ``'S'`` (side 'Buy'/'Sell').

    Returns
    -------
    list of dict
        Each dict has keys: ``'tid'``, ``'timestamp'``, ``'price'``,
        ``'amount'``, ``'type'`` ('buy' or 'sell').

    """
    return [{
        'tid': int(d['i']),
        'timestamp': int(d['T']) / 1000,
        'price': float(d['p']),
        'amount': float(d['v']),
        'type': 'buy' if d['S'] == 'Buy' else 'sell',
    } for d in msg.get('data', [])]


def _parser_book(msg):
    """Parse an orderbook message from Bybit WebSocket v5.

    Parameters
    ----------
    msg : dict
        Raw message with a ``'data'`` dict containing ``'b'`` (bids) and
        ``'a'`` (asks), each a list of ``[price_str, qty_str]``.

    Returns
    -------
    dict
        Unified book dict: bid prices as positive float values keyed by the
        price string, ask prices prefixed with ``'-'`` as negative float values.

    """
    data = msg.get('data', {})
    book = {}
    for bid in data.get('b', []):
        book[bid[0]] = float(bid[1])
    for ask in data.get('a', []):
        book['-' + ask[0]] = -float(ask[1])
    return book


class DownloadBybitData(ContinuousDownloader):
    """ Continuously download Bybit data through the WebSocket v5 API.

    On construction the object subscribes to the ``publicTrade.<pair>`` and
    ``orderbook.50.<pair>`` topics of the Bybit public spot stream.

    Parameters
    ----------
    pair : str
        Trading pair symbol (e.g. 'BTCUSDT').
    time_step : int, optional
        Seconds between data snapshots, default is 60.
    until : int, optional
        Seconds to run or stop timestamp, default is 3600. ``None`` is
        replaced by 0, and a value greater than the current Unix time is
        converted to the remaining number of seconds before being handed
        to the base class as ``STOP``.
    checkpoint_dir : optional
        Forwarded unchanged to ``ContinuousDownloader.__init__``, default
        is None.

    Attributes
    ----------
    pair : str
        Trading pair symbol given at construction.
    logger : logging.Logger
        Logger named after this module.
    host : str
        WebSocket URL.
    is_connect : bool
        True if connected.
    ts : int
        Snapshot interval in seconds.
    until : int
        Stop timestamp.

    Notes
    -----
    ``host``, ``is_connect``, ``ts`` and ``until`` are not assigned in this
    class; presumably they come from ``ContinuousDownloader`` (to confirm).

    Methods
    -------
    set_process_data
    set_saver
    __call__

    """

    def __init__(self, pair='BTCUSDT', time_step=60, until=3600,
                 checkpoint_dir=None):
        """ Initialize object. """
        if until is None:
            stop = 0
        elif until > time.time():
            # An absolute stop timestamp: keep only the remaining seconds.
            stop = until - int(time.time())
        else:
            stop = until

        self.pair = pair

        topics = [f'publicTrade.{pair}', f'orderbook.50.{pair}']
        subscription = {'op': 'subscribe', 'args': topics}
        ContinuousDownloader.__init__(
            self,
            _BYBIT_WS_URL,
            time_step=time_step,
            STOP=stop,
            checkpoint_dir=checkpoint_dir,
            subs=subscription,
        )

        self._parser_data = {
            'trades': self.parser_trades,
            'book': self.parser_book,
        }
        self.logger = logging.getLogger(__name__)
        self._load_checkpoint()

    async def on_message(self, msg):
        """ Route an incoming WebSocket message to the matching parser.

        Messages whose ``'topic'`` starts with 'publicTrade' go to
        :meth:`parser_trades`, those starting with 'orderbook' go to
        :meth:`parser_book`; anything else is ignored.

        Parameters
        ----------
        msg : dict
            Raw WebSocket message.

        """
        topic = msg.get('topic', '')
        if topic.startswith('publicTrade'):
            self.parser_trades(msg)
            return
        if topic.startswith('orderbook'):
            self.parser_book(msg)

    def parser_trades(self, msg):
        """ Parse a trade message and push the resulting trades.

        Parameters
        ----------
        msg : dict
            Raw WebSocket trade message.

        """
        trades = _parser_trades(msg)
        self._push_trades(trades)

    def parser_book(self, msg):
        """ Parse an orderbook message and push the resulting updates.

        Parameters
        ----------
        msg : dict
            Raw WebSocket orderbook message.

        """
        updates = _parser_book(msg)
        self._push_book_updates(updates)
def get_trades_bybit(path, pair='BTCUSDT', time_step=60, until=3600,
                     form='csv'):
    """ Download trades data from Bybit.

    Builds a :class:`DownloadBybitData`, attaches a trades saver writing to
    `path`, then calls the downloader.

    Parameters
    ----------
    path : str
        Path to save data.
    pair : str, optional
        Trading pair, default is 'BTCUSDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds, default is 3600.
    form : str, optional
        Save format, default is 'csv'.

    """
    bybit = DownloadBybitData(pair=pair, time_step=time_step, until=until)
    trades_saver = IODataBase(path, method=form)
    bybit.set_trades_saver(trades_saver)
    bybit(pair=pair)
def get_orderbook_bybit(path, pair='BTCUSDT', time_step=60, until=3600,
                        form='csv'):
    """ Download order book data from Bybit.

    Builds a :class:`DownloadBybitData`, attaches an order book saver
    writing to `path`, then calls the downloader.

    Parameters
    ----------
    path : str
        Path to save data.
    pair : str, optional
        Trading pair, default is 'BTCUSDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds, default is 3600.
    form : str, optional
        Save format, default is 'csv'.

    """
    bybit = DownloadBybitData(pair=pair, time_step=time_step, until=until)
    book_saver = IODataBase(path, method=form)
    bybit.set_book_saver(book_saver)
    bybit(pair=pair)
def get_data_bybit(path, pair='BTCUSDT', time_step=60, until=3600,
                   form='csv'):
    """ Download order book and trades data from Bybit.

    Builds a single :class:`DownloadBybitData`, attaches one saver for
    trades and one for the order book, then calls the downloader.

    Parameters
    ----------
    path : str
        Root path; trades saved under ``<path>/trades/``, book under
        ``<path>/book/``.
    pair : str, optional
        Trading pair, default is 'BTCUSDT'.
    time_step : int, optional
        Seconds between snapshots, default is 60.
    until : int, optional
        Duration in seconds, default is 3600.
    form : str, optional
        Save format, default is 'csv'.

    """
    bybit = DownloadBybitData(pair=pair, time_step=time_step, until=until)

    trades_saver = IODataBase(f'{path}/trades', method=form)
    bybit.set_trades_saver(trades_saver)

    book_saver = IODataBase(f'{path}/book', method=form)
    bybit.set_book_saver(book_saver)

    bybit(pair=pair)