"""Application configuration — extends daemon config with v3 JobSpecs."""
from __future__ import annotations
import os
import pathlib
import yaml
from pydantic import BaseModel, Field, field_validator, model_validator
from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType
__all__ = [
"AlertConfig",
"AppConfig",
"JobConfig",
"RemoteConfig",
"SettingsConfig",
"StorageConfig",
"load_config",
"resolve_config_path",
"DEFAULT_CONFIG_PATH",
]
_XDG_CONFIG_HOME = pathlib.Path(
os.environ.get("XDG_CONFIG_HOME", "~/.config")
).expanduser()
DEFAULT_CONFIG_PATH = _XDG_CONFIG_HOME / "dccd" / "config.yml"
SUPPORTED_EXCHANGES: frozenset[str] = frozenset(
{"binance", "kraken", "bybit", "okx", "coinbase", "bitfinex", "bitmex"}
)
class SettingsConfig(BaseModel):
data_path: str = "./data/crypto"
timezone: str = "local"
ui_host: str = "127.0.0.1"
ui_port: int = 8080
ui_auth_token: str | None = None
ui_allow_origins: list[str] = Field(default_factory=list)
@field_validator("data_path")
@classmethod
def _expand(cls, v: str) -> str:
return str(pathlib.Path(v).expanduser())
@field_validator("timezone")
@classmethod
def _validate_tz(cls, v: str) -> str:
if v.upper() in ("LOCAL", "UTC"):
return v
try:
from zoneinfo import ZoneInfo
ZoneInfo(v)
except KeyError:
raise ValueError(f"Unknown timezone {v!r}")
return v
class RemoteConfig(BaseModel):
provider: str = "rclone"
remote: str
class StorageConfig(BaseModel):
local_path: str = ""
remotes: list[RemoteConfig] = Field(default_factory=list)
sync_interval: int = 3600
class AlertConfig(BaseModel):
webhook_url: str | None = None
max_consecutive_errors: int = 3
[docs]
class JobConfig(BaseModel):
"""YAML job definition — more human-friendly than JobSpec.
Validation enforces:
- ``exchange`` must be a known exchange name.
- ``data_type='ohlc'`` requires ``span`` to be set.
- ``pairs`` must be non-empty and use ``BASE/QUOTE`` format.
"""
exchange: str
pairs: list[str]
data_type: str = "ohlc"
operation: str = "backfill"
span: int | None = None
trigger_kind: str = "interval"
every: int | None = None
cron: str | None = None
start: str = "last"
depth: int | None = None
snapshot_interval: int | None = None
@field_validator("exchange")
@classmethod
def _validate_exchange(cls, v: str) -> str:
if v.lower() not in SUPPORTED_EXCHANGES:
raise ValueError(
f"Unknown exchange {v!r}. Supported: {sorted(SUPPORTED_EXCHANGES)}"
)
return v.lower()
@field_validator("pairs")
@classmethod
def _validate_pairs(cls, v: list[str]) -> list[str]:
if not v:
raise ValueError("'pairs' must not be empty")
for pair in v:
if "/" not in pair and "-" not in pair:
raise ValueError(
f"Pair {pair!r} must use 'BASE/QUOTE' or 'BASE-QUOTE' format"
)
return v
@model_validator(mode="after")
def _validate_span_for_ohlc(self) -> "JobConfig":
if self.data_type == "ohlc" and self.span is None:
raise ValueError("'span' is required when data_type='ohlc'")
return self
[docs]
def to_job_specs(self) -> list[JobSpec]:
"""Expand a multi-pair JobConfig into a list of JobSpecs."""
specs = []
data_type = DataType(self.data_type)
trigger = Trigger(
kind=self.trigger_kind, # type: ignore[arg-type]
every=self.every or self.span,
cron=self.cron,
)
for pair in self.pairs:
sym = Symbol.parse(pair)
target = JobTarget(
exchange=self.exchange,
symbol=sym,
data_type=data_type,
span=self.span,
)
params = JobParams(
start=self.start,
depth=self.depth,
snapshot_interval=self.snapshot_interval,
)
spec = JobSpec(
id=JobSpec.make_id(self.operation, target),
operation=self.operation, # type: ignore[arg-type]
target=target,
trigger=trigger,
params=params,
)
specs.append(spec)
return specs
[docs]
class AppConfig(BaseModel):
settings: SettingsConfig = Field(default_factory=SettingsConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
alerts: AlertConfig = Field(default_factory=AlertConfig)
jobs: list[JobConfig] = Field(default_factory=list)
@model_validator(mode="after")
def _propagate_path(self) -> "AppConfig":
if not self.storage.local_path:
self.storage.local_path = self.settings.data_path
return self
def all_job_specs(self) -> list[JobSpec]:
specs = []
for job in self.jobs:
specs.extend(job.to_job_specs())
return specs
def resolve_config_path(path: str | pathlib.Path | None = None) -> pathlib.Path:
if path is not None:
return pathlib.Path(path).expanduser()
xdg_cfg = (
pathlib.Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")).expanduser()
/ "dccd" / "config.yml"
)
candidates = [pathlib.Path("config.yml"), xdg_cfg]
for candidate in candidates:
if candidate.exists():
return candidate
tried = ", ".join(str(c) for c in candidates)
raise FileNotFoundError(f"No config file found. Tried: {tried}")
def load_config(path: str | pathlib.Path) -> AppConfig:
with open(path) as f:
data = yaml.safe_load(f) or {}
return AppConfig.model_validate(data)