Source code for dccd.continuous_dl.kraken

#!/usr/bin/env python3
# coding: utf-8

""" Objects and functions to download data from Kraken exchange (WebSocket).

.. currentmodule:: dccd.continuous_dl.kraken

High level API
--------------

.. autofunction:: get_data_kraken
.. autofunction:: get_orderbook_kraken
.. autofunction:: get_trades_kraken

Low level API
-------------

.. autoclass:: dccd.continuous_dl.kraken.DownloadKrakenData
   :members: set_process_data, set_saver
   :special-members: __call__
   :show-inheritance:

"""

# Built-in packages
import json
import logging
import time
from datetime import datetime, timezone

# Third party packages
# Local packages
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.tools.io import IODataBase

__all__ = [
    'DownloadKrakenData', 'get_data_kraken', 'get_orderbook_kraken',
    'get_trades_kraken',
]

_KRAKEN_WS_URL = 'wss://ws.kraken.com/v2'


def _iso_to_ts(iso: str) -> int:
    """ Convert an ISO 8601 timestamp string to a Unix timestamp (seconds). """
    return int(datetime.fromisoformat(iso.replace('Z', '+00:00'))
               .replace(tzinfo=timezone.utc).timestamp())


def _parser_trades(data: list[dict]) -> list[dict]:
    """ Parse a trade push message from Kraken WebSocket v2.

    Parameters
    ----------
    data : list of dict
        The ``data`` field of a Kraken trade push message.

    Returns
    -------
    list of dict
        Each dict has keys: ``tid``, ``timestamp``, ``price``, ``amount``,
        ``type`` ('buy' or 'sell').

    """
    return [{
        'tid': d['trade_id'],
        'timestamp': _iso_to_ts(d['timestamp']),
        'price': float(d['price']),
        'amount': float(d['qty']),
        'type': d['side'],
    } for d in data]


def _parser_book(data: list[dict]) -> dict[str, float]:
    """ Parse a book push message from Kraken WebSocket v2.

    Parameters
    ----------
    data : list of dict
        The ``data`` field of a Kraken book push message.
        Each element has ``bids`` and ``asks`` lists of
        ``{price, qty}`` dicts.

    Returns
    -------
    dict
        Price levels as strings; bids positive, asks prefixed with ``'-'``
        and negative. Zero quantity means the level was removed.

    """
    book: dict[str, float] = {}
    for snap in data:
        for bid in snap.get('bids', []):
            book[str(bid['price'])] = float(bid['qty'])
        for ask in snap.get('asks', []):
            book['-' + str(ask['price'])] = -float(ask['qty'])
    return book


def _parser_kline(data: list[dict]) -> list[dict]:
    """ Parse an ohlc push message from Kraken WebSocket v2.

    Parameters
    ----------
    data : list of dict
        The ``data`` field of a Kraken ohlc push message.

    Returns
    -------
    list of dict
        Each dict has keys: ``timestamp``, ``open``, ``high``, ``low``,
        ``close``, ``volume``.

    """
    return [{
        'timestamp': _iso_to_ts(d['interval_begin']),
        'open': float(d['open']),
        'high': float(d['high']),
        'low': float(d['low']),
        'close': float(d['close']),
        'volume': float(d['volume']),
    } for d in data]


[docs] class DownloadKrakenData(ContinuousDownloader): """ Download data continuously from Kraken via WebSocket v2. Parameters ---------- pair : str Trading pair in Kraken format (e.g. 'BTC/USD'). time_step : int, optional Seconds between data snapshots, default is 60. until : int, optional Seconds to run or stop timestamp, default is 3600. span : int, optional OHLCV interval in seconds; if given, also subscribes to the ohlc channel. Must be a multiple of 60. Default is None. Attributes ---------- host : str WebSocket URL. is_connect : bool True if connected. ts : int Snapshot interval in seconds. until : int Stop timestamp. Methods ------- set_process_data set_saver __call__ """ def __init__(self, pair: str = 'BTC/USD', time_step: int = 60, until: int | None = 3600, span: int | None = None, checkpoint_dir: str | None = None) -> None: """ Initialize object. """ if until is None: until = 0 elif until > time.time(): until -= int(time.time()) self.pair = pair self._span = span ContinuousDownloader.__init__( self, _KRAKEN_WS_URL, time_step=time_step, STOP=until, checkpoint_dir=checkpoint_dir, ) self._parser_data = { 'trades': self.parser_trades, 'book': self.parser_book, 'kline': self.parser_kline, } self.logger = logging.getLogger(__name__) self._load_checkpoint() async def _subscribe(self, **kwargs: object) -> None: """ Send per-channel subscribe messages to Kraken WebSocket v2. """ await self.wait_that('ws') await self.ws.send(json.dumps({ 'method': 'subscribe', 'params': {'channel': 'trade', 'symbol': [self.pair]}, })) await self.ws.send(json.dumps({ 'method': 'subscribe', 'params': {'channel': 'book', 'symbol': [self.pair], 'depth': 50}, })) if self._span is not None: period = max(1, self._span // 60) await self.ws.send(json.dumps({ 'method': 'subscribe', 'params': {'channel': 'ohlc', 'symbol': [self.pair], 'period': period}, })) self.is_connect = True
[docs] async def on_message(self, msg: dict) -> None: """ Dispatch incoming Kraken WebSocket v2 push messages. Parameters ---------- msg : dict Kraken push message with ``channel`` and ``data`` fields. """ channel = msg.get('channel', '') msg_type = msg.get('type', '') if msg_type in ('heartbeat', 'pong') or channel in ('heartbeat', 'status'): return if channel == 'trade': self.parser_trades(msg.get('data', [])) elif channel == 'book': self.parser_book(msg) elif channel == 'ohlc': self.parser_kline(msg.get('data', []))
[docs] def parser_trades(self, data: list[dict]) -> None: """ Parse and store a trade push message. Parameters ---------- data : list of dict The ``data`` field from the Kraken trade push message. """ self._push_trades(_parser_trades(data))
[docs] def parser_book(self, msg: dict) -> None: """ Parse and update the order book from a book push message. Parameters ---------- msg : dict Full Kraken book push message (contains ``type`` and ``data``). """ self._push_book_updates(_parser_book(msg.get('data', [])))
[docs] def parser_kline(self, data: list[dict]) -> None: """ Parse and store an ohlc push message. Parameters ---------- data : list of dict The ``data`` field from the Kraken ohlc push message. """ for candle in _parser_kline(data): self._raw_parser(candle)
[docs] def get_trades_kraken(path: str, pair: str = 'BTC/USD', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download trades data from Kraken via WebSocket. Parameters ---------- path : str Path to save data. pair : str, optional Trading pair in Kraken format (e.g. 'BTC/USD'), default is 'BTC/USD'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadKrakenData(pair=pair, time_step=time_step, until=until) downloader.set_trades_saver(IODataBase(path, method=form)) downloader(pair=pair)
[docs] def get_orderbook_kraken(path: str, pair: str = 'BTC/USD', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download order book data from Kraken via WebSocket. Parameters ---------- path : str Path to save data. pair : str, optional Trading pair in Kraken format (e.g. 'BTC/USD'), default is 'BTC/USD'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadKrakenData(pair=pair, time_step=time_step, until=until) downloader.set_book_saver(IODataBase(path, method=form)) downloader(pair=pair)
[docs] def get_data_kraken(path: str, pair: str = 'BTC/USD', time_step: int = 60, until: int = 3600, form: str = 'csv') -> None: """ Download order book and trades data from Kraken via WebSocket. Parameters ---------- path : str Root path; trades saved under ``<path>/trades/``, book under ``<path>/book/``. pair : str, optional Trading pair in Kraken format (e.g. 'BTC/USD'), default is 'BTC/USD'. time_step : int, optional Seconds between snapshots, default is 60. until : int, optional Duration in seconds or stop timestamp, default is 3600. form : str, optional Save format ('csv', 'parquet', etc.), default is 'csv'. """ downloader = DownloadKrakenData(pair=pair, time_step=time_step, until=until) downloader.set_trades_saver(IODataBase(f'{path}/trades', method=form)) downloader.set_book_saver(IODataBase(f'{path}/book', method=form)) downloader(pair=pair)