"""Application configuration — extends daemon config with v3 JobSpecs."""
from __future__ import annotations
import os
import pathlib
from typing import Any
import yaml
from pydantic import BaseModel, Field, field_validator, model_validator
from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType
# Names exported by ``from <this module> import *``.
__all__ = [
    # Configuration models.
    "AlertConfig",
    "AppConfig",
    "JobConfig",
    "RemoteConfig",
    "SettingsConfig",
    "StorageConfig",
    # Loading helpers and the default config location.
    "load_config",
    "resolve_config_path",
    "DEFAULT_CONFIG_PATH",
]
# Per-user configuration root: ``$XDG_CONFIG_HOME`` when set, otherwise
# ``~/.config`` (with ``~`` expanded). Evaluated once, when the module is
# imported.
_XDG_CONFIG_HOME = pathlib.Path(
    os.getenv("XDG_CONFIG_HOME", "~/.config")
).expanduser()

# Default location of the config file beneath the per-user root.
DEFAULT_CONFIG_PATH = _XDG_CONFIG_HOME.joinpath("dccd", "config.yml")

# Lower-case exchange names accepted by ``JobConfig.exchange``.
SUPPORTED_EXCHANGES: frozenset[str] = frozenset(
    (
        "binance",
        "kraken",
        "bybit",
        "okx",
        "coinbase",
        "bitfinex",
        "bitmex",
    )
)
[docs]
class SettingsConfig(BaseModel):
    """Global settings: data path, timezone, and web-UI bind/auth.

    ``data_path`` has a leading ``~`` expanded during validation.
    ``timezone`` must be ``local`` or ``utc`` (any letter case) or a key that
    :class:`zoneinfo.ZoneInfo` can load.
    """

    data_path: str = "./data/crypto"
    timezone: str = "local"
    ui_host: str = "127.0.0.1"
    ui_port: int = 8080
    ui_auth_token: str | None = None
    ui_allow_origins: list[str] = Field(default_factory=list)

    @field_validator("data_path")
    @classmethod
    def _expand(cls, v: str) -> str:
        """Return *v* with a leading ``~`` expanded, as a string."""
        return str(pathlib.Path(v).expanduser())

    @field_validator("timezone")
    @classmethod
    def _validate_tz(cls, v: str) -> str:
        """Return *v* unchanged when it names a usable timezone.

        Raises:
            ValueError: if *v* is neither ``local``/``utc`` nor a key that
                ``ZoneInfo`` can load.
        """
        if v.upper() in ("LOCAL", "UTC"):
            return v

        from zoneinfo import ZoneInfo

        try:
            ZoneInfo(v)
        except (KeyError, ValueError, OSError) as exc:
            # ``ZoneInfoNotFoundError`` is a ``KeyError``; malformed keys
            # (absolute or non-normalised paths) raise ``ValueError``; and
            # some CPython versions let an ``OSError`` escape when the key
            # names a directory of the tz database (e.g. ``"America"``).
            # Only ``ValueError`` is turned into a validation error by
            # pydantic, so normalise all of them here and keep the cause.
            raise ValueError(f"Unknown timezone {v!r}") from exc
        return v
[docs]
class RemoteConfig(BaseModel):
    """A single rclone remote used as a sync target.

    ``remote`` is required; ``provider`` defaults to ``"rclone"``.
    """

    remote: str
    provider: str = "rclone"
[docs]
class StorageConfig(BaseModel):
    """Where data is stored: local path, rclone remotes, and sync interval."""

    # Empty by default; ``AppConfig`` fills it from ``settings.data_path``.
    local_path: str = ""
    # Remote sync targets; none unless configured.
    remotes: list[RemoteConfig] = Field(default_factory=list)
    # Interval between syncs (presumably seconds; confirm against the sync code).
    sync_interval: int = 3600
[docs]
class AlertConfig(BaseModel):
    """Settings for health alerts: webhook URL and error threshold."""

    # No webhook is configured unless a URL is given.
    webhook_url: str | None = None
    # Error threshold: number of consecutive errors (default 3).
    max_consecutive_errors: int = 3
[docs]
class JobConfig(BaseModel):
    """YAML-facing job definition, friendlier to write than a ``JobSpec``.

    A single ``JobConfig`` may list several pairs; :meth:`to_job_specs`
    produces one :class:`JobSpec` per pair.

    Validation enforces:

    - ``exchange`` must be one of ``SUPPORTED_EXCHANGES`` (matched
      case-insensitively and stored lower-case).
    - ``pairs`` must be non-empty and every entry must use ``BASE/QUOTE``
      or ``BASE-QUOTE`` format.
    - ``data_type='ohlc'`` requires ``span`` to be set.
    """

    exchange: str
    pairs: list[str]
    data_type: str = "ohlc"
    operation: str = "backfill"
    span: int | None = None
    trigger_kind: str = "interval"
    every: int | None = None
    cron: str | None = None
    start: str = "last"
    depth: int | None = None
    snapshot_interval: int | None = None

    @field_validator("exchange")
    @classmethod
    def _validate_exchange(cls, v: str) -> str:
        """Return the lower-cased exchange name, rejecting unknown ones."""
        name = v.lower()
        if name in SUPPORTED_EXCHANGES:
            return name
        raise ValueError(
            f"Unknown exchange {v!r}. Supported: {sorted(SUPPORTED_EXCHANGES)}"
        )

    @field_validator("pairs")
    @classmethod
    def _validate_pairs(cls, v: list[str]) -> list[str]:
        """Reject an empty list or any entry lacking a ``/`` or ``-``."""
        if not v:
            raise ValueError("'pairs' must not be empty")
        # Report the first offending entry, in list order.
        malformed = [p for p in v if not ("/" in p or "-" in p)]
        if malformed:
            raise ValueError(
                f"Pair {malformed[0]!r} must use 'BASE/QUOTE' or 'BASE-QUOTE' format"
            )
        return v

    @model_validator(mode="after")
    def _validate_span_for_ohlc(self) -> "JobConfig":
        """Require ``span`` whenever the data type is ``ohlc``."""
        span_missing = self.span is None
        if span_missing and self.data_type == "ohlc":
            raise ValueError("'span' is required when data_type='ohlc'")
        return self

    def to_job_specs(self) -> list[JobSpec]:
        """Expand this (possibly multi-pair) config into one JobSpec per pair."""
        kind = DataType(self.data_type)

        # Recurring triggers (``interval``/``cron``) fall back to the span
        # when no explicit interval is given. Other kinds
        # (``manual``/``once``/``supervised``) keep ``every`` as configured so
        # they do not report a spurious cadence (the UI reads it as
        # "scheduled").
        if self.trigger_kind in ("interval", "cron"):
            cadence = self.every or self.span
        else:
            cadence = self.every

        # One trigger instance is shared by every spec built below.
        shared_trigger = Trigger(
            kind=self.trigger_kind,  # type: ignore[arg-type]
            every=cadence,
            cron=self.cron,
        )

        def _build(pair: str) -> JobSpec:
            """Build the JobSpec for a single pair of this config."""
            target = JobTarget(
                exchange=self.exchange,
                symbol=Symbol.parse(pair),
                data_type=kind,
                span=self.span,
            )
            params = JobParams(
                start=self.start,
                depth=self.depth,
                snapshot_interval=self.snapshot_interval,
            )
            return JobSpec(
                id=JobSpec.make_id(self.operation, target),
                operation=self.operation,  # type: ignore[arg-type]
                target=target,
                trigger=shared_trigger,
                params=params,
            )

        return [_build(pair) for pair in self.pairs]
[docs]
class AppConfig(BaseModel):
    """Root of the configuration: settings, storage, alerts and jobs.

    Every section is optional and falls back to its defaults. After
    validation an empty ``storage.local_path`` is filled in from
    ``settings.data_path``.
    """

    settings: SettingsConfig = Field(default_factory=SettingsConfig)
    storage: StorageConfig = Field(default_factory=StorageConfig)
    alerts: AlertConfig = Field(default_factory=AlertConfig)
    jobs: list[JobConfig] = Field(default_factory=list)

    @model_validator(mode="after")
    def _propagate_path(self) -> "AppConfig":
        """Default ``storage.local_path`` to ``settings.data_path``."""
        if self.storage.local_path:
            return self
        self.storage.local_path = self.settings.data_path
        return self

    def all_job_specs(self) -> list[JobSpec]:
        """Return the per-pair :class:`JobSpec` list of every :class:`JobConfig`."""
        return [spec for job in self.jobs for spec in job.to_job_specs()]

    # ------------------------------------------------------------------
    # Runtime CRUD, used by the UI/API to add, edit and remove jobs
    # without hand-editing config.yml. Writes normalise to one pair per
    # ``JobConfig`` so that a single dataset can be edited or deleted in
    # isolation. Multi-pair JobConfigs already present in the file are
    # still read normally and are split on demand (see
    # :meth:`update_job_start` / :meth:`remove_job`).
    # ------------------------------------------------------------------

    def _spec_id_of(self, job: "JobConfig", pair: str) -> str:
        """Compute the :class:`JobSpec` id that *pair* of *job* maps to."""
        target = JobTarget(
            exchange=job.exchange,
            symbol=Symbol.parse(pair),
            data_type=DataType(job.data_type),
            span=job.span,
        )
        return JobSpec.make_id(job.operation, target)

    def add_job(
        self,
        *,
        operation: str,
        exchange: str,
        pair: str,
        data_type: str = "ohlc",
        span: int | None = None,
        start: str = "last",
        trigger_kind: str = "interval",
        every: int | None = None,
        cron: str | None = None,
        depth: int | None = None,
        snapshot_interval: int | None = None,
    ) -> str:
        """Append a single-pair :class:`JobConfig` and return its job id.

        Field validation is delegated to :class:`JobConfig`.

        Raises:
            ValueError: if a job with the same id already exists, or if
                the field combination is rejected by :class:`JobConfig`.
        """
        candidate = JobConfig(
            exchange=exchange,
            pairs=[pair],
            data_type=data_type,
            operation=operation,
            span=span,
            trigger_kind=trigger_kind,
            every=every,
            cron=cron,
            start=start,
            depth=depth,
            snapshot_interval=snapshot_interval,
        )
        job_id = self._spec_id_of(candidate, candidate.pairs[0])
        if any(spec.id == job_id for spec in self.all_job_specs()):
            raise ValueError(f"Job {job_id!r} already exists")
        self.jobs.append(candidate)
        return job_id

    def remove_job(self, job_id: str) -> bool:
        """Remove the pair whose spec id equals *job_id*.

        A JobConfig left without pairs is dropped entirely.

        Returns:
            ``True`` if a matching job was found and removed.
        """
        kept: list[JobConfig] = []
        removed = False
        for job in self.jobs:
            survivors = [
                pair for pair in job.pairs if self._spec_id_of(job, pair) != job_id
            ]
            if len(survivors) == len(job.pairs):
                # Nothing in this JobConfig matched; keep it as-is.
                kept.append(job)
                continue
            removed = True
            if survivors:
                kept.append(job.model_copy(update={"pairs": survivors}))
            # An empty survivor list means the whole JobConfig goes away.
        if removed:
            self.jobs = kept
        return removed

    def update_job_start(self, job_id: str, start: str) -> bool:
        """Set ``start`` (the first date) on the job matching *job_id*.

        A multi-pair JobConfig is split so only the targeted pair changes.

        Returns:
            ``True`` if a matching job was found and updated.
        """
        changes: dict[str, Any] = {"start": start}
        return self._update_job_fields(job_id, changes)

    def update_job_schedule(self, job_id: str, every: int | None) -> bool:
        """Set the recurring backfill schedule of the job matching *job_id*.

        ``every`` is the interval in seconds between automatic backfills (run
        by the daemon's :class:`~dccd.application.scheduler.Scheduler` in
        ``dccd start``); ``None`` clears the schedule (manual runs only). The
        frequency is independent of the OHLC span but must not be smaller
        than it, because sampling more often than the bar size just
        refetches the same bar.

        A multi-pair JobConfig is split so only the targeted pair changes.

        Returns:
            ``True`` if a matching job was found and updated.

        Raises:
            ValueError: if *every* is smaller than the span of the first
                JobConfig that owns *job_id*.
        """
        if every is None:
            # Clearing the schedule turns the job into a manual one that is
            # never auto-run.
            return self._update_job_fields(
                job_id, {"every": every, "trigger_kind": "manual"}
            )

        # Only the first JobConfig containing the id is checked against its span.
        owner = next(
            (
                job
                for job in self.jobs
                if any(self._spec_id_of(job, pair) == job_id for pair in job.pairs)
            ),
            None,
        )
        if owner is not None and owner.span and every < owner.span:
            raise ValueError(
                f"Schedule interval ({every}s) must be ≥ the span "
                f"({owner.span}s)."
            )
        return self._update_job_fields(
            job_id, {"every": every, "trigger_kind": "interval"}
        )

    def _update_job_fields(self, job_id: str, fields: dict[str, Any]) -> bool:
        """Apply *fields* to the pair matching *job_id*.

        The owning multi-pair JobConfig is split so that its sibling pairs
        are left untouched. Returns ``True`` if any pair matched.
        """
        rebuilt: list[JobConfig] = []
        touched = False
        for job in self.jobs:
            # Partition this job's pairs into the targeted ones and the rest.
            hits: list[str] = []
            siblings: list[str] = []
            for pair in job.pairs:
                if self._spec_id_of(job, pair) == job_id:
                    hits.append(pair)
                else:
                    siblings.append(pair)

            if not hits:
                rebuilt.append(job)
                continue

            touched = True
            if siblings:
                # Siblings keep the original field values.
                rebuilt.append(job.model_copy(update={"pairs": siblings}))
            rebuilt.append(job.model_copy(update={"pairs": hits, **fields}))
        if touched:
            self.jobs = rebuilt
        return touched
def resolve_config_path(path: str | pathlib.Path | None = None) -> pathlib.Path:
    """Resolve the config path (explicit, then ``./config.yml``, then XDG).

    Args:
        path: Explicit location. When given, it is returned with ``~``
            expanded and is *not* checked for existence.

    Returns:
        The explicit path, or the first candidate that is an existing file.

    Raises:
        FileNotFoundError: if *path* is ``None`` and neither candidate is
            an existing file.
    """
    if path is not None:
        return pathlib.Path(path).expanduser()

    # The environment is read on every call, so a change to
    # ``XDG_CONFIG_HOME`` made after import is honoured.
    xdg_cfg = (
        pathlib.Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")).expanduser()
        / "dccd"
        / "config.yml"
    )
    candidates = [pathlib.Path("config.yml"), xdg_cfg]
    for candidate in candidates:
        # ``is_file`` rather than ``exists``: a directory that happens to be
        # called ``config.yml`` must not shadow a real config later in the
        # list, and it could not be opened as a config anyway.
        if candidate.is_file():
            return candidate

    tried = ", ".join(str(c) for c in candidates)
    raise FileNotFoundError(f"No config file found. Tried: {tried}")
def load_config(path: str | pathlib.Path) -> AppConfig:
    """Load and validate a YAML config into an :class:`AppConfig`.

    A document that parses to a falsy value (for example an empty file)
    is treated as an empty mapping, so the result is built from defaults.

    Args:
        path: Location of the YAML file. It is opened as given; no ``~``
            expansion is performed here.

    Returns:
        The validated :class:`AppConfig`.

    Raises:
        OSError: if the file cannot be opened.
    """
    # Decode as UTF-8 explicitly instead of relying on the platform's locale
    # encoding, which would mis-read non-ASCII content on non-UTF-8 systems.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    return AppConfig.model_validate(data)