"""Tests for clickup_runner.claude_runner."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from clickup_runner.claude_runner import (
|
|
RunResult,
|
|
build_prompt,
|
|
copy_to_nas,
|
|
notify,
|
|
read_skill_file,
|
|
run_claude,
|
|
)
|
|
from clickup_runner.clickup_client import ClickUpTask
|
|
from clickup_runner.config import Config, NASConfig, NtfyConfig, RunnerConfig
|
|
from clickup_runner.skill_map import SkillRoute
|
|
|
|
|
|
# ── Fixtures ──
|
|
|
|
|
|
def _make_task(**overrides) -> ClickUpTask:
    """Construct a ClickUpTask; any default field can be replaced via kwargs."""
    fields = {
        "id": "task_123",
        "name": "Write blog post about widgets",
        "status": "to do",
        "description": "A 1500-word SEO article about widgets.",
        "task_type": "Content Creation",
        "url": "https://app.clickup.com/t/task_123",
        "list_id": "list_1",
        "custom_fields": {
            "Client": "Acme Corp",
            "IMSURL": "https://acme.com/widgets",
        },
        **overrides,
    }
    return ClickUpTask(**fields)
|
|
|
|
|
|
def _make_route(**overrides) -> SkillRoute:
    """Construct a SkillRoute with defaults suitable for most tests."""
    fields = {
        "skill_file": "content_draft.md",
        "next_stage": "final",
        "next_status": "review",
        "tools": "Read,Edit,Write,Bash",
        "max_turns": 25,
        **overrides,
    }
    return SkillRoute(**fields)
|
|
|
|
|
|
def _make_config(**overrides) -> Config:
    """Construct a Config with a short Claude timeout; extra attrs set via kwargs."""
    config = Config()
    config.runner = RunnerConfig(claude_timeout_seconds=60)
    for attr, value in overrides.items():
        setattr(config, attr, value)
    return config
|
|
|
|
|
|
# ── build_prompt ──
|
|
|
|
|
|
class TestBuildPrompt:
    """Prompt text assembled from task, route, and skill content."""

    def test_includes_skill_content(self):
        rendered = build_prompt(_make_task(), _make_route(), "# My Skill\nDo the thing.")
        assert "# My Skill" in rendered
        assert "Do the thing." in rendered

    def test_includes_task_name(self):
        task = _make_task(name="Optimize landing page")
        rendered = build_prompt(task, _make_route(), "skill content")
        assert "Task: Optimize landing page" in rendered

    def test_includes_customer(self):
        rendered = build_prompt(_make_task(), _make_route(), "skill content")
        assert "Client: Acme Corp" in rendered

    def test_includes_target_url(self):
        rendered = build_prompt(_make_task(), _make_route(), "skill content")
        assert "Target URL: https://acme.com/widgets" in rendered

    def test_includes_clickup_link(self):
        rendered = build_prompt(_make_task(), _make_route(), "skill content")
        assert "ClickUp Task: https://app.clickup.com/t/task_123" in rendered

    def test_includes_description(self):
        task = _make_task(description="Write about blue widgets")
        rendered = build_prompt(task, _make_route(), "skill content")
        assert "Write about blue widgets" in rendered

    def test_includes_xlsx_urls(self):
        rendered = build_prompt(
            _make_task(),
            _make_route(),
            "skill",
            xlsx_urls=["https://cdn.clickup.com/report.xlsx"],
        )
        assert "https://cdn.clickup.com/report.xlsx" in rendered
        assert "Cora Reports" in rendered

    def test_no_xlsx_section_when_none(self):
        # Without xlsx_urls the Cora Reports section must be absent entirely.
        rendered = build_prompt(_make_task(), _make_route(), "skill")
        assert "Cora Reports" not in rendered

    def test_no_customer_when_missing(self):
        rendered = build_prompt(_make_task(custom_fields={}), _make_route(), "skill")
        assert "Client:" not in rendered

    def test_output_instructions_present(self):
        rendered = build_prompt(_make_task(), _make_route(), "skill")
        assert "Write all output files to the current working directory" in rendered
|
|
|
|
|
|
# ── run_claude ──
|
|
|
|
|
|
class TestRunClaude:
    """run_claude behaviour with the subprocess boundary mocked out."""

    @staticmethod
    def _fake_proc(code: int = 0, out: str = "", err: str = ""):
        """Build a CompletedProcess standing in for the claude CLI invocation."""
        return subprocess.CompletedProcess(
            args=[], returncode=code, stdout=out, stderr=err
        )

    def test_success(self, tmp_path):
        # Pre-create an output file as if Claude wrote it.
        (tmp_path / "output.docx").write_bytes(b"fake docx")

        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(out="Done!"),
        ):
            result = run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        assert result.success is True
        assert result.output == "Done!"
        assert len(result.output_files) == 1
        assert result.output_files[0].name == "output.docx"

    def test_nonzero_exit(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(code=1, err="Something broke"),
        ):
            result = run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        assert result.success is False
        assert "code 1" in result.error
        assert "Something broke" in result.error

    def test_timeout(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=60),
        ):
            result = run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        assert result.success is False
        assert "timed out" in result.error

    def test_claude_not_found(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            side_effect=FileNotFoundError(),
        ):
            result = run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        assert result.success is False
        assert "not found" in result.error

    def test_passes_allowed_tools(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(out="ok"),
        ) as mock_run:
            run_claude("prompt", _make_route(tools="Read,Write"), _make_config(), work_dir=tmp_path)

        argv = mock_run.call_args[0][0]
        assert argv[argv.index("--allowedTools") + 1] == "Read,Write"

    def test_passes_max_turns(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(out="ok"),
        ) as mock_run:
            run_claude("prompt", _make_route(max_turns=42), _make_config(), work_dir=tmp_path)

        argv = mock_run.call_args[0][0]
        assert argv[argv.index("--max-turns") + 1] == "42"

    def test_uses_bypass_permissions(self, tmp_path):
        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(out="ok"),
        ) as mock_run:
            run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        argv = mock_run.call_args[0][0]
        assert "--permission-mode" in argv
        assert "bypassPermissions" in argv

    def test_collects_multiple_files(self, tmp_path):
        for fname, body in [
            ("article.md", "content"),
            ("schema.json", "{}"),
            ("notes.txt", "notes"),
        ]:
            (tmp_path / fname).write_text(body)

        with patch(
            "clickup_runner.claude_runner.subprocess.run",
            return_value=self._fake_proc(out="done"),
        ):
            result = run_claude("prompt", _make_route(), _make_config(), work_dir=tmp_path)

        assert len(result.output_files) == 3
        names = [f.name for f in result.output_files]
        assert "article.md" in names
        assert "schema.json" in names
|
|
|
|
|
|
# ── copy_to_nas ──
|
|
|
|
|
|
class TestCopyToNas:
    """Copying generated files into the per-customer NAS directory."""

    def test_copies_files(self, tmp_path):
        src = tmp_path / "src"
        src.mkdir()
        sources = [src / "file1.txt", src / "file2.txt"]
        sources[0].write_text("hello")
        sources[1].write_text("world")

        nas = tmp_path / "nas"
        nas.mkdir()

        copied = copy_to_nas(sources, "Acme Corp", str(nas))

        assert len(copied) == 2
        assert (nas / "Acme Corp" / "file1.txt").exists()
        assert (nas / "Acme Corp" / "file2.txt").read_text() == "world"

    def test_skips_when_no_customer(self, tmp_path):
        # An empty customer name disables copying entirely.
        assert copy_to_nas([], "", str(tmp_path)) == []

    def test_skips_when_no_nas_dir(self, tmp_path):
        # An empty NAS directory disables copying entirely.
        assert copy_to_nas([], "Acme", "") == []

    def test_handles_unreachable_nas(self, tmp_path):
        source = tmp_path / "file.txt"
        source.write_text("data")
        # Point at a share that can't exist; the failure must be swallowed.
        assert copy_to_nas([source], "Acme", "//NONEXISTENT_HOST/share") == []
|
|
|
|
|
|
# ── read_skill_file ──
|
|
|
|
|
|
class TestReadSkillFile:
    """Loading the skill markdown referenced by a route."""

    def test_reads_existing_file(self, tmp_path):
        (tmp_path / "my_skill.md").write_text("# Skill\nDo stuff.")
        content = read_skill_file(_make_route(skill_file="my_skill.md"), tmp_path)
        assert "# Skill" in content

    def test_raises_on_missing_file(self, tmp_path):
        missing = _make_route(skill_file="nonexistent.md")
        # The error message must name the missing skill file.
        with pytest.raises(FileNotFoundError, match="nonexistent.md"):
            read_skill_file(missing, tmp_path)
|
|
|
|
|
|
# ── notify ──
|
|
|
|
|
|
class TestNotify:
    """Ntfy push-notification dispatch.

    Fix: test_sends_success_notification previously read mock_post.call_args
    without first asserting that a call happened — if notify() silently
    skipped posting, call_args would be None and the test would die with a
    confusing TypeError instead of a clear assertion failure.
    """

    @staticmethod
    def _ntfy_config() -> Config:
        """Config with ntfy enabled and both topics set (shared by tests)."""
        cfg = _make_config()
        cfg.ntfy = NtfyConfig(
            enabled=True,
            server="https://ntfy.sh",
            error_topic="test-errors",
            success_topic="test-ok",
        )
        return cfg

    def test_sends_error_notification(self):
        cfg = self._ntfy_config()

        with patch("httpx.post") as mock_post:
            notify(cfg, "Failed: task", "Something went wrong", is_error=True)

        mock_post.assert_called_once()
        call_args = mock_post.call_args
        # Errors go to the error topic with high priority.
        assert "test-errors" in call_args[0][0]
        assert call_args[1]["headers"]["Priority"] == "high"

    def test_sends_success_notification(self):
        cfg = self._ntfy_config()

        with patch("httpx.post") as mock_post:
            notify(cfg, "Done: task", "All good", is_error=False)

        # Assert the post actually happened before inspecting its args.
        mock_post.assert_called_once()
        assert "test-ok" in mock_post.call_args[0][0]

    def test_noop_when_no_topic(self):
        cfg = _make_config()
        cfg.ntfy = NtfyConfig()  # no topics configured -> nothing is sent

        with patch("httpx.post") as mock_post:
            notify(cfg, "title", "msg", is_error=True)

        mock_post.assert_not_called()
|
|
|
|
|
|
# ── _dispatch_claude integration (via __main__) ──
|
|
|
|
|
|
class TestDispatchClaude:
    """Full _dispatch_claude flow with Claude and ClickUp both mocked."""

    def _setup(self, tmp_path):
        """Create a skill file plus config, mocked client/db, task, and route."""
        skills_dir = tmp_path / "skills"
        skills_dir.mkdir()
        (skills_dir / "content_draft.md").write_text("# Draft Skill\nWrite a draft.")

        cfg = _make_config()
        cfg.skills_dir = skills_dir
        cfg.nas = NASConfig(generated_dir="")

        client = MagicMock()
        db = MagicMock()
        db.log_run_start.return_value = 1

        return cfg, client, db, _make_task(attachments=[]), _make_route()

    def test_success_path(self, tmp_path):
        from clickup_runner.__main__ import _dispatch_claude

        cfg, client, db, task, route = self._setup(tmp_path)
        outcome = RunResult(
            success=True,
            output="Draft complete.",
            output_files=[],
            work_dir=tmp_path,
        )

        with patch("clickup_runner.__main__.run_claude", return_value=outcome), \
             patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
            _dispatch_claude(client, cfg, db, task, route, run_id=1)

        # Status flips to "ai working" up front, then to the route's next status.
        client.update_task_status.assert_any_call(task.id, "ai working")
        client.set_stage.assert_called_once()
        client.update_task_status.assert_any_call(task.id, "review")
        # Exactly one success comment is posted.
        client.add_comment.assert_called_once()
        assert "complete" in client.add_comment.call_args[0][1].lower()
        # Delegate checkbox is unchecked and the Error checkbox cleared.
        client.set_checkbox.assert_any_call(task.id, task.list_id, "Delegate to Claude", False)
        client.set_checkbox.assert_any_call(task.id, task.list_id, "Error", False)
        # Run recorded as completed with the upload count.
        db.log_run_finish.assert_called_once_with(1, "completed", result="0 files uploaded")

    def test_error_path_claude_fails(self, tmp_path):
        from clickup_runner.__main__ import _dispatch_claude

        cfg, client, db, task, route = self._setup(tmp_path)
        outcome = RunResult(
            success=False,
            output="",
            error="Claude exited with code 1: crash",
            work_dir=tmp_path,
        )

        with patch("clickup_runner.__main__.run_claude", return_value=outcome), \
             patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
            _dispatch_claude(client, cfg, db, task, route, run_id=1)

        # Error checkbox is set and an [ERROR] comment carries the message.
        client.set_checkbox.assert_any_call(task.id, task.list_id, "Error", True)
        comment = client.add_comment.call_args[0][1]
        assert "[ERROR]" in comment
        assert "crash" in comment
        db.log_run_finish.assert_called_once_with(
            1, "failed", error="Claude exited with code 1: crash"
        )

    def test_error_path_missing_skill_file(self, tmp_path):
        from clickup_runner.__main__ import _dispatch_claude

        cfg, client, db, task, _ = self._setup(tmp_path)
        route = _make_route(skill_file="nonexistent.md")

        # Simulate read_skill_file blowing up on the missing file.
        with patch(
            "clickup_runner.__main__.read_skill_file",
            side_effect=FileNotFoundError("Skill file not found: nonexistent.md"),
        ):
            _dispatch_claude(client, cfg, db, task, route, run_id=1)

        client.set_checkbox.assert_any_call(task.id, task.list_id, "Error", True)
        db.log_run_finish.assert_called_once()
        assert db.log_run_finish.call_args[0][1] == "failed"

    def test_uploads_output_files(self, tmp_path):
        from clickup_runner.__main__ import _dispatch_claude

        cfg, client, db, task, route = self._setup(tmp_path)
        produced = [tmp_path / "article.docx", tmp_path / "schema.json"]
        produced[0].write_bytes(b"docx")
        produced[1].write_text("{}")

        outcome = RunResult(
            success=True,
            output="done",
            output_files=produced,
            work_dir=tmp_path,
        )
        client.upload_attachment.return_value = True

        with patch("clickup_runner.__main__.run_claude", return_value=outcome), \
             patch("clickup_runner.__main__.read_skill_file", return_value="# Skill"):
            _dispatch_claude(client, cfg, db, task, route, run_id=1)

        assert client.upload_attachment.call_count == 2
        db.log_run_finish.assert_called_once_with(1, "completed", result="2 files uploaded")
|