Daemon (dccd.daemon)

Daemon module for autonomous data collection.

Submodules

backfill

Historical OHLC backfill engine for the dccd daemon.

config

Declarative configuration for the dccd daemon.

health

Health monitoring for the dccd daemon.

storage

Remote storage abstraction for the dccd daemon.

scheduler

Historical data scheduler for the dccd daemon.

stream_manager

Real-time stream manager and periodic sync service for the dccd daemon.

The daemon module provides an autonomous, server-side data collector. It reads a declarative YAML configuration, runs historical REST jobs on a schedule (APScheduler), opens WebSocket streams for real-time collection, and periodically syncs all local data to one or more remote destinations via rclone. Per-job metrics and a rotating log file are maintained by HealthMonitor.

Quick start (CLI)

  1. Install the daemon extra:

    pip install "dccd[daemon]"
    
  2. Write a configuration file (see Configuration):

    settings:
      data_path: /data/crypto/
      timezone: UTC          # 'local', 'UTC', or any IANA name
    
    storage:
      remotes:
        - provider: rclone
          remote: "mynas:crypto/"
      sync_interval: 3600
    
    histo_jobs:
      - exchange: binance
        pairs: [BTC/USDT, ETH/USDT]
        span: 3600          # candle interval in seconds
        format: parquet     # stored as annual Parquet: binance/ohlc/BTC-USDT/1h/YYYY.parquet
    
    # Optional real-time streams
    stream_jobs:
      - exchange: binance
        pairs: [BTC/USDT]
        channels: [trades, book]
        time_step: 60       # stored daily: binance/trades/BTC-USDT/YYYY-MM-DD.parquet
    
    # Optional webhook alerts on consecutive failures
    alerts:
      webhook_url: "https://hooks.slack.com/services/..."
      max_consecutive_errors: 3
    
  3. Validate, backfill, collect, or start the daemon:

    # Check config without running anything
    dccd validate --config config.yml
    # Config: config.yml
    #   data_path          : /data/crypto/
    #   timezone           : UTC
    #   remotes            : 1
    #   histo_jobs         : 2
    #   stream_jobs        : 1
    # Config is valid.
    
    # Backfill full OHLC history for all histo_jobs (resumable)
    dccd backfill --config config.yml --start "2020-01-01 00:00:00"
    
    # Dry run — estimate windows and total time without downloading
    dccd backfill --config config.yml --dry-run
    
    # Restrict to one exchange or specific pairs
    dccd backfill --config config.yml --exchange kraken
    dccd backfill --config config.yml --pairs BTC/USDT ETH/USDT
    
    # Run all jobs in parallel threads
    dccd backfill --config config.yml --parallel
    
    # One incremental batch per histo job, then exit (for cron)
    dccd collect --config config.yml
    # Done. successes=2 failures=0
    
    # Continuous daemon (block until Ctrl-C / SIGTERM)
    dccd start --config config.yml
    
    # Inspect per-job health after the daemon has run
    dccd status --config config.yml
    # job                      last_run          last_success       rows  errors
    # -------------------------------------------------------------------------
    # binance/BTC/USDT         2026-05-17 10:00  2026-05-17 10:00   1200       0
    # binance/ETH/USDT         2026-05-17 10:00  2026-05-17 10:00    980       0
    
    # Add a new histo job to an existing config in-place
    dccd add --exchange kraken --pair ETH/USD --span 86400 --config config.yml
    
    # Remove a pair (or whole job) from the config
    dccd remove --exchange kraken --pair ETH/USD --span 86400 --config config.yml
    
    # Inspect all data stored on disk (OHLC, trades, orderbook)
    dccd inventory --config config.yml
    # exchange   pair         type        span   from         to           rows  gaps
    # -------------------------------------------------------------------------------
    # binance    BTC/USDT     ohlc        1h     2020-01-01   2026-05-24  1 234     0
    # binance    BTC/USDT     trades      -      2021-01-01   2026-05-24     50     0
    
    # Enable shell tab-completion (run once after install)
    dccd --install-completion
    

Note

The --config option is optional for all commands. When omitted, dccd searches for ./config.yml in the current directory first, then falls back to $XDG_CONFIG_HOME/dccd/config.yml (default ~/.config/dccd/config.yml).
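
The lookup order described in this note can be sketched in a few lines. This is only an illustration: find_config and its cwd / env parameters are hypothetical names introduced here, and the library's own entry point for this is config.resolve_config_path, whose internals may differ.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_config(explicit: Optional[str] = None,
                cwd: Optional[Path] = None,
                env: Optional[Mapping[str, str]] = None) -> Path:
    """Illustrative lookup: explicit path, then ./config.yml, then XDG."""
    if explicit is not None:
        return Path(explicit)              # an explicit --config always wins
    cwd = Path.cwd() if cwd is None else cwd
    env = os.environ if env is None else env

    local = cwd / "config.yml"
    if local.is_file():
        return local                       # 1. ./config.yml in the current directory

    # 2. $XDG_CONFIG_HOME/dccd/config.yml; an unset or empty variable
    #    falls back to ~/.config
    xdg_home = env.get("XDG_CONFIG_HOME") or str(Path.home() / ".config")
    return Path(xdg_home) / "dccd" / "config.yml"
```

Passing cwd and env explicitly is a convenience of the sketch so the lookup can be exercised without touching the real working directory or environment.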

Python API

Use the components directly when you need to embed the daemon inside your own process or customise startup/shutdown logic. The script examples/daemon_example.py shows the full wiring:

from dccd.daemon.config import load_config
from dccd.daemon.health import HealthMonitor
from dccd.daemon.scheduler import build_histo_scheduler, run_once
from dccd.daemon.stream_manager import StreamManager

config  = load_config('config.yml')
health  = HealthMonitor(config.settings.data_path, config.alerts)

# --- one-shot mode (cron-friendly) ---
run_once(config, health=health)

# --- or continuous mode ---
scheduler  = build_histo_scheduler(config, health=health)
stream_mgr = StreamManager(config, health=health)
scheduler.start()
stream_mgr.start()
# … wait for stop signal …
scheduler.shutdown(wait=False)
stream_mgr.stop()

Graceful shutdown

When embedding the daemon in your own process, use a threading.Event to handle SIGINT / SIGTERM cleanly:

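import signal
import threading

from dccd.daemon.config import load_config
from dccd.daemon.health import HealthMonitor
from dccd.daemon.scheduler import build_histo_scheduler
from dccd.daemon.stream_manager import StreamManager

config = load_config('config.yml')
health = HealthMonitor(config.settings.data_path, config.alerts)

# The handlers only set the event; the main thread performs the shutdown.
stop_event = threading.Event()
signal.signal(signal.SIGINT,  lambda *_: stop_event.set())
signal.signal(signal.SIGTERM, lambda *_: stop_event.set())

scheduler  = build_histo_scheduler(config, health=health)
stream_mgr = StreamManager(config, health=health)
scheduler.start()
stream_mgr.start()

stop_event.wait()          # blocks until Ctrl-C or SIGTERM

scheduler.shutdown(wait=False)
stream_mgr.stop()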

Health monitoring

HealthMonitor maintains per-job metrics in a rotating metrics.json file and writes a dedicated log per job. After a run you can inspect collected metrics programmatically:

from dccd.daemon.config import load_config
from dccd.daemon.health import HealthMonitor

config = load_config('config.yml')
health = HealthMonitor(config.settings.data_path, config.alerts)

# … run scheduler / stream_mgr …

# One entry per job key, each carrying that job's metrics
metrics = health.get_metrics()
for key, m in metrics.items():
    print(f'{key:40s}  rows={m.rows_collected:6d}  errors={m.errors_count}')

When the number of consecutive errors on a job reaches max_consecutive_errors (set in the alerts: YAML block), the monitor fires a webhook alert.
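
The threshold logic can be pictured with a small counter. This is a sketch, not the HealthMonitor implementation: the class name ConsecutiveErrorCounter is hypothetical, and it assumes the alert fires once, on the failure that brings the streak to max_consecutive_errors (which is what the YAML comment "alert after 3 consecutive failures" suggests). The real monitor may behave differently, for example by alerting again on later failures.

```python
class ConsecutiveErrorCounter:
    """Hypothetical stand-in for the per-job failure streak tracking."""

    def __init__(self, max_consecutive_errors: int) -> None:
        self.max_consecutive_errors = max_consecutive_errors
        self.streak = 0

    def record(self, success: bool) -> bool:
        """Record one run and return True when an alert should fire."""
        if success:
            self.streak = 0        # any success resets the streak
            return False
        self.streak += 1
        # Assumption: fire exactly once, when the streak hits the threshold.
        return self.streak == self.max_consecutive_errors


counter = ConsecutiveErrorCounter(max_consecutive_errors=3)
fired = [counter.record(success=False) for _ in range(3)]
print(fired)  # [False, False, True]
```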

Webhook alerts

Add an alerts: block to your config to receive a Slack (or any incoming webhook) notification when a job fails repeatedly:

alerts:
  webhook_url: "https://hooks.slack.com/services/T.../B.../..."
  max_consecutive_errors: 3   # alert after 3 consecutive failures

The payload is a JSON {"text": "..."} compatible with the Slack Incoming Webhooks format. Any service that accepts the same format (Discord, Mattermost, …) works without any code changes.
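
To make the payload shape concrete, here is a minimal sketch that builds such a request with the standard library. The helper name build_alert_request, the message wording, and the example.invalid URL are placeholders invented for this illustration; how dccd actually sends the request is not shown on this page.

```python
import json
import urllib.request


def build_alert_request(webhook_url: str, message: str) -> urllib.request.Request:
    """Build a POST request carrying a Slack-style {"text": ...} body."""
    body = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_alert_request(
    "https://example.invalid/hook",            # placeholder URL
    "binance/BTC/USDT: 3 consecutive errors",  # hypothetical message text
)
# urllib.request.urlopen(req, timeout=10)      # uncomment to actually send
```

Building the request separately from sending it keeps the payload easy to inspect before pointing it at a real webhook.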

Per-job retry settings

Fine-tune retry behaviour per histo_job:

histo_jobs:
  - exchange: binance
    pairs: [BTC/USDT]
    span: 3600
    max_retries: 5      # attempts before marking window as failed (default 3)
    retry_delay: 1.0    # initial wait in seconds; doubles on each attempt

The retry delay is exponential: retry_delay × 2^(attempt − 1), where attempt counts from 1. With retry_delay=1.0 and max_retries=5 the waits between the five attempts are 1 s, 2 s, 4 s, 8 s.
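
The schedule is easy to check numerically. The helper below is a sketch written for this page (backoff_delays is not a dccd function); it reads the formula as retry_delay × 2^(attempt − 1) with attempt starting at 1, and assumes no wait follows the final attempt.

```python
from typing import List


def backoff_delays(retry_delay: float, max_retries: int) -> List[float]:
    """Waits between attempts: retry_delay * 2 ** (attempt - 1)."""
    # max_retries attempts leave max_retries - 1 pauses between them.
    return [retry_delay * 2 ** (attempt - 1) for attempt in range(1, max_retries)]


print(backoff_delays(1.0, 5))  # [1.0, 2.0, 4.0, 8.0]
```

Under the same reading, the default max_retries of 3 gives two waits of retry_delay and 2 × retry_delay.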

Configuration

config.load_config(path)

Load and validate a YAML daemon configuration file.

config.resolve_config_path([path])

Return the config file path to use, applying XDG fallback when path is None.

config.DEFAULT_CONFIG_PATH

Default config file path, a pathlib.Path (PosixPath on non-Windows systems).

config.CollectorConfig(*[, settings, ...])

Root configuration model for the dccd daemon.

config.SettingsConfig(*[, data_path, ...])

Global local settings shared by the daemon and the backfill command.

config.StorageConfig(*[, local_path, ...])

Local and optional remote storage configuration.

config.RemoteConfig(*[, provider])

Remote storage configuration for rclone.

config.HistoJob(*, exchange, pairs, span[, ...])

Historical (REST) data collection job.

config.StreamJob(*, exchange, pairs, channels)

Real-time (WebSocket) data collection job.

config.AlertConfig(*[, webhook_url, ...])

Optional alerting configuration.

Backfill

backfill.make_job(exchange, crypto, fiat, ...)

Build the appropriate backfill strategy for an (exchange, pair).

backfill.run_backfill(cfg[, exchange, ...])

Run historical backfill for all matching jobs in cfg.

backfill.OHLCBackfill(obj, max_candles, ...)

Backfill strategy for exchanges with a paginated OHLC endpoint.

backfill.KrakenBackfill(obj, sleep, form[, ...])

Backfill strategy for Kraken using the /Trades endpoint.
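
For the paginated OHLC strategy, the kind of estimate a dry run reports (number of windows and total time) can be approximated as below. This is a sketch under stated assumptions: estimate_windows, the max_candles value of 100, and the pause between requests are all illustrative and are not taken from dccd.

```python
import math
from datetime import datetime, timezone
from typing import Tuple


def estimate_windows(start: datetime, end: datetime, span: int,
                     max_candles: int, pause: float = 0.5) -> Tuple[int, int, float]:
    """Return (candles, windows, seconds) for a paginated OHLC download.

    span is the candle interval in seconds, max_candles the number of
    candles one request may return, pause a hypothetical delay per request.
    """
    if span <= 0 or max_candles <= 0:
        raise ValueError("span and max_candles must be positive")
    candles = max(0, int((end - start).total_seconds() // span))
    windows = math.ceil(candles / max_candles)
    return candles, windows, windows * pause


start = datetime(2020, 1, 1, tzinfo=timezone.utc)
end = datetime(2020, 1, 11, tzinfo=timezone.utc)
print(estimate_windows(start, end, span=3600, max_candles=100))  # (240, 3, 1.5)
```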

Scheduler

scheduler.build_histo_scheduler(config[, health])

Build an APScheduler BackgroundScheduler from a CollectorConfig.

scheduler.run_histo_job(job, pair, base_path)

Download and save one (exchange, pair) candle job locally.

scheduler.run_once(config[, health])

Execute all histo_jobs once and return.

Stream manager

stream_manager.StreamManager(config[, health])

Manage real-time WebSocket collection jobs.

stream_manager.SyncService(config)

Periodically push the entire local data directory to all remotes.

Health monitoring

health.HealthMonitor(local_path, alerts)

Monitor job health, persist metrics, and send webhook alerts.

health.JobMetrics([last_run_at, ...])

Per-job health metrics, updated after every execution attempt.

Storage

storage.RemoteStorage(config)

Push local data directories to remote destinations via rclone.
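
As a rough picture of what pushing to several remotes involves, the sketch below builds one rclone invocation per remote. It assumes the rclone copy subcommand with no extra flags; whether dccd uses copy or sync, and which options it passes, is not stated on this page. The function names are hypothetical.

```python
import subprocess
from typing import Callable, Iterable, List


def rclone_push_command(local_path: str, remote: str) -> List[str]:
    """Command line for copying local_path to one rclone remote."""
    return ["rclone", "copy", local_path, remote]


def push_all(local_path: str, remotes: Iterable[str],
             run: Callable[..., object] = subprocess.run) -> None:
    """Push local_path to every remote, raising on the first failure."""
    for remote in remotes:
        # check=True makes subprocess.run raise if rclone exits non-zero
        run(rclone_push_command(local_path, remote), check=True)


print(rclone_push_command("/data/crypto/", "mynas:crypto/"))
# ['rclone', 'copy', '/data/crypto/', 'mynas:crypto/']
```

The run parameter exists only so the loop can be tried with a stub instead of a real rclone binary.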

Web UI

A FastAPI + htmx web interface mirrors the CLI in the browser: a dashboard of live health metrics, the data inventory, job management (add/remove pairs, launch and cancel backfills), a log tail, a config editor, and remote-storage status. It requires the ui extra and is started either standalone with dccd ui or automatically by dccd start:

pip install "dccd[daemon,ui]"
dccd ui --config config.yml          # http://127.0.0.1:8080

The HTTP layer is a thin wrapper over the existing daemon modules — every endpoint delegates to the same functions used by the CLI — so the JSON API (/api/*) and the htmx templates are fully decoupled.

api.create_app(cfg_path[, stream_manager, ...])

Build the FastAPI application for cfg_path.

api.BackfillTracker(local_path)

Track backfill jobs launched from the web UI, persisted to disk.

Data Store

The DataStore class provides the unified storage layer. See Storage (dccd.storage) for the full API reference.
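
The Quick start comments give two example locations, binance/ohlc/BTC-USDT/1h/YYYY.parquet for annual OHLC files and binance/trades/BTC-USDT/YYYY-MM-DD.parquet for daily trade files. The helpers below reproduce those two patterns as a sketch; they are not part of DataStore, their names are hypothetical, and the span label (such as 1h) is passed in as-is because the mapping from seconds to labels is not documented here.

```python
from datetime import date


def ohlc_path(exchange: str, pair: str, span_label: str, year: int) -> str:
    """Relative path of an annual OHLC file, e.g. .../1h/2024.parquet."""
    return f"{exchange}/ohlc/{pair.replace('/', '-')}/{span_label}/{year}.parquet"


def trades_path(exchange: str, pair: str, day: date) -> str:
    """Relative path of a daily trades file, e.g. .../2026-05-17.parquet."""
    return f"{exchange}/trades/{pair.replace('/', '-')}/{day.isoformat()}.parquet"


print(ohlc_path("binance", "BTC/USDT", "1h", 2024))
# binance/ohlc/BTC-USDT/1h/2024.parquet
print(trades_path("binance", "BTC/USDT", date(2026, 5, 17)))
# binance/trades/BTC-USDT/2026-05-17.parquet
```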

CLI

The dccd command is a typer application installed as a console script by pip install "dccd[daemon]". Its commands are documented in the Quick start section above.

Command-line interface for the dccd daemon.

Entry point installed by pyproject.toml [project.scripts]:

pip install "dccd[daemon]"
dccd --help

Commands

dccd validate [--config PATH]
    Parse and validate the YAML config; print a short summary.
    Exit 0 on success, 1 on any error (file not found, bad YAML,
    Pydantic validation failure).

dccd backfill [--config PATH] [--exchange X] [--pairs A B] [--start DATE]
              [--parallel] [--dry-run]
    Download the full OHLC history for every histo_job defined in the
    config, resuming from the last saved timestamp.  Runs in the
    foreground; use --parallel to run all jobs simultaneously.

dccd collect [--config PATH]
    Fetch one incremental batch per histo_job, then exit.
    Downloads candles from the last saved timestamp to now.
    Designed for cron scheduling or as a single daemon tick.

dccd start [--config PATH]
    Start the continuous daemon in the foreground:
    - APScheduler BackgroundScheduler for all histo_jobs (calls collect)
    - StreamManager (one thread per WebSocket pair)
    - SyncService (periodic rclone push to remotes)
    Block until SIGINT (Ctrl-C) or SIGTERM; shuts down cleanly on signal.

dccd status [--config PATH]
    Read {local_path}/.dccd/metrics.json and render a table:

        job                      last_run          last_success      rows  errors
        -----------------------------------------------------------------------
        binance/BTC/USDT         2026-05-17 10:00  2026-05-17 10:00  1200       0
        kraken/ETH/USD           2026-05-17 09:58  2026-05-17 09:30   800       3

dccd add --exchange X --pair Y --span N [--config PATH]
    Append a new histo_job to the YAML config file in-place and
    re-validate the modified config before writing.

dccd remove --exchange X --pair Y --span N [--config PATH]
    Remove a pair from a histo_job (or the whole job if it was the last
    pair) and re-validate before writing.

dccd inventory [--config PATH]
    Scan data_path and print a table of all stored data (OHLC, trades,
    orderbook) with date range, row count, and gap count per series.
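
The gap count in the inventory table can be understood with a short sketch. The definition used here is an assumption made for illustration (a gap is any place where two consecutive timestamps are more than one span apart); count_gaps is not a dccd function and the real inventory command may count differently.

```python
from typing import Iterable


def count_gaps(timestamps: Iterable[int], span: int) -> int:
    """Count breaks where neighbouring timestamps differ by more than span."""
    ts = sorted(timestamps)
    return sum(1 for earlier, later in zip(ts, ts[1:]) if later - earlier > span)


# Hourly candles (epoch seconds) with the 10800 candle missing
hourly = [0, 3600, 7200, 14400, 18000]
print(count_gaps(hourly, span=3600))  # 1
```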