Storage

The storage layer persists every data type to Parquet with nanosecond timestamps, provenance and per-type deduplication, and keeps an append-only run history in SQLite. ParquetStore is the unified read/write interface. You rarely touch it directly, because dccd.Client and the operations use it for you.

Directory layout

{data_path}/{exchange}/ohlc/{pair}/{span}/YYYY.parquet
{data_path}/{exchange}/trades/{pair}/YYYY-MM-DD.parquet
{data_path}/{exchange}/orderbook/{pair}/YYYY-MM-DD.parquet
  • exchange: lowercase exchange name (binance, kraken, …).

  • pair: base and quote joined by a hyphen, e.g. BTC-USDT for BTC/USDT (the slash is replaced by a hyphen).

  • span: bar length as a seconds label (e.g. 3600s); OHLC only.

  • OHLC files are annual; trades and order-book files are daily.
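The layout above can be expressed as a small path function. This is a minimal sketch of the naming rules only, not dccd's own code. The helper name `dataset_file` and the plain-string data types ("ohlc", "trades", "orderbook", taken from the directory names) are hypothetical. Files are bucketed by the UTC date of a nanosecond timestamp.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

NS_PER_S = 1_000_000_000


def dataset_file(data_path, exchange: str, pair: str, data_type: str,
                 ts_ns: int, span: Optional[int] = None) -> Path:
    """Hypothetical helper: map a dataset plus a timestamp to its Parquet file."""
    day = datetime.fromtimestamp(ts_ns // NS_PER_S, tz=timezone.utc)
    pair_dir = pair.replace("/", "-")  # BTC/USDT -> BTC-USDT
    base = Path(data_path) / exchange.lower() / data_type / pair_dir
    if data_type == "ohlc":
        if span is None:
            raise ValueError("OHLC needs a span in seconds")
        # OHLC: one file per year, under a span directory such as 3600s
        return base / f"{span}s" / f"{day:%Y}.parquet"
    if data_type not in ("trades", "orderbook"):
        raise ValueError(f"unknown data type: {data_type}")
    # Trades and order book: one file per UTC day
    return base / f"{day:%Y-%m-%d}.parquet"
```

For example, `dataset_file("/data/crypto", "binance", "BTC/USDT", "ohlc", ts, span=3600)` yields a path ending in `binance/ohlc/BTC-USDT/3600s/2024.parquet` for any `ts` in 2024.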

Schema & integrity

All timestamps (TS) are nanoseconds UTC (int64). Each write merges into the existing file and deduplicates on the natural key for each data type:

Data type     Dedup key                             Notes
OHLC          TS                                    One bar per span window.
trades        tid (else TS, price, amount, side)    Many trades share a TS; keying on TS alone would lose them.
order book    TS, side, price                       A snapshot’s levels all share one TS.
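The per-type keys can be illustrated with a plain-Python merge. This sketch works on lists of dicts rather than Parquet or polars frames. It assumes that on a key collision the incoming row replaces the stored one, which the text above does not specify. The function names and the "orderbook" type string are hypothetical.

```python
from typing import Any, Dict, Hashable, Iterable, List, Tuple

Row = Dict[str, Any]


def dedup_key(data_type: str, row: Row) -> Tuple[Hashable, ...]:
    """Natural key per data type, following the dedup table."""
    if data_type == "ohlc":
        return (row["TS"],)
    if data_type == "trades":
        if row.get("tid") is not None:
            return ("tid", row["tid"])
        # No trade id: fall back to the full trade identity
        return ("fields", row["TS"], row["price"], row["amount"], row["side"])
    if data_type == "orderbook":
        return (row["TS"], row["side"], row["price"])
    raise ValueError(f"unknown data type: {data_type}")


def merge_dedup(data_type: str, existing: Iterable[Row], new: Iterable[Row]) -> List[Row]:
    """Merge new rows into existing ones; on a key collision the new row wins (assumption)."""
    merged: Dict[Tuple[Hashable, ...], Row] = {}
    for row in list(existing) + list(new):
        merged[dedup_key(data_type, row)] = row
    return sorted(merged.values(), key=lambda r: r["TS"])
```

The "tid" and "fields" tags keep an id-based key from ever colliding with a field-based one.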

Writes are atomic (temp file + os.replace) and serialised per file, so a reader never sees a half-written Parquet and concurrent writers can’t corrupt it.
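The temp-file-plus-`os.replace` pattern can be sketched with the standard library. The sketch writes raw bytes rather than Parquet. It also assumes per-file serialisation through a process-local `threading.Lock` for each target path. The text above does not say how dccd serialises writers, and separate processes would need a file lock instead. `atomic_write_bytes` is a hypothetical name.

```python
import os
import tempfile
import threading
from collections import defaultdict
from pathlib import Path
from typing import DefaultDict

# One lock per target path (process-local assumption)
_locks: DefaultDict[str, threading.Lock] = defaultdict(threading.Lock)
_locks_guard = threading.Lock()


def _lock_for(path: Path) -> threading.Lock:
    with _locks_guard:
        return _locks[str(path.resolve())]


def atomic_write_bytes(path, data: bytes) -> None:
    """Replace path with data so a reader sees either the old file or the new one."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with _lock_for(path):
        # The temp file lives in the same directory (same filesystem) so os.replace is atomic
        fd, tmp = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, path)
        except BaseException:
            os.unlink(tmp)  # never leave a stray temp file behind
            raise
```

For a read-merge-write cycle like `save`, the lock would have to cover the read and the merge as well as the replace. Otherwise two writers could each merge against the same stale file.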

Reading data

from dccd.storage.parquet import ParquetStore
from dccd.domain.dataset import DatasetId
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType

store = ParquetStore("/data/crypto")
ds = DatasetId(exchange="binance", symbol=Symbol(base="BTC", quote="USDT"),
               data_type=DataType.OHLC, span=3600)
df = store.load(ds)          # a polars.DataFrame, sorted by TS

ParquetStore

class ParquetStore(data_path)

Read/write interface for the Parquet data under data_path; each method operates on a single DatasetId.

All timestamps (TS) are nanoseconds UTC (int64).

Parameters:
data_path : str or Path

Root directory for all data files.

Examples

>>> import pathlib, tempfile
>>> from dccd.domain.dataset import DatasetId
>>> from dccd.domain.symbol import Symbol
>>> from dccd.domain.types import DataType
>>> store = ParquetStore('/tmp/data')

directory(ds)

Return the directory for ds, creating it if needed.

inventory()

Return a list of dataset info dicts covering all stored data.

Each entry includes min_ts / max_ts (nanoseconds UTC) and rows (the row count), so the UI can display the actual time range of the data.

last_timestamp(ds)

Return the last TS in ns, or None if there is no data.

load(ds, start_ns=None, end_ns=None)

Load data for ds in the given nanosecond range.
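Because load takes integer nanoseconds, it helps to convert datetimes exactly. A minimal standard-library sketch follows. It uses integer timedelta arithmetic rather than `datetime.timestamp()`, which returns a float and can lose precision. `to_ns`, `from_ns` and `EPOCH` are hypothetical helpers, not part of dccd.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ns(dt: datetime) -> int:
    """Convert an aware datetime to integer nanoseconds since the Unix epoch (UTC)."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    # timedelta // timedelta is an exact int; scale microseconds to nanoseconds
    return (dt - EPOCH) // timedelta(microseconds=1) * 1_000


def from_ns(ts_ns: int) -> datetime:
    """Convert nanoseconds UTC back to an aware datetime (microsecond precision)."""
    return EPOCH + timedelta(microseconds=ts_ns // 1_000)
```

A call would then look like `store.load(ds, start_ns=to_ns(datetime(2024, 1, 1, tzinfo=timezone.utc)), end_ns=to_ns(datetime(2024, 2, 1, tzinfo=timezone.utc)))`. Whether end_ns is inclusive is not stated here.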

missing_intervals(ds, start_ns, end_ns)

Return gaps as (start_ns, end_ns) pairs within [start_ns, end_ns].
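One way to compute such gaps for OHLC data is sketched below. It assumes that a bar with timestamp t covers [t, t + span), and it returns half-open uncovered ranges of the window [start_ns, end_ns). These boundary conventions may differ from those of missing_intervals. `find_gaps` is a hypothetical name.

```python
from typing import Iterable, List, Tuple


def find_gaps(ts_ns: Iterable[int], span_ns: int,
              start_ns: int, end_ns: int) -> List[Tuple[int, int]]:
    """Return [gap_start, gap_end) ranges of [start_ns, end_ns) covered by no bar.

    Assumes each bar with timestamp t covers [t, t + span_ns).
    """
    # Keep only bars that overlap the window, in time order
    bars = sorted(t for t in ts_ns if t + span_ns > start_ns and t < end_ns)
    gaps: List[Tuple[int, int]] = []
    covered_until = start_ns  # everything before this point is covered
    for t in bars:
        if t > covered_until:
            gaps.append((covered_until, t))
        covered_until = max(covered_until, t + span_ns)
    if covered_until < end_ns:
        gaps.append((covered_until, end_ns))
    return gaps
```

With bars at 0, 10, 20 and 50 and a span of 10, the window [0, 80) has two gaps: [30, 50) and [60, 80).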

static read_provenance(file_path)

Return the Provenance stored in a Parquet file, if any.

save(ds, records, provenance=None)

Write records to Parquet, merging with existing data.

Parameters:
ds : DatasetId
records : list

OHLCBar, Trade, or OrderBookSnapshot objects.

provenance : Provenance or None
Returns:
int

Number of rows written.

Run history

class RunsStore(db_path)

Append-only SQLite store for JobRun records.

Parameters:
db_path : str or Path

Path to the SQLite database file. Created if absent.

Examples

>>> import tempfile, os
>>> with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
...     path = f.name
>>> store = RunsStore(path)
>>> store.create_run('r1', 'backfill:binance:BTC/USDT:ohlc', 'backfill', 'binance', 'BTC/USDT', 'ohlc')
>>> os.unlink(path)

active_runs()

Return the runs currently in the running or reconnecting state.

append_log(run_id, msg, max_lines=100)

Append a log line to the run’s bounded log_tail, which keeps at most max_lines lines.
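A bounded log tail can be kept in one SQLite column. The sketch below uses the standard `sqlite3` module and assumes a hypothetical `runs` table that stores log_tail as a JSON array of strings. RunsStore's real schema is not documented here, so this only shows how the trimming to max_lines might work.

```python
import json
import sqlite3


def append_log(conn: sqlite3.Connection, run_id: str, msg: str, max_lines: int = 100) -> None:
    """Append msg to a run's log tail, keeping only the newest max_lines lines."""
    with conn:  # commits the UPDATE, or rolls back if anything raises
        row = conn.execute("SELECT log_tail FROM runs WHERE run_id = ?", (run_id,)).fetchone()
        if row is None:
            raise KeyError(run_id)
        lines = json.loads(row[0] or "[]")
        lines.append(msg)
        conn.execute("UPDATE runs SET log_tail = ? WHERE run_id = ?",
                     (json.dumps(lines[-max_lines:]), run_id))


# Hypothetical schema, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (run_id TEXT PRIMARY KEY, state TEXT, log_tail TEXT)")
conn.execute("INSERT INTO runs VALUES ('r1', 'running', NULL)")
conn.commit()
for i in range(5):
    append_log(conn, "r1", f"line {i}", max_lines=3)
```

After the loop, only the last three messages ("line 2" to "line 4") remain in the tail.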

create_run(run_id, spec_id, operation, exchange, symbol, data_type, started_at=None)

Insert a new run row in the running state.

finish_run(run_id, state, ended_at=None, rows_written=0, error=None)

Mark a run finished with its final state, row count and optional error.

get_run(run_id)

Return one run as a dict, or None if unknown.

list_runs(spec_id=None, state=None, limit=50)

List recent runs, most recent first, optionally filtered by spec_id and/or state; at most limit runs are returned.
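Optional filters like these map naturally onto a parameterised SQL query. This sketch builds the WHERE clause from whichever filters are given and binds every value as a parameter. The `runs` table, its columns and the "done" state are hypothetical. Only "running" and "reconnecting" appear in the text above.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def list_runs(conn: sqlite3.Connection, spec_id: Optional[str] = None,
              state: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
    """Newest-first runs, optionally filtered; all values are bound as SQL parameters."""
    clauses: List[str] = []
    params: List[Any] = []
    if spec_id is not None:
        clauses.append("spec_id = ?")
        params.append(spec_id)
    if state is not None:
        clauses.append("state = ?")
        params.append(state)
    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    sql = f"SELECT run_id, spec_id, state, started_at FROM runs{where} ORDER BY started_at DESC LIMIT ?"
    cur = conn.execute(sql, [*params, limit])
    cols = [d[0] for d in cur.description]
    return [dict(zip(cols, row)) for row in cur.fetchall()]


# Hypothetical schema and rows, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (run_id TEXT PRIMARY KEY, spec_id TEXT, state TEXT, started_at INTEGER)")
conn.executemany("INSERT INTO runs VALUES (?, ?, ?, ?)", [
    ("r1", "s1", "done", 1),
    ("r2", "s1", "running", 2),
    ("r3", "s2", "reconnecting", 3),
])
```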

update_progress(run_id, progress)

Persist the latest progress dict for run_id (polled by the UI).

Migration

One-shot migration of legacy (v2) Parquet files to the canonical v3 schema.

Two independent transformations, both idempotent:

  1. Timestamp scale: v2 stored TS in seconds; v3 uses nanoseconds.

  2. Column schema: v2 OHLC used quoteVolume and weightedAverage; v3 uses quote_volume plus a trades count, and drops weightedAverage.

A file is rewritten if its timestamp scale, its columns, or both differ from the canonical v3 schema. Files already in v3 form are left untouched.
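The two transformations can be sketched on plain row dicts. The scale check here is an assumed heuristic, not necessarily dccd's. Epoch seconds stay below 10**10 until the year 2286, while nanosecond timestamps exceed 10**10 from 10 seconds after the epoch. The sketch does not add the v3 trades column, because the text does not say how migration fills it. `needs_migration_sketch` and `migrate_rows` are hypothetical names.

```python
from typing import Any, Dict, Iterable, List

SECONDS_CUTOFF = 10**10  # assumed boundary between second- and nanosecond-scale TS
LEGACY_COLUMNS = {"quoteVolume", "weightedAverage"}


def needs_migration_sketch(columns: Iterable[str], first_ts: int) -> bool:
    """True if the columns or the timestamp scale look like v2."""
    return bool(LEGACY_COLUMNS & set(columns)) or first_ts < SECONDS_CUTOFF


def migrate_rows(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Rescale TS s->ns and rename/drop legacy OHLC columns; safe to run twice."""
    out = []
    for row in rows:
        new = dict(row)  # never mutate the caller's rows
        if new["TS"] < SECONDS_CUTOFF:
            new["TS"] *= 1_000_000_000
        if "quoteVolume" in new:
            new["quote_volume"] = new.pop("quoteVolume")
        new.pop("weightedAverage", None)
        out.append(new)
    return out
```

Running migrate_rows on already-migrated rows returns them unchanged, which mirrors the idempotence described above.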

migrate_parquet_to_ns(data_path, *, dry_run=False)

Migrate all Parquet files under data_path to the canonical v3 schema.

Parameters:
data_path : str or Path

Root data directory.

dry_run : bool

If True, only report what would change without modifying any file.

Returns:
list of dict

Per-file report with the keys path, rows, from_schema, to_schema, rescaled (whether the TS s→ns rescale was applied) and migrated.

needs_migration(file_path)

Return True if the file is not already in canonical v3 form.