# Source code for dccd.interfaces.api.app

"""FastAPI application — routes 1:1 with the Operation Registry + SSE events.

The UI (``/``, ``/inventory``, …) is a thin client that consumes this API;
all business logic lives in ``dccd.application``.

Architecture note
-----------------
All mutable server state lives in ``app.state`` and is initialised in the
``lifespan`` context manager. Endpoint helpers (``_store``, ``_bus``, …) read
from ``app.state`` via the ``Request`` object so the wiring is always explicit
and testable.

Background-task safety
----------------------
``start_backfill`` spawns an ``asyncio.Task``. The task captures *references*
to the infrastructure objects (``reg``, ``store``, ``runs_store``, ``bus``)
as local variables **before** the task is created — not via the ``Request``
object, which Starlette may recycle after the response is sent.
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
import pathlib
from collections.abc import Coroutine
from typing import Any, cast

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

from dccd.application.config import AppConfig, load_config, resolve_config_path
from dccd.application.events import EventBus
from dccd.application.jobs import JobParams, JobSpec, JobTarget, Trigger
from dccd.application.registry import REGISTRY
from dccd.application.scheduler import Scheduler
from dccd.application.service_factory import (
    build_registry,
    build_runs_store,
    build_store,
)
from dccd.domain.symbol import Symbol
from dccd.domain.types import DataType
from dccd.storage.runs_sqlite import RunsStore

_UI_DIR = pathlib.Path(__file__).parent.parent / "ui"
_TEMPLATES_DIR = _UI_DIR / "templates"
_STATIC_DIR = _UI_DIR / "static"

__all__ = ["create_app"]

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Module-level request models — this module uses
# ``from __future__ import annotations``, so endpoint annotations are strings
# that FastAPI has to resolve by name. Models defined inside create_app()
# would not be visible to that lookup and would break request-body detection.
# ---------------------------------------------------------------------------

class BackfillRequest(BaseModel):
    """Request body for ``POST /api/backfill``.

    Describes the dataset a one-shot backfill should target. The endpoint
    turns these fields into a ``JobTarget`` / ``JobSpec`` pair before the
    background task is spawned.
    """

    # Exchange identifier, passed unchanged to ``JobTarget(exchange=...)``.
    exchange: str
    # Symbol text; the endpoint parses it with ``Symbol.parse`` and answers
    # HTTP 400 when that raises ``ValueError``.
    symbol: str
    # Data type name; the endpoint converts it with ``DataType(...)``.
    data_type: str = "ohlc"
    # Forwarded to ``JobTarget(span=...)``. The endpoint rejects a missing or
    # zero span with HTTP 400 when the data type is OHLC.
    # NOTE(review): unit is presumably seconds — confirm against JobTarget.
    span: int | None = None
    # Forwarded as ``JobParams(start=...)``; the meaning of ``"last"`` is
    # defined by the backfill operation, not by this module.
    start: str = "last"


class StreamAction(BaseModel):
    """Request body for ``POST /api/streams/start`` and ``/stop``.

    Both endpoints answer HTTP 404 when the scheduler does not know the
    given stream job.
    """

    # Identifier of the stream job, handed to ``Scheduler.start_stream`` /
    # ``Scheduler.stop_stream``.
    spec_id: str


class ReadRequest(BaseModel):
    """Request body for ``POST /api/read``.

    Identifies a stored dataset and an optional window to read from it.
    """

    # Exchange identifier, passed unchanged to ``JobTarget(exchange=...)``.
    exchange: str
    # Symbol text; the endpoint parses it with ``Symbol.parse`` and answers
    # HTTP 400 when that raises ``ValueError``.
    symbol: str
    # Data type name; the endpoint converts it with ``DataType(...)``.
    data_type: str = "ohlc"
    # Forwarded to ``JobTarget(span=...)``; unlike the backfill endpoint, the
    # read endpoint performs no span validation of its own.
    span: int | None = None
    # Optional window bounds, forwarded to ``read(start_ns=..., end_ns=...)``.
    # NOTE(review): presumably nanosecond timestamps (per the ``_ns`` suffix);
    # bound inclusivity is decided by ``read`` — confirm there.
    start_ns: int | None = None
    end_ns: int | None = None


class JobRunRequest(BaseModel):
    """Request body for ``POST /api/jobs/run``.

    The job is addressed in the body rather than the URL because job IDs may
    contain slashes.
    """

    # Matched against ``JobSpec.id`` of the configured job specs; the endpoint
    # answers HTTP 404 when no spec has this ID.
    job_id: str


class MigrateRequest(BaseModel):
    """Request body for ``POST /api/migrate``."""

    # Forwarded to ``migrate_parquet_to_ns(dry_run=...)``. Defaults to ``True``
    # so a request without a body field does not ask for a real migration.
    dry_run: bool = True


# ---------------------------------------------------------------------------
# Application factory
# ---------------------------------------------------------------------------

def create_app(
    config_path: str | pathlib.Path | None = None,
    config: AppConfig | None = None,
    scheduler: Scheduler | None = None,
) -> FastAPI:
    """Create and return the FastAPI application.

    All mutable server state is attached to ``app.state`` by the ``lifespan``
    context manager at startup; endpoints read it back through the
    ``Request`` object.

    Parameters
    ----------
    config_path : str or Path or None
        Path to ``config.yml``. Resolved via XDG fallback when ``None``.
    config : AppConfig or None
        Pre-loaded config. Takes precedence over *config_path*.
    scheduler : Scheduler or None
        Pass a running :class:`~dccd.application.scheduler.Scheduler` from
        ``dccd start`` so the UI controls the live daemon's jobs and streams.
        When ``None``, a standalone (non-started) scheduler is created.

    Returns
    -------
    FastAPI
        The configured application (routes, middleware, static files and,
        when the template directory exists, the UI pages).
    """
    # Function-scope stdlib imports, consistent with the other local imports
    # used by the endpoints below.
    import hmac
    import json
    import uuid

    @contextlib.asynccontextmanager
    async def lifespan(app: FastAPI):
        # --- startup ---
        cfg = config
        if cfg is None:
            try:
                path = resolve_config_path(config_path)
                cfg = load_config(path)
            except Exception:
                # Best-effort: the UI must still come up without a usable
                # config file, but the failure should not be invisible.
                logger.warning(
                    "Could not load config (config_path=%r); using defaults",
                    config_path,
                    exc_info=True,
                )
                cfg = AppConfig()
        app.state.config = cfg
        app.state.config_path = config_path
        app.state.store = build_store(cfg.settings.data_path)
        app.state.runs_store = build_runs_store(cfg.settings.data_path)
        app.state.event_bus = EventBus()
        app.state.registry = build_registry()
        if scheduler is not None:
            app.state.scheduler = scheduler
        else:
            app.state.scheduler = Scheduler(
                app.state.registry,
                app.state.store,
                app.state.runs_store,
                app.state.event_bus,
            )
        # Register stream workers from config so they can be started/stopped
        # from the UI even in standalone dccd-ui mode (without dccd start).
        # NOTE(review): done for injected schedulers too, as update_config
        # does — confirm register_streams tolerates already-registered specs.
        all_specs = cfg.all_job_specs()
        stream_specs = [s for s in all_specs if s.operation == "stream"]
        app.state.scheduler.register_streams(stream_specs)
        app.state.all_specs = cfg.all_job_specs()
        # Keeps strong references to background tasks so Python's GC doesn't
        # collect them mid-execution (asyncio only holds a weak ref internally).
        app.state.bg_tasks = set()
        # Per-run stop events so a running backfill can be cancelled via
        # DELETE /api/backfill/{run_id} (e.g. a runaway multi-million-row trades
        # backfill the user launched by mistake).
        app.state.backfill_stops = {}
        yield
        # --- shutdown ---

    app = FastAPI(title="dccd v3", version="3.0.0", lifespan=lifespan)

    # CORS: no wildcard. The UI is served same-origin, so it needs no CORS at
    # all; allowing every origin let any website's JS drive the local API
    # (reachable on 127.0.0.1 from the user's browser). Cross-origin access is
    # opt-in via settings.ui_allow_origins.
    # NOTE(review): only the pre-loaded *config* argument is consulted here;
    # a config loaded from *config_path* during startup cannot contribute
    # origins because middleware must be registered before the app starts.
    allow_origins = list(
        getattr(getattr(config, "settings", None), "ui_allow_origins", []) or []
    )
    if allow_origins:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=allow_origins,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def _secret_equals(candidate: str | None, expected: str) -> bool:
        """Compare a client-supplied secret with *expected* in constant time."""
        if candidate is None:
            return False
        # Compare bytes: compare_digest rejects non-ASCII ``str`` arguments.
        return hmac.compare_digest(
            candidate.encode("utf-8"), expected.encode("utf-8")
        )

    @app.middleware("http")
    async def _auth_guard(request: Request, call_next):
        """Require a Bearer token on /api/* when settings.ui_auth_token is set.

        Page routes are intentionally not gated (browsers can't send Bearer on
        navigation); for untrusted networks, front the UI with a reverse proxy.
        """
        cfg = getattr(request.app.state, "config", None)
        token = getattr(getattr(cfg, "settings", None), "ui_auth_token", None)
        if (
            token
            and request.url.path.startswith("/api/")
            and request.method != "OPTIONS"
        ):
            # Header for normal calls; query param for EventSource (SSE), which
            # cannot set Authorization headers.
            bearer = _secret_equals(
                request.headers.get("Authorization"), f"Bearer {token}"
            )
            query = _secret_equals(request.query_params.get("token"), f"{token}")
            if not (bearer or query):
                return JSONResponse({"detail": "Unauthorized"}, status_code=401)
        return await call_next(request)

    if _STATIC_DIR.exists():
        app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")

    templates: Jinja2Templates | None = (
        Jinja2Templates(directory=str(_TEMPLATES_DIR))
        if _TEMPLATES_DIR.exists()
        else None
    )

    # -- state accessors (read from app.state via request) --

    def _spawn(request: Request, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        """Create a background task and hold a strong reference to it."""
        task = asyncio.create_task(coro)
        bg: set[Any] = request.app.state.bg_tasks
        bg.add(task)
        task.add_done_callback(bg.discard)
        return task

    def _parse_run(run: dict[str, Any]) -> dict[str, Any]:
        """Parse JSON-encoded fields returned from SQLite.

        Fields that are not valid JSON are left as the raw string.
        """
        for key in ("progress", "log_tail"):
            raw = run.get(key)
            if isinstance(raw, str):
                try:
                    run[key] = json.loads(raw)
                except ValueError:
                    # json.JSONDecodeError is a ValueError; keep the raw text.
                    pass
        return run

    def _parse_data_type(raw: str) -> DataType:
        """Convert a request string into a ``DataType`` or answer HTTP 400."""
        try:
            return DataType(raw)
        except ValueError as exc:
            # Without this, an unknown data type surfaced as an HTTP 500.
            raise HTTPException(400, str(exc)) from exc

    def _new_run_id() -> str:
        """Return a URL-safe run identifier (UUID4, contains no slashes)."""
        return str(uuid.uuid4())

    def _cfg(request: Request) -> AppConfig:
        return cast(AppConfig, request.app.state.config)

    def _store(request: Request):
        return request.app.state.store

    def _runs(request: Request) -> RunsStore:
        return cast(RunsStore, request.app.state.runs_store)

    def _sched(request: Request) -> Scheduler:
        return cast(Scheduler, request.app.state.scheduler)

    def _reg(request: Request):
        return request.app.state.registry

    def _bus(request: Request) -> EventBus:
        return cast(EventBus, request.app.state.event_bus)

    def _specs(request: Request):
        """Return the cached job specs, rebuilding them only when missing."""
        specs = getattr(request.app.state, "all_specs", None)
        if specs is None:
            specs = _cfg(request).all_job_specs()
        return specs

    def _run_backfill_tracked(request: Request, spec: JobSpec, run_id: str) -> None:
        """Spawn a backfill with a cancellable stop event registered by run_id."""
        # Capture the infrastructure objects now: the task must not reach back
        # through ``request`` after the response has been sent.
        reg = _reg(request)
        store = _store(request)
        runs_store = _runs(request)
        bus = _bus(request)
        stops = request.app.state.backfill_stops
        stop_event = asyncio.Event()
        stops[run_id] = stop_event

        async def _run() -> None:
            from dccd.application.operations import backfill

            try:
                await backfill(
                    spec,
                    registry=reg,
                    store=store,
                    runs_store=runs_store,
                    events=bus.for_run(run_id),
                    run_id=run_id,
                    stop_event=stop_event,
                )
            except Exception as exc:
                # logger.exception keeps the traceback of the background failure.
                logger.exception("Backfill task %s failed: %s", run_id, exc)
            finally:
                stops.pop(run_id, None)

        _spawn(request, _run())

    # -----------------------------------------------------------------------
    # Operations
    # -----------------------------------------------------------------------

    @app.get("/api/operations")
    async def list_operations() -> dict[str, Any]:
        """List all registered operations."""
        return {"operations": [{"name": op} for op in REGISTRY.operations]}

    # -----------------------------------------------------------------------
    # Inventory
    # -----------------------------------------------------------------------

    @app.get("/api/inventory")
    async def get_inventory(request: Request) -> dict[str, Any]:
        """Return all stored datasets."""
        return {"datasets": _store(request).inventory()}

    # -----------------------------------------------------------------------
    # Backfill
    # -----------------------------------------------------------------------

    @app.post("/api/backfill")
    async def start_backfill(body: BackfillRequest, request: Request) -> dict[str, Any]:
        """Launch a backfill job asynchronously and return its ``run_id``.

        The ``run_id`` can be polled via ``GET /api/backfill/{run_id}``.
        """
        try:
            sym = Symbol.parse(body.symbol)
        except ValueError as e:
            raise HTTPException(400, str(e)) from e
        data_type = _parse_data_type(body.data_type)
        # Validate span for OHLC before the task starts — avoids a silent background crash.
        if data_type == DataType.OHLC and not body.span:
            raise HTTPException(400, "span is required for data_type='ohlc'")
        target = JobTarget(
            exchange=body.exchange,
            symbol=sym,
            data_type=data_type,
            span=body.span,
        )
        spec = JobSpec(
            id=JobSpec.make_id("backfill", target),
            operation="backfill",
            target=target,
            trigger=Trigger(kind="once"),
            params=JobParams(start=body.start),
            origin="runtime",
        )
        # Generate a URL-safe run_id and pass it into backfill() so both the
        # API polling endpoint and the RunsStore use the same identifier.
        # We use a short UUID (no slashes) instead of embedding spec.id which
        # may contain '/' from the symbol (e.g. BTC/USDT) and would break routing.
        run_id = _new_run_id()
        _run_backfill_tracked(request, spec, run_id)
        return {"run_id": run_id, "status": "started"}

    @app.get("/api/backfill/{run_id}")
    async def get_backfill_status(run_id: str, request: Request) -> dict[str, Any]:
        """Get the status of a backfill run by ``run_id``."""
        run = _runs(request).get_run(run_id)
        if not run:
            raise HTTPException(404, f"Run {run_id!r} not found")
        return _parse_run(run)

    @app.delete("/api/backfill/{run_id}")
    async def cancel_backfill(run_id: str, request: Request) -> dict[str, Any]:
        """Cancel a running backfill (cooperative — stops at the next page)."""
        ev = request.app.state.backfill_stops.get(run_id)
        if ev is None:
            raise HTTPException(404, f"No running backfill {run_id!r}")
        ev.set()
        return {"status": "cancelling", "run_id": run_id}

    @app.get("/api/runs")
    async def list_runs(request: Request, limit: int = 50) -> dict[str, Any]:
        """List recent job runs."""
        return {"runs": [_parse_run(r) for r in _runs(request).list_runs(limit=limit)]}

    # -----------------------------------------------------------------------
    # Stream control
    # -----------------------------------------------------------------------

    @app.get("/api/streams")
    async def list_streams(request: Request) -> dict[str, Any]:
        """List stream jobs and their running state."""
        sched = _sched(request)
        return {
            "streams": [
                {"id": sid, "running": running}
                for sid, running in sched.stream_status().items()
            ]
        }

    @app.post("/api/streams/start")
    async def start_stream(body: StreamAction, request: Request) -> dict[str, Any]:
        """Start a supervised stream job."""
        if not _sched(request).start_stream(body.spec_id):
            raise HTTPException(404, f"Stream job {body.spec_id!r} not found")
        return {"status": "started"}

    @app.post("/api/streams/stop")
    async def stop_stream(body: StreamAction, request: Request) -> dict[str, Any]:
        """Stop a supervised stream job."""
        if not await _sched(request).stop_stream(body.spec_id):
            raise HTTPException(404, f"Stream job {body.spec_id!r} not found")
        return {"status": "stopped"}

    # -----------------------------------------------------------------------
    # Jobs
    # -----------------------------------------------------------------------

    @app.get("/api/jobs")
    async def list_jobs(request: Request) -> dict[str, Any]:
        """List all configured job specs and their current state."""
        specs = _specs(request)
        stream_status = _sched(request).stream_status()
        return {
            "jobs": [
                {
                    "id": s.id,
                    "operation": s.operation,
                    "exchange": s.target.exchange,
                    "symbol": str(s.target.symbol),
                    "data_type": s.target.data_type.value,
                    "span": s.target.span,
                    "trigger": s.trigger.kind,
                    "enabled": s.enabled,
                    # ``running`` is only meaningful for stream jobs.
                    "running": (
                        stream_status.get(s.id, False)
                        if s.operation == "stream"
                        else None
                    ),
                }
                for s in specs
            ]
        }

    @app.post("/api/jobs/run")
    async def run_job_now(body: JobRunRequest, request: Request) -> dict[str, Any]:
        """Trigger an immediate one-shot backfill for a configured job.

        Uses a POST body to avoid URL-routing issues with job IDs that contain
        slashes (e.g. ``backfill:binance:BTC/USDT:ohlc:3600s``).
        """
        job_id = body.job_id
        spec = next((s for s in _specs(request) if s.id == job_id), None)
        if spec is None:
            raise HTTPException(404, f"Job {job_id!r} not found")
        if spec.operation != "backfill":
            raise HTTPException(
                400,
                "Only backfill jobs can be triggered manually; use /api/streams/start for stream jobs",
            )
        run_id = _new_run_id()
        _run_backfill_tracked(request, spec, run_id)
        return {"run_id": run_id, "status": "started", "job_id": job_id}

    @app.post("/api/jobs/run-all")
    async def run_all_backfill_jobs(request: Request) -> dict[str, Any]:
        """Trigger an immediate backfill for all enabled backfill jobs."""
        backfill_specs = [
            s for s in _specs(request) if s.operation == "backfill" and s.enabled
        ]
        run_ids = []
        for spec in backfill_specs:
            run_id = _new_run_id()
            run_ids.append({"run_id": run_id, "job_id": spec.id})
            _run_backfill_tracked(request, spec, run_id)
        return {"started": len(run_ids), "runs": run_ids}

    # -----------------------------------------------------------------------
    # Config
    # -----------------------------------------------------------------------

    @app.get("/api/config")
    async def get_config(request: Request) -> dict[str, Any]:
        """Return the current configuration as a dict."""
        return _cfg(request).model_dump()

    @app.put("/api/config")
    async def update_config(request: Request) -> dict[str, Any]:
        """Replace the configuration; persists to disk when *config_path* is set."""
        body = await request.json()
        try:
            new_cfg = AppConfig.model_validate(body)
        except Exception as e:
            raise HTTPException(422, str(e)) from e
        cfg_path = request.app.state.config_path
        if cfg_path:
            import yaml

            with open(cfg_path, "w", encoding="utf-8") as f:
                yaml.safe_dump(new_cfg.model_dump(), f)
        request.app.state.config = new_cfg
        # Refresh runtime state so /api/jobs and stream control reflect the new
        # config without a restart (D11): rebuild specs and register new streams.
        request.app.state.all_specs = new_cfg.all_job_specs()
        new_streams = [s for s in request.app.state.all_specs if s.operation == "stream"]
        request.app.state.scheduler.register_streams(new_streams)
        return {"status": "ok"}

    # -----------------------------------------------------------------------
    # Read
    # -----------------------------------------------------------------------

    @app.post("/api/read")
    async def read_data(body: ReadRequest, request: Request) -> dict[str, Any]:
        """Read stored data for a dataset (returns at most 1 000 rows)."""
        from dccd.application.operations import read

        try:
            sym = Symbol.parse(body.symbol)
        except ValueError as e:
            raise HTTPException(400, str(e)) from e
        target = JobTarget(
            exchange=body.exchange,
            symbol=sym,
            data_type=_parse_data_type(body.data_type),
            span=body.span,
        )
        df = read(target, store=_store(request), start_ns=body.start_ns, end_ns=body.end_ns)
        rows = df.to_dicts() if hasattr(df, "to_dicts") else []
        # "rows" reports the full match count; "data" is capped at 1 000 rows.
        return {"rows": len(rows), "data": rows[:1000]}

    # -----------------------------------------------------------------------
    # SSE events
    # -----------------------------------------------------------------------

    @app.get("/api/events")
    async def sse_events(request: Request) -> StreamingResponse:
        """Server-Sent Events stream of progress/log/status events."""
        queue = _bus(request).enable_queue()

        async def _generator():
            while True:
                if await request.is_disconnected():
                    break
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield f"data: {event.model_dump_json()}\n\n"
                except asyncio.TimeoutError:
                    # SSE comment line: keeps idle connections open and lets
                    # the loop re-check for a disconnected client.
                    yield ": heartbeat\n\n"

        return StreamingResponse(_generator(), media_type="text/event-stream")

    # -----------------------------------------------------------------------
    # Migration
    # -----------------------------------------------------------------------

    @app.post("/api/migrate")
    async def migrate_data(body: MigrateRequest, request: Request) -> dict[str, Any]:
        """Migrate existing Parquet files from seconds to nanosecond timestamps."""
        from dccd.storage.migrate import migrate_parquet_to_ns

        # Run in a worker thread so the migration does not block the event loop.
        report = await asyncio.to_thread(
            migrate_parquet_to_ns,
            _cfg(request).settings.data_path,
            dry_run=body.dry_run,
        )
        return {"report": report}

    # -----------------------------------------------------------------------
    # Health
    # -----------------------------------------------------------------------

    @app.get("/health")
    async def health() -> dict[str, str]:
        """Liveness check."""
        return {"status": "ok"}

    # -----------------------------------------------------------------------
    # UI pages
    # -----------------------------------------------------------------------

    if templates is not None:
        from importlib.metadata import version as _pkg_version

        # Resolve the package version once instead of on every page render;
        # best-effort, falling back to "dev" when it cannot be determined.
        try:
            ui_version = _pkg_version("dccd")
        except Exception:
            ui_version = "dev"

        def _tpl_ctx(request: Request, page: str) -> dict[str, Any]:
            # request is NOT included here — Starlette 1.x injects it automatically
            # when using the new TemplateResponse(request, name, context) signature.
            cfg = getattr(request.app.state, "config", None)
            token = getattr(getattr(cfg, "settings", None), "ui_auth_token", None)
            return {
                "active": request.url.path,
                "version": ui_version,
                "page": page,
                "auth_token": token or "",
            }

        # Starlette >= 0.29 / 1.x signature: TemplateResponse(request, name, context)

        @app.get("/")
        async def ui_dashboard(request: Request):
            return templates.TemplateResponse(
                request, "dashboard.html", _tpl_ctx(request, "dashboard")
            )

        @app.get("/inventory")
        async def ui_inventory(request: Request):
            return templates.TemplateResponse(
                request, "inventory.html", _tpl_ctx(request, "inventory")
            )

        @app.get("/jobs")
        async def ui_jobs(request: Request):
            return templates.TemplateResponse(
                request, "jobs.html", _tpl_ctx(request, "jobs")
            )

        @app.get("/config")
        async def ui_config(request: Request):
            return templates.TemplateResponse(
                request, "config.html", _tpl_ctx(request, "config")
            )

        @app.get("/logs")
        async def ui_logs(request: Request):
            return templates.TemplateResponse(
                request, "logs.html", _tpl_ctx(request, "logs")
            )

        @app.get("/storage")
        async def ui_storage(request: Request):
            return templates.TemplateResponse(
                request, "storage.html", _tpl_ctx(request, "storage")
            )

    return app