#!/usr/bin/env python3
# coding: utf-8
""" Health monitoring for the dccd daemon.
Tracks per-job metrics (last run, last success, rows collected, error count),
persists them to a JSON file, configures a rotating log handler, and sends
optional webhook alerts when consecutive failures exceed the configured
threshold.
"""
from __future__ import annotations
import json
import logging
import time
import urllib.request
from dataclasses import asdict, dataclass
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dccd.daemon.config import AlertConfig
__all__ = ['HealthMonitor', 'JobMetrics']
logger = logging.getLogger(__name__)
_LOG_MAX_BYTES = 10 * 1024 * 1024 # 10 MB
_LOG_BACKUP_COUNT = 5
[docs]
@dataclass
class JobMetrics:
""" Per-job health metrics, updated after every execution attempt.
``errors_count`` is a *consecutive* failure counter: it resets to 0
whenever :meth:`HealthMonitor.record_success` is called, so it reflects
the current streak of failures rather than a lifetime total.
Parameters
----------
last_run_at : float or None
Unix timestamp of the most recent execution attempt (success or failure).
last_success_at : float or None
Unix timestamp of the most recent successful execution.
``None`` until the first success.
rows_collected : int
Cumulative number of rows saved across all successful runs.
errors_count : int
Number of *consecutive* failures since the last success.
Resets to 0 on the next successful run.
"""
last_run_at: float | None = None
last_success_at: float | None = None
rows_collected: int = 0
errors_count: int = 0
[docs]
class HealthMonitor:
""" Monitor job health, persist metrics, and send webhook alerts.
``HealthMonitor`` serves three purposes:
1. **Rotating log** — attaches a :class:`~logging.handlers.RotatingFileHandler`
(10 MB × 5 backups) to the *root* logger on construction, so every
``logging`` call anywhere in the process lands in
``{local_path}/.dccd/dccd.log`` in addition to the console.
2. **Per-job metrics** — :meth:`record_success` / :meth:`record_failure`
update a :class:`JobMetrics` entry for each ``(exchange, pair)`` key and
flush the full metrics dict to ``{local_path}/.dccd/metrics.json`` after
each call. The JSON file is reloaded on startup, so metrics survive
daemon restarts.
3. **Webhook alerts** — when ``errors_count`` reaches
``alerts.max_consecutive_errors``, a JSON POST is sent to
``alerts.webhook_url`` (Slack / Discord / generic). Alerting is
completely optional: pass ``AlertConfig()`` with no ``webhook_url`` to
disable it.
Use this class directly when embedding the scheduler in your own process
(see :func:`~dccd.daemon.scheduler.run_once` and
:func:`~dccd.daemon.scheduler.build_histo_scheduler`). The ``dccd``
CLI commands instantiate it automatically.
Parameters
----------
local_path : str or Path
Root data directory (``CollectorConfig.storage.local_path``).
The hidden directory ``{local_path}/.dccd/`` is created on init if it
does not exist.
alerts : AlertConfig
Alerting configuration (webhook URL and error threshold).
Notes
-----
The rotating log handler is added to the **root** logger, not to the
module logger. All loggers in the process therefore write to the file
after :class:`HealthMonitor` is constructed — this is intentional so that
APScheduler, WebSocket, and application logs are all captured together.
Calling :meth:`record_failure` does *not* suppress or re-raise the
original exception. The caller is responsible for exception handling;
``HealthMonitor`` only observes the outcome.
Examples
--------
Standalone usage inside a custom scheduler loop:
>>> from dccd.daemon.config import AlertConfig
>>> from dccd.daemon.health import HealthMonitor
>>> import tempfile, pathlib
>>> with tempfile.TemporaryDirectory() as tmp:
... alerts = AlertConfig() # no webhook
... monitor = HealthMonitor(tmp, alerts)
... monitor.record_success('binance', 'BTC/USDT', rows=120)
... monitor.record_success('binance', 'BTC/USDT', rows=95)
... monitor.record_failure('kraken', 'ETH/USD')
... m = monitor.get_metrics()
... print(m['binance/BTC/USDT'].rows_collected)
... print(m['kraken/ETH/USD'].errors_count)
215
1
"""
def __init__(self, local_path: str | Path, alerts: AlertConfig) -> None:
self._dir = Path(local_path) / '.dccd'
self._dir.mkdir(parents=True, exist_ok=True)
self._metrics_file = self._dir / 'metrics.json'
self._alerts = alerts
self._metrics: dict[str, JobMetrics] = {}
self._load_metrics()
self._setup_logging()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def record_success(self, exchange: str, pair: str, rows: int = 0) -> None:
""" Record a successful job execution.
Parameters
----------
exchange : str
Exchange name (e.g. ``'binance'``).
pair : str
Trading pair (e.g. ``'BTC/USDT'``).
rows : int, optional
Number of data rows collected, default 0.
"""
key = self._key(exchange, pair)
m = self._metrics.setdefault(key, JobMetrics())
now = time.time()
m.last_run_at = now
m.last_success_at = now
m.rows_collected += rows
m.errors_count = 0
self._save_metrics()
logger.debug('health: success %s %s rows=%d', exchange, pair, rows)
[docs]
def record_failure(self, exchange: str, pair: str) -> None:
""" Record a failed job execution.
Parameters
----------
exchange : str
Exchange name.
pair : str
Trading pair.
"""
key = self._key(exchange, pair)
m = self._metrics.setdefault(key, JobMetrics())
m.last_run_at = time.time()
m.errors_count += 1
self._save_metrics()
logger.warning('health: failure %s %s errors=%d', exchange, pair, m.errors_count)
if (self._alerts.webhook_url
and m.errors_count >= self._alerts.max_consecutive_errors):
self._send_alert(exchange, pair, m.errors_count)
[docs]
def get_metrics(self) -> dict[str, JobMetrics]:
""" Return a snapshot of the current metrics dict.
Returns
-------
dict of str to JobMetrics
Keys are ``'{exchange}/{pair}'`` strings.
"""
return dict(self._metrics)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _key(exchange: str, pair: str) -> str:
return f'{exchange}/{pair}'
def _send_alert(self, exchange: str, pair: str, errors_count: int) -> None:
text = (
f':warning: dccd: {errors_count} consecutive errors '
f'on {exchange} {pair}'
)
payload = json.dumps({'text': text}).encode()
req = urllib.request.Request(
self._alerts.webhook_url, # type: ignore[arg-type]
data=payload,
headers={'Content-Type': 'application/json'},
)
try:
urllib.request.urlopen(req, timeout=5)
logger.info('health: alert sent for %s %s', exchange, pair)
except Exception:
logger.exception('health: failed to send alert for %s %s', exchange, pair)
def _save_metrics(self) -> None:
data = {k: asdict(v) for k, v in self._metrics.items()}
self._metrics_file.write_text(json.dumps(data, indent=2))
def _load_metrics(self) -> None:
if self._metrics_file.exists():
try:
data = json.loads(self._metrics_file.read_text())
self._metrics = {k: JobMetrics(**v) for k, v in data.items()}
except Exception:
logger.warning('health: could not load metrics from %s', self._metrics_file)
def _setup_logging(self) -> None:
log_file = self._dir / 'dccd.log'
handler = RotatingFileHandler(
log_file,
maxBytes=_LOG_MAX_BYTES,
backupCount=_LOG_BACKUP_COUNT,
)
handler.setFormatter(
logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
)
logging.getLogger().addHandler(handler)