#!/usr/bin/env python3
# coding: utf-8
""" FastAPI web UI and JSON API for the dccd daemon.
.. currentmodule:: dccd.daemon.api
This module is a thin HTTP wiring layer over the existing daemon modules
(:mod:`~dccd.daemon.config`, :mod:`~dccd.daemon.backfill`,
:mod:`~dccd.daemon.scheduler`, :mod:`~dccd.daemon.stream_manager`,
:mod:`~dccd.daemon.storage`). No business logic lives here — every endpoint
delegates to the corresponding daemon function and returns JSON.
The web UI (Jinja2 + htmx templates under ``ui/``) consumes the same
``/api/*`` endpoints, so the front-end can be replaced without touching the
API.
Run it standalone with ``dccd ui`` or embedded in ``dccd start``.
"""
from __future__ import annotations
import collections
import json
import threading
import time
from datetime import date
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _pkg_version
from pathlib import Path
from typing import TYPE_CHECKING
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from dccd.daemon.backfill import has_matching_jobs, run_backfill
from dccd.daemon.config import CollectorConfig, load_config
if TYPE_CHECKING:
from apscheduler.schedulers.background import BackgroundScheduler
from dccd.daemon.health import HealthMonitor
from dccd.daemon.stream_manager import StreamManager
__all__ = ['BackfillTracker', 'create_app']
# Bundled web-UI assets: Jinja2 templates and static files live in a ``ui``
# directory next to this module.
_UI_DIR = Path(__file__).parent / 'ui'
_TEMPLATES_DIR, _STATIC_DIR = _UI_DIR / 'templates', _UI_DIR / 'static'

# Per-type sub-directories scanned by the inventory endpoint.
_DATA_TYPES = ('ohlc', 'trades', 'orderbook')

# Version string rendered in the page templates.
_DCCD_VERSION: str
try:
    _DCCD_VERSION = _pkg_version('dccd')
except PackageNotFoundError:  # editable install without metadata
    _DCCD_VERSION = 'dev'

# Backfill tracker retention: at most this many *finished* job records are
# kept on disk (running jobs are never dropped), and at most this many log
# lines per job (oldest lines are discarded first).
_MAX_FINISHED_JOBS = 50
_FINISHED_STATES = frozenset(('done', 'cancelled', 'error'))
_MAX_LOG_LINES = 100
# ---------------------------------------------------------------------------
# Backfill job tracker (disk-persisted, mirrors HealthMonitor's metrics.json)
# ---------------------------------------------------------------------------
class BackfillTracker:
    """ Track backfill jobs launched from the web UI, persisted to disk.

    State lives in ``{local_path}/.dccd/backfill_jobs.json`` so it survives
    a UI restart (the persisted ``status`` of an interrupted job stays
    ``'running'`` until overwritten — the thread itself does not survive a
    process restart).

    The job table and the map of cancellation events are only modified
    while holding an internal lock. On every change the JSON file is
    rewritten via a temporary file that is then renamed over the target,
    so a crash mid-write cannot leave a truncated file behind.

    Parameters
    ----------
    local_path : str or Path
        Root data directory (``CollectorConfig.storage.local_path``).
    """

    def __init__(self, local_path: str | Path) -> None:
        self._dir = Path(local_path) / '.dccd'
        self._dir.mkdir(parents=True, exist_ok=True)
        self._file = self._dir / 'backfill_jobs.json'
        self._lock = threading.Lock()
        # Cancellation events of jobs whose worker thread has not finished.
        self._events: dict[str, threading.Event] = {}
        self._jobs: dict[str, dict] = self._load()

    def _load(self) -> dict[str, dict]:
        """ Read persisted jobs; a missing or corrupt file yields ``{}``. """
        try:
            data = json.loads(self._file.read_text())
        except (OSError, ValueError):
            # Missing, unreadable or malformed tracker file: start fresh
            # rather than preventing the UI from starting.
            return {}
        if not isinstance(data, dict):
            return {}
        # Drop malformed entries so later ``rec.get(...)`` calls are safe.
        return {jid: rec for jid, rec in data.items() if isinstance(rec, dict)}

    def _save(self) -> None:
        """ Persist the job table. Caller must hold ``self._lock``. """
        tmp = self._file.with_name(self._file.name + '.tmp')
        tmp.write_text(json.dumps(self._jobs, indent=2))
        # Rename over the target so readers never see a half-written file.
        tmp.replace(self._file)

    def _prune(self) -> None:
        """ Drop the oldest finished jobs beyond ``_MAX_FINISHED_JOBS``.

        Caller must hold ``self._lock``.
        """
        finished = [
            (rec.get('started_at', 0.0), jid)
            for jid, rec in self._jobs.items()
            if rec.get('status') in _FINISHED_STATES
        ]
        if len(finished) <= _MAX_FINISHED_JOBS:
            return
        finished.sort()
        for _, jid in finished[:-_MAX_FINISHED_JOBS]:
            self._jobs.pop(jid, None)

    def _unique_id(self, base: str) -> str:
        """ Return *base*, suffixed ``_1``, ``_2``, ... until it is unused.

        Caller must hold ``self._lock``.
        """
        job_id, n = base, 1
        while job_id in self._jobs:
            job_id = f'{base}_{n}'
            n += 1
        return job_id

    def snapshot(self) -> dict[str, dict]:
        """ Return a deep snapshot of all tracked jobs. """
        with self._lock:
            return json.loads(json.dumps(self._jobs))

    def get(self, job_id: str) -> dict | None:
        """ Return a deep copy of one job record, or ``None`` if unknown.

        A deep copy is required: the worker thread keeps mutating the nested
        ``progress`` dict and ``log`` list. Callers such as the API routes
        serialise the returned record after the lock has been released.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            return json.loads(json.dumps(job)) if job is not None else None

    def start(
        self,
        cfg: CollectorConfig,
        exchange: str | None,
        pairs: list[str] | None,
        start: str,
        parallel: bool = False,
    ) -> str:
        """ Launch a backfill in a background thread and return its id.

        Parameters
        ----------
        cfg : CollectorConfig
            Configuration passed to ``run_backfill``.
        exchange : str or None
            Exchange to backfill; recorded as ``'all'`` in the id when None.
        pairs : list of str or None
            Pairs to backfill; recorded as ``'all'`` in the id when None/empty.
        start : str
            Start date forwarded to ``run_backfill``.
        parallel : bool, optional
            Forwarded to ``run_backfill``.

        Returns
        -------
        str
            The job id, unique among the currently tracked jobs.
        """
        slug_ex = exchange or 'all'
        slug_pairs = '-'.join(p.replace('/', '') for p in pairs) if pairs else 'all'
        event = threading.Event()
        with self._lock:
            # Ids have one-second resolution: de-duplicate so two launches
            # with the same exchange/pairs in one second don't overwrite
            # each other's record and cancellation event.
            job_id = self._unique_id(f'{slug_ex}_{slug_pairs}_{int(time.time())}')
            record: dict[str, object] = {
                'id': job_id,
                'exchange': exchange,
                'pairs': pairs,
                'start': start,
                'status': 'running',
                'progress': {},
                'log': [],
                'started_at': time.time(),
                'error': None,
            }
            self._jobs[job_id] = record
            self._prune()
            self._save()
            # Registered under the lock so `stop` sees the job as live as
            # soon as its record exists.
            self._events[job_id] = event
        thread = threading.Thread(
            target=self._run,
            args=(job_id, cfg, exchange, pairs, start, event, parallel),
            daemon=True,
            name=f'backfill-{job_id}',
        )
        thread.start()
        return job_id

    def _finish(self, job_id: str, status: str, error: str | None = None) -> None:
        """ Record a terminal *status* (and *error*) and retire the job's event.

        Popping the event in the same critical section as the status update
        means `stop` can never observe a finished job as still cancellable.
        """
        with self._lock:
            self._events.pop(job_id, None)
            rec = self._jobs.get(job_id)
            if rec is not None:
                rec['status'] = status
                if error is not None:
                    rec['error'] = error
            self._save()

    def _run(
        self,
        job_id: str,
        cfg: CollectorConfig,
        exchange: str | None,
        pairs: list[str] | None,
        start: str,
        event: threading.Event,
        parallel: bool = False,
    ) -> None:
        """ Worker-thread body: run the backfill and record its outcome. """

        def _cb(exch: str, pair: str, done: int, total: int) -> None:
            # NOTE(review): progress is keyed by pair only, so the same pair
            # backfilled on several exchanges shares a single entry.
            with self._lock:
                rec = self._jobs.get(job_id)
                if rec is not None:
                    rec['progress'][pair] = {'done': done, 'total': total}
                    self._save()

        def _msg(exch: str, pair: str, line: str) -> None:
            with self._lock:
                rec = self._jobs.get(job_id)
                if rec is not None:
                    log = rec['log']
                    log.append(line)
                    # Keep only the newest _MAX_LOG_LINES entries.
                    del log[:-_MAX_LOG_LINES]
                    self._save()

        try:
            run_backfill(
                cfg, exchange=exchange, pairs=pairs, start=start,
                parallel=parallel,
                progress_callback=_cb, stop_event=event, message_callback=_msg,
            )
        except Exception as exc:  # noqa: BLE001 — surface any failure to the UI
            self._finish(job_id, 'error', str(exc))
        else:
            self._finish(job_id, 'cancelled' if event.is_set() else 'done')
        finally:
            # Safety net in case `_finish` itself failed (e.g. disk error).
            with self._lock:
                self._events.pop(job_id, None)

    def stop(self, job_id: str) -> bool:
        """ Signal a running job to cancel. Return ``True`` if it was live. """
        with self._lock:
            event = self._events.get(job_id)
            if event is None:
                return False
            event.set()
            rec = self._jobs.get(job_id)
            # Only a running job moves to 'cancelling'. Overwriting a
            # terminal status would leave the record stuck, and it would
            # then never be pruned.
            if rec is not None and rec.get('status') == 'running':
                rec['status'] = 'cancelling'
                self._save()
            return True
# ---------------------------------------------------------------------------
# Request bodies
# ---------------------------------------------------------------------------
class _BackfillBody(BaseModel):
    # JSON body of ``POST /api/backfill`` (see ``start_backfill``).
    # exchange / pairs restrict the backfill. ``None`` presumably means
    # "every configured histo job" — the tracker labels such runs 'all'.
    exchange: str | None = None
    pairs: list[str] | None = None
    # Start date string, forwarded verbatim to ``run_backfill(start=...)``.
    start: str = '2020-01-01 00:00:00'
    # Forwarded to ``run_backfill(parallel=...)``.
    parallel: bool = False
class _CollectBody(BaseModel):
    # Optional JSON body of ``POST /api/collect`` (see ``start_collect``).
    # With no body, or both fields ``None``, ``run_once`` collects the whole
    # config. Otherwise only histo jobs matching the given exchange and/or
    # pair are collected.
    exchange: str | None = None
    pair: str | None = None
class _StreamBody(BaseModel):
    # JSON body of ``POST /api/streams/start`` and ``/api/streams/stop``.
    # Start looks up the stream job whose ``exchange`` matches and whose
    # ``pairs`` contain ``pair``. ``channels`` is passed to the stream
    # manager and is also part of the stream key that stop uses.
    exchange: str
    pair: str
    channels: list[str]
# ---------------------------------------------------------------------------
# Shared helpers (read fresh config per request → transparent hot-reload)
# ---------------------------------------------------------------------------
def _cfg(request: Request) -> CollectorConfig:
    """ Load the daemon configuration fresh from disk for this request.

    Re-reading on every call means edits to the YAML file (for example via
    ``PUT /api/config``) take effect without restarting the server.
    """
    path = request.app.state.cfg_path
    return load_config(path)
def _check_auth(request: Request) -> None:
    """ Reject the request when a UI token is configured and not provided.

    A falsy configured token disables authentication. Otherwise the token is
    taken from an ``Authorization: Bearer`` header when present, else from
    the ``dccd_token`` cookie, else from the ``token`` query parameter.

    Raises
    ------
    HTTPException
        401 when a token is configured and the supplied one is missing or
        does not match.
    """
    import hmac

    token = request.app.state.auth_token
    if not token:
        return
    provided: str | None = None
    auth = request.headers.get('authorization')
    if auth and auth.lower().startswith('bearer '):
        # RFC 7235 allows one or more spaces between scheme and credentials.
        provided = auth[7:].strip()
    if provided is None:
        provided = (
            request.cookies.get('dccd_token')
            or request.query_params.get('token')
        )
    # Constant-time comparison so response timing does not leak the token.
    # Compare UTF-8 bytes: compare_digest raises TypeError on non-ASCII str.
    if provided is None or not hmac.compare_digest(
        provided.encode('utf-8'), token.encode('utf-8'),
    ):
        raise HTTPException(status_code=401, detail='Invalid or missing token')
def _count_gaps(dt: str, stems: set[str]) -> int | None:
    """ Count missing partitions between the first and last stored file.

    OHLC files are named by year (the stem parses as ``int``); the other
    data types are named by ISO date (``YYYY-MM-DD``). Returns ``None`` when
    a stem does not parse, so one stray file cannot fail the whole scan.
    """
    try:
        if dt == 'ohlc':
            years = {int(s) for s in stems}
            return max(years) - min(years) + 1 - len(years)
        days = {date.fromisoformat(s) for s in stems}
        return (max(days) - min(days)).days + 1 - len(days)
    except ValueError:
        return None


def _scan_inventory(data_path: Path) -> list[dict]:
    """ Walk *data_path* and summarise every stored Parquet series.

    Mirrors the logic of the ``dccd inventory`` CLI command.

    Parameters
    ----------
    data_path : Path
        Root data directory laid out as
        ``<exchange>/<type>/<pair>[/<span>]/<file>.parquet``.

    Returns
    -------
    list of dict
        One dict per ``(exchange, type, pair, span)`` series, sorted by that
        key, with keys ``exchange``, ``pair`` (slug with its first ``-``
        turned back into ``/``), ``type``, ``span`` (``'-'`` when absent),
        ``first``/``last`` (``%Y-%m-%d`` or None), ``rows`` and ``gaps``
        (None when file names do not follow the year/date naming scheme).
    """
    import polars as pl
    from dccd.tools.date_time import TS_to_date

    groups: dict[tuple[str, str, str, str], list[Path]] = collections.defaultdict(list)
    for dt in _DATA_TYPES:
        for f in data_path.glob(f'*/{dt}/**/*.parquet'):
            parts = f.relative_to(data_path).parts
            # `**` also matches zero directories, so a file sitting directly
            # under <exchange>/<type>/ has no pair directory: skip it rather
            # than reporting its file name as a pair.
            if len(parts) < 4:
                continue
            exchange_name, pair_slug = parts[0], parts[2]
            span_lbl = parts[3] if dt == 'ohlc' and len(parts) == 5 else '-'
            groups[(exchange_name, dt, pair_slug, span_lbl)].append(f)

    series: list[dict] = []
    for (exchange_name, dt, pair_slug, span_lbl), files in sorted(groups.items()):
        files = sorted(files)
        ts_col = pl.concat([pl.read_parquet(f, columns=['TS']) for f in files])['TS']
        ts_min = ts_col.min()
        ts_max = ts_col.max()
        series.append({
            'exchange': exchange_name,
            'pair': pair_slug.replace('-', '/', 1),
            'type': dt,
            'span': span_lbl,
            'first': TS_to_date(int(ts_min), form='%Y-%m-%d') if ts_min is not None else None,
            'last': TS_to_date(int(ts_max), form='%Y-%m-%d') if ts_max is not None else None,
            'rows': len(ts_col),
            'gaps': _count_gaps(dt, {f.stem for f in files}),
        })
    return series
# ---------------------------------------------------------------------------
# App factory
# ---------------------------------------------------------------------------
def create_app(cfg_path: str | Path,
               stream_manager: StreamManager | None = None,
               health: HealthMonitor | None = None,
               scheduler: BackgroundScheduler | None = None) -> FastAPI:
    """ Build the FastAPI application for *cfg_path*.

    Parameters
    ----------
    cfg_path : str or pathlib.Path
        Path to the YAML daemon configuration file.
    stream_manager : StreamManager, optional
        The live :class:`~dccd.daemon.stream_manager.StreamManager` of an
        embedding ``dccd start`` process, so the UI reflects and controls the
        streams actually running. When ``None`` (standalone ``dccd ui``), a
        fresh, *non-started* manager is created — its configured stream jobs
        appear stopped until the user starts them from the UI.
    health : HealthMonitor, optional
        The live :class:`~dccd.daemon.health.HealthMonitor` of an embedding
        ``dccd start`` process. When ``None`` (standalone ``dccd ui``), a
        fresh monitor is created — constructing it wires file logging
        (``.dccd/dccd.log``) and metrics so the Logs/Dashboard pages have data
        to show.
    scheduler : BackgroundScheduler, optional
        The live histo scheduler of an embedding ``dccd start`` process. When
        ``None``, a *non-started* one is built so the UI can start/stop
        periodic collection itself.

    Returns
    -------
    fastapi.FastAPI
        Configured application with the JSON API (``/api/*``) and the
        htmx web UI pages mounted.

    Notes
    -----
    An empty ``settings.ui_auth_token`` is treated like an unset one: auth
    is disabled and the OpenAPI docs are served.
    """
    cfg_path = Path(cfg_path).expanduser().resolve()
    cfg = load_config(cfg_path)
    local_path = cfg.storage.local_path
    # `_check_auth` treats any falsy token as "auth disabled". Normalise ''
    # to None so the docs toggle below agrees with the auth check.
    auth_token = cfg.settings.ui_auth_token or None

    if health is None:
        # Constructing the monitor attaches the rotating file handler to the
        # root logger (→ .dccd/dccd.log) and initialises metrics, so a
        # standalone `dccd ui` surfaces logs and dashboard data too.
        from dccd.daemon.health import HealthMonitor
        health = HealthMonitor(local_path, cfg.alerts)
    if stream_manager is None:
        from dccd.daemon.stream_manager import StreamManager
        stream_manager = StreamManager(cfg, health=health)
    if scheduler is None:
        from dccd.daemon.scheduler import build_histo_scheduler
        scheduler = build_histo_scheduler(cfg, health)

    # When a token is configured, do not expose the unauthenticated OpenAPI
    # schema/docs — they would leak the API surface past the auth boundary.
    enable_docs = auth_token is None
    app = FastAPI(
        title='dccd UI',
        docs_url='/api/docs' if enable_docs else None,
        openapi_url='/openapi.json' if enable_docs else None,
    )
    app.state.cfg_path = cfg_path
    app.state.auth_token = auth_token
    # Serialises writes of the config file by PUT /api/config.
    app.state.config_lock = threading.Lock()
    app.state.tracker = BackfillTracker(local_path)
    app.state.stream_manager = stream_manager
    app.state.health = health
    app.state.scheduler = scheduler
    app.state.templates = Jinja2Templates(directory=str(_TEMPLATES_DIR))
    app.mount('/static', StaticFiles(directory=str(_STATIC_DIR)), name='static')
    _register_api(app)
    _register_pages(app)
    return app
# ---------------------------------------------------------------------------
# JSON API routes
# ---------------------------------------------------------------------------
def _read_json_file(path: Path, default: object = None) -> object:
    """ Return the JSON document stored at *path*, or *default*.

    Used for optional status files under ``.dccd/``: a missing, unreadable
    or malformed file degrades to *default* instead of failing the request.
    """
    try:
        return json.loads(path.read_text())
    except (OSError, ValueError):
        return default


def _register_api(app: FastAPI) -> None:
    """ Attach all ``/api/*`` JSON endpoints to *app*.

    Every route depends on `_check_auth`. Handlers that need the
    configuration call `_cfg`, which re-reads the YAML file, so edits saved
    through ``PUT /api/config`` are picked up without a restart.
    """
    auth = Depends(_check_auth)

    # --- Config ---------------------------------------------------------
    @app.get('/api/config', dependencies=[auth])
    def get_config(request: Request) -> dict:
        return _cfg(request).model_dump()

    @app.put('/api/config', dependencies=[auth])
    def put_config(request: Request, body: dict) -> dict:
        try:
            validated = CollectorConfig.model_validate(body)
        except Exception as exc:
            raise HTTPException(status_code=422, detail=str(exc)) from exc
        import yaml
        normalised = validated.model_dump(mode='json')
        # The lock serialises concurrent writers within this process.
        with request.app.state.config_lock:
            request.app.state.cfg_path.write_text(
                yaml.dump(normalised, default_flow_style=False)
            )
        return normalised

    @app.post('/api/config/validate', dependencies=[auth])
    def validate_config(body: dict) -> dict:
        try:
            CollectorConfig.model_validate(body)
        except Exception as exc:
            return {'valid': False, 'error': str(exc)}
        return {'valid': True, 'error': None}

    # --- Inventory ------------------------------------------------------
    @app.get('/api/inventory', dependencies=[auth])
    def get_inventory(request: Request) -> list[dict]:
        data_path = Path(_cfg(request).storage.local_path)
        if not data_path.exists():
            return []
        return _scan_inventory(data_path)

    # --- Metrics --------------------------------------------------------
    @app.get('/api/metrics', dependencies=[auth])
    def get_metrics(request: Request) -> dict:
        f = Path(_cfg(request).storage.local_path) / '.dccd' / 'metrics.json'
        return _read_json_file(f, {})

    # --- Jobs -----------------------------------------------------------
    @app.get('/api/jobs', dependencies=[auth])
    def get_jobs(request: Request) -> dict:
        cfg = _cfg(request)
        return {
            'histo_jobs': [j.model_dump() for j in cfg.histo_jobs],
            'stream_jobs': [j.model_dump() for j in cfg.stream_jobs],
        }

    # --- Backfill -------------------------------------------------------
    @app.post('/api/backfill', status_code=202, dependencies=[auth])
    def start_backfill(request: Request, body: _BackfillBody) -> dict:
        cfg = _cfg(request)
        if not has_matching_jobs(cfg, body.exchange, body.pairs):
            raise HTTPException(
                status_code=400,
                detail='No configured histo job matches this exchange/pair. '
                       'Add one on the Config page first.',
            )
        job_id = request.app.state.tracker.start(
            cfg, body.exchange, body.pairs, body.start, body.parallel,
        )
        return {'id': job_id}

    @app.get('/api/backfill', dependencies=[auth])
    def list_backfill(request: Request) -> dict:
        return request.app.state.tracker.snapshot()

    @app.get('/api/backfill/{job_id}', dependencies=[auth])
    def get_backfill(request: Request, job_id: str) -> dict:
        job = request.app.state.tracker.get(job_id)
        if job is None:
            raise HTTPException(status_code=404, detail=f'No backfill {job_id}')
        return job

    @app.delete('/api/backfill/{job_id}', dependencies=[auth])
    def cancel_backfill(request: Request, job_id: str) -> dict:
        tracker = request.app.state.tracker
        if tracker.get(job_id) is None:
            raise HTTPException(status_code=404, detail=f'No backfill {job_id}')
        live = tracker.stop(job_id)
        return {'status': 'cancelling' if live else 'not_running'}

    # --- Collect --------------------------------------------------------
    @app.post('/api/collect', status_code=202, dependencies=[auth])
    def start_collect(request: Request, body: _CollectBody | None = None) -> dict:
        import logging
        from dccd.daemon.scheduler import run_histo_job, run_once
        cfg = _cfg(request)
        health = request.app.state.health
        log = logging.getLogger('dccd.daemon.api')
        exchange = body.exchange if body else None
        pair = body.pair if body else None
        # Always non-empty: the first element falls back to 'all'.
        label = ' '.join(filter(None, [exchange or 'all', pair]))

        if exchange is None and pair is None:
            def _work() -> None:
                run_once(cfg, health=health)
        else:
            targets = [
                (job, p)
                for job in cfg.histo_jobs
                if exchange is None or job.exchange == exchange
                for p in job.pairs
                if pair is None or p == pair
            ]
            # Fail the request synchronously, before any thread is started.
            if not targets:
                raise HTTPException(status_code=404, detail='No matching histo job')

            def _work() -> None:
                for job, p in targets:
                    run_histo_job(job, p, cfg.storage.local_path,
                                  cfg.settings.timezone, health)

        def _job() -> None:
            log.info('collect started: %s', label)
            try:
                _work()
            except Exception:
                # Thread top level: record the failure through logging
                # instead of dying with only a stderr traceback.
                log.exception('collect failed: %s', label)
            else:
                log.info('collect finished: %s', label)

        threading.Thread(target=_job, daemon=True, name='ui-collect').start()
        return {'status': 'started'}

    # --- Scheduler ------------------------------------------------------
    def _sched_active(sched: BackgroundScheduler) -> bool:
        # `.running` stays True while paused, so check the actual state:
        # jobs only fire in STATE_RUNNING.
        from apscheduler.schedulers.base import STATE_RUNNING
        return sched.state == STATE_RUNNING

    @app.get('/api/scheduler', dependencies=[auth])
    def get_scheduler(request: Request) -> dict:
        sched = request.app.state.scheduler
        active = _sched_active(sched)
        jobs = [
            {'id': j.id, 'name': j.name,
             'next_run': j.next_run_time.timestamp()
             if active and j.next_run_time else None}
            for j in sched.get_jobs()
        ]
        return {'running': active, 'jobs': jobs}

    @app.post('/api/scheduler/start', status_code=202, dependencies=[auth])
    def start_scheduler(request: Request) -> dict:
        sched = request.app.state.scheduler
        if not sched.running:
            sched.start()
        elif not _sched_active(sched):
            sched.resume()
        return {'running': True}

    @app.post('/api/scheduler/stop', dependencies=[auth])
    def stop_scheduler(request: Request) -> dict:
        sched = request.app.state.scheduler
        if _sched_active(sched):
            sched.pause()
        return {'running': _sched_active(sched)}

    # --- Streams --------------------------------------------------------
    @app.get('/api/streams', dependencies=[auth])
    def list_streams(request: Request) -> list[dict]:
        # Pass the on-disk config so stream jobs added after startup (via the
        # Config page) appear without restarting the server.
        return request.app.state.stream_manager.status(_cfg(request))

    @app.post('/api/streams/start', status_code=202, dependencies=[auth])
    def start_stream(request: Request, body: _StreamBody) -> dict:
        sm = request.app.state.stream_manager
        # Read from the on-disk config so stream jobs added after startup
        # (via the Config page) can be started without restarting the server.
        job = next(
            (j for j in _cfg(request).stream_jobs
             if j.exchange == body.exchange and body.pair in j.pairs),
            None,
        )
        if job is None:
            raise HTTPException(status_code=404, detail='No matching stream job')
        key = sm.start_one(job, body.pair, body.channels)
        return {'status': 'started', 'key': key}

    @app.post('/api/streams/stop', dependencies=[auth])
    def stop_stream(request: Request, body: _StreamBody) -> dict:
        sm = request.app.state.stream_manager
        # NOTE(review): relies on StreamManager's private `_key` helper.
        key = sm._key(body.exchange, body.pair, body.channels)
        live = sm.stop_one(key)
        return {'status': 'stopping' if live else 'not_running', 'key': key}

    # --- Logs -----------------------------------------------------------
    @app.get('/api/logs', dependencies=[auth])
    def get_logs(request: Request, tail: int = 200) -> dict:
        f = Path(_cfg(request).storage.local_path) / '.dccd' / 'dccd.log'
        # Guard tail <= 0: ``lines[-0:]`` is the whole list and a negative
        # tail slices from the front, both returning far more than asked.
        if tail <= 0 or not f.exists():
            return {'lines': []}
        lines = f.read_text(errors='replace').splitlines()
        return {'lines': lines[-tail:]}

    # --- Storage --------------------------------------------------------
    @app.get('/api/storage', dependencies=[auth])
    def get_storage(request: Request) -> dict:
        from dccd.daemon.storage import RemoteStorage
        cfg = _cfg(request)
        storage = RemoteStorage(cfg.storage)
        last_sync = _read_json_file(
            Path(cfg.storage.local_path) / '.dccd' / 'last_sync.json'
        )
        return {
            'remotes': [r.model_dump() for r in cfg.storage.remotes],
            'sync_interval': cfg.storage.sync_interval,
            'rclone_available': storage.check_rclone(),
            'last_sync': last_sync,
        }

    @app.post('/api/storage/sync', status_code=202, dependencies=[auth])
    def trigger_sync(request: Request) -> dict:
        from dccd.daemon.stream_manager import SyncService
        cfg = _cfg(request)
        if not cfg.storage.remotes:
            raise HTTPException(status_code=400, detail='No remotes configured')
        svc = SyncService(cfg.storage)
        threading.Thread(target=svc.sync_now, daemon=True, name='ui-sync').start()
        return {'status': 'started'}
# ---------------------------------------------------------------------------
# HTML page routes (htmx shells consuming the JSON API)
# ---------------------------------------------------------------------------
def _register_pages(app: FastAPI) -> None:
    """ Attach the HTML page routes (served by Jinja2 templates).

    Each page is an htmx shell rendered from ``ui/templates`` that consumes
    the ``/api/*`` JSON endpoints. Every page requires `_check_auth`.
    """
    import hmac

    auth = Depends(_check_auth)

    def _page(name: str, request: Request) -> HTMLResponse:
        """ Render template *name* with the shared page context. """
        resp = request.app.state.templates.TemplateResponse(
            request, name,
            {'request': request, 'version': _DCCD_VERSION,
             'active': request.url.path},
        )
        token = request.query_params.get('token')
        expected = request.app.state.auth_token
        # Persist a valid ``?token=`` as the ``dccd_token`` cookie, which
        # `_check_auth` also accepts, so later requests need not repeat it.
        # Compare in constant time so response timing does not leak the
        # token; compare bytes because compare_digest rejects non-ASCII str.
        if token and expected and hmac.compare_digest(
            token.encode('utf-8'), expected.encode('utf-8'),
        ):
            resp.set_cookie('dccd_token', token, httponly=True, samesite='strict')
        return resp

    @app.get('/', response_class=HTMLResponse, dependencies=[auth])
    def page_dashboard(request: Request) -> HTMLResponse:
        return _page('dashboard.html', request)

    @app.get('/inventory', response_class=HTMLResponse, dependencies=[auth])
    def page_inventory(request: Request) -> HTMLResponse:
        return _page('inventory.html', request)

    @app.get('/jobs', response_class=HTMLResponse, dependencies=[auth])
    def page_jobs(request: Request) -> HTMLResponse:
        return _page('jobs.html', request)

    @app.get('/logs', response_class=HTMLResponse, dependencies=[auth])
    def page_logs(request: Request) -> HTMLResponse:
        return _page('logs.html', request)

    @app.get('/config', response_class=HTMLResponse, dependencies=[auth])
    def page_config(request: Request) -> HTMLResponse:
        return _page('config.html', request)

    @app.get('/storage', response_class=HTMLResponse, dependencies=[auth])
    def page_storage(request: Request) -> HTMLResponse:
        return _page('storage.html', request)