Fix 1: AutoCora prioritized sweep and KV-free result polling

Replace single-day task filter with multi-pass sweep when no explicit
target_date: (1) due today, (2) overdue + current month tag, (3) last
month tag, (4) look-ahead 2 days. Deduplicate across passes.

Remove KV store from submit (dedup by job file existence) and result
poller (scan results/ folder directly, move to processed/ after handling).
Scheduler auto-submit no longer passes explicit target_date.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/customer-field-migration
PeninsulaInd 2026-02-27 15:57:09 -06:00
parent 7d44014d7a
commit 5ddeb93033
3 changed files with 329 additions and 239 deletions

View File

@ -612,69 +612,52 @@ class Scheduler:
self._interruptible_wait(interval, self._force_autocora)
def _auto_submit_cora_jobs(self):
"""Auto-submit AutoCora jobs for tasks due today."""
"""Auto-submit AutoCora jobs using multi-pass sweep (no explicit date)."""
from .tools.autocora import submit_autocora_jobs
if not self.config.clickup.api_token:
return
today = datetime.now(UTC).strftime("%Y-%m-%d")
ctx = {
"config": self.config,
"db": self.db,
"agent": self.agent,
}
result = submit_autocora_jobs(target_date=today, ctx=ctx)
log.info("AutoCora auto-submit (%s): %s", today, result)
result = submit_autocora_jobs(ctx=ctx)
log.info("AutoCora auto-submit (sweep): %s", result)
def _poll_autocora_results(self):
"""Check for completed AutoCora results and update ClickUp tasks."""
"""Check for completed AutoCora results and update ClickUp tasks.
Scans the results folder for .result files. Each file contains JSON
with task_ids and status. After processing, moves the file to
results/processed/ to prevent re-processing.
"""
from .tools.autocora import _parse_result
autocora = self.config.autocora
results_dir = Path(autocora.results_dir)
# Find submitted jobs in KV
kv_entries = self.db.kv_scan("autocora:job:")
submitted = []
for key, value in kv_entries:
try:
state = json.loads(value)
if state.get("status") == "submitted":
submitted.append((key, state))
except json.JSONDecodeError:
continue
if not submitted:
return
if not results_dir.exists():
log.debug("AutoCora results dir does not exist: %s", results_dir)
return
result_files = list(results_dir.glob("*.result"))
if not result_files:
return
client = self._get_clickup_client() if self.config.clickup.api_token else None
processed_dir = results_dir / "processed"
for kv_key, state in submitted:
job_id = state.get("job_id", "")
if not job_id:
continue
result_path = results_dir / f"{job_id}.result"
if not result_path.exists():
continue
for result_path in result_files:
raw = result_path.read_text(encoding="utf-8").strip()
result_data = _parse_result(raw)
task_ids = result_data.get("task_ids") or state.get("task_ids", [])
task_ids = result_data.get("task_ids", [])
status = result_data.get("status", "UNKNOWN")
keyword = state.get("keyword", "")
keyword = result_data.get("keyword", result_path.stem)
if status == "SUCCESS":
state["status"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.success_status)
@ -690,11 +673,6 @@ class Scheduler:
elif status == "FAILURE":
reason = result_data.get("reason", "unknown error")
state["status"] = "failed"
state["error"] = reason
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.error_status)
@ -710,6 +688,13 @@ class Scheduler:
log.info("AutoCora result for '%s': %s", keyword, status)
# Move result file to processed/
processed_dir.mkdir(exist_ok=True)
try:
result_path.rename(processed_dir / result_path.name)
except OSError as e:
log.warning("Could not move result file %s: %s", result_path.name, e)
# ── Folder Watcher ──
def _folder_watch_loop(self):

View File

@ -52,15 +52,15 @@ def _get_clickup_client(ctx: dict):
def _find_qualifying_tasks(client, config, target_date: str, categories: list[str]):
"""Find 'to do' tasks in cora_categories due on target_date.
"""Find 'to do' tasks in cora_categories due on target_date (single day).
Used when target_date is explicitly provided.
Returns list of ClickUpTask objects.
"""
space_id = config.clickup.space_id
if not space_id:
return []
# Parse target date to filter by due_date range (full day)
try:
dt = datetime.strptime(target_date, "%Y-%m-%d").replace(tzinfo=UTC)
except ValueError:
@ -78,10 +78,8 @@ def _find_qualifying_tasks(client, config, target_date: str, categories: list[st
qualifying = []
for task in tasks:
# Must be in one of the cora categories
if task.task_type not in categories:
continue
# Must have a due_date within the target day
if not task.due_date:
continue
try:
@ -95,6 +93,130 @@ def _find_qualifying_tasks(client, config, target_date: str, categories: list[st
return qualifying
def _find_qualifying_tasks_sweep(client, config, categories: list[str]):
"""Multi-pass sweep for qualifying tasks when no explicit date is given.
Pass 1: Tasks due today
Pass 2: Overdue tasks tagged with current month (e.g. "feb26")
Pass 3: Tasks tagged with last month (e.g. "jan26"), still "to do"
Pass 4: Tasks due in next 2 days (look-ahead)
Deduplicates across passes by task ID.
Returns list of ClickUpTask objects.
"""
space_id = config.clickup.space_id
if not space_id:
return []
now = datetime.now(UTC)
today_start_ms = int(
now.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000
)
today_end_ms = today_start_ms + 24 * 60 * 60 * 1000
lookahead_end_ms = today_start_ms + 3 * 24 * 60 * 60 * 1000 # +2 days
# Current and last month tags (e.g. "feb26", "jan26")
current_month_tag = now.strftime("%b%y").lower()
# Go back one month
if now.month == 1:
last_month = now.replace(year=now.year - 1, month=12)
else:
last_month = now.replace(month=now.month - 1)
last_month_tag = last_month.strftime("%b%y").lower()
# Fetch all "to do" tasks with due dates up to lookahead
all_tasks = client.get_tasks_from_space(
space_id,
statuses=["to do"],
due_date_lt=lookahead_end_ms,
)
# Filter to cora categories
cora_tasks = [t for t in all_tasks if t.task_type in categories]
seen_ids: set[str] = set()
qualifying: list = []
def _add(task):
if task.id not in seen_ids:
seen_ids.add(task.id)
qualifying.append(task)
# Pass 1: Due today
for task in cora_tasks:
if not task.due_date:
continue
try:
due_ms = int(task.due_date)
except (ValueError, TypeError):
continue
if today_start_ms <= due_ms < today_end_ms:
_add(task)
# Pass 2: Overdue + tagged with current month
for task in cora_tasks:
if not task.due_date:
continue
try:
due_ms = int(task.due_date)
except (ValueError, TypeError):
continue
if due_ms < today_start_ms and current_month_tag in task.tags:
_add(task)
# Pass 3: Tagged with last month, still "to do"
for task in cora_tasks:
if last_month_tag in task.tags:
_add(task)
# Pass 4: Look-ahead (due in next 2 days, excluding today which was pass 1)
for task in cora_tasks:
if not task.due_date:
continue
try:
due_ms = int(task.due_date)
except (ValueError, TypeError):
continue
if today_end_ms <= due_ms < lookahead_end_ms:
_add(task)
log.info(
"AutoCora sweep: %d qualifying tasks "
"(today=%d, overdue+month=%d, last_month=%d, lookahead=%d)",
len(qualifying),
sum(1 for t in qualifying if _is_due_today(t, today_start_ms, today_end_ms)),
sum(1 for t in qualifying if _is_overdue_with_tag(t, today_start_ms, current_month_tag)),
sum(1 for t in qualifying if last_month_tag in t.tags),
sum(1 for t in qualifying if _is_lookahead(t, today_end_ms, lookahead_end_ms)),
)
return qualifying
def _is_due_today(task, start_ms, end_ms) -> bool:
try:
due = int(task.due_date)
return start_ms <= due < end_ms
except (ValueError, TypeError):
return False
def _is_overdue_with_tag(task, today_start_ms, tag) -> bool:
try:
due = int(task.due_date)
return due < today_start_ms and tag in task.tags
except (ValueError, TypeError):
return False
def _is_lookahead(task, today_end_ms, lookahead_end_ms) -> bool:
try:
due = int(task.due_date)
return today_end_ms <= due < lookahead_end_ms
except (ValueError, TypeError):
return False
def _find_all_todo_tasks(client, config, categories: list[str]):
"""Find ALL 'to do' tasks in cora_categories (no date filter).
@ -165,7 +287,8 @@ def _group_by_keyword(tasks, all_tasks):
@tool(
"submit_autocora_jobs",
"Submit Cora SEO report jobs for ClickUp tasks due on a given date. "
"Submit Cora SEO report jobs for ClickUp tasks. Uses a multi-pass sweep "
"(today, overdue, last month, look-ahead) unless a specific date is given. "
"Writes job JSON files to the AutoCora shared folder queue.",
category="autocora",
)
@ -173,32 +296,33 @@ def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
"""Submit AutoCora jobs for qualifying ClickUp tasks.
Args:
target_date: Date to check (YYYY-MM-DD). Defaults to today.
target_date: Date to check (YYYY-MM-DD). Empty = multi-pass sweep.
ctx: Injected context with config, db, etc.
"""
if not ctx:
return "Error: context not available"
config = ctx["config"]
db = ctx["db"]
autocora = config.autocora
if not autocora.enabled:
return "AutoCora is disabled in config."
if not target_date:
target_date = datetime.now(UTC).strftime("%Y-%m-%d")
if not config.clickup.api_token:
return "Error: ClickUp API token not configured"
client = _get_clickup_client(ctx)
# Find qualifying tasks (due on target_date, in cora_categories, status "to do")
# Find qualifying tasks — sweep or single-day
if target_date:
qualifying = _find_qualifying_tasks(client, config, target_date, autocora.cora_categories)
label = target_date
else:
qualifying = _find_qualifying_tasks_sweep(client, config, autocora.cora_categories)
label = "sweep"
if not qualifying:
return f"No qualifying tasks found for {target_date}."
return f"No qualifying tasks found ({label})."
# Find ALL to-do tasks in cora categories for sibling keyword matching
all_todo = _find_all_todo_tasks(client, config, autocora.cora_categories)
@ -217,19 +341,13 @@ def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
skipped = []
for kw_lower, group in groups.items():
# Check KV for existing submission
kv_key = f"autocora:job:{kw_lower}"
existing = db.kv_get(kv_key)
if existing:
try:
state = json.loads(existing)
if state.get("status") == "submitted":
# Check if a job file already exists for this keyword (dedup by file)
existing_jobs = list(jobs_dir.glob(f"job-*-{_slugify(group['keyword'])}*.json"))
if existing_jobs:
skipped.append(group["keyword"])
continue
except json.JSONDecodeError:
pass
# Write job file
# Write job file (contains task_ids for the result poller)
job_id = _make_job_id(group["keyword"])
job_data = {
"keyword": group["keyword"],
@ -239,17 +357,6 @@ def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
job_path = jobs_dir / f"{job_id}.json"
job_path.write_text(json.dumps(job_data, indent=2), encoding="utf-8")
# Track in KV
kv_state = {
"status": "submitted",
"job_id": job_id,
"keyword": group["keyword"],
"url": group["url"],
"task_ids": group["task_ids"],
"submitted_at": datetime.now(UTC).isoformat(),
}
db.kv_set(kv_key, json.dumps(kv_state))
# Move ClickUp tasks to "automation underway"
for tid in group["task_ids"]:
client.update_task_status(tid, "automation underway")
@ -258,13 +365,13 @@ def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
log.info("Submitted AutoCora job: %s%s", group["keyword"], job_id)
# Build response
lines = [f"AutoCora submission for {target_date}:"]
lines = [f"AutoCora submission ({label}):"]
if submitted:
lines.append(f"\nSubmitted {len(submitted)} job(s):")
for kw in submitted:
lines.append(f" - {kw}")
if skipped:
lines.append(f"\nSkipped {len(skipped)} (already submitted):")
lines.append(f"\nSkipped {len(skipped)} (job file already exists):")
for kw in skipped:
lines.append(f" - {kw}")
if alerts:
@ -278,77 +385,51 @@ def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
@tool(
"poll_autocora_results",
"Poll the AutoCora results folder for completed Cora SEO report jobs. "
"Updates ClickUp task statuses based on results.",
"Scans for .result files, reads task_ids from the JSON, updates ClickUp, "
"then moves the result file to a processed/ subfolder.",
category="autocora",
)
def poll_autocora_results(ctx: dict | None = None) -> str:
"""Poll for AutoCora results and update ClickUp tasks.
Args:
ctx: Injected context with config, db, etc.
Scans the results folder for .result files. Each result file is JSON
containing {status, task_ids, keyword, ...}. After processing, the
result file is moved to results/processed/ to avoid re-processing.
"""
if not ctx:
return "Error: context not available"
config = ctx["config"]
db = ctx["db"]
autocora = config.autocora
if not autocora.enabled:
return "AutoCora is disabled in config."
# Find all submitted jobs in KV
kv_entries = db.kv_scan("autocora:job:")
submitted = []
for key, value in kv_entries:
try:
state = json.loads(value)
if state.get("status") == "submitted":
submitted.append((key, state))
except json.JSONDecodeError:
continue
if not submitted:
return "No pending AutoCora jobs to check."
results_dir = Path(autocora.results_dir)
if not results_dir.exists():
return f"Results directory does not exist: {results_dir}"
# Scan for .result files
result_files = list(results_dir.glob("*.result"))
if not result_files:
return "No result files found in results folder."
client = None
if config.clickup.api_token:
client = _get_clickup_client(ctx)
processed_dir = results_dir / "processed"
processed = []
still_pending = []
for kv_key, state in submitted:
job_id = state.get("job_id", "")
if not job_id:
continue
result_path = results_dir / f"{job_id}.result"
if not result_path.exists():
still_pending.append(state.get("keyword", job_id))
continue
# Read and parse result
for result_path in result_files:
raw = result_path.read_text(encoding="utf-8").strip()
result_data = _parse_result(raw)
# Get task_ids: prefer result file, fall back to KV
task_ids = result_data.get("task_ids") or state.get("task_ids", [])
task_ids = result_data.get("task_ids", [])
status = result_data.get("status", "UNKNOWN")
keyword = state.get("keyword", "")
keyword = result_data.get("keyword", result_path.stem)
if status == "SUCCESS":
# Update KV
state["status"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
db.kv_set(kv_key, json.dumps(state))
# Update ClickUp tasks
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.success_status)
@ -359,12 +440,6 @@ def poll_autocora_results(ctx: dict | None = None) -> str:
elif status == "FAILURE":
reason = result_data.get("reason", "unknown error")
state["status"] = "failed"
state["error"] = reason
state["completed_at"] = datetime.now(UTC).isoformat()
db.kv_set(kv_key, json.dumps(state))
# Update ClickUp tasks
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.error_status)
@ -378,16 +453,19 @@ def poll_autocora_results(ctx: dict | None = None) -> str:
else:
processed.append(f"UNKNOWN: {keyword} (status={status})")
# Move result file to processed/ so it's not re-processed
processed_dir.mkdir(exist_ok=True)
try:
result_path.rename(processed_dir / result_path.name)
except OSError as e:
log.warning("Could not move result file %s: %s", result_path.name, e)
# Build response
lines = ["AutoCora poll results:"]
if processed:
lines.append(f"\nProcessed {len(processed)} result(s):")
for p in processed:
lines.append(f" - {p}")
if still_pending:
lines.append(f"\nStill pending ({len(still_pending)}):")
for kw in still_pending:
lines.append(f" - {kw}")
return "\n".join(lines)

View File

@ -12,6 +12,7 @@ import pytest
from cheddahbot.config import AutoCoraConfig, ClickUpConfig, Config
from cheddahbot.tools.autocora import (
_find_qualifying_tasks_sweep,
_group_by_keyword,
_make_job_id,
_parse_result,
@ -36,6 +37,7 @@ class FakeTask:
task_type: str = "Content Creation"
due_date: str = ""
custom_fields: dict[str, Any] = field(default_factory=dict)
tags: list[str] = field(default_factory=list)
@pytest.fixture()
@ -147,11 +149,12 @@ class TestGroupByKeyword:
assert len(groups) == 0
assert any("missing Keyword" in a for a in alerts)
def test_missing_imsurl(self):
def test_missing_imsurl_uses_fallback(self):
"""Missing IMSURL gets a fallback blank URL."""
tasks = [FakeTask(id="t1", name="No URL", custom_fields={"Keyword": "test"})]
groups, alerts = _group_by_keyword(tasks, tasks)
assert len(groups) == 0
assert any("missing IMSURL" in a for a in alerts)
assert len(groups) == 1
assert groups["test"]["url"] == "https://seotoollab.com/blank.html"
def test_sibling_tasks(self):
"""Tasks sharing a keyword from all_tasks should be included."""
@ -219,8 +222,8 @@ class TestSubmitAutocoraJobs:
assert job_data["url"] == "http://example.com"
assert job_data["task_ids"] == ["t1"]
def test_submit_tracks_kv(self, ctx, monkeypatch):
"""KV store tracks submitted jobs."""
def test_submit_writes_job_with_task_ids(self, ctx, monkeypatch):
"""Job file contains task_ids for the result poller."""
task = FakeTask(
id="t1",
name="Test",
@ -236,14 +239,14 @@ class TestSubmitAutocoraJobs:
submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
raw = ctx["db"].kv_get("autocora:job:test keyword")
assert raw is not None
state = json.loads(raw)
assert state["status"] == "submitted"
assert "t1" in state["task_ids"]
jobs_dir = Path(ctx["config"].autocora.jobs_dir)
job_files = list(jobs_dir.glob("job-*.json"))
assert len(job_files) == 1
data = json.loads(job_files[0].read_text())
assert "t1" in data["task_ids"]
def test_duplicate_prevention(self, ctx, monkeypatch):
"""Already-submitted keywords are skipped."""
"""Already-submitted keywords are skipped (job file exists)."""
task = FakeTask(
id="t1",
name="Test",
@ -260,7 +263,7 @@ class TestSubmitAutocoraJobs:
# First submit
submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
# Second submit — should skip
# Second submit — should skip (job file already exists)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "Skipped 1" in result
@ -282,8 +285,8 @@ class TestSubmitAutocoraJobs:
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "missing Keyword" in result
def test_missing_imsurl_alert(self, ctx, monkeypatch):
"""Tasks without IMSURL field produce alerts."""
def test_missing_imsurl_uses_fallback(self, ctx, monkeypatch):
"""Tasks without IMSURL use fallback URL and still submit."""
task = FakeTask(
id="t1",
name="No URL Task",
@ -298,7 +301,7 @@ class TestSubmitAutocoraJobs:
)
result = submit_autocora_jobs(target_date="2025-01-01", ctx=ctx)
assert "missing IMSURL" in result
assert "Submitted 1 job" in result
# ---------------------------------------------------------------------------
@ -312,33 +315,18 @@ class TestPollAutocoraResults:
result = poll_autocora_results(ctx=ctx)
assert "disabled" in result.lower()
def test_no_pending(self, ctx):
def test_no_result_files(self, ctx):
result = poll_autocora_results(ctx=ctx)
assert "No pending" in result
assert "No result files" in result
def test_success_json(self, ctx, monkeypatch):
"""JSON SUCCESS result updates KV and ClickUp."""
db = ctx["db"]
"""JSON SUCCESS result updates ClickUp and moves result file."""
results_dir = Path(ctx["config"].autocora.results_dir)
# Set up submitted job in KV
job_id = "job-123-test"
kv_key = "autocora:job:test keyword"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "test keyword",
"task_ids": ["t1", "t2"],
}),
)
# Write result file directly (no KV needed)
result_data = {"status": "SUCCESS", "task_ids": ["t1", "t2"], "keyword": "test keyword"}
(results_dir / "job-123-test.result").write_text(json.dumps(result_data))
# Write result file
result_data = {"status": "SUCCESS", "task_ids": ["t1", "t2"]}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
# Mock ClickUp client
mock_client = MagicMock()
monkeypatch.setattr(
"cheddahbot.tools.autocora._get_clickup_client", lambda ctx: mock_client
@ -347,39 +335,27 @@ class TestPollAutocoraResults:
result = poll_autocora_results(ctx=ctx)
assert "SUCCESS: test keyword" in result
# Verify KV updated
state = json.loads(db.kv_get(kv_key))
assert state["status"] == "completed"
# Verify ClickUp calls
assert mock_client.update_task_status.call_count == 2
mock_client.update_task_status.assert_any_call("t1", "running cora")
mock_client.update_task_status.assert_any_call("t2", "running cora")
assert mock_client.add_comment.call_count == 2
def test_failure_json(self, ctx, monkeypatch):
"""JSON FAILURE result updates KV and ClickUp with error."""
db = ctx["db"]
results_dir = Path(ctx["config"].autocora.results_dir)
# Verify result file moved to processed/
assert not (results_dir / "job-123-test.result").exists()
assert (results_dir / "processed" / "job-123-test.result").exists()
job_id = "job-456-fail"
kv_key = "autocora:job:fail keyword"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "fail keyword",
"task_ids": ["t3"],
}),
)
def test_failure_json(self, ctx, monkeypatch):
"""JSON FAILURE result updates ClickUp with error."""
results_dir = Path(ctx["config"].autocora.results_dir)
result_data = {
"status": "FAILURE",
"reason": "Cora not running",
"task_ids": ["t3"],
"keyword": "fail keyword",
}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
(results_dir / "job-456-fail.result").write_text(json.dumps(result_data))
mock_client = MagicMock()
monkeypatch.setattr(
@ -390,31 +366,14 @@ class TestPollAutocoraResults:
assert "FAILURE: fail keyword" in result
assert "Cora not running" in result
state = json.loads(db.kv_get(kv_key))
assert state["status"] == "failed"
assert state["error"] == "Cora not running"
mock_client.update_task_status.assert_called_once_with("t3", "error")
def test_legacy_plain_text(self, ctx, monkeypatch):
"""Legacy plain-text SUCCESS result still works."""
db = ctx["db"]
"""Legacy plain-text SUCCESS result still works (keyword from filename)."""
results_dir = Path(ctx["config"].autocora.results_dir)
job_id = "job-789-legacy"
kv_key = "autocora:job:legacy kw"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "legacy kw",
"task_ids": ["t5"],
}),
)
# Legacy format — plain text, no JSON
(results_dir / f"{job_id}.result").write_text("SUCCESS")
(results_dir / "job-789-legacy-kw.result").write_text("SUCCESS")
mock_client = MagicMock()
monkeypatch.setattr(
@ -422,31 +381,17 @@ class TestPollAutocoraResults:
)
result = poll_autocora_results(ctx=ctx)
assert "SUCCESS: legacy kw" in result
assert "SUCCESS:" in result
# task_ids come from KV fallback
mock_client.update_task_status.assert_called_once_with("t5", "running cora")
# No task_ids in legacy format, so no ClickUp calls
mock_client.update_task_status.assert_not_called()
def test_task_ids_from_result_preferred(self, ctx, monkeypatch):
"""task_ids from result file take precedence over KV."""
db = ctx["db"]
def test_task_ids_from_result_file(self, ctx, monkeypatch):
"""task_ids from result file drive ClickUp updates."""
results_dir = Path(ctx["config"].autocora.results_dir)
job_id = "job-100-pref"
kv_key = "autocora:job:pref kw"
db.kv_set(
kv_key,
json.dumps({
"status": "submitted",
"job_id": job_id,
"keyword": "pref kw",
"task_ids": ["old_t1"], # KV has old IDs
}),
)
# Result has updated task_ids
result_data = {"status": "SUCCESS", "task_ids": ["new_t1", "new_t2"]}
(results_dir / f"{job_id}.result").write_text(json.dumps(result_data))
result_data = {"status": "SUCCESS", "task_ids": ["new_t1", "new_t2"], "keyword": "pref kw"}
(results_dir / "job-100-pref.result").write_text(json.dumps(result_data))
mock_client = MagicMock()
monkeypatch.setattr(
@ -455,25 +400,107 @@ class TestPollAutocoraResults:
poll_autocora_results(ctx=ctx)
# Should use result file task_ids, not KV
calls = [c.args for c in mock_client.update_task_status.call_args_list]
assert ("new_t1", "running cora") in calls
assert ("new_t2", "running cora") in calls
assert ("old_t1", "running cora") not in calls
def test_still_pending(self, ctx):
"""Jobs without result files show as still pending."""
db = ctx["db"]
db.kv_set(
"autocora:job:waiting",
json.dumps({
"status": "submitted",
"job_id": "job-999-wait",
"keyword": "waiting",
"task_ids": ["t99"],
}),
# ---------------------------------------------------------------------------
# Sweep tests
# ---------------------------------------------------------------------------
class TestFindQualifyingTasksSweep:
"""Test the multi-pass sweep logic."""
def _make_client(self, tasks):
client = MagicMock()
client.get_tasks_from_space.return_value = tasks
return client
def _make_config(self):
config = MagicMock()
config.clickup.space_id = "sp1"
return config
def test_finds_tasks_due_today(self):
from datetime import UTC, datetime
now = datetime.now(UTC)
today_ms = int(now.replace(hour=12).timestamp() * 1000)
task = FakeTask(id="t1", name="Today", due_date=str(today_ms))
client = self._make_client([task])
config = self._make_config()
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
assert any(t.id == "t1" for t in result)
def test_finds_overdue_with_month_tag(self):
from datetime import UTC, datetime
now = datetime.now(UTC)
month_tag = now.strftime("%b%y").lower()
# Due 3 days ago
overdue_ms = int((now.timestamp() - 3 * 86400) * 1000)
task = FakeTask(
id="t2", name="Overdue", due_date=str(overdue_ms), tags=[month_tag]
)
client = self._make_client([task])
config = self._make_config()
result = poll_autocora_results(ctx=ctx)
assert "Still pending" in result
assert "waiting" in result
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
assert any(t.id == "t2" for t in result)
def test_finds_last_month_tagged(self):
from datetime import UTC, datetime
now = datetime.now(UTC)
if now.month == 1:
last = now.replace(year=now.year - 1, month=12)
else:
last = now.replace(month=now.month - 1)
last_tag = last.strftime("%b%y").lower()
# No due date needed for month-tag pass
task = FakeTask(id="t3", name="Last Month", tags=[last_tag])
client = self._make_client([task])
config = self._make_config()
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
assert any(t.id == "t3" for t in result)
def test_finds_lookahead(self):
from datetime import UTC, datetime
now = datetime.now(UTC)
tomorrow_ms = int((now.timestamp() + 36 * 3600) * 1000)
task = FakeTask(id="t4", name="Tomorrow", due_date=str(tomorrow_ms))
client = self._make_client([task])
config = self._make_config()
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
assert any(t.id == "t4" for t in result)
def test_deduplicates_across_passes(self):
from datetime import UTC, datetime
now = datetime.now(UTC)
month_tag = now.strftime("%b%y").lower()
today_ms = int(now.replace(hour=12).timestamp() * 1000)
# Task is due today AND has month tag — should only appear once
task = FakeTask(
id="t5", name="Multi", due_date=str(today_ms), tags=[month_tag]
)
client = self._make_client([task])
config = self._make_config()
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
ids = [t.id for t in result]
assert ids.count("t5") == 1
def test_empty_space_id(self):
config = self._make_config()
config.clickup.space_id = ""
client = self._make_client([])
result = _find_qualifying_tasks_sweep(client, config, ["Content Creation"])
assert result == []