diff --git a/cheddahbot/clickup.py b/cheddahbot/clickup.py new file mode 100644 index 0000000..70cafe6 --- /dev/null +++ b/cheddahbot/clickup.py @@ -0,0 +1,181 @@ +"""ClickUp REST API client.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +import httpx + +log = logging.getLogger(__name__) + +BASE_URL = "https://api.clickup.com/api/v2" + + +@dataclass +class ClickUpTask: + """Lightweight representation of a ClickUp task.""" + + id: str + name: str + status: str + description: str = "" + task_type: str = "" + url: str = "" + custom_fields: dict[str, Any] = field(default_factory=dict) + list_id: str = "" + list_name: str = "" + + @classmethod + def from_api(cls, data: dict, task_type_field_name: str = "Task Type") -> ClickUpTask: + """Parse a task from the ClickUp API response.""" + custom_fields = {} + task_type = "" + for cf in data.get("custom_fields", []): + cf_name = cf.get("name", "") + cf_value = cf.get("value") + + # Resolve dropdown type_config to label + if cf.get("type") == "drop_down" and cf_value is not None: + options = cf.get("type_config", {}).get("options", []) + order_index = cf_value if isinstance(cf_value, int) else None + for opt in options: + if order_index is not None and opt.get("orderindex") == order_index: + cf_value = opt.get("name", cf_value) + break + elif opt.get("id") == cf_value: + cf_value = opt.get("name", cf_value) + break + + custom_fields[cf_name] = cf_value + if cf_name == task_type_field_name: + task_type = str(cf_value) if cf_value else "" + + status_name = data.get("status", {}).get("status", "unknown") + + return cls( + id=data["id"], + name=data.get("name", ""), + status=status_name.lower(), + description=data.get("description", "") or "", + task_type=task_type, + url=data.get("url", ""), + custom_fields=custom_fields, + list_id=data.get("list", {}).get("id", ""), + list_name=data.get("list", {}).get("name", ""), + ) + + +class ClickUpClient: + """Thin wrapper around the ClickUp REST API v2.""" + + def __init__(self, api_token: str, workspace_id: str = "", task_type_field_name: str = "Task Type"): + self._token = api_token + self.workspace_id = workspace_id + self._task_type_field_name = task_type_field_name + self._client = httpx.Client( + base_url=BASE_URL, + headers={"Authorization": api_token, "Content-Type": "application/json"}, + timeout=30.0, + ) + + def close(self): + self._client.close() + + # ── Read ── + + def get_tasks(self, list_id: str, statuses: list[str] | None = None) -> list[ClickUpTask]: + """Fetch tasks from a specific list, optionally filtered by status.""" + params: dict[str, Any] = {"include_closed": "false", "subtasks": "true"} + if statuses: + for s in statuses: + params.setdefault("statuses[]", []) + if isinstance(params["statuses[]"], list): + params["statuses[]"].append(s) + + # httpx needs repeated params as a list of tuples + param_list = [] + for k, v in params.items(): + if isinstance(v, list): + for item in v: + param_list.append((k, item)) + else: + param_list.append((k, v)) + + resp = self._client.get(f"/list/{list_id}/task", params=param_list) + resp.raise_for_status() + tasks_data = resp.json().get("tasks", []) + return [ClickUpTask.from_api(t, self._task_type_field_name) for t in tasks_data] + + def get_tasks_from_space(self, space_id: str, statuses: list[str] | None = None) -> list[ClickUpTask]: + """Traverse all folders and lists in a space to collect tasks.""" + all_tasks: list[ClickUpTask] = [] + list_ids = set() + + # Get foldered lists + try: + resp = self._client.get(f"/space/{space_id}/folder") + resp.raise_for_status() + for folder in resp.json().get("folders", []): + for lst in folder.get("lists", []): + list_ids.add(lst["id"]) + except httpx.HTTPStatusError as e: + log.warning("Failed to fetch folders for space %s: %s", space_id, e) + + # Get folderless lists + try: + resp = self._client.get(f"/space/{space_id}/list") + resp.raise_for_status() + for lst in resp.json().get("lists", []): + list_ids.add(lst["id"]) + except httpx.HTTPStatusError as e: + log.warning("Failed to fetch folderless lists for space %s: %s", space_id, e) + + # Fetch tasks from each list + for list_id in list_ids: + try: + tasks = self.get_tasks(list_id, statuses) + all_tasks.extend(tasks) + except httpx.HTTPStatusError as e: + log.warning("Failed to fetch tasks from list %s: %s", list_id, e) + + log.info("Found %d tasks across %d lists in space %s", len(all_tasks), len(list_ids), space_id) + return all_tasks + + # ── Write ── + + def update_task_status(self, task_id: str, status: str) -> bool: + """Update a task's status.""" + try: + resp = self._client.put(f"/task/{task_id}", json={"status": status}) + resp.raise_for_status() + log.info("Updated task %s status to '%s'", task_id, status) + return True + except httpx.HTTPStatusError as e: + log.error("Failed to update task %s status: %s", task_id, e) + return False + + def add_comment(self, task_id: str, text: str) -> bool: + """Add a comment to a task.""" + try: + resp = self._client.post( + f"/task/{task_id}/comment", + json={"comment_text": text}, + ) + resp.raise_for_status() + log.info("Added comment to task %s", task_id) + return True + except httpx.HTTPStatusError as e: + log.error("Failed to add comment to task %s: %s", task_id, e) + return False + + def get_custom_fields(self, list_id: str) -> list[dict]: + """Get custom fields for a list.""" + try: + resp = self._client.get(f"/list/{list_id}/field") + resp.raise_for_status() + return resp.json().get("fields", []) + except httpx.HTTPStatusError as e: + log.error("Failed to get custom fields for list %s: %s", list_id, e) + return [] diff --git a/cheddahbot/config.py b/cheddahbot/config.py index f84bcef..29d7e96 100644 --- a/cheddahbot/config.py +++ b/cheddahbot/config.py @@ -33,6 +33,21 @@ class ShellConfig: require_approval: bool = False +@dataclass +class ClickUpConfig: + api_token: str = "" + workspace_id: str = "" + space_id: str = "" + poll_interval_minutes: int = 20 + poll_statuses: list[str] = field(default_factory=lambda: ["to do"]) + review_status: str = "review" + in_progress_status: str = "in progress" + task_type_field_name: str = "Task Type" + default_auto_execute: bool = False + skill_map: dict = field(default_factory=dict) + enabled: bool = False + + @dataclass class Config: chat_model: str = "openai/gpt-4o-mini" @@ -45,6 +60,7 @@ class Config: memory: MemoryConfig = field(default_factory=MemoryConfig) scheduler: SchedulerConfig = field(default_factory=SchedulerConfig) shell: ShellConfig = field(default_factory=ShellConfig) + clickup: ClickUpConfig = field(default_factory=ClickUpConfig) # Derived paths root_dir: Path = field(default_factory=lambda: ROOT_DIR) @@ -79,6 +95,10 @@ def load_config() -> Config: for k, v in data["shell"].items(): if hasattr(cfg.shell, k): setattr(cfg.shell, k, v) + if "clickup" in data and isinstance(data["clickup"], dict): + for k, v in data["clickup"].items(): + if hasattr(cfg.clickup, k): + setattr(cfg.clickup, k, v) # Env var overrides (CHEDDAH_ prefix) cfg.openrouter_api_key = os.getenv("OPENROUTER_API_KEY", "") @@ -91,6 +111,16 @@ def load_config() -> Config: if p := os.getenv("CHEDDAH_PORT"): cfg.port = int(p) + # ClickUp env var overrides + if token := os.getenv("CLICKUP_API_TOKEN"): + cfg.clickup.api_token = token + if ws := os.getenv("CLICKUP_WORKSPACE_ID"): + cfg.clickup.workspace_id = ws + if sp := os.getenv("CLICKUP_SPACE_ID"): + cfg.clickup.space_id = sp + # Auto-enable if token is present + cfg.clickup.enabled = bool(cfg.clickup.api_token) + # Ensure data directories exist cfg.data_dir.mkdir(parents=True, exist_ok=True) (cfg.data_dir / "uploads").mkdir(exist_ok=True) diff --git a/config.yaml b/config.yaml index 614eb26..dfad89c 100644 --- a/config.yaml +++ b/config.yaml @@ -33,3 +33,19 @@ shell: - "format" - ":(){:|:&};:" require_approval: false # If true, shell commands need user confirmation + +# ClickUp integration +clickup: + poll_interval_minutes: 20 # 3x per hour + poll_statuses: ["to do"] + review_status: "review" + in_progress_status: "in progress" + task_type_field_name: "Task Type" + default_auto_execute: false + skill_map: + "Press Release": + tool: "write_press_releases" + auto_execute: true + field_mapping: + topic: "task_name" + company_name: "Company"