Source code for dccd.storage

#!/usr/bin/env python3
# coding: utf-8

"""Unified data storage for all dccd data types.

:class:`DataStore` is the single point of entry for reading and writing
crypto data regardless of exchange, data type (OHLC, trades, order book),
or collection method (REST or WebSocket).

Directory layout
----------------
::

    {data_path}/{exchange}/ohlc/{pair}/{span}/YYYY.parquet
    {data_path}/{exchange}/trades/{pair}/YYYY-MM-DD.parquet
    {data_path}/{exchange}/orderbook/{pair}/YYYY-MM-DD.parquet

- *exchange*: lowercase (``'binance'``, ``'kraken'``…)
- *pair*: ``BTC-USDT`` (slash replaced by hyphen — slash is invalid in paths)
- *span*: short label ``'1m'``, ``'1h'``, ``'1d'``… (OHLC only)
- Granularity: **annual** for OHLC, **daily** for trades/orderbook

"""

from __future__ import annotations

import logging
import pathlib
from datetime import datetime, timezone

import polars as pl

from dccd.tools.date_time import span_label

# Public API of this module: only the DataStore class is exported by
# ``from dccd.storage import *``.
__all__ = ['DataStore']

# Module-level logger named after this module; used to report unreadable
# (corrupted) parquet files.
logger = logging.getLogger(__name__)

# Default earliest timestamp, in Unix seconds.
# NOTE(review): not referenced in the visible part of this module —
# presumably used by callers as the start of a full-history download; confirm.
_DEFAULT_START_TS: int = 1325376000  # 2012-01-01 00:00:00 UTC


class DataStore:
    """Unified read/write interface for a single (exchange, pair, data_type).

    Parameters
    ----------
    data_path : str
        Root directory for all local data files (e.g. ``'/data/crypto'``).
    exchange : str
        Exchange name (e.g. ``'binance'``). Stored lowercased.
    pair : str
        Trading pair in ``'CRYPTO/FIAT'`` format (e.g. ``'BTC/USDT'``).
        The slash is converted to a hyphen for the file-system path.
    span : int or None
        Candle interval in seconds. Required for ``data_type='ohlc'``;
        pass ``None`` for trades and orderbook.
    data_type : {'ohlc', 'trades', 'orderbook'}
        Kind of data stored in this instance.

    Attributes
    ----------
    directory : pathlib.Path
        Directory where files are stored, built from *data_path* (it is
        relative when *data_path* is relative). Created on first access.

    Raises
    ------
    ValueError
        If *data_type* is not one of the supported kinds, or if
        ``data_type='ohlc'`` and *span* is ``None``.

    """

    def __init__(
        self,
        data_path: str,
        exchange: str,
        pair: str,
        span: int | None,
        data_type: str = 'ohlc',
    ) -> None:
        if data_type not in ('ohlc', 'trades', 'orderbook'):
            raise ValueError(
                f"data_type must be 'ohlc', 'trades', or 'orderbook', got {data_type!r}"
            )
        if data_type == 'ohlc' and span is None:
            raise ValueError("span is required for data_type='ohlc'")

        self.data_path = data_path
        self.exchange = exchange.lower()
        self._pair_slug = pair.replace('/', '-')
        self.span = span
        self.data_type = data_type
        # Resolved lazily by ``directory`` so that constructing a store does
        # not touch the file system.
        self._dir: pathlib.Path | None = None

    @property
    def directory(self) -> pathlib.Path:
        """Directory for this store (created if absent on first access)."""
        if self._dir is None:
            root = pathlib.Path(self.data_path) / self.exchange
            if self.data_type == 'ohlc':
                # Explicit check instead of ``assert``: asserts are stripped
                # under ``python -O`` and ``span`` is a public attribute.
                if self.span is None:
                    raise ValueError("span is required for data_type='ohlc'")
                self._dir = root / 'ohlc' / self._pair_slug / span_label(self.span)
            else:
                self._dir = root / self.data_type / self._pair_slug
            self._dir.mkdir(parents=True, exist_ok=True)
        return self._dir

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    def save(self, df: pl.DataFrame) -> None:
        """Write *df* into the appropriate period file(s), merging with existing data.

        OHLC data is grouped by year; trades and orderbook by calendar day.
        Rows are merged on ``'TS'`` (dedup ``keep='last'``), sorted ascending,
        and written as Parquet. An empty *df* is a no-op.

        Parameters
        ----------
        df : pl.DataFrame
            Data to persist. Must contain a ``'TS'`` column (Unix timestamps
            in seconds).

        Raises
        ------
        ValueError
            If *df* is non-empty and has no ``'TS'`` column.

        """
        if df.is_empty():
            return
        if 'TS' not in df.columns:
            raise ValueError("DataFrame must contain a 'TS' column")

        fmt = '%Y' if self.data_type == 'ohlc' else '%Y-%m-%d'
        self._save_grouped(df, fmt=fmt)

    def _save_grouped(self, df: pl.DataFrame, fmt: str) -> None:
        """Split *df* by period label and merge each group into its file.

        Parameters
        ----------
        df : pl.DataFrame
            Non-empty frame with a ``'TS'`` column.
        fmt : str
            ``strftime`` pattern producing the period label, which is also
            the file stem (``'%Y'`` or ``'%Y-%m-%d'``).

        Notes
        -----
        Only a failure to *read* an existing file is treated as corruption
        (the file is then overwritten with the new rows). A failure while
        merging readable existing data with the new rows (e.g. mismatching
        schemas) is propagated instead, so valid data on disk is never
        silently replaced.

        """
        df_with_period = df.with_columns(
            pl.from_epoch('TS', time_unit='s').dt.strftime(fmt).alias('_period')
        )
        for label in df_with_period['_period'].unique().sort().to_list():
            group = df_with_period.filter(pl.col('_period') == label).drop('_period')
            file_path = self.directory / f'{label}.parquet'

            merged = group
            if file_path.exists():
                try:
                    existing = pl.read_parquet(file_path)
                except Exception:
                    logger.warning('Corrupted file %s — overwriting.', file_path)
                else:
                    # New rows come last so that ``keep='last'`` prefers them.
                    merged = pl.concat([existing, group])

            merged.unique(subset=['TS'], keep='last').sort('TS').write_parquet(file_path)

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    def load(
        self,
        start: int | None = None,
        end: int | None = None,
    ) -> pl.DataFrame:
        """Load and concatenate the period files, filtered to ``[start, end]``.

        Every readable period file in the store directory is read; the
        bounds are applied to the concatenated result. Unreadable files are
        skipped with a warning.

        Parameters
        ----------
        start : int or None, optional
            Inclusive lower bound (Unix timestamp). ``None`` means no lower bound.
        end : int or None, optional
            Inclusive upper bound (Unix timestamp). ``None`` means no upper bound.

        Returns
        -------
        pl.DataFrame
            Concatenated data, sorted by ``'TS'``, filtered to ``[start, end]``.
            Empty DataFrame (no columns) if no readable file is found.

        """
        pieces: list[pl.DataFrame] = []
        for f in sorted(self.directory.glob('*.parquet')):
            try:
                pieces.append(pl.read_parquet(f))
            except Exception:
                logger.warning('Skipping corrupted file %s', f)

        if not pieces:
            return pl.DataFrame()

        df = pl.concat(pieces).sort('TS')
        if start is not None:
            df = df.filter(pl.col('TS') >= start)
        if end is not None:
            df = df.filter(pl.col('TS') <= end)
        return df

    def _read_ts(self, file_path: pathlib.Path) -> pl.DataFrame | None:
        """Read only the ``'TS'`` column of *file_path*.

        Returns ``None`` (after logging a warning) when the file cannot be
        read, so callers can treat it like a missing file instead of crashing.
        The file itself is left untouched.
        """
        try:
            return pl.read_parquet(file_path, columns=['TS'])
        except Exception:
            logger.warning('Unreadable file %s — treating as missing.', file_path)
            return None

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    def existing_periods(self) -> list[str]:
        """List period labels for all available files.

        Returns
        -------
        list of str
            Sorted list of year strings (``['2024', '2025']``) for OHLC, or
            date strings (``['2026-05-20', '2026-05-21']``) for trades/orderbook.

        """
        return sorted(f.stem for f in self.directory.glob('*.parquet'))

    def last_timestamp(self) -> int | None:
        """Return the last ``TS`` value in the most recent readable period file.

        Period files are inspected from the most recent backward. A file that
        cannot be read is **deleted** (with a warning) and the previous period
        is tried; an empty file is skipped.

        Returns
        -------
        int or None
            Unix timestamp of the last row, or ``None`` if no data exists.

        """
        for period in reversed(self.existing_periods()):
            file_path = self.directory / f'{period}.parquet'
            try:
                df = pl.read_parquet(file_path, columns=['TS'])
                if len(df) > 0:
                    return int(df['TS'].max())
            except Exception:
                logger.warning('Corrupted file %s — removing.', file_path)
                file_path.unlink(missing_ok=True)
        return None

    def is_period_complete(self, year: int) -> bool:
        """Return True if the parquet file for *year* contains all expected rows.

        Parameters
        ----------
        year : int
            Calendar year to check (e.g. ``2024``).

        Returns
        -------
        bool
            ``False`` for non-OHLC stores, missing or unreadable files, or
            when the row count is below the expected number of candles for
            that year (``seconds in the UTC year // span``).

        """
        if self.data_type != 'ohlc' or self.span is None:
            return False
        file_path = self.directory / f'{year}.parquet'
        if not file_path.exists():
            return False

        ts = self._read_ts(file_path)
        if ts is None:
            return False

        year_start = datetime(year, 1, 1, tzinfo=timezone.utc)
        year_end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
        expected = int((year_end - year_start).total_seconds()) // self.span
        return len(ts) >= expected

    def missing_intervals(self, start: int, end: int) -> list[tuple[int, int]]:
        """Return the ``(start, end)`` intervals that still need to be downloaded.

        For OHLC stores the method inspects existing annual parquet files:
        complete past years are skipped entirely; absent, empty or unreadable
        years yield the part of ``[start, end]`` that falls in that year;
        years with data yield a leading gap (requested start up to the first
        saved row) and/or a trailing gap (last saved row ``+ span`` up to the
        end of the requested part of that year).

        For trades / orderbook stores (no ``span``) the method falls back to a
        simple resume: one interval from ``max(start, last_timestamp)`` (or
        *start* if no data) to *end*. Note that :meth:`last_timestamp` deletes
        unreadable period files as a side effect.

        Parameters
        ----------
        start : int
            Desired start timestamp (Unix seconds).
        end : int
            Desired end timestamp (Unix seconds).

        Returns
        -------
        list of (int, int)
            Ordered list of ``(ivl_start, ivl_end)`` pairs to download.
            Empty list means all data is already present.

        """
        if self.data_type != 'ohlc' or self.span is None:
            last = self.last_timestamp()
            effective = max(start, last) if last is not None else start
            return [(effective, end)] if effective < end else []

        current_year = datetime.now(tz=timezone.utc).year
        start_year = datetime.fromtimestamp(start, tz=timezone.utc).year
        end_year = datetime.fromtimestamp(end, tz=timezone.utc).year

        intervals: list[tuple[int, int]] = []
        for year in range(start_year, end_year + 1):
            year_start_ts = int(datetime(year, 1, 1, tzinfo=timezone.utc).timestamp())
            year_end_ts = int(datetime(year + 1, 1, 1, tzinfo=timezone.utc).timestamp())
            # Portion of the request that falls inside this calendar year.
            ivl_start = max(start, year_start_ts)
            ivl_end = min(end, year_end_ts)
            if ivl_start >= ivl_end:
                continue

            file_path = self.directory / f'{year}.parquet'
            if file_path.exists():
                # The current year can never be "complete": it is still growing.
                if year < current_year and self.is_period_complete(year):
                    continue  # full year already on disk — skip

                ts = self._read_ts(file_path)
                if ts is not None and len(ts) > 0:
                    file_min = int(ts['TS'].min())
                    file_max = int(ts['TS'].max())
                    # Gap before the first saved row (e.g. backfill requested
                    # from an earlier date than the file's first candle).
                    if ivl_start < file_min:
                        intervals.append((ivl_start, file_min))
                    # Trailing gap after the last saved row.
                    trailing = file_max + self.span
                    if trailing < ivl_end:
                        intervals.append((trailing, ivl_end))
                    continue
                # File is empty or unreadable: fall through to the full interval.

            intervals.append((ivl_start, ivl_end))
        return intervals