"""ClickUp chat-facing tools for listing, approving, and declining tasks.""" from __future__ import annotations import json import logging from . import tool log = logging.getLogger(__name__) def _get_clickup_client(ctx: dict): """Create a ClickUpClient from the agent's config.""" from ..clickup import ClickUpClient cfg = ctx["config"].clickup if not cfg.api_token: return None return ClickUpClient( api_token=cfg.api_token, workspace_id=cfg.workspace_id, task_type_field_name=cfg.task_type_field_name, ) def _get_clickup_states(db) -> dict[str, dict]: """Load all tracked ClickUp task states from kv_store.""" pairs = db.kv_scan("clickup:task:") states = {} for key, value in pairs: # keys look like clickup:task:{id}:state parts = key.split(":") if len(parts) == 4 and parts[3] == "state": task_id = parts[2] try: # noqa: SIM105 states[task_id] = json.loads(value) except json.JSONDecodeError: pass return states @tool( "clickup_query_tasks", "Query ClickUp live for tasks. Optionally filter by status (e.g. 'to do', 'in progress') " "and/or task type (e.g. 'Press Release'). Returns task name, ID, status, type, due date, " "and custom fields directly from the ClickUp API.", category="clickup", ) def clickup_query_tasks(status: str = "", task_type: str = "", ctx: dict | None = None) -> str: """Query ClickUp API for tasks, optionally filtered by status and task type.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." cfg = ctx["config"].clickup if not cfg.space_id: return "Error: ClickUp space_id not configured." try: statuses = [status] if status else None tasks = client.get_tasks_from_space(cfg.space_id, statuses=statuses) except Exception as e: return f"Error querying ClickUp: {e}" finally: client.close() if task_type: tasks = [t for t in tasks if t.task_type.lower() == task_type.lower()] if not tasks: filters = [] if status: filters.append(f"status='{status}'") if task_type: filters.append(f"type='{task_type}'") filter_str = " with " + ", ".join(filters) if filters else "" return f"No tasks found{filter_str}." lines = [] for t in tasks: parts = [f"**{t.name}** (ID: {t.id})"] parts.append(f" Status: {t.status} | Type: {t.task_type or '—'}") # Show custom fields that have values fields = {k: v for k, v in t.custom_fields.items() if v} if fields: field_strs = [f"{k}: {v}" for k, v in fields.items()] parts.append(f" Fields: {', '.join(field_strs)}") parts.append(f" URL: {t.url}") lines.append("\n".join(parts)) return f"**ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines) @tool( "clickup_list_tasks", "List ClickUp tasks that Cheddah is tracking. Optionally filter by internal state " "(discovered, awaiting_approval, executing, completed, failed, declined, unmapped).", category="clickup", ) def clickup_list_tasks(status: str = "", ctx: dict | None = None) -> str: """List tracked ClickUp tasks, optionally filtered by state.""" db = ctx["db"] states = _get_clickup_states(db) if not states: return "No ClickUp tasks are currently being tracked." if status: states = {tid: s for tid, s in states.items() if s.get("state") == status} if not states: return f"No ClickUp tasks with state '{status}'." lines = [] for task_id, state in sorted(states.items(), key=lambda x: x[1].get("discovered_at", "")): name = state.get("clickup_task_name", "Unknown") task_type = state.get("task_type", "—") task_state = state.get("state", "unknown") skill = state.get("skill_name", "—") lines.append( f"• **{name}** (ID: {task_id})\n" f" Type: {task_type} | State: {task_state} | Skill: {skill}" ) return f"**Tracked ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines) @tool( "clickup_task_status", "Check the detailed internal processing state of a ClickUp task by its ID.", category="clickup", ) def clickup_task_status(task_id: str, ctx: dict | None = None) -> str: """Get detailed state for a specific tracked task.""" db = ctx["db"] raw = db.kv_get(f"clickup:task:{task_id}:state") if not raw: return f"No tracked state found for task ID '{task_id}'." try: state = json.loads(raw) except json.JSONDecodeError: return f"Corrupted state data for task '{task_id}'." lines = [f"**Task: {state.get('clickup_task_name', 'Unknown')}** (ID: {task_id})"] lines.append(f"State: {state.get('state', 'unknown')}") lines.append(f"Task Type: {state.get('task_type', '—')}") lines.append(f"Mapped Skill: {state.get('skill_name', '—')}") lines.append(f"Discovered: {state.get('discovered_at', '—')}") if state.get("started_at"): lines.append(f"Started: {state['started_at']}") if state.get("completed_at"): lines.append(f"Completed: {state['completed_at']}") if state.get("error"): lines.append(f"Error: {state['error']}") if state.get("deliverable_paths"): lines.append(f"Deliverables: {', '.join(state['deliverable_paths'])}") if state.get("custom_fields"): fields_str = ", ".join(f"{k}: {v}" for k, v in state["custom_fields"].items() if v) if fields_str: lines.append(f"Custom Fields: {fields_str}") return "\n".join(lines) @tool( "clickup_approve_task", "Approve a ClickUp task that is waiting for permission to execute.", category="clickup", ) def clickup_approve_task(task_id: str, ctx: dict | None = None) -> str: """Approve a task in awaiting_approval state.""" db = ctx["db"] key = f"clickup:task:{task_id}:state" raw = db.kv_get(key) if not raw: return f"No tracked state found for task ID '{task_id}'." try: state = json.loads(raw) except json.JSONDecodeError: return f"Corrupted state data for task '{task_id}'." if state.get("state") != "awaiting_approval": current = state.get("state") return f"Task '{task_id}' is in state '{current}', not 'awaiting_approval'. Cannot approve." state["state"] = "approved" db.kv_set(key, json.dumps(state)) name = state.get("clickup_task_name", task_id) return f"Task '{name}' approved for execution. It will run on the next scheduler cycle." @tool( "clickup_decline_task", "Decline a ClickUp task that is waiting for permission to execute.", category="clickup", ) def clickup_decline_task(task_id: str, ctx: dict | None = None) -> str: """Decline a task in awaiting_approval state.""" db = ctx["db"] key = f"clickup:task:{task_id}:state" raw = db.kv_get(key) if not raw: return f"No tracked state found for task ID '{task_id}'." try: state = json.loads(raw) except json.JSONDecodeError: return f"Corrupted state data for task '{task_id}'." if state.get("state") != "awaiting_approval": current = state.get("state") return f"Task '{task_id}' is in state '{current}', not 'awaiting_approval'. Cannot decline." state["state"] = "declined" db.kv_set(key, json.dumps(state)) return f"Task '{state.get('clickup_task_name', task_id)}' has been declined."