Source code for dccd.process_data

#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-08-06 15:25:49
# @Last modified by: ArthurBernard
# @Last modified time: 2026-05-24

""" Functions to clean, sort and otherwise process data. """

# Built-in packages
import time

# External packages
import numpy as np
import polars as pl

__all__ = ['set_marketdepth', 'set_ohlc', 'set_orders', 'set_trades']


def set_orders(orders, t=None):
    """ Set a dataframe with list of each order.

    Parameters
    ----------
    orders : list
        Each order in a list. The fields of the first order are used as
        column names and every one of them is loaded as ``pl.Float64``.
    t : int, optional
        Timestamp written in the ``'timestamp'`` column of every row,
        default is the current time in seconds (``int(time.time())``).

    Returns
    -------
    pl.DataFrame
        List of orders as dataframe, with an additional ``'timestamp'``
        column. If `orders` is empty, an empty dataframe holding only an
        ``Int64`` ``'timestamp'`` column is returned.

    """
    if t is None:
        t = int(time.time())

    # With no order there is no first element to read the column names
    # from, so return an empty frame instead of failing on ``orders[0]``.
    if len(orders) == 0:
        return pl.DataFrame(schema={'timestamp': pl.Int64})

    float_columns = {name: pl.Float64 for name in orders[0]}
    df = pl.DataFrame(orders, schema_overrides=float_columns)

    return df.with_columns(pl.lit(t).alias('timestamp'))
def set_marketdepth(book, t=None):
    """ Set a market depth dataframe with list of order books.

    Parameters
    ----------
    book : dict
        Orderbook as dict, where keys is the price and value is the signed
        amount (positive=bid, negative=ask). Levels whose amount is zero
        are ignored.
    t : int, optional
        Timestamp written in the ``'TS'`` column of every row, default is
        the current time in seconds (``int(time.time())``).

    Returns
    -------
    pl.DataFrame
        Order book as flat dataframe with columns ``['TS', 'side', 'rank',
        'price', 'cum_amount', 'vwab']``. Bids come first, ordered by
        decreasing price, then asks ordered by increasing price. For each
        side, ``'rank'`` starts at 0, ``'cum_amount'`` is the cumulative
        absolute amount and ``'vwab'`` is the cumulative average of the
        absolute price weighted by the absolute amount. An empty dataframe
        with the same columns is returned if no level has a non-zero
        amount.

    """
    if t is None:
        t = int(time.time())

    # Convert each level once and keep the price next to its amount, so
    # the amount never has to be looked up again from a re-built string
    # key (which fails for keys such as '100.0', '0.10' or numeric keys).
    levels = [(float(price), float(amount)) for price, amount in book.items()]

    def _price(level):
        return level[0]

    bids = sorted((lv for lv in levels if lv[1] > 0), key=_price, reverse=True)
    asks = sorted((lv for lv in levels if lv[1] < 0), key=_price)

    def _side_rows(side_levels, side):
        # Build the rows of one side of the book, best price first.
        if not side_levels:
            return []

        prices = np.array([price for price, _ in side_levels])
        # Ask amounts are negative, only their size matters from here on
        amounts = np.abs([amount for _, amount in side_levels])
        cum = np.cumsum(amounts)
        vwab = np.cumsum(amounts * np.abs(prices)) / cum

        return [
            {
                'TS': t,
                'side': side,
                'rank': i,
                'price': float(p),
                'cum_amount': float(c),
                'vwab': float(v),
            }
            for i, (p, c, v) in enumerate(zip(prices, cum, vwab))
        ]

    rows = _side_rows(bids, 'bid') + _side_rows(asks, 'ask')

    if not rows:
        # Keep a typed schema so that an empty book still has every column
        return pl.DataFrame(schema={
            'TS': pl.Int64,
            'side': pl.Utf8,
            'rank': pl.Int64,
            'price': pl.Float64,
            'cum_amount': pl.Float64,
            'vwab': pl.Float64,
        })

    return pl.DataFrame(rows)
def set_trades(trades):
    """ Load trades in a dataframe ordered by trade id.

    Parameters
    ----------
    trades : list
        Historical trades tick by tick as list, each trade must have a
        ``'tid'`` field.

    Returns
    -------
    pl.DataFrame
        Historical trades tick by tick as dataframe, sorted by ``'tid'``.

    """
    frame = pl.DataFrame(trades)
    ordered = frame.sort('tid')

    return ordered
def set_ohlc(trades, ts=60):
    """ Aggregate and set a dataframe with list of trades.

    Parameters
    ----------
    trades : list
        Historical trades tick by tick as list. Each trade must have the
        fields ``'tid'``, ``'timestamp'``, ``'price'`` and ``'amount'``.
        The ``'timestamp'`` is divided by 1000 and truncated before being
        read as epoch seconds (i.e. it is expected in milliseconds).
    ts : int, optional
        Timestep in seconds to aggregate data, default is 60.

    Returns
    -------
    pl.DataFrame
        Aggregated trades as OHLC dataframe with columns ``['TS', 'open',
        'high', 'low', 'close', 'volume']``, where ``'TS'`` is the start of
        each window in epoch seconds.

    """
    seconds = (pl.col('timestamp') / 1000).cast(pl.Int64)
    df = pl.DataFrame(trades).with_columns(
        pl.from_epoch(seconds, time_unit='s').alias('ts_dt')
    )

    # Sort on time AND trade id in one pass: sorting on 'ts_dt' alone does
    # not keep the 'tid' order of trades sharing the same second, and
    # 'open'/'close' below depend on the row order inside each window.
    df = df.sort(['ts_dt', 'tid'])

    # NOTE(review): start_by='datapoint' starts the windows at the first
    # trade rather than on a multiple of `ts` - confirm this is intended.
    windows = df.group_by_dynamic(
        'ts_dt', every=f'{ts}s', closed='left', start_by='datapoint'
    )
    ohlc = windows.agg(
        pl.col('price').first().alias('open'),
        pl.col('price').max().alias('high'),
        pl.col('price').min().alias('low'),
        pl.col('price').last().alias('close'),
        pl.col('amount').sum().alias('volume'),
    )

    return ohlc.with_columns(
        pl.col('ts_dt').dt.epoch(time_unit='s').alias('TS')
    ).select(['TS', 'open', 'high', 'low', 'close', 'volume'])