"""SQLite-backed runs store for job execution history."""
from __future__ import annotations
import json
import logging
import pathlib
import sqlite3
from contextlib import contextmanager
from typing import Any, Generator
__all__ = ["RunsStore"]
logger = logging.getLogger(__name__)
_SCHEMA = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
spec_id TEXT NOT NULL,
operation TEXT NOT NULL,
exchange TEXT NOT NULL,
symbol TEXT NOT NULL,
data_type TEXT NOT NULL,
state TEXT NOT NULL,
started_at INTEGER,
ended_at INTEGER,
rows_written INTEGER DEFAULT 0,
error TEXT,
progress TEXT,
log_tail TEXT DEFAULT '[]',
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_runs_spec ON runs(spec_id);
CREATE INDEX IF NOT EXISTS idx_runs_state ON runs(state);
CREATE INDEX IF NOT EXISTS idx_runs_created ON runs(created_at);
"""
[docs]
class RunsStore:
"""Append-only SQLite store for JobRun records.
Parameters
----------
db_path : str or Path
Path to the SQLite database file. Created if absent.
Examples
--------
>>> import tempfile, os
>>> with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
... path = f.name
>>> store = RunsStore(path)
>>> store.create_run('r1', 'backfill:binance:BTC/USDT:ohlc', 'backfill', 'binance', 'BTC/USDT', 'ohlc')
>>> os.unlink(path)
"""
def __init__(self, db_path: str | pathlib.Path) -> None:
self._path = pathlib.Path(db_path)
self._path.parent.mkdir(parents=True, exist_ok=True)
with self._conn() as conn:
conn.executescript(_SCHEMA)
@contextmanager
def _conn(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(str(self._path))
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def create_run(
self,
run_id: str,
spec_id: str,
operation: str,
exchange: str,
symbol: str,
data_type: str,
started_at: int | None = None,
) -> None:
import time
now = int(time.time() * 1_000_000_000)
with self._conn() as conn:
conn.execute(
"""INSERT INTO runs (run_id, spec_id, operation, exchange, symbol,
data_type, state, started_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, 'running', ?, ?)""",
(run_id, spec_id, operation, exchange, symbol, data_type, started_at, now),
)
def finish_run(
self,
run_id: str,
state: str,
ended_at: int | None = None,
rows_written: int = 0,
error: str | None = None,
) -> None:
import time
if ended_at is None:
ended_at = int(time.time() * 1_000_000_000)
with self._conn() as conn:
conn.execute(
"""UPDATE runs SET state=?, ended_at=?, rows_written=?, error=?
WHERE run_id=?""",
(state, ended_at, rows_written, error, run_id),
)
def update_progress(self, run_id: str, progress: dict[str, Any]) -> None:
with self._conn() as conn:
conn.execute(
"UPDATE runs SET progress=? WHERE run_id=?",
(json.dumps(progress), run_id),
)
def append_log(self, run_id: str, msg: str, max_lines: int = 100) -> None:
with self._conn() as conn:
row = conn.execute(
"SELECT log_tail FROM runs WHERE run_id=?", (run_id,)
).fetchone()
if row is None:
return
lines = json.loads(row["log_tail"])
lines.append(msg)
if len(lines) > max_lines:
lines = lines[-max_lines:]
conn.execute(
"UPDATE runs SET log_tail=? WHERE run_id=?",
(json.dumps(lines), run_id),
)
def get_run(self, run_id: str) -> dict[str, Any] | None:
with self._conn() as conn:
row = conn.execute(
"SELECT * FROM runs WHERE run_id=?", (run_id,)
).fetchone()
return dict(row) if row else None
def list_runs(
self,
spec_id: str | None = None,
state: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
conditions = []
params: list[Any] = []
if spec_id:
conditions.append("spec_id=?")
params.append(spec_id)
if state:
conditions.append("state=?")
params.append(state)
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.append(limit)
with self._conn() as conn:
rows = conn.execute(
f"SELECT * FROM runs {where} ORDER BY created_at DESC LIMIT ?",
params,
).fetchall()
return [dict(r) for r in rows]
def active_runs(self) -> list[dict[str, Any]]:
return self.list_runs(state="running") + self.list_runs(state="reconnecting")