116 lines
3.9 KiB
Python
116 lines
3.9 KiB
Python
"""Task scheduler with heartbeat support."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
from croniter import croniter
|
|
|
|
if TYPE_CHECKING:
|
|
from .agent import Agent
|
|
from .config import Config
|
|
from .db import Database
|
|
|
|
# Module-level logger; every scheduler and heartbeat message in this file goes through it.
log = logging.getLogger(__name__)
|
|
|
|
# Marker the heartbeat prompt asks the agent to reply with when nothing needs
# attention; a reply containing it is logged as "all clear".
HEARTBEAT_OK = "HEARTBEAT_OK"
|
|
|
|
|
|
class Scheduler:
    """Run due database tasks and periodic heartbeat checks in the background.

    Two daemon threads are used: one polls the database for due tasks and
    passes each task's prompt to the agent; the other periodically asks the
    agent to review ``HEARTBEAT.md`` from the configured identity directory.
    """

    # Seconds to wait after start-up before the first heartbeat check.
    _HEARTBEAT_INITIAL_DELAY_SECONDS = 60

    # Maximum number of characters of a task result written to the run log.
    _MAX_LOGGED_RESULT_CHARS = 2000

    def __init__(self, config: Config, db: Database, agent: Agent):
        """Store the collaborators; no thread is created until ``start()``."""
        self.config = config
        self.db = db
        self.agent = agent
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._heartbeat_thread: threading.Thread | None = None

    def start(self):
        """Start the scheduler and heartbeat threads.

        Calling this while either thread is still alive is a no-op (a warning
        is logged), so duplicate loops are never spawned. After a ``stop()``
        whose threads have exited, calling it again restarts both loops.
        """
        workers = (self._thread, self._heartbeat_thread)
        if any(t is not None and t.is_alive() for t in workers):
            log.warning("Scheduler already running; start() ignored")
            return

        # A previous stop() leaves the event set; without clearing it the new
        # loops would exit immediately.
        self._stop_event.clear()

        self._thread = threading.Thread(
            target=self._poll_loop, daemon=True, name="scheduler"
        )
        self._thread.start()

        self._heartbeat_thread = threading.Thread(
            target=self._heartbeat_loop, daemon=True, name="heartbeat"
        )
        self._heartbeat_thread.start()

        log.info(
            "Scheduler started (poll=%ds, heartbeat=%dm)",
            self.config.scheduler.poll_interval_seconds,
            self.config.scheduler.heartbeat_interval_minutes,
        )

    def stop(self, timeout: float | None = None):
        """Signal both loops to exit.

        Args:
            timeout: When given, wait up to this many seconds for *each*
                thread to finish. The default ``None`` returns right after
                signalling, without waiting.
        """
        self._stop_event.set()
        if timeout is None:
            return

        current = threading.current_thread()
        for worker in (self._thread, self._heartbeat_thread):
            # A thread cannot join itself (stop() may be called from a task).
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout)

    # ── Scheduled Tasks ──

    def _poll_loop(self):
        """Run due tasks, then sleep for the poll interval, until stopped."""
        while not self._stop_event.is_set():
            try:
                self._run_due_tasks()
            except Exception as e:
                # Keep the loop alive; log.exception also records the traceback.
                log.exception("Scheduler poll error: %s", e)
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop_event.wait(self.config.scheduler.poll_interval_seconds)

    def _run_due_tasks(self):
        """Execute every task the database reports as due, one at a time.

        A failing task is logged and recorded, and the remaining tasks still
        run.
        """
        for task in self.db.get_due_tasks():
            try:
                self._run_task(task)
            except Exception as e:
                log.exception("Task '%s' failed: %s", task["name"], e)
                self._record_failure(task, e)

    def _run_task(self, task):
        """Run one task through the agent, log its result and reschedule it."""
        log.info("Running scheduled task: %s", task["name"])
        result = self.agent.execute_task(task["prompt"])
        # An agent returning None still counts as a completed run; store an
        # empty result instead of failing on the slice.
        summary = (result or "")[: self._MAX_LOGGED_RESULT_CHARS]
        self.db.log_task_run(task["id"], result=summary)
        self._schedule_next_run(task)

    def _schedule_next_run(self, task):
        """Disable a one-time task, or store the next cron occurrence."""
        schedule = task["schedule"]
        if schedule.startswith("once:"):
            # One-time task: disable it so it is not picked up again.
            # NOTE(review): this reaches into the Database's private
            # connection; no public "disable task" method is visible here.
            self.db._conn.execute(
                "UPDATE scheduled_tasks SET enabled = 0 WHERE id = ?", (task["id"],)
            )
            self.db._conn.commit()
            return

        # Cron schedule: next occurrence is computed from the current UTC time
        # (i.e. after the task finished), not from the previous due time.
        now = datetime.now(timezone.utc)
        next_run = croniter(schedule, now).get_next(datetime)
        self.db.update_task_next_run(task["id"], next_run.isoformat())

    def _record_failure(self, task, error):
        """Write a task failure to the run log without raising.

        NOTE(review): the next run is not advanced on failure; presumably the
        task stays due and is retried on the next poll — confirm against
        ``Database.get_due_tasks``.
        """
        try:
            self.db.log_task_run(task["id"], error=str(error))
        except Exception:
            # Losing the log entry must not abort the rest of the batch.
            log.exception("Could not record failure of task '%s'", task["name"])

    # ── Heartbeat ──

    def _heartbeat_loop(self):
        """Run a heartbeat check at a fixed interval until stopped."""
        interval = self.config.scheduler.heartbeat_interval_minutes * 60
        # Wait a bit before the first heartbeat; returns early if stopped.
        self._stop_event.wait(self._HEARTBEAT_INITIAL_DELAY_SECONDS)

        while not self._stop_event.is_set():
            try:
                self._run_heartbeat()
            except Exception as e:
                log.exception("Heartbeat error: %s", e)
            self._stop_event.wait(interval)

    def _run_heartbeat(self):
        """Ask the agent to review ``HEARTBEAT.md``; do nothing if it is absent."""
        heartbeat_path = self.config.identity_dir / "HEARTBEAT.md"
        try:
            # Read directly instead of exists()-then-read, so the file cannot
            # disappear between the check and the read.
            checklist = heartbeat_path.read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            return

        prompt = (
            "HEARTBEAT CHECK. Review this checklist and take action if needed.\n"
            f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n"
            f"{checklist}"
        )

        # The checklist is sent both inside the prompt and as system context.
        result = self.agent.execute_task(prompt, system_context=checklist)

        if HEARTBEAT_OK in result:
            log.debug("Heartbeat: all clear")
        else:
            log.info("Heartbeat action taken: %s", result[:200])