# CheddahBot/tests/test_clickup_runner/test_autocora.py
#
# 502 lines
# 15 KiB
# Python

"""Tests for clickup_runner.autocora and AutoCora dispatch in __main__."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from clickup_runner.autocora import (
CoraResult,
archive_result,
make_job_id,
parse_result_file,
scan_results,
slugify,
submit_job,
)
from clickup_runner.clickup_client import ClickUpTask
from clickup_runner.config import AutoCoraConfig, Config, NtfyConfig, RunnerConfig
from clickup_runner.skill_map import SkillRoute
# ── Fixtures ──
def _make_task(**overrides) -> ClickUpTask:
    """Build a ClickUpTask for tests.

    Starts from a fixed set of field values; any keyword in *overrides*
    replaces the default with the same name (top-level keys only, so an
    overridden ``custom_fields`` replaces the whole dict).
    """
    custom_fields = {
        "Client": "Acme Corp",
        "Keyword": "CNC Machining",
        "IMSURL": "https://acme.com/cnc-machining",
        "Delegate to Claude": True,
        "Stage": "run_cora",
    }
    base_fields = {
        "id": "task_abc",
        "name": "SEO for CNC Machining",
        "status": "to do",
        "description": "Content creation for CNC machining page.",
        "task_type": "Content Creation",
        "url": "https://app.clickup.com/t/task_abc",
        "list_id": "list_1",
        "custom_fields": custom_fields,
    }
    # Later entries win, so caller-supplied values shadow the defaults.
    task_kwargs = {**base_fields, **overrides}
    return ClickUpTask(**task_kwargs)
def _make_config(**overrides) -> Config:
    """Build a Config for tests.

    The runner section is created with ``claude_timeout_seconds=60`` and the
    ntfy section with a no-argument ``NtfyConfig``. Each keyword in
    *overrides* is then assigned as an attribute on the config.
    """
    config = Config()
    config.runner = RunnerConfig(claude_timeout_seconds=60)
    config.ntfy = NtfyConfig()
    # Overrides are applied last so they can replace the sections set above.
    for attr_name, attr_value in overrides.items():
        setattr(config, attr_name, attr_value)
    return config
# ── slugify ──
class TestSlugify:
    """Expected outputs of slugify for representative inputs."""

    def test_basic(self):
        """Spaces become hyphens and letters are lower-cased."""
        slug = slugify("CNC Machining")
        assert slug == "cnc-machining"

    def test_special_chars(self):
        """Punctuation and symbols are dropped from the slug."""
        slug = slugify("Hello, World! & Co.")
        assert slug == "hello-world-co"

    def test_max_length(self):
        """The slug never exceeds the requested max_len."""
        long_input = "a" * 100
        slug = slugify(long_input, max_len=20)
        assert len(slug) <= 20

    def test_empty_string(self):
        """An empty input yields the placeholder slug."""
        slug = slugify("")
        assert slug == "unknown"

    def test_only_special_chars(self):
        """Input with no usable characters yields the placeholder slug."""
        slug = slugify("!!!@@@")
        assert slug == "unknown"

    def test_leading_trailing_hyphens(self):
        """Hyphens at either end are stripped."""
        slug = slugify("--hello--")
        assert slug == "hello"

    def test_preserves_numbers(self):
        """Digits are kept in the slug."""
        slug = slugify("Top 10 CNC tips")
        assert slug == "top-10-cnc-tips"
# ── make_job_id ──
class TestMakeJobId:
    """Format checks for IDs produced by make_job_id."""

    def test_format(self):
        """The ID starts with ``job-`` and embeds the slugified keyword."""
        generated = make_job_id("CNC Machining")
        assert generated.startswith("job-")
        assert "cnc-machining" in generated

    def test_uniqueness(self):
        """Two consecutive IDs for the same keyword are both well-formed.

        Strict inequality is deliberately not asserted: two calls could
        land in the same millisecond and yield the same ID.
        """
        generated_ids = (make_job_id("test"), make_job_id("test"))
        for generated in generated_ids:
            assert generated.startswith("job-")
# ── submit_job ──
class TestSubmitJob:
    """submit_job writes one JSON job file into the given jobs directory."""

    def test_creates_job_file(self, tmp_path):
        """A job file is written and carries the keyword, URL and task ID."""
        target_dir = tmp_path / "jobs"
        returned_id = submit_job(
            "CNC Machining", "https://acme.com", "task_1", str(target_dir)
        )
        assert returned_id is not None
        assert target_dir.exists()

        # Exactly one job file should have been produced.
        written = list(target_dir.glob("job-*.json"))
        assert len(written) == 1

        payload = json.loads(written[0].read_text())
        assert payload["keyword"] == "CNC Machining"
        assert payload["url"] == "https://acme.com"
        assert payload["task_ids"] == ["task_1"]

    def test_fallback_url(self, tmp_path):
        """An empty URL is replaced by the blank-page fallback URL."""
        target_dir = tmp_path / "jobs"
        submit_job("test", "", "task_1", str(target_dir))

        written = list(target_dir.glob("job-*.json"))
        payload = json.loads(written[0].read_text())
        assert payload["url"] == "https://seotoollab.com/blank.html"

    def test_unreachable_dir(self):
        """submit_job returns None when the jobs directory cannot be used."""
        # NOTE(review): relies on this UNC-style path being uncreatable;
        # on POSIX it may resolve under "/" and be creatable as root — confirm.
        outcome = submit_job("test", "http://x.com", "t1", "//NONEXISTENT/share/jobs")
        assert outcome is None

    def test_creates_parent_dirs(self, tmp_path):
        """Missing parent directories of the jobs directory are created."""
        target_dir = tmp_path / "deep" / "nested" / "jobs"
        returned_id = submit_job("test", "http://x.com", "t1", str(target_dir))
        assert returned_id is not None
        assert target_dir.exists()
# ── parse_result_file ──
class TestParseResultFile:
    """parse_result_file handles JSON results, legacy plain-text results and bad input."""

    def test_json_success(self, tmp_path):
        """A JSON SUCCESS file is parsed; job_id comes from the file stem."""
        f = tmp_path / "job-123-test.result"
        f.write_text(json.dumps({
            "status": "SUCCESS",
            "keyword": "CNC Machining",
            "task_ids": ["t1", "t2"],
        }))
        result = parse_result_file(f)
        assert result is not None
        assert result.status == "SUCCESS"
        assert result.keyword == "CNC Machining"
        assert result.task_ids == ["t1", "t2"]
        assert result.job_id == "job-123-test"

    def test_json_failure(self, tmp_path):
        """A JSON FAILURE file exposes its status and reason."""
        f = tmp_path / "job-456.result"
        f.write_text(json.dumps({
            "status": "FAILURE",
            "keyword": "test",
            "task_ids": ["t1"],
            "reason": "Cora timed out",
        }))
        result = parse_result_file(f)
        # Fail with a clear assertion rather than AttributeError on None.
        assert result is not None
        assert result.status == "FAILURE"
        assert result.reason == "Cora timed out"

    def test_legacy_success(self, tmp_path):
        """A bare ``SUCCESS`` text file parses with an empty task_ids list."""
        f = tmp_path / "job-789.result"
        f.write_text("SUCCESS")
        result = parse_result_file(f)
        # Fail with a clear assertion rather than AttributeError on None.
        assert result is not None
        assert result.status == "SUCCESS"
        assert result.task_ids == []

    def test_legacy_failure(self, tmp_path):
        """A ``FAILURE: <reason>`` text file yields the reason after the prefix."""
        f = tmp_path / "job-101.result"
        f.write_text("FAILURE: Network timeout")
        result = parse_result_file(f)
        # Fail with a clear assertion rather than AttributeError on None.
        assert result is not None
        assert result.status == "FAILURE"
        assert result.reason == "Network timeout"

    def test_empty_file(self, tmp_path):
        """An empty result file yields None."""
        f = tmp_path / "empty.result"
        f.write_text("")
        assert parse_result_file(f) is None

    def test_unrecognized_format(self, tmp_path):
        """Content that is neither JSON nor a legacy marker yields None."""
        f = tmp_path / "weird.result"
        f.write_text("something random")
        assert parse_result_file(f) is None

    def test_missing_file(self, tmp_path):
        """A path that does not exist yields None."""
        f = tmp_path / "missing.result"
        assert parse_result_file(f) is None
# ── scan_results ──
class TestScanResults:
    """scan_results collects parseable ``.result`` files from a directory."""

    def test_finds_result_files(self, tmp_path):
        """Only ``.result`` files are counted; other files are ignored."""
        fixtures = {
            "job-1.result": json.dumps({"status": "SUCCESS"}),
            "job-2.result": json.dumps({"status": "FAILURE", "reason": "x"}),
            "not-a-result.txt": "ignore me",
        }
        for filename, content in fixtures.items():
            (tmp_path / filename).write_text(content)

        found = scan_results(str(tmp_path))
        assert len(found) == 2

    def test_empty_dir(self, tmp_path):
        """An empty directory yields an empty list."""
        found = scan_results(str(tmp_path))
        assert found == []

    def test_nonexistent_dir(self):
        """A directory that cannot be found yields an empty list."""
        found = scan_results("//NONEXISTENT/path")
        assert found == []

    def test_skips_unparseable(self, tmp_path):
        """Files that fail to parse are left out of the returned list."""
        parseable = tmp_path / "good.result"
        parseable.write_text(json.dumps({"status": "SUCCESS"}))
        unparseable = tmp_path / "bad.result"
        unparseable.write_text("")

        found = scan_results(str(tmp_path))
        assert len(found) == 1
# ── archive_result ──
class TestArchiveResult:
    """archive_result moves a result file into a ``processed`` subdirectory."""

    def test_moves_to_processed(self, tmp_path):
        """The file leaves its original location and appears under processed/."""
        source_file = tmp_path / "job-1.result"
        source_file.write_text("SUCCESS")
        cora_result = CoraResult(
            job_id="job-1",
            status="SUCCESS",
            keyword="test",
            task_ids=[],
            reason="",
            result_path=source_file,
        )

        archived = archive_result(cora_result)

        assert archived is True
        assert not source_file.exists()
        archived_file = tmp_path / "processed" / "job-1.result"
        assert archived_file.exists()

    def test_creates_processed_dir(self, tmp_path):
        """The processed/ directory is created when it does not exist yet."""
        source_file = tmp_path / "job-2.result"
        source_file.write_text("data")
        cora_result = CoraResult(
            job_id="job-2",
            status="SUCCESS",
            keyword="",
            task_ids=[],
            reason="",
            result_path=source_file,
        )

        archive_result(cora_result)

        processed_dir = tmp_path / "processed"
        assert processed_dir.is_dir()
# ── _dispatch_autocora integration ──
class TestDispatchAutocora:
    """Integration tests for _dispatch_autocora with a mocked client and state DB."""

    def _setup(self, tmp_path):
        """Return ``(cfg, client, db, task, route)`` wired to dirs under *tmp_path*.

        The client and DB are MagicMocks; the task comes from ``_make_task``
        and the route uses the ``autocora`` handler.
        """
        cfg = _make_config()
        cfg.autocora = AutoCoraConfig(
            jobs_dir=str(tmp_path / "jobs"),
            results_dir=str(tmp_path / "results"),
        )
        client = MagicMock()
        db = MagicMock()
        db.log_run_start.return_value = 1
        task = _make_task()
        route = SkillRoute(
            handler="autocora",
            next_stage="outline",
            next_status="review",
        )
        return cfg, client, db, task, route

    def test_success_submission(self, tmp_path):
        """A valid task produces a job file and the expected client/DB calls."""
        from clickup_runner.__main__ import _dispatch_autocora

        cfg, client, db, task, route = self._setup(tmp_path)
        _dispatch_autocora(client, cfg, db, task, route, run_id=1)

        # Job file created
        job_files = list((tmp_path / "jobs").glob("job-*.json"))
        assert len(job_files) == 1
        data = json.loads(job_files[0].read_text())
        assert data["keyword"] == "CNC Machining"
        assert data["task_ids"] == ["task_abc"]

        # Status set to ai working
        client.update_task_status.assert_called_with("task_abc", "ai working")

        # Comment posted
        client.add_comment.assert_called_once()
        comment = client.add_comment.call_args[0][1]
        assert "CNC Machining" in comment

        # Delegate unchecked
        client.set_checkbox.assert_called_with(
            "task_abc", "list_1", "Delegate to Claude", False
        )

        # State DB updated
        db.kv_set_json.assert_called_once()
        kv_key = db.kv_set_json.call_args[0][0]
        assert kv_key.startswith("autocora:job:")

        # Run logged as submitted
        db.log_run_finish.assert_called_once()
        assert db.log_run_finish.call_args[0][1] == "submitted"

    def test_missing_keyword(self, tmp_path):
        """A task without a Keyword gets an error comment and a failed run log."""
        from clickup_runner.__main__ import _dispatch_autocora

        cfg, client, db, task, route = self._setup(tmp_path)
        task.custom_fields["Keyword"] = None
        _dispatch_autocora(client, cfg, db, task, route, run_id=1)

        # Error comment posted. Assert the call first: call_args is None on
        # an uncalled mock, and indexing it would raise an opaque TypeError.
        client.add_comment.assert_called()
        comment = client.add_comment.call_args[0][1]
        assert "Keyword" in comment

        # Run logged as failed
        db.log_run_finish.assert_called_once()
        assert db.log_run_finish.call_args[0][1] == "failed"

    def test_unreachable_nas(self, tmp_path):
        """An unusable jobs directory produces an ERROR comment and sets Error."""
        from clickup_runner.__main__ import _dispatch_autocora

        cfg, client, db, task, route = self._setup(tmp_path)
        cfg.autocora.jobs_dir = "//NONEXISTENT/share/jobs"
        _dispatch_autocora(client, cfg, db, task, route, run_id=1)

        # Error comment posted about NAS. Assert the call first so a missing
        # comment fails as an assertion instead of a TypeError on None.
        client.add_comment.assert_called()
        comment = client.add_comment.call_args[0][1]
        assert "ERROR" in comment

        # Error checkbox set
        client.set_checkbox.assert_any_call(
            "task_abc", "list_1", "Error", True
        )
# ── _check_autocora_results integration ──
class TestCheckAutocoraResults:
    """Integration tests for _check_autocora_results with a mocked client and state DB."""

    def _setup(self, tmp_path):
        """Return ``(cfg, client, db)`` wired to directories under *tmp_path*.

        The mocked client returns a default task from ``get_task`` and the
        string ``"run_cora"`` from ``get_stage``.
        """
        cfg = _make_config()
        cfg.autocora = AutoCoraConfig(
            jobs_dir=str(tmp_path / "jobs"),
            results_dir=str(tmp_path / "results"),
            xlsx_dir="//NAS/Cora72",
        )
        client = MagicMock()
        # Mock get_task to return a task
        client.get_task.return_value = _make_task()
        # get_stage needs to return the actual stage string for route lookup
        client.get_stage.return_value = "run_cora"
        db = MagicMock()
        return cfg, client, db

    def test_success_result_with_state_db(self, tmp_path):
        """A SUCCESS result with a state-DB entry advances the task and archives the file."""
        from clickup_runner.__main__ import _check_autocora_results

        cfg, client, db = self._setup(tmp_path)

        # Write a result file
        results_dir = tmp_path / "results"
        results_dir.mkdir()
        job_id = "job-1234-cnc-machining"
        result_name = "%s.result" % job_id
        (results_dir / result_name).write_text(json.dumps({
            "status": "SUCCESS",
            "keyword": "CNC Machining",
            "task_ids": ["task_abc"],
        }))

        # Set up state DB to return job data
        db.kv_get_json.return_value = {
            "task_id": "task_abc",
            "task_name": "SEO for CNC",
            "keyword": "CNC Machining",
            "url": "https://acme.com",
            "run_id": 5,
        }

        _check_autocora_results(client, cfg, db)

        # Task status updated to review
        client.update_task_status.assert_called_with("task_abc", "review")

        # Stage advanced
        client.set_stage.assert_called_once()

        # Success comment posted
        client.add_comment.assert_called_once()
        comment = client.add_comment.call_args[0][1]
        assert "CNC Machining" in comment
        assert "//NAS/Cora72" in comment

        # Error checkbox cleared
        client.set_checkbox.assert_called()

        # Run log finished
        db.log_run_finish.assert_called_once_with(5, "completed", result="Cora report ready")

        # State DB entry deleted
        db.kv_delete.assert_called_once_with("autocora:job:%s" % job_id)

        # Result file archived
        assert not (results_dir / result_name).exists()
        assert (results_dir / "processed" / result_name).exists()

    def test_failure_result(self, tmp_path):
        """A FAILURE result posts an ERROR comment, sets Error and logs a failed run."""
        from clickup_runner.__main__ import _check_autocora_results

        cfg, client, db = self._setup(tmp_path)

        results_dir = tmp_path / "results"
        results_dir.mkdir()
        job_id = "job-999-test"
        (results_dir / ("%s.result" % job_id)).write_text(json.dumps({
            "status": "FAILURE",
            "keyword": "test keyword",
            "task_ids": ["task_abc"],
            "reason": "Cora process crashed",
        }))

        db.kv_get_json.return_value = {
            "task_id": "task_abc",
            "keyword": "test keyword",
            "run_id": 10,
        }

        _check_autocora_results(client, cfg, db)

        # Error comment posted. Assert the call first: call_args is None on
        # an uncalled mock, and indexing it would raise an opaque TypeError.
        client.add_comment.assert_called()
        comment = client.add_comment.call_args[0][1]
        assert "ERROR" in comment
        assert "Cora process crashed" in comment

        # Error checkbox set
        client.set_checkbox.assert_any_call(
            "task_abc", "list_1", "Error", True
        )

        # Run log failed
        db.log_run_finish.assert_called_once()
        assert db.log_run_finish.call_args[0][1] == "failed"

    def test_no_results(self, tmp_path):
        """With no results directory present, no comment or run log is written."""
        from clickup_runner.__main__ import _check_autocora_results

        cfg, client, db = self._setup(tmp_path)
        # No results dir
        _check_autocora_results(client, cfg, db)

        # Nothing should happen
        client.add_comment.assert_not_called()
        db.log_run_finish.assert_not_called()

    def test_result_without_state_db_uses_file_task_ids(self, tmp_path):
        """Without a state-DB entry, the task_ids in the result file are used."""
        from clickup_runner.__main__ import _check_autocora_results

        cfg, client, db = self._setup(tmp_path)

        results_dir = tmp_path / "results"
        results_dir.mkdir()
        (results_dir / "job-orphan.result").write_text(json.dumps({
            "status": "SUCCESS",
            "keyword": "orphan",
            "task_ids": ["task_abc"],
        }))

        # No state DB entry
        db.kv_get_json.return_value = None

        _check_autocora_results(client, cfg, db)

        # Should still process using task_ids from result file
        client.update_task_status.assert_called()
        client.add_comment.assert_called()