"""Parquet-based storage for OHLC, trades, and order book data.
Builds on the existing DataStore logic but uses nanosecond timestamps
and stores provenance in Parquet metadata.
"""
from __future__ import annotations
import logging
import pathlib
import threading
from datetime import datetime, timezone
from typing import Any
import polars as pl
from dccd.domain.dataset import DatasetId, Provenance
from dccd.domain.timeutils import NS, ns_to_dt, span_label
from dccd.domain.types import DataType
__all__ = ["ParquetStore"]

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Canonical (v3) column layouts.
# Each mapping is column name -> polars dtype. Insertion order matters: it is
# the column order produced by ``canonicalize`` and by ``_to_dataframe``.
# ``TS`` is int64 nanoseconds UTC in every layout.
# ---------------------------------------------------------------------------

# One row per bar.
_OHLC_SCHEMA = {
    "TS": pl.Int64,
    "open": pl.Float64,
    "high": pl.Float64,
    "low": pl.Float64,
    "close": pl.Float64,
    "volume": pl.Float64,
    "quote_volume": pl.Float64,
    "trades": pl.Int64,
}

# One row per trade.
_TRADES_SCHEMA = {
    "TS": pl.Int64,
    "price": pl.Float64,
    "amount": pl.Float64,
    "side": pl.Utf8,
    "tid": pl.Utf8,
}

# One row per price level of an order-book snapshot/update.
_BOOK_SCHEMA = {
    "TS": pl.Int64,
    "side": pl.Utf8,
    "price": pl.Float64,
    "amount": pl.Float64,
    "count": pl.Int64,
    "is_snapshot": pl.Boolean,
}

# Lookup used by canonicalize() to pick the layout for a dataset.
_SCHEMAS: dict[DataType, dict[str, Any]] = {
    DataType.OHLC: _OHLC_SCHEMA,
    DataType.TRADES: _TRADES_SCHEMA,
    DataType.ORDERBOOK: _BOOK_SCHEMA,
}

# Legacy (v2) -> canonical (v3) column handling.
# ``quoteVolume`` maps 1:1 onto ``quote_volume``. ``weightedAverage`` has no
# v3 counterpart (v3 stores a trade count instead, which cannot be derived
# from it), so it is discarded.
_LEGACY_RENAME = {"quoteVolume": "quote_volume"}
_LEGACY_DROP = ("weightedAverage",)
def canonicalize(df: pl.DataFrame, data_type: DataType) -> pl.DataFrame:
    """Coerce a (possibly legacy v2) frame to the canonical v3 schema.

    Renames legacy columns, drops unsupported ones, fills missing canonical
    columns with typed nulls, casts every column to its canonical dtype and
    selects them in canonical order. Idempotent on frames already in v3
    schema. This is the single normalisation point shared by reads, merges
    and migration, so legacy data is never lost on ``concat``.

    Parameters
    ----------
    df : pl.DataFrame
        Frame as read from disk or built in memory.
    data_type : DataType
        Selects the canonical schema (OHLC, trades or order book).

    Returns
    -------
    pl.DataFrame
        Frame whose column names, order and dtypes match the canonical schema
        exactly. A column-less input yields an *empty* frame with that schema,
        so ``TS`` filters and ``pl.concat`` keep working on it.

    Raises
    ------
    KeyError
        If *data_type* has no registered schema.
    polars error (e.g. ``InvalidOperationError``)
        If an existing column cannot be strictly cast to its canonical dtype.
    """
    schema = _SCHEMAS[data_type]
    if df.width == 0:
        # Nothing to align (e.g. an empty file): hand back a typed empty frame
        # rather than a column-less one that downstream code cannot use.
        return pl.DataFrame(schema=schema)

    # Only rename when the canonical name is not already present, so a frame
    # carrying both spellings keeps the canonical column.
    rename = {
        old: new
        for old, new in _LEGACY_RENAME.items()
        if old in df.columns and new not in df.columns
    }
    if rename:
        df = df.rename(rename)

    drop = [c for c in _LEGACY_DROP if c in df.columns]
    if drop:
        df = df.drop(drop)

    missing = [
        pl.lit(None, dtype=dtype).alias(name)
        for name, dtype in schema.items()
        if name not in df.columns
    ]
    if missing:
        df = df.with_columns(missing)

    # Cast to the canonical dtypes: ``pl.concat`` requires identical schemas,
    # so a legacy column stored with a different dtype (e.g. Int32 ``TS`` or
    # integer ``tid``) would otherwise still fail at merge time. The cast is a
    # no-op for frames already in v3 schema.
    return df.select([pl.col(name).cast(dtype) for name, dtype in schema.items()])
class ParquetStore:
    """Read/write interface for Parquet-backed market data.

    A single instance serves every :class:`DatasetId` stored under one root
    directory. Data is partitioned per dataset and per UTC period: one file
    per year for OHLC (``YYYY.parquet``) and one file per day for trades and
    order-book data (``YYYY-MM-DD.parquet``).

    All timestamps (``TS``) are **nanoseconds UTC** (int64).

    Parameters
    ----------
    data_path : str or Path
        Root directory for all data files. It is not created here; dataset
        directories are created lazily by :meth:`directory`.

    Examples
    --------
    >>> import tempfile
    >>> store = ParquetStore(tempfile.mkdtemp())
    """

    def __init__(self, data_path: str | pathlib.Path) -> None:
        self._root = pathlib.Path(data_path)
        # save() is a read-modify-write per period file and runs in worker
        # threads (operations flush via asyncio.to_thread). Concurrent saves to
        # the *same* file (e.g. "run all jobs", or a scheduled job overlapping a
        # manual one) would otherwise interleave and corrupt the Parquet. One
        # lock per file path serialises those while leaving different files
        # (datasets/periods) fully parallel.
        self._file_locks: dict[str, threading.Lock] = {}
        self._file_locks_guard = threading.Lock()

    def _lock_for(self, file_path: pathlib.Path) -> threading.Lock:
        """Return the lock guarding *file_path*, creating it on first use."""
        key = str(file_path)
        with self._file_locks_guard:
            lock = self._file_locks.get(key)
            if lock is None:
                lock = threading.Lock()
                self._file_locks[key] = lock
            return lock

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------

    def directory(self, ds: DatasetId) -> pathlib.Path:
        """Return the directory for *ds*, creating it if needed.

        Layout is ``<root>/<exchange>/ohlc/<pair>/<span label>/`` for OHLC and
        ``<root>/<exchange>/<data_type>/<pair>/`` for every other data type.
        Note that read paths call this too, so reads also create the
        (possibly empty) directory.

        Raises
        ------
        ValueError
            If *ds* is OHLC but has no ``span``.
        """
        pair_slug = ds.pair_slug()
        root = self._root / ds.exchange
        if ds.data_type == DataType.OHLC:
            if ds.span is None:
                raise ValueError(
                    f"DatasetId {ds} has data_type=OHLC but span is None. "
                    "Set span when constructing the DatasetId."
                )
            d = root / "ohlc" / pair_slug / span_label(ds.span)
        else:
            d = root / ds.data_type.value / pair_slug
        d.mkdir(parents=True, exist_ok=True)
        return d

    def _period_fmt(self, ds: DatasetId) -> str:
        """strftime pattern naming the period files: yearly for OHLC, daily otherwise."""
        return "%Y" if ds.data_type == DataType.OHLC else "%Y-%m-%d"

    def _file_path(self, ds: DatasetId, period: str) -> pathlib.Path:
        """Path of the Parquet file holding *period* for *ds*."""
        return self.directory(ds) / f"{period}.parquet"

    # ------------------------------------------------------------------
    # Write / read
    # ------------------------------------------------------------------

    def save(
        self,
        ds: DatasetId,
        records: list[Any],
        provenance: Provenance | None = None,
    ) -> int:
        """Write *records* to Parquet, merging with existing data.

        Records are split by period (see :meth:`_period_fmt`); each period
        file is read, merged, de-duplicated on the dataset's natural key and
        atomically rewritten while holding that file's lock.

        Parameters
        ----------
        ds : DatasetId
        records : list
            OHLCBar, Trade, or OrderBookSnapshot objects.
        provenance : Provenance or None
            Stored in each rewritten file's Parquet footer when given.

        Returns
        -------
        int
            Number of incoming rows written (counted before de-duplication
            against existing data).
        """
        if not records:
            return 0
        df = self._to_dataframe(ds, records)
        if len(df) == 0:
            return 0
        fmt = self._period_fmt(ds)
        df_with_period = df.with_columns(
            pl.from_epoch("TS", time_unit="ns").dt.strftime(fmt).alias("_period")
        )
        total_written = 0
        for period in df_with_period["_period"].unique().sort().to_list():
            incoming = df_with_period.filter(pl.col("_period") == period).drop("_period")
            # Count incoming rows *before* merge — this is what the caller
            # should see as "rows written" (not the post-dedup file size).
            total_written += len(incoming)
            file_path = self._file_path(ds, period)
            # Serialise the read-modify-write of this file against concurrent
            # saves so the Parquet can't be corrupted or lose an update.
            with self._lock_for(file_path):
                merged = self._merge(file_path, incoming, ds)
                self._write_parquet(file_path, merged, provenance)
        return total_written

    def load(
        self,
        ds: DatasetId,
        start_ns: int | None = None,
        end_ns: int | None = None,
    ) -> pl.DataFrame:
        """Load data for *ds* in the given nanosecond range.

        Both bounds are inclusive; ``None`` leaves that side open. Files that
        cannot be read or normalised are logged and skipped (best effort), so
        one bad partition does not hide the rest of the dataset.

        Returns
        -------
        pl.DataFrame
            Canonical-schema rows sorted by ``TS`` (rows sharing a ``TS`` keep
            their on-disk order), or an empty column-less frame when nothing
            matches.
        """
        files = sorted(self.directory(ds).glob("*.parquet"))
        pieces: list[pl.DataFrame] = []
        for f in files:
            try:
                df = canonicalize(pl.read_parquet(f), ds.data_type)
                if start_ns is not None:
                    df = df.filter(pl.col("TS") >= start_ns)
                if end_ns is not None:
                    df = df.filter(pl.col("TS") <= end_ns)
            except Exception:
                logger.warning("Corrupted parquet file %s — skipping", f, exc_info=True)
                continue
            if len(df) > 0:
                pieces.append(df)
        if not pieces:
            return pl.DataFrame()
        # Files are concatenated in period order; a stable sort keeps the
        # within-TS order of trades / book levels deterministic.
        return pl.concat(pieces).sort("TS", maintain_order=True)

    # ------------------------------------------------------------------
    # Coverage queries
    # ------------------------------------------------------------------

    def last_timestamp(self, ds: DatasetId) -> int | None:
        """Return last TS in ns, or None if no data.

        Walks period files newest-first and returns the max ``TS`` of the
        first file that has one; empty or unreadable files are skipped.
        """
        for f in sorted(self.directory(ds).glob("*.parquet"), reverse=True):
            bounds = self._file_ts_bounds(f)
            if bounds is not None:
                return bounds[1]
        return None

    def missing_intervals(
        self, ds: DatasetId, start_ns: int, end_ns: int
    ) -> list[tuple[int, int]]:
        """Return gaps as ``(start_ns, end_ns)`` pairs within ``[start_ns, end_ns]``.

        Non-OHLC datasets (and OHLC without a span) only track the tail: the
        single gap runs from just after the last stored ``TS`` (or from
        *start_ns*) to *end_ns*.

        OHLC datasets are checked per UTC-year file. Past years whose file
        holds at least the expected number of bars are treated as complete.
        Otherwise the gap before the file's first bar and the gap after its
        last bar (``max TS + span``) are reported, clipped to the requested
        window; holes *inside* a file are not detected. A missing, empty or
        unreadable year file yields that whole year slice.
        """
        if ds.data_type != DataType.OHLC or ds.span is None:
            last = self.last_timestamp(ds)
            effective = start_ns if last is None else max(start_ns, last + 1)
            return [(effective, end_ns)] if effective < end_ns else []

        span_ns = ds.span * NS
        current_year = datetime.now(tz=timezone.utc).year
        first_year = ns_to_dt(start_ns).year
        last_year = ns_to_dt(end_ns).year
        intervals: list[tuple[int, int]] = []
        for year in range(first_year, last_year + 1):
            year_start_ns = int(datetime(year, 1, 1, tzinfo=timezone.utc).timestamp()) * NS
            year_end_ns = int(datetime(year + 1, 1, 1, tzinfo=timezone.utc).timestamp()) * NS
            ivl_start = max(start_ns, year_start_ns)
            ivl_end = min(end_ns, year_end_ns)
            if ivl_start >= ivl_end:
                continue
            file_path = self._file_path(ds, str(year))
            if file_path.exists():
                if year < current_year and self._is_year_complete(ds, year):
                    continue
                bounds = self._file_ts_bounds(file_path)
                if bounds is not None:
                    file_min, file_max = bounds
                    # Clip both gaps to [ivl_start, ivl_end]: the stored data
                    # may lie entirely before or after the requested window.
                    if ivl_start < file_min:
                        intervals.append((ivl_start, min(file_min, ivl_end)))
                    trailing = max(file_max + span_ns, ivl_start)
                    if trailing < ivl_end:
                        intervals.append((trailing, ivl_end))
                    continue
            intervals.append((ivl_start, ivl_end))
        return intervals

    @staticmethod
    def _file_ts_bounds(file_path: pathlib.Path) -> tuple[int, int] | None:
        """Return ``(min TS, max TS)`` of one file, or None if empty/unreadable."""
        try:
            ts = pl.read_parquet(file_path, columns=["TS"])["TS"]
        except Exception:
            logger.warning(
                "Unreadable parquet file %s — treating as missing", file_path, exc_info=True
            )
            return None
        ts_min, ts_max = ts.min(), ts.max()
        # Empty or all-null TS column: no usable bounds.
        if ts_min is None or ts_max is None:
            return None
        return int(ts_min), int(ts_max)  # type: ignore[arg-type]

    # ------------------------------------------------------------------
    # Inventory
    # ------------------------------------------------------------------

    def inventory(self) -> list[dict[str, Any]]:
        """Return list of dataset info dicts for all stored data.

        Each entry has ``exchange``, ``pair``, ``data_type`` (plus ``span`` in
        seconds for OHLC), ``files``, ``rows`` and ``min_ts`` / ``max_ts``
        (nanoseconds UTC) so the UI can display the actual data time range.
        Only directories that contain at least one ``*.parquet`` file are
        listed. Returns an empty list when the root directory does not exist.
        """
        if not self._root.is_dir():
            # Nothing has been saved yet; iterdir() would raise on a missing root.
            return []
        result: list[dict[str, Any]] = []
        for exchange_dir in sorted(self._root.iterdir()):
            if not exchange_dir.is_dir() or exchange_dir.name.startswith("."):
                continue
            exchange = exchange_dir.name
            for dtype_dir in sorted(exchange_dir.iterdir()):
                dtype = dtype_dir.name
                if not dtype_dir.is_dir() or dtype not in ("ohlc", "trades", "orderbook"):
                    continue
                for pair_dir in sorted(dtype_dir.iterdir()):
                    if not pair_dir.is_dir():
                        continue
                    pair = pair_dir.name
                    if dtype != "ohlc":
                        files = sorted(pair_dir.glob("*.parquet"))
                        if files:
                            result.append(self._inventory_entry(exchange, pair, dtype, files))
                        continue
                    # OHLC has one extra directory level per span.
                    for span_dir in sorted(pair_dir.iterdir()):
                        if not span_dir.is_dir():
                            continue
                        files = sorted(span_dir.glob("*.parquet"))
                        if files:
                            result.append(
                                self._inventory_entry(
                                    exchange,
                                    pair,
                                    dtype,
                                    files,
                                    span=self._span_from_dirname(span_dir.name),
                                )
                            )
        return result

    def _inventory_entry(
        self,
        exchange: str,
        pair: str,
        dtype: str,
        files: list[pathlib.Path],
        **extra: Any,
    ) -> dict[str, Any]:
        """Build one inventory record; *extra* keys are placed after ``data_type``."""
        min_ts, max_ts, rows = self._ts_range(files)
        return {
            "exchange": exchange,
            "pair": pair,
            "data_type": dtype,
            **extra,
            "files": len(files),
            "rows": rows,
            "min_ts": min_ts,
            "max_ts": max_ts,
        }

    @staticmethod
    def _span_from_dirname(name: str) -> int | None:
        """Parse an OHLC span directory name into seconds, or None if unparseable."""
        from dccd.domain.timeutils import str_to_span

        span_s = str_to_span(name)
        if span_s is not None:
            return span_s
        # Fallback: parse "3600s" format
        try:
            return int(name.rstrip("s"))
        except ValueError:
            return None

    def _ts_range(
        self, files: list[pathlib.Path]
    ) -> tuple[int | None, int | None, int]:
        """Return (min_ts_ns, max_ts_ns, total_rows) across a list of parquet files.

        Unreadable files are logged and skipped. Rows whose ``TS`` column is
        entirely null still count toward ``total_rows``, but do not affect
        the range.
        """
        min_ts: int | None = None
        max_ts: int | None = None
        total_rows = 0
        for f in files:
            try:
                ts = pl.read_parquet(f, columns=["TS"])["TS"]
            except Exception:
                logger.warning(
                    "Unreadable parquet file %s — excluded from inventory", f, exc_info=True
                )
                continue
            total_rows += len(ts)
            fmin, fmax = ts.min(), ts.max()
            if fmin is None or fmax is None:
                continue
            fmin_i, fmax_i = int(fmin), int(fmax)  # type: ignore[arg-type]
            if min_ts is None or fmin_i < min_ts:
                min_ts = fmin_i
            if max_ts is None or fmax_i > max_ts:
                max_ts = fmax_i
        return min_ts, max_ts, total_rows

    def _is_year_complete(self, ds: DatasetId, year: int) -> bool:
        """True if the OHLC file for *year* has at least one row per expected bar.

        The expected count is ``seconds in the UTC year // span``. Any read
        failure counts as incomplete, so the year is fetched again.
        """
        if ds.span is None:
            return False
        file_path = self._file_path(ds, str(year))
        if not file_path.exists():
            return False
        try:
            df = pl.read_parquet(file_path, columns=["TS"])
            year_start = datetime(year, 1, 1, tzinfo=timezone.utc)
            year_end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
            expected = int((year_end - year_start).total_seconds()) // ds.span
            return len(df) >= expected
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _to_dataframe(self, ds: DatasetId, records: list[Any]) -> pl.DataFrame:
        """Convert domain records into a frame with the canonical schema.

        OHLC and trades produce one row per record. An order-book snapshot
        expands to one row per bid level and one row per ask level, all
        sharing the snapshot's ``TS`` and ``is_snapshot`` flag.
        """
        if ds.data_type == DataType.OHLC:
            rows = [
                {
                    "TS": r.ts,
                    "open": r.open,
                    "high": r.high,
                    "low": r.low,
                    "close": r.close,
                    "volume": r.volume,
                    "quote_volume": r.quote_volume,
                    "trades": r.trades,
                }
                for r in records
            ]
            return pl.DataFrame(rows, schema=_OHLC_SCHEMA)
        elif ds.data_type == DataType.TRADES:
            rows = [
                {
                    "TS": r.ts,
                    "price": r.price,
                    "amount": r.amount,
                    "side": r.side,
                    "tid": r.tid,
                }
                for r in records
            ]
            return pl.DataFrame(rows, schema=_TRADES_SCHEMA)
        else:
            rows = []
            for snap in records:
                for side, levels in (("bid", snap.bids), ("ask", snap.asks)):
                    for lvl in levels:
                        rows.append({
                            "TS": snap.ts,
                            "side": side,
                            "price": lvl.price,
                            "amount": lvl.amount,
                            "count": lvl.count,
                            "is_snapshot": snap.is_snapshot,
                        })
            return pl.DataFrame(rows, schema=_BOOK_SCHEMA)

    def _dedup_subset(self, ds: DatasetId, df: pl.DataFrame) -> list[str]:
        """Natural dedup key for *ds*. ``TS`` alone is unique only for OHLC.

        Trades collide on TS (exchanges timestamp at ms → many share a ns), so
        deduping on TS would drop distinct trades; we key on the trade id when
        present, else a composite. Order-book rows share one TS across every
        price level, so they key on (TS, side, price).
        """
        if ds.data_type == DataType.OHLC:
            return ["TS"]
        if ds.data_type == DataType.TRADES:
            if "tid" in df.columns and df["tid"].null_count() == 0:
                return ["tid"]
            return ["TS", "price", "amount", "side"]
        return ["TS", "side", "price"]  # order book level

    def _merge(self, file_path: pathlib.Path, new: pl.DataFrame, ds: DatasetId) -> pl.DataFrame:
        """Merge new data with existing file, deduplicating on the natural key.

        The existing file is **canonicalised** before the concat so that legacy
        (v2) files — whose columns differ (``quoteVolume``/``weightedAverage``)
        — are aligned to the v3 schema instead of raising a schema error. We
        never silently overwrite on a read error: an unreadable file is a fault
        worth surfacing, not a reason to drop its rows.

        On duplicate keys the incoming row wins (``keep="last"``). Both the
        de-duplication and the final ``TS`` sort preserve input order, so rows
        sharing a ``TS`` (trades, book levels) are written deterministically.
        """
        if file_path.exists():
            existing = canonicalize(pl.read_parquet(file_path), ds.data_type)
            combined = pl.concat([existing, new])
        else:
            combined = new
        key = self._dedup_subset(ds, combined)
        return combined.unique(subset=key, keep="last", maintain_order=True).sort(
            "TS", maintain_order=True
        )

    def _write_parquet(
        self,
        file_path: pathlib.Path,
        df: pl.DataFrame,
        provenance: Provenance | None,
    ) -> None:
        """Atomically replace *file_path* with *df* (snappy), embedding provenance."""
        import os

        meta: dict[str, str] = {}
        if provenance is not None:
            meta["dccd.provenance"] = provenance.model_dump_json()
        # Write to a temp file then atomically rename, so a concurrent reader
        # (load/last_timestamp/inventory) never sees a half-written file — it
        # observes either the old complete file or the new one. The temp name
        # does not end in ".parquet", so readers' globs never pick it up.
        tmp = file_path.with_suffix(file_path.suffix + f".tmp.{os.getpid()}.{threading.get_ident()}")
        try:
            # Polars >=1.x persists key/value metadata into the Parquet footer.
            df.write_parquet(tmp, compression="snappy", metadata=meta or None)
            os.replace(tmp, file_path)
        finally:
            # Only present if the write or rename failed part-way.
            if tmp.exists():
                tmp.unlink(missing_ok=True)

    @staticmethod
    def read_provenance(file_path: str | pathlib.Path) -> Provenance | None:
        """Return the :class:`Provenance` stored in a Parquet file, if any.

        Reads the ``dccd.provenance`` key from the file's footer metadata
        (written by :meth:`save`); returns None when the key is absent.
        """
        import json

        import pyarrow.parquet as pq

        kv = pq.read_metadata(str(file_path)).metadata or {}
        # pyarrow exposes footer keys/values as bytes; accept str defensively.
        raw = kv.get(b"dccd.provenance") or kv.get("dccd.provenance")
        if raw is None:
            return None
        if isinstance(raw, bytes):
            raw = raw.decode()
        return Provenance(**json.loads(raw))