Storage
The storage layer persists every data type to Parquet with nanosecond
timestamps, provenance and per-type deduplication, and keeps an append-only run
history in SQLite. ParquetStore is the unified read/write interface. You rarely
need to touch it directly, because dccd.Client and the operations use it for you.
Directory layout
{data_path}/{exchange}/ohlc/{pair}/{span}/YYYY.parquet
{data_path}/{exchange}/trades/{pair}/YYYY-MM-DD.parquet
{data_path}/{exchange}/orderbook/{pair}/YYYY-MM-DD.parquet
- exchange: lowercase exchange name (binance, kraken, …).
- pair: e.g. BTC-USDT (the slash in BTC/USDT is replaced by a hyphen).
- span: bar length as a seconds label (e.g. 3600s); OHLC only.

OHLC files are annual; trades and order-book files are daily.
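The layout rules above can be sketched as a small path builder. This is a hypothetical helper (dataset_file is not part of dccd) that assumes the data-type directory names ohlc, trades and orderbook shown in the layout, and that the exchange name is lowercased before use.

```python
from datetime import date
from pathlib import Path


def dataset_file(data_path, exchange, pair, data_type, day, span=None):
    """Hypothetical helper: build the file path for one dataset partition."""
    # exchange is lowercased and the pair's slash becomes a hyphen, per the layout.
    base = Path(data_path) / exchange.lower() / data_type / pair.replace("/", "-")
    if data_type == "ohlc":
        if span is None:
            raise ValueError("OHLC paths need a span in seconds")
        # OHLC files are annual: {span}s/YYYY.parquet
        return base / f"{span}s" / f"{day.year}.parquet"
    if data_type in ("trades", "orderbook"):
        # Trades and order-book files are daily: YYYY-MM-DD.parquet
        return base / f"{day.isoformat()}.parquet"
    raise ValueError(f"unknown data type {data_type!r}")
```

For example, dataset_file("/data/crypto", "binance", "BTC/USDT", "ohlc", date(2024, 3, 5), span=3600) points at the 2024 file of the hourly BTC-USDT bars.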
Schema & integrity
All timestamps are stored in the TS column as nanoseconds UTC (int64). Each
write merges into the existing file and deduplicates on the natural key for its
data type:
| Data type | Dedup key | Notes |
|---|---|---|
| OHLC | | One bar per span window. |
| trades | | Many trades share a |
| order book | | A snapshot’s levels all share one |
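The merge-then-deduplicate step can be illustrated on plain row dicts. This is a sketch, not ParquetStore's code: the key is passed in because the per-type keys are not spelled out here, ("TS",) is only an assumed example for OHLC, and letting the newly written row win on a key collision is also an assumption.

```python
def merge_dedup(existing, new, key):
    """Sketch: merge two lists of row dicts, keeping one row per natural key."""
    merged = {}
    for row in [*existing, *new]:
        # Later rows (from the new write) overwrite earlier rows with the same key.
        merged[tuple(row[k] for k in key)] = row
    # Return rows ordered by timestamp; sorted() is stable for equal TS values.
    return sorted(merged.values(), key=lambda r: r["TS"])
```

Data types whose rows can share a TS (trades, order-book levels) would need a key with more than one column, which is why the key is a tuple here.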
Writes are atomic (temp file + os.replace) and serialised per file, so a
reader never sees a half-written Parquet and concurrent writers can’t corrupt it.
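The temp-file + os.replace pattern can be sketched with the standard library. This writes raw bytes rather than Parquet, and atomic_write_bytes is a hypothetical name; the per-file lock shown only serialises writers inside one process, and whether dccd also guards against other processes is not stated here.

```python
import os
import tempfile
import threading
from pathlib import Path

# One lock per resolved file path, created on demand under a global guard.
_locks: dict = {}
_locks_guard = threading.Lock()


def _lock_for(path: Path) -> threading.Lock:
    key = path.resolve()
    with _locks_guard:
        return _locks.setdefault(key, threading.Lock())


def atomic_write_bytes(path, data: bytes) -> None:
    """Sketch: replace `path` with `data` so readers never see a partial file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with _lock_for(path):
        # The temp file lives in the target directory so os.replace stays on
        # one filesystem, where the rename is atomic.
        fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".", suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_name, path)
        except BaseException:
            # Clean up the temp file on any failure, then re-raise.
            try:
                os.unlink(tmp_name)
            except FileNotFoundError:
                pass
            raise
```

A reader opening the path sees either the old file or the complete new one, because the rename swaps the directory entry in a single step.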
Reading data
```python
from dccd.storage.parquet import ParquetStore
from dccd.domain.dataset import DatasetId
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType

store = ParquetStore("/data/crypto")
ds = DatasetId(exchange="binance", symbol=Symbol(base="BTC", quote="USDT"),
               data_type=DataType.OHLC, span=3600)
df = store.load(ds)  # a polars.DataFrame, sorted by TS
```
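Because TS is an int64 count of nanoseconds UTC, converting individual values to and from Python datetimes is plain integer arithmetic. The two helpers below are hypothetical and stdlib-only; note that datetime has microsecond resolution, so sub-microsecond digits are truncated.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ns_to_datetime(ts_ns: int) -> datetime:
    """Convert a TS value (ns UTC) to an aware datetime, truncating to microseconds."""
    return EPOCH + timedelta(microseconds=ts_ns // 1_000)


def datetime_to_ns(dt: datetime) -> int:
    """Convert an aware datetime to a TS value (ns UTC)."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime; attach tzinfo=timezone.utc first")
    # timedelta // timedelta gives an exact integer count of microseconds.
    return (dt - EPOCH) // timedelta(microseconds=1) * 1_000
```

For a whole column, polars' pl.from_epoch("TS", time_unit="ns") should do the same conversion inside the DataFrame without a Python loop.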
ParquetStore
- class ParquetStore(data_path)
Read/write interface for stored datasets; each read or write addresses a single DatasetId.
All timestamps (TS) are nanoseconds UTC (int64).
- Parameters:
  - data_path : str or Path
    Root directory for all data files.
Examples
```pycon
>>> import pathlib, tempfile
>>> from dccd.domain.dataset import DatasetId
>>> from dccd.domain.symbol import Symbol
>>> from dccd.domain.types import DataType
>>> store = ParquetStore('/tmp/data')
```
- inventory()
Return a list of dataset-info dicts for all stored data.
Each entry includes min_ts / max_ts (nanoseconds UTC) and rows, so the UI can
display the actual time range of the data.
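One way to use inventory() output is to check which datasets already span a requested window before starting a backfill. covering is a hypothetical helper that relies only on the min_ts, max_ts and rows keys documented above; a stored range that spans the window can still contain gaps, so this is a coarse first check.

```python
def covering(entries, start_ns, end_ns):
    """Sketch: keep inventory entries whose stored range spans [start_ns, end_ns]."""
    return [
        e for e in entries
        if e["rows"] > 0 and e["min_ts"] <= start_ns and e["max_ts"] >= end_ns
    ]
```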
Run history
- class RunsStore(db_path)
Append-only SQLite store for JobRun records.
- Parameters:
  - db_path : str or Path
    Path to the SQLite database file. Created if absent.
Examples
```pycon
>>> import tempfile, os
>>> with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
...     path = f.name
>>> store = RunsStore(path)
>>> store.create_run('r1', 'backfill:binance:BTC/USDT:ohlc', 'backfill', 'binance', 'BTC/USDT', 'ohlc')
>>> os.unlink(path)
```
- create_run(run_id, spec_id, operation, exchange, symbol, data_type, started_at=None)
Insert a new run row in the running state.
- finish_run(run_id, state, ended_at=None, rows_written=0, error=None)
Mark a run finished with its final state, row count and optional error.
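The create/finish lifecycle can be sketched with the standard sqlite3 module. This is not RunsStore's implementation: the runs table, its column names, the 'succeeded' state value and the use of nanosecond timestamps for started_at/ended_at are all assumptions for illustration. Rows are only ever inserted or moved out of running, never deleted.

```python
import sqlite3
import time

# Hypothetical schema; RunsStore's real table layout is not documented here.
SCHEMA = """
CREATE TABLE IF NOT EXISTS runs (
    run_id TEXT PRIMARY KEY,
    spec_id TEXT NOT NULL,
    operation TEXT NOT NULL,
    exchange TEXT NOT NULL,
    symbol TEXT NOT NULL,
    data_type TEXT NOT NULL,
    state TEXT NOT NULL,
    started_at INTEGER NOT NULL,
    ended_at INTEGER,
    rows_written INTEGER NOT NULL DEFAULT 0,
    error TEXT
)
"""


def open_runs_db(db_path):
    conn = sqlite3.connect(db_path)  # creates the file if absent
    conn.execute(SCHEMA)
    return conn


def create_run(conn, run_id, spec_id, operation, exchange, symbol, data_type, started_at=None):
    # Assumed: timestamps are nanoseconds UTC, like the Parquet TS column.
    started_at = time.time_ns() if started_at is None else started_at
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO runs (run_id, spec_id, operation, exchange, symbol, data_type, state, started_at) "
            "VALUES (?, ?, ?, ?, ?, ?, 'running', ?)",
            (run_id, spec_id, operation, exchange, symbol, data_type, started_at),
        )


def finish_run(conn, run_id, state, ended_at=None, rows_written=0, error=None):
    ended_at = time.time_ns() if ended_at is None else ended_at
    with conn:
        # Only a run that is still running can be finished, so history is not overwritten.
        cur = conn.execute(
            "UPDATE runs SET state = ?, ended_at = ?, rows_written = ?, error = ? "
            "WHERE run_id = ? AND state = 'running'",
            (state, ended_at, rows_written, error, run_id),
        )
    if cur.rowcount != 1:
        raise KeyError(f"no running run with id {run_id!r}")
```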
Migration
One-shot migration of legacy (v2) Parquet files to the canonical v3 schema.
It applies two independent transformations, both idempotent:
- Timestamp scale: v2 stored TS in seconds; v3 uses nanoseconds.
- Column schema: v2 OHLC used quoteVolume / weightedAverage; v3 uses quote_volume (plus a trades count), and weightedAverage is dropped.
A file is rewritten if either its timestamp scale or its columns differ from canonical. Files already in v3 form are left untouched.
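The two transformations can be sketched over a plain column mapping (column name to list of values). This is not migrate_parquet_to_ns itself: the seconds-versus-nanoseconds test is an assumed heuristic (a TS maximum below 10**12 is treated as seconds), and the sketch does not add the v3 trades count column, whose source is not described here.

```python
SECONDS_THRESHOLD = 10**12  # assumed heuristic: ns timestamps since 1970 are far larger


def migrate_ohlc_columns(columns):
    """Sketch: return (new_columns, rescaled, migrated) for one v2/v3 OHLC table."""
    out = dict(columns)  # shallow copy; the input mapping is left untouched
    rescaled = False
    ts = out.get("TS")
    if ts and max(abs(t) for t in ts) < SECONDS_THRESHOLD:
        # Timestamp scale: seconds -> nanoseconds.
        out["TS"] = [t * 1_000_000_000 for t in ts]
        rescaled = True
    renamed = False
    if "quoteVolume" in out:
        # Column schema: quoteVolume -> quote_volume.
        out["quote_volume"] = out.pop("quoteVolume")
        renamed = True
    if "weightedAverage" in out:
        del out["weightedAverage"]  # dropped in v3
        renamed = True
    return out, rescaled, rescaled or renamed
```

Running the function on its own output changes nothing and reports migrated as False, which is the idempotence property the migration relies on.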
- migrate_parquet_to_ns(data_path, *, dry_run=False)
Migrate all Parquet files under data_path to the canonical v3 schema.
- Parameters:
  - data_path : str or Path
    Root data directory.
  - dry_run : bool
    If True, only report what would change without modifying any file.
- Returns:
  - list of dict
    Per-file report with the keys path, rows, from_schema, to_schema, rescaled (TS s→ns applied) and migrated.
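A cautious workflow is to call migrate_parquet_to_ns(data_path, dry_run=True) first and review the report before running it for real. summarize_report is a hypothetical helper that uses only the documented report keys, assuming rescaled and migrated are booleans.

```python
def summarize_report(report):
    """Sketch: condense a per-file migration report into a few totals."""
    changed = [r for r in report if r["rescaled"] or r["migrated"]]
    return {
        "files": len(report),
        "to_rewrite": len(changed),
        "rows_to_rewrite": sum(r["rows"] for r in changed),
        "paths": [r["path"] for r in changed],
    }
```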