502 lines
15 KiB
Python
502 lines
15 KiB
Python
"""Tests for clickup_runner.autocora and AutoCora dispatch in __main__."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from clickup_runner.autocora import (
|
|
CoraResult,
|
|
archive_result,
|
|
make_job_id,
|
|
parse_result_file,
|
|
scan_results,
|
|
slugify,
|
|
submit_job,
|
|
)
|
|
from clickup_runner.clickup_client import ClickUpTask
|
|
from clickup_runner.config import AutoCoraConfig, Config, NtfyConfig, RunnerConfig
|
|
from clickup_runner.skill_map import SkillRoute
|
|
|
|
|
|
# ── Fixtures ──
|
|
|
|
|
|
def _make_task(**overrides) -> ClickUpTask:
|
|
defaults = {
|
|
"id": "task_abc",
|
|
"name": "SEO for CNC Machining",
|
|
"status": "to do",
|
|
"description": "Content creation for CNC machining page.",
|
|
"task_type": "Content Creation",
|
|
"url": "https://app.clickup.com/t/task_abc",
|
|
"list_id": "list_1",
|
|
"custom_fields": {
|
|
"Customer": "Acme Corp",
|
|
"Keyword": "CNC Machining",
|
|
"IMSURL": "https://acme.com/cnc-machining",
|
|
"Delegate to Claude": True,
|
|
"Stage": "run_cora",
|
|
},
|
|
}
|
|
defaults.update(overrides)
|
|
return ClickUpTask(**defaults)
|
|
|
|
|
|
def _make_config(**overrides) -> Config:
|
|
cfg = Config()
|
|
cfg.runner = RunnerConfig(claude_timeout_seconds=60)
|
|
cfg.ntfy = NtfyConfig()
|
|
for k, v in overrides.items():
|
|
setattr(cfg, k, v)
|
|
return cfg
|
|
|
|
|
|
# ── slugify ──
|
|
|
|
|
|
class TestSlugify:
|
|
def test_basic(self):
|
|
assert slugify("CNC Machining") == "cnc-machining"
|
|
|
|
def test_special_chars(self):
|
|
assert slugify("Hello, World! & Co.") == "hello-world-co"
|
|
|
|
def test_max_length(self):
|
|
result = slugify("a" * 100, max_len=20)
|
|
assert len(result) <= 20
|
|
|
|
def test_empty_string(self):
|
|
assert slugify("") == "unknown"
|
|
|
|
def test_only_special_chars(self):
|
|
assert slugify("!!!@@@") == "unknown"
|
|
|
|
def test_leading_trailing_hyphens(self):
|
|
assert slugify("--hello--") == "hello"
|
|
|
|
def test_preserves_numbers(self):
|
|
assert slugify("Top 10 CNC tips") == "top-10-cnc-tips"
|
|
|
|
|
|
# ── make_job_id ──
|
|
|
|
|
|
class TestMakeJobId:
|
|
def test_format(self):
|
|
job_id = make_job_id("CNC Machining")
|
|
assert job_id.startswith("job-")
|
|
assert "cnc-machining" in job_id
|
|
|
|
def test_uniqueness(self):
|
|
# Two calls should produce different IDs (different timestamps)
|
|
id1 = make_job_id("test")
|
|
id2 = make_job_id("test")
|
|
# Could be same in same millisecond, but format should be valid
|
|
assert id1.startswith("job-")
|
|
assert id2.startswith("job-")
|
|
|
|
|
|
# ── submit_job ──
|
|
|
|
|
|
class TestSubmitJob:
|
|
def test_creates_job_file(self, tmp_path):
|
|
jobs_dir = tmp_path / "jobs"
|
|
job_id = submit_job("CNC Machining", "https://acme.com", "task_1", str(jobs_dir))
|
|
|
|
assert job_id is not None
|
|
assert jobs_dir.exists()
|
|
|
|
# Find the job file
|
|
files = list(jobs_dir.glob("job-*.json"))
|
|
assert len(files) == 1
|
|
|
|
data = json.loads(files[0].read_text())
|
|
assert data["keyword"] == "CNC Machining"
|
|
assert data["url"] == "https://acme.com"
|
|
assert data["task_ids"] == ["task_1"]
|
|
|
|
def test_fallback_url(self, tmp_path):
|
|
jobs_dir = tmp_path / "jobs"
|
|
submit_job("test", "", "task_1", str(jobs_dir))
|
|
|
|
files = list(jobs_dir.glob("job-*.json"))
|
|
data = json.loads(files[0].read_text())
|
|
assert data["url"] == "https://seotoollab.com/blank.html"
|
|
|
|
def test_unreachable_dir(self):
|
|
result = submit_job("test", "http://x.com", "t1", "//NONEXISTENT/share/jobs")
|
|
assert result is None
|
|
|
|
def test_creates_parent_dirs(self, tmp_path):
|
|
jobs_dir = tmp_path / "deep" / "nested" / "jobs"
|
|
job_id = submit_job("test", "http://x.com", "t1", str(jobs_dir))
|
|
assert job_id is not None
|
|
assert jobs_dir.exists()
|
|
|
|
|
|
# ── parse_result_file ──
|
|
|
|
|
|
class TestParseResultFile:
|
|
def test_json_success(self, tmp_path):
|
|
f = tmp_path / "job-123-test.result"
|
|
f.write_text(json.dumps({
|
|
"status": "SUCCESS",
|
|
"keyword": "CNC Machining",
|
|
"task_ids": ["t1", "t2"],
|
|
}))
|
|
|
|
result = parse_result_file(f)
|
|
assert result is not None
|
|
assert result.status == "SUCCESS"
|
|
assert result.keyword == "CNC Machining"
|
|
assert result.task_ids == ["t1", "t2"]
|
|
assert result.job_id == "job-123-test"
|
|
|
|
def test_json_failure(self, tmp_path):
|
|
f = tmp_path / "job-456.result"
|
|
f.write_text(json.dumps({
|
|
"status": "FAILURE",
|
|
"keyword": "test",
|
|
"task_ids": ["t1"],
|
|
"reason": "Cora timed out",
|
|
}))
|
|
|
|
result = parse_result_file(f)
|
|
assert result.status == "FAILURE"
|
|
assert result.reason == "Cora timed out"
|
|
|
|
def test_legacy_success(self, tmp_path):
|
|
f = tmp_path / "job-789.result"
|
|
f.write_text("SUCCESS")
|
|
|
|
result = parse_result_file(f)
|
|
assert result.status == "SUCCESS"
|
|
assert result.task_ids == []
|
|
|
|
def test_legacy_failure(self, tmp_path):
|
|
f = tmp_path / "job-101.result"
|
|
f.write_text("FAILURE: Network timeout")
|
|
|
|
result = parse_result_file(f)
|
|
assert result.status == "FAILURE"
|
|
assert result.reason == "Network timeout"
|
|
|
|
def test_empty_file(self, tmp_path):
|
|
f = tmp_path / "empty.result"
|
|
f.write_text("")
|
|
assert parse_result_file(f) is None
|
|
|
|
def test_unrecognized_format(self, tmp_path):
|
|
f = tmp_path / "weird.result"
|
|
f.write_text("something random")
|
|
assert parse_result_file(f) is None
|
|
|
|
def test_missing_file(self, tmp_path):
|
|
f = tmp_path / "missing.result"
|
|
assert parse_result_file(f) is None
|
|
|
|
|
|
# ── scan_results ──
|
|
|
|
|
|
class TestScanResults:
|
|
def test_finds_result_files(self, tmp_path):
|
|
(tmp_path / "job-1.result").write_text(json.dumps({"status": "SUCCESS"}))
|
|
(tmp_path / "job-2.result").write_text(json.dumps({"status": "FAILURE", "reason": "x"}))
|
|
(tmp_path / "not-a-result.txt").write_text("ignore me")
|
|
|
|
results = scan_results(str(tmp_path))
|
|
assert len(results) == 2
|
|
|
|
def test_empty_dir(self, tmp_path):
|
|
assert scan_results(str(tmp_path)) == []
|
|
|
|
def test_nonexistent_dir(self):
|
|
assert scan_results("//NONEXISTENT/path") == []
|
|
|
|
def test_skips_unparseable(self, tmp_path):
|
|
(tmp_path / "good.result").write_text(json.dumps({"status": "SUCCESS"}))
|
|
(tmp_path / "bad.result").write_text("")
|
|
|
|
results = scan_results(str(tmp_path))
|
|
assert len(results) == 1
|
|
|
|
|
|
# ── archive_result ──
|
|
|
|
|
|
class TestArchiveResult:
|
|
def test_moves_to_processed(self, tmp_path):
|
|
f = tmp_path / "job-1.result"
|
|
f.write_text("SUCCESS")
|
|
|
|
result = CoraResult(
|
|
job_id="job-1",
|
|
status="SUCCESS",
|
|
keyword="test",
|
|
task_ids=[],
|
|
reason="",
|
|
result_path=f,
|
|
)
|
|
|
|
assert archive_result(result) is True
|
|
assert not f.exists()
|
|
assert (tmp_path / "processed" / "job-1.result").exists()
|
|
|
|
def test_creates_processed_dir(self, tmp_path):
|
|
f = tmp_path / "job-2.result"
|
|
f.write_text("data")
|
|
|
|
result = CoraResult(
|
|
job_id="job-2", status="SUCCESS", keyword="",
|
|
task_ids=[], reason="", result_path=f,
|
|
)
|
|
|
|
archive_result(result)
|
|
assert (tmp_path / "processed").is_dir()
|
|
|
|
|
|
# ── _dispatch_autocora integration ──
|
|
|
|
|
|
class TestDispatchAutocora:
|
|
def _setup(self, tmp_path):
|
|
cfg = _make_config()
|
|
cfg.autocora = AutoCoraConfig(
|
|
jobs_dir=str(tmp_path / "jobs"),
|
|
results_dir=str(tmp_path / "results"),
|
|
)
|
|
|
|
client = MagicMock()
|
|
db = MagicMock()
|
|
db.log_run_start.return_value = 1
|
|
|
|
task = _make_task()
|
|
route = SkillRoute(
|
|
handler="autocora",
|
|
next_stage="outline",
|
|
next_status="review",
|
|
)
|
|
|
|
return cfg, client, db, task, route
|
|
|
|
def test_success_submission(self, tmp_path):
|
|
from clickup_runner.__main__ import _dispatch_autocora
|
|
|
|
cfg, client, db, task, route = self._setup(tmp_path)
|
|
|
|
_dispatch_autocora(client, cfg, db, task, route, run_id=1)
|
|
|
|
# Job file created
|
|
job_files = list((tmp_path / "jobs").glob("job-*.json"))
|
|
assert len(job_files) == 1
|
|
data = json.loads(job_files[0].read_text())
|
|
assert data["keyword"] == "CNC Machining"
|
|
assert data["task_ids"] == ["task_abc"]
|
|
|
|
# Status set to ai working
|
|
client.update_task_status.assert_called_with("task_abc", "ai working")
|
|
|
|
# Comment posted
|
|
client.add_comment.assert_called_once()
|
|
comment = client.add_comment.call_args[0][1]
|
|
assert "CNC Machining" in comment
|
|
|
|
# Delegate unchecked
|
|
client.set_checkbox.assert_called_with(
|
|
"task_abc", "list_1", "Delegate to Claude", False
|
|
)
|
|
|
|
# State DB updated
|
|
db.kv_set_json.assert_called_once()
|
|
kv_key = db.kv_set_json.call_args[0][0]
|
|
assert kv_key.startswith("autocora:job:")
|
|
|
|
# Run logged as submitted
|
|
db.log_run_finish.assert_called_once()
|
|
assert db.log_run_finish.call_args[0][1] == "submitted"
|
|
|
|
def test_missing_keyword(self, tmp_path):
|
|
from clickup_runner.__main__ import _dispatch_autocora
|
|
|
|
cfg, client, db, task, route = self._setup(tmp_path)
|
|
task.custom_fields["Keyword"] = None
|
|
|
|
_dispatch_autocora(client, cfg, db, task, route, run_id=1)
|
|
|
|
# Error comment posted
|
|
comment = client.add_comment.call_args[0][1]
|
|
assert "Keyword" in comment
|
|
|
|
# Run logged as failed
|
|
db.log_run_finish.assert_called_once()
|
|
assert db.log_run_finish.call_args[0][1] == "failed"
|
|
|
|
def test_unreachable_nas(self, tmp_path):
|
|
from clickup_runner.__main__ import _dispatch_autocora
|
|
|
|
cfg, client, db, task, route = self._setup(tmp_path)
|
|
cfg.autocora.jobs_dir = "//NONEXISTENT/share/jobs"
|
|
|
|
_dispatch_autocora(client, cfg, db, task, route, run_id=1)
|
|
|
|
# Error comment posted about NAS
|
|
comment = client.add_comment.call_args[0][1]
|
|
assert "ERROR" in comment
|
|
|
|
# Error checkbox set
|
|
client.set_checkbox.assert_any_call(
|
|
"task_abc", "list_1", "Error", True
|
|
)
|
|
|
|
|
|
# ── _check_autocora_results integration ──
|
|
|
|
|
|
class TestCheckAutocoraResults:
|
|
def _setup(self, tmp_path):
|
|
cfg = _make_config()
|
|
cfg.autocora = AutoCoraConfig(
|
|
jobs_dir=str(tmp_path / "jobs"),
|
|
results_dir=str(tmp_path / "results"),
|
|
xlsx_dir="//NAS/Cora72",
|
|
)
|
|
|
|
client = MagicMock()
|
|
# Mock get_task to return a task
|
|
client.get_task.return_value = _make_task()
|
|
# get_stage needs to return the actual stage string for route lookup
|
|
client.get_stage.return_value = "run_cora"
|
|
|
|
db = MagicMock()
|
|
|
|
return cfg, client, db
|
|
|
|
def test_success_result_with_state_db(self, tmp_path):
|
|
from clickup_runner.__main__ import _check_autocora_results
|
|
|
|
cfg, client, db = self._setup(tmp_path)
|
|
|
|
# Write a result file
|
|
results_dir = tmp_path / "results"
|
|
results_dir.mkdir()
|
|
job_id = "job-1234-cnc-machining"
|
|
(results_dir / ("%s.result" % job_id)).write_text(json.dumps({
|
|
"status": "SUCCESS",
|
|
"keyword": "CNC Machining",
|
|
"task_ids": ["task_abc"],
|
|
}))
|
|
|
|
# Set up state DB to return job data
|
|
db.kv_get_json.return_value = {
|
|
"task_id": "task_abc",
|
|
"task_name": "SEO for CNC",
|
|
"keyword": "CNC Machining",
|
|
"url": "https://acme.com",
|
|
"run_id": 5,
|
|
}
|
|
|
|
_check_autocora_results(client, cfg, db)
|
|
|
|
# Task status updated to review
|
|
client.update_task_status.assert_called_with("task_abc", "review")
|
|
|
|
# Stage advanced
|
|
client.set_stage.assert_called_once()
|
|
|
|
# Success comment posted
|
|
client.add_comment.assert_called_once()
|
|
comment = client.add_comment.call_args[0][1]
|
|
assert "CNC Machining" in comment
|
|
assert "//NAS/Cora72" in comment
|
|
|
|
# Error checkbox cleared
|
|
client.set_checkbox.assert_called()
|
|
|
|
# Run log finished
|
|
db.log_run_finish.assert_called_once_with(5, "completed", result="Cora report ready")
|
|
|
|
# State DB entry deleted
|
|
db.kv_delete.assert_called_once_with("autocora:job:%s" % job_id)
|
|
|
|
# Result file archived
|
|
assert not (results_dir / ("%s.result" % job_id)).exists()
|
|
assert (results_dir / "processed" / ("%s.result" % job_id)).exists()
|
|
|
|
def test_failure_result(self, tmp_path):
|
|
from clickup_runner.__main__ import _check_autocora_results
|
|
|
|
cfg, client, db = self._setup(tmp_path)
|
|
|
|
results_dir = tmp_path / "results"
|
|
results_dir.mkdir()
|
|
job_id = "job-999-test"
|
|
(results_dir / ("%s.result" % job_id)).write_text(json.dumps({
|
|
"status": "FAILURE",
|
|
"keyword": "test keyword",
|
|
"task_ids": ["task_abc"],
|
|
"reason": "Cora process crashed",
|
|
}))
|
|
|
|
db.kv_get_json.return_value = {
|
|
"task_id": "task_abc",
|
|
"keyword": "test keyword",
|
|
"run_id": 10,
|
|
}
|
|
|
|
_check_autocora_results(client, cfg, db)
|
|
|
|
# Error comment posted
|
|
comment = client.add_comment.call_args[0][1]
|
|
assert "ERROR" in comment
|
|
assert "Cora process crashed" in comment
|
|
|
|
# Error checkbox set
|
|
client.set_checkbox.assert_any_call(
|
|
"task_abc", "list_1", "Error", True
|
|
)
|
|
|
|
# Run log failed
|
|
db.log_run_finish.assert_called_once()
|
|
assert db.log_run_finish.call_args[0][1] == "failed"
|
|
|
|
def test_no_results(self, tmp_path):
|
|
from clickup_runner.__main__ import _check_autocora_results
|
|
|
|
cfg, client, db = self._setup(tmp_path)
|
|
|
|
# No results dir
|
|
_check_autocora_results(client, cfg, db)
|
|
|
|
# Nothing should happen
|
|
client.add_comment.assert_not_called()
|
|
db.log_run_finish.assert_not_called()
|
|
|
|
def test_result_without_state_db_uses_file_task_ids(self, tmp_path):
|
|
from clickup_runner.__main__ import _check_autocora_results
|
|
|
|
cfg, client, db = self._setup(tmp_path)
|
|
|
|
results_dir = tmp_path / "results"
|
|
results_dir.mkdir()
|
|
(results_dir / "job-orphan.result").write_text(json.dumps({
|
|
"status": "SUCCESS",
|
|
"keyword": "orphan",
|
|
"task_ids": ["task_abc"],
|
|
}))
|
|
|
|
# No state DB entry
|
|
db.kv_get_json.return_value = None
|
|
|
|
_check_autocora_results(client, cfg, db)
|
|
|
|
# Should still process using task_ids from result file
|
|
client.update_task_status.assert_called()
|
|
client.add_comment.assert_called()
|