Source code for dccd.application.jobs
"""Job model — JobSpec, JobRun, Trigger, JobParams."""
from __future__ import annotations
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType
__all__ = ["Trigger", "JobTarget", "JobParams", "JobSpec", "JobRun", "RunState"]
class RunState(str, Enum):
PENDING = "pending"
RUNNING = "running"
RECONNECTING = "reconnecting"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
class Trigger(BaseModel, frozen=True):
"""Job trigger — when/how to execute."""
kind: Literal["once", "interval", "cron", "supervised"]
at: int | None = None
every: int | None = None
cron: str | None = None
class JobTarget(BaseModel, frozen=True):
"""What to collect — exchange + symbol + data type."""
exchange: str
symbol: Symbol
data_type: DataType
span: int | None = None
class JobParams(BaseModel):
"""Operation parameters (all optional, per-operation)."""
# ``start`` accepts the sentinels "last"/"origin", an ISO date string
# ("2024-01-01"), or a nanosecond integer — all parsed in operations.backfill.
start: int | str = "last"
depth: int | None = None
snapshot_interval: int | None = None
derive_from: DataType | None = None
transport: Literal["rest", "ws"] | None = None
[docs]
class JobSpec(BaseModel):
"""Declarative job definition.
A histo job is: backfill + interval trigger (+ start="last").
A stream job is: stream + supervised trigger.
"""
id: str
operation: Literal["backfill", "stream"]
target: JobTarget
trigger: Trigger
params: JobParams = JobParams()
enabled: bool = True
origin: Literal["config", "runtime"] = "config"
@classmethod
def make_id(cls, operation: str, target: JobTarget) -> str:
parts = [operation, target.exchange, str(target.symbol), target.data_type.value]
if target.span:
parts.append(f"{target.span}s")
return ":".join(parts)
class JobRun(BaseModel):
"""One execution of a JobSpec."""
run_id: str
spec_id: str
operation: str
target: JobTarget
state: RunState = RunState.PENDING
started_at: int | None = None
ended_at: int | None = None
rows_written: int = 0
error: str | None = None
progress: dict[str, Any] | None = None
log_tail: list[str] = []