Source code for dccd.domain.transforms

"""Pure data transformations — no I/O, used by adapters and derivation."""

from __future__ import annotations

from dccd.domain.records import OHLCBar, Trade
from dccd.domain.timeutils import align_ns

__all__ = ["aggregate_ohlc"]


[docs] def aggregate_ohlc(trades: list[Trade], span: int) -> list[OHLCBar]: """Aggregate trades into OHLC bars of *span* seconds. Parameters ---------- trades : list[Trade] Input trades, sorted ascending by ``ts`` (nanoseconds UTC). span : int Bar duration in seconds. Returns ------- list[OHLCBar] One bar per non-empty span window, ascending. Examples -------- >>> from dccd.domain.records import Trade >>> trades = [ ... Trade(ts=1_000_000_000_000_000_000, price=100.0, amount=1.0, side='buy'), ... Trade(ts=1_000_000_060_000_000_000, price=110.0, amount=2.0, side='sell'), ... ] >>> bars = aggregate_ohlc(trades, span=60) >>> len(bars) 2 """ if not trades: return [] buckets: dict[int, list[Trade]] = {} for trade in sorted(trades, key=lambda t: t.ts): bucket_ts = align_ns(trade.ts, span) buckets.setdefault(bucket_ts, []).append(trade) bars: list[OHLCBar] = [] for ts_open, bucket in sorted(buckets.items()): prices = [t.price for t in bucket] volume = sum(t.amount for t in bucket) bars.append( OHLCBar( ts=ts_open, open=prices[0], high=max(prices), low=min(prices), close=prices[-1], volume=volume, trades=len(bucket), ) ) return bars