Add AutoCora job submission and result polling automation

Automates Cora SEO report workflow: queries ClickUp for qualifying tasks,
submits jobs to a shared folder queue, polls for results, and updates task
statuses. Includes two tools (submit_autocora_jobs, poll_autocora_results),
a scheduler polling loop, and 30 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-25 07:52:53 -06:00
parent 0e3e3bc945
commit bc64fae6f1
5 changed files with 1038 additions and 1 deletions

View File

@ -76,6 +76,19 @@ class LinkBuildingConfig:
default_branded_plus_ratio: float = 0.7 default_branded_plus_ratio: float = 0.7
@dataclass
class AutoCoraConfig:
jobs_dir: str = "//PennQnap1/SHARE1/AutoCora/jobs"
results_dir: str = "//PennQnap1/SHARE1/AutoCora/results"
poll_interval_minutes: int = 5
success_status: str = "running cora"
error_status: str = "error"
enabled: bool = False
cora_categories: list[str] = field(
default_factory=lambda: ["Content Creation", "On Page Optimization", "Link Building"]
)
@dataclass @dataclass
class ApiBudgetConfig: class ApiBudgetConfig:
monthly_limit: float = 20.00 # USD - alert when exceeded monthly_limit: float = 20.00 # USD - alert when exceeded
@ -111,6 +124,7 @@ class Config:
press_advantage: PressAdvantageConfig = field(default_factory=PressAdvantageConfig) press_advantage: PressAdvantageConfig = field(default_factory=PressAdvantageConfig)
email: EmailConfig = field(default_factory=EmailConfig) email: EmailConfig = field(default_factory=EmailConfig)
link_building: LinkBuildingConfig = field(default_factory=LinkBuildingConfig) link_building: LinkBuildingConfig = field(default_factory=LinkBuildingConfig)
autocora: AutoCoraConfig = field(default_factory=AutoCoraConfig)
api_budget: ApiBudgetConfig = field(default_factory=ApiBudgetConfig) api_budget: ApiBudgetConfig = field(default_factory=ApiBudgetConfig)
agents: list[AgentConfig] = field(default_factory=lambda: [AgentConfig()]) agents: list[AgentConfig] = field(default_factory=lambda: [AgentConfig()])
@ -163,6 +177,10 @@ def load_config() -> Config:
for k, v in data["link_building"].items(): for k, v in data["link_building"].items():
if hasattr(cfg.link_building, k): if hasattr(cfg.link_building, k):
setattr(cfg.link_building, k, v) setattr(cfg.link_building, k, v)
if "autocora" in data and isinstance(data["autocora"], dict):
for k, v in data["autocora"].items():
if hasattr(cfg.autocora, k):
setattr(cfg.autocora, k, v)
if "api_budget" in data and isinstance(data["api_budget"], dict): if "api_budget" in data and isinstance(data["api_budget"], dict):
for k, v in data["api_budget"].items(): for k, v in data["api_budget"].items():
if hasattr(cfg.api_budget, k): if hasattr(cfg.api_budget, k):

View File

@ -55,6 +55,8 @@ class Scheduler:
self._heartbeat_thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None
self._clickup_thread: threading.Thread | None = None self._clickup_thread: threading.Thread | None = None
self._folder_watch_thread: threading.Thread | None = None self._folder_watch_thread: threading.Thread | None = None
self._autocora_thread: threading.Thread | None = None
self._force_autocora = threading.Event()
self._clickup_client = None self._clickup_client = None
self._field_filter_cache: dict | None = None self._field_filter_cache: dict | None = None
@ -95,6 +97,19 @@ class Scheduler:
else: else:
log.info("Folder watcher disabled (no watch_folder configured)") log.info("Folder watcher disabled (no watch_folder configured)")
# Start AutoCora result polling if configured
if self.config.autocora.enabled:
self._autocora_thread = threading.Thread(
target=self._autocora_loop, daemon=True, name="autocora"
)
self._autocora_thread.start()
log.info(
"AutoCora polling started (interval=%dm)",
self.config.autocora.poll_interval_minutes,
)
else:
log.info("AutoCora polling disabled")
log.info( log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)", "Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds, self.config.scheduler.poll_interval_seconds,
@ -133,6 +148,10 @@ class Scheduler:
"""Wake the scheduler poll loop immediately.""" """Wake the scheduler poll loop immediately."""
self._force_poll.set() self._force_poll.set()
def force_autocora(self):
"""Wake the AutoCora poll loop immediately."""
self._force_autocora.set()
def get_loop_timestamps(self) -> dict[str, str | None]: def get_loop_timestamps(self) -> dict[str, str | None]:
"""Return last_run timestamps for all loops.""" """Return last_run timestamps for all loops."""
return { return {
@ -140,6 +159,7 @@ class Scheduler:
"poll": self.db.kv_get("system:loop:poll:last_run"), "poll": self.db.kv_get("system:loop:poll:last_run"),
"clickup": self.db.kv_get("system:loop:clickup:last_run"), "clickup": self.db.kv_get("system:loop:clickup:last_run"),
"folder_watch": self.db.kv_get("system:loop:folder_watch:last_run"), "folder_watch": self.db.kv_get("system:loop:folder_watch:last_run"),
"autocora": self.db.kv_get("system:loop:autocora:last_run"),
} }
# ── Scheduled Tasks ── # ── Scheduled Tasks ──
@ -498,6 +518,108 @@ class Scheduler:
return args return args
# ── AutoCora Result Polling ──
def _autocora_loop(self):
"""Poll for AutoCora results on a regular interval."""
interval = self.config.autocora.poll_interval_minutes * 60
# Wait before first poll
self._stop_event.wait(30)
while not self._stop_event.is_set():
try:
self._poll_autocora_results()
self.db.kv_set(
"system:loop:autocora:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("AutoCora poll error: %s", e)
self._interruptible_wait(interval, self._force_autocora)
def _poll_autocora_results(self):
"""Check for completed AutoCora results and update ClickUp tasks."""
from .tools.autocora import _parse_result
autocora = self.config.autocora
results_dir = Path(autocora.results_dir)
# Find submitted jobs in KV
kv_entries = self.db.kv_scan("autocora:job:")
submitted = []
for key, value in kv_entries:
try:
state = json.loads(value)
if state.get("status") == "submitted":
submitted.append((key, state))
except json.JSONDecodeError:
continue
if not submitted:
return
if not results_dir.exists():
log.debug("AutoCora results dir does not exist: %s", results_dir)
return
client = self._get_clickup_client() if self.config.clickup.api_token else None
for kv_key, state in submitted:
job_id = state.get("job_id", "")
if not job_id:
continue
result_path = results_dir / f"{job_id}.result"
if not result_path.exists():
continue
raw = result_path.read_text(encoding="utf-8").strip()
result_data = _parse_result(raw)
task_ids = result_data.get("task_ids") or state.get("task_ids", [])
status = result_data.get("status", "UNKNOWN")
keyword = state.get("keyword", "")
if status == "SUCCESS":
state["status"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.success_status)
client.add_comment(
tid, f"Cora report completed for keyword: {keyword}"
)
self._notify(
f"AutoCora SUCCESS: **{keyword}** — "
f"{len(task_ids)} task(s) moved to '{autocora.success_status}'",
category="autocora",
)
elif status == "FAILURE":
reason = result_data.get("reason", "unknown error")
state["status"] = "failed"
state["error"] = reason
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.error_status)
client.add_comment(
tid,
f"Cora report failed for keyword: {keyword}\nReason: {reason}",
)
self._notify(
f"AutoCora FAILURE: **{keyword}** — {reason}",
category="autocora",
)
log.info("AutoCora result for '%s': %s", keyword, status)
# ── Folder Watcher ── # ── Folder Watcher ──
def _folder_watch_loop(self): def _folder_watch_loop(self):

View File

@ -0,0 +1,409 @@
"""AutoCora job submission and result polling tools.
Submits Cora SEO report jobs to a shared folder queue and polls for results.
Jobs are JSON files written to a network share; a worker on another machine
picks them up, runs Cora, and writes result files back.
"""
from __future__ import annotations
import json
import logging
import re
import time
from datetime import UTC, datetime
from pathlib import Path
from . import tool
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _slugify(text: str) -> str:
"""Convert text to a filesystem-safe slug."""
text = text.lower().strip()
text = re.sub(r"[^\w\s-]", "", text)
text = re.sub(r"[\s_]+", "-", text)
return re.sub(r"-+", "-", text).strip("-")[:80]
def _make_job_id(keyword: str) -> str:
"""Create a unique job ID from keyword + timestamp."""
ts = str(int(time.time() * 1000))
slug = _slugify(keyword)
return f"job-{ts}-{slug}"
def _get_clickup_client(ctx: dict):
"""Build a ClickUp client from context config."""
from ..clickup import ClickUpClient
config = ctx["config"]
return ClickUpClient(
api_token=config.clickup.api_token,
workspace_id=config.clickup.workspace_id,
task_type_field_name=config.clickup.task_type_field_name,
)
def _find_qualifying_tasks(client, config, target_date: str, categories: list[str]):
"""Find 'to do' tasks in cora_categories due on target_date.
Returns list of ClickUpTask objects.
"""
space_id = config.clickup.space_id
if not space_id:
return []
# Parse target date to filter by due_date range (full day)
try:
dt = datetime.strptime(target_date, "%Y-%m-%d").replace(tzinfo=UTC)
except ValueError:
log.warning("Invalid target_date format: %s", target_date)
return []
day_start_ms = int(dt.timestamp() * 1000)
day_end_ms = day_start_ms + 24 * 60 * 60 * 1000
tasks = client.get_tasks_from_space(
space_id,
statuses=["to do"],
due_date_lt=day_end_ms,
)
qualifying = []
for task in tasks:
# Must be in one of the cora categories
if task.task_type not in categories:
continue
# Must have a due_date within the target day
if not task.due_date:
continue
try:
task_due_ms = int(task.due_date)
except (ValueError, TypeError):
continue
if task_due_ms < day_start_ms or task_due_ms >= day_end_ms:
continue
qualifying.append(task)
return qualifying
def _find_all_todo_tasks(client, config, categories: list[str]):
"""Find ALL 'to do' tasks in cora_categories (no date filter).
Used to find sibling tasks sharing the same keyword.
"""
space_id = config.clickup.space_id
if not space_id:
return []
tasks = client.get_tasks_from_space(space_id, statuses=["to do"])
return [t for t in tasks if t.task_type in categories]
def _group_by_keyword(tasks, all_tasks):
"""Group tasks by normalized keyword, pulling in sibling tasks from all_tasks.
Returns dict: {keyword_lower: {"keyword": str, "url": str, "task_ids": [str]}}
Alerts list for tasks missing Keyword or IMSURL.
"""
alerts = []
groups: dict[str, dict] = {}
# Index all tasks by keyword for sibling lookup
all_by_keyword: dict[str, list] = {}
for t in all_tasks:
kw = t.custom_fields.get("Keyword", "") or ""
kw = str(kw).strip()
if kw:
all_by_keyword.setdefault(kw.lower(), []).append(t)
for task in tasks:
keyword = task.custom_fields.get("Keyword", "") or ""
keyword = str(keyword).strip()
if not keyword:
alerts.append(f"Task '{task.name}' (id={task.id}) missing Keyword field")
continue
url = task.custom_fields.get("IMSURL", "") or ""
url = str(url).strip()
if not url:
alerts.append(f"Task '{task.name}' (id={task.id}) missing IMSURL field")
continue
kw_lower = keyword.lower()
if kw_lower not in groups:
# Collect ALL task IDs sharing this keyword
sibling_ids = set()
for sibling in all_by_keyword.get(kw_lower, []):
sibling_ids.add(sibling.id)
sibling_ids.add(task.id)
groups[kw_lower] = {
"keyword": keyword,
"url": url,
"task_ids": sorted(sibling_ids),
}
else:
# Add this task's ID if not already there
if task.id not in groups[kw_lower]["task_ids"]:
groups[kw_lower]["task_ids"].append(task.id)
groups[kw_lower]["task_ids"].sort()
return groups, alerts
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@tool(
"submit_autocora_jobs",
"Submit Cora SEO report jobs for ClickUp tasks due on a given date. "
"Writes job JSON files to the AutoCora shared folder queue.",
category="autocora",
)
def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
"""Submit AutoCora jobs for qualifying ClickUp tasks.
Args:
target_date: Date to check (YYYY-MM-DD). Defaults to today.
ctx: Injected context with config, db, etc.
"""
if not ctx:
return "Error: context not available"
config = ctx["config"]
db = ctx["db"]
autocora = config.autocora
if not autocora.enabled:
return "AutoCora is disabled in config."
if not target_date:
target_date = datetime.now(UTC).strftime("%Y-%m-%d")
if not config.clickup.api_token:
return "Error: ClickUp API token not configured"
client = _get_clickup_client(ctx)
# Find qualifying tasks (due on target_date, in cora_categories, status "to do")
qualifying = _find_qualifying_tasks(client, config, target_date, autocora.cora_categories)
if not qualifying:
return f"No qualifying tasks found for {target_date}."
# Find ALL to-do tasks in cora categories for sibling keyword matching
all_todo = _find_all_todo_tasks(client, config, autocora.cora_categories)
# Group by keyword
groups, alerts = _group_by_keyword(qualifying, all_todo)
if not groups and alerts:
return "No jobs submitted.\n\n" + "\n".join(f"- {a}" for a in alerts)
# Ensure jobs directory exists
jobs_dir = Path(autocora.jobs_dir)
jobs_dir.mkdir(parents=True, exist_ok=True)
submitted = []
skipped = []
for kw_lower, group in groups.items():
# Check KV for existing submission
kv_key = f"autocora:job:{kw_lower}"
existing = db.kv_get(kv_key)
if existing:
try:
state = json.loads(existing)
if state.get("status") == "submitted":
skipped.append(group["keyword"])
continue
except json.JSONDecodeError:
pass
# Write job file
job_id = _make_job_id(group["keyword"])
job_data = {
"keyword": group["keyword"],
"url": group["url"],
"task_ids": group["task_ids"],
}
job_path = jobs_dir / f"{job_id}.json"
job_path.write_text(json.dumps(job_data, indent=2), encoding="utf-8")
# Track in KV
kv_state = {
"status": "submitted",
"job_id": job_id,
"keyword": group["keyword"],
"url": group["url"],
"task_ids": group["task_ids"],
"submitted_at": datetime.now(UTC).isoformat(),
}
db.kv_set(kv_key, json.dumps(kv_state))
submitted.append(group["keyword"])
log.info("Submitted AutoCora job: %s%s", group["keyword"], job_id)
# Build response
lines = [f"AutoCora submission for {target_date}:"]
if submitted:
lines.append(f"\nSubmitted {len(submitted)} job(s):")
for kw in submitted:
lines.append(f" - {kw}")
if skipped:
lines.append(f"\nSkipped {len(skipped)} (already submitted):")
for kw in skipped:
lines.append(f" - {kw}")
if alerts:
lines.append(f"\nAlerts ({len(alerts)}):")
for a in alerts:
lines.append(f" - {a}")
return "\n".join(lines)
@tool(
"poll_autocora_results",
"Poll the AutoCora results folder for completed Cora SEO report jobs. "
"Updates ClickUp task statuses based on results.",
category="autocora",
)
def poll_autocora_results(ctx: dict | None = None) -> str:
"""Poll for AutoCora results and update ClickUp tasks.
Args:
ctx: Injected context with config, db, etc.
"""
if not ctx:
return "Error: context not available"
config = ctx["config"]
db = ctx["db"]
autocora = config.autocora
if not autocora.enabled:
return "AutoCora is disabled in config."
# Find all submitted jobs in KV
kv_entries = db.kv_scan("autocora:job:")
submitted = []
for key, value in kv_entries:
try:
state = json.loads(value)
if state.get("status") == "submitted":
submitted.append((key, state))
except json.JSONDecodeError:
continue
if not submitted:
return "No pending AutoCora jobs to check."
results_dir = Path(autocora.results_dir)
if not results_dir.exists():
return f"Results directory does not exist: {results_dir}"
client = None
if config.clickup.api_token:
client = _get_clickup_client(ctx)
processed = []
still_pending = []
for kv_key, state in submitted:
job_id = state.get("job_id", "")
if not job_id:
continue
result_path = results_dir / f"{job_id}.result"
if not result_path.exists():
still_pending.append(state.get("keyword", job_id))
continue
# Read and parse result
raw = result_path.read_text(encoding="utf-8").strip()
result_data = _parse_result(raw)
# Get task_ids: prefer result file, fall back to KV
task_ids = result_data.get("task_ids") or state.get("task_ids", [])
status = result_data.get("status", "UNKNOWN")
keyword = state.get("keyword", "")
if status == "SUCCESS":
# Update KV
state["status"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
db.kv_set(kv_key, json.dumps(state))
# Update ClickUp tasks
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.success_status)
client.add_comment(tid, f"Cora report completed for keyword: {keyword}")
processed.append(f"SUCCESS: {keyword}")
log.info("AutoCora SUCCESS: %s", keyword)
elif status == "FAILURE":
reason = result_data.get("reason", "unknown error")
state["status"] = "failed"
state["error"] = reason
state["completed_at"] = datetime.now(UTC).isoformat()
db.kv_set(kv_key, json.dumps(state))
# Update ClickUp tasks
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.error_status)
client.add_comment(
tid, f"Cora report failed for keyword: {keyword}\nReason: {reason}"
)
processed.append(f"FAILURE: {keyword} ({reason})")
log.info("AutoCora FAILURE: %s%s", keyword, reason)
else:
processed.append(f"UNKNOWN: {keyword} (status={status})")
# Build response
lines = ["AutoCora poll results:"]
if processed:
lines.append(f"\nProcessed {len(processed)} result(s):")
for p in processed:
lines.append(f" - {p}")
if still_pending:
lines.append(f"\nStill pending ({len(still_pending)}):")
for kw in still_pending:
lines.append(f" - {kw}")
return "\n".join(lines)
def _parse_result(raw: str) -> dict:
"""Parse a result file — JSON format or legacy plain text."""
# Try JSON first
try:
data = json.loads(raw)
if isinstance(data, dict):
return data
except json.JSONDecodeError:
pass
# Legacy plain text: "SUCCESS" or "FAILURE: reason"
if raw.startswith("SUCCESS"):
return {"status": "SUCCESS"}
if raw.startswith("FAILURE"):
reason = raw.split(":", 1)[1].strip() if ":" in raw else "unknown"
return {"status": "FAILURE", "reason": reason}
return {"status": "UNKNOWN", "raw": raw}

View File

@ -79,6 +79,15 @@ link_building:
watch_interval_minutes: 60 watch_interval_minutes: 60
default_branded_plus_ratio: 0.7 default_branded_plus_ratio: 0.7
# AutoCora job submission
autocora:
jobs_dir: "//PennQnap1/SHARE1/AutoCora/jobs"
results_dir: "//PennQnap1/SHARE1/AutoCora/results"
poll_interval_minutes: 5
success_status: "running cora"
error_status: "error"
enabled: true
# Multi-agent configuration # Multi-agent configuration
# Each agent gets its own personality, tool whitelist, and memory scope. # Each agent gets its own personality, tool whitelist, and memory scope.
# The first agent is the default. Omit this section for single-agent mode. # The first agent is the default. Omit this section for single-agent mode.
@ -110,7 +119,7 @@ agents:
- name: link_builder - name: link_builder
display_name: Link Builder display_name: Link Builder
tools: [run_link_building, run_cora_backlinks, blm_ingest_cora, blm_generate_batch, scan_cora_folder, delegate_task, remember, search_memory] tools: [run_link_building, run_cora_backlinks, blm_ingest_cora, blm_generate_batch, scan_cora_folder, submit_autocora_jobs, poll_autocora_results, delegate_task, remember, search_memory]
memory_scope: "" memory_scope: ""
- name: planner - name: planner

View File

@ -0,0 +1,479 @@
"""Tests for AutoCora job submission and result polling."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from cheddahbot.config import AutoCoraConfig, ClickUpConfig, Config
from cheddahbot.tools.autocora import (
_group_by_keyword,
_make_job_id,
_parse_result,
_slugify,
poll_autocora_results,
submit_autocora_jobs,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@dataclass
class FakeTask:
"""Minimal stand-in for ClickUpTask."""
id: str
name: str
status: str = "to do"
task_type: str = "Content Creation"
due_date: str = ""
custom_fields: dict[str, Any] = field(default_factory=dict)
@pytest.fixture()
def autocora_config():
return AutoCoraConfig(enabled=True)
@pytest.fixture()
def cfg(tmp_path, autocora_config):
"""Config with AutoCora pointing at tmp dirs."""
jobs_dir = tmp_path / "jobs"
results_dir = tmp_path / "results"
jobs_dir.mkdir()
results_dir.mkdir()
autocora_config.jobs_dir = str(jobs_dir)
autocora_config.results_dir = str(results_dir)
return Config(
autocora=autocora_config,
clickup=ClickUpConfig(
api_token="test-token",
workspace_id="ws1",
space_id="sp1",
task_type_field_name="Work Category",
),
)
@pytest.fixture()
def ctx(cfg, tmp_db):
return {"config": cfg, "db": tmp_db}
# ---------------------------------------------------------------------------
# Helper tests
# ---------------------------------------------------------------------------
class TestSlugify:
def test_basic(self):
assert _slugify("Hello World") == "hello-world"
def test_special_chars(self):
assert _slugify("CNC machining & milling!") == "cnc-machining-milling"
def test_multiple_spaces(self):
assert _slugify(" too many spaces ") == "too-many-spaces"
def test_truncation(self):
result = _slugify("a" * 200)
assert len(result) <= 80
class TestMakeJobId:
def test_format(self):
jid = _make_job_id("precision machining")
assert jid.startswith("job-")
assert "precision-machining" in jid
def test_uniqueness(self):
a = _make_job_id("test")
b = _make_job_id("test")
# Millisecond timestamp — may be equal if very fast, but format is correct
assert a.startswith("job-")
assert b.startswith("job-")
class TestParseResult:
def test_json_success(self):
raw = json.dumps({"status": "SUCCESS", "task_ids": ["abc"]})
result = _parse_result(raw)
assert result["status"] == "SUCCESS"
assert result["task_ids"] == ["abc"]
def test_json_failure(self):
raw = json.dumps({"status": "FAILURE", "reason": "Cora not running", "task_ids": ["x"]})
result = _parse_result(raw)
assert result["status"] == "FAILURE"
assert result["reason"] == "Cora not running"
def test_legacy_success(self):
result = _parse_result("SUCCESS")
assert result["status"] == "SUCCESS"
def test_legacy_failure(self):
result = _parse_result("FAILURE: timeout exceeded")
assert result["status"] == "FAILURE"
assert result["reason"] == "timeout exceeded"
def test_unknown(self):
result = _parse_result("some garbage")
assert result["status"] == "UNKNOWN"
class TestGroupByKeyword:
def test_basic_grouping(self):
tasks = [
FakeTask(id="t1", name="Task 1", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
FakeTask(id="t2", name="Task 2", custom_fields={"Keyword": "cnc", "IMSURL": "http://a.com"}),
]
groups, alerts = _group_by_keyword(tasks, tasks)
assert len(groups) == 1
assert "cnc" in groups
assert set(groups["cnc"]["task_ids"]) == {"t1", "t2"}
assert alerts == []
def test_missing_keyword(self):
tasks = [FakeTask(id="t1", name="No KW", custom_fields={"IMSURL": "http://a.com"})]
groups, alerts = _group_by_keyword(tasks, tasks)
assert len(groups) == 0
assert any("missing Keyword" in a for a in alerts)
def test_missing_imsurl(self):
tasks = [FakeTask(id="t1", name="No URL", custom_fields={"Keyword": "test"})]
groups, alerts = _group_by_keyword(tasks, tasks)
assert len(groups) == 0
assert any("missing IMSURL" in a for a in alerts)
def test_sibling_tasks(self):
"""Tasks sharing a keyword from all_tasks should be included."""
due_tasks = [
FakeTask(id="t1", name="Due Task", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
]
all_tasks = [
FakeTask(id="t1", name="Due Task", custom_fields={"Keyword": "CNC", "IMSURL": "http://a.com"}),
FakeTask(id="t2", name="Sibling", custom_fields={"Keyword": "cnc", "IMSURL": "http://a.com"}),
FakeTask(id="t3", name="Other KW", custom_fields={"Keyword": "welding", "IMSURL": "http://b.com"}),
]
groups, alerts = _group_by_keyword(due_tasks, all_tasks)
assert set(groups["cnc"]["task_ids"]) == {"t1", "t2"}
assert "welding" not in groups
# ---------------------------------------------------------------------------
# Submit tool tests
# ---------------------------------------------------------------------------
class TestSubmitAutocoraJobs:
def test_disabled(self, ctx):
ctx["config"].autocora.enabled = False
result = submit_autocora_jobs(ctx=ctx)
assert "disabled" in result.lower()
def test_no_context(self):
result = submit_autocora_jobs()
assert "Error" in result
def test_no_qualifying_tasks(self, ctx, monkeypatch):
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: []
)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "No qualifying tasks" in result
def test_submit_writes_job_file(self, ctx, monkeypatch, tmp_path):
"""Valid tasks produce a job JSON file on disk."""
task = FakeTask(
id="t1",
name="CNC Page",
due_date="1700000000000",
custom_fields={"Keyword": "CNC Machining", "IMSURL": "http://example.com"},
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: [task]
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_all_todo_tasks", lambda *a, **kw: [task]
)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "Submitted 1 job" in result
# Check job file exists
jobs_dir = Path(ctx["config"].autocora.jobs_dir)
job_files = list(jobs_dir.glob("job-*.json"))
assert len(job_files) == 1
# Verify contents
job_data = json.loads(job_files[0].read_text())
assert job_data["keyword"] == "CNC Machining"
assert job_data["url"] == "http://example.com"
assert job_data["task_ids"] == ["t1"]
def test_submit_tracks_kv(self, ctx, monkeypatch):
"""KV store tracks submitted jobs."""
task = FakeTask(
id="t1",
name="Test",
due_date="1700000000000",
custom_fields={"Keyword": "test keyword", "IMSURL": "http://example.com"},
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: [task]
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_all_todo_tasks", lambda *a, **kw: [task]
)
submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
raw = ctx["db"].kv_get("autocora:job:test keyword")
assert raw is not None
state = json.loads(raw)
assert state["status"] == "submitted"
assert "t1" in state["task_ids"]
def test_duplicate_prevention(self, ctx, monkeypatch):
"""Already-submitted keywords are skipped."""
task = FakeTask(
id="t1",
name="Test",
due_date="1700000000000",
custom_fields={"Keyword": "test", "IMSURL": "http://example.com"},
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: [task]
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_all_todo_tasks", lambda *a, **kw: [task]
)
# First submit
submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
# Second submit — should skip
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "Skipped 1" in result
def test_missing_keyword_alert(self, ctx, monkeypatch):
"""Tasks without Keyword field produce alerts."""
task = FakeTask(
id="t1",
name="No KW Task",
due_date="1700000000000",
custom_fields={"IMSURL": "http://example.com"},
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: [task]
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_all_todo_tasks", lambda *a, **kw: [task]
)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "missing Keyword" in result
def test_missing_imsurl_alert(self, ctx, monkeypatch):
"""Tasks without IMSURL field produce alerts."""
task = FakeTask(
id="t1",
name="No URL Task",
due_date="1700000000000",
custom_fields={"Keyword": "test"},
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_qualifying_tasks", lambda *a, **kw: [task]
)
monkeypatch.setattr(
"cheddahbot.tools.autocora._find_all_todo_tasks", lambda *a, **kw: [task]
)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "missing IMSURL" in result
# ---------------------------------------------------------------------------
# Poll tool tests
# ---------------------------------------------------------------------------
class TestPollAutocoraResults:
def test_disabled(self, ctx):
ctx["config"].autocora.enabled = False
result = poll_autocora_results(ctx=ctx)
assert "disabled" in result.lower()
def test_no_pending(self, ctx):
result = poll_autocora_results(ctx=ctx)
assert "No pending" in result
def test_success_json(self, ctx, monkeypatch):
"""JSON SUCCESS result updates KV and ClickUp."""
db = ctx["db"]
results_dir = Path(ctx["config"].autocora.results_dir)
# Set up submitted job in KV
job_id = "job-123-test"
kv_key = "autocora:job:test keyword"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "test keyword",
"task_ids": ["t1", "t2"],
}),
)
# Write result file
result_data = {"status": "SUCCESS", "task_ids": ["t1", "t2"]}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
# Mock ClickUp client
mock_client = MagicMock()
monkeypatch.setattr(
"cheddahbot.tools.autocora._get_clickup_client", lambda ctx: mock_client
)
result = poll_autocora_results(ctx=ctx)
assert "SUCCESS: test keyword" in result
# Verify KV updated
state = json.loads(db.kv_get(kv_key))
assert state["status"] == "completed"
# Verify ClickUp calls
assert mock_client.update_task_status.call_count == 2
mock_client.update_task_status.assert_any_call("t1", "running cora")
mock_client.update_task_status.assert_any_call("t2", "running cora")
assert mock_client.add_comment.call_count == 2
def test_failure_json(self, ctx, monkeypatch):
"""JSON FAILURE result updates KV and ClickUp with error."""
db = ctx["db"]
results_dir = Path(ctx["config"].autocora.results_dir)
job_id = "job-456-fail"
kv_key = "autocora:job:fail keyword"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "fail keyword",
"task_ids": ["t3"],
}),
)
result_data = {
"status": "FAILURE",
"reason": "Cora not running",
"task_ids": ["t3"],
}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
mock_client = MagicMock()
monkeypatch.setattr(
"cheddahbot.tools.autocora._get_clickup_client", lambda ctx: mock_client
)
result = poll_autocora_results(ctx=ctx)
assert "FAILURE: fail keyword" in result
assert "Cora not running" in result
state = json.loads(db.kv_get(kv_key))
assert state["status"] == "failed"
assert state["error"] == "Cora not running"
mock_client.update_task_status.assert_called_once_with("t3", "error")
def test_legacy_plain_text(self, ctx, monkeypatch):
"""Legacy plain-text SUCCESS result still works."""
db = ctx["db"]
results_dir = Path(ctx["config"].autocora.results_dir)
job_id = "job-789-legacy"
kv_key = "autocora:job:legacy kw"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "legacy kw",
"task_ids": ["t5"],
}),
)
# Legacy format — plain text, no JSON
(results_dir / f"{job_id}.result").write_text("SUCCESS")
mock_client = MagicMock()
monkeypatch.setattr(
"cheddahbot.tools.autocora._get_clickup_client", lambda ctx: mock_client
)
result = poll_autocora_results(ctx=ctx)
assert "SUCCESS: legacy kw" in result
# task_ids come from KV fallback
mock_client.update_task_status.assert_called_once_with("t5", "running cora")
def test_task_ids_from_result_preferred(self, ctx, monkeypatch):
"""task_ids from result file take precedence over KV."""
db = ctx["db"]
results_dir = Path(ctx["config"].autocora.results_dir)
job_id = "job-100-pref"
kv_key = "autocora:job:pref kw"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "pref kw",
"task_ids": ["old_t1"], # KV has old IDs
}),
)
# Result has updated task_ids
result_data = {"status": "SUCCESS", "task_ids": ["new_t1", "new_t2"]}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
mock_client = MagicMock()
monkeypatch.setattr(
"cheddahbot.tools.autocora._get_clickup_client", lambda ctx: mock_client
)
poll_autocora_results(ctx=ctx)
# Should use result file task_ids, not KV
calls = [c.args for c in mock_client.update_task_status.call_args_list]
assert ("new_t1", "running cora") in calls
assert ("new_t2", "running cora") in calls
assert ("old_t1", "running cora") not in calls
def test_still_pending(self, ctx):
"""Jobs without result files show as still pending."""
db = ctx["db"]
db.kv_set(
"autocora:job:waiting",
json.dumps({
"status": "submitted",
"job_id": "job-999-wait",
"keyword": "waiting",
"task_ids": ["t99"],
}),
)
result = poll_autocora_results(ctx=ctx)
assert "Still pending" in result
assert "waiting" in result