"""ClickUp REST API client.""" from __future__ import annotations import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Any import httpx log = logging.getLogger(__name__) BASE_URL = "https://api.clickup.com/api/v2" @dataclass class ClickUpTask: """Lightweight representation of a ClickUp task.""" id: str name: str status: str description: str = "" task_type: str = "" url: str = "" due_date: str = "" custom_fields: dict[str, Any] = field(default_factory=dict) list_id: str = "" list_name: str = "" tags: list[str] = field(default_factory=list) date_done: str = "" date_updated: str = "" @classmethod def from_api(cls, data: dict, task_type_field_name: str = "Task Type") -> ClickUpTask: """Parse a task from the ClickUp API response.""" custom_fields = {} task_type = "" for cf in data.get("custom_fields", []): cf_name = cf.get("name", "") cf_value = cf.get("value") # Resolve dropdown type_config to label if cf.get("type") == "drop_down" and cf_value is not None: options = cf.get("type_config", {}).get("options", []) order_index = cf_value if isinstance(cf_value, int) else None for opt in options: if ( order_index is not None and opt.get("orderindex") == order_index ) or opt.get("id") == cf_value: cf_value = opt.get("name", cf_value) break custom_fields[cf_name] = cf_value if cf_name == task_type_field_name: task_type = str(cf_value) if cf_value else "" status_name = data.get("status", {}).get("status", "unknown") # due_date comes as a Unix-ms timestamp string (or None) raw_due = data.get("due_date") due_date = str(raw_due) if raw_due else "" tags = [tag["name"] for tag in data.get("tags", [])] raw_done = data.get("date_done") or data.get("date_closed") date_done = str(raw_done) if raw_done else "" raw_updated = data.get("date_updated") date_updated = str(raw_updated) if raw_updated else "" return cls( id=data["id"], name=data.get("name", ""), status=status_name.lower(), description=data.get("description", "") or "", task_type=task_type, url=data.get("url", ""), due_date=due_date, custom_fields=custom_fields, list_id=data.get("list", {}).get("id", ""), list_name=data.get("list", {}).get("name", ""), tags=tags, date_done=date_done, date_updated=date_updated, ) class ClickUpClient: """Thin wrapper around the ClickUp REST API v2.""" def __init__( self, api_token: str, workspace_id: str = "", task_type_field_name: str = "Task Type" ): self._token = api_token self.workspace_id = workspace_id self._task_type_field_name = task_type_field_name self._client = httpx.Client( base_url=BASE_URL, headers={"Authorization": api_token, "Content-Type": "application/json"}, timeout=30.0, ) def close(self): self._client.close() # ── Read ── def get_tasks( self, list_id: str, statuses: list[str] | None = None, due_date_lt: int | None = None, custom_fields: str | None = None, include_closed: bool = False, ) -> list[ClickUpTask]: """Fetch tasks from a specific list, optionally filtered by status/due date/fields.""" params: dict[str, Any] = { "include_closed": "true" if include_closed else "false", "subtasks": "true", } if statuses: for s in statuses: params.setdefault("statuses[]", []) if isinstance(params["statuses[]"], list): params["statuses[]"].append(s) if due_date_lt is not None: params["due_date_lt"] = str(due_date_lt) if custom_fields is not None: params["custom_fields"] = custom_fields # httpx needs repeated params as a list of tuples param_list = [] for k, v in params.items(): if isinstance(v, list): for item in v: param_list.append((k, item)) else: param_list.append((k, v)) resp = self._client.get(f"/list/{list_id}/task", params=param_list) resp.raise_for_status() tasks_data = resp.json().get("tasks", []) return [ClickUpTask.from_api(t, self._task_type_field_name) for t in tasks_data] def get_tasks_from_space( self, space_id: str, statuses: list[str] | None = None, due_date_lt: int | None = None, custom_fields: str | None = None, ) -> list[ClickUpTask]: """Traverse all folders and lists in a space to collect tasks.""" all_tasks: list[ClickUpTask] = [] list_ids = set() # Get foldered lists try: resp = self._client.get(f"/space/{space_id}/folder") resp.raise_for_status() for folder in resp.json().get("folders", []): for lst in folder.get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folders for space %s: %s", space_id, e) # Get folderless lists try: resp = self._client.get(f"/space/{space_id}/list") resp.raise_for_status() for lst in resp.json().get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folderless lists for space %s: %s", space_id, e) # Fetch tasks from each list for list_id in list_ids: try: tasks = self.get_tasks(list_id, statuses, due_date_lt, custom_fields) all_tasks.extend(tasks) except httpx.HTTPStatusError as e: log.warning("Failed to fetch tasks from list %s: %s", list_id, e) log.info( "Found %d tasks across %d lists in space %s", len(all_tasks), len(list_ids), space_id ) return all_tasks def get_tasks_from_overall_lists( self, space_id: str, statuses: list[str] | None = None, due_date_lt: int | None = None, custom_fields: str | None = None, include_closed: bool = False, ) -> list[ClickUpTask]: """Fetch tasks only from 'Overall' lists in each folder. This is the dashboard-specific alternative to get_tasks_from_space, which hits every list and returns too much noise. """ all_tasks: list[ClickUpTask] = [] overall_ids: list[str] = [] try: folders = self.get_folders(space_id) for folder in folders: for lst in folder["lists"]: if lst["name"].lower() == "overall": overall_ids.append(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folders for space %s: %s", space_id, e) return [] for list_id in overall_ids: try: tasks = self.get_tasks( list_id, statuses, due_date_lt, custom_fields, include_closed ) all_tasks.extend(tasks) except httpx.HTTPStatusError as e: log.warning("Failed to fetch tasks from list %s: %s", list_id, e) log.info( "Found %d tasks across %d Overall lists in space %s", len(all_tasks), len(overall_ids), space_id, ) return all_tasks # ── Write (with retry) ── @staticmethod def _retry(fn, max_attempts: int = 3, backoff: float = 2.0): """Call *fn* up to *max_attempts* times with exponential backoff. Returns the result of *fn* on success, or re-raises the last exception. Only retries on network/transport errors and 5xx status errors. """ last_exc: Exception | None = None for attempt in range(1, max_attempts + 1): try: return fn() except (httpx.TransportError, httpx.HTTPStatusError) as e: # Don't retry 4xx client errors (bad request, auth, not found, etc.) if isinstance(e, httpx.HTTPStatusError) and e.response.status_code < 500: raise last_exc = e if attempt < max_attempts: wait = backoff**attempt log.warning("Retry %d/%d after %.1fs: %s", attempt, max_attempts, wait, e) time.sleep(wait) raise last_exc def update_task_status(self, task_id: str, status: str) -> bool: """Update a task's status.""" try: def _call(): resp = self._client.put(f"/task/{task_id}", json={"status": status}) resp.raise_for_status() return resp self._retry(_call) log.info("Updated task %s status to '%s'", task_id, status) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error("Failed to update task %s status: %s", task_id, e) return False def add_comment(self, task_id: str, text: str) -> bool: """Add a comment to a task.""" try: def _call(): resp = self._client.post( f"/task/{task_id}/comment", json={"comment_text": text}, ) resp.raise_for_status() return resp self._retry(_call) log.info("Added comment to task %s", task_id) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error("Failed to add comment to task %s: %s", task_id, e) return False def upload_attachment(self, task_id: str, file_path: str | Path) -> bool: """Upload a file attachment to a task. Uses module-level httpx.post() instead of self._client because the shared client sets Content-Type: application/json which conflicts with multipart/form-data uploads. """ fp = Path(file_path) if not fp.exists(): log.warning("Attachment file not found: %s", fp) return False try: def _call(): with open(fp, "rb") as f: resp = httpx.post( f"{BASE_URL}/task/{task_id}/attachment", headers={"Authorization": self._token}, files={"attachment": (fp.name, f, "application/octet-stream")}, timeout=60.0, ) resp.raise_for_status() return resp self._retry(_call) log.info("Uploaded attachment %s to task %s", fp.name, task_id) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.warning("Failed to upload attachment to task %s: %s", task_id, e) return False def get_list_ids_from_space(self, space_id: str) -> set[str]: """Return all list IDs (foldered + folderless) in a space.""" list_ids: set[str] = set() try: resp = self._client.get(f"/space/{space_id}/folder") resp.raise_for_status() for folder in resp.json().get("folders", []): for lst in folder.get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folders for space %s: %s", space_id, e) try: resp = self._client.get(f"/space/{space_id}/list") resp.raise_for_status() for lst in resp.json().get("lists", []): list_ids.add(lst["id"]) except httpx.HTTPStatusError as e: log.warning("Failed to fetch folderless lists for space %s: %s", space_id, e) return list_ids def get_folders(self, space_id: str) -> list[dict]: """Return folders in a space with their lists. Each dict has keys: id, name, lists (list of {id, name}). """ resp = self._client.get(f"/space/{space_id}/folder") resp.raise_for_status() folders = [] for f in resp.json().get("folders", []): lists = [{"id": lst["id"], "name": lst["name"]} for lst in f.get("lists", [])] folders.append({"id": f["id"], "name": f["name"], "lists": lists}) return folders def set_custom_field_value(self, task_id: str, field_id: str, value: Any) -> bool: """Set a custom field value on a task. For dropdowns, *value* should be the option UUID string. """ try: def _call(): resp = self._client.post( f"/task/{task_id}/field/{field_id}", json={"value": value}, ) resp.raise_for_status() return resp self._retry(_call) log.info("Set field %s on task %s", field_id, task_id) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error("Failed to set field %s on task %s: %s", field_id, task_id, e) return False def get_custom_fields(self, list_id: str) -> list[dict]: """Get custom fields for a list.""" try: resp = self._client.get(f"/list/{list_id}/field") resp.raise_for_status() return resp.json().get("fields", []) except httpx.HTTPStatusError as e: log.error("Failed to get custom fields for list %s: %s", list_id, e) return [] def create_custom_field( self, list_id: str, name: str, field_type: str, type_config: dict | None = None, ) -> dict: """Create a custom field on a list. Args: list_id: ClickUp list ID. name: Field name (e.g., "LB Method"). field_type: Field type (e.g., "short_text", "drop_down"). type_config: Optional type configuration (e.g., dropdown options). Returns: API response dict on success, or raises on failure. """ payload: dict = {"name": name, "type": field_type} if type_config: payload["type_config"] = type_config def _call(): resp = self._client.post(f"/list/{list_id}/field", json=payload) resp.raise_for_status() return resp.json() result = self._retry(_call) log.info("Created custom field '%s' (%s) on list %s", name, field_type, list_id) return result def get_task(self, task_id: str) -> ClickUpTask: """Fetch a single task by ID.""" resp = self._client.get(f"/task/{task_id}") resp.raise_for_status() return ClickUpTask.from_api(resp.json(), self._task_type_field_name) def set_custom_field_by_name( self, task_id: str, field_name: str, value: Any ) -> bool: """Set a custom field by its human-readable name. Looks up the field ID from the task's list, then sets the value. Falls back gracefully if the field doesn't exist. """ try: task_data = self._client.get(f"/task/{task_id}").json() list_id = task_data.get("list", {}).get("id", "") if not list_id: log.warning("Could not determine list_id for task %s", task_id) return False fields = self.get_custom_fields(list_id) field_id = None for f in fields: if f.get("name") == field_name: field_id = f["id"] break if not field_id: log.warning("Field '%s' not found in list %s", field_name, list_id) return False return self.set_custom_field_value(task_id, field_id, value) except Exception as e: log.error("Failed to set field '%s' on task %s: %s", field_name, task_id, e) return False def set_custom_field_smart( self, task_id: str, list_id: str, field_name: str, value: str ) -> bool: """Set a custom field by name, auto-resolving dropdown option UUIDs. For dropdown fields, *value* is matched against option names (case-insensitive). For all other field types, *value* is passed through. """ try: fields = self.get_custom_fields(list_id) target = None for f in fields: if f.get("name") == field_name: target = f break if not target: log.warning("Field '%s' not found in list %s", field_name, list_id) return False field_id = target["id"] resolved = value if target.get("type") == "drop_down": options = target.get("type_config", {}).get("options", []) for opt in options: if opt.get("name", "").lower() == value.lower(): resolved = opt["id"] break else: log.warning( "Dropdown option '%s' not found for field '%s'", value, field_name, ) return False return self.set_custom_field_value(task_id, field_id, resolved) except Exception as e: log.error( "Failed to set field '%s' on task %s: %s", field_name, task_id, e ) return False def add_dependency(self, task_id: str, depends_on: str) -> bool: """Add a 'blocked by' dependency: *task_id* is blocked by *depends_on*. Uses POST /task/{task_id}/dependency with {"depends_on": ...}. """ try: def _call(): resp = self._client.post( f"/task/{task_id}/dependency", json={"depends_on": depends_on}, ) resp.raise_for_status() return resp self._retry(_call) log.info( "Added dependency: task %s blocked by %s", task_id, depends_on ) return True except (httpx.TransportError, httpx.HTTPStatusError) as e: log.error( "Failed to add dependency on task %s: %s", task_id, e ) return False def get_custom_field_by_name(self, task_id: str, field_name: str) -> Any: """Read a custom field value from a task by field name. Fetches the task and looks up the field value from custom_fields. Returns None if not found. """ try: task = self.get_task(task_id) return task.custom_fields.get(field_name) except Exception as e: log.warning("Failed to read field '%s' from task %s: %s", field_name, task_id, e) return None def create_task( self, list_id: str, name: str, description: str = "", status: str = "to do", due_date: int | None = None, tags: list[str] | None = None, custom_fields: list[dict] | None = None, priority: int | None = None, assignees: list[int] | None = None, time_estimate: int | None = None, ) -> dict: """Create a new task in a ClickUp list. Args: list_id: The list to create the task in. name: Task name. description: Task description (markdown supported). status: Initial status (default "to do"). due_date: Due date as Unix timestamp in milliseconds. tags: List of tag names to apply. custom_fields: List of custom field dicts ({"id": ..., "value": ...}). priority: 1=Urgent, 2=High, 3=Normal, 4=Low. assignees: List of ClickUp user IDs. time_estimate: Time estimate in milliseconds. Returns: API response dict containing task id, url, etc. """ payload: dict[str, Any] = {"name": name, "status": status} if description: payload["description"] = description if due_date is not None: payload["due_date"] = due_date if tags: payload["tags"] = tags if custom_fields: payload["custom_fields"] = custom_fields if priority is not None: payload["priority"] = priority if assignees: payload["assignees"] = assignees if time_estimate is not None: payload["time_estimate"] = time_estimate def _call(): resp = self._client.post(f"/list/{list_id}/task", json=payload) resp.raise_for_status() return resp.json() result = self._retry(_call) log.info("Created task '%s' in list %s (id: %s)", name, list_id, result.get("id")) return result def find_list_in_folder( self, space_id: str, folder_name: str, list_name: str = "Overall" ) -> str | None: """Find a list within a named folder in a space. Args: space_id: ClickUp space ID. folder_name: Folder name to match (case-insensitive). list_name: List name within the folder (default "Overall"). Returns: The list_id if found, or None. """ folders = self.get_folders(space_id) for folder in folders: if folder["name"].lower() == folder_name.lower(): for lst in folder["lists"]: if lst["name"].lower() == list_name.lower(): return lst["id"] return None def discover_field_filter(self, list_id: str, field_name: str) -> dict[str, Any] | None: """Discover a custom field's UUID and dropdown option map. Returns {"field_id": "", "options": {"Press Release": "", ...}} or None if the field is not found. """ fields = self.get_custom_fields(list_id) for f in fields: if f.get("name") == field_name: field_id = f["id"] options: dict[str, str] = {} if f.get("type") == "drop_down": for opt in f.get("type_config", {}).get("options", []): opt_name = opt.get("name", "") opt_id = opt.get("id", "") if opt_name and opt_id: options[opt_name] = opt_id return {"field_id": field_id, "options": options} return None