Add ClickUp chat tools for task management

Four chat-facing tools: clickup_list_tasks (list/filter tracked tasks),
clickup_task_status (detailed state view), clickup_approve_task and
clickup_decline_task (approval flow for tasks awaiting permission).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-15 22:27:34 -06:00
parent ba89f61bc4
commit e02f5a5cb3
1 changed files with 149 additions and 0 deletions

View File

@ -0,0 +1,149 @@
"""ClickUp chat-facing tools for listing, approving, and declining tasks."""
from __future__ import annotations
import json
import logging
from . import tool
log = logging.getLogger(__name__)
def _get_clickup_states(db) -> dict[str, dict]:
"""Load all tracked ClickUp task states from kv_store."""
pairs = db.kv_scan("clickup:task:")
states = {}
for key, value in pairs:
# keys look like clickup:task:{id}:state
parts = key.split(":")
if len(parts) == 4 and parts[3] == "state":
task_id = parts[2]
try:
states[task_id] = json.loads(value)
except json.JSONDecodeError:
pass
return states
@tool(
"clickup_list_tasks",
"List ClickUp tasks that Cheddah is tracking. Optionally filter by internal state "
"(discovered, awaiting_approval, executing, completed, failed, declined, unmapped).",
category="clickup",
)
def clickup_list_tasks(status: str = "", ctx: dict = None) -> str:
"""List tracked ClickUp tasks, optionally filtered by state."""
db = ctx["db"]
states = _get_clickup_states(db)
if not states:
return "No ClickUp tasks are currently being tracked."
if status:
states = {tid: s for tid, s in states.items() if s.get("state") == status}
if not states:
return f"No ClickUp tasks with state '{status}'."
lines = []
for task_id, state in sorted(states.items(), key=lambda x: x[1].get("discovered_at", "")):
name = state.get("clickup_task_name", "Unknown")
task_type = state.get("task_type", "")
task_state = state.get("state", "unknown")
skill = state.get("skill_name", "")
lines.append(
f"• **{name}** (ID: {task_id})\n"
f" Type: {task_type} | State: {task_state} | Skill: {skill}"
)
return f"**Tracked ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines)
@tool(
"clickup_task_status",
"Check the detailed internal processing state of a ClickUp task by its ID.",
category="clickup",
)
def clickup_task_status(task_id: str, ctx: dict = None) -> str:
"""Get detailed state for a specific tracked task."""
db = ctx["db"]
raw = db.kv_get(f"clickup:task:{task_id}:state")
if not raw:
return f"No tracked state found for task ID '{task_id}'."
try:
state = json.loads(raw)
except json.JSONDecodeError:
return f"Corrupted state data for task '{task_id}'."
lines = [f"**Task: {state.get('clickup_task_name', 'Unknown')}** (ID: {task_id})"]
lines.append(f"State: {state.get('state', 'unknown')}")
lines.append(f"Task Type: {state.get('task_type', '')}")
lines.append(f"Mapped Skill: {state.get('skill_name', '')}")
lines.append(f"Discovered: {state.get('discovered_at', '')}")
if state.get("started_at"):
lines.append(f"Started: {state['started_at']}")
if state.get("completed_at"):
lines.append(f"Completed: {state['completed_at']}")
if state.get("error"):
lines.append(f"Error: {state['error']}")
if state.get("deliverable_paths"):
lines.append(f"Deliverables: {', '.join(state['deliverable_paths'])}")
if state.get("custom_fields"):
fields_str = ", ".join(f"{k}: {v}" for k, v in state["custom_fields"].items() if v)
if fields_str:
lines.append(f"Custom Fields: {fields_str}")
return "\n".join(lines)
@tool(
"clickup_approve_task",
"Approve a ClickUp task that is waiting for permission to execute.",
category="clickup",
)
def clickup_approve_task(task_id: str, ctx: dict = None) -> str:
"""Approve a task in awaiting_approval state."""
db = ctx["db"]
key = f"clickup:task:{task_id}:state"
raw = db.kv_get(key)
if not raw:
return f"No tracked state found for task ID '{task_id}'."
try:
state = json.loads(raw)
except json.JSONDecodeError:
return f"Corrupted state data for task '{task_id}'."
if state.get("state") != "awaiting_approval":
return f"Task '{task_id}' is in state '{state.get('state')}', not 'awaiting_approval'. Cannot approve."
state["state"] = "approved"
db.kv_set(key, json.dumps(state))
return f"Task '{state.get('clickup_task_name', task_id)}' approved for execution. It will run on the next scheduler cycle."
@tool(
"clickup_decline_task",
"Decline a ClickUp task that is waiting for permission to execute.",
category="clickup",
)
def clickup_decline_task(task_id: str, ctx: dict = None) -> str:
"""Decline a task in awaiting_approval state."""
db = ctx["db"]
key = f"clickup:task:{task_id}:state"
raw = db.kv_get(key)
if not raw:
return f"No tracked state found for task ID '{task_id}'."
try:
state = json.loads(raw)
except json.JSONDecodeError:
return f"Corrupted state data for task '{task_id}'."
if state.get("state") != "awaiting_approval":
return f"Task '{task_id}' is in state '{state.get('state')}', not 'awaiting_approval'. Cannot decline."
state["state"] = "declined"
db.kv_set(key, json.dumps(state))
return f"Task '{state.get('clickup_task_name', task_id)}' has been declined."