HealthMonitor

Defined in dccd.daemon.health

class HealthMonitor(local_path, alerts)

Bases: object

Monitor job health, persist metrics, and send webhook alerts.

HealthMonitor serves three purposes:

  1. Rotating log — attaches a RotatingFileHandler (10 MB × 5 backups) to the root logger on construction, so every logging call anywhere in the process lands in {local_path}/.dccd/dccd.log in addition to the console.

  2. Per-job metrics: record_success / record_failure update a JobMetrics entry for each (exchange, pair) key and flush the full metrics dict to {local_path}/.dccd/metrics.json after each call. The JSON file is reloaded on startup, so metrics survive daemon restarts.

  3. Webhook alerts — when errors_count reaches alerts.max_consecutive_errors, a JSON POST is sent to alerts.webhook_url (Slack / Discord / generic). Alerting is completely optional: pass AlertConfig() with no webhook_url to disable it.
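
The persistence behaviour in item 2 can be illustrated with a minimal sketch. It is not the library's implementation: JobMetricsSketch, flush, and load are hypothetical names, and the real JobMetrics may carry more fields than the two used in the example on this page (rows_collected and errors_count).

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class JobMetricsSketch:
    """Hypothetical stand-in for JobMetrics; the real field set may differ."""
    rows_collected: int = 0
    errors_count: int = 0


def flush(metrics: dict, path: Path) -> None:
    """Write the whole metrics dict to disk as JSON."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {key: asdict(m) for key, m in metrics.items()}
    path.write_text(json.dumps(payload, indent=2))


def load(path: Path) -> dict:
    """Reload metrics written by flush(); return an empty dict if none exist."""
    if not path.exists():
        return {}
    raw = json.loads(path.read_text())
    return {key: JobMetricsSketch(**fields) for key, fields in raw.items()}


with tempfile.TemporaryDirectory() as tmp:
    metrics_path = Path(tmp) / ".dccd" / "metrics.json"
    flush({"binance/BTC/USDT": JobMetricsSketch(rows_collected=215)}, metrics_path)
    restored = load(metrics_path)  # simulates reading the file after a restart
```

Flushing the full dict on every call keeps the file consistent with memory at the cost of one small write per job, which is usually acceptable at scheduler frequencies.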

Use this class directly when embedding the scheduler in your own process (see run_once and build_histo_scheduler). The dccd CLI commands instantiate it automatically.

Parameters:
local_path : str or Path

Root data directory (CollectorConfig.storage.local_path). The hidden directory {local_path}/.dccd/ is created on init if it does not exist.

alerts : AlertConfig

Alerting configuration (webhook URL and error threshold).
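
To make the roles of the webhook URL and the error threshold concrete, here is a rough sketch of how a threshold-gated JSON POST could be prepared. The function names, the ">=" comparison, and the payload shape are assumptions, not the library's code. Slack incoming webhooks read a "text" field and Discord webhooks read "content", so the field name is left configurable. The request is only built, never sent.

```python
import json
import urllib.request
from typing import Optional


def build_alert_request(webhook_url: str, message: str,
                        field: str = "text") -> urllib.request.Request:
    """Build (but do not send) a JSON POST for a chat webhook."""
    body = json.dumps({field: message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def maybe_alert(errors_count: int, max_consecutive_errors: int,
                webhook_url: Optional[str],
                message: str) -> Optional[urllib.request.Request]:
    """Return a request once the threshold is reached, else None."""
    # No webhook URL means alerting is disabled (assumed to mirror AlertConfig()).
    if not webhook_url or errors_count < max_consecutive_errors:
        return None
    return build_alert_request(webhook_url, message)


# Placeholder URL; passing the result to urllib.request.urlopen would send it.
request = maybe_alert(3, 3, "https://example.invalid/hook",
                      "kraken/ETH/USD: 3 consecutive errors")
```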

Notes

The rotating log handler is added to the root logger, not to the module logger. All loggers in the process therefore write to the file after HealthMonitor is constructed — this is intentional so that APScheduler, WebSocket, and application logs are all captured together.
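
The effect of attaching the handler to the root logger can be reproduced with the standard library alone. The sketch below is an approximation under stated assumptions: attach_rotating_log is a hypothetical helper, the log format is invented, and 10 MB is taken as 10 * 1024 * 1024 bytes. It shows a message from an unrelated logger reaching the file through propagation.

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path


def attach_rotating_log(local_path) -> RotatingFileHandler:
    """Attach a 10 MB x 5 rotating file handler to the root logger."""
    log_dir = Path(local_path) / ".dccd"
    log_dir.mkdir(parents=True, exist_ok=True)
    handler = RotatingFileHandler(
        log_dir / "dccd.log",
        maxBytes=10 * 1024 * 1024,  # assumed byte value for "10 MB"
        backupCount=5,
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logging.getLogger().addHandler(handler)  # root logger, not a module logger
    return handler


with tempfile.TemporaryDirectory() as tmp:
    handler = attach_rotating_log(tmp)
    # A logger the helper knows nothing about still lands in the file.
    logging.getLogger("some.other.library").warning("captured via propagation")
    handler.flush()
    log_text = (Path(tmp) / ".dccd" / "dccd.log").read_text()
    # Detach and close so the temporary directory can be removed cleanly.
    logging.getLogger().removeHandler(handler)
    handler.close()
```

Because the handler stays on the root logger for the life of the process, code that constructs more than one monitor (tests, for example) may want to remove it explicitly, as done at the end of the sketch.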

Calling record_failure does not suppress or re-raise the original exception. The caller is responsible for exception handling; HealthMonitor only observes the outcome.
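
One possible calling pattern that follows from this division of responsibility is sketched below. MonitorStub is a hypothetical stand-in that only mimics the two method signatures documented on this page, and run_job is an illustrative wrapper, not part of dccd.

```python
class MonitorStub:
    """Hypothetical stand-in with the same method names as HealthMonitor."""

    def __init__(self):
        self.calls = []

    def record_success(self, exchange, pair, rows=0):
        self.calls.append(("success", exchange, pair, rows))

    def record_failure(self, exchange, pair):
        self.calls.append(("failure", exchange, pair))


def run_job(monitor, exchange, pair, fetch):
    """Run one collection job and report the outcome to the monitor."""
    try:
        rows = fetch()
    except Exception:
        monitor.record_failure(exchange, pair)
        raise  # the monitor only observes; the caller handles the exception
    monitor.record_success(exchange, pair, rows=len(rows))
    return rows


def failing_fetch():
    raise ConnectionError("exchange unreachable")


monitor = MonitorStub()
run_job(monitor, "binance", "BTC/USDT", lambda: [1, 2, 3])
try:
    run_job(monitor, "kraken", "ETH/USD", failing_fetch)
except ConnectionError:
    pass  # in a real loop: log, back off, or let the scheduler retry
```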

Examples

Standalone usage inside a custom scheduler loop:

>>> from dccd.daemon.config import AlertConfig
>>> from dccd.daemon.health import HealthMonitor
>>> import tempfile, pathlib
>>> with tempfile.TemporaryDirectory() as tmp:
...     alerts = AlertConfig()           # no webhook
...     monitor = HealthMonitor(tmp, alerts)
...     monitor.record_success('binance', 'BTC/USDT', rows=120)
...     monitor.record_success('binance', 'BTC/USDT', rows=95)
...     monitor.record_failure('kraken',  'ETH/USD')
...     m = monitor.get_metrics()
...     print(m['binance/BTC/USDT'].rows_collected)
...     print(m['kraken/ETH/USD'].errors_count)
215
1

get_metrics()

Return a snapshot of the current metrics dict.

Returns:
dict of str to JobMetrics

Keys are '{exchange}/{pair}' strings.
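
Because pairs such as 'BTC/USDT' contain a slash themselves, a key is best split on the first slash only. The helper below is a hypothetical convenience, not part of the API, and assumes exchange names never contain '/'.

```python
from typing import Tuple


def split_metrics_key(key: str) -> Tuple[str, str]:
    """Split '{exchange}/{pair}' on the first slash only."""
    exchange, pair = key.split("/", 1)
    return exchange, pair


parts = split_metrics_key("binance/BTC/USDT")  # ('binance', 'BTC/USDT')
```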

record_failure(exchange, pair)

Record a failed job execution.

Parameters:
exchange : str

Exchange name.

pair : str

Trading pair.

record_success(exchange, pair, rows=0)

Record a successful job execution.

Parameters:
exchange : str

Exchange name (e.g. 'binance').

pair : str

Trading pair (e.g. 'BTC/USDT').

rows : int, optional

Number of data rows collected, default 0.