diff --git a/cheddahbot/tools/clickup_tool.py b/cheddahbot/tools/clickup_tool.py new file mode 100644 index 0000000..9fb1ee4 --- /dev/null +++ b/cheddahbot/tools/clickup_tool.py @@ -0,0 +1,149 @@ +"""ClickUp chat-facing tools for listing, approving, and declining tasks.""" + +from __future__ import annotations + +import json +import logging + +from . import tool + +log = logging.getLogger(__name__) + + +def _get_clickup_states(db) -> dict[str, dict]: + """Load all tracked ClickUp task states from kv_store.""" + pairs = db.kv_scan("clickup:task:") + states = {} + for key, value in pairs: + # keys look like clickup:task:{id}:state + parts = key.split(":") + if len(parts) == 4 and parts[3] == "state": + task_id = parts[2] + try: + states[task_id] = json.loads(value) + except json.JSONDecodeError: + pass + return states + + +@tool( + "clickup_list_tasks", + "List ClickUp tasks that Cheddah is tracking. Optionally filter by internal state " + "(discovered, awaiting_approval, executing, completed, failed, declined, unmapped).", + category="clickup", +) +def clickup_list_tasks(status: str = "", ctx: dict = None) -> str: + """List tracked ClickUp tasks, optionally filtered by state.""" + db = ctx["db"] + states = _get_clickup_states(db) + + if not states: + return "No ClickUp tasks are currently being tracked." + + if status: + states = {tid: s for tid, s in states.items() if s.get("state") == status} + if not states: + return f"No ClickUp tasks with state '{status}'." + + lines = [] + for task_id, state in sorted(states.items(), key=lambda x: x[1].get("discovered_at", "")): + name = state.get("clickup_task_name", "Unknown") + task_type = state.get("task_type", "—") + task_state = state.get("state", "unknown") + skill = state.get("skill_name", "—") + lines.append( + f"• **{name}** (ID: {task_id})\n" + f" Type: {task_type} | State: {task_state} | Skill: {skill}" + ) + + return f"**Tracked ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines) + + +@tool( + "clickup_task_status", + "Check the detailed internal processing state of a ClickUp task by its ID.", + category="clickup", +) +def clickup_task_status(task_id: str, ctx: dict = None) -> str: + """Get detailed state for a specific tracked task.""" + db = ctx["db"] + raw = db.kv_get(f"clickup:task:{task_id}:state") + if not raw: + return f"No tracked state found for task ID '{task_id}'." + + try: + state = json.loads(raw) + except json.JSONDecodeError: + return f"Corrupted state data for task '{task_id}'." + + lines = [f"**Task: {state.get('clickup_task_name', 'Unknown')}** (ID: {task_id})"] + lines.append(f"State: {state.get('state', 'unknown')}") + lines.append(f"Task Type: {state.get('task_type', '—')}") + lines.append(f"Mapped Skill: {state.get('skill_name', '—')}") + lines.append(f"Discovered: {state.get('discovered_at', '—')}") + if state.get("started_at"): + lines.append(f"Started: {state['started_at']}") + if state.get("completed_at"): + lines.append(f"Completed: {state['completed_at']}") + if state.get("error"): + lines.append(f"Error: {state['error']}") + if state.get("deliverable_paths"): + lines.append(f"Deliverables: {', '.join(state['deliverable_paths'])}") + if state.get("custom_fields"): + fields_str = ", ".join(f"{k}: {v}" for k, v in state["custom_fields"].items() if v) + if fields_str: + lines.append(f"Custom Fields: {fields_str}") + + return "\n".join(lines) + + +@tool( + "clickup_approve_task", + "Approve a ClickUp task that is waiting for permission to execute.", + category="clickup", +) +def clickup_approve_task(task_id: str, ctx: dict = None) -> str: + """Approve a task in awaiting_approval state.""" + db = ctx["db"] + key = f"clickup:task:{task_id}:state" + raw = db.kv_get(key) + if not raw: + return f"No tracked state found for task ID '{task_id}'." + + try: + state = json.loads(raw) + except json.JSONDecodeError: + return f"Corrupted state data for task '{task_id}'." + + if state.get("state") != "awaiting_approval": + return f"Task '{task_id}' is in state '{state.get('state')}', not 'awaiting_approval'. Cannot approve." + + state["state"] = "approved" + db.kv_set(key, json.dumps(state)) + return f"Task '{state.get('clickup_task_name', task_id)}' approved for execution. It will run on the next scheduler cycle." + + +@tool( + "clickup_decline_task", + "Decline a ClickUp task that is waiting for permission to execute.", + category="clickup", +) +def clickup_decline_task(task_id: str, ctx: dict = None) -> str: + """Decline a task in awaiting_approval state.""" + db = ctx["db"] + key = f"clickup:task:{task_id}:state" + raw = db.kv_get(key) + if not raw: + return f"No tracked state found for task ID '{task_id}'." + + try: + state = json.loads(raw) + except json.JSONDecodeError: + return f"Corrupted state data for task '{task_id}'." + + if state.get("state") != "awaiting_approval": + return f"Task '{task_id}' is in state '{state.get('state')}', not 'awaiting_approval'. Cannot decline." + + state["state"] = "declined" + db.kv_set(key, json.dumps(state)) + return f"Task '{state.get('clickup_task_name', task_id)}' has been declined."