"""Async scheduler + stream supervisor."""
from __future__ import annotations
import asyncio
import logging
import time
from dccd.application.events import EventBus
from dccd.application.jobs import JobSpec
from dccd.sources.registry import SourceRegistry
from dccd.storage.parquet import ParquetStore
from dccd.storage.runs_sqlite import RunsStore
# Module-scoped logger shared by every class below.
logger = logging.getLogger(__name__)

# Only the scheduler is part of this module's public API.
__all__ = ["Scheduler"]
class _StreamWorker:
"""Supervised stream job wrapper."""
def __init__(
self,
spec: JobSpec,
registry: SourceRegistry,
store: ParquetStore,
runs_store: RunsStore | None,
events: EventBus,
) -> None:
self._spec = spec
self._registry = registry
self._store = store
self._runs_store = runs_store
self._events = events
self._task: asyncio.Task[None] | None = None
self._stop_event = asyncio.Event()
@property
def is_running(self) -> bool:
return self._task is not None and not self._task.done()
def start(self) -> None:
if self.is_running:
return
self._stop_event.clear()
self._task = asyncio.create_task(self._run_forever())
async def stop(self) -> None:
self._stop_event.set()
if self._task:
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception):
pass
self._task = None
async def _run_forever(self) -> None:
from dccd.application.operations import stream
delay = 5.0
while not self._stop_event.is_set():
try:
run_events = self._events.for_run(f"{self._spec.id}@stream")
await stream(
self._spec,
registry=self._registry,
store=self._store,
runs_store=self._runs_store,
events=run_events,
stop_event=self._stop_event,
)
except asyncio.CancelledError:
return
except Exception as exc:
logger.warning("Stream %s failed: %s — restarting in %ds", self._spec.id, exc, delay)
await asyncio.sleep(delay)
delay = min(delay * 2, 60)
class Scheduler:
    """Orchestrates JobSpecs (intervals → backfill; supervised → streams).

    Parameters
    ----------
    registry : SourceRegistry
    store : ParquetStore
    runs_store : RunsStore or None
    events : EventBus or None
        A new ``EventBus()`` is created when not supplied.
    """

    def __init__(
        self,
        registry: SourceRegistry,
        store: ParquetStore,
        runs_store: RunsStore | None = None,
        events: EventBus | None = None,
    ) -> None:
        self._registry = registry
        self._store = store
        self._runs_store = runs_store
        self._events = events or EventBus()
        self._streams: dict[str, _StreamWorker] = {}
        # Strong references to background jobs (interval loops and one-shot
        # runs). The event loop only keeps weak references to tasks.
        self._interval_tasks: set[asyncio.Task[None]] = set()
        self._running = False

    def _track(self, task: asyncio.Task[None]) -> None:
        """Hold a strong reference to *task* and drop it once it completes."""
        self._interval_tasks.add(task)
        task.add_done_callback(self._interval_tasks.discard)

    def register_streams(self, specs: list[JobSpec]) -> None:
        """Register stream workers from config without starting them.

        Called by the app lifespan so stream jobs appear in ``/api/streams``
        and can be started/stopped from the UI even in ``dccd ui`` mode.
        """
        for spec in specs:
            if spec.operation == "stream" and spec.enabled and spec.id not in self._streams:
                self._streams[spec.id] = _StreamWorker(
                    spec, self._registry, self._store, self._runs_store, self._events
                )

    async def run_now(self, spec: JobSpec) -> None:
        """Trigger a one-shot backfill for *spec* immediately."""
        self._track(asyncio.create_task(self._run_once(spec)))

    async def start(self, specs: list[JobSpec]) -> None:
        """Start all enabled specs (full daemon mode)."""
        self._running = True
        for spec in specs:
            if not spec.enabled:
                continue
            kind = spec.trigger.kind
            if kind == "supervised":
                if spec.id not in self._streams:
                    self._streams[spec.id] = _StreamWorker(
                        spec, self._registry, self._store, self._runs_store, self._events
                    )
                self._streams[spec.id].start()
            elif kind in ("interval", "cron"):
                # NOTE(review): cron triggers are run as fixed intervals
                # (see _interval_loop); no cron expression is evaluated here.
                self._track(asyncio.create_task(self._interval_loop(spec)))
            elif kind == "once":
                # Track the task so stop() can cancel it and the event loop
                # cannot silently GC it before it finishes.
                self._track(asyncio.create_task(self._run_once(spec)))

    async def stop(self) -> None:
        """Stop all running jobs and wait for them to wind down."""
        self._running = False
        pending = list(self._interval_tasks)
        self._interval_tasks.clear()
        for task in pending:
            task.cancel()
        # Snapshot the workers: register_streams may run while we await.
        for worker in list(self._streams.values()):
            await worker.stop()
        # Await the cancelled jobs so that none is still writing to the store
        # once stop() returns. return_exceptions stops one job's error from
        # masking the rest.
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    async def _interval_loop(self, spec: JobSpec) -> None:
        """Run a backfill for *spec*, then sleep, until the scheduler stops.

        The sleep is ``trigger.every``, falling back to ``target.span`` and
        then to 3600 seconds. It is measured from the end of each run.
        """
        every = spec.trigger.every or spec.target.span or 3600
        while self._running:
            await self._run_once(spec)
            await asyncio.sleep(every)

    async def _run_once(self, spec: JobSpec) -> None:
        """Run one backfill for *spec*, logging (not raising) any failure."""
        from dccd.application.operations import backfill

        try:
            run_events = self._events.for_run(f"{spec.id}@{int(time.time())}")
            await backfill(
                spec,
                registry=self._registry,
                store=self._store,
                runs_store=self._runs_store,
                events=run_events,
            )
        except Exception as exc:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Scheduled job %s failed: %s", spec.id, exc)

    def start_stream(self, spec_id: str) -> bool:
        """Start the registered stream *spec_id*; return False if it is unknown."""
        worker = self._streams.get(spec_id)
        if worker is None:
            return False
        worker.start()
        return True

    async def stop_stream(self, spec_id: str) -> bool:
        """Stop the registered stream *spec_id*; return False if it is unknown."""
        worker = self._streams.get(spec_id)
        if worker is None:
            return False
        await worker.stop()
        return True

    def stream_status(self) -> dict[str, bool]:
        """Map each registered stream id to whether its worker is running."""
        return {sid: w.is_running for sid, w in self._streams.items()}