# CheddahBot/cheddahbot/tools/clickup_tool.py
#
# 193 lines
# 6.2 KiB
# Python

"""ClickUp chat-facing tools for listing, querying, and resetting tasks."""
from __future__ import annotations
import logging
from . import tool
log = logging.getLogger(__name__)
def _get_clickup_client(ctx: dict | None):
    """Build a ClickUpClient from the agent's ClickUp configuration.

    Args:
        ctx: Tool context. ``ctx["config"].clickup`` supplies ``api_token``,
            ``workspace_id`` and ``task_type_field_name``. May be ``None``,
            because the tools in this module declare ``ctx`` as optional.

    Returns:
        A new ``ClickUpClient``, or ``None`` when no context was supplied or
        no API token is configured. Callers in this module close the client
        themselves once they are done with it.
    """
    # Tolerate a missing context instead of raising TypeError on the
    # subscript below; callers already treat ``None`` as "not configured".
    if ctx is None:
        return None

    cfg = ctx["config"].clickup
    if not cfg.api_token:
        return None

    # Function-scope import, performed only once a client will really be built.
    from ..clickup import ClickUpClient

    return ClickUpClient(
        api_token=cfg.api_token,
        workspace_id=cfg.workspace_id,
        task_type_field_name=cfg.task_type_field_name,
    )
@tool(
    "clickup_query_tasks",
    "Query ClickUp live for tasks. Optionally filter by status (e.g. 'to do', 'in progress') "
    "and/or task type (e.g. 'Press Release'). Returns task name, ID, status, type, due date, "
    "and custom fields directly from the ClickUp API.",
    category="clickup",
)
def clickup_query_tasks(status: str = "", task_type: str = "", ctx: dict | None = None) -> str:
    """Query the configured ClickUp space and format the matching tasks for chat.

    Args:
        status: When non-empty, only tasks in this status are requested from
            the API.
        task_type: When non-empty, keep only tasks whose type equals this
            value, compared case-insensitively. Applied locally after the
            API call.
        ctx: Tool context; ``ctx["config"].clickup`` supplies ``space_id``.

    Returns:
        A formatted task listing, a ``No tasks found...`` message, or a
        message starting with ``Error`` when the token or space is not
        configured or the API call raises.
    """
    client = _get_clickup_client(ctx)
    if not client:
        return "Error: ClickUp API token not configured."

    # Everything that runs while the client is open sits inside try/finally so
    # the client is closed on every exit path, including the early
    # "space_id not configured" return.
    try:
        cfg = ctx["config"].clickup
        if not cfg.space_id:
            return "Error: ClickUp space_id not configured."

        statuses = [status] if status else None
        try:
            tasks = client.get_tasks_from_space(cfg.space_id, statuses=statuses)
        except Exception as e:
            return f"Error querying ClickUp: {e}"
    finally:
        client.close()

    if task_type:
        wanted_type = task_type.lower()
        # A task's type may be empty/None (the output below already allows for
        # that), so normalise before calling .lower().
        tasks = [t for t in tasks if (t.task_type or "").lower() == wanted_type]

    if not tasks:
        filters = []
        if status:
            filters.append(f"status='{status}'")
        if task_type:
            filters.append(f"type='{task_type}'")
        filter_str = " with " + ", ".join(filters) if filters else ""
        return f"No tasks found{filter_str}."

    lines = []
    for t in tasks:
        parts = [f"**{t.name}** (ID: {t.id})"]
        parts.append(f"  Status: {t.status} | Type: {t.task_type or ''}")
        # The tool description promises a due date, so show it when set.
        # NOTE(review): assumes listed tasks expose ``due_date`` like the
        # object returned by ``get_task``; getattr keeps this safe if not.
        due_date = getattr(t, "due_date", None)
        if due_date:
            parts.append(f"  Due: {due_date}")
        # Show custom fields that have values
        fields = {k: v for k, v in t.custom_fields.items() if v}
        if fields:
            field_strs = [f"{k}: {v}" for k, v in fields.items()]
            parts.append(f"  Fields: {', '.join(field_strs)}")
        parts.append(f"  URL: {t.url}")
        lines.append("\n".join(parts))

    return f"**ClickUp Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines)
@tool(
    "clickup_list_tasks",
    "List ClickUp tasks in automation-related statuses (automation underway, "
    "outline review, internal review, error). Shows tasks currently being processed.",
    category="clickup",
)
def clickup_list_tasks(status: str = "", ctx: dict | None = None) -> str:
    """List tasks from the configured space that are in automation-related statuses.

    Args:
        status: When non-empty, query only this status instead of the default
            set (the configured automation, review and error statuses plus
            ``"outline review"``).
        ctx: Tool context; ``ctx["config"].clickup`` supplies ``space_id`` and
            the status names.

    Returns:
        A formatted task listing, a ``No tasks found...`` message, or a
        message starting with ``Error`` when the token or space is not
        configured or the API call raises.
    """
    client = _get_clickup_client(ctx)
    if not client:
        return "Error: ClickUp API token not configured."

    # Keep all work done with an open client inside try/finally so the client
    # is closed on every exit path, including the early space_id return.
    try:
        cfg = ctx["config"].clickup
        if not cfg.space_id:
            return "Error: ClickUp space_id not configured."

        if status:
            # An explicit status replaces the default automation set.
            automation_statuses = [status]
        else:
            automation_statuses = [
                cfg.automation_status,
                "outline review",
                cfg.review_status,
                cfg.error_status,
            ]

        try:
            tasks = client.get_tasks_from_space(cfg.space_id, statuses=automation_statuses)
        except Exception as e:
            return f"Error querying ClickUp: {e}"
    finally:
        client.close()

    if not tasks:
        filter_note = f" with status '{status}'" if status else " in automation statuses"
        return f"No tasks found{filter_note}."

    lines = []
    for t in tasks:
        parts = [f"**{t.name}** (ID: {t.id})"]
        parts.append(f"  Status: {t.status} | Type: {t.task_type or ''}")
        # Only custom fields with truthy values are shown.
        fields = {k: v for k, v in t.custom_fields.items() if v}
        if fields:
            field_strs = [f"{k}: {v}" for k, v in fields.items()]
            parts.append(f"  Fields: {', '.join(field_strs)}")
        lines.append("\n".join(parts))

    return f"**Automation Tasks ({len(lines)}):**\n\n" + "\n\n".join(lines)
@tool(
    "clickup_task_status",
    "Check the current status and details of a ClickUp task by its ID.",
    category="clickup",
)
def clickup_task_status(task_id: str, ctx: dict | None = None) -> str:
    """Fetch one ClickUp task by ID and render its details as text.

    Args:
        task_id: ID of the task to look up.
        ctx: Tool context used to build the ClickUp client.

    Returns:
        One line per detail (name/ID, status, type, then URL, due date,
        update time and custom fields when they have values), or a message
        starting with ``Error`` when the token is missing or the lookup raises.
    """
    client = _get_clickup_client(ctx)
    if not client:
        return "Error: ClickUp API token not configured."

    try:
        task = client.get_task(task_id)
    except Exception as exc:
        return f"Error fetching task '{task_id}': {exc}"
    finally:
        # Runs on both the success and the error path.
        client.close()

    # Rows that are always present.
    report = [
        f"**Task: {task.name}** (ID: {task.id})",
        f"Status: {task.status}",
        f"Type: {task.task_type or ''}",
    ]

    # Rows that are emitted only when the underlying value is truthy.
    optional_rows = (
        ("URL", task.url),
        ("Due", task.due_date),
        ("Updated", task.date_updated),
    )
    report.extend(f"{label}: {value}" for label, value in optional_rows if value)

    populated = [f"{key}: {val}" for key, val in task.custom_fields.items() if val]
    if populated:
        report.append(f"Fields: {', '.join(populated)}")

    return "\n".join(report)
@tool(
    "clickup_reset_task",
    "Reset a ClickUp task to 'to do' status so it can be retried on the next poll. "
    "Use this when a task is stuck in an error or automation state.",
    category="clickup",
)
def clickup_reset_task(task_id: str, ctx: dict | None = None) -> str:
    """Reset a ClickUp task to the first polled status so it can be retried.

    The target status is the first entry of ``cfg.poll_statuses``, falling
    back to ``"to do"`` when that list is empty. After the status change a
    comment recording the reset is added to the task.

    Args:
        task_id: ID of the task to reset.
        ctx: Tool context; ``ctx["config"].clickup`` supplies ``poll_statuses``.

    Returns:
        A confirmation message; a confirmation that also reports the comment
        failure when the status changed but the comment could not be added;
        or a message starting with ``Error`` when the token is missing or the
        status update itself raises.
    """
    client = _get_clickup_client(ctx)
    if not client:
        return "Error: ClickUp API token not configured."

    comment_error = None
    try:
        cfg = ctx["config"].clickup
        reset_status = cfg.poll_statuses[0] if cfg.poll_statuses else "to do"

        try:
            client.update_task_status(task_id, reset_status)
        except Exception as e:
            return f"Error resetting task '{task_id}': {e}"

        # The comment is handled separately: once the status update has gone
        # through, a failing comment must not be reported as a failed reset.
        try:
            client.add_comment(
                task_id,
                f"Task reset to '{reset_status}' via chat command.",
            )
        except Exception as e:
            comment_error = e
    finally:
        client.close()

    if comment_error is not None:
        return (
            f"Task '{task_id}' reset to '{reset_status}', but adding the reset comment "
            f"failed: {comment_error}. It will be picked up on the next scheduler poll."
        )

    return (
        f"Task '{task_id}' reset to '{reset_status}'. "
        f"It will be picked up on the next scheduler poll."
    )