Source code for dccd.daemon.config

#!/usr/bin/env python3
# coding: utf-8

""" Declarative configuration for the dccd daemon.

Loads a YAML file and validates it with Pydantic v2 models.

"""

from __future__ import annotations

import os
import pathlib
from typing import Any

import yaml
from pydantic import BaseModel, Field, field_validator, model_validator

# Public names exported by ``from dccd.daemon.config import *``, listed in
# ASCII order. The SUPPORTED_* lookup tables are not exported here but can
# still be imported explicitly.
__all__ = [
    'AlertConfig', 'CollectorConfig', 'DEFAULT_CONFIG_PATH', 'HistoJob',
    'RemoteConfig', 'SettingsConfig', 'StorageConfig', 'StreamJob',
    'load_config', 'resolve_config_path',
]

# Base directory for user configuration. Per the XDG Base Directory spec, an
# unset *or empty* XDG_CONFIG_HOME means "use ~/.config". ``or`` covers both
# cases; ``.get(key, default)`` alone would turn '' into the current directory.
_XDG_CONFIG_HOME: pathlib.Path = pathlib.Path(
    os.environ.get('XDG_CONFIG_HOME') or '~/.config'
).expanduser()

# Default daemon configuration file: ``$XDG_CONFIG_HOME/dccd/config.yml``.
# Evaluated once at import time; later environment changes are not reflected.
DEFAULT_CONFIG_PATH: pathlib.Path = _XDG_CONFIG_HOME / 'dccd' / 'config.yml'

# Exchanges accepted by ``HistoJob.exchange`` (REST polling jobs).
SUPPORTED_HISTO_EXCHANGES: frozenset[str] = frozenset(
    {'binance', 'kraken', 'bybit', 'okx', 'coinbase'}
)
# Exchanges accepted by ``StreamJob.exchange`` (WebSocket jobs).
SUPPORTED_STREAM_EXCHANGES: frozenset[str] = frozenset(
    {'binance', 'kraken', 'bybit', 'okx', 'bitfinex', 'bitmex'}
)
# Output file formats accepted by ``HistoJob.format``.
SUPPORTED_FORMATS: frozenset[str] = frozenset({'xlsx', 'csv', 'parquet'})


class SettingsConfig(BaseModel):
    """ Global settings shared by the daemon and the backfill command.

    Parameters
    ----------
    data_path : str
        Root directory for all local data files. A leading ``~`` is expanded.
        Default ``'./data/crypto'``.
    timezone : str
        Timezone used to interpret date strings and label output files.
        ``'local'`` (default) uses the system timezone and ``'UTC'`` uses UTC.
        Both keywords are matched case-insensitively and stored as written.
        Any other value must be an IANA name (e.g. ``'Europe/Paris'``).

    """

    data_path: str = './data/crypto'
    timezone: str = 'local'

    @field_validator('data_path')
    @classmethod
    def _expand_path(cls, v: str) -> str:
        # Expand a leading ``~`` so consumers receive a usable path.
        return str(pathlib.Path(v).expanduser())

    @field_validator('timezone')
    @classmethod
    def _validate_timezone(cls, v: str) -> str:
        # The two keywords bypass the tz database and are kept unchanged.
        if v.upper() in ('LOCAL', 'UTC'):
            return v
        from zoneinfo import ZoneInfo
        try:
            ZoneInfo(v)
        except (KeyError, ValueError, OSError) as err:
            # KeyError covers ZoneInfoNotFoundError (unknown key).
            # ValueError covers malformed keys (empty, absolute, '..').
            # OSError covers e.g. IsADirectoryError, which some Python
            # versions raise for a region prefix like 'Europe'. Without it,
            # that error would escape as a non-validation error.
            raise ValueError(
                f"Unknown timezone {v!r}. "
                "Use 'local', 'UTC', or an IANA name like 'Europe/Paris'."
            ) from err
        return v


class RemoteConfig(BaseModel):
    """ A single remote destination for synchronized data.

    Parameters
    ----------
    provider : str
        Backend used to reach the remote. Defaults to ``'rclone'``.
    remote : str
        Required rclone destination, for example ``'mynas:crypto/'``.

    """

    # ``Field(default=...)`` is the explicit spelling of a plain default.
    provider: str = Field(default='rclone')
    # ``Field(...)`` marks the field as required (no default).
    remote: str = Field(...)


class StorageConfig(BaseModel):
    """ Local and optional remote storage configuration.

    Parameters
    ----------
    local_path : str, optional
        Root data directory. A leading ``~`` is expanded. When omitted or
        empty, :attr:`SettingsConfig.data_path` is used (propagated by
        :class:`CollectorConfig`'s validator).
    remotes : list of RemoteConfig
        Remote destinations. An empty list (default) keeps data local only.
        Multiple entries are all synced by :class:`SyncService`.
    sync_interval : int
        Seconds between periodic syncs to remote destinations. ``0`` disables
        the sync service; negative values are rejected. Default is ``3600``
        (1 hour).

    """

    local_path: str = ''
    remotes: list[RemoteConfig] = Field(default_factory=list)
    # A negative interval has no meaning, so reject it at load time.
    sync_interval: int = Field(default=3600, ge=0)

    @field_validator('local_path')
    @classmethod
    def _expand_path(cls, v: str) -> str:
        # Expand ``~`` the same way SettingsConfig.data_path does.
        # Keep '' untouched: it is the "not set" marker that CollectorConfig
        # replaces with settings.data_path, and str(Path('')) would become
        # the truthy '.'.
        if not v:
            return v
        return str(pathlib.Path(v).expanduser())


class HistoJob(BaseModel):
    """ Historical (REST) data collection job.

    Parameters
    ----------
    exchange : str
        Exchange name. Must be one of ``SUPPORTED_HISTO_EXCHANGES``.
    pairs : list of str
        Non-empty list of trading pairs in ``'CRYPTO/FIAT'`` format
        (e.g. ``'BTC/USDT'``). Each pair must contain exactly one ``'/'`` with
        a non-blank symbol on each side.
    span : int
        Candle interval in seconds. Must be >= 60.
    format : str
        Output format: ``'xlsx'``, ``'csv'``, or ``'parquet'`` (default).
    max_retries : int
        Retry limit for the job; must be between 1 and 10. Default 3.
        NOTE(review): exact retry semantics live in the job runner.
    retry_delay : float
        Delay between retries, presumably in seconds; must be >= 0.
        Default 2.0.

    """

    exchange: str
    pairs: list[str]
    span: int
    format: str = 'parquet'
    max_retries: int = Field(default=3, ge=1, le=10)
    retry_delay: float = Field(default=2.0, ge=0.0)

    @field_validator('exchange')
    @classmethod
    def _validate_exchange(cls, v: str) -> str:
        if v not in SUPPORTED_HISTO_EXCHANGES:
            raise ValueError(
                f"Unknown exchange {v!r}. "
                f"Supported: {sorted(SUPPORTED_HISTO_EXCHANGES)}"
            )
        return v

    @field_validator('pairs')
    @classmethod
    def _validate_pairs(cls, v: list[str]) -> list[str]:
        if not v:
            raise ValueError("'pairs' must not be empty")
        for pair in v:
            crypto, _, fiat = pair.partition('/')
            # Require exactly one separator with a symbol on both sides.
            # A bare "'/' in pair" test would accept 'BTC/', '/USDT' and
            # 'BTC/USDT/EUR'.
            if not crypto.strip() or not fiat.strip() or '/' in fiat:
                raise ValueError(
                    f"Pair {pair!r} must be in 'CRYPTO/FIAT' format (e.g. 'BTC/USDT')"
                )
        return v

    @field_validator('span')
    @classmethod
    def _validate_span(cls, v: int) -> int:
        if v < 60:
            raise ValueError(f"span must be >= 60 seconds, got {v}")
        return v

    @field_validator('format')
    @classmethod
    def _validate_format(cls, v: str) -> str:
        if v not in SUPPORTED_FORMATS:
            raise ValueError(
                f"Unknown format {v!r}. Supported: {sorted(SUPPORTED_FORMATS)}"
            )
        return v


class StreamJob(BaseModel):
    """ Real-time (WebSocket) data collection job.

    Parameters
    ----------
    exchange : str
        Exchange name. Must be one of ``SUPPORTED_STREAM_EXCHANGES``.
    pairs : list of str
        Non-empty list of trading pairs (format depends on exchange).
    channels : list of str
        Non-empty list of WebSocket channels to subscribe to
        (e.g. ``['trades', 'book']``).
    time_step : int
        Snapshot interval in seconds; must be >= 1. Default is 60.

    """

    exchange: str
    pairs: list[str]
    channels: list[str]
    # A zero or negative snapshot interval is meaningless, so reject it at
    # load time, in line with HistoJob's lower bound on ``span``.
    time_step: int = Field(default=60, ge=1)

    @field_validator('exchange')
    @classmethod
    def _validate_exchange(cls, v: str) -> str:
        if v not in SUPPORTED_STREAM_EXCHANGES:
            raise ValueError(
                f"Unknown exchange {v!r}. "
                f"Supported: {sorted(SUPPORTED_STREAM_EXCHANGES)}"
            )
        return v

    @field_validator('pairs', 'channels')
    @classmethod
    def _validate_nonempty(cls, v: list[str], info: Any) -> list[str]:
        # ``info`` is pydantic's validation info. Only ``field_name`` is used,
        # so one validator can serve both list fields.
        if not v:
            raise ValueError(f"'{info.field_name}' must not be empty")
        return v


class AlertConfig(BaseModel):
    """ Optional alerting configuration.

    Parameters
    ----------
    webhook_url : str or None
        Slack/Discord webhook that receives error notifications. Leave it
        unset (``None``, the default) to turn alerting off.
    max_consecutive_errors : int
        How many job failures in a row must occur before an alert is sent.
        Defaults to 3.

    """

    webhook_url: str | None = Field(default=None)
    max_consecutive_errors: int = Field(default=3)


class CollectorConfig(BaseModel):
    """ Root configuration model for the dccd daemon.

    Parameters
    ----------
    settings : SettingsConfig
        Global settings (data path, timezone).
    storage : StorageConfig
        Storage configuration. An empty ``local_path`` is filled in from
        ``settings.data_path`` during validation.
    histo_jobs : list of HistoJob
        REST API polling jobs.
    stream_jobs : list of StreamJob
        WebSocket streaming jobs.
    alerts : AlertConfig
        Alerting settings.

    At least one job (historical or streaming) must be configured.

    """

    settings: SettingsConfig = Field(default_factory=SettingsConfig)
    storage: StorageConfig = Field(default_factory=StorageConfig)
    histo_jobs: list[HistoJob] = Field(default_factory=list)
    stream_jobs: list[StreamJob] = Field(default_factory=list)
    alerts: AlertConfig = Field(default_factory=AlertConfig)

    @model_validator(mode='after')
    def _propagate_data_path(self) -> 'CollectorConfig':
        # An explicit storage root wins; otherwise inherit settings.data_path.
        # NOTE(review): this mutates self.storage in place. If a caller passed
        # a StorageConfig instance directly, that object is modified too.
        storage = self.storage
        if storage.local_path:
            return self
        storage.local_path = self.settings.data_path
        return self

    @model_validator(mode='after')
    def _at_least_one_job(self) -> 'CollectorConfig':
        # A daemon with no jobs would do nothing, so reject it outright.
        has_jobs = bool(self.histo_jobs) or bool(self.stream_jobs)
        if has_jobs:
            return self
        raise ValueError(
            "Configuration must define at least one job "
            "(histo_jobs or stream_jobs)"
        )


def resolve_config_path(path: str | pathlib.Path | None = None) -> pathlib.Path:
    """ Return the config file path to use, applying XDG fallback when *path* is None.

    Parameters
    ----------
    path : str, pathlib.Path, or None
        Explicit config path. When ``None``, the function searches, in order:
        ``./config.yml`` (current working directory), then
        ``$XDG_CONFIG_HOME/dccd/config.yml``. An unset or empty
        ``XDG_CONFIG_HOME`` falls back to ``~/.config``.

    Returns
    -------
    pathlib.Path
        Resolved path. When *path* is not ``None`` the value is returned as-is
        (after ``expanduser``); existence is **not** checked. Otherwise the
        first candidate that is a regular file is returned.

    Raises
    ------
    FileNotFoundError
        When *path* is ``None`` and none of the candidate files exist.

    """
    if path is not None:
        return pathlib.Path(path).expanduser()

    # Recompute the XDG location from the current environment rather than
    # using DEFAULT_CONFIG_PATH, which is fixed at import time. An empty
    # XDG_CONFIG_HOME counts as unset (XDG spec); without ``or`` it would
    # resolve relative to the CWD.
    xdg_home = os.environ.get('XDG_CONFIG_HOME') or '~/.config'
    candidates = [
        pathlib.Path('config.yml'),
        pathlib.Path(xdg_home).expanduser() / 'dccd' / 'config.yml',
    ]
    for candidate in candidates:
        # is_file() rather than exists(): a directory named config.yml must
        # not shadow the real config file.
        if candidate.is_file():
            return candidate

    tried = ', '.join(str(c) for c in candidates)
    raise FileNotFoundError(f'No config file found. Tried: {tried}')


def load_config(path: str | pathlib.Path) -> CollectorConfig:
    """ Load and validate a YAML daemon configuration file.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to the YAML configuration file.

    Returns
    -------
    CollectorConfig
        Validated configuration object.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    yaml.YAMLError
        If the file contains invalid YAML.
    pydantic.ValidationError
        If the configuration fails validation. An empty file is validated as
        an empty mapping, so it is reported as defining no jobs rather than
        as a type error.

    """
    # Binary mode lets PyYAML detect the encoding itself (UTF-8 by default,
    # UTF-16 via BOM) instead of relying on the platform locale encoding.
    with open(path, 'rb') as f:
        data = yaml.safe_load(f)
    if data is None:
        # safe_load returns None for an empty or comment-only document.
        data = {}
    return CollectorConfig.model_validate(data)