# Source code for dccd

"""dccd — Download Crypto Currency Data v3.

Three usage modes:
  1. Python API   — ``from dccd import Client``
  2. CLI daemon   — ``dccd start --config config.yml``
  3. HTTP API/UI  — ``dccd ui --config config.yml``

Examples
--------
>>> from dccd import __version__
>>> isinstance(__version__, str)
True
"""

from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _pkg_version

# Resolve the package version from the installed distribution's metadata.
# When no "dccd" distribution can be found, importlib.metadata raises
# PackageNotFoundError and the version falls back to the literal "unknown"
# so that importing the package never fails on the version lookup.
try:
    __version__: str = _pkg_version("dccd")
except PackageNotFoundError:
    __version__ = "unknown"

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import asyncio

    import polars as pl

    from dccd.application.config import AppConfig
    from dccd.sources.registry import SourceRegistry
    from dccd.storage.parquet import ParquetStore

__all__ = ["__version__", "Client"]


[docs] class Client: """Async facade for dccd — the one-stop entry point. Wires every exchange adapter and the local Parquet store, and exposes the four operations as methods: :meth:`backfill` (download history), :meth:`stream` (collect live), :meth:`read` (load stored data) and :meth:`inventory` (list datasets). Use it as an async context manager so the shared HTTP client is opened and closed cleanly. Parameters ---------- config_path : str or None Path to ``config.yml``. Resolved via the XDG fallback when ``None``; only ``settings.data_path`` is needed for direct use. See Also -------- dccd.application.operations.backfill : the underlying operation. Examples -------- >>> import asyncio >>> async def main(): ... async with Client() as c: ... await c.backfill('binance', 'BTC/USDT', 'ohlc', span=3600, ... start='2024-01-01') ... return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).height >>> asyncio.run(main()) # doctest: +SKIP 168 """ def __init__(self, config_path: str | None = None) -> None: self._config_path = config_path self._config: AppConfig | None = None self._store: ParquetStore | None = None self._registry: SourceRegistry | None = None def _require_ready(self) -> tuple["SourceRegistry", "ParquetStore"]: if self._registry is None or self._store is None: raise RuntimeError("Client must be used inside 'async with Client() as c:'") return self._registry, self._store async def __aenter__(self) -> "Client": from dccd.application.config import AppConfig, load_config, resolve_config_path from dccd.application.service_factory import build_registry, build_store try: path = resolve_config_path(self._config_path) self._config = load_config(path) except FileNotFoundError: self._config = AppConfig() # Single source of truth for adapter wiring — same as CLI and API. self._store = build_store(self._config.settings.data_path) self._registry = build_registry() return self async def __aexit__(self, *args: Any) -> None: pass
[docs] async def backfill(self, exchange: str, symbol: str, data_type: str = "ohlc", span: int | None = None, start: str = "last") -> dict[str, Any]: """Download historical data for one dataset into the local store. Resumes and deduplicates: running it again only adds what is missing. Trades are cursor-paginated and drain the whole requested window. Parameters ---------- exchange : str Exchange name, e.g. ``'binance'``. See :doc:`/exchanges`. symbol : str Trading pair, ``'BTC/USDT'`` or ``'BTC-USD'``. data_type : str, default 'ohlc' ``'ohlc'``, ``'trades'`` or ``'orderbook'``. span : int or None Candle size in seconds — **required** for ``'ohlc'``. start : str, default 'last' ``'last'`` (resume from the last stored row), ``'origin'`` (full history) or an ISO date such as ``'2024-01-01'``. Returns ------- dict ``{'run_id', 'rows_written', 'start_ns', 'end_ns'}`` on success; ``{'run_id', 'rows_written', 'error'}`` on failure. See Also -------- read : load the result. stream : live collection instead of history. Examples -------- >>> async def main(): ... async with Client() as c: ... r = await c.backfill('binance', 'BTC/USDT', 'ohlc', ... span=3600, start='2024-01-01') ... return r['rows_written'] >>> asyncio.run(main()) # doctest: +SKIP 168 """ from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger from dccd.application.operations import backfill as do_backfill from dccd.domain.symbol import Symbol from dccd.domain.types import DataType sym = Symbol.parse(symbol) target = JobTarget( exchange=exchange, symbol=sym, data_type=DataType(data_type), span=span, ) spec = JobSpec( id=JobSpec.make_id("backfill", target), operation="backfill", target=target, trigger=Trigger(kind="once"), params=JobParams(start=start), origin="runtime", ) registry, store = self._require_ready() return await do_backfill(spec, registry=registry, store=store)
[docs] async def stream(self, exchange: str, symbol: str, data_type: str = "trades", span: int | None = None, depth: int | None = None, snapshot_interval: int | None = None, stop_event: "asyncio.Event | None" = None) -> None: """Collect live data over WebSocket until *stop_event* is set. Parameters mirror :meth:`backfill`. ``depth`` / ``snapshot_interval`` apply to order-book streams. For long-running collection prefer a ``supervised`` stream job in the config and ``dccd start`` (auto-reconnect). Parameters ---------- exchange, symbol : str data_type : str, default 'trades' ``'ohlc'``, ``'trades'`` or ``'orderbook'``. span, depth, snapshot_interval : int or None stop_event : asyncio.Event or None Set it to stop the stream cleanly. See Also -------- backfill : download history instead of live data. Examples -------- >>> async def main(): ... async with Client() as c: ... stop = asyncio.Event() ... asyncio.get_running_loop().call_later(10, stop.set) ... await c.stream('binance', 'BTC/USDT', 'trades', stop_event=stop) >>> asyncio.run(main()) # doctest: +SKIP """ from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger from dccd.application.operations import stream as do_stream from dccd.domain.symbol import Symbol from dccd.domain.types import DataType target = JobTarget(exchange=exchange, symbol=Symbol.parse(symbol), data_type=DataType(data_type), span=span) spec = JobSpec( id=JobSpec.make_id("stream", target), operation="stream", target=target, trigger=Trigger(kind="supervised"), params=JobParams(depth=depth, snapshot_interval=snapshot_interval), origin="runtime", ) registry, store = self._require_ready() await do_stream(spec, registry=registry, store=store, stop_event=stop_event)
[docs] def read(self, exchange: str, symbol: str, data_type: str = "ohlc", span: int | None = None, start_ns: int | None = None, end_ns: int | None = None) -> "pl.DataFrame": """Read stored data for a dataset as a Polars DataFrame. Parameters ---------- exchange, symbol : str data_type : str, default 'ohlc' span : int or None Required for ``'ohlc'``. start_ns, end_ns : int or None Optional inclusive nanosecond bounds. Returns ------- polars.DataFrame Sorted by ``TS`` (nanoseconds UTC), deduplicated. Empty if no data. See Also -------- backfill : populate the dataset. inventory : list what is stored. Examples -------- >>> async def main(): ... async with Client() as c: ... return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).columns >>> asyncio.run(main()) # doctest: +SKIP ['TS', 'open', 'high', 'low', 'close', 'volume', 'quote_volume', 'trades'] """ from typing import cast from dccd.application.jobs import JobTarget from dccd.application.operations import read as do_read from dccd.domain.symbol import Symbol from dccd.domain.types import DataType _, store = self._require_ready() target = JobTarget(exchange=exchange, symbol=Symbol.parse(symbol), data_type=DataType(data_type), span=span) return cast("pl.DataFrame", do_read(target, store=store, start_ns=start_ns, end_ns=end_ns))
[docs] def inventory(self) -> list[dict[str, Any]]: """List every stored dataset with its coverage. Returns ------- list of dict One entry per dataset with ``exchange``, ``pair``, ``data_type``, ``span``, ``rows``, ``min_ts`` and ``max_ts`` (nanoseconds UTC). Examples -------- >>> async def main(): ... async with Client() as c: ... return [d['pair'] for d in c.inventory()] >>> asyncio.run(main()) # doctest: +SKIP ['BTC-USDT'] """ from dccd.application.operations import inventory _, store = self._require_ready() return inventory(store=store)