"""ClickUp REST API client for the runner.

Adapted from cheddahbot/clickup.py -- stripped to what the runner needs,
with additions for checkbox, stage dropdown, and attachment operations.
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import httpx

log = logging.getLogger(__name__)

BASE_URL = "https://api.clickup.com/api/v2"

# Page size of ClickUp's "Get Tasks" endpoint. Only used as a fallback to
# detect the final page when the response carries no ``last_page`` flag.
_TASKS_PAGE_SIZE = 100

# Hard upper bound on pages fetched per list so a misbehaving API response
# can never turn pagination into an endless loop.
_MAX_TASK_PAGES = 1000


def _resolve_dropdown_label(cf: dict, value: Any) -> Any:
    """Map a raw dropdown value to its option label.

    The raw value is matched against each option's ``orderindex`` (only when
    the value is an int) or against the option's ``id``.

    Args:
        cf: Raw custom-field dict from the API (provides ``type_config``).
        value: Raw value stored on the task for this field.

    Returns:
        The matching option's ``name``, or ``value`` unchanged when no option
        matches.
    """
    options = (cf.get("type_config") or {}).get("options") or []
    order_index = value if isinstance(value, int) else None
    for opt in options:
        matches_index = (
            order_index is not None and opt.get("orderindex") == order_index
        )
        if matches_index or opt.get("id") == value:
            return opt.get("name", value)
    return value


@dataclass
class ClickUpTask:
    """Lightweight representation of a ClickUp task."""

    id: str
    name: str
    status: str  # Lower-cased status name
    description: str = ""
    task_type: str = ""  # Work Category value
    url: str = ""
    due_date: str = ""  # Unix-ms timestamp string, or ""
    # Field name -> value, with dropdown values resolved to option labels.
    custom_fields: dict[str, Any] = field(default_factory=dict)
    # Custom-field dicts exactly as returned by the API.
    custom_fields_raw: list[dict] = field(default_factory=list)
    list_id: str = ""
    list_name: str = ""
    folder_name: str = ""
    tags: list[str] = field(default_factory=list)
    # Attachment dicts; has_xlsx_attachment() reads their "title" and "url".
    attachments: list[dict] = field(default_factory=list)

    @classmethod
    def from_api(
        cls, data: dict, task_type_field_name: str = "Work Category"
    ) -> ClickUpTask:
        """Parse a task from the ClickUp API response.

        Args:
            data: Task dict from the API. ``id`` is required; every other key
                is optional and may be missing or ``None``.
            task_type_field_name: Name of the custom field whose value is
                copied into ``task_type``.

        Returns:
            The parsed task.

        Raises:
            KeyError: If ``data`` has no ``id`` (or a tag has no ``name``).
        """
        custom_fields: dict[str, Any] = {}
        # ``or`` (rather than a .get default) also covers explicit JSON nulls.
        custom_fields_raw = data.get("custom_fields") or []
        task_type = ""

        for cf in custom_fields_raw:
            cf_name = cf.get("name", "")
            cf_value = cf.get("value")

            # Resolve dropdown type_config to label
            if cf.get("type") == "drop_down" and cf_value is not None:
                cf_value = _resolve_dropdown_label(cf, cf_value)

            custom_fields[cf_name] = cf_value
            if cf_name == task_type_field_name:
                task_type = str(cf_value) if cf_value else ""

        status_name = (data.get("status") or {}).get("status", "unknown")
        if status_name is None:
            status_name = "unknown"

        raw_due = data.get("due_date")
        due_date = str(raw_due) if raw_due else ""

        tags = [tag["name"] for tag in data.get("tags") or []]

        # Folder name from list -> folder if available
        folder_data = data.get("folder", {})
        folder_name = folder_data.get("name", "") if folder_data else ""

        list_data = data.get("list") or {}

        return cls(
            id=data["id"],
            name=data.get("name", ""),
            status=str(status_name).lower(),
            description=data.get("description", "") or "",
            task_type=task_type,
            url=data.get("url", ""),
            due_date=due_date,
            custom_fields=custom_fields,
            custom_fields_raw=custom_fields_raw,
            list_id=list_data.get("id", ""),
            list_name=list_data.get("name", ""),
            folder_name=folder_name,
            tags=tags,
            # Keep any attachments embedded in the payload so that
            # has_xlsx_attachment() works on freshly parsed tasks.
            attachments=list(data.get("attachments") or []),
        )

    def get_field_value(self, field_name: str) -> Any:
        """Get a custom field value by name (``None`` if absent)."""
        return self.custom_fields.get(field_name)

    def has_xlsx_attachment(self) -> bool:
        """Check if this task has an .xlsx attachment.

        An attachment counts when its ``title`` or its ``url`` ends with
        ``.xlsx`` (case-insensitive). Missing or ``None`` values are treated
        as empty strings.
        """
        return any(
            (a.get("title") or "").lower().endswith(".xlsx")
            or (a.get("url") or "").lower().endswith(".xlsx")
            for a in self.attachments
        )


class ClickUpClient:
    """ClickUp REST API v2 client for the runner.

    Owns an ``httpx.Client``; call ``close()`` when done or use the instance
    as a context manager.
    """

    def __init__(self, api_token: str, task_type_field_name: str = "Work Category"):
        """Create the client.

        Args:
            api_token: Token sent verbatim in the ``Authorization`` header.
            task_type_field_name: Custom field name passed to
                ``ClickUpTask.from_api`` when parsing tasks.
        """
        self._token = api_token
        self._task_type_field_name = task_type_field_name
        self._client = httpx.Client(
            base_url=BASE_URL,
            headers={
                "Authorization": api_token,
                "Content-Type": "application/json",
            },
            timeout=30.0,
        )
        # Cache: "list_id:field_name" -> {field_id, type, options}
        self._field_cache: dict[str, dict[str, Any]] = {}

    def close(self):
        """Close the underlying HTTP client."""
        self._client.close()

    def __enter__(self) -> ClickUpClient:
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client on leaving the ``with`` block."""
        self.close()

    # ── Retry helper ──

    @staticmethod
    def _retry(fn, max_attempts: int = 3, backoff: float = 2.0):
        """Retry on 5xx / transport errors with exponential backoff.

        Args:
            fn: Zero-argument callable performing the request.
            max_attempts: Total number of attempts (must be >= 1).
            backoff: Base of the wait time; attempt ``k`` waits
                ``backoff**k`` seconds before the next try.

        Returns:
            Whatever ``fn`` returns on its first successful call.

        Raises:
            ValueError: If ``max_attempts`` is less than 1.
            httpx.HTTPStatusError: Immediately for status codes below 500, or
                after the final attempt for 5xx responses.
            httpx.TransportError: After the final attempt.
        """
        if max_attempts < 1:
            # Without this guard the loop never runs and we would end up
            # executing ``raise None``.
            raise ValueError("max_attempts must be >= 1")

        last_exc: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                return fn()
            except (httpx.TransportError, httpx.HTTPStatusError) as e:
                if (
                    isinstance(e, httpx.HTTPStatusError)
                    and e.response.status_code < 500
                ):
                    # Responses below 500 are re-raised without retrying.
                    raise
                last_exc = e
                if attempt < max_attempts:
                    wait = backoff**attempt
                    log.warning(
                        "Retry %d/%d after %.1fs: %s",
                        attempt,
                        max_attempts,
                        wait,
                        e,
                    )
                    time.sleep(wait)
        raise last_exc  # type: ignore[misc]

    # ── Read ──

    def get_tasks(
        self,
        list_id: str,
        statuses: list[str] | None = None,
        due_date_lt: int | None = None,
        due_date_gt: int | None = None,
        include_closed: bool = False,
    ) -> list[ClickUpTask]:
        """Fetch tasks from a list with optional filters.

        Follows the endpoint's pagination so lists larger than a single page
        are returned in full.

        Args:
            list_id: ClickUp list ID.
            statuses: Status names to filter by (sent as repeated
                ``statuses[]`` parameters); ``None``/empty means no filter.
            due_date_lt: Sent as the ``due_date_lt`` query parameter.
            due_date_gt: Sent as the ``due_date_gt`` query parameter.
            include_closed: Whether closed tasks are included.

        Returns:
            All matching tasks, subtasks included.

        Raises:
            httpx.HTTPStatusError: If any page request returns an error status.
        """
        # httpx needs repeated params as list of tuples
        base_params: list[tuple[str, str]] = [
            ("include_closed", "true" if include_closed else "false"),
            ("subtasks", "true"),
        ]
        base_params.extend(("statuses[]", s) for s in statuses or [])
        if due_date_lt is not None:
            base_params.append(("due_date_lt", str(due_date_lt)))
        if due_date_gt is not None:
            base_params.append(("due_date_gt", str(due_date_gt)))

        tasks: list[ClickUpTask] = []
        for page in range(_MAX_TASK_PAGES):
            resp = self._client.get(
                f"/list/{list_id}/task",
                params=[*base_params, ("page", str(page))],
            )
            resp.raise_for_status()
            payload = resp.json()
            page_tasks = payload.get("tasks") or []
            tasks.extend(
                ClickUpTask.from_api(t, self._task_type_field_name)
                for t in page_tasks
            )

            last_page = payload.get("last_page")
            if last_page is None:
                # No explicit flag: a short page means nothing is left.
                last_page = len(page_tasks) < _TASKS_PAGE_SIZE
            if last_page or not page_tasks:
                break
        else:
            log.warning(
                "Stopped paging list %s after %d pages", list_id, _MAX_TASK_PAGES
            )
        return tasks

    def get_task(self, task_id: str) -> ClickUpTask:
        """Fetch a single task by ID.

        Raises:
            httpx.HTTPStatusError: If the API returns an error status.
        """
        resp = self._client.get(f"/task/{task_id}")
        resp.raise_for_status()
        return ClickUpTask.from_api(resp.json(), self._task_type_field_name)

    def get_folders(self, space_id: str) -> list[dict]:
        """Return folders in a space with their lists.

        Returns:
            One dict per folder: ``{"id", "name", "lists"}`` where ``lists``
            is a list of ``{"id", "name"}`` dicts.

        Raises:
            httpx.HTTPStatusError: If the API returns an error status.
        """
        resp = self._client.get(f"/space/{space_id}/folder")
        resp.raise_for_status()
        folders = []
        for f in resp.json().get("folders", []):
            lists = [
                {"id": lst["id"], "name": lst["name"]}
                for lst in f.get("lists", [])
            ]
            folders.append({"id": f["id"], "name": f["name"], "lists": lists})
        return folders

    def get_tasks_from_overall_lists(
        self,
        space_id: str,
        due_date_lt: int | None = None,
    ) -> list[ClickUpTask]:
        """Fetch tasks from all 'Overall' lists in each folder.

        Does NOT filter by status -- we need all tasks so we can check the
        Delegate to Claude checkbox ourselves.

        HTTP status errors are logged rather than raised: a failed folder
        lookup yields an empty result, a failed list is skipped.

        Args:
            space_id: ClickUp space whose folders are scanned.
            due_date_lt: Forwarded to ``get_tasks`` for every list.

        Returns:
            Tasks collected from every list named "overall"
            (case-insensitive).
        """
        all_tasks: list[ClickUpTask] = []
        overall_ids: list[str] = []

        try:
            folders = self.get_folders(space_id)
            for folder in folders:
                for lst in folder["lists"]:
                    if lst["name"].lower() == "overall":
                        overall_ids.append(lst["id"])
        except httpx.HTTPStatusError as e:
            log.warning("Failed to fetch folders for space %s: %s", space_id, e)
            return []

        for list_id in overall_ids:
            try:
                tasks = self.get_tasks(list_id, due_date_lt=due_date_lt)
                all_tasks.extend(tasks)
            except httpx.HTTPStatusError as e:
                log.warning("Failed to fetch tasks from list %s: %s", list_id, e)

        log.info(
            "Found %d tasks across %d Overall lists",
            len(all_tasks),
            len(overall_ids),
        )
        return all_tasks

    def get_task_attachments(self, task_id: str) -> list[dict]:
        """Get attachments for a task.

        Returns list of dicts with keys: id, title, url, date, etc.
        Returns an empty list (after logging) on any HTTP status error.

        NOTE: Requires ClickUp Business plan or higher.
        """
        try:
            resp = self._client.get(f"/task/{task_id}/attachment")
            resp.raise_for_status()
            return resp.json().get("attachments", [])
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                log.warning(
                    "Attachment listing not available (may require Business plan)"
                )
            else:
                log.warning(
                    "Failed to get attachments for task %s: %s", task_id, e
                )
            return []

    def download_attachment(self, url: str, dest: Path) -> bool:
        """Download a ClickUp attachment to a local file.

        ClickUp attachment URLs are pre-signed S3 URLs that don't need
        auth headers, so we use a plain httpx request (not the API client).

        If the transfer fails after ``dest`` was opened for writing, the
        partial file is removed so a truncated download is never mistaken
        for a complete one.

        Returns True on success, False on failure.
        """
        started_writing = False
        try:
            with httpx.stream(
                "GET", url, follow_redirects=True, timeout=60.0
            ) as resp:
                resp.raise_for_status()
                with open(dest, "wb") as f:
                    started_writing = True
                    for chunk in resp.iter_bytes(chunk_size=8192):
                        f.write(chunk)
            log.info("Downloaded attachment to %s", dest)
            return True
        except Exception as e:
            # Deliberately broad: this is a best-effort helper whose contract
            # is a boolean result, never an exception.
            log.warning("Failed to download attachment from %s: %s", url, e)
            if started_writing:
                try:
                    Path(dest).unlink()
                except OSError as cleanup_err:
                    log.warning(
                        "Could not remove partial download %s: %s",
                        dest,
                        cleanup_err,
                    )
            return False

    def get_custom_fields(self, list_id: str) -> list[dict]:
        """Get custom field definitions for a list.

        Returns an empty list (after logging) on any HTTP status error.
        """
        try:
            resp = self._client.get(f"/list/{list_id}/field")
            resp.raise_for_status()
            return resp.json().get("fields", [])
        except httpx.HTTPStatusError as e:
            log.error("Failed to get custom fields for list %s: %s", list_id, e)
            return []

    # ── Write ──

    def update_task_status(self, task_id: str, status: str) -> bool:
        """Update a task's status.

        Returns True on success, False if the request still fails after
        retries.
        """
        try:

            def _call():
                resp = self._client.put(
                    f"/task/{task_id}", json={"status": status}
                )
                resp.raise_for_status()
                return resp

            self._retry(_call)
            log.info("Updated task %s status to '%s'", task_id, status)
            return True
        except (httpx.TransportError, httpx.HTTPStatusError) as e:
            log.error("Failed to update task %s status: %s", task_id, e)
            return False

    def add_comment(self, task_id: str, text: str) -> bool:
        """Add a comment to a task.

        Returns True on success, False if the request still fails after
        retries.
        """
        try:

            def _call():
                resp = self._client.post(
                    f"/task/{task_id}/comment",
                    json={"comment_text": text},
                )
                resp.raise_for_status()
                return resp

            self._retry(_call)
            log.info("Added comment to task %s", task_id)
            return True
        except (httpx.TransportError, httpx.HTTPStatusError) as e:
            log.error("Failed to add comment to task %s: %s", task_id, e)
            return False

    def upload_attachment(self, task_id: str, file_path: str | Path) -> bool:
        """Upload a file attachment to a task.

        Uses module-level httpx.post() because the shared client sets
        Content-Type: application/json which conflicts with multipart.

        Returns True on success, False if the file does not exist or the
        upload still fails after retries.
        """
        fp = Path(file_path)
        if not fp.exists():
            log.warning("Attachment file not found: %s", fp)
            return False
        try:

            def _call():
                # Re-open on every attempt so a retry starts from byte 0.
                with open(fp, "rb") as f:
                    resp = httpx.post(
                        f"{BASE_URL}/task/{task_id}/attachment",
                        headers={"Authorization": self._token},
                        files={
                            "attachment": (
                                fp.name,
                                f,
                                "application/octet-stream",
                            )
                        },
                        timeout=60.0,
                    )
                resp.raise_for_status()
                return resp

            self._retry(_call)
            log.info("Uploaded attachment %s to task %s", fp.name, task_id)
            return True
        except (httpx.TransportError, httpx.HTTPStatusError) as e:
            log.warning("Failed to upload attachment to task %s: %s", task_id, e)
            return False

    def set_custom_field_value(
        self, task_id: str, field_id: str, value: Any
    ) -> bool:
        """Set a custom field value by field ID.

        ``value`` is sent as-is in the JSON body (``{"value": value}``).

        Returns True on success, False if the request still fails after
        retries.
        """
        try:

            def _call():
                resp = self._client.post(
                    f"/task/{task_id}/field/{field_id}",
                    json={"value": value},
                )
                resp.raise_for_status()
                return resp

            self._retry(_call)
            log.info("Set field %s on task %s", field_id, task_id)
            return True
        except (httpx.TransportError, httpx.HTTPStatusError) as e:
            log.error(
                "Failed to set field %s on task %s: %s", field_id, task_id, e
            )
            return False

    def _resolve_field(
        self, list_id: str, field_name: str
    ) -> dict[str, Any] | None:
        """Look up a custom field's ID and options by name.

        Returns {"field_id": "...", "type": "...", "options": {...}} or None.
        ``options`` maps option name -> option id and is only filled for
        dropdown fields. Caches per list_id:field_name; misses are not
        cached, so a field that is not found is looked up again next time.
        """
        cache_key = f"{list_id}:{field_name}"
        if cache_key in self._field_cache:
            return self._field_cache[cache_key]

        fields = self.get_custom_fields(list_id)
        for f in fields:
            if f.get("name") == field_name:
                options: dict[str, str] = {}
                if f.get("type") == "drop_down":
                    raw_options = (f.get("type_config") or {}).get("options") or []
                    for opt in raw_options:
                        opt_name = opt.get("name", "")
                        opt_id = opt.get("id", "")
                        if opt_name and opt_id:
                            options[opt_name] = opt_id
                result = {
                    "field_id": f["id"],
                    "type": f.get("type", ""),
                    "options": options,
                }
                self._field_cache[cache_key] = result
                return result

        log.warning("Field '%s' not found in list %s", field_name, list_id)
        return None

    def set_field_by_name(
        self, task_id: str, list_id: str, field_name: str, value: Any
    ) -> bool:
        """Set a custom field by name, auto-resolving dropdown UUIDs.

        For dropdowns, value is matched against option names
        (case-insensitive). For checkboxes, value should be True/False
        (sent as "true"/"false"). Any other value is sent unchanged.

        Returns:
            True on success; False if the field or the dropdown option
            cannot be found, or if the API call fails.
        """
        info = self._resolve_field(list_id, field_name)
        if not info:
            return False

        resolved = value

        if info["type"] == "drop_down" and isinstance(value, str):
            wanted = value.lower()
            for opt_name, opt_id in info["options"].items():
                if opt_name.lower() == wanted:
                    resolved = opt_id
                    break
            else:
                log.warning(
                    "Dropdown option '%s' not found for field '%s'",
                    value,
                    field_name,
                )
                return False
        elif info["type"] == "checkbox" and isinstance(value, bool):
            # Same wire format as set_checkbox().
            resolved = "true" if value else "false"

        return self.set_custom_field_value(task_id, info["field_id"], resolved)

    # ── Convenience: checkbox operations ──

    def set_checkbox(
        self, task_id: str, list_id: str, field_name: str, checked: bool
    ) -> bool:
        """Set a checkbox custom field to checked (True) or unchecked (False).

        Returns False if the field cannot be resolved or the API call fails.
        """
        info = self._resolve_field(list_id, field_name)
        if not info:
            return False
        # ClickUp checkbox API expects "true" or "false" string
        return self.set_custom_field_value(
            task_id, info["field_id"], "true" if checked else "false"
        )

    def is_checkbox_checked(self, task: ClickUpTask, field_name: str) -> bool:
        """Check if a checkbox field is checked on a task.

        ClickUp checkbox values come back as True/False or "true"/"false".
        A missing field counts as unchecked; any other value type falls back
        to its truthiness.
        """
        val = task.custom_fields.get(field_name)
        if val is None:
            return False
        if isinstance(val, bool):
            return val
        if isinstance(val, str):
            return val.lower() == "true"
        return bool(val)

    # ── Convenience: stage operations ──

    def get_stage(self, task: ClickUpTask, field_name: str = "Stage") -> str:
        """Get the current stage value from a task.

        Returns the value lower-cased and stripped, or "" when the field is
        missing or falsy.
        """
        val = task.custom_fields.get(field_name)
        return str(val).lower().strip() if val else ""

    def set_stage(
        self,
        task_id: str,
        list_id: str,
        stage_value: str,
        field_name: str = "Stage",
    ) -> bool:
        """Set the Stage dropdown on a task (see ``set_field_by_name``)."""
        return self.set_field_by_name(task_id, list_id, field_name, stage_value)