"""ClickUp REST API client.""" from __future__ import annotations import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Any import httpx log = logging.getLogger(__name__) BASE_URL = "https://api.clickup.com/api/v2" @dataclass class ClickUpTask: """Lightweight representation of a ClickUp task.""" id: str name: str status: str description: str = "" task_type: str = "" url: str = "" custom_fields: dict[str, Any] = field(default_factory=dict) list_id: str = "" list_name: str = "" @classmethod def from_api(cls, data: dict, task_type_field_name: str = "Task Type") -> ClickUpTask: """Parse a task from the ClickUp API response.""" custom_fields = {} task_type = "" for cf in data.get("custom_fields", []): cf_name = cf.get("name", "") cf_value = cf.get("value") # Resolve dropdown type_config to label if cf.get("type") == "drop_down" and cf_value is not None: options = cf.get("type_config", {}).get("options", []) order_index = cf_value if isinstance(cf_value, int) else None for opt in options: if ( order_index is not None and opt.get("orderindex") == order_index ) or opt.get("id") == cf_value: cf_value = opt.get("name", cf_value) break custom_fields[cf_name] = cf_value if cf_name == task_type_field_name: task_type = str(cf_value) if cf_value else "" status_name = data.get("status", {}).get("status", "unknown") return cls( id=data["id"], name=data.get("name", ""), status=status_name.lower(), description=data.get("description", "") or "", task_type=task_type, url=data.get("url", ""), custom_fields=custom_fields, list_id=data.get("list", {}).get("id", ""), list_name=data.get("list", {}).get("name", ""), ) class ClickUpClient: """Thin wrapper around the ClickUp REST API v2.""" def __init__( self, api_token: str, workspace_id: str = "", task_type_field_name: str = "Task Type" ): self._token = api_token self.workspace_id = workspace_id self._task_type_field_name = task_type_field_name self._client = httpx.Client( base_url=BASE_URL, headers={"Authorization": api_token, "Content-Type": "application/json"}, timeout=30.0, ) def close(self): self._client.close() # ── Read ── def get_tasks(self, list_id: str, statuses: list[str] | None = None) -> list[ClickUpTask]: """Fetch tasks from a specific list, optionally filtered by status.""" params: dict[str, Any] = {"include_closed": "false", "subtasks": "true"} if statuses: for s in statuses: params.setdefault("statuses[]", []) if isinstance(params["statuses[]"], list): params["statuses[]"].append(s) # httpx needs repeated params as a list of tuples param_list = [] for k, v in params.items(): if isinstance(v, list): for item in v: param_list.append((k, item)) else: param_list.append((k, v)) resp = self._client.get(f"/list/{list_id}/task", params=param_list) resp.raise_for_status() tasks_data = resp.json().get("tasks", []) return [ClickUpTask.from_api(t, self._task_type_field_name) for t in tasks_data] def get_tasks_from_space( self, space_id: str, statuses: list[str] | None = None ) -> list[ClickUpTask]: """Traverse all folders and lists in a space to collect tasks.""" all_tasks: list[ClickUpTask] = [] list_ids = set() # Get foldered lists try: resp = self._client.get(f"/space/{space_id}/folder") resp.raise_for_status() for folder in resp.json().get("folders", []): for lst in folder.get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folders for space %s: %s", space_id, e) # Get folderless lists try: resp = self._client.get(f"/space/{space_id}/list") resp.raise_for_status() for lst in resp.json().get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folderless lists for space %s: %s", space_id, e) # Fetch tasks from each list for list_id in list_ids: try: tasks = self.get_tasks(list_id, statuses) all_tasks.extend(tasks) except httpx.HTTPStatusError as e: log.warning("Failed to fetch tasks from list %s: %s", list_id, e) log.info( "Found %d tasks across %d lists in space %s", len(all_tasks), len(list_ids), space_id ) return all_tasks # ── Write (with retry) ── @staticmethod def _retry(fn, max_attempts: int = 3, backoff: float = 2.0): """Call *fn* up to *max_attempts* times with exponential backoff. Returns the result of *fn* on success, or re-raises the last exception. Only retries on network/transport errors and 5xx status errors. """ last_exc: Exception | None = None for attempt in range(1, max_attempts + 1): try: return fn() except (httpx.TransportError, httpx.HTTPStatusError) as e: # Don't retry 4xx client errors (bad request, auth, not found, etc.) if isinstance(e, httpx.HTTPStatusError) and e.response.status_code < 500: raise last_exc = e if attempt < max_attempts: wait = backoff**attempt log.warning("Retry %d/%d after %.1fs: %s", attempt, max_attempts, wait, e) time.sleep(wait) raise last_exc def update_task_status(self, task_id: str, status: str) -> bool: """Update a task's status.""" try: def _call(): resp = self._client.put(f"/task/{task_id}", json={"status": status}) resp.raise_for_status() return resp self._retry(_call) log.info("Updated task %s status to '%s'", task_id, status) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error("Failed to update task %s status: %s", task_id, e) return False def add_comment(self, task_id: str, text: str) -> bool: """Add a comment to a task.""" try: def _call(): resp = self._client.post( f"/task/{task_id}/comment", json={"comment_text": text}, ) resp.raise_for_status() return resp self._retry(_call) log.info("Added comment to task %s", task_id) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error("Failed to add comment to task %s: %s", task_id, e) return False def upload_attachment(self, task_id: str, file_path: str | Path) -> bool: """Upload a file attachment to a task. Uses module-level httpx.post() instead of self._client because the shared client sets Content-Type: application/json which conflicts with multipart/form-data uploads. """ fp = Path(file_path) if not fp.exists(): log.warning("Attachment file not found: %s", fp) return False try: def _call(): with open(fp, "rb") as f: resp = httpx.post( f"{BASE_URL}/task/{task_id}/attachment", headers={"Authorization": self._token}, files={"attachment": (fp.name, f, "application/octet-stream")}, timeout=60.0, ) resp.raise_for_status() return resp self._retry(_call) log.info("Uploaded attachment %s to task %s", fp.name, task_id) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.warning("Failed to upload attachment to task %s: %s", task_id, e) return False def get_custom_fields(self, list_id: str) -> list[dict]: """Get custom fields for a list.""" try: resp = self._client.get(f"/list/{list_id}/field") resp.raise_for_status() return resp.json().get("fields", []) except httpx.HTTPStatusError as e: log.error("Failed to get custom fields for list %s: %s", list_id, e) return []