Source code for dccd.storage.coverage_sqlite

"""SQLite-backed coverage manifest.

Records, per dataset, the extent of data we have collected — independent of the
Parquet files themselves. This is what lets local data be **dropped** (to free
disk) without forcing a re-download: ``backfill(start="last")`` falls back to the
manifest's ``max_ts`` when no local file is present, so it resumes from where
collection left off instead of from the bounded default lookback.

Lives at ``{data_path}/.dccd/coverage.db`` — the ``.dccd`` directory is never
purged, so the manifest survives a local-data drop.
"""

from __future__ import annotations

import logging
import pathlib
import sqlite3
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generator

if TYPE_CHECKING:
    from dccd.domain.dataset import DatasetId

# Public API of this module.
__all__ = ["CoverageStore"]

# Module-level logger, named after the module's import path.
logger = logging.getLogger(__name__)

# DDL script executed by ``CoverageStore.__init__`` through ``executescript``.
# ``CREATE TABLE IF NOT EXISTS`` makes it safe to run on every start-up.
#
# Columns: ``dataset_id`` holds ``str(ds)`` and is the lookup key; ``min_ts`` /
# ``max_ts`` are the recorded coverage envelope and may be NULL; ``rows`` is an
# accumulated row tally; ``updated_at`` is written by ``CoverageStore.record``
# as nanoseconds since the Unix epoch.
#
# NOTE(review): per the SQLite documentation ``journal_mode=WAL`` is persisted
# in the database file, whereas ``synchronous`` is a per-connection setting, so
# the second PRAGMA only affects the connection that runs this script.
# NOTE(review): the unit of ``min_ts``/``max_ts`` is not visible here — confirm
# against the callers of ``record``.
_SCHEMA = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;

CREATE TABLE IF NOT EXISTS coverage (
    dataset_id  TEXT PRIMARY KEY,
    exchange    TEXT NOT NULL,
    symbol      TEXT NOT NULL,
    data_type   TEXT NOT NULL,
    span        INTEGER,
    min_ts      INTEGER,
    max_ts      INTEGER,
    rows        INTEGER DEFAULT 0,
    updated_at  INTEGER NOT NULL
);
"""


class CoverageStore:
    """Per-dataset coverage manifest (min/max ts + row count).

    Each operation opens its own short-lived SQLite connection, commits on
    success, rolls back on error and always closes the connection.

    Parameters
    ----------
    db_path : str or Path
        Path to the SQLite database file. Created if absent (including any
        missing parent directories).
    """

    def __init__(self, db_path: str | pathlib.Path) -> None:
        self._path = pathlib.Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Running the schema script is idempotent (CREATE TABLE IF NOT EXISTS).
        with self._conn() as conn:
            conn.executescript(_SCHEMA)

    @contextmanager
    def _conn(self) -> Generator[sqlite3.Connection, None, None]:
        """Yield a connection that commits on success and rolls back on error.

        Rows are returned as ``sqlite3.Row`` so columns can be read by name.
        """
        conn = sqlite3.connect(str(self._path))
        conn.row_factory = sqlite3.Row
        try:
            # ``synchronous`` is a per-connection setting in SQLite (unlike
            # ``journal_mode=WAL``, which is stored in the file), so it must be
            # re-applied on every new connection to take effect.
            conn.execute("PRAGMA synchronous=NORMAL")
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def record(
        self,
        ds: "DatasetId",
        *,
        min_ts: int | None,
        max_ts: int | None,
        rows_added: int = 0,
    ) -> None:
        """Upsert coverage for *ds*, widening the [min_ts, max_ts] envelope.

        ``min_ts``/``max_ts`` are merged with any existing row (min of mins,
        max of maxes, ``None`` ignored), so a later backfill never *narrows*
        the recorded extent; ``rows_added`` accumulates (an approximate
        stored-row tally for display).

        The merge is done by a single ``INSERT ... ON CONFLICT DO UPDATE``
        statement, so it is atomic: concurrent writers cannot lose an
        increment, narrow the envelope, or collide on the primary key the way
        a separate SELECT followed by UPDATE/INSERT could. Requires
        SQLite 3.24 or newer.

        Parameters
        ----------
        ds : DatasetId
            Dataset whose coverage is recorded; ``str(ds)`` is the row key.
        min_ts, max_ts : int or None
            Bounds of the newly collected data; ``None`` leaves the stored
            bound untouched.
        rows_added : int, optional
            Number of rows to add to the stored tally.
        """
        key = str(ds)
        # Integer nanoseconds since the epoch, without a float round-trip.
        now = time.time_ns()
        # SQLite's scalar MIN/MAX return NULL if any argument is NULL, so each
        # side is COALESCEd onto the other: a NULL bound then yields the
        # non-NULL one, and NULL only when both are NULL.
        upsert_sql = (
            "INSERT INTO coverage (dataset_id, exchange, symbol, "
            "data_type, span, min_ts, max_ts, rows, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(dataset_id) DO UPDATE SET "
            "min_ts = MIN(COALESCE(coverage.min_ts, excluded.min_ts), "
            "COALESCE(excluded.min_ts, coverage.min_ts)), "
            "max_ts = MAX(COALESCE(coverage.max_ts, excluded.max_ts), "
            "COALESCE(excluded.max_ts, coverage.max_ts)), "
            "rows = COALESCE(coverage.rows, 0) + excluded.rows, "
            "updated_at = excluded.updated_at"
        )
        params = (
            key,
            ds.exchange,
            str(ds.symbol),
            ds.data_type.value,
            ds.span,
            min_ts,
            max_ts,
            rows_added,
            now,
        )
        with self._conn() as conn:
            conn.execute(upsert_sql, params)

    def get_max_ts(self, ds: "DatasetId") -> int | None:
        """Return the recorded ``max_ts`` for *ds*, or ``None`` if unknown.

        ``None`` is returned both when no row exists for *ds* and when the
        row exists but its ``max_ts`` is NULL.
        """
        with self._conn() as conn:
            row = conn.execute(
                "SELECT max_ts FROM coverage WHERE dataset_id=?", (str(ds),)
            ).fetchone()
        return row["max_ts"] if row else None

    def list_all(self) -> list[dict[str, Any]]:
        """Return every coverage row (most recently updated first).

        Each row is a plain ``dict`` keyed by column name.
        """
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT * FROM coverage ORDER BY updated_at DESC"
            ).fetchall()
        return [dict(r) for r in rows]
def _merge(existing: int | None, incoming: int | None, op: Any) -> int | None:
    """Reduce the non-``None`` inputs with *op* (e.g. ``min`` or ``max``).

    *op* is called with a list holding whichever of *existing* and *incoming*
    are not ``None``; when both are ``None`` the result is ``None`` and *op*
    is not called.
    """
    present = []
    for candidate in (existing, incoming):
        if candidate is not None:
            present.append(candidate)
    if not present:
        return None
    return op(present)