Source code for dccd.storage.migrate

"""One-shot migration of legacy (v2) Parquet files to the canonical v3 schema.

Two independent transformations, both idempotent:

1. **Timestamp scale** — v2 stored ``TS`` in seconds; v3 uses nanoseconds.
2. **Column schema** — v2 OHLC used ``quoteVolume``/``weightedAverage``; v3 uses
   ``quote_volume`` (+ a ``trades`` count). ``weightedAverage`` is dropped.

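A file is rewritten if *either* its timestamp scale *or* its columns differ
from canonical; columns are only checked when the data type can be inferred
from the directory layout (``.../{ohlc,trades,orderbook}/...``). Files already
in v3 form, and empty files, are left untouched.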
"""

from __future__ import annotations

import logging
import pathlib
from typing import Any

from dccd.domain.types import DataType
from dccd.storage.parquet import canonicalize

__all__ = ["migrate_parquet_to_ns", "needs_migration"]

logger = logging.getLogger(__name__)

# 10**18 ns corresponds to 2001-09-09T01:46:40Z. A seconds-scale epoch value
# (~1.7e9 for current dates) sits far below it, while any nanosecond timestamp
# later than that date sits above it.
_NS_THRESHOLD = 10**18

# Directory name used in the storage layout -> DataType stored beneath it.
_DTYPE_DIRS = dict(
    ohlc=DataType.OHLC,
    trades=DataType.TRADES,
    orderbook=DataType.ORDERBOOK,
)


def _infer_data_type(file_path: pathlib.Path) -> DataType | None:
    """Map *file_path* to a DataType using the storage directory layout.

    Path components are scanned from the root downwards; the first one that
    names a known data-type directory (``ohlc``, ``trades`` or ``orderbook``)
    decides the result. ``None`` is returned when no component matches.
    """
    candidates = (
        _DTYPE_DIRS[component]
        for component in file_path.parts
        if component in _DTYPE_DIRS
    )
    return next(candidates, None)


def _needs_ts_rescale(df: Any) -> bool:
    if "TS" not in df.columns or len(df) == 0:
        return False
    return int(df["TS"].max()) < _NS_THRESHOLD


def _needs_schema_change(df: Any, data_type: DataType) -> bool:
    """Return True when *df*'s columns differ from the canonical v3 schema.

    Both the column names and their order must match the schema registered
    for *data_type*; any difference means the frame must be canonicalized.
    """
    from dccd.storage.parquet import _SCHEMAS

    actual_columns = list(df.columns)
    canonical_columns = list(_SCHEMAS[data_type].keys())
    return actual_columns != canonical_columns


def needs_migration(file_path: str | pathlib.Path) -> bool:
    """Return True if the file is not already in canonical v3 form.

    The check is best-effort: empty files are reported as not needing
    migration, and any failure while reading or inspecting the file (missing
    file, corrupt Parquet, ...) is logged as a warning and reported as
    ``False`` instead of being raised.

    Parameters
    ----------
    file_path : str or Path
        Parquet file to inspect. Its data type is inferred from the storage
        layout; when that is not possible only the timestamp scale is checked.
    """
    # Normalize so ``.parts`` works for plain string paths as well.
    path = pathlib.Path(file_path)
    try:
        import polars as pl

        df = pl.read_parquet(path)
        if len(df) == 0:
            return False
        if _needs_ts_rescale(df):
            return True
        data_type = _infer_data_type(path)
        return data_type is not None and _needs_schema_change(df, data_type)
    except Exception as exc:
        # Deliberately best-effort (callers get a bool, never an exception),
        # but not silent: surface why the file could not be inspected.
        logger.warning("Could not inspect %s for migration: %s", path, exc)
        return False


# Largest seconds value whose nanosecond equivalent still fits in a signed
# 64-bit integer (~year 2262). Anything larger is not a seconds timestamp and
# multiplying it by 1e9 would overflow.
_MAX_RESCALABLE_SECONDS = (2**63 - 1) // 1_000_000_000


def _report_entry(
    path: pathlib.Path,
    rows: int,
    from_schema: list[str] | None,
    to_schema: list[str] | None,
    *,
    rescaled: bool = False,
    migrated: bool = False,
    error: str | None = None,
) -> dict[str, Any]:
    """Build one per-file report dict with a uniform set of keys."""
    entry: dict[str, Any] = {
        "path": str(path),
        "rows": rows,
        "from_schema": from_schema,
        "to_schema": to_schema,
        "rescaled": rescaled,
        "migrated": migrated,
    }
    if error is not None:
        entry["error"] = error
    return entry


def _write_parquet_atomic(df: Any, path: pathlib.Path) -> None:
    """Write *df* to *path* via a sibling temp file + rename so *path* is never half-written."""
    # Hidden ``.tmp`` sibling: same directory (so the rename stays on one
    # filesystem) and not matched by the ``*.parquet`` glob.
    tmp = path.with_name(f".{path.name}.tmp")
    try:
        df.write_parquet(tmp, compression="snappy")
        tmp.replace(path)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise


def migrate_parquet_to_ns(
    data_path: str | pathlib.Path,
    *,
    dry_run: bool = False,
) -> list[dict[str, Any]]:
    """Migrate all Parquet files under *data_path* to the canonical v3 schema.

    Every ``*.parquet`` file found recursively under *data_path* is read. Its
    ``TS`` column is rescaled from seconds to nanoseconds when needed, and its
    columns are canonicalized when the data type can be inferred from the
    directory layout. Rewritten files are written to a temporary sibling and
    renamed over the original, so an interrupted run does not leave a
    truncated file behind.

    Parameters
    ----------
    data_path : str or Path
        Root data directory.
    dry_run : bool
        If True, only report what would change without modifying any file.

    Returns
    -------
    list of dict
        Per-file report with ``path``, ``rows``, ``from_schema``,
        ``to_schema``, ``rescaled`` (TS s→ns applied) and ``migrated`` keys.
        Files that were skipped because of a problem also carry an ``error``
        key. ``rows`` is ``-1`` and both schemas are ``None`` when the file
        could not be read.
    """
    import polars as pl

    root = pathlib.Path(data_path)
    report: list[dict[str, Any]] = []
    for f in sorted(root.rglob("*.parquet")):
        try:
            df = pl.read_parquet(f)
        except Exception as exc:
            logger.error("Failed to read %s: %s", f, exc)
            report.append(_report_entry(f, -1, None, None, error=str(exc)))
            continue

        n = len(df)
        data_type = _infer_data_type(f)
        from_schema = list(df.columns)
        rescale = _needs_ts_rescale(df)
        schema_change = data_type is not None and _needs_schema_change(df, data_type)

        if n == 0 or not (rescale or schema_change):
            report.append(_report_entry(f, n, from_schema, from_schema))
            continue

        if rescale and int(df["TS"].max()) > _MAX_RESCALABLE_SECONDS:
            # Below the ns threshold but too large to be seconds (e.g. ms/us):
            # a x1e9 rescale would silently wrap int64 and corrupt every
            # timestamp, so leave the file untouched and report it.
            msg = "TS values too large to be seconds; refusing to rescale"
            logger.error("Skipping %s: %s", f, msg)
            report.append(_report_entry(f, n, from_schema, from_schema, error=msg))
            continue

        new = df
        if rescale:
            new = new.with_columns((pl.col("TS") * 1_000_000_000).alias("TS"))
        if schema_change and data_type is not None:
            new = canonicalize(new, data_type)
        to_schema = list(new.columns)

        if dry_run:
            logger.info(
                "[dry-run] Would migrate %s (%d rows): %s -> %s",
                f, n, from_schema, to_schema,
            )
        else:
            _write_parquet_atomic(new, f)
            logger.info("Migrated %s (%d rows): %s -> %s", f, n, from_schema, to_schema)

        report.append(
            _report_entry(
                f, n, from_schema, to_schema,
                rescaled=rescale, migrated=not dry_run,
            )
        )
    return report