Source code for dccd

"""dccd — Download Crypto Currency Data v3.

Three usage modes:
  1. Python API   — ``from dccd import Client``
  2. CLI daemon   — ``dccd start --config config.yml``
  3. HTTP API/UI  — ``dccd ui --config config.yml``

Examples
--------
>>> from dccd import __version__
>>> isinstance(__version__, str)
True
"""

from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _pkg_version

try:
    __version__: str = _pkg_version("dccd")
except PackageNotFoundError:
    __version__ = "unknown"

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import asyncio

    import polars as pl

    from dccd.application.config import AppConfig
    from dccd.sources.registry import SourceRegistry
    from dccd.storage.parquet import ParquetStore

__all__ = ["__version__", "Client"]


[docs] class Client: """Async context manager facade for dccd v3. Parameters ---------- config_path : str or None Path to config.yml. Resolved via XDG fallback when None. Examples -------- >>> import asyncio >>> async def example(): ... from dccd import Client ... async with Client() as c: ... pass """ def __init__(self, config_path: str | None = None) -> None: self._config_path = config_path self._config: AppConfig | None = None self._store: ParquetStore | None = None self._registry: SourceRegistry | None = None def _require_ready(self) -> tuple["SourceRegistry", "ParquetStore"]: if self._registry is None or self._store is None: raise RuntimeError("Client must be used inside 'async with Client() as c:'") return self._registry, self._store async def __aenter__(self) -> "Client": from dccd.application.config import AppConfig, load_config, resolve_config_path from dccd.application.service_factory import build_registry, build_store try: path = resolve_config_path(self._config_path) self._config = load_config(path) except FileNotFoundError: self._config = AppConfig() # Single source of truth for adapter wiring — same as CLI and API. self._store = build_store(self._config.settings.data_path) self._registry = build_registry() return self async def __aexit__(self, *args: Any) -> None: pass
[docs] async def backfill(self, exchange: str, symbol: str, data_type: str = "ohlc", span: int | None = None, start: str = "last") -> dict[str, Any]: """Backfill one dataset. Parameters ---------- exchange : str symbol : str E.g. ``'BTC/USDT'`` or ``'BTC-USD'``. data_type : str ``'ohlc'``, ``'trades'``, or ``'orderbook'``. span : int or None Required for OHLC. start : str ``'last'``, ``'origin'``, or ISO date. """ from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger from dccd.application.operations import backfill as do_backfill from dccd.domain.symbol import Symbol from dccd.domain.types import DataType sym = Symbol.parse(symbol) target = JobTarget( exchange=exchange, symbol=sym, data_type=DataType(data_type), span=span, ) spec = JobSpec( id=JobSpec.make_id("backfill", target), operation="backfill", target=target, trigger=Trigger(kind="once"), params=JobParams(start=start), origin="runtime", ) registry, store = self._require_ready() return await do_backfill(spec, registry=registry, store=store)
[docs] async def stream(self, exchange: str, symbol: str, data_type: str = "trades", span: int | None = None, depth: int | None = None, snapshot_interval: int | None = None, stop_event: "asyncio.Event | None" = None) -> None: """Stream live data until *stop_event* is set. Parameters mirror :meth:`backfill`; ``stop_event`` is an :class:`asyncio.Event` used to stop the stream cleanly. """ from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger from dccd.application.operations import stream as do_stream from dccd.domain.symbol import Symbol from dccd.domain.types import DataType target = JobTarget(exchange=exchange, symbol=Symbol.parse(symbol), data_type=DataType(data_type), span=span) spec = JobSpec( id=JobSpec.make_id("stream", target), operation="stream", target=target, trigger=Trigger(kind="supervised"), params=JobParams(depth=depth, snapshot_interval=snapshot_interval), origin="runtime", ) registry, store = self._require_ready() await do_stream(spec, registry=registry, store=store, stop_event=stop_event)
[docs] def read(self, exchange: str, symbol: str, data_type: str = "ohlc", span: int | None = None, start_ns: int | None = None, end_ns: int | None = None) -> "pl.DataFrame": """Read stored data for a dataset as a Polars DataFrame.""" from typing import cast from dccd.application.jobs import JobTarget from dccd.application.operations import read as do_read from dccd.domain.symbol import Symbol from dccd.domain.types import DataType _, store = self._require_ready() target = JobTarget(exchange=exchange, symbol=Symbol.parse(symbol), data_type=DataType(data_type), span=span) return cast("pl.DataFrame", do_read(target, store=store, start_ns=start_ns, end_ns=end_ns))
[docs] def inventory(self) -> list[dict[str, Any]]: """List stored datasets.""" from dccd.application.operations import inventory _, store = self._require_ready() return inventory(store=store)