From a67e714045e22557082e0e9abfd8ef4c9c22e2b2 Mon Sep 17 00:00:00 2001 From: PeninsulaInd Date: Sun, 15 Feb 2026 22:28:35 -0600 Subject: [PATCH] Add ClickUp polling loop to scheduler Third daemon thread polls ClickUp every 20 minutes, discovers new tasks, maps Task Type to skills via config, and auto-executes or queues for approval. On completion updates ClickUp status + comments with results. Recovers orphaned executing tasks on startup. Pushes all events through the NotificationBus for UI-agnostic delivery. Co-Authored-By: Claude Opus 4.6 --- cheddahbot/scheduler.py | 270 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 265 insertions(+), 5 deletions(-) diff --git a/cheddahbot/scheduler.py b/cheddahbot/scheduler.py index de108e4..fb286cc 100644 --- a/cheddahbot/scheduler.py +++ b/cheddahbot/scheduler.py @@ -1,12 +1,11 @@ -"""Task scheduler with heartbeat support.""" +"""Task scheduler with heartbeat and ClickUp polling support.""" from __future__ import annotations +import json import logging import threading -import time from datetime import datetime, timezone -from pathlib import Path from typing import TYPE_CHECKING from croniter import croniter @@ -15,6 +14,7 @@ if TYPE_CHECKING: from .agent import Agent from .config import Config from .db import Database + from .notifications import NotificationBus log = logging.getLogger(__name__) @@ -22,28 +22,49 @@ HEARTBEAT_OK = "HEARTBEAT_OK" class Scheduler: - def __init__(self, config: Config, db: Database, agent: Agent): + def __init__(self, config: Config, db: Database, agent: Agent, + notification_bus: NotificationBus | None = None): self.config = config self.db = db self.agent = agent + self.notification_bus = notification_bus self._stop_event = threading.Event() self._thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None + self._clickup_thread: threading.Thread | None = None + self._clickup_client = None def start(self): - """Start the scheduler and heartbeat threads.""" + """Start the scheduler, heartbeat, and ClickUp threads.""" self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler") self._thread.start() self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True, name="heartbeat") self._heartbeat_thread.start() + # Start ClickUp polling if configured + if self.config.clickup.enabled: + self._clickup_thread = threading.Thread(target=self._clickup_loop, daemon=True, name="clickup") + self._clickup_thread.start() + log.info("ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes) + else: + log.info("ClickUp integration disabled (no API token)") + log.info("Scheduler started (poll=%ds, heartbeat=%dm)", self.config.scheduler.poll_interval_seconds, self.config.scheduler.heartbeat_interval_minutes) def stop(self): self._stop_event.set() + if self._clickup_client: + self._clickup_client.close() + + def _notify(self, message: str, category: str = "clickup"): + """Push a notification through the bus if available.""" + if self.notification_bus: + self.notification_bus.push(message, category) + else: + log.info("Notification [%s]: %s", category, message) # ── Scheduled Tasks ── @@ -113,3 +134,242 @@ class Scheduler: log.debug("Heartbeat: all clear") else: log.info("Heartbeat action taken: %s", result[:200]) + + # ── ClickUp Integration ── + + def _get_clickup_client(self): + """Lazy-init the ClickUp API client.""" + if self._clickup_client is None: + from .clickup import ClickUpClient + self._clickup_client = ClickUpClient( + api_token=self.config.clickup.api_token, + workspace_id=self.config.clickup.workspace_id, + task_type_field_name=self.config.clickup.task_type_field_name, + ) + return self._clickup_client + + def _clickup_loop(self): + """Poll ClickUp for tasks on a regular interval.""" + interval = self.config.clickup.poll_interval_minutes * 60 + + # Wait before first poll to let other systems initialize + self._stop_event.wait(30) + + # On startup, recover orphaned executing tasks + self._recover_orphaned_tasks() + + while not self._stop_event.is_set(): + try: + self._poll_clickup() + self._execute_approved_tasks() + except Exception as e: + log.error("ClickUp poll error: %s", e) + self._stop_event.wait(interval) + + def _recover_orphaned_tasks(self): + """Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'.""" + pairs = self.db.kv_scan("clickup:task:") + for key, value in pairs: + if not key.endswith(":state"): + continue + try: + state = json.loads(value) + if state.get("state") == "executing": + task_id = state["clickup_task_id"] + log.warning("Recovering orphaned executing task: %s", task_id) + state["state"] = "approved" + state["error"] = None + self.db.kv_set(key, json.dumps(state)) + except (json.JSONDecodeError, KeyError): + pass + + def _poll_clickup(self): + """Discover new tasks from ClickUp and process them.""" + client = self._get_clickup_client() + space_id = self.config.clickup.space_id + if not space_id: + log.warning("ClickUp space_id not configured, skipping poll") + return + + tasks = client.get_tasks_from_space( + space_id, + statuses=self.config.clickup.poll_statuses, + ) + + # Load active task IDs to avoid re-processing + active_raw = self.db.kv_get("clickup:active_task_ids") + active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set() + + for task in tasks: + if task.id in active_ids: + continue # Already tracked + self._process_clickup_task(task, active_ids) + + # Save updated active IDs + self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids))) + + def _process_clickup_task(self, task, active_ids: set[str]): + """Discover a new ClickUp task, map to skill, decide action.""" + from .clickup import ClickUpTask + + now = datetime.now(timezone.utc).isoformat() + skill_map = self.config.clickup.skill_map + + # Build state object + state = { + "state": "discovered", + "clickup_task_id": task.id, + "clickup_task_name": task.name, + "task_type": task.task_type, + "skill_name": None, + "discovered_at": now, + "started_at": None, + "completed_at": None, + "error": None, + "deliverable_paths": [], + "custom_fields": task.custom_fields, + } + + # Try to map task type to a skill + mapping = skill_map.get(task.task_type) + if not mapping: + state["state"] = "unmapped" + self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) + active_ids.add(task.id) + self._notify( + f"New ClickUp task discovered but no skill mapping found.\n" + f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n" + f"Configure a skill_map entry in config.yaml to handle this task type." + ) + log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type) + return + + tool_name = mapping.get("tool", "") + auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute) + state["skill_name"] = tool_name + + if auto_execute: + state["state"] = "approved" + self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) + active_ids.add(task.id) + self._notify( + f"New ClickUp task auto-approved for execution.\n" + f"Task: **{task.name}** → Skill: `{tool_name}`" + ) + log.info("Auto-approved ClickUp task: %s → %s", task.name, tool_name) + else: + state["state"] = "awaiting_approval" + self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) + active_ids.add(task.id) + self._notify( + f"New ClickUp task needs your approval.\n" + f"Task: **{task.name}** → Skill: `{tool_name}`\n" + f"Use `clickup_approve_task(\"{task.id}\")` to approve or " + f"`clickup_decline_task(\"{task.id}\")` to decline." + ) + log.info("ClickUp task awaiting approval: %s → %s", task.name, tool_name) + + def _execute_approved_tasks(self): + """Scan for approved tasks and execute them.""" + pairs = self.db.kv_scan("clickup:task:") + for key, value in pairs: + if not key.endswith(":state"): + continue + try: + state = json.loads(value) + except json.JSONDecodeError: + continue + + if state.get("state") != "approved": + continue + + self._execute_clickup_task(state, key) + + def _execute_clickup_task(self, state: dict, kv_key: str): + """Execute a single approved ClickUp task.""" + task_id = state["clickup_task_id"] + task_name = state["clickup_task_name"] + skill_name = state["skill_name"] + now = datetime.now(timezone.utc).isoformat() + + log.info("Executing ClickUp task: %s → %s", task_name, skill_name) + + # Update state to executing + state["state"] = "executing" + state["started_at"] = now + self.db.kv_set(kv_key, json.dumps(state)) + + # Set ClickUp status to "in progress" + client = self._get_clickup_client() + client.update_task_status(task_id, self.config.clickup.in_progress_status) + + try: + # Build tool arguments from field mapping + args = self._build_tool_args(state) + + # Execute the skill via the tool registry + if hasattr(self.agent, '_tools') and self.agent._tools: + result = self.agent._tools.execute(skill_name, args) + else: + result = self.agent.execute_task( + f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. " + f"Task description: {state.get('custom_fields', {})}" + ) + + # Success + state["state"] = "completed" + state["completed_at"] = datetime.now(timezone.utc).isoformat() + self.db.kv_set(kv_key, json.dumps(state)) + + # Update ClickUp + client.update_task_status(task_id, self.config.clickup.review_status) + comment = ( + f"✅ CheddahBot completed this task.\n\n" + f"Skill: {skill_name}\n" + f"Result:\n{result[:3000]}" + ) + client.add_comment(task_id, comment) + + self._notify( + f"ClickUp task completed: **{task_name}**\n" + f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'" + ) + log.info("ClickUp task completed: %s", task_name) + + except Exception as e: + # Failure + state["state"] = "failed" + state["error"] = str(e) + state["completed_at"] = datetime.now(timezone.utc).isoformat() + self.db.kv_set(kv_key, json.dumps(state)) + + # Comment the error on ClickUp + client.add_comment( + task_id, + f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}" + ) + + self._notify( + f"ClickUp task failed: **{task_name}**\n" + f"Skill: `{skill_name}` | Error: {str(e)[:200]}" + ) + log.error("ClickUp task failed: %s — %s", task_name, e) + + def _build_tool_args(self, state: dict) -> dict: + """Build tool arguments from ClickUp task fields using the field mapping.""" + skill_map = self.config.clickup.skill_map + task_type = state.get("task_type", "") + mapping = skill_map.get(task_type, {}) + field_mapping = mapping.get("field_mapping", {}) + + args = {} + for tool_param, source in field_mapping.items(): + if source == "task_name": + args[tool_param] = state.get("clickup_task_name", "") + elif source == "task_description": + args[tool_param] = state.get("custom_fields", {}).get("description", "") + else: + # Look up custom field by name + args[tool_param] = state.get("custom_fields", {}).get(source, "") + + return args