136 lines
4.2 KiB
Python
136 lines
4.2 KiB
Python
"""Minimal SQLite persistence for the runner.

Just a KV store for tracking processed tasks and AutoCora jobs,
plus a run log for auditing.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
import threading
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
class StateDB:
|
|
"""Thread-safe SQLite KV store + run log."""
|
|
|
|
def __init__(self, db_path: Path):
|
|
self._path = db_path
|
|
self._local = threading.local()
|
|
self._init_schema()
|
|
|
|
@property
|
|
def _conn(self) -> sqlite3.Connection:
|
|
if not hasattr(self._local, "conn"):
|
|
self._local.conn = sqlite3.connect(str(self._path))
|
|
self._local.conn.row_factory = sqlite3.Row
|
|
self._local.conn.execute("PRAGMA journal_mode=WAL")
|
|
return self._local.conn
|
|
|
|
def _init_schema(self):
|
|
self._conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS kv_store (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL
|
|
);
|
|
CREATE TABLE IF NOT EXISTS run_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL,
|
|
task_name TEXT NOT NULL,
|
|
task_type TEXT NOT NULL,
|
|
stage TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
started_at TEXT NOT NULL,
|
|
finished_at TEXT,
|
|
result TEXT,
|
|
error TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_run_log_task
|
|
ON run_log(task_id, started_at);
|
|
""")
|
|
self._conn.commit()
|
|
|
|
# ── KV Store ──
|
|
|
|
def kv_set(self, key: str, value: str):
|
|
self._conn.execute(
|
|
"INSERT OR REPLACE INTO kv_store (key, value) VALUES (?, ?)",
|
|
(key, value),
|
|
)
|
|
self._conn.commit()
|
|
|
|
def kv_get(self, key: str) -> str | None:
|
|
row = self._conn.execute(
|
|
"SELECT value FROM kv_store WHERE key = ?", (key,)
|
|
).fetchone()
|
|
return row["value"] if row else None
|
|
|
|
def kv_set_json(self, key: str, data: Any):
|
|
self.kv_set(key, json.dumps(data))
|
|
|
|
def kv_get_json(self, key: str) -> Any | None:
|
|
raw = self.kv_get(key)
|
|
if raw is None:
|
|
return None
|
|
return json.loads(raw)
|
|
|
|
def kv_scan(self, prefix: str) -> list[tuple[str, str]]:
|
|
"""Return all KV pairs where key starts with prefix."""
|
|
rows = self._conn.execute(
|
|
"SELECT key, value FROM kv_store WHERE key LIKE ?",
|
|
(prefix + "%",),
|
|
).fetchall()
|
|
return [(r["key"], r["value"]) for r in rows]
|
|
|
|
def kv_delete(self, key: str):
|
|
self._conn.execute("DELETE FROM kv_store WHERE key = ?", (key,))
|
|
self._conn.commit()
|
|
|
|
# ── Run Log ──
|
|
|
|
def log_run_start(
|
|
self,
|
|
task_id: str,
|
|
task_name: str,
|
|
task_type: str,
|
|
stage: str,
|
|
) -> int:
|
|
"""Log the start of a task run. Returns the run log ID."""
|
|
now = _now()
|
|
cur = self._conn.execute(
|
|
"""INSERT INTO run_log
|
|
(task_id, task_name, task_type, stage, status, started_at)
|
|
VALUES (?, ?, ?, ?, 'running', ?)""",
|
|
(task_id, task_name, task_type, stage, now),
|
|
)
|
|
self._conn.commit()
|
|
return cur.lastrowid # type: ignore[return-value]
|
|
|
|
def log_run_finish(
|
|
self, run_id: int, status: str, result: str | None = None, error: str | None = None
|
|
):
|
|
"""Update a run log entry with the outcome."""
|
|
now = _now()
|
|
self._conn.execute(
|
|
"""UPDATE run_log
|
|
SET status = ?, finished_at = ?, result = ?, error = ?
|
|
WHERE id = ?""",
|
|
(status, now, result, error, run_id),
|
|
)
|
|
self._conn.commit()
|
|
|
|
def get_recent_runs(self, limit: int = 20) -> list[dict]:
|
|
"""Get the most recent run log entries."""
|
|
rows = self._conn.execute(
|
|
"SELECT * FROM run_log ORDER BY started_at DESC LIMIT ?",
|
|
(limit,),
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(UTC).isoformat()
|