Source code for dccd.continuous_dl.binance

#!/usr/bin/env python3
# coding: utf-8

""" Objects and functions to download data from Binance exchange (WebSocket).

.. currentmodule:: dccd.continuous_dl.binance

High level API
--------------

.. autofunction:: get_data_binance
.. autofunction:: get_orderbook_binance
.. autofunction:: get_trades_binance

Low level API
-------------

.. autoclass:: dccd.continuous_dl.binance.DownloadBinanceData
   :members: set_process_data, set_saver
   :special-members: __call__
   :show-inheritance:

"""

# Built-in packages
import logging
import time

# Third party packages
# Local packages
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.tools.io import IODataBase

__all__ = [
    'DownloadBinanceData', 'get_data_binance', 'get_orderbook_binance',
    'get_trades_binance',
]

_BINANCE_WS_URL = 'wss://stream.binance.com:9443/stream?streams={sym}@trade/{sym}@depth50@100ms'


def _parser_trades(data: dict) -> list[dict]:
    """ Parse a trade message from Binance combined stream.

    Parameters
    ----------
    data : dict
        The ``data`` field of a combined-stream trade message.

    Returns
    -------
    list of dict
        Each dict has keys: ``tid``, ``timestamp``, ``price``, ``amount``,
        ``type`` ('buy' or 'sell').

    """
    return [{
        'tid': data['t'],
        'timestamp': int(data['T']) / 1000,
        'price': float(data['p']),
        'amount': float(data['q']),
        'type': 'sell' if data['m'] else 'buy',
    }]


def _parser_book(data: dict) -> dict:
    """ Parse a depth message from Binance combined stream.

    Parameters
    ----------
    data : dict
        The ``data`` field of a combined-stream depth message.

    Returns
    -------
    dict
        Price levels as strings; bids positive, asks prefixed with ``'-'``
        and negative. Zero quantity means the level was removed.

    """
    book: dict[str, float] = {}
    for bid in data.get('b', []):
        book[bid[0]] = float(bid[1])
    for ask in data.get('a', []):
        book['-' + ask[0]] = -float(ask[1])
    return book


[docs] class DownloadBinanceData(ContinuousDownloader): """ Download data continuously from Binance via combined WebSocket streams. Parameters ---------- pair : str Trading pair symbol in Binance format (e.g. 'BTCUSDT'). time_step : int, optional Seconds between data snapshots, default is 60. until : int, optional Seconds to run or stop timestamp, default is 3600. Attributes ---------- host : str WebSocket URL (combined stream endpoint). is_connect : bool True if connected. ts : int Snapshot interval in seconds. until : int Stop timestamp. Methods ------- set_process_data set_saver __call__ """ def __init__(self, pair: str = 'BTCUSDT', time_step: int = 60, until: int | None = 3600, checkpoint_dir: str | None = None) -> None: """ Initialize object. """ if until is None: until = 0 elif until > time.time(): until -= int(time.time()) self.pair = pair url = _BINANCE_WS_URL.format(sym=pair.lower()) ContinuousDownloader.__init__(self, url, time_step=time_step, STOP=until, checkpoint_dir=checkpoint_dir) self._parser_data = { 'trades': self.parser_trades, 'book': self.parser_book, } self.logger = logging.getLogger(__name__) self._load_checkpoint() async def _subscribe(self, **kwargs: object) -> None: """ Wait for connection; Binance streams are declared in the URL. """ await self.wait_that('ws') self.is_connect = True
[docs] async def on_message(self, msg: dict) -> None: """ Dispatch incoming combined-stream WebSocket messages. Parameters ---------- msg : dict Combined-stream envelope with ``stream`` and ``data`` keys. """ stream = msg.get('stream', '') if '@trade' in stream: self.parser_trades(msg['data']) elif '@depth' in stream: self.parser_book(msg['data'])
[docs] def parser_trades(self, data: dict) -> None: """ Parse and store a trade message. Parameters ---------- data : dict The ``data`` field from the combined-stream trade envelope. """ self._push_trades(_parser_trades(data))
[docs] def parser_book(self, data: dict) -> None: """ Parse and update the order book from a depth message. Parameters ---------- data : dict The ``data`` field from the combined-stream depth envelope. """ self._push_book_updates(_parser_book(data))
[docs] def get_trades_binance(path: str, pair: str = 'BTCUSDT', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download trades data from Binance via WebSocket. Parameters ---------- path : str Path to save data. pair : str, optional Trading pair in Binance format (e.g. 'BTCUSDT'), default is 'BTCUSDT'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadBinanceData(pair=pair, time_step=time_step, until=until) downloader.set_trades_saver(IODataBase(path, method=form)) downloader(pair=pair)
[docs] def get_orderbook_binance(path: str, pair: str = 'BTCUSDT', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download order book data from Binance via WebSocket. Parameters ---------- path : str Path to save data. pair : str, optional Trading pair in Binance format (e.g. 'BTCUSDT'), default is 'BTCUSDT'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadBinanceData(pair=pair, time_step=time_step, until=until) downloader.set_book_saver(IODataBase(path, method=form)) downloader(pair=pair)
[docs] def get_data_binance(path: str, pair: str = 'BTCUSDT', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download order book and trades data from Binance via WebSocket. Parameters ---------- path : str Root path; trades saved under ``<path>/trades/``, book under ``<path>/book/``. pair : str, optional Trading pair in Binance format (e.g. 'BTCUSDT'), default is 'BTCUSDT'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadBinanceData(pair=pair, time_step=time_step, until=until) downloader.set_trades_saver(IODataBase(f'{path}/trades', method=form)) downloader.set_book_saver(IODataBase(f'{path}/book', method=form)) downloader(pair=pair)