Add clickup_runner module -- Phase 1: skeleton + ClickUp polling

New headless automation runner that replaces CheddahBot's multi-agent
system with a simpler design: poll ClickUp for tasks with "Delegate to
Claude" checkbox checked + due date <= today, route by task type + stage
through a skill map, and dispatch to Claude Code headless or AutoCora.

Module structure:
- config.py: env vars > YAML > defaults configuration
- clickup_client.py: ClickUp API client (adapted from cheddahbot/clickup.py)
  with checkbox, stage dropdown, and attachment operations
- skill_map.py: task_type + stage -> SkillRoute routing dictionary
  (Content Creation, On Page Optimization, Press Release, Link Building)
- state.py: minimal SQLite KV store + run log
- __main__.py: poll loop with graceful shutdown
- README.md: setup, config reference, skill map docs

Dispatch functions are stubs (Phase 2: Claude runner, Phase 3: AutoCora).
Includes system-design-spec.md that drives this rewrite.

68 new tests, all passing. Existing 347 tests unaffected.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
clickup-runner
PeninsulaInd 2026-03-30 09:14:22 -05:00
parent dcfabc2261
commit 74c1971f70
13 changed files with 2242 additions and 0 deletions

View File

@ -0,0 +1,148 @@
# ClickUp Runner
Headless background service that polls ClickUp for tasks with the
"Delegate to Claude" checkbox checked, routes them through a skill map
based on task type + stage, runs Claude Code headless, and posts results
back to ClickUp.
## Quick Start
```bash
# Set required env vars (or put them in .env at repo root)
export CLICKUP_API_TOKEN="pk_..."
export CLICKUP_SPACE_ID="..."
# Run the runner
uv run python -m clickup_runner
```
## How It Works
1. Every 720 seconds, polls all "Overall" lists in the ClickUp space
2. Finds tasks where:
- "Delegate to Claude" checkbox is checked
- Due date is today or earlier
3. Reads the task's Work Category and Stage fields
4. Looks up the skill route in `skill_map.py`
5. Dispatches to either:
- **AutoCora handler** (for `run_cora` stage): submits a Cora job to the NAS queue
- **Claude Code handler**: runs `claude -p` with the skill file as system prompt
6. On success: advances Stage, sets next status, posts comment, attaches output files
7. On error: sets Error checkbox, posts error comment with fix instructions
8. Always unchecks "Delegate to Claude" after processing
## Configuration
Config is loaded from `clickup_runner.yaml` at the repo root (optional),
with env var overrides.
### clickup_runner.yaml
```yaml
clickup:
space_id: "your_space_id"
task_type_field_name: "Work Category"
delegate_field_name: "Delegate to Claude"
stage_field_name: "Stage"
error_field_name: "Error"
ai_working_status: "ai working"
review_status: "review"
autocora:
jobs_dir: "//PennQnap1/SHARE1/AutoCora/jobs"
results_dir: "//PennQnap1/SHARE1/AutoCora/results"
xlsx_dir: "//PennQnap1/SHARE1/Cora72-for-macro"
poll_interval_seconds: 120
nas:
generated_dir: "//PennQnap1/SHARE1/generated"
runner:
poll_interval_seconds: 720
claude_timeout_seconds: 2700
max_turns_default: 10
```
### Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `CLICKUP_API_TOKEN` | Yes | ClickUp API token |
| `CLICKUP_SPACE_ID` | Yes | ClickUp space to poll |
| `NTFY_ERROR_TOPIC` | No | ntfy.sh topic for error notifications |
| `NTFY_SUCCESS_TOPIC` | No | ntfy.sh topic for success notifications |
## ClickUp Custom Fields Required
These must exist in your ClickUp space:
| Field | Type | Purpose |
|-------|------|---------|
| Delegate to Claude | Checkbox | Trigger -- checked = process this task |
| Stage | Dropdown | Pipeline position (run_cora, outline, draft, etc.) |
| Error | Checkbox | Flagged when processing fails |
| Work Category | Dropdown | Task type (Content Creation, Press Release, etc.) |
## Skill Map
The routing table lives in `skill_map.py`. Each task type has a sequence
of stages, and each stage maps to either an AutoCora job or a Claude Code
skill file.
### Content Creation
```
run_cora -> outline -> draft -> final
```
### On Page Optimization
```
run_cora -> outline -> draft -> hidden div -> final
```
### Press Release
```
draft -> final
```
### Link Building
```
run_cora -> build -> final
```
## Adding a New Task Type
1. Add an entry to `SKILL_MAP` in `skill_map.py`
2. Write the skill `.md` file(s) in `skills/`
3. Add the Work Category value in ClickUp
4. Add the Stage dropdown values in ClickUp
## Statuses
| Status | Owner | Meaning |
|--------|-------|---------|
| To Do | Nobody | Not started |
| In Progress | Human | Human is working on it |
| Needs Input | Human | Blocked, needs info |
| AI Working | Claude | Runner is processing |
| Review | Human | Output ready for human review |
| Client Review | Client | Sent to client |
| Complete | Nobody | Done |
## Logs
- Console output: INFO level
- File log: `logs/clickup_runner.log` (DEBUG level)
- Run history: `data/clickup_runner.db` (run_log table)
## Tests
```bash
# Unit tests only (no credentials needed)
uv run pytest tests/test_clickup_runner/ -m "not integration"
# Full suite (needs CLICKUP_API_TOKEN)
uv run pytest tests/test_clickup_runner/
# Specific test
uv run pytest tests/test_clickup_runner/test_skill_map.py -v
```

View File

@ -0,0 +1 @@
"""ClickUp + Claude Code automation runner."""

View File

@ -0,0 +1,316 @@
"""ClickUp + Claude Code automation runner -- entry point.
Usage:
uv run python -m clickup_runner
"""
from __future__ import annotations
import logging
import signal
import sys
import time
from datetime import datetime, timezone
from .clickup_client import ClickUpClient, ClickUpTask
from .config import Config, load_config
from .skill_map import get_route, get_supported_task_types, get_valid_stages
from .state import StateDB
log = logging.getLogger("clickup_runner")
# Flag for graceful shutdown
_shutdown = False
def _handle_signal(signum, frame):
global _shutdown
log.info("Received signal %d -- shutting down after current cycle", signum)
_shutdown = True
def _setup_logging():
"""Configure logging: console + file."""
fmt = logging.Formatter(
"[%(asctime)s] %(levelname)-7s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
console = logging.StreamHandler(sys.stdout)
console.setFormatter(fmt)
console.setLevel(logging.INFO)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(console)
# File handler for persistent logs
try:
from pathlib import Path
log_dir = Path(__file__).resolve().parent.parent / "logs"
log_dir.mkdir(exist_ok=True)
file_handler = logging.FileHandler(
log_dir / "clickup_runner.log", encoding="utf-8"
)
file_handler.setFormatter(fmt)
file_handler.setLevel(logging.DEBUG)
root.addHandler(file_handler)
except Exception as e:
log.warning("Could not set up file logging: %s", e)
def _due_date_cutoff_ms() -> int:
"""Return end-of-today as Unix milliseconds for due_date_lt filter."""
now = datetime.now(timezone.utc)
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
return int(end_of_day.timestamp() * 1000)
def _is_due_today_or_earlier(task: ClickUpTask) -> bool:
"""Check if a task's due date is today or earlier."""
if not task.due_date:
return False
try:
due_ms = int(task.due_date)
cutoff_ms = _due_date_cutoff_ms()
return due_ms <= cutoff_ms
except (ValueError, TypeError):
return False
def poll_cycle(
client: ClickUpClient,
cfg: Config,
db: StateDB,
) -> int:
"""Run one poll cycle. Returns the number of tasks dispatched."""
space_id = cfg.clickup.space_id
if not space_id:
log.error("No space_id configured -- skipping poll cycle")
return 0
# Fetch all tasks from Overall lists with due date <= today
cutoff_ms = _due_date_cutoff_ms()
tasks = client.get_tasks_from_overall_lists(space_id, due_date_lt=cutoff_ms)
dispatched = 0
for task in tasks:
# 1. Check "Delegate to Claude" checkbox
if not client.is_checkbox_checked(task, cfg.clickup.delegate_field_name):
continue
# 2. Verify due date <= today
if not _is_due_today_or_earlier(task):
continue
# 3. Read task type and stage
task_type = task.task_type
stage = client.get_stage(task, cfg.clickup.stage_field_name)
log.info(
"Found delegated task: %s (id=%s, type=%s, stage=%s)",
task.name,
task.id,
task_type,
stage,
)
# 4. Look up skill route
if not task_type:
_handle_no_mapping(
client, cfg, task,
"Task has no Work Category set. "
"Set the Work Category field, then re-check Delegate to Claude.",
)
continue
if not stage:
_handle_no_mapping(
client, cfg, task,
"Task has no Stage set. "
"Valid stages for %s: %s. "
"Set the Stage field, then re-check Delegate to Claude."
% (task_type, ", ".join(get_valid_stages(task_type)) or "none"),
)
continue
# 5. Check for .xlsx attachment on run_cora stage
route = get_route(task_type, stage)
if route and route.handler == "autocora":
# If .xlsx is already attached, skip Cora and advance
attachments = client.get_task_attachments(task.id)
task.attachments = attachments
if task.has_xlsx_attachment():
log.info(
"Task %s has .xlsx attached -- skipping run_cora, advancing to %s",
task.id,
route.next_stage,
)
client.set_stage(
task.id,
task.list_id,
route.next_stage,
cfg.clickup.stage_field_name,
)
# Re-read stage and re-route
stage = route.next_stage
route = get_route(task_type, stage)
if route is None:
valid = get_valid_stages(task_type)
if not valid:
msg = (
"Task type '%s' is not supported. "
"Supported types: %s. "
"Fix the Work Category field, then re-check Delegate to Claude."
% (task_type, ", ".join(get_supported_task_types()))
)
else:
msg = (
"Stage '%s' is not valid for task type '%s'. "
"Valid stages: %s. "
"Fix the Stage field, then re-check Delegate to Claude."
% (stage, task_type, ", ".join(valid))
)
_handle_no_mapping(client, cfg, task, msg)
continue
# 6. Dispatch
log.info(
"Dispatching task %s: type=%s, stage=%s, handler=%s",
task.id,
task_type,
stage,
route.handler,
)
run_id = db.log_run_start(task.id, task.name, task_type, stage)
if route.handler == "autocora":
_dispatch_autocora(client, cfg, db, task, route, run_id)
else:
_dispatch_claude(client, cfg, db, task, route, run_id)
dispatched += 1
return dispatched
def _handle_no_mapping(
client: ClickUpClient,
cfg: Config,
task: ClickUpTask,
message: str,
):
"""Handle a task that can't be routed: post comment, set error, uncheck."""
comment = "[ERROR] Cannot process task\n--\n%s" % message
client.add_comment(task.id, comment)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.error_field_name, True
)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.delegate_field_name, False
)
log.warning("Task %s: %s", task.id, message)
def _dispatch_autocora(
client: ClickUpClient,
cfg: Config,
db: StateDB,
task: ClickUpTask,
route,
run_id: int,
):
"""Submit an AutoCora job for a task."""
# TODO: Phase 3 -- implement AutoCora job submission
log.info("AutoCora dispatch for task %s -- NOT YET IMPLEMENTED", task.id)
db.log_run_finish(run_id, "skipped", result="AutoCora not yet implemented")
# For now, post a comment and uncheck
client.add_comment(
task.id,
"[WARNING] AutoCora dispatch not yet implemented. "
"Attach the .xlsx manually and re-check Delegate to Claude.",
)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.delegate_field_name, False
)
def _dispatch_claude(
client: ClickUpClient,
cfg: Config,
db: StateDB,
task: ClickUpTask,
route,
run_id: int,
):
"""Run Claude Code headless for a task."""
# TODO: Phase 2 -- implement Claude Code runner
log.info("Claude dispatch for task %s -- NOT YET IMPLEMENTED", task.id)
db.log_run_finish(run_id, "skipped", result="Claude runner not yet implemented")
# For now, post a comment and uncheck
client.add_comment(
task.id,
"[WARNING] Claude Code runner not yet implemented. "
"This task was picked up but cannot be processed yet.",
)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.delegate_field_name, False
)
def main():
_setup_logging()
log.info("ClickUp Runner starting up")
cfg = load_config()
if not cfg.clickup.api_token:
log.error("CLICKUP_API_TOKEN not set -- exiting")
sys.exit(1)
if not cfg.clickup.space_id:
log.error("CLICKUP_SPACE_ID not set -- exiting")
sys.exit(1)
client = ClickUpClient(
api_token=cfg.clickup.api_token,
task_type_field_name=cfg.clickup.task_type_field_name,
)
db = StateDB(cfg.db_path)
# Graceful shutdown on SIGINT/SIGTERM
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)
log.info(
"Runner ready. Polling every %ds. Space: %s",
cfg.runner.poll_interval_seconds,
cfg.clickup.space_id,
)
try:
while not _shutdown:
try:
count = poll_cycle(client, cfg, db)
if count:
log.info("Dispatched %d task(s) this cycle", count)
except Exception:
log.exception("Error in poll cycle")
# Sleep in small increments so we can catch shutdown signal
for _ in range(cfg.runner.poll_interval_seconds):
if _shutdown:
break
time.sleep(1)
finally:
client.close()
log.info("ClickUp Runner shut down")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,488 @@
"""ClickUp REST API client for the runner.
Adapted from cheddahbot/clickup.py -- stripped to what the runner needs,
with additions for checkbox, stage dropdown, and attachment operations.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import httpx
log = logging.getLogger(__name__)
BASE_URL = "https://api.clickup.com/api/v2"
@dataclass
class ClickUpTask:
"""Lightweight representation of a ClickUp task."""
id: str
name: str
status: str
description: str = ""
task_type: str = "" # Work Category value
url: str = ""
due_date: str = "" # Unix-ms timestamp string, or ""
custom_fields: dict[str, Any] = field(default_factory=dict)
custom_fields_raw: list[dict] = field(default_factory=list)
list_id: str = ""
list_name: str = ""
folder_name: str = ""
tags: list[str] = field(default_factory=list)
attachments: list[dict] = field(default_factory=list)
@classmethod
def from_api(
cls, data: dict, task_type_field_name: str = "Work Category"
) -> ClickUpTask:
"""Parse a task from the ClickUp API response."""
custom_fields: dict[str, Any] = {}
custom_fields_raw = data.get("custom_fields", [])
task_type = ""
for cf in custom_fields_raw:
cf_name = cf.get("name", "")
cf_value = cf.get("value")
# Resolve dropdown type_config to label
if cf.get("type") == "drop_down" and cf_value is not None:
options = cf.get("type_config", {}).get("options", [])
order_index = cf_value if isinstance(cf_value, int) else None
for opt in options:
if (
order_index is not None
and opt.get("orderindex") == order_index
) or opt.get("id") == cf_value:
cf_value = opt.get("name", cf_value)
break
custom_fields[cf_name] = cf_value
if cf_name == task_type_field_name:
task_type = str(cf_value) if cf_value else ""
status_name = data.get("status", {}).get("status", "unknown")
raw_due = data.get("due_date")
due_date = str(raw_due) if raw_due else ""
tags = [tag["name"] for tag in data.get("tags", [])]
# Folder name from list -> folder if available
folder_data = data.get("folder", {})
folder_name = folder_data.get("name", "") if folder_data else ""
return cls(
id=data["id"],
name=data.get("name", ""),
status=status_name.lower(),
description=data.get("description", "") or "",
task_type=task_type,
url=data.get("url", ""),
due_date=due_date,
custom_fields=custom_fields,
custom_fields_raw=custom_fields_raw,
list_id=data.get("list", {}).get("id", ""),
list_name=data.get("list", {}).get("name", ""),
folder_name=folder_name,
tags=tags,
)
def get_field_value(self, field_name: str) -> Any:
"""Get a custom field value by name."""
return self.custom_fields.get(field_name)
def has_xlsx_attachment(self) -> bool:
"""Check if this task has an .xlsx attachment."""
return any(
a.get("title", "").lower().endswith(".xlsx")
or a.get("url", "").lower().endswith(".xlsx")
for a in self.attachments
)
class ClickUpClient:
"""ClickUp REST API v2 client for the runner."""
def __init__(self, api_token: str, task_type_field_name: str = "Work Category"):
self._token = api_token
self._task_type_field_name = task_type_field_name
self._client = httpx.Client(
base_url=BASE_URL,
headers={
"Authorization": api_token,
"Content-Type": "application/json",
},
timeout=30.0,
)
# Cache: field_name -> {field_id, options} per list_id
self._field_cache: dict[str, dict[str, Any]] = {}
def close(self):
self._client.close()
# ── Retry helper ──
@staticmethod
def _retry(fn, max_attempts: int = 3, backoff: float = 2.0):
"""Retry on 5xx / transport errors with exponential backoff."""
last_exc: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
return fn()
except (httpx.TransportError, httpx.HTTPStatusError) as e:
if (
isinstance(e, httpx.HTTPStatusError)
and e.response.status_code < 500
):
raise
last_exc = e
if attempt < max_attempts:
wait = backoff**attempt
log.warning(
"Retry %d/%d after %.1fs: %s",
attempt,
max_attempts,
wait,
e,
)
time.sleep(wait)
raise last_exc # type: ignore[misc]
# ── Read ──
def get_tasks(
self,
list_id: str,
statuses: list[str] | None = None,
due_date_lt: int | None = None,
due_date_gt: int | None = None,
include_closed: bool = False,
) -> list[ClickUpTask]:
"""Fetch tasks from a list with optional filters."""
params: dict[str, Any] = {
"include_closed": "true" if include_closed else "false",
"subtasks": "true",
}
if statuses:
for s in statuses:
params.setdefault("statuses[]", [])
if isinstance(params["statuses[]"], list):
params["statuses[]"].append(s)
if due_date_lt is not None:
params["due_date_lt"] = str(due_date_lt)
if due_date_gt is not None:
params["due_date_gt"] = str(due_date_gt)
# httpx needs repeated params as list of tuples
param_list = []
for k, v in params.items():
if isinstance(v, list):
for item in v:
param_list.append((k, item))
else:
param_list.append((k, v))
resp = self._client.get(f"/list/{list_id}/task", params=param_list)
resp.raise_for_status()
tasks_data = resp.json().get("tasks", [])
return [
ClickUpTask.from_api(t, self._task_type_field_name) for t in tasks_data
]
def get_task(self, task_id: str) -> ClickUpTask:
"""Fetch a single task by ID."""
resp = self._client.get(f"/task/{task_id}")
resp.raise_for_status()
return ClickUpTask.from_api(resp.json(), self._task_type_field_name)
def get_folders(self, space_id: str) -> list[dict]:
"""Return folders in a space with their lists."""
resp = self._client.get(f"/space/{space_id}/folder")
resp.raise_for_status()
folders = []
for f in resp.json().get("folders", []):
lists = [
{"id": lst["id"], "name": lst["name"]} for lst in f.get("lists", [])
]
folders.append({"id": f["id"], "name": f["name"], "lists": lists})
return folders
def get_tasks_from_overall_lists(
self,
space_id: str,
due_date_lt: int | None = None,
) -> list[ClickUpTask]:
"""Fetch tasks from all 'Overall' lists in each folder.
Does NOT filter by status -- we need all tasks so we can check
the Delegate to Claude checkbox ourselves.
"""
all_tasks: list[ClickUpTask] = []
overall_ids: list[str] = []
try:
folders = self.get_folders(space_id)
for folder in folders:
for lst in folder["lists"]:
if lst["name"].lower() == "overall":
overall_ids.append(lst["id"])
except httpx.HTTPStatusError as e:
log.warning("Failed to fetch folders for space %s: %s", space_id, e)
return []
for list_id in overall_ids:
try:
tasks = self.get_tasks(
list_id, due_date_lt=due_date_lt
)
all_tasks.extend(tasks)
except httpx.HTTPStatusError as e:
log.warning("Failed to fetch tasks from list %s: %s", list_id, e)
log.info(
"Found %d tasks across %d Overall lists",
len(all_tasks),
len(overall_ids),
)
return all_tasks
def get_task_attachments(self, task_id: str) -> list[dict]:
"""Get attachments for a task.
Returns list of dicts with keys: id, title, url, date, etc.
NOTE: Requires ClickUp Business plan or higher.
"""
try:
resp = self._client.get(f"/task/{task_id}/attachment")
resp.raise_for_status()
return resp.json().get("attachments", [])
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
log.warning(
"Attachment listing not available (may require Business plan)"
)
else:
log.warning(
"Failed to get attachments for task %s: %s", task_id, e
)
return []
def get_custom_fields(self, list_id: str) -> list[dict]:
"""Get custom field definitions for a list."""
try:
resp = self._client.get(f"/list/{list_id}/field")
resp.raise_for_status()
return resp.json().get("fields", [])
except httpx.HTTPStatusError as e:
log.error("Failed to get custom fields for list %s: %s", list_id, e)
return []
# ── Write ──
def update_task_status(self, task_id: str, status: str) -> bool:
"""Update a task's status."""
try:
def _call():
resp = self._client.put(
f"/task/{task_id}", json={"status": status}
)
resp.raise_for_status()
return resp
self._retry(_call)
log.info("Updated task %s status to '%s'", task_id, status)
return True
except (httpx.TransportError, httpx.HTTPStatusError) as e:
log.error("Failed to update task %s status: %s", task_id, e)
return False
def add_comment(self, task_id: str, text: str) -> bool:
"""Add a comment to a task."""
try:
def _call():
resp = self._client.post(
f"/task/{task_id}/comment",
json={"comment_text": text},
)
resp.raise_for_status()
return resp
self._retry(_call)
log.info("Added comment to task %s", task_id)
return True
except (httpx.TransportError, httpx.HTTPStatusError) as e:
log.error("Failed to add comment to task %s: %s", task_id, e)
return False
def upload_attachment(self, task_id: str, file_path: str | Path) -> bool:
"""Upload a file attachment to a task.
Uses module-level httpx.post() because the shared client sets
Content-Type: application/json which conflicts with multipart.
"""
fp = Path(file_path)
if not fp.exists():
log.warning("Attachment file not found: %s", fp)
return False
try:
def _call():
with open(fp, "rb") as f:
resp = httpx.post(
f"{BASE_URL}/task/{task_id}/attachment",
headers={"Authorization": self._token},
files={
"attachment": (
fp.name,
f,
"application/octet-stream",
)
},
timeout=60.0,
)
resp.raise_for_status()
return resp
self._retry(_call)
log.info("Uploaded attachment %s to task %s", fp.name, task_id)
return True
except (httpx.TransportError, httpx.HTTPStatusError) as e:
log.warning("Failed to upload attachment to task %s: %s", task_id, e)
return False
def set_custom_field_value(
self, task_id: str, field_id: str, value: Any
) -> bool:
"""Set a custom field value by field ID."""
try:
def _call():
resp = self._client.post(
f"/task/{task_id}/field/{field_id}",
json={"value": value},
)
resp.raise_for_status()
return resp
self._retry(_call)
log.info("Set field %s on task %s", field_id, task_id)
return True
except (httpx.TransportError, httpx.HTTPStatusError) as e:
log.error(
"Failed to set field %s on task %s: %s", field_id, task_id, e
)
return False
def _resolve_field(
self, list_id: str, field_name: str
) -> dict[str, Any] | None:
"""Look up a custom field's ID and options by name.
Returns {"field_id": "...", "type": "...", "options": {...}} or None.
Caches per list_id:field_name.
"""
cache_key = f"{list_id}:{field_name}"
if cache_key in self._field_cache:
return self._field_cache[cache_key]
fields = self.get_custom_fields(list_id)
for f in fields:
if f.get("name") == field_name:
options: dict[str, str] = {}
if f.get("type") == "drop_down":
for opt in f.get("type_config", {}).get("options", []):
opt_name = opt.get("name", "")
opt_id = opt.get("id", "")
if opt_name and opt_id:
options[opt_name] = opt_id
result = {
"field_id": f["id"],
"type": f.get("type", ""),
"options": options,
}
self._field_cache[cache_key] = result
return result
log.warning("Field '%s' not found in list %s", field_name, list_id)
return None
def set_field_by_name(
self, task_id: str, list_id: str, field_name: str, value: Any
) -> bool:
"""Set a custom field by name, auto-resolving dropdown UUIDs.
For dropdowns, value is matched against option names (case-insensitive).
For checkboxes, value should be True/False (sent as "true"/"false").
"""
info = self._resolve_field(list_id, field_name)
if not info:
return False
resolved = value
if info["type"] == "drop_down" and isinstance(value, str):
for opt_name, opt_id in info["options"].items():
if opt_name.lower() == value.lower():
resolved = opt_id
break
else:
log.warning(
"Dropdown option '%s' not found for field '%s'",
value,
field_name,
)
return False
return self.set_custom_field_value(task_id, info["field_id"], resolved)
# ── Convenience: checkbox operations ──
def set_checkbox(
self, task_id: str, list_id: str, field_name: str, checked: bool
) -> bool:
"""Set a checkbox custom field to checked (True) or unchecked (False)."""
info = self._resolve_field(list_id, field_name)
if not info:
return False
# ClickUp checkbox API expects "true" or "false" string
return self.set_custom_field_value(
task_id, info["field_id"], "true" if checked else "false"
)
def is_checkbox_checked(self, task: ClickUpTask, field_name: str) -> bool:
"""Check if a checkbox field is checked on a task.
ClickUp checkbox values come back as True/False or "true"/"false".
"""
val = task.custom_fields.get(field_name)
if val is None:
return False
if isinstance(val, bool):
return val
if isinstance(val, str):
return val.lower() == "true"
return bool(val)
# ── Convenience: stage operations ──
def get_stage(self, task: ClickUpTask, field_name: str = "Stage") -> str:
"""Get the current stage value from a task."""
val = task.custom_fields.get(field_name)
return str(val).lower().strip() if val else ""
def set_stage(
self,
task_id: str,
list_id: str,
stage_value: str,
field_name: str = "Stage",
) -> bool:
"""Set the Stage dropdown on a task."""
return self.set_field_by_name(task_id, list_id, field_name, stage_value)

View File

@ -0,0 +1,121 @@
"""Configuration loader: env vars -> config.yaml -> defaults."""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
import yaml
from dotenv import load_dotenv
log = logging.getLogger(__name__)
ROOT_DIR = Path(__file__).resolve().parent.parent
load_dotenv(ROOT_DIR / ".env")
@dataclass
class ClickUpConfig:
api_token: str = ""
space_id: str = ""
task_type_field_name: str = "Work Category"
# Custom field names (must match ClickUp exactly)
delegate_field_name: str = "Delegate to Claude"
stage_field_name: str = "Stage"
error_field_name: str = "Error"
# Statuses
ai_working_status: str = "ai working"
review_status: str = "review"
client_review_status: str = "client review"
complete_status: str = "complete"
@dataclass
class AutoCoraConfig:
jobs_dir: str = "//PennQnap1/SHARE1/AutoCora/jobs"
results_dir: str = "//PennQnap1/SHARE1/AutoCora/results"
xlsx_dir: str = "//PennQnap1/SHARE1/Cora72-for-macro"
poll_interval_seconds: int = 120
@dataclass
class NASConfig:
generated_dir: str = "//PennQnap1/SHARE1/generated"
@dataclass
class RunnerConfig:
poll_interval_seconds: int = 720
claude_timeout_seconds: int = 2700 # 45 minutes
max_turns_default: int = 10
temp_dir: str = "" # empty = system temp
@dataclass
class NtfyConfig:
enabled: bool = False
server: str = "https://ntfy.sh"
error_topic: str = ""
success_topic: str = ""
@dataclass
class Config:
clickup: ClickUpConfig = field(default_factory=ClickUpConfig)
autocora: AutoCoraConfig = field(default_factory=AutoCoraConfig)
nas: NASConfig = field(default_factory=NASConfig)
runner: RunnerConfig = field(default_factory=RunnerConfig)
ntfy: NtfyConfig = field(default_factory=NtfyConfig)
# Derived paths
root_dir: Path = field(default_factory=lambda: ROOT_DIR)
skills_dir: Path = field(default_factory=lambda: ROOT_DIR / "skills")
db_path: Path = field(default_factory=lambda: ROOT_DIR / "data" / "clickup_runner.db")
def _apply_section(cfg_obj, data: dict):
"""Apply a dict of values to a dataclass instance, skipping unknown keys."""
for k, v in data.items():
if hasattr(cfg_obj, k):
setattr(cfg_obj, k, v)
def load_config(yaml_path: Path | None = None) -> Config:
"""Load config from env vars -> config.yaml -> defaults."""
cfg = Config()
# Load YAML if exists
if yaml_path is None:
yaml_path = ROOT_DIR / "clickup_runner.yaml"
if yaml_path.exists():
with open(yaml_path) as f:
data = yaml.safe_load(f) or {}
for section_name in ("clickup", "autocora", "nas", "runner", "ntfy"):
if section_name in data and isinstance(data[section_name], dict):
_apply_section(getattr(cfg, section_name), data[section_name])
# Env var overrides
if token := os.getenv("CLICKUP_API_TOKEN"):
cfg.clickup.api_token = token
if space := os.getenv("CLICKUP_SPACE_ID"):
cfg.clickup.space_id = space
# ntfy topics from env vars
if topic := os.getenv("NTFY_ERROR_TOPIC"):
cfg.ntfy.error_topic = topic
if topic := os.getenv("NTFY_SUCCESS_TOPIC"):
cfg.ntfy.success_topic = topic
# Validate required fields
if not cfg.clickup.api_token:
log.warning("CLICKUP_API_TOKEN not set -- runner will not be able to poll")
if not cfg.clickup.space_id:
log.warning("CLICKUP_SPACE_ID not set -- runner will not be able to poll")
# Ensure data dir exists
cfg.db_path.parent.mkdir(parents=True, exist_ok=True)
return cfg

View File

@ -0,0 +1,130 @@
"""Skill routing: task_type + stage -> skill configuration.
Each entry maps a (task_type, stage) pair to either:
- A Claude Code skill (skill_file, tools, max_turns)
- An AutoCora handler (handler="autocora")
To add a new task type: add an entry here + write the skill .md file.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class SkillRoute:
"""One step in a task's pipeline."""
next_stage: str
next_status: str
handler: str = "claude" # "claude" or "autocora"
skill_file: str = "" # path relative to skills_dir
tools: str = "" # comma-separated Claude Code --allowedTools
max_turns: int = 10
# Tools commonly needed for content work
_CONTENT_TOOLS = "Read,Edit,Write,Bash,Glob,Grep,WebFetch,WebSearch"
_LINK_TOOLS = "Read,Edit,Write,Bash,Glob,Grep"
SKILL_MAP: dict[str, dict[str, SkillRoute]] = {
"Content Creation": {
"run_cora": SkillRoute(
handler="autocora",
next_stage="outline",
next_status="review",
),
"outline": SkillRoute(
skill_file="content_outline.md",
next_stage="draft",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
),
"draft": SkillRoute(
skill_file="content_draft.md",
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=20,
),
},
"On Page Optimization": {
"run_cora": SkillRoute(
handler="autocora",
next_stage="outline",
next_status="review",
),
"outline": SkillRoute(
skill_file="content_outline.md",
next_stage="draft",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
),
"draft": SkillRoute(
skill_file="content_draft.md",
next_stage="hidden div",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=20,
),
"hidden div": SkillRoute(
skill_file="content_hidden_div.md",
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=10,
),
},
"Press Release": {
"draft": SkillRoute(
skill_file="press_release_prompt.md",
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
),
},
"Link Building": {
"run_cora": SkillRoute(
handler="autocora",
next_stage="build",
next_status="review",
),
"build": SkillRoute(
skill_file="linkbuilding.md",
next_stage="final",
next_status="review",
tools=_LINK_TOOLS,
max_turns=10,
),
},
}
def get_route(task_type: str, stage: str) -> SkillRoute | None:
"""Look up the skill route for a task type + stage.
Returns None if no mapping exists.
"""
type_routes = SKILL_MAP.get(task_type)
if not type_routes:
return None
return type_routes.get(stage.lower().strip())
def get_valid_stages(task_type: str) -> list[str]:
"""Return the list of valid stage names for a task type."""
type_routes = SKILL_MAP.get(task_type)
if not type_routes:
return []
return list(type_routes.keys())
def get_supported_task_types() -> list[str]:
"""Return all supported task type names."""
return list(SKILL_MAP.keys())

View File

@ -0,0 +1,135 @@
"""Minimal SQLite persistence for the runner.
Just a KV store for tracking processed tasks and AutoCora jobs,
plus a run log for auditing.
"""
from __future__ import annotations
import json
import sqlite3
import threading
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
class StateDB:
"""Thread-safe SQLite KV store + run log."""
def __init__(self, db_path: Path):
self._path = db_path
self._local = threading.local()
self._init_schema()
@property
def _conn(self) -> sqlite3.Connection:
if not hasattr(self._local, "conn"):
self._local.conn = sqlite3.connect(str(self._path))
self._local.conn.row_factory = sqlite3.Row
self._local.conn.execute("PRAGMA journal_mode=WAL")
return self._local.conn
def _init_schema(self):
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS run_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
task_name TEXT NOT NULL,
task_type TEXT NOT NULL,
stage TEXT NOT NULL,
status TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
result TEXT,
error TEXT
);
CREATE INDEX IF NOT EXISTS idx_run_log_task
ON run_log(task_id, started_at);
""")
self._conn.commit()
# ── KV Store ──
def kv_set(self, key: str, value: str):
self._conn.execute(
"INSERT OR REPLACE INTO kv_store (key, value) VALUES (?, ?)",
(key, value),
)
self._conn.commit()
def kv_get(self, key: str) -> str | None:
row = self._conn.execute(
"SELECT value FROM kv_store WHERE key = ?", (key,)
).fetchone()
return row["value"] if row else None
def kv_set_json(self, key: str, data: Any):
self.kv_set(key, json.dumps(data))
def kv_get_json(self, key: str) -> Any | None:
raw = self.kv_get(key)
if raw is None:
return None
return json.loads(raw)
def kv_scan(self, prefix: str) -> list[tuple[str, str]]:
"""Return all KV pairs where key starts with prefix."""
rows = self._conn.execute(
"SELECT key, value FROM kv_store WHERE key LIKE ?",
(prefix + "%",),
).fetchall()
return [(r["key"], r["value"]) for r in rows]
def kv_delete(self, key: str):
self._conn.execute("DELETE FROM kv_store WHERE key = ?", (key,))
self._conn.commit()
# ── Run Log ──
def log_run_start(
self,
task_id: str,
task_name: str,
task_type: str,
stage: str,
) -> int:
"""Log the start of a task run. Returns the run log ID."""
now = _now()
cur = self._conn.execute(
"""INSERT INTO run_log
(task_id, task_name, task_type, stage, status, started_at)
VALUES (?, ?, ?, ?, 'running', ?)""",
(task_id, task_name, task_type, stage, now),
)
self._conn.commit()
return cur.lastrowid # type: ignore[return-value]
def log_run_finish(
self, run_id: int, status: str, result: str | None = None, error: str | None = None
):
"""Update a run log entry with the outcome."""
now = _now()
self._conn.execute(
"""UPDATE run_log
SET status = ?, finished_at = ?, result = ?, error = ?
WHERE id = ?""",
(status, now, result, error, run_id),
)
self._conn.commit()
def get_recent_runs(self, limit: int = 20) -> list[dict]:
"""Get the most recent run log entries."""
rows = self._conn.execute(
"SELECT * FROM run_log ORDER BY started_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def _now() -> str:
return datetime.now(UTC).isoformat()

View File

@ -0,0 +1,226 @@
# ClickUp + Claude Code Automation System -- Design Spec
## Overview
This document describes a system that polls ClickUp for tasks triggered by a
checkbox, routes them to the correct Claude Code skill based on task type and
stage, runs Claude Code in headless mode, posts results back to ClickUp, and
advances the task through its lifecycle.
The user has existing code that should be refactored or replaced to match this
design. Review the existing code and determine what can be reused vs rewritten.
---
## 1. ClickUp Status System (replaces the current 14-status setup)
The old statuses are confusing because they encode WHAT is happening instead of
WHO OWNS THE TASK. The new system has 7 statuses organized by ownership:
| Status | Owner | Replaces |
|----------------|----------------|-----------------------------------------------------|
| To Do | Nobody | To Do |
| In Progress | Human | In Progress, After Client Feedback |
| Needs Input | Human (blocked)| Needs Input |
| AI Working | Claude Code | Automation Underway, Running CORA |
| Review | Human | Outline Review, Outline Approved, PR Needs Review, Internal Review |
| Client Review | Client | Client Review |
| Complete | Nobody | Ready to Use, Complete |
Key decisions:
- "After Client Feedback" is just "In Progress" again -- same owner, same state.
- "Error" is NOT a status. It becomes a checkbox custom field that can be flagged
on any status.
- "Running CORA" and "Automation Underway" collapse into "AI Working" -- the
WHAT is tracked by the Stage field, not the status.
---
## 2. Custom Fields Required
These custom fields need to exist in ClickUp:
- **Run Claude** (checkbox) -- when checked, the poller picks up the task and
spawns a Claude Code session. Gets unchecked after processing so it does not
re-trigger.
- **Stage** (dropdown) -- tracks where in the content lifecycle the task is.
Values: report, outline, draft, final. This is independent of status.
"Review" + Stage:Outline = the old "Outline Review".
"AI Working" + Stage:Draft = AI is writing the draft.
- **Error** (checkbox) -- flagged when Claude Code errors out. Can happen at any
status. Not a status itself.
---
## 3. Skill Routing (task type + stage -> skill)
When a task hits "AI Working", the script looks at the task type AND the current
stage to decide which skill to load. The skill file contents get passed to Claude
Code via --append-system-prompt. After AI finishes, the stage advances and the
task moves to the appropriate next status.
The routing is defined in a SKILL_MAP dictionary:
```
task_type -> stage -> {
skill_file: path to SKILL.md
next_stage: what stage to set after AI finishes
next_status: where the task goes after AI finishes
tools: which Claude Code tools to allow
}
```
Example for "content" task type:
```
content:
report -> skill: make_outline -> next: stage=outline, status=review
outline -> skill: make_content -> next: stage=draft, status=review
draft -> skill: finalize -> next: stage=final, status=review or client review
```
The user has multiple task types (content, data_report, client_deliverable).
Each has its own stage progression and skill chain.
Skills are NOT invoked via slash commands (those only work in interactive mode).
Instead, the SKILL.md file is read from disk and passed as instructions:
```
claude -p "Task prompt here" \
--append-system-prompt "$(cat /path/to/SKILL.md)" \
--output-format json \
--allowedTools "Read,Edit,Bash" \
--max-turns 10
```
---
## 4. Multi-List Polling
The user has 20+ folders in one ClickUp space, each containing a list named
"Overall". The script needs to poll across all of them.
The approach is a hardcoded dictionary mapping folder names to list IDs:
```python
LIST_IDS = {
"folder_a": "list_id_1",
"folder_b": "list_id_2",
# ... 20+ entries
}
```
The poller iterates through all lists each cycle, collecting any tasks where the
"Run Claude" checkbox is checked. Each task gets tagged with its source folder
name for logging. If one list fails to poll, the error is logged and the poller
continues with the remaining lists.
At 60-second intervals with 20+ lists, this is roughly 20 requests per minute,
well within ClickUp's 100/min rate limit.
---
## 5. Claude Code Headless Configuration
Key flags and decisions:
- **Do NOT use --bare** -- the runner operates on the user's own machine, so
CLAUDE.md project instructions, MCP servers from .mcp.json, and local config
should all load automatically.
- Use **--mcp-config** if specific MCP servers are needed per-task beyond what
is in the project config.
- Use **-p** with the task prompt for non-interactive mode.
- Use **--append-system-prompt** to inject skill instructions.
- Use **--output-format json** to get structured results with session metadata.
- Use **--max-turns** to cap cost and runtime (default 10).
- Use **--allowedTools** scoped per stage. Available built-in tools:
Read, Edit, MultiEdit, Write, Bash, Glob, Grep, WebFetch, WebSearch,
NotebookEdit, TodoWrite.
- Bash can be scoped: Bash(git:*), Bash(python:*), Bash(npm test), etc.
- MCP tools use the naming pattern: mcp__{server_name}__{tool_name}
Allow all from a server with: mcp__servername__*
---
## 6. Workflow: What Happens When a Task Is Triggered
1. Poller finds a task with "Run Claude" checkbox checked.
2. Script reads the task type and current stage.
3. Looks up the skill routing in SKILL_MAP.
4. If no mapping found, posts a warning comment and unchecks the box.
5. Sets task status to "AI Working".
6. Loads the skill file from disk.
7. Builds a prompt from the task name and description.
8. Runs Claude Code headless with the skill as system prompt.
9. On success:
- Advances the Stage to the next value.
- Sets the status to the next value (usually "Review").
- Posts the result as a task comment.
10. On error:
- Flags the Error checkbox.
- Sets status to "Review" (so the human sees it).
- Posts the error as a task comment.
11. Always unchecks "Run Claude" so it does not re-trigger.
---
## 7. The Typical Content Workflow End-to-End
```
[To Do]
|
v
[In Progress] -- human runs CORA report (java program, already handled)
|
| (human checks "Run Claude", sets stage to "report")
v
[AI Working] -- skill: make_outline, stage: report
|
| (AI finishes, stage -> outline, status -> review)
v
[Review] -- human checks the outline
|
| (human approves: checks "Run Claude")
v
[AI Working] -- skill: make_content, stage: outline
|
| (AI finishes, stage -> draft, status -> review)
v
[Review] -- human checks the draft
|
| (human approves: checks "Run Claude" OR does manual edits first)
v
[AI Working] -- skill: finalize, stage: draft
|
| (AI finishes, stage -> final, status -> client review or complete)
v
[Client Review] or [Complete]
|
| (if client sends revisions)
v
[In Progress] -- human works on revisions (this is NOT "After Client Feedback",
it is just In Progress again)
```
---
## 8. Terminal Compatibility
All output strings must be pure ASCII. No emojis, no unicode arrows, no em
dashes. Use [OK], [ERROR], [WARNING] as prefixes in comments and logs. Use ->
instead of arrows. Use -- instead of em dashes.
---
## 9. File Structure
The reference implementation is in clickup_claude_runner.py. Key sections:
- CONFIG: tokens, list IDs, custom field IDs, repo path
- SKILL_MAP: task type + stage -> skill routing
- ClickUp API helpers: get tasks, update status, set fields, post comments
- Claude Code runner: load skill, build command, run subprocess, parse output
- Main loop: poll, process, sleep
The user's existing code may have different structure. Evaluate what can be
kept vs what should be replaced to match this design.

View File

@ -0,0 +1,341 @@
"""Tests for clickup_runner.clickup_client.
Unit tests use respx to mock HTTP. Integration tests hit the real API.
"""
import pytest
import respx
import httpx
from clickup_runner.clickup_client import ClickUpClient, ClickUpTask, BASE_URL
# ── Fixtures ──
@pytest.fixture
def sample_task_data():
"""A realistic ClickUp API task response."""
return {
"id": "task_001",
"name": "Plumber SEO Page - Miami",
"status": {"status": "to do"},
"description": "Write optimized content for plumber services in Miami",
"url": "https://app.clickup.com/t/task_001",
"due_date": "1711929600000", # some timestamp
"list": {"id": "list_100", "name": "Overall"},
"folder": {"id": "fold_1", "name": "Acme Plumbing"},
"tags": [{"name": "mar26"}, {"name": "content"}],
"custom_fields": [
{
"id": "cf_wc",
"name": "Work Category",
"type": "drop_down",
"value": "opt_cc",
"type_config": {
"options": [
{"id": "opt_cc", "name": "Content Creation", "orderindex": 0},
{"id": "opt_pr", "name": "Press Release", "orderindex": 1},
{"id": "opt_lb", "name": "Link Building", "orderindex": 2},
]
},
},
{
"id": "cf_stage",
"name": "Stage",
"type": "drop_down",
"value": "opt_outline",
"type_config": {
"options": [
{"id": "opt_runcora", "name": "run_cora", "orderindex": 0},
{"id": "opt_outline", "name": "outline", "orderindex": 1},
{"id": "opt_draft", "name": "draft", "orderindex": 2},
]
},
},
{
"id": "cf_delegate",
"name": "Delegate to Claude",
"type": "checkbox",
"value": True,
},
{
"id": "cf_error",
"name": "Error",
"type": "checkbox",
"value": False,
},
{
"id": "cf_keyword",
"name": "Keyword",
"type": "short_text",
"value": "plumber miami",
},
{
"id": "cf_imsurl",
"name": "IMSURL",
"type": "url",
"value": "https://acmeplumbing.com/miami",
},
],
}
@pytest.fixture
def sample_task(sample_task_data):
return ClickUpTask.from_api(sample_task_data)
# ── ClickUpTask parsing tests ──
class TestClickUpTaskFromApi:
def test_basic_fields(self, sample_task):
assert sample_task.id == "task_001"
assert sample_task.name == "Plumber SEO Page - Miami"
assert sample_task.status == "to do"
assert sample_task.list_id == "list_100"
assert sample_task.folder_name == "Acme Plumbing"
def test_dropdown_resolved_to_label(self, sample_task):
assert sample_task.task_type == "Content Creation"
assert sample_task.custom_fields["Stage"] == "outline"
def test_checkbox_fields(self, sample_task):
assert sample_task.custom_fields["Delegate to Claude"] is True
assert sample_task.custom_fields["Error"] is False
def test_text_fields(self, sample_task):
assert sample_task.custom_fields["Keyword"] == "plumber miami"
assert sample_task.custom_fields["IMSURL"] == "https://acmeplumbing.com/miami"
def test_tags(self, sample_task):
assert "mar26" in sample_task.tags
assert "content" in sample_task.tags
def test_due_date(self, sample_task):
assert sample_task.due_date == "1711929600000"
def test_missing_due_date(self, sample_task_data):
sample_task_data["due_date"] = None
task = ClickUpTask.from_api(sample_task_data)
assert task.due_date == ""
def test_missing_custom_fields(self):
task = ClickUpTask.from_api({"id": "t1", "name": "test"})
assert task.task_type == ""
assert task.custom_fields == {}
def test_unknown_dropdown_value_kept_as_is(self, sample_task_data):
"""If dropdown value doesn't match any option, keep raw value."""
sample_task_data["custom_fields"][0]["value"] = "unknown_opt_id"
task = ClickUpTask.from_api(sample_task_data)
assert task.task_type == "unknown_opt_id"
class TestClickUpTaskHelpers:
def test_get_field_value(self, sample_task):
assert sample_task.get_field_value("Keyword") == "plumber miami"
assert sample_task.get_field_value("Nonexistent") is None
def test_has_xlsx_attachment_false_when_empty(self, sample_task):
sample_task.attachments = []
assert not sample_task.has_xlsx_attachment()
def test_has_xlsx_attachment_true(self, sample_task):
sample_task.attachments = [
{"title": "report.xlsx", "url": "https://..."}
]
assert sample_task.has_xlsx_attachment()
def test_has_xlsx_attachment_case_insensitive(self, sample_task):
sample_task.attachments = [
{"title": "Report.XLSX", "url": "https://..."}
]
assert sample_task.has_xlsx_attachment()
# ── ClickUpClient tests (respx mocked) ──
class TestClientCheckbox:
def test_is_checkbox_checked_true(self, sample_task):
client = ClickUpClient(api_token="fake")
assert client.is_checkbox_checked(sample_task, "Delegate to Claude")
def test_is_checkbox_checked_false(self, sample_task):
client = ClickUpClient(api_token="fake")
assert not client.is_checkbox_checked(sample_task, "Error")
def test_is_checkbox_checked_missing_field(self, sample_task):
client = ClickUpClient(api_token="fake")
assert not client.is_checkbox_checked(sample_task, "Nonexistent")
def test_is_checkbox_checked_string_true(self, sample_task):
client = ClickUpClient(api_token="fake")
sample_task.custom_fields["Delegate to Claude"] = "true"
assert client.is_checkbox_checked(sample_task, "Delegate to Claude")
def test_is_checkbox_checked_string_false(self, sample_task):
client = ClickUpClient(api_token="fake")
sample_task.custom_fields["Delegate to Claude"] = "false"
assert not client.is_checkbox_checked(sample_task, "Delegate to Claude")
class TestClientStage:
def test_get_stage(self, sample_task):
client = ClickUpClient(api_token="fake")
assert client.get_stage(sample_task) == "outline"
def test_get_stage_empty(self):
client = ClickUpClient(api_token="fake")
task = ClickUpTask(id="t1", name="test", status="to do")
assert client.get_stage(task) == ""
def test_get_stage_custom_field_name(self, sample_task):
client = ClickUpClient(api_token="fake")
sample_task.custom_fields["Custom Stage"] = "DRAFT"
assert client.get_stage(sample_task, field_name="Custom Stage") == "draft"
@respx.mock
class TestClientHTTP:
def test_get_task(self):
task_data = {
"id": "t1",
"name": "Test",
"status": {"status": "to do"},
}
respx.get(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(200, json=task_data)
)
client = ClickUpClient(api_token="fake")
task = client.get_task("t1")
assert task.id == "t1"
assert task.name == "Test"
client.close()
def test_update_task_status(self):
respx.put(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(200, json={})
)
client = ClickUpClient(api_token="fake")
assert client.update_task_status("t1", "ai working") is True
client.close()
def test_update_task_status_failure(self):
respx.put(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(404, json={"err": "not found"})
)
client = ClickUpClient(api_token="fake")
assert client.update_task_status("t1", "ai working") is False
client.close()
def test_add_comment(self):
respx.post(f"{BASE_URL}/task/t1/comment").mock(
return_value=httpx.Response(200, json={})
)
client = ClickUpClient(api_token="fake")
assert client.add_comment("t1", "hello") is True
client.close()
def test_get_folders(self):
respx.get(f"{BASE_URL}/space/sp1/folder").mock(
return_value=httpx.Response(200, json={
"folders": [
{
"id": "f1",
"name": "Acme",
"lists": [
{"id": "l1", "name": "Overall"},
{"id": "l2", "name": "Archive"},
],
}
]
})
)
client = ClickUpClient(api_token="fake")
folders = client.get_folders("sp1")
assert len(folders) == 1
assert folders[0]["name"] == "Acme"
assert len(folders[0]["lists"]) == 2
client.close()
def test_get_tasks_from_overall_lists(self):
# Mock folders endpoint
respx.get(f"{BASE_URL}/space/sp1/folder").mock(
return_value=httpx.Response(200, json={
"folders": [
{
"id": "f1",
"name": "Client A",
"lists": [
{"id": "l1", "name": "Overall"},
{"id": "l2", "name": "Archive"},
],
},
{
"id": "f2",
"name": "Client B",
"lists": [
{"id": "l3", "name": "Overall"},
],
},
]
})
)
# Mock task endpoints -- only Overall lists should be hit
respx.get(f"{BASE_URL}/list/l1/task").mock(
return_value=httpx.Response(200, json={
"tasks": [
{"id": "t1", "name": "Task 1", "status": {"status": "to do"}},
]
})
)
respx.get(f"{BASE_URL}/list/l3/task").mock(
return_value=httpx.Response(200, json={
"tasks": [
{"id": "t2", "name": "Task 2", "status": {"status": "review"}},
]
})
)
# l2 (Archive) should NOT be called
client = ClickUpClient(api_token="fake")
tasks = client.get_tasks_from_overall_lists("sp1")
assert len(tasks) == 2
ids = {t.id for t in tasks}
assert ids == {"t1", "t2"}
client.close()
def test_retry_on_5xx(self):
route = respx.put(f"{BASE_URL}/task/t1")
route.side_effect = [
httpx.Response(500, json={"err": "internal"}),
httpx.Response(200, json={}),
]
client = ClickUpClient(api_token="fake")
assert client.update_task_status("t1", "ai working") is True
client.close()
def test_no_retry_on_4xx(self):
respx.put(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(400, json={"err": "bad request"})
)
client = ClickUpClient(api_token="fake")
assert client.update_task_status("t1", "ai working") is False
client.close()
def test_get_task_attachments(self):
respx.get(f"{BASE_URL}/task/t1/attachment").mock(
return_value=httpx.Response(200, json={
"attachments": [
{"id": "a1", "title": "report.xlsx", "url": "https://..."},
]
})
)
client = ClickUpClient(api_token="fake")
atts = client.get_task_attachments("t1")
assert len(atts) == 1
assert atts[0]["title"] == "report.xlsx"
client.close()

View File

@ -0,0 +1,104 @@
"""Tests for clickup_runner.config."""
import os
from pathlib import Path
import pytest
import yaml
from clickup_runner.config import Config, load_config
@pytest.fixture
def yaml_config(tmp_path):
"""Write a config YAML file and return its path."""
cfg = {
"clickup": {
"space_id": "space_from_yaml",
"delegate_field_name": "Run It",
"ai_working_status": "bot working",
},
"runner": {
"poll_interval_seconds": 300,
"claude_timeout_seconds": 1800,
},
"autocora": {
"jobs_dir": "/tmp/autocora/jobs",
"xlsx_dir": "/tmp/autocora/xlsx",
},
}
p = tmp_path / "clickup_runner.yaml"
p.write_text(yaml.dump(cfg))
return p
def test_defaults():
"""Config defaults are sensible without any YAML or env vars."""
cfg = Config()
assert cfg.clickup.delegate_field_name == "Delegate to Claude"
assert cfg.clickup.stage_field_name == "Stage"
assert cfg.clickup.error_field_name == "Error"
assert cfg.clickup.ai_working_status == "ai working"
assert cfg.runner.poll_interval_seconds == 720
assert cfg.runner.claude_timeout_seconds == 2700
assert cfg.autocora.poll_interval_seconds == 120
def test_yaml_overrides(yaml_config, monkeypatch):
"""YAML values override defaults."""
monkeypatch.delenv("CLICKUP_SPACE_ID", raising=False)
monkeypatch.delenv("CLICKUP_API_TOKEN", raising=False)
cfg = load_config(yaml_path=yaml_config)
assert cfg.clickup.space_id == "space_from_yaml"
assert cfg.clickup.delegate_field_name == "Run It"
assert cfg.clickup.ai_working_status == "bot working"
assert cfg.runner.poll_interval_seconds == 300
assert cfg.runner.claude_timeout_seconds == 1800
assert cfg.autocora.jobs_dir == "/tmp/autocora/jobs"
assert cfg.autocora.xlsx_dir == "/tmp/autocora/xlsx"
def test_env_overrides_yaml(yaml_config, monkeypatch):
"""Env vars take precedence over YAML."""
monkeypatch.setenv("CLICKUP_API_TOKEN", "pk_env_token")
monkeypatch.setenv("CLICKUP_SPACE_ID", "space_from_env")
cfg = load_config(yaml_path=yaml_config)
assert cfg.clickup.api_token == "pk_env_token"
# Env var overrides YAML
assert cfg.clickup.space_id == "space_from_env"
def test_missing_yaml_uses_defaults(tmp_path):
"""If YAML doesn't exist, all defaults are used."""
nonexistent = tmp_path / "nope.yaml"
cfg = load_config(yaml_path=nonexistent)
assert cfg.clickup.delegate_field_name == "Delegate to Claude"
assert cfg.runner.poll_interval_seconds == 720
def test_unknown_yaml_keys_ignored(tmp_path, monkeypatch):
"""Unknown keys in YAML don't cause errors."""
monkeypatch.delenv("CLICKUP_SPACE_ID", raising=False)
monkeypatch.delenv("CLICKUP_API_TOKEN", raising=False)
p = tmp_path / "clickup_runner.yaml"
p.write_text(yaml.dump({
"clickup": {"space_id": "test", "bogus_key": "whatever"},
"totally_unknown_section": {"foo": "bar"},
}))
cfg = load_config(yaml_path=p)
assert cfg.clickup.space_id == "test"
assert not hasattr(cfg.clickup, "bogus_key")
def test_db_path_parent_created(tmp_path, monkeypatch):
"""load_config ensures the DB parent directory exists."""
# Patch ROOT_DIR so db_path points inside tmp_path
import clickup_runner.config as config_mod
fake_root = tmp_path / "project"
fake_root.mkdir()
monkeypatch.setattr(config_mod, "ROOT_DIR", fake_root)
cfg = load_config(yaml_path=tmp_path / "nope.yaml")
# The default db_path parent should have been created
assert cfg.db_path.parent.exists()

View File

@ -0,0 +1,151 @@
"""Tests for clickup_runner.skill_map."""
import pytest
from clickup_runner.skill_map import (
SKILL_MAP,
SkillRoute,
get_route,
get_supported_task_types,
get_valid_stages,
)
class TestGetRoute:
def test_content_creation_run_cora(self):
route = get_route("Content Creation", "run_cora")
assert route is not None
assert route.handler == "autocora"
assert route.next_stage == "outline"
assert route.next_status == "review"
def test_content_creation_outline(self):
route = get_route("Content Creation", "outline")
assert route is not None
assert route.handler == "claude"
assert route.skill_file == "content_outline.md"
assert route.next_stage == "draft"
def test_content_creation_draft(self):
route = get_route("Content Creation", "draft")
assert route is not None
assert route.next_stage == "final"
assert route.max_turns == 20
def test_on_page_optimization_has_hidden_div(self):
route = get_route("On Page Optimization", "hidden div")
assert route is not None
assert route.skill_file == "content_hidden_div.md"
assert route.next_stage == "final"
def test_on_page_draft_goes_to_hidden_div(self):
route = get_route("On Page Optimization", "draft")
assert route is not None
assert route.next_stage == "hidden div"
def test_press_release_single_stage(self):
route = get_route("Press Release", "draft")
assert route is not None
assert route.skill_file == "press_release_prompt.md"
assert route.next_stage == "final"
assert route.next_status == "review"
def test_press_release_no_run_cora(self):
"""Press releases don't need Cora."""
route = get_route("Press Release", "run_cora")
assert route is None
def test_link_building_run_cora(self):
route = get_route("Link Building", "run_cora")
assert route is not None
assert route.handler == "autocora"
assert route.next_stage == "build"
def test_link_building_build(self):
route = get_route("Link Building", "build")
assert route is not None
assert route.handler == "claude"
assert route.skill_file == "linkbuilding.md"
def test_unknown_task_type_returns_none(self):
assert get_route("Banana Farming", "draft") is None
def test_unknown_stage_returns_none(self):
assert get_route("Content Creation", "nonexistent") is None
def test_stage_is_case_insensitive(self):
route = get_route("Content Creation", "RUN_CORA")
assert route is not None
assert route.handler == "autocora"
def test_stage_strips_whitespace(self):
route = get_route("Content Creation", " outline ")
assert route is not None
assert route.handler == "claude"
class TestGetValidStages:
def test_content_creation(self):
stages = get_valid_stages("Content Creation")
assert stages == ["run_cora", "outline", "draft"]
def test_on_page_optimization(self):
stages = get_valid_stages("On Page Optimization")
assert "hidden div" in stages
assert len(stages) == 4
def test_press_release(self):
stages = get_valid_stages("Press Release")
assert stages == ["draft"]
def test_link_building(self):
stages = get_valid_stages("Link Building")
assert stages == ["run_cora", "build"]
def test_unknown_type(self):
assert get_valid_stages("Nope") == []
class TestGetSupportedTaskTypes:
def test_returns_all_four(self):
types = get_supported_task_types()
assert "Content Creation" in types
assert "On Page Optimization" in types
assert "Press Release" in types
assert "Link Building" in types
assert len(types) == 4
class TestSkillRouteDataclass:
def test_defaults(self):
route = SkillRoute(next_stage="x", next_status="y")
assert route.handler == "claude"
assert route.skill_file == ""
assert route.tools == ""
assert route.max_turns == 10
def test_frozen(self):
route = SkillRoute(next_stage="x", next_status="y")
with pytest.raises(AttributeError):
route.next_stage = "z" # type: ignore[misc]
class TestAllRoutesHaveRequiredFields:
"""Every route in the map should be well-formed."""
@pytest.mark.parametrize(
"task_type,stage",
[
(tt, s)
for tt, stages in SKILL_MAP.items()
for s in stages
],
)
def test_route_has_required_fields(self, task_type, stage):
route = get_route(task_type, stage)
assert route is not None
assert route.next_stage, f"{task_type}/{stage} missing next_stage"
assert route.next_status, f"{task_type}/{stage} missing next_status"
if route.handler == "claude":
assert route.skill_file, f"{task_type}/{stage} missing skill_file"
assert route.tools, f"{task_type}/{stage} missing tools"

View File

@ -0,0 +1,81 @@
"""Tests for clickup_runner.state."""
import json
import pytest
from clickup_runner.state import StateDB
@pytest.fixture
def db(tmp_path):
return StateDB(tmp_path / "test.db")
class TestKVStore:
def test_set_and_get(self, db):
db.kv_set("key1", "value1")
assert db.kv_get("key1") == "value1"
def test_get_missing_key(self, db):
assert db.kv_get("nope") is None
def test_set_overwrites(self, db):
db.kv_set("key1", "v1")
db.kv_set("key1", "v2")
assert db.kv_get("key1") == "v2"
def test_delete(self, db):
db.kv_set("key1", "v1")
db.kv_delete("key1")
assert db.kv_get("key1") is None
def test_scan(self, db):
db.kv_set("autocora:job:kw1", "submitted")
db.kv_set("autocora:job:kw2", "submitted")
db.kv_set("other:key", "val")
results = db.kv_scan("autocora:job:")
assert len(results) == 2
keys = {k for k, _ in results}
assert keys == {"autocora:job:kw1", "autocora:job:kw2"}
def test_json_round_trip(self, db):
data = {"status": "submitted", "job_id": "job-001", "task_ids": ["t1", "t2"]}
db.kv_set_json("autocora:job:test", data)
result = db.kv_get_json("autocora:job:test")
assert result == data
def test_json_get_missing(self, db):
assert db.kv_get_json("nope") is None
class TestRunLog:
def test_log_start_and_finish(self, db):
run_id = db.log_run_start("t1", "Test Task", "Content Creation", "outline")
assert run_id > 0
db.log_run_finish(run_id, "completed", result="outline.md created")
runs = db.get_recent_runs(limit=1)
assert len(runs) == 1
assert runs[0]["task_id"] == "t1"
assert runs[0]["status"] == "completed"
assert runs[0]["result"] == "outline.md created"
assert runs[0]["error"] is None
def test_log_error(self, db):
run_id = db.log_run_start("t2", "Failing Task", "Press Release", "draft")
db.log_run_finish(run_id, "error", error="Claude Code exit code 1")
runs = db.get_recent_runs(limit=1)
assert runs[0]["status"] == "error"
assert "exit code 1" in runs[0]["error"]
def test_recent_runs_ordered(self, db):
r1 = db.log_run_start("t1", "First", "PR", "draft")
db.log_run_finish(r1, "completed")
r2 = db.log_run_start("t2", "Second", "CC", "outline")
db.log_run_finish(r2, "completed")
runs = db.get_recent_runs(limit=10)
# Most recent first
assert runs[0]["task_name"] == "Second"
assert runs[1]["task_name"] == "First"