CheddahBot/cheddahbot/scheduler.py

405 lines
15 KiB
Python

"""Task scheduler with heartbeat and ClickUp polling support."""
from __future__ import annotations
import json
import logging
import re
import threading
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from croniter import croniter
if TYPE_CHECKING:
from .agent import Agent
from .config import Config
from .db import Database
from .notifications import NotificationBus
# Module-level logger, named after this module so records can be filtered per module.
log = logging.getLogger(__name__)
# Token the heartbeat prompt asks the agent to reply with when nothing needs
# attention; the heartbeat check looks for it as a substring of the reply.
HEARTBEAT_OK = "HEARTBEAT_OK"
# Matches **Docx:** `path/to/file.docx` patterns in tool output
_DOCX_PATH_RE = re.compile(r"\*\*Docx:\*\*\s*`([^`]+\.docx)`")
def _extract_docx_paths(result: str) -> list[str]:
"""Extract .docx file paths from a tool result string."""
return _DOCX_PATH_RE.findall(result)
class Scheduler:
def __init__(
self,
config: Config,
db: Database,
agent: Agent,
notification_bus: NotificationBus | None = None,
):
self.config = config
self.db = db
self.agent = agent
self.notification_bus = notification_bus
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._heartbeat_thread: threading.Thread | None = None
self._clickup_thread: threading.Thread | None = None
self._clickup_client = None
def start(self):
"""Start the scheduler, heartbeat, and ClickUp threads."""
self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler")
self._thread.start()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop, daemon=True, name="heartbeat"
)
self._heartbeat_thread.start()
# Start ClickUp polling if configured
if self.config.clickup.enabled:
self._clickup_thread = threading.Thread(
target=self._clickup_loop, daemon=True, name="clickup"
)
self._clickup_thread.start()
log.info(
"ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes
)
else:
log.info("ClickUp integration disabled (no API token)")
log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds,
self.config.scheduler.heartbeat_interval_minutes,
)
def stop(self):
self._stop_event.set()
if self._clickup_client:
self._clickup_client.close()
def _notify(self, message: str, category: str = "clickup"):
"""Push a notification through the bus if available."""
if self.notification_bus:
self.notification_bus.push(message, category)
else:
log.info("Notification [%s]: %s", category, message)
# ── Scheduled Tasks ──
def _poll_loop(self):
while not self._stop_event.is_set():
try:
self._run_due_tasks()
except Exception as e:
log.error("Scheduler poll error: %s", e)
self._stop_event.wait(self.config.scheduler.poll_interval_seconds)
def _run_due_tasks(self):
tasks = self.db.get_due_tasks()
for task in tasks:
try:
log.info("Running scheduled task: %s", task["name"])
result = self.agent.execute_task(task["prompt"])
self.db.log_task_run(task["id"], result=result[:2000])
# Calculate next run
schedule = task["schedule"]
if schedule.startswith("once:"):
# One-time task, disable it
self.db.disable_task(task["id"])
else:
# Cron schedule - calculate next run
now = datetime.now(UTC)
cron = croniter(schedule, now)
next_run = cron.get_next(datetime)
self.db.update_task_next_run(task["id"], next_run.isoformat())
except Exception as e:
log.error("Task '%s' failed: %s", task["name"], e)
self.db.log_task_run(task["id"], error=str(e))
# ── Heartbeat ──
def _heartbeat_loop(self):
interval = self.config.scheduler.heartbeat_interval_minutes * 60
# Wait a bit before first heartbeat
self._stop_event.wait(60)
while not self._stop_event.is_set():
try:
self._run_heartbeat()
except Exception as e:
log.error("Heartbeat error: %s", e)
self._stop_event.wait(interval)
def _run_heartbeat(self):
heartbeat_path = self.config.identity_dir / "HEARTBEAT.md"
if not heartbeat_path.exists():
return
checklist = heartbeat_path.read_text(encoding="utf-8")
prompt = (
f"HEARTBEAT CHECK. Review this checklist and take action if needed.\n"
f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n"
f"{checklist}"
)
result = self.agent.execute_task(prompt, system_context=checklist)
if HEARTBEAT_OK in result:
log.debug("Heartbeat: all clear")
else:
log.info("Heartbeat action taken: %s", result[:200])
# ── ClickUp Integration ──
def _get_clickup_client(self):
"""Lazy-init the ClickUp API client."""
if self._clickup_client is None:
from .clickup import ClickUpClient
self._clickup_client = ClickUpClient(
api_token=self.config.clickup.api_token,
workspace_id=self.config.clickup.workspace_id,
task_type_field_name=self.config.clickup.task_type_field_name,
)
return self._clickup_client
def _clickup_loop(self):
"""Poll ClickUp for tasks on a regular interval."""
interval = self.config.clickup.poll_interval_minutes * 60
# Wait before first poll to let other systems initialize
self._stop_event.wait(30)
# On startup, recover orphaned executing tasks
self._recover_orphaned_tasks()
while not self._stop_event.is_set():
try:
self._poll_clickup()
self._execute_approved_tasks()
except Exception as e:
log.error("ClickUp poll error: %s", e)
self._stop_event.wait(interval)
def _recover_orphaned_tasks(self):
"""Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'."""
pairs = self.db.kv_scan("clickup:task:")
for key, value in pairs:
if not key.endswith(":state"):
continue
try:
state = json.loads(value)
if state.get("state") == "executing":
task_id = state["clickup_task_id"]
log.warning("Recovering orphaned executing task: %s", task_id)
state["state"] = "approved"
state["error"] = None
self.db.kv_set(key, json.dumps(state))
except (json.JSONDecodeError, KeyError):
pass
def _poll_clickup(self):
"""Discover new tasks from ClickUp and process them."""
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
log.warning("ClickUp space_id not configured, skipping poll")
return
tasks = client.get_tasks_from_space(
space_id,
statuses=self.config.clickup.poll_statuses,
)
# Load active task IDs to avoid re-processing
active_raw = self.db.kv_get("clickup:active_task_ids")
active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set()
for task in tasks:
if task.id in active_ids:
continue # Already tracked
self._process_clickup_task(task, active_ids)
# Save updated active IDs
self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids)))
def _process_clickup_task(self, task, active_ids: set[str]):
"""Discover a new ClickUp task, map to skill, decide action."""
now = datetime.now(UTC).isoformat()
skill_map = self.config.clickup.skill_map
# Build state object
state = {
"state": "discovered",
"clickup_task_id": task.id,
"clickup_task_name": task.name,
"task_type": task.task_type,
"skill_name": None,
"discovered_at": now,
"started_at": None,
"completed_at": None,
"error": None,
"deliverable_paths": [],
"custom_fields": task.custom_fields,
}
# Try to map task type to a skill
mapping = skill_map.get(task.task_type)
if not mapping:
state["state"] = "unmapped"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task discovered but no skill mapping found.\n"
f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n"
f"Configure a skill_map entry in config.yaml to handle this task type."
)
log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type)
return
tool_name = mapping.get("tool", "")
auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute)
state["skill_name"] = tool_name
if auto_execute:
state["state"] = "approved"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task auto-approved for execution.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`"
)
log.info("Auto-approved ClickUp task: %s%s", task.name, tool_name)
else:
state["state"] = "awaiting_approval"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task needs your approval.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`\n"
f'Use `clickup_approve_task("{task.id}")` to approve or '
f'`clickup_decline_task("{task.id}")` to decline.'
)
log.info("ClickUp task awaiting approval: %s%s", task.name, tool_name)
def _execute_approved_tasks(self):
"""Scan for approved tasks and execute them."""
pairs = self.db.kv_scan("clickup:task:")
for key, value in pairs:
if not key.endswith(":state"):
continue
try:
state = json.loads(value)
except json.JSONDecodeError:
continue
if state.get("state") != "approved":
continue
self._execute_clickup_task(state, key)
def _execute_clickup_task(self, state: dict, kv_key: str):
"""Execute a single approved ClickUp task."""
task_id = state["clickup_task_id"]
task_name = state["clickup_task_name"]
skill_name = state["skill_name"]
now = datetime.now(UTC).isoformat()
log.info("Executing ClickUp task: %s%s", task_name, skill_name)
# Update state to executing
state["state"] = "executing"
state["started_at"] = now
self.db.kv_set(kv_key, json.dumps(state))
# Set ClickUp status to "in progress"
client = self._get_clickup_client()
client.update_task_status(task_id, self.config.clickup.in_progress_status)
try:
# Build tool arguments from field mapping
args = self._build_tool_args(state)
# Execute the skill via the tool registry
if hasattr(self.agent, "_tools") and self.agent._tools:
result = self.agent._tools.execute(skill_name, args)
else:
result = self.agent.execute_task(
f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. "
f"Task description: {state.get('custom_fields', {})}"
)
# Extract and upload any docx deliverables
docx_paths = _extract_docx_paths(result)
state["deliverable_paths"] = docx_paths
uploaded_count = 0
for path in docx_paths:
if client.upload_attachment(task_id, path):
uploaded_count += 1
else:
log.warning("Failed to upload %s for task %s", path, task_id)
# Success
state["state"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
# Update ClickUp
client.update_task_status(task_id, self.config.clickup.review_status)
attach_note = f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else ""
comment = (
f"✅ CheddahBot completed this task.\n\n"
f"Skill: {skill_name}\n"
f"Result:\n{result[:3000]}{attach_note}"
)
client.add_comment(task_id, comment)
self._notify(
f"ClickUp task completed: **{task_name}**\n"
f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'"
)
log.info("ClickUp task completed: %s", task_name)
except Exception as e:
# Failure
state["state"] = "failed"
state["error"] = str(e)
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
# Comment the error on ClickUp
client.add_comment(
task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}"
)
self._notify(
f"ClickUp task failed: **{task_name}**\n"
f"Skill: `{skill_name}` | Error: {str(e)[:200]}"
)
log.error("ClickUp task failed: %s%s", task_name, e)
def _build_tool_args(self, state: dict) -> dict:
"""Build tool arguments from ClickUp task fields using the field mapping."""
skill_map = self.config.clickup.skill_map
task_type = state.get("task_type", "")
mapping = skill_map.get(task_type, {})
field_mapping = mapping.get("field_mapping", {})
args = {}
for tool_param, source in field_mapping.items():
if source == "task_name":
args[tool_param] = state.get("clickup_task_name", "")
elif source == "task_description":
args[tool_param] = state.get("custom_fields", {}).get("description", "")
else:
# Look up custom field by name
args[tool_param] = state.get("custom_fields", {}).get(source, "")
return args