# CheddahBot/tests/test_autocora.py
#
# 494 lines
# 17 KiB
# Python

"""Tests for AutoCora job submission and result polling."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from cheddahbot.config import AutoCoraConfig, ClickUpConfig, Config
from cheddahbot.tools.autocora import (
_find_qualifying_tasks_sweep,
_group_by_keyword,
_make_job_id,
_parse_result,
_slugify,
poll_autocora_results,
submit_autocora_jobs,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@dataclass
class FakeTask:
    """Minimal stand-in for ClickUpTask.

    Carries only the attributes the tests in this module set or read, so a
    task can be built without touching the ClickUp API.
    """

    # Task identifier; tests expect it to appear in job/result ``task_ids``.
    id: str
    # Human-readable task title.
    name: str
    # Workflow status; the default is "to do".
    status: str = "to do"
    # Work category; the sweep tests query for "Content Creation".
    task_type: str = "Content Creation"
    # The tests pass epoch milliseconds as a string; the default is empty.
    due_date: str = ""
    # Custom field values keyed by field name (e.g. "Keyword", "IMSURL").
    custom_fields: dict[str, Any] = field(default_factory=dict)
    # Tag names; the sweep tests use month tags such as strftime("%b%y").lower().
    tags: list[str] = field(default_factory=list)
@pytest.fixture()
def autocora_config():
    """Provide an AutoCoraConfig with ``enabled`` set to True."""
    enabled_config = AutoCoraConfig(enabled=True)
    return enabled_config
@pytest.fixture()
def cfg(tmp_path, autocora_config):
    """Config with AutoCora pointing at tmp dirs."""
    job_root = tmp_path / "jobs"
    result_root = tmp_path / "results"
    # Both directories must exist before the tools read or write them.
    for folder in (job_root, result_root):
        folder.mkdir()
    autocora_config.jobs_dir = str(job_root)
    autocora_config.results_dir = str(result_root)
    clickup_settings = ClickUpConfig(
        api_token="test-token",
        workspace_id="ws1",
        space_id="sp1",
        task_type_field_name="Work Category",
    )
    return Config(autocora=autocora_config, clickup=clickup_settings)
@pytest.fixture()
def ctx(cfg, tmp_db):
    """Tool context dict holding the test config and the ``tmp_db`` fixture."""
    context = {}
    context["config"] = cfg
    context["db"] = tmp_db
    return context
# ---------------------------------------------------------------------------
# Helper tests
# ---------------------------------------------------------------------------
class TestSlugify:
    """Checks for the ``_slugify`` helper."""

    def test_basic(self):
        slug = _slugify("Hello World")
        assert slug == "hello-world"

    def test_special_chars(self):
        slug = _slugify("CNC machining & milling!")
        assert slug == "cnc-machining-milling"

    def test_multiple_spaces(self):
        slug = _slugify(" too many spaces ")
        assert slug == "too-many-spaces"

    def test_truncation(self):
        # A 200-character input must come back no longer than 80 characters.
        oversized = "a" * 200
        assert len(_slugify(oversized)) <= 80
class TestMakeJobId:
    """Checks for the ``_make_job_id`` helper."""

    def test_format(self):
        job_id = _make_job_id("precision machining")
        assert job_id.startswith("job-")
        assert "precision-machining" in job_id

    def test_uniqueness(self):
        first = _make_job_id("test")
        second = _make_job_id("test")
        # Millisecond timestamp — the two ids may be equal if generated very
        # fast, so only the format of each one is checked.
        for job_id in (first, second):
            assert job_id.startswith("job-")
class TestParseResult:
    """Checks for ``_parse_result`` on JSON and legacy plain-text payloads."""

    def test_json_success(self):
        payload = {"status": "SUCCESS", "task_ids": ["abc"]}
        parsed = _parse_result(json.dumps(payload))
        assert parsed["status"] == "SUCCESS"
        assert parsed["task_ids"] == ["abc"]

    def test_json_failure(self):
        payload = {"status": "FAILURE", "reason": "Cora not running", "task_ids": ["x"]}
        parsed = _parse_result(json.dumps(payload))
        assert parsed["status"] == "FAILURE"
        assert parsed["reason"] == "Cora not running"

    def test_legacy_success(self):
        parsed = _parse_result("SUCCESS")
        assert parsed["status"] == "SUCCESS"

    def test_legacy_failure(self):
        parsed = _parse_result("FAILURE: timeout exceeded")
        assert parsed["status"] == "FAILURE"
        assert parsed["reason"] == "timeout exceeded"

    def test_unknown(self):
        parsed = _parse_result("some garbage")
        assert parsed["status"] == "UNKNOWN"
class TestGroupByKeyword:
    """Checks for ``_group_by_keyword``."""

    def test_basic_grouping(self):
        # Same keyword in different case: expected to land in one "cnc" group.
        same_keyword = [
            FakeTask(id="t1", name="Task 1", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
            FakeTask(id="t2", name="Task 2", custom_fields={"Keyword": "cnc", "IMSURL": "http://a.com"}),
        ]
        grouped, warnings = _group_by_keyword(same_keyword, same_keyword)
        assert len(grouped) == 1
        assert "cnc" in grouped
        assert set(grouped["cnc"]["task_ids"]) == {"t1", "t2"}
        assert warnings == []

    def test_missing_keyword(self):
        no_keyword = [FakeTask(id="t1", name="No KW", custom_fields={"IMSURL": "http://a.com"})]
        grouped, warnings = _group_by_keyword(no_keyword, no_keyword)
        assert len(grouped) == 0
        assert any("missing Keyword" in message for message in warnings)

    def test_missing_imsurl_uses_fallback(self):
        """Missing IMSURL gets a fallback blank URL."""
        no_url = [FakeTask(id="t1", name="No URL", custom_fields={"Keyword": "test"})]
        grouped, _warnings = _group_by_keyword(no_url, no_url)
        assert len(grouped) == 1
        assert grouped["test"]["url"] == "https://seotoollab.com/blank.html"

    def test_sibling_tasks(self):
        """Tasks sharing a keyword from all_tasks should be included."""
        due_now = [
            FakeTask(id="t1", name="Due Task", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
        ]
        everything = [
            FakeTask(id="t1", name="Due Task", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
            FakeTask(id="t2", name="Sibling", custom_fields={"Keyword": "cnc", "IMSURL": "http://a.com"}),
            FakeTask(id="t3", name="Other KW", custom_fields={"Keyword": "welding", "IMSURL": "http://b.com"}),
        ]
        grouped, _warnings = _group_by_keyword(due_now, everything)
        assert set(grouped["cnc"]["task_ids"]) == {"t1", "t2"}
        # "welding" has no due task, so it must not get a group of its own.
        assert "welding" not in grouped
# ---------------------------------------------------------------------------
# Submit tool tests
# ---------------------------------------------------------------------------
class TestSubmitAutocoraJobs:
    """Tests for the ``submit_autocora_jobs`` tool.

    Every test that needs tasks replaces task discovery with a stub, so the
    tool only sees the ``FakeTask`` objects built here. Job files are looked
    up in the jobs directory named by ``ctx["config"].autocora.jobs_dir``.
    """

    # Dotted path of the task-discovery function that the tests stub out.
    _FIND_TASKS = "cheddahbot.tools.autocora._find_qualifying_tasks"

    def _stub_tasks(self, monkeypatch, tasks):
        """Make task discovery return a fresh copy of *tasks* for any arguments."""
        monkeypatch.setattr(self._FIND_TASKS, lambda *a, **kw: list(tasks))

    def _task(self, name, custom_fields):
        """Build the single task ``t1`` (fixed due date) used by these tests."""
        return FakeTask(
            id="t1",
            name=name,
            due_date="1700000000000",
            custom_fields=custom_fields,
        )

    def _job_files(self, ctx):
        """List the ``job-*.json`` files in the configured jobs directory."""
        return list(Path(ctx["config"].autocora.jobs_dir).glob("job-*.json"))

    def test_disabled(self, ctx):
        ctx["config"].autocora.enabled = False
        result = submit_autocora_jobs(ctx=ctx)
        assert "disabled" in result.lower()

    def test_no_context(self):
        result = submit_autocora_jobs()
        assert "Error" in result

    def test_no_qualifying_tasks(self, ctx, monkeypatch):
        self._stub_tasks(monkeypatch, [])
        result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        assert "No qualifying tasks" in result

    def test_submit_writes_job_file(self, ctx, monkeypatch):
        """Valid tasks produce a job JSON file on disk."""
        task = self._task(
            "CNC Page", {"Keyword": "CNC Machining", "IMSURL": "http://example.com"}
        )
        self._stub_tasks(monkeypatch, [task])
        result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        assert "Submitted 1 job" in result
        # Check job file exists
        job_files = self._job_files(ctx)
        assert len(job_files) == 1
        # Verify contents
        job_data = json.loads(job_files[0].read_text())
        assert job_data["keyword"] == "CNC Machining"
        assert job_data["url"] == "http://example.com"
        assert job_data["task_ids"] == ["t1"]

    def test_submit_writes_job_with_task_ids(self, ctx, monkeypatch):
        """Job file contains task_ids for the result poller."""
        task = self._task(
            "Test", {"Keyword": "test keyword", "IMSURL": "http://example.com"}
        )
        self._stub_tasks(monkeypatch, [task])
        submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        job_files = self._job_files(ctx)
        assert len(job_files) == 1
        data = json.loads(job_files[0].read_text())
        assert "t1" in data["task_ids"]

    def test_duplicate_prevention(self, ctx, monkeypatch):
        """Already-submitted keywords are skipped (job file exists)."""
        task = self._task("Test", {"Keyword": "test", "IMSURL": "http://example.com"})
        self._stub_tasks(monkeypatch, [task])
        # First submit
        submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        # Second submit — should skip (job file already exists)
        result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        assert "Skipped 1" in result

    def test_missing_keyword_alert(self, ctx, monkeypatch):
        """Tasks without Keyword field produce alerts."""
        task = self._task("No KW Task", {"IMSURL": "http://example.com"})
        self._stub_tasks(monkeypatch, [task])
        result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        assert "missing Keyword" in result

    def test_missing_imsurl_uses_fallback(self, ctx, monkeypatch):
        """Tasks without IMSURL use fallback URL and still submit."""
        task = self._task("No URL Task", {"Keyword": "test"})
        self._stub_tasks(monkeypatch, [task])
        result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
        assert "Submitted 1 job" in result
# ---------------------------------------------------------------------------
# Poll tool tests
# ---------------------------------------------------------------------------
class TestPollAutocoraResults:
    """Tests for the ``poll_autocora_results`` tool."""

    def _results_dir(self, ctx):
        """Return the configured results directory as a Path."""
        return Path(ctx["config"].autocora.results_dir)

    def _install_client(self, monkeypatch):
        """Swap the ClickUp client factory for one returning a fresh MagicMock."""
        fake_client = MagicMock()
        monkeypatch.setattr(
            "cheddahbot.tools.autocora._get_clickup_client", lambda ctx: fake_client
        )
        return fake_client

    def test_disabled(self, ctx):
        ctx["config"].autocora.enabled = False
        outcome = poll_autocora_results(ctx=ctx)
        assert "disabled" in outcome.lower()

    def test_no_result_files(self, ctx):
        outcome = poll_autocora_results(ctx=ctx)
        assert "No result files" in outcome

    def test_success_json(self, ctx, monkeypatch):
        """JSON SUCCESS result updates ClickUp and moves result file."""
        results_root = self._results_dir(ctx)
        pending = results_root / "job-123-test.result"
        # Write result file directly (no KV needed)
        payload = {"status": "SUCCESS", "task_ids": ["t1", "t2"], "keyword": "test keyword"}
        pending.write_text(json.dumps(payload))
        fake_client = self._install_client(monkeypatch)

        outcome = poll_autocora_results(ctx=ctx)

        assert "SUCCESS: test keyword" in outcome
        # Verify ClickUp calls
        assert fake_client.update_task_status.call_count == 2
        for task_id in ("t1", "t2"):
            fake_client.update_task_status.assert_any_call(task_id, "running cora")
        assert fake_client.add_comment.call_count == 2
        # Verify result file moved to processed/
        assert not pending.exists()
        assert (results_root / "processed" / "job-123-test.result").exists()

    def test_failure_json(self, ctx, monkeypatch):
        """JSON FAILURE result updates ClickUp with error."""
        payload = {
            "status": "FAILURE",
            "reason": "Cora not running",
            "task_ids": ["t3"],
            "keyword": "fail keyword",
        }
        (self._results_dir(ctx) / "job-456-fail.result").write_text(json.dumps(payload))
        fake_client = self._install_client(monkeypatch)

        outcome = poll_autocora_results(ctx=ctx)

        assert "FAILURE: fail keyword" in outcome
        assert "Cora not running" in outcome
        fake_client.update_task_status.assert_called_once_with("t3", "error")

    def test_legacy_plain_text(self, ctx, monkeypatch):
        """Legacy plain-text SUCCESS result still works (keyword from filename)."""
        # Legacy format — plain text, no JSON
        (self._results_dir(ctx) / "job-789-legacy-kw.result").write_text("SUCCESS")
        fake_client = self._install_client(monkeypatch)

        outcome = poll_autocora_results(ctx=ctx)

        assert "SUCCESS:" in outcome
        # No task_ids in legacy format, so no ClickUp calls
        fake_client.update_task_status.assert_not_called()

    def test_task_ids_from_result_file(self, ctx, monkeypatch):
        """task_ids from result file drive ClickUp updates."""
        payload = {"status": "SUCCESS", "task_ids": ["new_t1", "new_t2"], "keyword": "pref kw"}
        (self._results_dir(ctx) / "job-100-pref.result").write_text(json.dumps(payload))
        fake_client = self._install_client(monkeypatch)

        poll_autocora_results(ctx=ctx)

        status_calls = [call.args for call in fake_client.update_task_status.call_args_list]
        for task_id in ("new_t1", "new_t2"):
            assert (task_id, "running cora") in status_calls
# ---------------------------------------------------------------------------
# Sweep tests
# ---------------------------------------------------------------------------
class TestFindQualifyingTasksSweep:
    """Test the multi-pass sweep logic."""

    def _make_client(self, tasks):
        """Build a mock client whose ``get_tasks_from_space`` returns *tasks*."""
        stub = MagicMock()
        stub.get_tasks_from_space.return_value = tasks
        return stub

    def _make_config(self):
        """Build a mock config whose ClickUp space id is ``"sp1"``."""
        stub = MagicMock()
        stub.clickup.space_id = "sp1"
        return stub

    def test_finds_tasks_due_today(self):
        from datetime import UTC, datetime

        # Due at hour 12 (UTC) of the current day.
        noon_today = datetime.now(UTC).replace(hour=12)
        due_ms = int(noon_today.timestamp() * 1000)
        candidate = FakeTask(id="t1", name="Today", due_date=str(due_ms))
        found = _find_qualifying_tasks_sweep(
            self._make_client([candidate]), self._make_config(), ["Content Creation"]
        )
        assert any(task.id == "t1" for task in found)

    def test_finds_overdue_with_month_tag(self):
        from datetime import UTC, datetime

        current = datetime.now(UTC)
        this_month_tag = current.strftime("%b%y").lower()
        # Due 3 days ago
        three_days = 3 * 86400
        due_ms = int((current.timestamp() - three_days) * 1000)
        candidate = FakeTask(
            id="t2", name="Overdue", due_date=str(due_ms), tags=[this_month_tag]
        )
        found = _find_qualifying_tasks_sweep(
            self._make_client([candidate]), self._make_config(), ["Content Creation"]
        )
        assert any(task.id == "t2" for task in found)

    def test_finds_last_month_tagged(self):
        from datetime import UTC, datetime, timedelta

        current = datetime.now(UTC)
        # The day before the 1st of this month always falls in the previous month.
        previous_month = current.replace(day=1) - timedelta(days=1)
        previous_tag = previous_month.strftime("%b%y").lower()
        # No due date needed for month-tag pass
        candidate = FakeTask(id="t3", name="Last Month", tags=[previous_tag])
        found = _find_qualifying_tasks_sweep(
            self._make_client([candidate]), self._make_config(), ["Content Creation"]
        )
        assert any(task.id == "t3" for task in found)

    def test_finds_lookahead(self):
        from datetime import UTC, datetime

        current = datetime.now(UTC)
        # Due 36 hours from now.
        lookahead_seconds = 36 * 3600
        due_ms = int((current.timestamp() + lookahead_seconds) * 1000)
        candidate = FakeTask(id="t4", name="Tomorrow", due_date=str(due_ms))
        found = _find_qualifying_tasks_sweep(
            self._make_client([candidate]), self._make_config(), ["Content Creation"]
        )
        assert any(task.id == "t4" for task in found)

    def test_deduplicates_across_passes(self):
        from datetime import UTC, datetime

        current = datetime.now(UTC)
        this_month_tag = current.strftime("%b%y").lower()
        due_ms = int(current.replace(hour=12).timestamp() * 1000)
        # Task is due today AND has month tag — should only appear once
        candidate = FakeTask(
            id="t5", name="Multi", due_date=str(due_ms), tags=[this_month_tag]
        )
        found = _find_qualifying_tasks_sweep(
            self._make_client([candidate]), self._make_config(), ["Content Creation"]
        )
        occurrences = sum(1 for task in found if task.id == "t5")
        assert occurrences == 1

    def test_empty_space_id(self):
        blank_config = self._make_config()
        blank_config.clickup.space_id = ""
        empty_client = self._make_client([])
        found = _find_qualifying_tasks_sweep(empty_client, blank_config, ["Content Creation"])
        assert found == []