"""Task scheduler with heartbeat and ClickUp polling support.""" from __future__ import annotations import json import logging import re import threading from datetime import UTC, datetime from typing import TYPE_CHECKING from croniter import croniter if TYPE_CHECKING: from .agent import Agent from .config import Config from .db import Database from .notifications import NotificationBus log = logging.getLogger(__name__) HEARTBEAT_OK = "HEARTBEAT_OK" # Matches **Docx:** `path/to/file.docx` patterns in tool output _DOCX_PATH_RE = re.compile(r"\*\*Docx:\*\*\s*`([^`]+\.docx)`") def _extract_docx_paths(result: str) -> list[str]: """Extract .docx file paths from a tool result string.""" return _DOCX_PATH_RE.findall(result) class Scheduler: def __init__( self, config: Config, db: Database, agent: Agent, notification_bus: NotificationBus | None = None, ): self.config = config self.db = db self.agent = agent self.notification_bus = notification_bus self._stop_event = threading.Event() self._thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None self._clickup_thread: threading.Thread | None = None self._clickup_client = None def start(self): """Start the scheduler, heartbeat, and ClickUp threads.""" self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler") self._thread.start() self._heartbeat_thread = threading.Thread( target=self._heartbeat_loop, daemon=True, name="heartbeat" ) self._heartbeat_thread.start() # Start ClickUp polling if configured if self.config.clickup.enabled: self._clickup_thread = threading.Thread( target=self._clickup_loop, daemon=True, name="clickup" ) self._clickup_thread.start() log.info( "ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes ) else: log.info("ClickUp integration disabled (no API token)") log.info( "Scheduler started (poll=%ds, heartbeat=%dm)", self.config.scheduler.poll_interval_seconds, self.config.scheduler.heartbeat_interval_minutes, ) def stop(self): self._stop_event.set() if self._clickup_client: self._clickup_client.close() def _notify(self, message: str, category: str = "clickup"): """Push a notification through the bus if available.""" if self.notification_bus: self.notification_bus.push(message, category) else: log.info("Notification [%s]: %s", category, message) # ── Scheduled Tasks ── def _poll_loop(self): while not self._stop_event.is_set(): try: self._run_due_tasks() except Exception as e: log.error("Scheduler poll error: %s", e) self._stop_event.wait(self.config.scheduler.poll_interval_seconds) def _run_due_tasks(self): tasks = self.db.get_due_tasks() for task in tasks: try: log.info("Running scheduled task: %s", task["name"]) result = self.agent.execute_task(task["prompt"]) self.db.log_task_run(task["id"], result=result[:2000]) # Calculate next run schedule = task["schedule"] if schedule.startswith("once:"): # One-time task, disable it self.db.disable_task(task["id"]) else: # Cron schedule - calculate next run now = datetime.now(UTC) cron = croniter(schedule, now) next_run = cron.get_next(datetime) self.db.update_task_next_run(task["id"], next_run.isoformat()) except Exception as e: log.error("Task '%s' failed: %s", task["name"], e) self.db.log_task_run(task["id"], error=str(e)) # ── Heartbeat ── def _heartbeat_loop(self): interval = self.config.scheduler.heartbeat_interval_minutes * 60 # Wait a bit before first heartbeat self._stop_event.wait(60) while not self._stop_event.is_set(): try: self._run_heartbeat() except Exception as e: log.error("Heartbeat error: %s", e) self._stop_event.wait(interval) def _run_heartbeat(self): heartbeat_path = self.config.identity_dir / "HEARTBEAT.md" if not heartbeat_path.exists(): return checklist = heartbeat_path.read_text(encoding="utf-8") prompt = ( f"HEARTBEAT CHECK. Review this checklist and take action if needed.\n" f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n" f"{checklist}" ) result = self.agent.execute_task(prompt, system_context=checklist) if HEARTBEAT_OK in result: log.debug("Heartbeat: all clear") else: log.info("Heartbeat action taken: %s", result[:200]) # ── ClickUp Integration ── def _get_clickup_client(self): """Lazy-init the ClickUp API client.""" if self._clickup_client is None: from .clickup import ClickUpClient self._clickup_client = ClickUpClient( api_token=self.config.clickup.api_token, workspace_id=self.config.clickup.workspace_id, task_type_field_name=self.config.clickup.task_type_field_name, ) return self._clickup_client def _clickup_loop(self): """Poll ClickUp for tasks on a regular interval.""" interval = self.config.clickup.poll_interval_minutes * 60 # Wait before first poll to let other systems initialize self._stop_event.wait(30) # On startup, recover orphaned executing tasks self._recover_orphaned_tasks() while not self._stop_event.is_set(): try: self._poll_clickup() self._execute_approved_tasks() except Exception as e: log.error("ClickUp poll error: %s", e) self._stop_event.wait(interval) def _recover_orphaned_tasks(self): """Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'.""" pairs = self.db.kv_scan("clickup:task:") for key, value in pairs: if not key.endswith(":state"): continue try: state = json.loads(value) if state.get("state") == "executing": task_id = state["clickup_task_id"] log.warning("Recovering orphaned executing task: %s", task_id) state["state"] = "approved" state["error"] = None self.db.kv_set(key, json.dumps(state)) except (json.JSONDecodeError, KeyError): pass def _poll_clickup(self): """Discover new tasks from ClickUp and process them.""" client = self._get_clickup_client() space_id = self.config.clickup.space_id if not space_id: log.warning("ClickUp space_id not configured, skipping poll") return tasks = client.get_tasks_from_space( space_id, statuses=self.config.clickup.poll_statuses, ) # Load active task IDs to avoid re-processing active_raw = self.db.kv_get("clickup:active_task_ids") active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set() for task in tasks: if task.id in active_ids: continue # Already tracked self._process_clickup_task(task, active_ids) # Save updated active IDs self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids))) def _process_clickup_task(self, task, active_ids: set[str]): """Discover a new ClickUp task, map to skill, decide action.""" now = datetime.now(UTC).isoformat() skill_map = self.config.clickup.skill_map # Build state object state = { "state": "discovered", "clickup_task_id": task.id, "clickup_task_name": task.name, "task_type": task.task_type, "skill_name": None, "discovered_at": now, "started_at": None, "completed_at": None, "error": None, "deliverable_paths": [], "custom_fields": task.custom_fields, } # Try to map task type to a skill mapping = skill_map.get(task.task_type) if not mapping: state["state"] = "unmapped" self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) active_ids.add(task.id) self._notify( f"New ClickUp task discovered but no skill mapping found.\n" f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n" f"Configure a skill_map entry in config.yaml to handle this task type." ) log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type) return tool_name = mapping.get("tool", "") auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute) state["skill_name"] = tool_name if auto_execute: state["state"] = "approved" self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) active_ids.add(task.id) self._notify( f"New ClickUp task auto-approved for execution.\n" f"Task: **{task.name}** → Skill: `{tool_name}`" ) log.info("Auto-approved ClickUp task: %s → %s", task.name, tool_name) else: state["state"] = "awaiting_approval" self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) active_ids.add(task.id) self._notify( f"New ClickUp task needs your approval.\n" f"Task: **{task.name}** → Skill: `{tool_name}`\n" f'Use `clickup_approve_task("{task.id}")` to approve or ' f'`clickup_decline_task("{task.id}")` to decline.' ) log.info("ClickUp task awaiting approval: %s → %s", task.name, tool_name) def _execute_approved_tasks(self): """Scan for approved tasks and execute them.""" pairs = self.db.kv_scan("clickup:task:") for key, value in pairs: if not key.endswith(":state"): continue try: state = json.loads(value) except json.JSONDecodeError: continue if state.get("state") != "approved": continue self._execute_clickup_task(state, key) def _execute_clickup_task(self, state: dict, kv_key: str): """Execute a single approved ClickUp task.""" task_id = state["clickup_task_id"] task_name = state["clickup_task_name"] skill_name = state["skill_name"] now = datetime.now(UTC).isoformat() log.info("Executing ClickUp task: %s → %s", task_name, skill_name) # Update state to executing state["state"] = "executing" state["started_at"] = now self.db.kv_set(kv_key, json.dumps(state)) # Set ClickUp status to "in progress" client = self._get_clickup_client() client.update_task_status(task_id, self.config.clickup.in_progress_status) try: # Build tool arguments from field mapping args = self._build_tool_args(state) # Execute the skill via the tool registry if hasattr(self.agent, "_tools") and self.agent._tools: result = self.agent._tools.execute(skill_name, args) else: result = self.agent.execute_task( f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. " f"Task description: {state.get('custom_fields', {})}" ) # Extract and upload any docx deliverables docx_paths = _extract_docx_paths(result) state["deliverable_paths"] = docx_paths uploaded_count = 0 for path in docx_paths: if client.upload_attachment(task_id, path): uploaded_count += 1 else: log.warning("Failed to upload %s for task %s", path, task_id) # Success state["state"] = "completed" state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) # Update ClickUp client.update_task_status(task_id, self.config.clickup.review_status) attach_note = f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else "" comment = ( f"✅ CheddahBot completed this task.\n\n" f"Skill: {skill_name}\n" f"Result:\n{result[:3000]}{attach_note}" ) client.add_comment(task_id, comment) self._notify( f"ClickUp task completed: **{task_name}**\n" f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'" ) log.info("ClickUp task completed: %s", task_name) except Exception as e: # Failure state["state"] = "failed" state["error"] = str(e) state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) # Comment the error on ClickUp client.add_comment( task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}" ) self._notify( f"ClickUp task failed: **{task_name}**\n" f"Skill: `{skill_name}` | Error: {str(e)[:200]}" ) log.error("ClickUp task failed: %s — %s", task_name, e) def _build_tool_args(self, state: dict) -> dict: """Build tool arguments from ClickUp task fields using the field mapping.""" skill_map = self.config.clickup.skill_map task_type = state.get("task_type", "") mapping = skill_map.get(task_type, {}) field_mapping = mapping.get("field_mapping", {}) args = {} for tool_param, source in field_mapping.items(): if source == "task_name": args[tool_param] = state.get("clickup_task_name", "") elif source == "task_description": args[tool_param] = state.get("custom_fields", {}).get("description", "") else: # Look up custom field by name args[tool_param] = state.get("custom_fields", {}).get(source, "") return args