Source code for dccd.storage.parquet

"""Parquet-based storage for OHLC, trades, and order book data.

Builds on the existing DataStore logic but uses nanosecond timestamps
and stores provenance in Parquet metadata.
"""

from __future__ import annotations

import logging
import pathlib
import threading
from datetime import datetime, timezone
from typing import Any

import polars as pl

from dccd.domain.dataset import DatasetId, Provenance
from dccd.domain.timeutils import NS, ns_to_dt, span_label
from dccd.domain.types import DataType

__all__ = ["ParquetStore"]

logger = logging.getLogger(__name__)

# Canonical (v3) column layouts, one per data type. Key order matters: it is
# the column order ``canonicalize`` selects. ``TS`` is int64 nanoseconds UTC.
_OHLC_SCHEMA = dict(
    TS=pl.Int64,
    open=pl.Float64,
    high=pl.Float64,
    low=pl.Float64,
    close=pl.Float64,
    volume=pl.Float64,
    quote_volume=pl.Float64,
    trades=pl.Int64,
)

# One row per trade; ``tid`` holds the trade id.
_TRADES_SCHEMA = dict(
    TS=pl.Int64,
    price=pl.Float64,
    amount=pl.Float64,
    side=pl.Utf8,
    tid=pl.Utf8,
)

# One row per price level of a book update; every level of one update shares
# the same ``TS``.
_BOOK_SCHEMA = dict(
    TS=pl.Int64,
    side=pl.Utf8,
    price=pl.Float64,
    amount=pl.Float64,
    count=pl.Int64,
    is_snapshot=pl.Boolean,
)

# Data type -> canonical schema lookup used by ``canonicalize``.
_SCHEMAS: dict[DataType, dict[str, Any]] = {
    DataType.OHLC: _OHLC_SCHEMA,
    DataType.TRADES: _TRADES_SCHEMA,
    DataType.ORDERBOOK: _BOOK_SCHEMA,
}

# Legacy (v2) -> canonical (v3) column renames. ``weightedAverage`` is dropped:
# v3 has no equivalent (it carries a trade-count instead, unrecoverable here).
_LEGACY_RENAME = dict(quoteVolume="quote_volume")
_LEGACY_DROP = ("weightedAverage",)


def canonicalize(df: pl.DataFrame, data_type: DataType) -> pl.DataFrame:
    """Coerce a (possibly legacy v2) frame to the canonical v3 schema.

    Renames legacy columns (per ``_LEGACY_RENAME``), drops unsupported ones
    (per ``_LEGACY_DROP``), fills missing canonical columns with typed nulls,
    and selects them in canonical order. Idempotent on frames already in v3
    schema. This is the single normalisation point shared by reads, merges and
    migration, so legacy data is never lost on ``concat``.

    A frame with no columns at all (e.g. ``pl.DataFrame()``) yields an empty
    frame that carries the canonical columns and dtypes, so downstream ``TS``
    filters and ``concat`` calls always see the expected schema.

    Parameters
    ----------
    df : pl.DataFrame
        Frame to normalise.
    data_type : DataType
        Selects the canonical schema.

    Returns
    -------
    pl.DataFrame
        Exactly the canonical columns, in canonical order.

    Raises
    ------
    KeyError
        If *data_type* has no registered schema.
    """
    schema = _SCHEMAS[data_type]
    if not df.columns:
        # Nothing to rename or drop: hand back an empty frame that already has
        # the canonical columns and dtypes instead of a column-less one.
        return pl.DataFrame(schema=schema)

    # Only rename when the v3 name is absent; if both exist, the v3 column wins
    # and the stale legacy one is discarded by the final select.
    rename = {
        old: new
        for old, new in _LEGACY_RENAME.items()
        if old in df.columns and new not in df.columns
    }
    if rename:
        df = df.rename(rename)

    drop = [c for c in _LEGACY_DROP if c in df.columns]
    if drop:
        df = df.drop(drop)

    # Typed null literals keep the dtype aligned with the canonical schema so a
    # later concat with v3 frames does not hit a schema mismatch.
    missing = [
        pl.lit(None, dtype=dtype).alias(name)
        for name, dtype in schema.items()
        if name not in df.columns
    ]
    if missing:
        df = df.with_columns(missing)
    return df.select(list(schema.keys()))


class ParquetStore:
    """Read/write interface for Parquet-backed market data.

    Every method takes the :class:`DatasetId` it operates on, so one store
    serves all datasets under ``data_path``. All timestamps (``TS``) are
    **nanoseconds UTC** (int64).

    Files are laid out as::

        <data_path>/<exchange>/ohlc/<pair>/<span_label>/<YYYY>.parquet
        <data_path>/<exchange>/<data_type>/<pair>/<YYYY-MM-DD>.parquet

    i.e. OHLC is partitioned per calendar year, trades and order-book data per
    day.

    Parameters
    ----------
    data_path : str or Path
        Root directory for all data files. It is not created eagerly;
        :meth:`directory` creates dataset directories on demand.

    Examples
    --------
    >>> import pathlib, tempfile
    >>> from dccd.domain.dataset import DatasetId
    >>> from dccd.domain.symbol import Symbol
    >>> from dccd.domain.types import DataType
    >>> store = ParquetStore('/tmp/data')
    """

    def __init__(self, data_path: str | pathlib.Path) -> None:
        self._root = pathlib.Path(data_path)
        # save() is a read-modify-write per period file and runs in worker
        # threads (operations flush via asyncio.to_thread). Concurrent saves to
        # the *same* file (e.g. "run all jobs", or a scheduled job overlapping a
        # manual one) would otherwise interleave and corrupt the Parquet. One
        # lock per file path serialises those while leaving different files
        # (datasets/periods) fully parallel.
        self._file_locks: dict[str, threading.Lock] = {}
        self._file_locks_guard = threading.Lock()

    def _lock_for(self, file_path: pathlib.Path) -> threading.Lock:
        """Return the lock guarding *file_path*, creating it on first use."""
        key = str(file_path)
        # The guard makes get-or-create atomic so two threads never end up
        # holding different locks for the same file.
        with self._file_locks_guard:
            lock = self._file_locks.get(key)
            if lock is None:
                lock = threading.Lock()
                self._file_locks[key] = lock
            return lock

    def directory(self, ds: DatasetId) -> pathlib.Path:
        """Return the directory for *ds*, creating it if needed.

        Raises
        ------
        ValueError
            If *ds* is OHLC but has no span.
        """
        pair_slug = ds.pair_slug()
        root = self._root / ds.exchange
        if ds.data_type == DataType.OHLC:
            if ds.span is None:
                raise ValueError(
                    f"DatasetId {ds} has data_type=OHLC but span is None. "
                    "Set span when constructing the DatasetId."
                )
            d = root / "ohlc" / pair_slug / span_label(ds.span)
        else:
            d = root / ds.data_type.value / pair_slug
        d.mkdir(parents=True, exist_ok=True)
        return d

    def _period_fmt(self, ds: DatasetId) -> str:
        """strftime pattern naming period files: yearly for OHLC, daily otherwise."""
        return "%Y" if ds.data_type == DataType.OHLC else "%Y-%m-%d"

    def _file_path(self, ds: DatasetId, period: str) -> pathlib.Path:
        """Path of the Parquet file holding *period* of *ds*."""
        return self.directory(ds) / f"{period}.parquet"

    def save(
        self,
        ds: DatasetId,
        records: list[Any],
        provenance: Provenance | None = None,
    ) -> int:
        """Write *records* to Parquet, merging with existing data.

        Parameters
        ----------
        ds : DatasetId
        records : list
            OHLCBar, Trade, or OrderBookSnapshot objects.
        provenance : Provenance or None

        Returns
        -------
        int
            Number of rows written.
        """
        if not records:
            return 0
        df = self._to_dataframe(ds, records)
        if len(df) == 0:
            return 0

        fmt = self._period_fmt(ds)
        df_with_period = df.with_columns(
            pl.from_epoch("TS", time_unit="ns").dt.strftime(fmt).alias("_period")
        )

        total_written = 0
        for period in df_with_period["_period"].unique().sort().to_list():
            incoming = df_with_period.filter(pl.col("_period") == period).drop("_period")
            # Count incoming rows *before* merge — this is what the caller
            # should see as "rows written" (not the post-dedup file size).
            total_written += len(incoming)
            file_path = self._file_path(ds, period)
            # Serialise the read-modify-write of this file against concurrent
            # saves so the Parquet can't be corrupted or lose an update.
            with self._lock_for(file_path):
                merged = self._merge(file_path, incoming, ds)
                self._write_parquet(file_path, merged, provenance)

        return total_written

    def load(
        self,
        ds: DatasetId,
        start_ns: int | None = None,
        end_ns: int | None = None,
    ) -> pl.DataFrame:
        """Load data for *ds* with ``start_ns <= TS <= end_ns``.

        Both bounds are optional and inclusive. Every file is canonicalised, so
        legacy (v2) files are returned in the v3 schema. Unreadable files are
        logged and skipped. Returns an empty, column-less frame when nothing
        matches; otherwise rows sorted by ``TS``.
        """
        directory = self.directory(ds)
        files = sorted(directory.glob("*.parquet"))
        if not files:
            return pl.DataFrame()

        pieces = []
        for f in files:
            try:
                df = canonicalize(pl.read_parquet(f), ds.data_type)
                if start_ns is not None:
                    df = df.filter(pl.col("TS") >= start_ns)
                if end_ns is not None:
                    df = df.filter(pl.col("TS") <= end_ns)
                if len(df) > 0:
                    pieces.append(df)
            except Exception:
                logger.warning("Corrupted parquet file %s — skipping", f, exc_info=True)

        if not pieces:
            return pl.DataFrame()
        return pl.concat(pieces).sort("TS")

    def last_timestamp(self, ds: DatasetId) -> int | None:
        """Return last TS in ns, or None if no data.

        Files are scanned newest-name first; the maximum ``TS`` of the first
        readable, non-empty file is returned. Unreadable files are logged and
        skipped.
        """
        directory = self.directory(ds)
        files = sorted(directory.glob("*.parquet"), reverse=True)
        for f in files:
            try:
                df = pl.read_parquet(f, columns=["TS"])
                if len(df) > 0:
                    return int(df["TS"].max())  # type: ignore[arg-type]
            except Exception:
                logger.warning("Unreadable parquet file %s — skipping", f, exc_info=True)
        return None

    def missing_intervals(
        self, ds: DatasetId, start_ns: int, end_ns: int
    ) -> list[tuple[int, int]]:
        """Return gaps as (start_ns, end_ns) pairs within [start_ns, end_ns].

        For non-OHLC datasets (or OHLC without a span) only the tail after the
        last stored timestamp is reported. For OHLC, each calendar year
        overlapping the request is compared with its yearly file: complete past
        years are skipped, otherwise the uncovered head and tail of the year's
        window are reported (clamped to the window). Gaps *inside* a file's
        ``[min, max]`` range are not detected. An unreadable or empty yearly
        file counts as fully missing.
        """
        if ds.data_type != DataType.OHLC or ds.span is None:
            last = self.last_timestamp(ds)
            effective = max(start_ns, last + 1) if last is not None else start_ns
            return [(effective, end_ns)] if effective < end_ns else []

        span_ns = ds.span * NS
        current_year = datetime.now(tz=timezone.utc).year
        start_dt = ns_to_dt(start_ns)
        end_dt = ns_to_dt(end_ns)

        intervals: list[tuple[int, int]] = []
        for year in range(start_dt.year, end_dt.year + 1):
            year_start_ns = int(datetime(year, 1, 1, tzinfo=timezone.utc).timestamp()) * NS
            year_end_ns = int(datetime(year + 1, 1, 1, tzinfo=timezone.utc).timestamp()) * NS
            ivl_start = max(start_ns, year_start_ns)
            ivl_end = min(end_ns, year_end_ns)
            if ivl_start >= ivl_end:
                continue

            file_path = self._file_path(ds, str(year))
            if file_path.exists():
                if year < current_year and self._is_year_complete(ds, year):
                    continue
                try:
                    ts = pl.read_parquet(file_path, columns=["TS"])["TS"]
                    bounds = (
                        (int(ts.min()), int(ts.max()))  # type: ignore[arg-type]
                        if len(ts) > 0
                        else None
                    )
                except Exception:
                    logger.warning(
                        "Unreadable parquet file %s — treating year %d as missing",
                        file_path,
                        year,
                        exc_info=True,
                    )
                    bounds = None
                if bounds is not None:
                    file_min, file_max = bounds
                    intervals.extend(
                        self._edge_gaps(ivl_start, ivl_end, file_min, file_max, span_ns)
                    )
                    continue
            intervals.append((ivl_start, ivl_end))
        return intervals

    @staticmethod
    def _edge_gaps(
        ivl_start: int, ivl_end: int, file_min: int, file_max: int, span_ns: int
    ) -> list[tuple[int, int]]:
        """Return the parts of ``[ivl_start, ivl_end)`` before and after a file's data.

        The leading gap ends at *file_min* and the trailing gap starts one span
        after *file_max*. Both are clamped to the window, so a file whose data
        lies entirely outside it never produces an interval beyond the request.
        """
        gaps: list[tuple[int, int]] = []
        if ivl_start < file_min:
            gaps.append((ivl_start, min(file_min, ivl_end)))
        trailing = max(file_max + span_ns, ivl_start)
        if trailing < ivl_end:
            gaps.append((trailing, ivl_end))
        return gaps

    def inventory(self) -> list[dict[str, Any]]:
        """Return list of dataset info dicts for all stored data.

        Each entry includes ``min_ts`` / ``max_ts`` (nanoseconds UTC) and
        ``rows`` so the UI can display the actual data time range. OHLC
        entries additionally carry ``span``. A data root that does not exist
        yet yields an empty list.
        """
        result: list[dict[str, Any]] = []
        # A fresh store whose root has never been written to holds no data;
        # iterdir() on a missing directory would raise FileNotFoundError.
        if not self._root.is_dir():
            return result

        for exchange_dir in sorted(self._root.iterdir()):
            if not exchange_dir.is_dir() or exchange_dir.name.startswith("."):
                continue
            exchange = exchange_dir.name
            for dtype_dir in sorted(exchange_dir.iterdir()):
                dtype = dtype_dir.name
                if not dtype_dir.is_dir() or dtype not in ("ohlc", "trades", "orderbook"):
                    continue
                for pair_dir in sorted(dtype_dir.iterdir()):
                    if not pair_dir.is_dir():
                        continue
                    pair = pair_dir.name
                    if dtype == "ohlc":
                        for span_dir in sorted(pair_dir.iterdir()):
                            if not span_dir.is_dir():
                                continue
                            files = sorted(span_dir.glob("*.parquet"))
                            if not files:
                                continue
                            span_s = self._parse_span_dir(span_dir.name)
                            min_ts, max_ts, rows = self._ts_range(files)
                            result.append({
                                "exchange": exchange,
                                "pair": pair,
                                "data_type": dtype,
                                "span": span_s,
                                "files": len(files),
                                "rows": rows,
                                "min_ts": min_ts,
                                "max_ts": max_ts,
                            })
                    else:
                        files = sorted(pair_dir.glob("*.parquet"))
                        if not files:
                            continue
                        min_ts, max_ts, rows = self._ts_range(files)
                        result.append({
                            "exchange": exchange,
                            "pair": pair,
                            "data_type": dtype,
                            "files": len(files),
                            "rows": rows,
                            "min_ts": min_ts,
                            "max_ts": max_ts,
                        })
        return result

    @staticmethod
    def _parse_span_dir(name: str) -> int | None:
        """Return the span encoded by an OHLC span directory name, or None."""
        # Imported lazily; only needed when an OHLC directory is present.
        from dccd.domain.timeutils import str_to_span

        span_s = str_to_span(name)
        if span_s is None:
            # Fallback: parse "3600s" format
            try:
                span_s = int(name.rstrip("s"))
            except ValueError:
                span_s = None
        return span_s

    def _ts_range(
        self, files: list[pathlib.Path]
    ) -> tuple[int | None, int | None, int]:
        """Return (min_ts_ns, max_ts_ns, total_rows) across a list of parquet files.

        Unreadable files are logged and excluded from all three values.
        """
        min_ts: int | None = None
        max_ts: int | None = None
        total_rows = 0
        for f in files:
            try:
                df = pl.read_parquet(f, columns=["TS"])
                n = len(df)
                if n == 0:
                    continue
                fmin = int(df["TS"].min())  # type: ignore[arg-type]
                fmax = int(df["TS"].max())  # type: ignore[arg-type]
            except Exception:
                logger.warning("Unreadable parquet file %s — skipping", f, exc_info=True)
                continue
            total_rows += n
            if min_ts is None or fmin < min_ts:
                min_ts = fmin
            if max_ts is None or fmax > max_ts:
                max_ts = fmax
        return min_ts, max_ts, total_rows

    def _is_year_complete(self, ds: DatasetId, year: int) -> bool:
        """True if the yearly file holds at least one row per span slot of *year*.

        Any failure (no span, missing or unreadable file) reports incomplete,
        so the caller falls back to re-checking the year.
        """
        if ds.span is None:
            return False
        file_path = self._file_path(ds, str(year))
        if not file_path.exists():
            return False
        try:
            df = pl.read_parquet(file_path, columns=["TS"])
            year_start = datetime(year, 1, 1, tzinfo=timezone.utc)
            year_end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
            # Number of bars a full year contains at this span (span in seconds).
            expected = int((year_end - year_start).total_seconds()) // ds.span
            return len(df) >= expected
        except Exception:
            return False

    def _to_dataframe(self, ds: DatasetId, records: list[Any]) -> pl.DataFrame:
        """Convert domain records into a frame with the canonical schema for *ds*.

        Order-book snapshots are flattened to one row per price level (bids
        first, then asks), each carrying the snapshot's ``TS`` and flag.
        """
        if ds.data_type == DataType.OHLC:
            rows = [
                {
                    "TS": r.ts,
                    "open": r.open,
                    "high": r.high,
                    "low": r.low,
                    "close": r.close,
                    "volume": r.volume,
                    "quote_volume": r.quote_volume,
                    "trades": r.trades,
                }
                for r in records
            ]
            return pl.DataFrame(rows, schema=_OHLC_SCHEMA)
        elif ds.data_type == DataType.TRADES:
            rows = [
                {
                    "TS": r.ts,
                    "price": r.price,
                    "amount": r.amount,
                    "side": r.side,
                    "tid": r.tid,
                }
                for r in records
            ]
            return pl.DataFrame(rows, schema=_TRADES_SCHEMA)
        else:
            rows = []
            for snap in records:
                for lvl in snap.bids:
                    rows.append({
                        "TS": snap.ts,
                        "side": "bid",
                        "price": lvl.price,
                        "amount": lvl.amount,
                        "count": lvl.count,
                        "is_snapshot": snap.is_snapshot,
                    })
                for lvl in snap.asks:
                    rows.append({
                        "TS": snap.ts,
                        "side": "ask",
                        "price": lvl.price,
                        "amount": lvl.amount,
                        "count": lvl.count,
                        "is_snapshot": snap.is_snapshot,
                    })
            return pl.DataFrame(rows, schema=_BOOK_SCHEMA)

    def _dedup_subset(self, ds: DatasetId, df: pl.DataFrame) -> list[str]:
        """Natural dedup key for *ds*.

        ``TS`` alone is unique only for OHLC. Trades collide on TS (exchanges
        timestamp at ms → many share a ns), so deduping on TS would drop
        distinct trades; we key on the trade id when present, else a
        composite. Order-book rows share one TS across every price level, so
        they key on (TS, side, price).
        """
        if ds.data_type == DataType.OHLC:
            return ["TS"]
        if ds.data_type == DataType.TRADES:
            if "tid" in df.columns and df["tid"].null_count() == 0:
                return ["tid"]
            return ["TS", "price", "amount", "side"]
        return ["TS", "side", "price"]  # order book level

    def _merge(self, file_path: pathlib.Path, new: pl.DataFrame, ds: DatasetId) -> pl.DataFrame:
        """Merge new data with existing file, deduplicating on the natural key.

        The existing file is **canonicalised** before the concat so that legacy
        (v2) files — whose columns differ (``quoteVolume``/``weightedAverage``)
        — are aligned to the v3 schema instead of raising a schema error. We
        never silently overwrite on a read error: an unreadable file is a fault
        worth surfacing, not a reason to drop its rows.

        On key collisions the incoming row wins (``keep="last"``). Rows that
        share a ``TS`` (trades, order-book levels) keep their existing-then-
        incoming order, so repeated rewrites produce a deterministic file.
        """
        merged = new
        if file_path.exists():
            existing = canonicalize(pl.read_parquet(file_path), ds.data_type)
            merged = pl.concat([existing, new])
        deduped = merged.unique(
            subset=self._dedup_subset(ds, merged), keep="last", maintain_order=True
        )
        return deduped.sort("TS", maintain_order=True)

    def _write_parquet(
        self,
        file_path: pathlib.Path,
        df: pl.DataFrame,
        provenance: Provenance | None,
    ) -> None:
        """Atomically replace *file_path* with *df* (snappy-compressed).

        *provenance*, when given, is stored as JSON under the
        ``dccd.provenance`` key of the Parquet key/value metadata. When it is
        None no metadata is written, so a previous file's provenance is not
        carried over.
        """
        import os

        meta: dict[str, str] = {}
        if provenance is not None:
            meta["dccd.provenance"] = provenance.model_dump_json()
        # Write to a temp file then atomically rename, so a concurrent reader
        # (load/last_timestamp/inventory) never sees a half-written file — it
        # observes either the old complete file or the new one. The pid/thread
        # suffix keeps concurrent writers from sharing a temp name.
        tmp = file_path.with_suffix(
            file_path.suffix + f".tmp.{os.getpid()}.{threading.get_ident()}"
        )
        try:
            # Polars >=1.x persists key/value metadata into the Parquet footer.
            df.write_parquet(tmp, compression="snappy", metadata=meta or None)
            os.replace(tmp, file_path)
        finally:
            # Only still present if the write or the rename failed.
            tmp.unlink(missing_ok=True)

    @staticmethod
    def read_provenance(file_path: str | pathlib.Path) -> Provenance | None:
        """Return the :class:`Provenance` stored in a Parquet file, if any."""
        import json

        import pyarrow.parquet as pq

        kv = pq.read_metadata(str(file_path)).metadata or {}
        raw = kv.get(b"dccd.provenance") or kv.get("dccd.provenance")
        if raw is None:
            return None
        if isinstance(raw, bytes):
            raw = raw.decode()
        return Provenance(**json.loads(raw))