CheddahBot/clickup_runner/autocora.py

180 lines
4.8 KiB
Python

"""AutoCora job submission and result polling.
Submits Cora SEO analysis jobs to the NAS queue and polls for results.
Jobs are JSON files written to the jobs directory; an external worker
picks them up, runs Cora, and writes .result files to the results directory.
"""
from __future__ import annotations
import json
import logging
import re
import shutil
import time
from dataclasses import dataclass
from pathlib import Path
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
@dataclass
class CoraResult:
    """Outcome of a single AutoCora job, as read from its .result file.

    Field order is part of the interface (positional construction), so it
    must not be rearranged.
    """

    # Identifier of the job this result belongs to.
    job_id: str
    # Outcome marker: "SUCCESS" or "FAILURE".
    status: str
    # Keyword the job analysed; may be empty when the file did not carry one.
    keyword: str
    # Task identifiers associated with the job; may be empty.
    task_ids: list[str]
    # Why the job failed; empty string when it succeeded.
    reason: str
    # Location of the .result file this record was parsed from.
    result_path: Path
def slugify(text: str, max_len: int = 80) -> str:
    """Turn arbitrary text into a slug that is safe to use in a filename.

    The text is lowercased, every run of characters outside ``a-z0-9`` is
    collapsed into a single hyphen, and leading/trailing hyphens are
    removed. Slugs longer than ``max_len`` are cut to that length (and any
    hyphen left dangling by the cut is dropped). If nothing survives, the
    literal ``"unknown"`` is returned so callers always get a usable name.
    """
    normalized = text.lower().strip()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", normalized).strip("-")
    # Only truncate when needed; the cut may land right after a hyphen.
    shortened = (
        hyphenated[:max_len].rstrip("-")
        if len(hyphenated) > max_len
        else hyphenated
    )
    return shortened if shortened else "unknown"
def make_job_id(keyword: str) -> str:
    """Build a job ID of the form ``job-<epoch milliseconds>-<keyword slug>``.

    Uniqueness rests on the millisecond timestamp: two calls for the same
    keyword within the same millisecond would yield the same ID.
    """
    millis = int(time.time() * 1000)
    keyword_slug = slugify(keyword)
    return f"job-{millis}-{keyword_slug}"
def submit_job(
    keyword: str,
    url: str,
    task_id: str,
    jobs_dir: str,
) -> str | None:
    """Write a job JSON file to the NAS jobs directory.

    The payload is first written to a ``<job_id>.json.tmp`` sibling and then
    renamed to ``<job_id>.json``, so a worker polling the directory for
    ``.json`` files never observes a partially written job.

    Args:
        keyword: Keyword to analyse; also used to derive the job ID.
        url: Page URL for the job. When empty, a placeholder blank-page URL
            is substituted.
        task_id: Task identifier stored in the job's ``task_ids`` list.
        jobs_dir: Directory to write the job into; created if missing.

    Returns:
        The job_id on success, None if the directory or file could not be
        written (the error is logged).
    """
    jobs_path = Path(jobs_dir)
    try:
        jobs_path.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        log.error("Cannot access jobs directory %s: %s", jobs_dir, e)
        return None

    job_id = make_job_id(keyword)
    job_file = jobs_path / ("%s.json" % job_id)
    # Staging name deliberately does not end in ".json".
    tmp_file = jobs_path / ("%s.json.tmp" % job_id)
    payload = json.dumps(
        {
            "keyword": keyword,
            "url": url or "https://seotoollab.com/blank.html",
            "task_ids": [task_id],
        },
        indent=2,
    )

    try:
        tmp_file.write_text(payload, encoding="utf-8")
        # Rename within the same directory: the job appears fully written.
        tmp_file.replace(job_file)
    except OSError as e:
        log.error("Failed to write job file %s: %s", job_file, e)
        try:
            tmp_file.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup; the original failure is already logged.
            pass
        return None

    log.info("Submitted AutoCora job: %s (keyword=%s)", job_id, keyword)
    return job_id
def parse_result_file(result_path: Path) -> CoraResult | None:
    """Parse a .result file (JSON or legacy plain-text format).

    Two formats are accepted:

    * JSON object with optional ``status``, ``keyword``, ``task_ids`` and
      ``reason`` keys. Missing or null values fall back to ``"FAILURE"``,
      ``""``, ``[]`` and ``""`` respectively.
    * Legacy plain text starting with ``SUCCESS`` or ``FAILURE``; for the
      latter, anything after the first ``:`` is taken as the reason
      (``"Unknown"`` when there is no colon).

    The job ID is the file name without its extension.

    Args:
        result_path: Path of the .result file to read.

    Returns:
        A CoraResult, or None if the file cannot be read, cannot be decoded,
        is empty, or matches neither format (a warning is logged).
    """
    try:
        # utf-8-sig reads plain UTF-8 unchanged and drops a leading BOM,
        # which would otherwise defeat both the JSON and the legacy parser.
        raw = result_path.read_text(encoding="utf-8-sig").strip()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError; without catching it a single
        # corrupt file would raise out of the caller's scan.
        log.warning("Cannot read result file %s: %s", result_path, e)
        return None
    if not raw:
        log.warning("Empty result file: %s", result_path)
        return None

    job_id = result_path.stem  # filename without .result extension

    # Try JSON first.
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None

    # Only a JSON object is a valid result; other JSON values (lists,
    # numbers, strings) fall through to the legacy checks below.
    if isinstance(data, dict):
        raw_ids = data.get("task_ids")
        if isinstance(raw_ids, list):
            task_ids = raw_ids
        elif isinstance(raw_ids, str):
            # Tolerate a single ID given as a bare string.
            task_ids = [raw_ids]
        else:
            task_ids = []
        return CoraResult(
            job_id=job_id,
            # `or` also covers explicit nulls, not just missing keys.
            status=data.get("status") or "FAILURE",
            keyword=data.get("keyword") or "",
            task_ids=task_ids,
            reason=data.get("reason") or "",
            result_path=result_path,
        )

    # Legacy plain-text format.
    if raw.startswith("SUCCESS"):
        return CoraResult(
            job_id=job_id,
            status="SUCCESS",
            keyword="",
            task_ids=[],
            reason="",
            result_path=result_path,
        )
    if raw.startswith("FAILURE"):
        _, colon, detail = raw.partition(":")
        return CoraResult(
            job_id=job_id,
            status="FAILURE",
            keyword="",
            task_ids=[],
            reason=detail.strip() if colon else "Unknown",
            result_path=result_path,
        )

    log.warning("Unrecognized result format in %s", result_path)
    return None
def scan_results(results_dir: str) -> list[CoraResult]:
    """Collect every parseable .result file found directly in a directory.

    Files are visited in sorted path order. Files that parse_result_file
    returns None for are left out. A directory that does not exist yields
    an empty list.
    """
    base = Path(results_dir)
    if not base.exists():
        return []
    candidates = sorted(base.glob("*.result"))
    return [res for res in map(parse_result_file, candidates) if res]
def archive_result(result: CoraResult) -> bool:
    """Move a result's file into a ``processed`` folder beside it.

    The folder is created next to the .result file if it is not there yet.

    Returns:
        True if the file was moved, False if creating the folder or moving
        the file raised OSError (a warning is logged).
    """
    source = result.result_path
    archive_dir = source.parent / "processed"
    try:
        archive_dir.mkdir(exist_ok=True)
        target = archive_dir / source.name
        shutil.move(str(source), str(target))
        log.info("Archived result file: %s", source.name)
        return True
    except OSError as e:
        log.warning("Failed to archive result %s: %s", source, e)
        return False