Daemon (dccd.daemon)¶
Daemon module for autonomous data collection.
Submodules¶
|
Historical OHLC backfill engine for the dccd daemon. |
|
Declarative configuration for the dccd daemon. |
|
Health monitoring for the dccd daemon. |
|
Remote storage abstraction for the dccd daemon. |
|
Historical data scheduler for the dccd daemon. |
|
Real-time stream manager and periodic sync service for the dccd daemon. |
The daemon module provides an autonomous, server-side data collector.
It reads a declarative YAML configuration, runs historical REST jobs on a
schedule (APScheduler), opens WebSocket streams for real-time collection, and
periodically syncs all local data to one or more remote destinations via rclone.
Per-job metrics and a rotating log file are maintained by
HealthMonitor.
Quick start (CLI)¶
Install the daemon extra:
pip install "dccd[daemon]"
Write a configuration file (see Configuration):
settings: data_path: /data/crypto/ timezone: UTC # 'local', 'UTC', or any IANA name storage: remotes: - provider: rclone remote: "mynas:crypto/" sync_interval: 3600 histo_jobs: - exchange: binance pairs: [BTC/USDT, ETH/USDT] span: 3600 # candle interval in seconds format: parquet # stored as annual Parquet: binance/ohlc/BTC-USDT/1h/YYYY.parquet # Optional real-time streams stream_jobs: - exchange: binance pairs: [BTC/USDT] channels: [trades, book] time_step: 60 # stored daily: binance/trades/BTC-USDT/YYYY-MM-DD.parquet # Optional webhook alerts on consecutive failures alerts: webhook_url: "https://hooks.slack.com/services/..." max_consecutive_errors: 3
Validate, backfill, collect, or start the daemon:
# Check config without running anything dccd validate --config config.yml # Config: config.yml # data_path : /data/crypto/ # timezone : UTC # remotes : 1 # histo_jobs : 2 # stream_jobs : 1 # Config is valid. # Backfill full OHLC history for all histo_jobs (resumable) dccd backfill --config config.yml --start "2020-01-01 00:00:00" # Dry run — estimate windows and total time without downloading dccd backfill --config config.yml --dry-run # Restrict to one exchange or specific pairs dccd backfill --config config.yml --exchange kraken dccd backfill --config config.yml --pairs BTC/USDT ETH/USDT # Run all jobs in parallel threads dccd backfill --config config.yml --parallel # One incremental batch per histo job, then exit (for cron) dccd collect --config config.yml # Done. successes=2 failures=0 # Continuous daemon (block until Ctrl-C / SIGTERM) dccd start --config config.yml # Inspect per-job health after the daemon has run dccd status --config config.yml # job last_run last_success rows errors # ------------------------------------------------------------------------- # binance/BTC/USDT 2026-05-17 10:00 2026-05-17 10:00 1200 0 # binance/ETH/USDT 2026-05-17 10:00 2026-05-17 10:00 980 0 # Add a new histo job to an existing config in-place dccd add --exchange kraken --pair ETH/USD --span 86400 --config config.yml # Remove a pair (or whole job) from the config dccd remove --exchange kraken --pair ETH/USD --span 86400 --config config.yml # Inspect all data stored on disk (OHLC, trades, orderbook) dccd inventory --config config.yml # Enable shell tab-completion (run once after install) dccd --install-completion # exchange pair type span from to rows gaps # ------------------------------------------------------------------------------- # binance BTC/USDT ohlc 1h 2020-01-01 2026-05-24 1 234 0 # binance BTC/USDT trades - 2021-01-01 2026-05-24 50 0
Note
The --config option is optional for all commands. When omitted, dccd
searches for ./config.yml in the current directory first, then falls back
to $XDG_CONFIG_HOME/dccd/config.yml (default ~/.config/dccd/config.yml).
Python API¶
Use the components directly when you need to embed the daemon inside your
own process or customise startup/shutdown logic. The script
examples/daemon_example.py shows the full wiring:
from dccd.daemon.config import load_config
from dccd.daemon.health import HealthMonitor
from dccd.daemon.scheduler import build_histo_scheduler, run_once
from dccd.daemon.stream_manager import StreamManager
config = load_config('config.yml')
health = HealthMonitor(config.settings.data_path, config.alerts)
# --- one-shot mode (cron-friendly) ---
run_once(config, health=health)
# --- or continuous mode ---
scheduler = build_histo_scheduler(config, health=health)
stream_mgr = StreamManager(config, health=health)
scheduler.start()
stream_mgr.start()
# … wait for stop signal …
scheduler.shutdown(wait=False)
stream_mgr.stop()
Graceful shutdown¶
When embedding the daemon in your own process, use a threading.Event
to handle SIGINT / SIGTERM cleanly:
import signal
import threading
stop_event = threading.Event()
signal.signal(signal.SIGINT, lambda *_: stop_event.set())
signal.signal(signal.SIGTERM, lambda *_: stop_event.set())
scheduler = build_histo_scheduler(config, health=health)
stream_mgr = StreamManager(config, health=health)
scheduler.start()
stream_mgr.start()
stop_event.wait() # blocks until Ctrl-C or SIGTERM
scheduler.shutdown(wait=False)
stream_mgr.stop()
Health monitoring¶
HealthMonitor maintains per-job metrics in a
rotating metrics.json file and writes a dedicated log per job. After a
run you can inspect collected metrics programmatically:
from dccd.daemon.health import HealthMonitor
health = HealthMonitor(config.settings.data_path, config.alerts)
# … run scheduler / stream_mgr …
metrics = health.get_metrics()
for key, m in metrics.items():
print(f'{key:40s} rows={m.rows_collected:6d} errors={m.errors_count}')
When the number of consecutive errors on a job exceeds max_consecutive_errors
(set in the alerts: YAML block), the monitor fires a webhook alert.
Webhook alerts¶
Add an alerts: block to your config to receive a Slack (or any incoming
webhook) notification when a job fails repeatedly:
alerts:
webhook_url: "https://hooks.slack.com/services/T.../B.../..."
max_consecutive_errors: 3 # alert after 3 consecutive failures
The payload is a JSON {"text": "..."} compatible with the Slack
Incoming Webhooks format. Any service that accepts the same format
(Discord, Mattermost, …) works without any code changes.
Per-job retry settings¶
Fine-tune retry behaviour per histo_job:
histo_jobs:
- exchange: binance
pairs: [BTC/USDT]
span: 3600
max_retries: 5 # attempts before marking window as failed (default 3)
retry_delay: 1.0 # initial wait in seconds; doubles on each attempt
The retry delay is exponential: retry_delay × 2^(attempt − 1). With
retry_delay=1.0 and max_retries=5 the waits are 1 s, 2 s, 4 s, 8 s.
Configuration¶
|
Load and validate a YAML daemon configuration file. |
|
Return the config file path to use, applying XDG fallback when path is None. |
Path subclass for non-Windows systems. |
|
|
Root configuration model for the dccd daemon. |
|
Global local settings shared by the daemon and the backfill command. |
|
Local and optional remote storage configuration. |
|
Remote storage configuration for rclone. |
|
Historical (REST) data collection job. |
|
Real-time (WebSocket) data collection job. |
|
Optional alerting configuration. |
Backfill¶
|
Build the appropriate backfill strategy for an (exchange, pair). |
|
Run historical backfill for all matching jobs in cfg. |
|
Backfill strategy for exchanges with a paginated OHLC endpoint. |
|
Backfill strategy for Kraken using the |
Scheduler¶
|
Build an APScheduler BackgroundScheduler from a CollectorConfig. |
|
Download and save one (exchange, pair) candle job locally. |
|
Execute all histo_jobs once and return. |
Stream manager¶
|
Manage real-time WebSocket collection jobs. |
|
Periodically push the entire local data directory to all remotes. |
Health monitoring¶
|
Monitor job health, persist metrics, and send webhook alerts. |
|
Per-job health metrics, updated after every execution attempt. |
Storage¶
|
Push local data directories to remote destinations via rclone. |
Data Store¶
The DataStore class provides the unified storage
layer. See Storage (dccd.storage) for the full API reference.
CLI¶
The dccd command is a typer application
installed as a console script by pip install "dccd[daemon]".
Its commands are documented in the Quick start section above.
Command-line interface for the dccd daemon.
Entry point installed by pyproject.toml [project.scripts]:
pip install "dccd[daemon]"
dccd --help
Commands¶
dccd validate --config PATH
Parse and validate the YAML config; print a one-line summary.
Exit 0 on success, 1 on any error (file not found, bad YAML,
Pydantic validation failure).
dccd backfill --config PATH [--exchange X] [--pairs A B] [--start DATE]
[--parallel] [--dry-run]
Download the full OHLC history for every histo_job defined in the
config, resuming from the last saved timestamp. Runs in the
foreground; use --parallel to run all jobs simultaneously.
dccd collect --config PATH
Fetch one incremental batch per histo_job, then exit.
Downloads candles from the last saved timestamp to now.
Designed for cron scheduling or as a single daemon tick.
dccd start --config PATH
Start the continuous daemon in the foreground:
- APScheduler BackgroundScheduler for all histo_jobs (calls collect)
- StreamManager (one thread per WebSocket pair)
- SyncService (periodic rclone push to remotes)
Block until SIGINT (Ctrl-C) or SIGTERM; shuts down cleanly on signal.
dccd status --config PATH
Read {local_path}/.dccd/metrics.json and render a table:
job last_run last_success rows errors
-----------------------------------------------------------------------
binance/BTC/USDT 2026-05-17 10:00 2026-05-17 10:00 1200 0
kraken/ETH/USD 2026-05-17 09:58 2026-05-17 09:30 800 3
dccd add --exchange X --pair Y --span N [--config PATH]
Append a new histo_job to the YAML config file in-place and
re-validate the modified config before writing.
dccd remove --exchange X --pair Y --span N [--config PATH]
Remove a pair from a histo_job (or the whole job if it was the last
pair) and re-validate before writing.
dccd inventory [--config PATH]
Scan data_path and print a table of all stored data (OHLC, trades,
orderbook) with date range, row count, and gap count per series.