#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-03-25 19:31:56
# @Last modified by: ArthurBernard
# @Last modified time: 2019-09-11 08:47:31
""" Objects and functions to download data from Bitfinex exchange.
.. currentmodule:: dccd.continuous_dl.bitfinex
These functions and objects allow you to continuously download data and update
your database.
High level API
--------------
.. autofunction:: get_data_bitfinex
.. autofunction:: get_orderbook_bitfinex
.. autofunction:: get_trades_bitfinex
Low level API
-------------
.. autoclass:: dccd.continuous_dl.bitfinex.DownloadBitfinexData
:members: set_process_data, set_saver
:special-members: __call__
:show-inheritance:
"""
# Built-in packages
import logging
import time
from typing import Any
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.process_data import set_marketdepth, set_ohlc, set_orders, set_trades
# Third party packages
# Local packages
from dccd.tools.io import IODataBase
__all__ = [
'DownloadBitfinexData', 'get_data_bitfinex', 'get_orderbook_bitfinex',
'get_trades_bitfinex',
]
# =========================================================================== #
# Parser functions #
# =========================================================================== #
def _parser_trades(tData: list[Any]) -> dict[str, Any]:
if tData[1] == 'te':
tData = tData[2]
return {
'tid': tData[0],
'timestamp': tData[1] / 1000,
'price': tData[3],
'amount': abs(tData[2]),
'type': 'buy' if tData[2] > 0. else 'sell',
}
def _parser_book(tData: list[Any]) -> dict[str, Any]:
if isinstance(tData[1], list):
tData = tData[1]
return {'price': str(tData[0]), 'count': tData[1], 'amount': tData[2]}
# =========================================================================== #
# Download objects #
# =========================================================================== #
[docs]
class DownloadBitfinexData(ContinuousDownloader):
""" Basis object to download data from a stream websocket client API.
Parameters
----------
time_step : int or None, optional
Number of seconds between two snapshots of data, minimum is 1, default
is 60 (one minute). Each ``time_step`` seconds data will be processed
and pushed to the database. Pass ``None`` to receive data tick-by-tick
without periodic aggregation.
until : int, optional
Number of seconds before stopping, or a future Unix timestamp at which
to stop. Default is ``3600`` (one hour).
Attributes
----------
host : str
Address of host to connect.
conn_par : dict
Parameters of websocket connection.
ws : websockets.client.WebSocketClientProtocol
Connection with the websocket client.
is_connect : bool
True if is connected, False otherwise.
ts : int
Number of second between two snapshots of data.
t : int
Current timestamp but rounded by `ts`.
until : int
Timestamp to stop to download data.
Methods
-------
set_process_data
set_saver
__call__
"""
def __init__(self, time_step: int = 60, until: int | None = 3600,
checkpoint_dir: str | None = None) -> None:
""" Initialize object.
Parameters
----------
time_step : int or None, optional
Snapshot interval in seconds. Default is ``60``.
until : int or None, optional
Seconds to run, or a future Unix timestamp to stop at.
Default is ``3600``.
checkpoint_dir : str or None, optional
Directory to write the order-book crash-recovery checkpoint.
Disabled when ``None`` (default).
"""
if until is None:
until = 0
elif until > time.time():
until -= int(time.time())
ContinuousDownloader.__init__(self, 'bitfinex', time_step=time_step,
STOP=until, checkpoint_dir=checkpoint_dir)
self._parser_data: dict[str, Any] = {
'book': self.parser_book,
'book_raw': self.parser_raw_book,
'trades': self.parser_trades,
'trades_raw': self.parser_raw_trades,
}
self.logger = logging.getLogger(__name__)
self._load_checkpoint()
[docs]
def parser_raw_book(self, data: list[Any]) -> None:
""" Parse raw order book, each timestep set in a list all orders.
Parameters
----------
data : list
Order data.
"""
parsed = _parser_book(data)
self._raw_parser(parsed)
[docs]
def parser_book(self, data: list[Any]) -> None:
""" Parse market depth of order book.
Parameters
----------
data : list
Order data.
"""
parsed = _parser_book(data)
if parsed['count'] > 0:
if parsed['price'] in self.d.keys():
self.d[parsed['price']]['amount'] += parsed['amount']
else:
self.d[parsed['price']] = parsed
else:
self.d.pop(parsed['price'])
self._data.setdefault(self.t, {'trades': [], 'book': {}})['book'] = {
v['price']: v['amount'] for v in self.d.values()
}
[docs]
def parser_raw_trades(self, data: list[Any]) -> None:
""" Parse raw trade data tick-by-tick.
Parameters
----------
data : list
Trade data.
"""
if data[1] == 'tu':
return
parsed = _parser_trades(data)
self._raw_parser(parsed)
[docs]
def parser_trades(self, data: list[Any]) -> None:
""" Parse trade data and aggregate into OHLCV snapshots.
Parameters
----------
data : list
Trade data.
"""
if data[1] == 'tu':
return
parsed = _parser_trades(data)
self._raw_parser(parsed)
[docs]
async def on_message(self, data: dict[str, Any] | list[Any]) -> None:
""" Route an incoming websocket message to the appropriate parser. """
if isinstance(data, list):
if data[1] == 'hb':
self.logger.info('HeartBeat')
elif isinstance(data[1][0], list):
for d in data[1]:
self.parser(d)
else:
self.parser(data)
else:
self.logger.info('{}'.format(data))
[docs]
def __call__(self, channel: str, **kwargs: Any) -> 'DownloadBitfinexData':
""" Open a websocket connection and save/update the database.
Run asynchronously two loops to get data from bitfinex websocket and
save/update the database.
Parameters
----------
channel : {'book', 'book_raw', 'trades', 'trades_raw'}
Channel to get data, by default data will be aggregated (OHLC for
'trades' and reconstructed orderbook for 'book'), add '_raw' to the
`channel` to get raw data (trade tick by tick or each orders).
**kwargs
Any revelevant keyword arguments will be passed to the websocket
connector, see API documentation [1]_ for more details.
Warnings
--------
'book_raw' and 'trades_raw' can be very memory expensive.
References
----------
.. [1] https://docs.bitfinex.com/v2/docs/ws-public
"""
self.parser = self.get_parser(channel)
channel = channel[:-4] if channel[-4:] == '_raw' else channel
self.logger.info('Try connect WS and set {} stream.'.format(channel))
return super().__call__(channel=channel, **kwargs) # type: ignore[return-value]
# =========================================================================== #
# High level functions #
# =========================================================================== #
[docs]
def get_data_bitfinex(channel: str, process_func: Any, process_params: dict[str, Any] = {},
save_method: str = 'dataframe', io_params: dict[str, Any] = {},
time_step: int = 60, until: int | None = None, path: str | None = None,
**kwargs: Any) -> None:
""" Download data from Bitfinex exchange and update the database.
Parameters
----------
channel : str, {'book', 'book_raw', 'trades', 'trades_raw'}
Websocket channel to get data, by default data will be aggregated (OHLC
for 'trades' and reconstructed orderbook for 'book'), add '_raw' to the
`channel` to get raw data (trade tick by tick or each orders).
process_func : callable
Function to process and clean data before to be saved. Must take `data`
in arguments and can take any optional keywords arguments, cf function
exemples in :mod:`dccd.process_data`.
process_params : dict, optional
Dictionary of the keyword arguments available to `process_func`, cf
documentation into :mod:`dccd.process_data`.
save_method : {'DataFrame', 'SQLite', 'CSV', 'Excel', 'PostgreSQL',\
'Oracle', 'MSSQL', 'MySQL'},
It will create an IODataBase object to save/update the database in the
specified format `save_method`, default is 'DataFrame' it save as
binary pl.DataFrame object. More informations are available into
:mod:`dccd.tools.io`.
io_params : dict, optional
Dictionary of the keyword arguments available to the
``dccd.tools.io.IODataBase`` callable method. Note: With SQL format
some parameters are compulsory, see details into :mod:`dccd.tools.io`.
time_step : int, optional
Number of second between two snapshots of data, default 60 (1 minute).
until : int, optional
Number of seconds before stoping to download and update, default is
None. If `until` equal 0 or None it means it never stop.
path : str, optional
Path to save/update the database, default is None. If `path` is None,
database is saved at the relative path './database/bitfinex/`channel`'.
**kwargs
Any revelevant keyword arguments will be passed to the websocket
connector, see Bitfinex API documentation [2]_ for more details.
Warnings
--------
'book_raw' and 'trades_raw' can be very memory expensive.
See Also
--------
process_data : function to process/clean data (set_marketdepth, set_ohlc,
set_orders, set_marketdepth).
tools.io.IODataBase : object to save/update the database with respect to
specified format.
References
----------
.. [2] https://docs.bitfinex.com/v2/docs/ws-public
"""
if path is None:
path = './database/bitfinex/{}'.format(channel)
saver = IODataBase(path, method=save_method)
downloader = DownloadBitfinexData(time_step=time_step, until=until)
downloader.set_process_data(process_func, **process_params)
downloader.set_saver(saver, **io_params)
downloader(channel, **kwargs)
def get_orders_bitfinex(symbol: str, precision: str = 'P0', frequency: str = 'F0',
lenght: str = '25', time_step: int = 60, until: int | None = None,
path: str | None = None, save_method: str = 'dataframe',
io_params: dict[str, Any] = {}) -> None:
""" Download raw order data from Bitfinex exchange. """
get_data_bitfinex('book_raw', set_orders, time_step=time_step, until=until,
path=path, save_method=save_method, io_params=io_params,
symbol=symbol, precision=precision, frequency=frequency,
lenght=lenght)
[docs]
def get_orderbook_bitfinex(symbol: str, precision: str = 'P0', frequency: str = 'F0',
lenght: str = '25', time_step: int = 60, until: int | None = None,
path: str | None = None, save_method: str = 'dataframe',
io_params: dict[str, Any] = {}) -> None:
""" Download reconstructed order book from Bitfinex exchange. """
get_data_bitfinex('book', set_marketdepth, time_step=time_step,
until=until, path=path, save_method=save_method,
io_params=io_params, symbol=symbol, precision=precision,
frequency=frequency, lenght=lenght)
[docs]
def get_trades_bitfinex(symbol: str, time_step: int = 60, until: int | None = None,
path: str | None = None, save_method: str = 'dataframe',
io_params: dict[str, Any] = {}) -> None:
""" Download trades tick by tick from Bitfinex exchange. """
get_data_bitfinex('trades_raw', set_trades, time_step=time_step,
until=until, path=path, save_method=save_method,
io_params=io_params, symbol=symbol)
def get_ohlc_bitfinex(symbol: str, time_step: int = 60, until: int | None = None,
path: str | None = None, save_method: str = 'dataframe',
io_params: dict[str, Any] = {}) -> None:
""" Download OHLCV data from Bitfinex exchange. """
get_data_bitfinex('trades', set_ohlc, time_step=time_step, until=until,
path=path, save_method=save_method, io_params=io_params,
process_params={'ts': time_step}, symbol=symbol)