"""SQLite-backed runs store for job execution history."""
from __future__ import annotations
import json
import logging
import pathlib
import sqlite3
import time
from contextlib import contextmanager
from typing import Any, Generator
# Public API of this module.
__all__ = ["RunsStore"]
# Module logger, named after this module.
logger = logging.getLogger(__name__)
# DDL executed by RunsStore.__init__ via executescript() on every construction.
# Every CREATE uses IF NOT EXISTS, so re-running it against an existing
# database is harmless.
# Column conventions, as used by RunsStore:
#   - created_at / ended_at: integer nanoseconds since the epoch, stamped by
#     RunsStore. started_at is caller-supplied; prune_old_runs compares it to a
#     nanosecond cutoff, so it presumably uses the same unit.
#   - state: 'running' on insert; other values are written by finish_run /
#     mark_stale_running.
#   - progress: JSON text written by update_progress(); NULL until first update.
#   - log_tail: JSON array of strings maintained by append_log().
# NOTE(review): per the SQLite PRAGMA docs, `synchronous` is a per-connection
# setting (unlike `journal_mode=WAL`, which is persisted in the database file),
# so the value below only affects the connection that runs this script —
# verify that later connections get the intended durability level.
_SCHEMA = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
spec_id TEXT NOT NULL,
operation TEXT NOT NULL,
exchange TEXT NOT NULL,
symbol TEXT NOT NULL,
data_type TEXT NOT NULL,
state TEXT NOT NULL,
started_at INTEGER,
ended_at INTEGER,
rows_written INTEGER DEFAULT 0,
error TEXT,
progress TEXT,
log_tail TEXT DEFAULT '[]',
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_runs_spec ON runs(spec_id);
CREATE INDEX IF NOT EXISTS idx_runs_state ON runs(state);
CREATE INDEX IF NOT EXISTS idx_runs_created ON runs(created_at);
"""
[docs]
class RunsStore:
"""Append-only SQLite store for JobRun records.
Parameters
----------
db_path : str or Path
Path to the SQLite database file. Created if absent.
Examples
--------
>>> import tempfile, os
>>> with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
... path = f.name
>>> store = RunsStore(path)
>>> store.create_run('r1', 'backfill:binance:BTC/USDT:ohlc', 'backfill', 'binance', 'BTC/USDT', 'ohlc')
>>> os.unlink(path)
"""
def __init__(self, db_path: str | pathlib.Path) -> None:
self._path = pathlib.Path(db_path)
self._path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as conn:
conn.executescript(_SCHEMA)
@contextmanager
def _conn(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(str(self._path))
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
[docs]
def create_run(
self,
run_id: str,
spec_id: str,
operation: str,
exchange: str,
symbol: str,
data_type: str,
started_at: int | None = None,
) -> None:
"""Insert a new run row in the ``running`` state."""
import time
now = int(time.time() * 1_000_000_000)
with self._conn() as conn:
conn.execute(
"""INSERT INTO runs (run_id, spec_id, operation, exchange, symbol,
data_type, state, started_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, 'running', ?, ?)""",
(run_id, spec_id, operation, exchange, symbol, data_type, started_at, now),
)
[docs]
def finish_run(
self,
run_id: str,
state: str,
ended_at: int | None = None,
rows_written: int = 0,
error: str | None = None,
) -> None:
"""Mark a run finished with its final state, row count and optional error."""
import time
if ended_at is None:
ended_at = int(time.time() * 1_000_000_000)
with self._conn() as conn:
conn.execute(
"""UPDATE runs SET state=?, ended_at=?, rows_written=?, error=?
WHERE run_id=?""",
(state, ended_at, rows_written, error, run_id),
)
[docs]
def update_progress(self, run_id: str, progress: dict[str, Any]) -> None:
"""Persist the latest progress dict for *run_id* (polled by the UI)."""
with self._conn() as conn:
conn.execute(
"UPDATE runs SET progress=? WHERE run_id=?",
(json.dumps(progress), run_id),
)
[docs]
def append_log(self, run_id: str, msg: str, max_lines: int = 100) -> None:
"""Append a log line to the run's bounded ``log_tail``."""
with self._conn() as conn:
row = conn.execute(
"SELECT log_tail FROM runs WHERE run_id=?", (run_id,)
).fetchone()
if row is None:
return
lines = json.loads(row["log_tail"])
lines.append(msg)
if len(lines) > max_lines:
lines = lines[-max_lines:]
conn.execute(
"UPDATE runs SET log_tail=? WHERE run_id=?",
(json.dumps(lines), run_id),
)
[docs]
def get_run(self, run_id: str) -> dict[str, Any] | None:
"""Return one run as a dict, or None if unknown."""
with self._conn() as conn:
row = conn.execute(
"SELECT * FROM runs WHERE run_id=?", (run_id,)
).fetchone()
return dict(row) if row else None
[docs]
def list_runs(
self,
spec_id: str | None = None,
state: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""List recent runs, most recent first, optionally filtered."""
conditions = []
params: list[Any] = []
if spec_id:
conditions.append("spec_id=?")
params.append(spec_id)
if state:
conditions.append("state=?")
params.append(state)
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.append(limit)
with self._conn() as conn:
rows = conn.execute(
f"SELECT * FROM runs {where} ORDER BY created_at DESC LIMIT ?",
params,
).fetchall()
return [dict(r) for r in rows]
[docs]
def active_runs(self) -> list[dict[str, Any]]:
"""Runs currently ``running`` or ``reconnecting``."""
return self.list_runs(state="running") + self.list_runs(state="reconnecting")
[docs]
def prune_old_runs(self, retention_days: int) -> int:
"""Delete terminal non-failed runs older than *retention_days* days.
Runs in states ``succeeded``, ``stale``, and ``cancelled`` that started
more than *retention_days* days ago are removed. ``failed`` rows are
intentionally kept as the long-term error journal. The database is
``VACUUM``-ed after any deletion to reclaim disk space.
Parameters
----------
retention_days : int
Number of days to retain terminal non-failed runs. Pass ``0`` (or
any value ``<= 0``) to disable pruning; the method returns ``0``
immediately without touching the database.
Returns
-------
int
Number of rows deleted (0 when pruning is disabled or when no rows
match the cutoff).
Notes
-----
``VACUUM`` cannot run inside a transaction. This method opens a
separate connection (outside the :meth:`_conn` context manager) for the
``VACUUM`` statement, which is executed only when at least one row was
deleted.
This method must be called from the daemon boot path *after*
:meth:`mark_stale_running` so that freshly-staled orphans age normally
rather than being immediately pruned on the next boot.
"""
if retention_days <= 0:
return 0
import time
cutoff_ns = int(time.time() * 1_000_000_000) - int(retention_days * 86400 * 1_000_000_000)
with self._conn() as conn:
cursor = conn.execute(
"""DELETE FROM runs
WHERE state IN ('succeeded', 'stale', 'cancelled')
AND started_at < ?""",
(cutoff_ns,),
)
deleted = cursor.rowcount
if deleted > 0:
# VACUUM cannot run inside a transaction — open a plain connection.
conn2 = sqlite3.connect(str(self._path))
try:
conn2.execute("VACUUM")
finally:
conn2.close()
return deleted
[docs]
def mark_stale_running(self) -> int:
"""Transition all ``running`` rows to ``stale`` at daemon boot.
Runs left in state ``running`` after a daemon crash or SIGKILL pollute
:meth:`active_runs` and the Dashboard forever. Calling this once during
daemon startup corrects the DB without deleting any history rows.
Parameters
----------
None
Returns
-------
int
Number of rows updated (0 when the store is clean).
Notes
-----
The ``ended_at`` timestamp is set to *now* (nanoseconds UTC) and
``error`` is set to ``'orphaned by daemon restart'`` so the run history
clearly attributes the state change to a restart rather than a normal
completion or a user-visible error.
This method must only be called from the daemon boot path, before any
new runs are started: ``cmd_start`` for ``dccd start`` (called before
the scheduler starts stream workers); the FastAPI lifespan for
standalone ``dccd ui`` (called before the standalone scheduler is
created). Calling it while workers are already running would
incorrectly stale-out their legitimate active runs.
"""
import time
now = int(time.time() * 1_000_000_000)
with self._conn() as conn:
cursor = conn.execute(
"""UPDATE runs SET state='stale', ended_at=?, error='orphaned by daemon restart'
WHERE state='running'""",
(now,),
)
return cursor.rowcount