Add Claude Code runner -- Phase 2: claude -p subprocess + ClickUp integration

Implements _dispatch_claude: reads skill files, builds prompts with task
context, runs claude -p as subprocess, uploads output files to ClickUp,
copies to NAS, advances stage/status, and posts structured error comments
on failure. Includes ntfy.sh notifications and generous max_turns defaults.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
clickup-runner
PeninsulaInd 2026-03-30 09:34:54 -05:00
parent 74c1971f70
commit b19e221b8f
6 changed files with 965 additions and 22 deletions

View File

@ -25,10 +25,11 @@ uv run python -m clickup_runner
3. Reads the task's Work Category and Stage fields
4. Looks up the skill route in `skill_map.py`
5. Dispatches to either:
- **AutoCora handler** (for `run_cora` stage): submits a Cora job to the NAS queue
- **Claude Code handler**: runs `claude -p` with the skill file as system prompt
6. On success: advances Stage, sets next status, posts comment, attaches output files
7. On error: sets Error checkbox, posts error comment with fix instructions
- **AutoCora handler** (for `run_cora` stage): submits a Cora job to the NAS queue *(Phase 3)*
- **Claude Code handler**: runs `claude -p` with the skill file + task context as prompt
6. On success: uploads output files as ClickUp attachments, copies to NAS (best-effort),
advances Stage, sets next status, posts summary comment
7. On error: sets Error checkbox, posts structured error comment (what failed, how to fix)
8. Always unchecks "Delegate to Claude" after processing
## Configuration
@ -128,6 +129,33 @@ run_cora -> build -> final
| Client Review | Client | Sent to client |
| Complete | Nobody | Done |
## Claude Code Runner (Phase 2)
When a task routes to a Claude handler, the runner:
1. Sets status to "AI Working"
2. Reads the skill `.md` file from `skills/`
3. Builds a prompt with skill instructions + task context:
- Task name, description, customer, target URL
- ClickUp task link
- Attached `.xlsx` Cora report URLs (if any)
- Instructions to write output files to the working directory
4. Runs `claude -p "<prompt>" --allowedTools "..." --max-turns N --permission-mode bypassPermissions --bare`
5. Collects all files Claude created in the temp working directory
6. Uploads files to ClickUp as attachments
7. Copies files to NAS at `//PennQnap1/SHARE1/generated/{customer}/` (best-effort)
8. Advances Stage, updates status, posts comment, unchecks Delegate to Claude
9. Sends ntfy.sh notification (if configured)
On failure, it posts a structured error comment:
```
[ERROR] Claude processing failed
--
What failed: <error details>
How to fix: <instructions>
```
## Logs
- Console output: INFO level

View File

@ -12,9 +12,17 @@ import sys
import time
from datetime import datetime, timezone
from .claude_runner import (
RunResult,
build_prompt,
copy_to_nas,
notify,
read_skill_file,
run_claude,
)
from .clickup_client import ClickUpClient, ClickUpTask
from .config import Config, load_config
from .skill_map import get_route, get_supported_task_types, get_valid_stages
from .skill_map import SkillRoute, get_route, get_supported_task_types, get_valid_stages
from .state import StateDB
log = logging.getLogger("clickup_runner")
@ -245,23 +253,152 @@ def _dispatch_claude(
cfg: Config,
db: StateDB,
task: ClickUpTask,
route,
route: SkillRoute,
run_id: int,
):
"""Run Claude Code headless for a task."""
# TODO: Phase 2 -- implement Claude Code runner
log.info("Claude dispatch for task %s -- NOT YET IMPLEMENTED", task.id)
db.log_run_finish(run_id, "skipped", result="Claude runner not yet implemented")
# 1. Set status to "ai working"
client.update_task_status(task.id, cfg.clickup.ai_working_status)
# For now, post a comment and uncheck
client.add_comment(
task.id,
"[WARNING] Claude Code runner not yet implemented. "
"This task was picked up but cannot be processed yet.",
# 2. Read skill file
try:
skill_content = read_skill_file(route, cfg.skills_dir)
except FileNotFoundError as e:
_handle_dispatch_error(
client, cfg, db, task, run_id,
error=str(e),
fix="Create the skill file at skills/%s, then re-check Delegate to Claude."
% route.skill_file,
)
return
# 3. Gather .xlsx attachment URLs for the prompt
xlsx_urls = [
a.get("url", "")
for a in task.attachments
if a.get("title", "").lower().endswith(".xlsx")
or a.get("url", "").lower().endswith(".xlsx")
]
# 4. Build prompt
prompt = build_prompt(task, route, skill_content, xlsx_urls or None)
# 5. Run Claude
log.info("Starting Claude for task %s (%s)", task.id, task.name)
result = run_claude(prompt, route, cfg)
if not result.success:
_handle_dispatch_error(
client, cfg, db, task, run_id,
error=result.error,
fix="Check logs/clickup_runner.log for details. "
"Fix the issue, then re-check Delegate to Claude.",
)
# Clean up temp dir
_cleanup_work_dir(result.work_dir)
return
# 6. Upload output files to ClickUp
uploaded = 0
for f in result.output_files:
if client.upload_attachment(task.id, f):
uploaded += 1
# 7. Copy to NAS (best-effort)
customer = task.get_field_value("Customer") or ""
if customer and cfg.nas.generated_dir:
copy_to_nas(result.output_files, customer, cfg.nas.generated_dir)
# 8. Advance stage + status
client.set_stage(
task.id, task.list_id, route.next_stage, cfg.clickup.stage_field_name
)
client.update_task_status(task.id, route.next_status)
# 9. Post success comment
summary = "Stage complete. %d file(s) attached." % uploaded
if result.output:
# Include first 500 chars of Claude's output as context
truncated = result.output[:500]
if len(result.output) > 500:
truncated += "..."
summary += "\n\n---\nClaude output:\n%s" % truncated
client.add_comment(task.id, summary)
# 10. Uncheck delegate + clear error
client.set_checkbox(
task.id, task.list_id, cfg.clickup.delegate_field_name, False
)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.error_field_name, False
)
# 11. Log success
db.log_run_finish(
run_id, "completed",
result="%d files uploaded" % uploaded,
)
# 12. Notify
notify(cfg, "Task complete: %s" % task.name, summary)
log.info(
"Task %s completed: stage -> %s, %d file(s) uploaded",
task.id, route.next_stage, uploaded,
)
# 13. Clean up temp dir
_cleanup_work_dir(result.work_dir)
def _handle_dispatch_error(
client: ClickUpClient,
cfg: Config,
db: StateDB,
task: ClickUpTask,
run_id: int,
error: str,
fix: str,
):
"""Handle a failed Claude dispatch: set error state, comment, notify."""
comment = (
"[ERROR] Claude processing failed\n"
"--\n"
"What failed: %s\n"
"\n"
"How to fix: %s"
) % (error, fix)
client.add_comment(task.id, comment)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.error_field_name, True
)
client.set_checkbox(
task.id, task.list_id, cfg.clickup.delegate_field_name, False
)
client.update_task_status(task.id, cfg.clickup.review_status)
db.log_run_finish(run_id, "failed", error=error)
notify(
cfg,
"FAILED: %s" % task.name,
"Error: %s\nFix: %s" % (error, fix),
is_error=True,
)
log.error("Task %s failed: %s", task.id, error)
def _cleanup_work_dir(work_dir):
"""Remove temporary work directory."""
if work_dir is None:
return
try:
import shutil
shutil.rmtree(str(work_dir), ignore_errors=True)
except Exception:
pass
def main():

View File

@ -0,0 +1,284 @@
"""Claude Code subprocess runner.
Builds prompts from skill files + task context, runs `claude -p`,
collects output files, and returns structured results.
"""
from __future__ import annotations
import logging
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .clickup_client import ClickUpTask
from .config import Config
from .skill_map import SkillRoute
log = logging.getLogger(__name__)
@dataclass
class RunResult:
"""Outcome of a Claude Code run."""
success: bool
output: str = ""
error: str = ""
output_files: list[Path] = field(default_factory=list)
work_dir: Path | None = None
def build_prompt(
task: ClickUpTask,
route: SkillRoute,
skill_content: str,
xlsx_urls: list[str] | None = None,
) -> str:
"""Assemble the prompt sent to `claude -p`.
Structure:
1. Skill file content (system-level instructions)
2. Task context block (name, description, customer, URL, attachments)
"""
parts: list[str] = []
# -- Skill instructions --
parts.append(skill_content.strip())
# -- Task context --
ctx_lines = [
"",
"---",
"## Task Context",
"",
"Task: %s" % task.name,
]
customer = task.get_field_value("Customer")
if customer:
ctx_lines.append("Customer: %s" % customer)
ims_url = task.get_field_value("IMSURL")
if ims_url:
ctx_lines.append("Target URL: %s" % ims_url)
if task.url:
ctx_lines.append("ClickUp Task: %s" % task.url)
if task.description:
ctx_lines.append("")
ctx_lines.append("### Description")
ctx_lines.append(task.description.strip())
if xlsx_urls:
ctx_lines.append("")
ctx_lines.append("### Attached Cora Reports (.xlsx)")
for url in xlsx_urls:
ctx_lines.append("- %s" % url)
# Tell Claude where to write output
ctx_lines.append("")
ctx_lines.append(
"### Output Instructions"
)
ctx_lines.append(
"Write all output files to the current working directory. "
"Do NOT create subdirectories."
)
parts.append("\n".join(ctx_lines))
return "\n\n".join(parts)
def _collect_output_files(work_dir: Path) -> list[Path]:
"""Return all files Claude created in the working directory."""
if not work_dir.exists():
return []
files = [f for f in work_dir.iterdir() if f.is_file()]
# Sort for deterministic ordering
files.sort(key=lambda p: p.name)
return files
def run_claude(
prompt: str,
route: SkillRoute,
cfg: Config,
work_dir: Path | None = None,
) -> RunResult:
"""Run `claude -p` as a subprocess and return the result.
Args:
prompt: The assembled prompt string.
route: SkillRoute with tools and max_turns.
cfg: Runner config (timeout, etc.).
work_dir: Directory for Claude to write files into.
If None, a temp directory is created.
"""
created_tmp = False
if work_dir is None:
work_dir = Path(tempfile.mkdtemp(prefix="clickup_runner_"))
created_tmp = True
cmd = [
"claude",
"-p",
prompt,
"--output-format", "text",
"--permission-mode", "bypassPermissions",
"--bare",
]
if route.tools:
cmd.extend(["--allowedTools", route.tools])
if route.max_turns:
cmd.extend(["--max-turns", str(route.max_turns)])
log.info(
"Running claude: tools=%s, max_turns=%d, timeout=%ds, work_dir=%s",
route.tools or "(all)",
route.max_turns,
cfg.runner.claude_timeout_seconds,
work_dir,
)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=cfg.runner.claude_timeout_seconds,
cwd=str(work_dir),
)
stdout = result.stdout or ""
stderr = result.stderr or ""
if result.returncode != 0:
log.error(
"Claude exited with code %d.\nstderr: %s",
result.returncode,
stderr[:2000],
)
return RunResult(
success=False,
output=stdout,
error="Claude exited with code %d: %s"
% (result.returncode, stderr[:2000]),
output_files=_collect_output_files(work_dir),
work_dir=work_dir,
)
output_files = _collect_output_files(work_dir)
log.info(
"Claude completed successfully. %d output file(s).", len(output_files)
)
return RunResult(
success=True,
output=stdout,
error="",
output_files=output_files,
work_dir=work_dir,
)
except subprocess.TimeoutExpired:
log.error(
"Claude timed out after %ds", cfg.runner.claude_timeout_seconds
)
return RunResult(
success=False,
output="",
error="Claude timed out after %d seconds"
% cfg.runner.claude_timeout_seconds,
output_files=_collect_output_files(work_dir),
work_dir=work_dir,
)
except FileNotFoundError:
log.error("claude CLI not found on PATH")
return RunResult(
success=False,
output="",
error="claude CLI not found on PATH. Is Claude Code installed?",
work_dir=work_dir,
)
def copy_to_nas(
files: list[Path],
customer: str,
nas_dir: str,
) -> list[Path]:
"""Best-effort copy of output files to NAS.
Returns list of successfully copied paths.
"""
if not nas_dir or not customer:
return []
dest_dir = Path(nas_dir) / customer
copied: list[Path] = []
try:
dest_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
log.warning("Cannot create NAS directory %s: %s", dest_dir, e)
return []
for f in files:
try:
dest = dest_dir / f.name
shutil.copy2(str(f), str(dest))
copied.append(dest)
log.info("Copied %s to NAS: %s", f.name, dest)
except OSError as e:
log.warning("Failed to copy %s to NAS: %s", f.name, e)
return copied
def notify(
cfg: Config,
title: str,
message: str,
is_error: bool = False,
) -> None:
"""Send a notification via ntfy.sh (best-effort)."""
topic = cfg.ntfy.error_topic if is_error else cfg.ntfy.success_topic
if not topic:
return
url = "%s/%s" % (cfg.ntfy.server.rstrip("/"), topic)
try:
import httpx as _httpx
_httpx.post(
url,
content=message.encode("ascii", errors="replace"),
headers={
"Title": title.encode("ascii", errors="replace").decode("ascii"),
"Priority": "high" if is_error else "default",
},
timeout=10.0,
)
log.info("Sent ntfy notification to %s", topic)
except Exception as e:
log.warning("Failed to send ntfy notification: %s", e)
def read_skill_file(route: SkillRoute, skills_dir: Path) -> str:
"""Read a skill .md file and return its content.
Raises FileNotFoundError if the skill file doesn't exist.
"""
skill_path = skills_dir / route.skill_file
if not skill_path.exists():
raise FileNotFoundError(
"Skill file not found: %s" % skill_path
)
return skill_path.read_text(encoding="utf-8")

View File

@ -42,14 +42,14 @@ SKILL_MAP: dict[str, dict[str, SkillRoute]] = {
next_stage="draft",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
max_turns=20,
),
"draft": SkillRoute(
skill_file="content_draft.md",
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=20,
max_turns=30,
),
},
"On Page Optimization": {
@ -63,21 +63,21 @@ SKILL_MAP: dict[str, dict[str, SkillRoute]] = {
next_stage="draft",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
max_turns=20,
),
"draft": SkillRoute(
skill_file="content_draft.md",
next_stage="hidden div",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=20,
max_turns=30,
),
"hidden div": SkillRoute(
skill_file="content_hidden_div.md",
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=10,
max_turns=15,
),
},
"Press Release": {
@ -86,7 +86,7 @@ SKILL_MAP: dict[str, dict[str, SkillRoute]] = {
next_stage="final",
next_status="review",
tools=_CONTENT_TOOLS,
max_turns=15,
max_turns=25,
),
},
"Link Building": {
@ -100,7 +100,7 @@ SKILL_MAP: dict[str, dict[str, SkillRoute]] = {
next_stage="final",
next_status="review",
tools=_LINK_TOOLS,
max_turns=10,
max_turns=15,
),
},
}

View File

@ -0,0 +1,494 @@
"""Tests for clickup_runner.claude_runner."""
from __future__ import annotations
import subprocess
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from clickup_runner.claude_runner import (
RunResult,
build_prompt,
copy_to_nas,
notify,
read_skill_file,
run_claude,
)
from clickup_runner.clickup_client import ClickUpTask
from clickup_runner.config import Config, NASConfig, NtfyConfig, RunnerConfig
from clickup_runner.skill_map import SkillRoute
# ── Fixtures ──
def _make_task(**overrides) -> ClickUpTask:
"""Build a ClickUpTask with sensible defaults."""
defaults = {
"id": "task_123",
"name": "Write blog post about widgets",
"status": "to do",
"description": "A 1500-word SEO article about widgets.",
"task_type": "Content Creation",
"url": "https://app.clickup.com/t/task_123",
"list_id": "list_1",
"custom_fields": {
"Customer": "Acme Corp",
"IMSURL": "https://acme.com/widgets",
},
}
defaults.update(overrides)
return ClickUpTask(**defaults)
def _make_route(**overrides) -> SkillRoute:
defaults = {
"skill_file": "content_draft.md",
"next_stage": "final",
"next_status": "review",
"tools": "Read,Edit,Write,Bash",
"max_turns": 25,
}
defaults.update(overrides)
return SkillRoute(**defaults)
def _make_config(**overrides) -> Config:
cfg = Config()
cfg.runner = RunnerConfig(claude_timeout_seconds=60)
for k, v in overrides.items():
setattr(cfg, k, v)
return cfg
# ── build_prompt ──
class TestBuildPrompt:
def test_includes_skill_content(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "# My Skill\nDo the thing.")
assert "# My Skill" in prompt
assert "Do the thing." in prompt
def test_includes_task_name(self):
task = _make_task(name="Optimize landing page")
route = _make_route()
prompt = build_prompt(task, route, "skill content")
assert "Task: Optimize landing page" in prompt
def test_includes_customer(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "skill content")
assert "Customer: Acme Corp" in prompt
def test_includes_target_url(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "skill content")
assert "Target URL: https://acme.com/widgets" in prompt
def test_includes_clickup_link(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "skill content")
assert "ClickUp Task: https://app.clickup.com/t/task_123" in prompt
def test_includes_description(self):
task = _make_task(description="Write about blue widgets")
route = _make_route()
prompt = build_prompt(task, route, "skill content")
assert "Write about blue widgets" in prompt
def test_includes_xlsx_urls(self):
task = _make_task()
route = _make_route()
urls = ["https://cdn.clickup.com/report.xlsx"]
prompt = build_prompt(task, route, "skill", xlsx_urls=urls)
assert "https://cdn.clickup.com/report.xlsx" in prompt
assert "Cora Reports" in prompt
def test_no_xlsx_section_when_none(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "skill")
assert "Cora Reports" not in prompt
def test_no_customer_when_missing(self):
task = _make_task(custom_fields={})
route = _make_route()
prompt = build_prompt(task, route, "skill")
assert "Customer:" not in prompt
def test_output_instructions_present(self):
task = _make_task()
route = _make_route()
prompt = build_prompt(task, route, "skill")
assert "Write all output files to the current working directory" in prompt
# ── run_claude ──
class TestRunClaude:
def test_success(self, tmp_path):
route = _make_route()
cfg = _make_config()
# Pre-create an output file as if Claude wrote it
(tmp_path / "output.docx").write_bytes(b"fake docx")
mock_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="Done!", stderr=""
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result):
result = run_claude("prompt", route, cfg, work_dir=tmp_path)
assert result.success is True
assert result.output == "Done!"
assert len(result.output_files) == 1
assert result.output_files[0].name == "output.docx"
def test_nonzero_exit(self, tmp_path):
route = _make_route()
cfg = _make_config()
mock_result = subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr="Something broke"
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result):
result = run_claude("prompt", route, cfg, work_dir=tmp_path)
assert result.success is False
assert "code 1" in result.error
assert "Something broke" in result.error
def test_timeout(self, tmp_path):
route = _make_route()
cfg = _make_config()
with patch(
"clickup_runner.claude_runner.subprocess.run",
side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=60),
):
result = run_claude("prompt", route, cfg, work_dir=tmp_path)
assert result.success is False
assert "timed out" in result.error
def test_claude_not_found(self, tmp_path):
route = _make_route()
cfg = _make_config()
with patch(
"clickup_runner.claude_runner.subprocess.run",
side_effect=FileNotFoundError(),
):
result = run_claude("prompt", route, cfg, work_dir=tmp_path)
assert result.success is False
assert "not found" in result.error
def test_passes_allowed_tools(self, tmp_path):
route = _make_route(tools="Read,Write")
cfg = _make_config()
mock_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="ok", stderr=""
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result) as mock_run:
run_claude("prompt", route, cfg, work_dir=tmp_path)
cmd = mock_run.call_args[0][0]
idx = cmd.index("--allowedTools")
assert cmd[idx + 1] == "Read,Write"
def test_passes_max_turns(self, tmp_path):
route = _make_route(max_turns=42)
cfg = _make_config()
mock_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="ok", stderr=""
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result) as mock_run:
run_claude("prompt", route, cfg, work_dir=tmp_path)
cmd = mock_run.call_args[0][0]
idx = cmd.index("--max-turns")
assert cmd[idx + 1] == "42"
def test_uses_bypass_permissions(self, tmp_path):
route = _make_route()
cfg = _make_config()
mock_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="ok", stderr=""
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result) as mock_run:
run_claude("prompt", route, cfg, work_dir=tmp_path)
cmd = mock_run.call_args[0][0]
assert "--permission-mode" in cmd
assert "bypassPermissions" in cmd
def test_collects_multiple_files(self, tmp_path):
route = _make_route()
cfg = _make_config()
(tmp_path / "article.md").write_text("content")
(tmp_path / "schema.json").write_text("{}")
(tmp_path / "notes.txt").write_text("notes")
mock_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="done", stderr=""
)
with patch("clickup_runner.claude_runner.subprocess.run", return_value=mock_result):
result = run_claude("prompt", route, cfg, work_dir=tmp_path)
assert len(result.output_files) == 3
names = [f.name for f in result.output_files]
assert "article.md" in names
assert "schema.json" in names
# ── copy_to_nas ──
class TestCopyToNas:
def test_copies_files(self, tmp_path):
src = tmp_path / "src"
src.mkdir()
(src / "file1.txt").write_text("hello")
(src / "file2.txt").write_text("world")
nas = tmp_path / "nas"
nas.mkdir()
copied = copy_to_nas(
[src / "file1.txt", src / "file2.txt"],
"Acme Corp",
str(nas),
)
assert len(copied) == 2
assert (nas / "Acme Corp" / "file1.txt").exists()
assert (nas / "Acme Corp" / "file2.txt").read_text() == "world"
def test_skips_when_no_customer(self, tmp_path):
copied = copy_to_nas([], "", str(tmp_path))
assert copied == []
def test_skips_when_no_nas_dir(self, tmp_path):
copied = copy_to_nas([], "Acme", "")
assert copied == []
def test_handles_unreachable_nas(self, tmp_path):
src = tmp_path / "file.txt"
src.write_text("data")
# Use a path that can't exist
copied = copy_to_nas([src], "Acme", "//NONEXISTENT_HOST/share")
assert copied == []
# ── read_skill_file ──
class TestReadSkillFile:
def test_reads_existing_file(self, tmp_path):
skill = tmp_path / "my_skill.md"
skill.write_text("# Skill\nDo stuff.")
route = _make_route(skill_file="my_skill.md")
content = read_skill_file(route, tmp_path)
assert "# Skill" in content
def test_raises_on_missing_file(self, tmp_path):
route = _make_route(skill_file="nonexistent.md")
with pytest.raises(FileNotFoundError, match="nonexistent.md"):
read_skill_file(route, tmp_path)
# ── notify ──
class TestNotify:
def test_sends_error_notification(self):
cfg = _make_config()
cfg.ntfy = NtfyConfig(
enabled=True,
server="https://ntfy.sh",
error_topic="test-errors",
success_topic="test-ok",
)
with patch("httpx.post") as mock_post:
notify(cfg, "Failed: task", "Something went wrong", is_error=True)
mock_post.assert_called_once()
call_args = mock_post.call_args
assert "test-errors" in call_args[0][0]
assert call_args[1]["headers"]["Priority"] == "high"
def test_sends_success_notification(self):
cfg = _make_config()
cfg.ntfy = NtfyConfig(
enabled=True,
server="https://ntfy.sh",
error_topic="test-errors",
success_topic="test-ok",
)
with patch("httpx.post") as mock_post:
notify(cfg, "Done: task", "All good", is_error=False)
call_args = mock_post.call_args
assert "test-ok" in call_args[0][0]
def test_noop_when_no_topic(self):
cfg = _make_config()
cfg.ntfy = NtfyConfig() # no topics set
with patch("httpx.post") as mock_post:
notify(cfg, "title", "msg", is_error=True)
mock_post.assert_not_called()
# ── _dispatch_claude integration (via __main__) ──
class TestDispatchClaude:
"""Test the full _dispatch_claude flow with mocked Claude + ClickUp."""
def _setup(self, tmp_path):
"""Common setup for dispatch tests."""
# Create skill file
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
(skills_dir / "content_draft.md").write_text("# Draft Skill\nWrite a draft.")
cfg = _make_config()
cfg.skills_dir = skills_dir
cfg.nas = NASConfig(generated_dir="")
client = MagicMock()
db = MagicMock()
db.log_run_start.return_value = 1
task = _make_task(
attachments=[],
)
route = _make_route()
return cfg, client, db, task, route
def test_success_path(self, tmp_path):
from clickup_runner.__main__ import _dispatch_claude
cfg, client, db, task, route = self._setup(tmp_path)
mock_result = RunResult(
success=True,
output="Draft complete.",
output_files=[],
work_dir=tmp_path,
)
with patch("clickup_runner.__main__.run_claude", return_value=mock_result):
with patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
_dispatch_claude(client, cfg, db, task, route, run_id=1)
# Status set to ai working first
client.update_task_status.assert_any_call(task.id, "ai working")
# Stage advanced
client.set_stage.assert_called_once()
# Status set to review
client.update_task_status.assert_any_call(task.id, "review")
# Success comment posted
client.add_comment.assert_called_once()
assert "complete" in client.add_comment.call_args[0][1].lower()
# Delegate unchecked
client.set_checkbox.assert_any_call(
task.id, task.list_id, "Delegate to Claude", False
)
# Error cleared
client.set_checkbox.assert_any_call(
task.id, task.list_id, "Error", False
)
# Run logged as completed
db.log_run_finish.assert_called_once_with(1, "completed", result="0 files uploaded")
def test_error_path_claude_fails(self, tmp_path):
from clickup_runner.__main__ import _dispatch_claude
cfg, client, db, task, route = self._setup(tmp_path)
mock_result = RunResult(
success=False,
output="",
error="Claude exited with code 1: crash",
work_dir=tmp_path,
)
with patch("clickup_runner.__main__.run_claude", return_value=mock_result):
with patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
_dispatch_claude(client, cfg, db, task, route, run_id=1)
# Error checkbox set
client.set_checkbox.assert_any_call(
task.id, task.list_id, "Error", True
)
# Error comment posted
comment = client.add_comment.call_args[0][1]
assert "[ERROR]" in comment
assert "crash" in comment
# Run logged as failed
db.log_run_finish.assert_called_once_with(
1, "failed", error="Claude exited with code 1: crash"
)
def test_error_path_missing_skill_file(self, tmp_path):
from clickup_runner.__main__ import _dispatch_claude
cfg, client, db, task, route = self._setup(tmp_path)
route = _make_route(skill_file="nonexistent.md")
# Use real read_skill_file (it will fail)
with patch("clickup_runner.__main__.read_skill_file", side_effect=FileNotFoundError("Skill file not found: nonexistent.md")):
_dispatch_claude(client, cfg, db, task, route, run_id=1)
# Error checkbox set
client.set_checkbox.assert_any_call(
task.id, task.list_id, "Error", True
)
db.log_run_finish.assert_called_once()
assert db.log_run_finish.call_args[0][1] == "failed"
def test_uploads_output_files(self, tmp_path):
from clickup_runner.__main__ import _dispatch_claude
cfg, client, db, task, route = self._setup(tmp_path)
out1 = tmp_path / "article.docx"
out2 = tmp_path / "schema.json"
out1.write_bytes(b"docx")
out2.write_text("{}")
mock_result = RunResult(
success=True,
output="done",
output_files=[out1, out2],
work_dir=tmp_path,
)
client.upload_attachment.return_value = True
with patch("clickup_runner.__main__.run_claude", return_value=mock_result):
with patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
_dispatch_claude(client, cfg, db, task, route, run_id=1)
assert client.upload_attachment.call_count == 2
db.log_run_finish.assert_called_once_with(1, "completed", result="2 files uploaded")

View File

@ -30,7 +30,7 @@ class TestGetRoute:
route = get_route("Content Creation", "draft")
assert route is not None
assert route.next_stage == "final"
assert route.max_turns == 20
assert route.max_turns == 30
def test_on_page_optimization_has_hidden_div(self):
route = get_route("On Page Optimization", "hidden div")