Add ClickUp polling loop to scheduler

Third daemon thread polls ClickUp every 20 minutes, discovers new tasks,
maps Task Type to skills via config, and auto-executes or queues for
approval. On completion updates ClickUp status + comments with results.
Recovers orphaned executing tasks on startup. Pushes all events through
the NotificationBus for UI-agnostic delivery.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-15 22:28:35 -06:00
parent e02f5a5cb3
commit a67e714045
1 changed files with 265 additions and 5 deletions

View File

@ -1,12 +1,11 @@
"""Task scheduler with heartbeat support."""
"""Task scheduler with heartbeat and ClickUp polling support."""
from __future__ import annotations
import json
import logging
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from croniter import croniter
@ -15,6 +14,7 @@ if TYPE_CHECKING:
from .agent import Agent
from .config import Config
from .db import Database
from .notifications import NotificationBus
log = logging.getLogger(__name__)
@ -22,28 +22,49 @@ HEARTBEAT_OK = "HEARTBEAT_OK"
class Scheduler:
def __init__(self, config: Config, db: Database, agent: Agent):
def __init__(self, config: Config, db: Database, agent: Agent,
notification_bus: NotificationBus | None = None):
self.config = config
self.db = db
self.agent = agent
self.notification_bus = notification_bus
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._heartbeat_thread: threading.Thread | None = None
self._clickup_thread: threading.Thread | None = None
self._clickup_client = None
def start(self):
"""Start the scheduler and heartbeat threads."""
"""Start the scheduler, heartbeat, and ClickUp threads."""
self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler")
self._thread.start()
self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True, name="heartbeat")
self._heartbeat_thread.start()
# Start ClickUp polling if configured
if self.config.clickup.enabled:
self._clickup_thread = threading.Thread(target=self._clickup_loop, daemon=True, name="clickup")
self._clickup_thread.start()
log.info("ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes)
else:
log.info("ClickUp integration disabled (no API token)")
log.info("Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds,
self.config.scheduler.heartbeat_interval_minutes)
def stop(self):
self._stop_event.set()
if self._clickup_client:
self._clickup_client.close()
def _notify(self, message: str, category: str = "clickup"):
"""Push a notification through the bus if available."""
if self.notification_bus:
self.notification_bus.push(message, category)
else:
log.info("Notification [%s]: %s", category, message)
# ── Scheduled Tasks ──
@ -113,3 +134,242 @@ class Scheduler:
log.debug("Heartbeat: all clear")
else:
log.info("Heartbeat action taken: %s", result[:200])
# ── ClickUp Integration ──
def _get_clickup_client(self):
"""Lazy-init the ClickUp API client."""
if self._clickup_client is None:
from .clickup import ClickUpClient
self._clickup_client = ClickUpClient(
api_token=self.config.clickup.api_token,
workspace_id=self.config.clickup.workspace_id,
task_type_field_name=self.config.clickup.task_type_field_name,
)
return self._clickup_client
def _clickup_loop(self):
"""Poll ClickUp for tasks on a regular interval."""
interval = self.config.clickup.poll_interval_minutes * 60
# Wait before first poll to let other systems initialize
self._stop_event.wait(30)
# On startup, recover orphaned executing tasks
self._recover_orphaned_tasks()
while not self._stop_event.is_set():
try:
self._poll_clickup()
self._execute_approved_tasks()
except Exception as e:
log.error("ClickUp poll error: %s", e)
self._stop_event.wait(interval)
def _recover_orphaned_tasks(self):
"""Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'."""
pairs = self.db.kv_scan("clickup:task:")
for key, value in pairs:
if not key.endswith(":state"):
continue
try:
state = json.loads(value)
if state.get("state") == "executing":
task_id = state["clickup_task_id"]
log.warning("Recovering orphaned executing task: %s", task_id)
state["state"] = "approved"
state["error"] = None
self.db.kv_set(key, json.dumps(state))
except (json.JSONDecodeError, KeyError):
pass
def _poll_clickup(self):
"""Discover new tasks from ClickUp and process them."""
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
log.warning("ClickUp space_id not configured, skipping poll")
return
tasks = client.get_tasks_from_space(
space_id,
statuses=self.config.clickup.poll_statuses,
)
# Load active task IDs to avoid re-processing
active_raw = self.db.kv_get("clickup:active_task_ids")
active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set()
for task in tasks:
if task.id in active_ids:
continue # Already tracked
self._process_clickup_task(task, active_ids)
# Save updated active IDs
self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids)))
def _process_clickup_task(self, task, active_ids: set[str]):
"""Discover a new ClickUp task, map to skill, decide action."""
from .clickup import ClickUpTask
now = datetime.now(timezone.utc).isoformat()
skill_map = self.config.clickup.skill_map
# Build state object
state = {
"state": "discovered",
"clickup_task_id": task.id,
"clickup_task_name": task.name,
"task_type": task.task_type,
"skill_name": None,
"discovered_at": now,
"started_at": None,
"completed_at": None,
"error": None,
"deliverable_paths": [],
"custom_fields": task.custom_fields,
}
# Try to map task type to a skill
mapping = skill_map.get(task.task_type)
if not mapping:
state["state"] = "unmapped"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task discovered but no skill mapping found.\n"
f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n"
f"Configure a skill_map entry in config.yaml to handle this task type."
)
log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type)
return
tool_name = mapping.get("tool", "")
auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute)
state["skill_name"] = tool_name
if auto_execute:
state["state"] = "approved"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task auto-approved for execution.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`"
)
log.info("Auto-approved ClickUp task: %s%s", task.name, tool_name)
else:
state["state"] = "awaiting_approval"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task needs your approval.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`\n"
f"Use `clickup_approve_task(\"{task.id}\")` to approve or "
f"`clickup_decline_task(\"{task.id}\")` to decline."
)
log.info("ClickUp task awaiting approval: %s%s", task.name, tool_name)
def _execute_approved_tasks(self):
"""Scan for approved tasks and execute them."""
pairs = self.db.kv_scan("clickup:task:")
for key, value in pairs:
if not key.endswith(":state"):
continue
try:
state = json.loads(value)
except json.JSONDecodeError:
continue
if state.get("state") != "approved":
continue
self._execute_clickup_task(state, key)
def _execute_clickup_task(self, state: dict, kv_key: str):
"""Execute a single approved ClickUp task."""
task_id = state["clickup_task_id"]
task_name = state["clickup_task_name"]
skill_name = state["skill_name"]
now = datetime.now(timezone.utc).isoformat()
log.info("Executing ClickUp task: %s%s", task_name, skill_name)
# Update state to executing
state["state"] = "executing"
state["started_at"] = now
self.db.kv_set(kv_key, json.dumps(state))
# Set ClickUp status to "in progress"
client = self._get_clickup_client()
client.update_task_status(task_id, self.config.clickup.in_progress_status)
try:
# Build tool arguments from field mapping
args = self._build_tool_args(state)
# Execute the skill via the tool registry
if hasattr(self.agent, '_tools') and self.agent._tools:
result = self.agent._tools.execute(skill_name, args)
else:
result = self.agent.execute_task(
f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. "
f"Task description: {state.get('custom_fields', {})}"
)
# Success
state["state"] = "completed"
state["completed_at"] = datetime.now(timezone.utc).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
# Update ClickUp
client.update_task_status(task_id, self.config.clickup.review_status)
comment = (
f"✅ CheddahBot completed this task.\n\n"
f"Skill: {skill_name}\n"
f"Result:\n{result[:3000]}"
)
client.add_comment(task_id, comment)
self._notify(
f"ClickUp task completed: **{task_name}**\n"
f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'"
)
log.info("ClickUp task completed: %s", task_name)
except Exception as e:
# Failure
state["state"] = "failed"
state["error"] = str(e)
state["completed_at"] = datetime.now(timezone.utc).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
# Comment the error on ClickUp
client.add_comment(
task_id,
f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}"
)
self._notify(
f"ClickUp task failed: **{task_name}**\n"
f"Skill: `{skill_name}` | Error: {str(e)[:200]}"
)
log.error("ClickUp task failed: %s%s", task_name, e)
def _build_tool_args(self, state: dict) -> dict:
"""Build tool arguments from ClickUp task fields using the field mapping."""
skill_map = self.config.clickup.skill_map
task_type = state.get("task_type", "")
mapping = skill_map.get(task_type, {})
field_mapping = mapping.get("field_mapping", {})
args = {}
for tool_param, source in field_mapping.items():
if source == "task_name":
args[tool_param] = state.get("clickup_task_name", "")
elif source == "task_description":
args[tool_param] = state.get("custom_fields", {}).get("description", "")
else:
# Look up custom field by name
args[tool_param] = state.get("custom_fields", {}).get(source, "")
return args