# CheddahBot/cheddahbot/tools/autocora.py
"""AutoCora job submission and result polling tools.
Submits Cora SEO report jobs to a shared folder queue and polls for results.
Jobs are JSON files written to a network share; a worker on another machine
picks them up, runs Cora, and writes result files back.
"""
from __future__ import annotations
import json
import logging
import re
import time
from datetime import UTC, datetime
from pathlib import Path
from . import tool
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _slugify(text: str) -> str:
"""Convert text to a filesystem-safe slug."""
text = text.lower().strip()
text = re.sub(r"[^\w\s-]", "", text)
text = re.sub(r"[\s_]+", "-", text)
return re.sub(r"-+", "-", text).strip("-")[:80]
def _make_job_id(keyword: str) -> str:
    """Build a unique job ID of the form 'job-<epoch-millis>-<keyword-slug>'."""
    millis = int(time.time() * 1000)
    return "-".join(("job", str(millis), _slugify(keyword)))
def _get_clickup_client(ctx: dict):
    """Construct a ClickUpClient from the ClickUp settings in ctx['config']."""
    from ..clickup import ClickUpClient

    clickup_cfg = ctx["config"].clickup
    return ClickUpClient(
        api_token=clickup_cfg.api_token,
        workspace_id=clickup_cfg.workspace_id,
        task_type_field_name=clickup_cfg.task_type_field_name,
    )
def _find_qualifying_tasks(client, config, target_date: str, categories: list[str]):
"""Find 'to do' tasks in cora_categories due on target_date.
Returns list of ClickUpTask objects.
"""
space_id = config.clickup.space_id
if not space_id:
return []
# Parse target date to filter by due_date range (full day)
try:
dt = datetime.strptime(target_date, "%Y-%m-%d").replace(tzinfo=UTC)
except ValueError:
log.warning("Invalid target_date format: %s", target_date)
return []
day_start_ms = int(dt.timestamp() * 1000)
day_end_ms = day_start_ms + 24 * 60 * 60 * 1000
tasks = client.get_tasks_from_space(
space_id,
statuses=["to do"],
due_date_lt=day_end_ms,
)
qualifying = []
for task in tasks:
# Must be in one of the cora categories
if task.task_type not in categories:
continue
# Must have a due_date within the target day
if not task.due_date:
continue
try:
task_due_ms = int(task.due_date)
except (ValueError, TypeError):
continue
if task_due_ms < day_start_ms or task_due_ms >= day_end_ms:
continue
qualifying.append(task)
return qualifying
def _find_all_todo_tasks(client, config, categories: list[str]):
"""Find ALL 'to do' tasks in cora_categories (no date filter).
Used to find sibling tasks sharing the same keyword.
"""
space_id = config.clickup.space_id
if not space_id:
return []
tasks = client.get_tasks_from_space(space_id, statuses=["to do"])
return [t for t in tasks if t.task_type in categories]
def _group_by_keyword(tasks, all_tasks):
"""Group tasks by normalized keyword, pulling in sibling tasks from all_tasks.
Returns dict: {keyword_lower: {"keyword": str, "url": str, "task_ids": [str]}}
Alerts list for tasks missing Keyword or IMSURL.
"""
alerts = []
groups: dict[str, dict] = {}
# Index all tasks by keyword for sibling lookup
all_by_keyword: dict[str, list] = {}
for t in all_tasks:
kw = t.custom_fields.get("Keyword", "") or ""
kw = str(kw).strip()
if kw:
all_by_keyword.setdefault(kw.lower(), []).append(t)
for task in tasks:
keyword = task.custom_fields.get("Keyword", "") or ""
keyword = str(keyword).strip()
if not keyword:
alerts.append(f"Task '{task.name}' (id={task.id}) missing Keyword field")
continue
url = task.custom_fields.get("IMSURL", "") or ""
url = str(url).strip()
if not url:
url = "https://seotoollab.com/blank.html"
kw_lower = keyword.lower()
if kw_lower not in groups:
# Collect ALL task IDs sharing this keyword
sibling_ids = set()
for sibling in all_by_keyword.get(kw_lower, []):
sibling_ids.add(sibling.id)
sibling_ids.add(task.id)
groups[kw_lower] = {
"keyword": keyword,
"url": url,
"task_ids": sorted(sibling_ids),
}
else:
# Add this task's ID if not already there
if task.id not in groups[kw_lower]["task_ids"]:
groups[kw_lower]["task_ids"].append(task.id)
groups[kw_lower]["task_ids"].sort()
return groups, alerts
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@tool(
    "submit_autocora_jobs",
    "Submit Cora SEO report jobs for ClickUp tasks due on a given date. "
    "Writes job JSON files to the AutoCora shared folder queue.",
    category="autocora",
)
def submit_autocora_jobs(target_date: str = "", ctx: dict | None = None) -> str:
    """Submit AutoCora jobs for qualifying ClickUp tasks.

    Finds "to do" tasks in the configured Cora categories due on target_date,
    groups them by keyword (pulling in sibling tasks sharing the keyword),
    writes one job JSON file per keyword into the shared jobs folder, records
    the submission in the KV store, and moves the affected ClickUp tasks to
    "automation underway".

    Args:
        target_date: Date to check (YYYY-MM-DD). Defaults to today (UTC).
        ctx: Injected context with config, db, etc.

    Returns:
        Human-readable summary of submitted/skipped jobs and any alerts.
    """
    if not ctx:
        return "Error: context not available"
    config = ctx["config"]
    db = ctx["db"]
    autocora = config.autocora
    if not autocora.enabled:
        return "AutoCora is disabled in config."
    if not target_date:
        target_date = datetime.now(UTC).strftime("%Y-%m-%d")
    if not config.clickup.api_token:
        return "Error: ClickUp API token not configured"
    client = _get_clickup_client(ctx)
    # Find qualifying tasks (due on target_date, in cora_categories, status "to do")
    qualifying = _find_qualifying_tasks(client, config, target_date, autocora.cora_categories)
    if not qualifying:
        return f"No qualifying tasks found for {target_date}."
    # Find ALL to-do tasks in cora categories for sibling keyword matching
    all_todo = _find_all_todo_tasks(client, config, autocora.cora_categories)
    # Group by keyword
    groups, alerts = _group_by_keyword(qualifying, all_todo)
    if not groups and alerts:
        return "No jobs submitted.\n\n" + "\n".join(f"- {a}" for a in alerts)
    # Ensure jobs directory exists
    jobs_dir = Path(autocora.jobs_dir)
    jobs_dir.mkdir(parents=True, exist_ok=True)
    submitted = []
    skipped = []
    for kw_lower, group in groups.items():
        # Skip keywords whose KV state says a job is already in flight.
        kv_key = f"autocora:job:{kw_lower}"
        existing = db.kv_get(kv_key)
        if existing:
            try:
                state = json.loads(existing)
                if state.get("status") == "submitted":
                    skipped.append(group["keyword"])
                    continue
            except json.JSONDecodeError:
                pass  # corrupt KV entry: fall through and resubmit
        # Write the job file the worker machine will pick up.
        job_id = _make_job_id(group["keyword"])
        job_data = {
            "keyword": group["keyword"],
            "url": group["url"],
            "task_ids": group["task_ids"],
        }
        job_path = jobs_dir / f"{job_id}.json"
        job_path.write_text(json.dumps(job_data, indent=2), encoding="utf-8")
        # Track in KV so a rerun skips this keyword until results come back.
        kv_state = {
            "status": "submitted",
            "job_id": job_id,
            "keyword": group["keyword"],
            "url": group["url"],
            "task_ids": group["task_ids"],
            "submitted_at": datetime.now(UTC).isoformat(),
        }
        db.kv_set(kv_key, json.dumps(kv_state))
        # Move ClickUp tasks to "automation underway"
        for tid in group["task_ids"]:
            client.update_task_status(tid, "automation underway")
        submitted.append(group["keyword"])
        # Fixed: the old "%s%s" format fused keyword and job_id into one token.
        log.info("Submitted AutoCora job: %s (%s)", group["keyword"], job_id)
    # Build response
    lines = [f"AutoCora submission for {target_date}:"]
    if submitted:
        lines.append(f"\nSubmitted {len(submitted)} job(s):")
        for kw in submitted:
            lines.append(f" - {kw}")
    if skipped:
        lines.append(f"\nSkipped {len(skipped)} (already submitted):")
        for kw in skipped:
            lines.append(f" - {kw}")
    if alerts:
        lines.append(f"\nAlerts ({len(alerts)}):")
        for a in alerts:
            lines.append(f" - {a}")
    return "\n".join(lines)
@tool(
    "poll_autocora_results",
    "Poll the AutoCora results folder for completed Cora SEO report jobs. "
    "Updates ClickUp task statuses based on results.",
    category="autocora",
)
def poll_autocora_results(ctx: dict | None = None) -> str:
    """Poll for AutoCora results and update ClickUp tasks.

    Scans the KV store for jobs still in "submitted" state, looks for a
    matching <job_id>.result file in the results folder, and on SUCCESS or
    FAILURE updates the KV state and the associated ClickUp task statuses.
    Result files are left in place; reprocessing is prevented by the KV
    status transition.

    Args:
        ctx: Injected context with config, db, etc.

    Returns:
        Human-readable summary of processed and still-pending jobs.
    """
    if not ctx:
        return "Error: context not available"
    config = ctx["config"]
    db = ctx["db"]
    autocora = config.autocora
    if not autocora.enabled:
        return "AutoCora is disabled in config."
    # Find all submitted jobs in KV
    kv_entries = db.kv_scan("autocora:job:")
    submitted = []
    for key, value in kv_entries:
        try:
            state = json.loads(value)
            if state.get("status") == "submitted":
                submitted.append((key, state))
        except json.JSONDecodeError:
            continue
    if not submitted:
        return "No pending AutoCora jobs to check."
    results_dir = Path(autocora.results_dir)
    if not results_dir.exists():
        return f"Results directory does not exist: {results_dir}"
    # ClickUp updates are best-effort: skipped entirely without a token.
    client = None
    if config.clickup.api_token:
        client = _get_clickup_client(ctx)
    processed = []
    still_pending = []
    for kv_key, state in submitted:
        job_id = state.get("job_id", "")
        if not job_id:
            continue
        result_path = results_dir / f"{job_id}.result"
        if not result_path.exists():
            still_pending.append(state.get("keyword", job_id))
            continue
        # Read and parse result
        raw = result_path.read_text(encoding="utf-8").strip()
        result_data = _parse_result(raw)
        # Get task_ids: prefer result file, fall back to KV
        task_ids = result_data.get("task_ids") or state.get("task_ids", [])
        status = result_data.get("status", "UNKNOWN")
        keyword = state.get("keyword", "")
        if status == "SUCCESS":
            # Update KV
            state["status"] = "completed"
            state["completed_at"] = datetime.now(UTC).isoformat()
            db.kv_set(kv_key, json.dumps(state))
            # Update ClickUp tasks
            if client and task_ids:
                for tid in task_ids:
                    client.update_task_status(tid, autocora.success_status)
                    client.add_comment(tid, f"Cora report completed for keyword: {keyword}")
            processed.append(f"SUCCESS: {keyword}")
            log.info("AutoCora SUCCESS: %s", keyword)
        elif status == "FAILURE":
            reason = result_data.get("reason", "unknown error")
            state["status"] = "failed"
            state["error"] = reason
            state["completed_at"] = datetime.now(UTC).isoformat()
            db.kv_set(kv_key, json.dumps(state))
            # Update ClickUp tasks
            if client and task_ids:
                for tid in task_ids:
                    client.update_task_status(tid, autocora.error_status)
                    client.add_comment(
                        tid, f"Cora report failed for keyword: {keyword}\nReason: {reason}"
                    )
            processed.append(f"FAILURE: {keyword} ({reason})")
            # Fixed: the old "%s%s" format fused keyword and reason together.
            log.info("AutoCora FAILURE: %s (%s)", keyword, reason)
        else:
            # Unrecognized status: report it but leave KV as "submitted" so it
            # is surfaced again on the next poll.
            processed.append(f"UNKNOWN: {keyword} (status={status})")
    # Build response
    lines = ["AutoCora poll results:"]
    if processed:
        lines.append(f"\nProcessed {len(processed)} result(s):")
        for p in processed:
            lines.append(f" - {p}")
    if still_pending:
        lines.append(f"\nStill pending ({len(still_pending)}):")
        for kw in still_pending:
            lines.append(f" - {kw}")
    return "\n".join(lines)
def _parse_result(raw: str) -> dict:
"""Parse a result file — JSON format or legacy plain text."""
# Try JSON first
try:
data = json.loads(raw)
if isinstance(data, dict):
return data
except json.JSONDecodeError:
pass
# Legacy plain text: "SUCCESS" or "FAILURE: reason"
if raw.startswith("SUCCESS"):
return {"status": "SUCCESS"}
if raw.startswith("FAILURE"):
reason = raw.split(":", 1)[1].strip() if ":" in raw else "unknown"
return {"status": "FAILURE", "reason": reason}
return {"status": "UNKNOWN", "raw": raw}