"""ClickUp chat-facing tools for listing, querying, and resetting tasks.""" from __future__ import annotations import logging from datetime import UTC, datetime from . import tool log = logging.getLogger(__name__) def _get_clickup_client(ctx: dict): """Create a ClickUpClient from the agent's config.""" from ..clickup import ClickUpClient cfg = ctx["config"].clickup if not cfg.api_token: return None return ClickUpClient( api_token=cfg.api_token, workspace_id=cfg.workspace_id, task_type_field_name=cfg.task_type_field_name, ) @tool( "clickup_query_tasks", "Query ClickUp live for tasks. Optionally filter by status (e.g. 'to do', 'in progress') " "and/or task type (e.g. 'Press Release'). Returns task name, ID, status, type, due date, " "and custom fields directly from the ClickUp API.", category="clickup", ) def clickup_query_tasks(status: str = "", task_type: str = "", ctx: dict | None = None) -> str: """Query ClickUp API for tasks, optionally filtered by status and task type.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." cfg = ctx["config"].clickup if not cfg.space_id: return "Error: ClickUp space_id not configured." try: statuses = [status] if status else None tasks = client.get_tasks_from_space(cfg.space_id, statuses=statuses) except Exception as e: return f"Error querying ClickUp: {e}" finally: client.close() if task_type: tasks = [t for t in tasks if t.task_type.lower() == task_type.lower()] if not tasks: filters = [] if status: filters.append(f"status='{status}'") if task_type: filters.append(f"type='{task_type}'") filter_str = " with " + ", ".join(filters) if filters else "" return f"No tasks found{filter_str}." lines = [] for t in tasks: parts = [f"**{t.name}** (ID: {t.id})"] parts.append(f" Status: {t.status} | Type: {t.task_type or '—'}") # Show custom fields that have values fields = {k: v for k, v in t.custom_fields.items() if v} if fields: field_strs = [f"{k}: {v}" for k, v in fields.items()] parts.append(f" Fields: {', '.join(field_strs)}") parts.append(f" URL: {t.url}") lines.append("\n".join(parts)) return f"**ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines) @tool( "clickup_list_tasks", "List ClickUp tasks in automation-related statuses (automation underway, " "outline review, internal review, error). Shows tasks currently being processed.", category="clickup", ) def clickup_list_tasks(status: str = "", ctx: dict | None = None) -> str: """List ClickUp tasks in automation-related statuses.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." cfg = ctx["config"].clickup if not cfg.space_id: return "Error: ClickUp space_id not configured." # Query tasks in automation-related statuses automation_statuses = [ cfg.automation_status, "outline review", cfg.review_status, cfg.error_status, ] if status: automation_statuses = [status] try: tasks = client.get_tasks_from_space(cfg.space_id, statuses=automation_statuses) except Exception as e: return f"Error querying ClickUp: {e}" finally: client.close() if not tasks: filter_note = f" with status '{status}'" if status else " in automation statuses" return f"No tasks found{filter_note}." lines = [] for t in tasks: parts = [f"**{t.name}** (ID: {t.id})"] parts.append(f" Status: {t.status} | Type: {t.task_type or '—'}") fields = {k: v for k, v in t.custom_fields.items() if v} if fields: field_strs = [f"{k}: {v}" for k, v in fields.items()] parts.append(f" Fields: {', '.join(field_strs)}") lines.append("\n".join(parts)) return f"**Automation Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines) @tool( "clickup_task_status", "Check the current status and details of a ClickUp task by its ID.", category="clickup", ) def clickup_task_status(task_id: str, ctx: dict | None = None) -> str: """Get current status for a specific ClickUp task from the API.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." try: task = client.get_task(task_id) except Exception as e: return f"Error fetching task '{task_id}': {e}" finally: client.close() lines = [f"**Task: {task.name}** (ID: {task.id})"] lines.append(f"Status: {task.status}") lines.append(f"Type: {task.task_type or '—'}") if task.url: lines.append(f"URL: {task.url}") if task.due_date: lines.append(f"Due: {task.due_date}") if task.date_updated: lines.append(f"Updated: {task.date_updated}") fields = {k: v for k, v in task.custom_fields.items() if v} if fields: field_strs = [f"{k}: {v}" for k, v in fields.items()] lines.append(f"Fields: {', '.join(field_strs)}") return "\n".join(lines) @tool( "clickup_create_task", "Create a new ClickUp task for a client. Requires task name and client name. " "Optionally set work category, description, status, due_date (Unix ms), " "tags (comma-separated), and arbitrary custom fields via custom_fields_json " '(JSON object like {"Keyword":"value","CLIFlags":"--tier1-count 5"}). ' "The task is created in the 'Overall' list within the client's folder.", category="clickup", ) def clickup_create_task( name: str, client: str, work_category: str = "", description: str = "", status: str = "to do", due_date: str = "", tags: str = "", custom_fields_json: str = "", priority: int = 2, assignee: int = 10765627, time_estimate_ms: int = 0, ctx: dict | None = None, ) -> str: """Create a new ClickUp task in the client's Overall list.""" import json as _json client_obj = _get_clickup_client(ctx) if not client_obj: return "Error: ClickUp API token not configured." cfg = ctx["config"].clickup if not cfg.space_id: return "Error: ClickUp space_id not configured." try: # Find the client's Overall list list_id = client_obj.find_list_in_folder(cfg.space_id, client) if not list_id: return ( f"Error: Could not find folder '{client}' " f"with an 'Overall' list in space." ) # Build create kwargs create_kwargs: dict = { "list_id": list_id, "name": name, "description": description, "status": status, "priority": priority, "assignees": [assignee], } if due_date: create_kwargs["due_date"] = int(due_date) if tags: create_kwargs["tags"] = [t.strip() for t in tags.split(",")] if time_estimate_ms: create_kwargs["time_estimate"] = time_estimate_ms # Create the task result = client_obj.create_task(**create_kwargs) task_id = result.get("id", "") task_url = result.get("url", "") # Set Client dropdown field client_obj.set_custom_field_smart(task_id, list_id, "Client", client) # Set Work Category if provided if work_category: client_obj.set_custom_field_smart( task_id, list_id, "Work Category", work_category ) # Set any additional custom fields if custom_fields_json: extra_fields = _json.loads(custom_fields_json) for field_name, field_value in extra_fields.items(): client_obj.set_custom_field_smart( task_id, list_id, field_name, str(field_value) ) return ( f"Task created successfully!\n" f" Name: {name}\n" f" Client: {client}\n" f" ID: {task_id}\n" f" URL: {task_url}" ) except Exception as e: return f"Error creating task: {e}" finally: client_obj.close() @tool( "clickup_add_dependency", "Add a 'blocked by' dependency between two ClickUp tasks. " "The blocked_task_id will be blocked by blocker_task_id " "(i.e. blocker must complete before blocked can start).", category="clickup", ) def clickup_add_dependency( blocked_task_id: str, blocker_task_id: str, ctx: dict | None = None, ) -> str: """Set blocked_task_id as blocked by blocker_task_id.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." try: ok = client.add_dependency(blocked_task_id, depends_on=blocker_task_id) if ok: return ( f"Dependency added: task {blocked_task_id} " f"is now blocked by {blocker_task_id}." ) return ( f"Failed to add dependency. Check that both task IDs are valid." ) except Exception as e: return f"Error adding dependency: {e}" finally: client.close() @tool( "clickup_reset_task", "Reset a ClickUp task to 'to do' status so it can be retried on the next poll. " "Use this when a task is stuck in an error or automation state.", category="clickup", ) def clickup_reset_task(task_id: str, ctx: dict | None = None) -> str: """Reset a ClickUp task status to 'to do' for retry.""" client = _get_clickup_client(ctx) if not client: return "Error: ClickUp API token not configured." cfg = ctx["config"].clickup reset_status = cfg.poll_statuses[0] if cfg.poll_statuses else "to do" try: client.update_task_status(task_id, reset_status) client.add_comment( task_id, f"Task reset to '{reset_status}' via chat command.", ) except Exception as e: return f"Error resetting task '{task_id}': {e}" finally: client.close() return ( f"Task '{task_id}' reset to '{reset_status}'. " f"It will be picked up on the next scheduler poll." ) def _format_duration(delta) -> str: """Format a timedelta as a human-readable duration string.""" total_seconds = int(delta.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) if hours: return f"{hours}h {minutes}m {seconds}s" if minutes: return f"{minutes}m {seconds}s" return f"{seconds}s" def _format_ago(iso_str: str | None) -> str: """Format an ISO timestamp as 'Xm ago' relative to now.""" if not iso_str: return "never" try: ts = datetime.fromisoformat(iso_str) delta = datetime.now(UTC) - ts total_seconds = int(delta.total_seconds()) if total_seconds < 60: return f"{total_seconds}s ago" minutes = total_seconds // 60 if minutes < 60: return f"{minutes}m ago" hours = minutes // 60 return f"{hours}h {minutes % 60}m ago" except (ValueError, TypeError): return "unknown" @tool( "get_active_tasks", "Show what CheddahBot is actively executing right now. " "Reports running tasks, loop health, and whether it's safe to restart.", category="clickup", ) def get_active_tasks(ctx: dict | None = None) -> str: """Show actively running scheduler tasks and loop health.""" scheduler = ctx.get("scheduler") if ctx else None if not scheduler: return "Scheduler not available — cannot check active executions." now = datetime.now(UTC) lines = [] # Active executions active = scheduler.get_active_executions() if active: lines.append(f"**Active Executions ({len(active)}):**") for task_id, info in active.items(): duration = _format_duration(now - info["started_at"]) lines.append( f"- **{info['name']}** — `{info['tool']}` — " f"running {duration} ({info['thread']} thread)" ) else: lines.append("**No tasks actively executing.**") # Loop health timestamps = scheduler.get_loop_timestamps() lines.append("") lines.append("**Loop Health:**") for loop_name, ts in timestamps.items(): lines.append(f"- {loop_name}: last ran {_format_ago(ts)}") # Safe to restart? lines.append("") if active: lines.append(f"**Safe to restart: No** ({len(active)} task(s) actively running)") else: lines.append("**Safe to restart: Yes**") return "\n".join(lines)