"""Tests for clickup_runner.autocora and AutoCora dispatch in __main__.""" from __future__ import annotations import json from pathlib import Path from unittest.mock import MagicMock, patch import pytest from clickup_runner.autocora import ( CoraResult, archive_result, make_job_id, parse_result_file, scan_results, slugify, submit_job, ) from clickup_runner.clickup_client import ClickUpTask from clickup_runner.config import AutoCoraConfig, Config, NtfyConfig, RunnerConfig from clickup_runner.skill_map import SkillRoute # ── Fixtures ── def _make_task(**overrides) -> ClickUpTask: defaults = { "id": "task_abc", "name": "SEO for CNC Machining", "status": "to do", "description": "Content creation for CNC machining page.", "task_type": "Content Creation", "url": "https://app.clickup.com/t/task_abc", "list_id": "list_1", "custom_fields": { "Client": "Acme Corp", "Keyword": "CNC Machining", "IMSURL": "https://acme.com/cnc-machining", "Delegate to Claude": True, "Stage": "run_cora", }, } defaults.update(overrides) return ClickUpTask(**defaults) def _make_config(**overrides) -> Config: cfg = Config() cfg.runner = RunnerConfig(claude_timeout_seconds=60) cfg.ntfy = NtfyConfig() for k, v in overrides.items(): setattr(cfg, k, v) return cfg # ── slugify ── class TestSlugify: def test_basic(self): assert slugify("CNC Machining") == "cnc-machining" def test_special_chars(self): assert slugify("Hello, World! & Co.") == "hello-world-co" def test_max_length(self): result = slugify("a" * 100, max_len=20) assert len(result) <= 20 def test_empty_string(self): assert slugify("") == "unknown" def test_only_special_chars(self): assert slugify("!!!@@@") == "unknown" def test_leading_trailing_hyphens(self): assert slugify("--hello--") == "hello" def test_preserves_numbers(self): assert slugify("Top 10 CNC tips") == "top-10-cnc-tips" # ── make_job_id ── class TestMakeJobId: def test_format(self): job_id = make_job_id("CNC Machining") assert job_id.startswith("job-") assert "cnc-machining" in job_id def test_uniqueness(self): # Two calls should produce different IDs (different timestamps) id1 = make_job_id("test") id2 = make_job_id("test") # Could be same in same millisecond, but format should be valid assert id1.startswith("job-") assert id2.startswith("job-") # ── submit_job ── class TestSubmitJob: def test_creates_job_file(self, tmp_path): jobs_dir = tmp_path / "jobs" job_id = submit_job("CNC Machining", "https://acme.com", "task_1", str(jobs_dir)) assert job_id is not None assert jobs_dir.exists() # Find the job file files = list(jobs_dir.glob("job-*.json")) assert len(files) == 1 data = json.loads(files[0].read_text()) assert data["keyword"] == "CNC Machining" assert data["url"] == "https://acme.com" assert data["task_ids"] == ["task_1"] def test_fallback_url(self, tmp_path): jobs_dir = tmp_path / "jobs" submit_job("test", "", "task_1", str(jobs_dir)) files = list(jobs_dir.glob("job-*.json")) data = json.loads(files[0].read_text()) assert data["url"] == "https://seotoollab.com/blank.html" def test_unreachable_dir(self): result = submit_job("test", "http://x.com", "t1", "//NONEXISTENT/share/jobs") assert result is None def test_creates_parent_dirs(self, tmp_path): jobs_dir = tmp_path / "deep" / "nested" / "jobs" job_id = submit_job("test", "http://x.com", "t1", str(jobs_dir)) assert job_id is not None assert jobs_dir.exists() # ── parse_result_file ── class TestParseResultFile: def test_json_success(self, tmp_path): f = tmp_path / "job-123-test.result" f.write_text(json.dumps({ "status": "SUCCESS", "keyword": "CNC Machining", "task_ids": ["t1", "t2"], })) result = parse_result_file(f) assert result is not None assert result.status == "SUCCESS" assert result.keyword == "CNC Machining" assert result.task_ids == ["t1", "t2"] assert result.job_id == "job-123-test" def test_json_failure(self, tmp_path): f = tmp_path / "job-456.result" f.write_text(json.dumps({ "status": "FAILURE", "keyword": "test", "task_ids": ["t1"], "reason": "Cora timed out", })) result = parse_result_file(f) assert result.status == "FAILURE" assert result.reason == "Cora timed out" def test_legacy_success(self, tmp_path): f = tmp_path / "job-789.result" f.write_text("SUCCESS") result = parse_result_file(f) assert result.status == "SUCCESS" assert result.task_ids == [] def test_legacy_failure(self, tmp_path): f = tmp_path / "job-101.result" f.write_text("FAILURE: Network timeout") result = parse_result_file(f) assert result.status == "FAILURE" assert result.reason == "Network timeout" def test_empty_file(self, tmp_path): f = tmp_path / "empty.result" f.write_text("") assert parse_result_file(f) is None def test_unrecognized_format(self, tmp_path): f = tmp_path / "weird.result" f.write_text("something random") assert parse_result_file(f) is None def test_missing_file(self, tmp_path): f = tmp_path / "missing.result" assert parse_result_file(f) is None # ── scan_results ── class TestScanResults: def test_finds_result_files(self, tmp_path): (tmp_path / "job-1.result").write_text(json.dumps({"status": "SUCCESS"})) (tmp_path / "job-2.result").write_text(json.dumps({"status": "FAILURE", "reason": "x"})) (tmp_path / "not-a-result.txt").write_text("ignore me") results = scan_results(str(tmp_path)) assert len(results) == 2 def test_empty_dir(self, tmp_path): assert scan_results(str(tmp_path)) == [] def test_nonexistent_dir(self): assert scan_results("//NONEXISTENT/path") == [] def test_skips_unparseable(self, tmp_path): (tmp_path / "good.result").write_text(json.dumps({"status": "SUCCESS"})) (tmp_path / "bad.result").write_text("") results = scan_results(str(tmp_path)) assert len(results) == 1 # ── archive_result ── class TestArchiveResult: def test_moves_to_processed(self, tmp_path): f = tmp_path / "job-1.result" f.write_text("SUCCESS") result = CoraResult( job_id="job-1", status="SUCCESS", keyword="test", task_ids=[], reason="", result_path=f, ) assert archive_result(result) is True assert not f.exists() assert (tmp_path / "processed" / "job-1.result").exists() def test_creates_processed_dir(self, tmp_path): f = tmp_path / "job-2.result" f.write_text("data") result = CoraResult( job_id="job-2", status="SUCCESS", keyword="", task_ids=[], reason="", result_path=f, ) archive_result(result) assert (tmp_path / "processed").is_dir() # ── _dispatch_autocora integration ── class TestDispatchAutocora: def _setup(self, tmp_path): cfg = _make_config() cfg.autocora = AutoCoraConfig( jobs_dir=str(tmp_path / "jobs"), results_dir=str(tmp_path / "results"), ) client = MagicMock() db = MagicMock() db.log_run_start.return_value = 1 task = _make_task() route = SkillRoute( handler="autocora", next_stage="outline", next_status="review", ) return cfg, client, db, task, route def test_success_submission(self, tmp_path): from clickup_runner.__main__ import _dispatch_autocora cfg, client, db, task, route = self._setup(tmp_path) _dispatch_autocora(client, cfg, db, task, route, run_id=1) # Job file created job_files = list((tmp_path / "jobs").glob("job-*.json")) assert len(job_files) == 1 data = json.loads(job_files[0].read_text()) assert data["keyword"] == "CNC Machining" assert data["task_ids"] == ["task_abc"] # Status set to ai working client.update_task_status.assert_called_with("task_abc", "ai working") # Comment posted client.add_comment.assert_called_once() comment = client.add_comment.call_args[0][1] assert "CNC Machining" in comment # Delegate unchecked client.set_checkbox.assert_called_with( "task_abc", "list_1", "Delegate to Claude", False ) # State DB updated db.kv_set_json.assert_called_once() kv_key = db.kv_set_json.call_args[0][0] assert kv_key.startswith("autocora:job:") # Run logged as submitted db.log_run_finish.assert_called_once() assert db.log_run_finish.call_args[0][1] == "submitted" def test_missing_keyword(self, tmp_path): from clickup_runner.__main__ import _dispatch_autocora cfg, client, db, task, route = self._setup(tmp_path) task.custom_fields["Keyword"] = None _dispatch_autocora(client, cfg, db, task, route, run_id=1) # Error comment posted comment = client.add_comment.call_args[0][1] assert "Keyword" in comment # Run logged as failed db.log_run_finish.assert_called_once() assert db.log_run_finish.call_args[0][1] == "failed" def test_unreachable_nas(self, tmp_path): from clickup_runner.__main__ import _dispatch_autocora cfg, client, db, task, route = self._setup(tmp_path) cfg.autocora.jobs_dir = "//NONEXISTENT/share/jobs" _dispatch_autocora(client, cfg, db, task, route, run_id=1) # Error comment posted about NAS comment = client.add_comment.call_args[0][1] assert "ERROR" in comment # Error checkbox set client.set_checkbox.assert_any_call( "task_abc", "list_1", "Error", True ) # ── _check_autocora_results integration ── class TestCheckAutocoraResults: def _setup(self, tmp_path): cfg = _make_config() cfg.autocora = AutoCoraConfig( jobs_dir=str(tmp_path / "jobs"), results_dir=str(tmp_path / "results"), xlsx_dir="//NAS/Cora72", ) client = MagicMock() # Mock get_task to return a task client.get_task.return_value = _make_task() # get_stage needs to return the actual stage string for route lookup client.get_stage.return_value = "run_cora" db = MagicMock() return cfg, client, db def test_success_result_with_state_db(self, tmp_path): from clickup_runner.__main__ import _check_autocora_results cfg, client, db = self._setup(tmp_path) # Write a result file results_dir = tmp_path / "results" results_dir.mkdir() job_id = "job-1234-cnc-machining" (results_dir / ("%s.result" % job_id)).write_text(json.dumps({ "status": "SUCCESS", "keyword": "CNC Machining", "task_ids": ["task_abc"], })) # Set up state DB to return job data db.kv_get_json.return_value = { "task_id": "task_abc", "task_name": "SEO for CNC", "keyword": "CNC Machining", "url": "https://acme.com", "run_id": 5, } _check_autocora_results(client, cfg, db) # Task status updated to review client.update_task_status.assert_called_with("task_abc", "review") # Stage advanced client.set_stage.assert_called_once() # Success comment posted client.add_comment.assert_called_once() comment = client.add_comment.call_args[0][1] assert "CNC Machining" in comment assert "//NAS/Cora72" in comment # Error checkbox cleared client.set_checkbox.assert_called() # Run log finished db.log_run_finish.assert_called_once_with(5, "completed", result="Cora report ready") # State DB entry deleted db.kv_delete.assert_called_once_with("autocora:job:%s" % job_id) # Result file archived assert not (results_dir / ("%s.result" % job_id)).exists() assert (results_dir / "processed" / ("%s.result" % job_id)).exists() def test_failure_result(self, tmp_path): from clickup_runner.__main__ import _check_autocora_results cfg, client, db = self._setup(tmp_path) results_dir = tmp_path / "results" results_dir.mkdir() job_id = "job-999-test" (results_dir / ("%s.result" % job_id)).write_text(json.dumps({ "status": "FAILURE", "keyword": "test keyword", "task_ids": ["task_abc"], "reason": "Cora process crashed", })) db.kv_get_json.return_value = { "task_id": "task_abc", "keyword": "test keyword", "run_id": 10, } _check_autocora_results(client, cfg, db) # Error comment posted comment = client.add_comment.call_args[0][1] assert "ERROR" in comment assert "Cora process crashed" in comment # Error checkbox set client.set_checkbox.assert_any_call( "task_abc", "list_1", "Error", True ) # Run log failed db.log_run_finish.assert_called_once() assert db.log_run_finish.call_args[0][1] == "failed" def test_no_results(self, tmp_path): from clickup_runner.__main__ import _check_autocora_results cfg, client, db = self._setup(tmp_path) # No results dir _check_autocora_results(client, cfg, db) # Nothing should happen client.add_comment.assert_not_called() db.log_run_finish.assert_not_called() def test_result_without_state_db_uses_file_task_ids(self, tmp_path): from clickup_runner.__main__ import _check_autocora_results cfg, client, db = self._setup(tmp_path) results_dir = tmp_path / "results" results_dir.mkdir() (results_dir / "job-orphan.result").write_text(json.dumps({ "status": "SUCCESS", "keyword": "orphan", "task_ids": ["task_abc"], })) # No state DB entry db.kv_get_json.return_value = None _check_autocora_results(client, cfg, db) # Should still process using task_ids from result file client.update_task_status.assert_called() client.add_comment.assert_called()