CheddahBot/cheddahbot/scheduler.py

1173 lines
45 KiB
Python

"""Task scheduler with heartbeat, ClickUp polling, and folder watch support."""
from __future__ import annotations
import contextlib
import json
import logging
import re
import shutil
import threading
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
from croniter import croniter
if TYPE_CHECKING:
from .agent import Agent
from .config import Config
from .db import Database
from .notifications import NotificationBus
log = logging.getLogger(__name__)

# Exact reply the agent is asked to give when a heartbeat check needs no action.
HEARTBEAT_OK = "HEARTBEAT_OK"

# Matches **Docx:** `path/to/file.docx` patterns in tool output
_DOCX_PATH_RE = re.compile(r"\*\*Docx:\*\*\s*`([^`]+\.docx)`")


def _extract_docx_paths(result: str) -> list[str]:
    """Return every ``.docx`` path advertised in *result* as ``**Docx:** `path```.

    Paths are returned in the order they appear; an empty list means none.
    """
    return [match.group(1) for match in _DOCX_PATH_RE.finditer(result)]
class Scheduler:
    """Owns CheddahBot's background loops.

    Loops: scheduled-task polling, heartbeat, ClickUp polling, link-building
    folder watching, AutoCora result polling and content folder watching.
    ``start()`` launches them as daemon threads; ``stop()`` sets the shared
    stop event they all check.
    """

    # Tasks due within this window are eligible for execution
    DUE_DATE_WINDOW_WEEKS = 3

    def __init__(
        self,
        config: Config,
        db: Database,
        agent: Agent,
        notification_bus: NotificationBus | None = None,
    ):
        """Store collaborators and prepare (but do not start) the loop machinery.

        Args:
            config: Application configuration.
            db: Database used for scheduled tasks and the key/value store.
            agent: Agent that executes prompts and tools.
            notification_bus: Optional notification sink; when absent,
                notifications are only logged.
        """
        self.config, self.db, self.agent = config, db, agent
        self.notification_bus = notification_bus

        # Shutdown signal plus per-loop "run now" signals.
        self._stop_event = threading.Event()
        self._force_heartbeat = threading.Event()
        self._force_poll = threading.Event()
        self._force_autocora = threading.Event()

        # Thread handles; filled in by start().
        self._thread: threading.Thread | None = None
        self._heartbeat_thread: threading.Thread | None = None
        self._clickup_thread: threading.Thread | None = None
        self._folder_watch_thread: threading.Thread | None = None
        self._autocora_thread: threading.Thread | None = None
        self._content_watch_thread: threading.Thread | None = None

        # Lazily populated ClickUp state.
        self._clickup_client = None
        self._field_filter_cache: dict | None = None
def start(self):
"""Start the scheduler, heartbeat, and ClickUp threads."""
self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler")
self._thread.start()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop, daemon=True, name="heartbeat"
)
self._heartbeat_thread.start()
# Start ClickUp polling if configured
if self.config.clickup.enabled:
self._clickup_thread = threading.Thread(
target=self._clickup_loop, daemon=True, name="clickup"
)
self._clickup_thread.start()
log.info(
"ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes
)
else:
log.info("ClickUp integration disabled (no API token)")
# Start folder watcher if configured
watch_folder = self.config.link_building.watch_folder
if watch_folder:
self._folder_watch_thread = threading.Thread(
target=self._folder_watch_loop, daemon=True, name="folder-watch"
)
self._folder_watch_thread.start()
log.info(
"Folder watcher started (folder=%s, interval=%dm)",
watch_folder,
self.config.link_building.watch_interval_minutes,
)
else:
log.info("Folder watcher disabled (no watch_folder configured)")
# Start AutoCora result polling if configured
if self.config.autocora.enabled:
self._autocora_thread = threading.Thread(
target=self._autocora_loop, daemon=True, name="autocora"
)
self._autocora_thread.start()
log.info(
"AutoCora polling started (interval=%dm)",
self.config.autocora.poll_interval_minutes,
)
else:
log.info("AutoCora polling disabled")
# Start content folder watcher if configured
content_inbox = self.config.content.cora_inbox
if content_inbox:
self._content_watch_thread = threading.Thread(
target=self._content_watch_loop, daemon=True, name="content-watch"
)
self._content_watch_thread.start()
log.info(
"Content folder watcher started (folder=%s, interval=%dm)",
content_inbox,
self.config.link_building.watch_interval_minutes,
)
else:
log.info("Content folder watcher disabled (no cora_inbox configured)")
log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds,
self.config.scheduler.heartbeat_interval_minutes,
)
def stop(self):
self._stop_event.set()
if self._clickup_client:
self._clickup_client.close()
def _notify(self, message: str, category: str = "clickup"):
"""Push a notification through the bus if available."""
if self.notification_bus:
self.notification_bus.push(message, category)
else:
log.info("Notification [%s]: %s", category, message)
# ── Loop control ──
def _interruptible_wait(self, seconds: float, force_event: threading.Event | None = None):
"""Wait for *seconds*, returning early if stop or force event fires."""
remaining = seconds
while remaining > 0 and not self._stop_event.is_set():
if force_event and force_event.is_set():
force_event.clear()
return
self._stop_event.wait(min(5, remaining))
remaining -= 5
def force_heartbeat(self):
"""Wake the heartbeat loop immediately."""
self._force_heartbeat.set()
def force_poll(self):
"""Wake the scheduler poll loop immediately."""
self._force_poll.set()
def force_autocora(self):
"""Wake the AutoCora poll loop immediately."""
self._force_autocora.set()
def get_loop_timestamps(self) -> dict[str, str | None]:
"""Return last_run timestamps for all loops."""
return {
"heartbeat": self.db.kv_get("system:loop:heartbeat:last_run"),
"poll": self.db.kv_get("system:loop:poll:last_run"),
"clickup": self.db.kv_get("system:loop:clickup:last_run"),
"folder_watch": self.db.kv_get("system:loop:folder_watch:last_run"),
"autocora": self.db.kv_get("system:loop:autocora:last_run"),
"content_watch": self.db.kv_get("system:loop:content_watch:last_run"),
}
# ── Scheduled Tasks ──
def _poll_loop(self):
while not self._stop_event.is_set():
try:
self._run_due_tasks()
self.db.kv_set(
"system:loop:poll:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("Scheduler poll error: %s", e)
self._interruptible_wait(
self.config.scheduler.poll_interval_seconds, self._force_poll
)
def _run_due_tasks(self):
tasks = self.db.get_due_tasks()
for task in tasks:
try:
log.info("Running scheduled task: %s", task["name"])
result = self.agent.execute_task(task["prompt"])
self.db.log_task_run(task["id"], result=result[:2000])
# Calculate next run
schedule = task["schedule"]
if schedule.startswith("once:"):
# One-time task, disable it
self.db.disable_task(task["id"])
else:
# Cron schedule - calculate next run
now = datetime.now(UTC)
cron = croniter(schedule, now)
next_run = cron.get_next(datetime)
self.db.update_task_next_run(task["id"], next_run.isoformat())
except Exception as e:
log.error("Task '%s' failed: %s", task["name"], e)
self.db.log_task_run(task["id"], error=str(e))
# ── Heartbeat ──
def _heartbeat_loop(self):
interval = self.config.scheduler.heartbeat_interval_minutes * 60
# Wait a bit before first heartbeat
self._stop_event.wait(60)
while not self._stop_event.is_set():
try:
self._run_heartbeat()
self.db.kv_set(
"system:loop:heartbeat:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("Heartbeat error: %s", e)
self._interruptible_wait(interval, self._force_heartbeat)
def _run_heartbeat(self):
heartbeat_path = self.config.identity_dir / "HEARTBEAT.md"
if not heartbeat_path.exists():
return
checklist = heartbeat_path.read_text(encoding="utf-8")
prompt = (
f"HEARTBEAT CHECK. Review this checklist and take action if needed.\n"
f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n"
f"{checklist}"
)
result = self.agent.execute_task(prompt, system_context=checklist)
if HEARTBEAT_OK in result:
log.debug("Heartbeat: all clear")
else:
log.info("Heartbeat action taken: %s", result[:200])
# ── ClickUp Integration ──
def _get_clickup_client(self):
"""Lazy-init the ClickUp API client."""
if self._clickup_client is None:
from .clickup import ClickUpClient
self._clickup_client = ClickUpClient(
api_token=self.config.clickup.api_token,
workspace_id=self.config.clickup.workspace_id,
task_type_field_name=self.config.clickup.task_type_field_name,
)
return self._clickup_client
# Maximum time a task can stay in "automation underway" before recovery (seconds)
STALE_TASK_THRESHOLD_SECONDS = 2 * 60 * 60 # 2 hours
def _clickup_loop(self):
"""Poll ClickUp for tasks on a regular interval."""
interval = self.config.clickup.poll_interval_minutes * 60
# Wait before first poll to let other systems initialize
self._stop_event.wait(30)
while not self._stop_event.is_set():
try:
self._poll_clickup()
self._recover_stale_tasks()
self.db.kv_set(
"system:loop:clickup:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("ClickUp poll error: %s", e)
self._interruptible_wait(interval)
def _discover_field_filter(self, client):
"""Discover and cache the Work Category field UUID + option map."""
space_id = self.config.clickup.space_id
list_ids = client.get_list_ids_from_space(space_id)
if not list_ids:
log.warning("No lists found in space %s — cannot discover field filter", space_id)
return None
# Use the first list to discover field metadata
first_list = next(iter(list_ids))
field_name = self.config.clickup.task_type_field_name
result = client.discover_field_filter(first_list, field_name)
if result:
log.info(
"Discovered field filter for '%s': field_id=%s, options=%s",
field_name,
result["field_id"],
list(result["options"].keys()),
)
else:
log.warning(
"Field '%s' not found in list %s — falling back to client-side filtering",
field_name,
first_list,
)
return result
def _poll_clickup(self):
"""Poll ClickUp for eligible tasks and execute them immediately."""
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
log.warning("ClickUp space_id not configured, skipping poll")
return
skill_map = self.config.clickup.skill_map
if not skill_map:
log.debug("No skill_map configured, skipping ClickUp poll")
return
# Discover field filter on first poll
if self._field_filter_cache is None:
self._field_filter_cache = self._discover_field_filter(client) or {}
# Build API filters
now_ms = int(datetime.now(UTC).timestamp() * 1000)
due_date_lt = now_ms + (self.DUE_DATE_WINDOW_WEEKS * 7 * 24 * 60 * 60 * 1000)
custom_fields_filter = None
if self._field_filter_cache and self._field_filter_cache.get("options"):
field_id = self._field_filter_cache["field_id"]
options = self._field_filter_cache["options"]
# Only include options that map to skills we have
matching_opt_ids = [options[name] for name in skill_map if name in options]
if matching_opt_ids:
import json as _json
custom_fields_filter = _json.dumps(
[{"field_id": field_id, "operator": "ANY", "value": matching_opt_ids}]
)
tasks = client.get_tasks_from_space(
space_id,
statuses=self.config.clickup.poll_statuses,
due_date_lt=due_date_lt,
custom_fields=custom_fields_filter,
)
for task in tasks:
# Skip tasks already processed in kv_store
raw = self.db.kv_get(f"clickup:task:{task.id}:state")
if raw:
try:
existing = json.loads(raw)
if existing.get("state") in ("executing", "completed", "failed"):
continue
except json.JSONDecodeError:
pass
# Client-side verify: Work Category must be in skill_map
if task.task_type not in skill_map:
continue
# Respect auto_execute flag — skip tasks that require manual trigger
mapping = skill_map[task.task_type]
if not mapping.get("auto_execute", False):
log.debug(
"Skipping task '%s' (type=%s): auto_execute is false",
task.name,
task.task_type,
)
continue
# Client-side verify: due_date must exist and be within window
if not task.due_date:
continue
try:
task_due_ms = int(task.due_date)
if task_due_ms > due_date_lt:
continue
except (ValueError, TypeError):
continue
self._execute_task(task)
def _execute_task(self, task):
"""Execute a single ClickUp task immediately."""
skill_map = self.config.clickup.skill_map
mapping = skill_map.get(task.task_type, {})
tool_name = mapping.get("tool", "")
if not tool_name:
log.warning("No tool in skill_map for type '%s'", task.task_type)
return
task_id = task.id
kv_key = f"clickup:task:{task_id}:state"
now = datetime.now(UTC).isoformat()
client = self._get_clickup_client()
# Build state object — starts at "executing"
state = {
"state": "executing",
"clickup_task_id": task_id,
"clickup_task_name": task.name,
"task_type": task.task_type,
"skill_name": tool_name,
"discovered_at": now,
"started_at": now,
"completed_at": None,
"error": None,
"deliverable_paths": [],
"custom_fields": task.custom_fields,
}
# Move to "automation underway" on ClickUp immediately
client.update_task_status(task_id, self.config.clickup.automation_status)
self.db.kv_set(kv_key, json.dumps(state))
log.info("Executing ClickUp task: %s%s", task.name, tool_name)
self._notify(f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`")
try:
# Build tool arguments from field mapping
args = self._build_tool_args(state)
args["clickup_task_id"] = task_id
# Execute the skill via the tool registry
if hasattr(self.agent, "_tools") and self.agent._tools:
result = self.agent._tools.execute(tool_name, args)
else:
result = self.agent.execute_task(
f"Execute the '{tool_name}' tool for ClickUp task '{task.name}'. "
f"Task description: {state.get('custom_fields', {})}"
)
# Check if the tool skipped or reported an error without doing work
if result.startswith("Skipped:") or result.startswith("Error:"):
state["state"] = "failed"
state["error"] = result[:500]
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
client.add_comment(
task_id,
f"⚠️ CheddahBot could not execute this task.\n\n{result[:2000]}",
)
# Move to "error" so Bryan can see what happened
client.update_task_status(task_id, self.config.clickup.error_status)
self._notify(
f"ClickUp task skipped: **{task.name}**\n"
f"Reason: {result[:200]}"
)
log.info("ClickUp task skipped: %s%s", task.name, result[:200])
return
# Check if the tool already handled ClickUp sync internally
tool_handled_sync = "## ClickUp Sync" in result
if tool_handled_sync:
state["state"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
else:
# Scheduler handles sync (fallback path)
docx_paths = _extract_docx_paths(result)
state["deliverable_paths"] = docx_paths
uploaded_count = 0
for path in docx_paths:
if client.upload_attachment(task_id, path):
uploaded_count += 1
else:
log.warning("Failed to upload %s for task %s", path, task_id)
state["state"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
client.update_task_status(task_id, self.config.clickup.review_status)
attach_note = f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else ""
comment = (
f"✅ CheddahBot completed this task.\n\n"
f"Skill: {tool_name}\n"
f"Result:\n{result[:3000]}{attach_note}"
)
client.add_comment(task_id, comment)
self._notify(
f"ClickUp task completed: **{task.name}**\n"
f"Skill: `{tool_name}` | Status set to '{self.config.clickup.review_status}'"
)
log.info("ClickUp task completed: %s", task.name)
except Exception as e:
# Failure — move back to "to do" on ClickUp
state["state"] = "failed"
state["error"] = str(e)
state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state))
client.add_comment(
task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}"
)
# Move to "error" so Bryan can see what happened
client.update_task_status(task_id, self.config.clickup.error_status)
self._notify(
f"ClickUp task failed: **{task.name}**\n"
f"Skill: `{tool_name}` | Error: {str(e)[:200]}"
)
log.error("ClickUp task failed: %s%s", task.name, e)
def _recover_stale_tasks(self):
"""Reset tasks stuck in 'automation underway' for too long.
If a task has been in the automation status for more than
STALE_TASK_THRESHOLD_SECONDS (default 2 hours), reset it to
the first poll status (usually 'to do') so it gets retried.
"""
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
return
automation_status = self.config.clickup.automation_status
try:
stale_tasks = client.get_tasks_from_space(
space_id, statuses=[automation_status]
)
except Exception as e:
log.warning("Failed to query stale tasks: %s", e)
return
now_ms = int(datetime.now(UTC).timestamp() * 1000)
threshold_ms = self.STALE_TASK_THRESHOLD_SECONDS * 1000
for task in stale_tasks:
if not task.date_updated:
continue
try:
updated_ms = int(task.date_updated)
except (ValueError, TypeError):
continue
age_ms = now_ms - updated_ms
if age_ms > threshold_ms:
reset_status = self.config.clickup.poll_statuses[0] if self.config.clickup.poll_statuses else "to do"
log.warning(
"Recovering stale task %s (%s) — stuck in '%s' for %.1f hours",
task.id, task.name, automation_status, age_ms / 3_600_000,
)
client.update_task_status(task.id, reset_status)
client.add_comment(
task.id,
f"⚠️ CheddahBot auto-recovered this task. It was stuck in "
f"'{automation_status}' for {age_ms / 3_600_000:.1f} hours. "
f"Reset to '{reset_status}' for retry.",
)
self._notify(
f"Recovered stale task: **{task.name}** — "
f"reset from '{automation_status}' to '{reset_status}'",
category="clickup",
)
def _build_tool_args(self, state: dict) -> dict:
"""Build tool arguments from ClickUp task fields using the field mapping."""
skill_map = self.config.clickup.skill_map
task_type = state.get("task_type", "")
mapping = skill_map.get(task_type, {})
field_mapping = mapping.get("field_mapping", {})
args = {}
for tool_param, source in field_mapping.items():
if source == "task_name":
args[tool_param] = state.get("clickup_task_name", "")
elif source == "task_description":
args[tool_param] = state.get("custom_fields", {}).get("description", "")
else:
# Look up custom field by name
args[tool_param] = state.get("custom_fields", {}).get(source, "")
return args
# ── AutoCora Result Polling ──
def _autocora_loop(self):
"""Auto-submit jobs for today's tasks, then poll for results."""
interval = self.config.autocora.poll_interval_minutes * 60
# Wait before first poll
self._stop_event.wait(30)
while not self._stop_event.is_set():
try:
self._auto_submit_cora_jobs()
self._poll_autocora_results()
self.db.kv_set(
"system:loop:autocora:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("AutoCora poll error: %s", e)
self._interruptible_wait(interval, self._force_autocora)
def _auto_submit_cora_jobs(self):
"""Auto-submit AutoCora jobs using multi-pass sweep (no explicit date)."""
from .tools.autocora import submit_autocora_jobs
if not self.config.clickup.api_token:
return
ctx = {
"config": self.config,
"db": self.db,
"agent": self.agent,
}
result = submit_autocora_jobs(ctx=ctx)
log.info("AutoCora auto-submit (sweep): %s", result)
def _poll_autocora_results(self):
"""Check for completed AutoCora results and update ClickUp tasks.
Scans the results folder for .result files. Each file contains JSON
with task_ids and status. After processing, moves the file to
results/processed/ to prevent re-processing.
"""
from .tools.autocora import _parse_result
autocora = self.config.autocora
results_dir = Path(autocora.results_dir)
if not results_dir.exists():
log.debug("AutoCora results dir does not exist: %s", results_dir)
return
result_files = list(results_dir.glob("*.result"))
if not result_files:
return
client = self._get_clickup_client() if self.config.clickup.api_token else None
processed_dir = results_dir / "processed"
for result_path in result_files:
raw = result_path.read_text(encoding="utf-8").strip()
result_data = _parse_result(raw)
task_ids = result_data.get("task_ids", [])
status = result_data.get("status", "UNKNOWN")
keyword = result_data.get("keyword", result_path.stem)
if status == "SUCCESS":
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.success_status)
client.add_comment(
tid, f"Cora report completed for keyword: {keyword}"
)
self._notify(
f"AutoCora SUCCESS: **{keyword}** — "
f"{len(task_ids)} task(s) moved to '{autocora.success_status}'",
category="autocora",
)
elif status == "FAILURE":
reason = result_data.get("reason", "unknown error")
if client and task_ids:
for tid in task_ids:
client.update_task_status(tid, autocora.error_status)
client.add_comment(
tid,
f"Cora report failed for keyword: {keyword}\nReason: {reason}",
)
self._notify(
f"AutoCora FAILURE: **{keyword}** — {reason}",
category="autocora",
)
log.info("AutoCora result for '%s': %s", keyword, status)
# Move result file to processed/
processed_dir.mkdir(exist_ok=True)
try:
result_path.rename(processed_dir / result_path.name)
except OSError as e:
log.warning("Could not move result file %s: %s", result_path.name, e)
# ── Folder Watcher ──
def _folder_watch_loop(self):
"""Poll the watch folder for new .xlsx files on a regular interval."""
interval = self.config.link_building.watch_interval_minutes * 60
# Wait before first scan to let other systems initialize
self._stop_event.wait(60)
while not self._stop_event.is_set():
try:
self._scan_watch_folder()
self.db.kv_set(
"system:loop:folder_watch:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("Folder watcher error: %s", e)
self._interruptible_wait(interval)
def _scan_watch_folder(self):
"""Scan the watch folder for new .xlsx files and match to ClickUp tasks."""
watch_folder = Path(self.config.link_building.watch_folder)
if not watch_folder.exists():
log.warning("Watch folder does not exist: %s", watch_folder)
return
xlsx_files = sorted(watch_folder.glob("*.xlsx"))
if not xlsx_files:
log.debug("No .xlsx files in watch folder")
return
for xlsx_path in xlsx_files:
filename = xlsx_path.name
# Skip Office temp/lock files (e.g. ~$insert_molding.xlsx)
if filename.startswith("~$"):
continue
kv_key = f"linkbuilding:watched:{filename}"
# Skip completed/failed; retry "processing" (killed run) and "blocked" (missing field)
existing = self.db.kv_get(kv_key)
if existing:
try:
state = json.loads(existing)
if state.get("status") in ("completed", "failed"):
continue
if state.get("status") in ("processing", "blocked", "unmatched"):
log.info("Retrying '%s' state for %s", state["status"], filename)
self.db.kv_delete(kv_key)
except json.JSONDecodeError:
continue
log.info("Folder watcher: new .xlsx found: %s", filename)
self._process_watched_file(xlsx_path, kv_key)
def _process_watched_file(self, xlsx_path: Path, kv_key: str):
"""Try to match a watched .xlsx file to a ClickUp task and run the pipeline."""
filename = xlsx_path.name
# Normalize filename stem for matching
# e.g., "precision-cnc-machining" → "precision cnc machining"
stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ")
stem = re.sub(r"\s+", " ", stem).strip()
# Mark as processing
self.db.kv_set(
kv_key,
json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}),
)
# Try to find matching ClickUp task
matched_task = None
if self.config.clickup.enabled:
matched_task = self._match_xlsx_to_clickup(stem)
if not matched_task:
log.warning("No ClickUp task match for '%s' — skipping", filename)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "unmatched",
"filename": filename,
"stem": stem,
"checked_at": datetime.now(UTC).isoformat(),
}
),
)
self._notify(
f"Folder watcher: no ClickUp match for **{filename}**.\n"
f"Create a Link Building task with Keyword "
f"matching '{stem}' to enable auto-processing.",
category="linkbuilding",
)
return
# Extract tool args from matched task
task_id = matched_task.id
log.info("Matched '%s' to ClickUp task %s (%s)", filename, task_id, matched_task.name)
# Set ClickUp status to "automation underway"
client = self._get_clickup_client()
client.update_task_status(task_id, self.config.clickup.automation_status)
self._notify(
f"Folder watcher: matched **{filename}** to ClickUp task **{matched_task.name}**.\n"
f"Starting Cora Backlinks pipeline...",
category="linkbuilding",
)
# Build tool args from the matched task's custom fields
money_site_url = matched_task.custom_fields.get("IMSURL", "") or ""
if not money_site_url:
log.warning("Task %s (%s) missing IMSURL — skipping", task_id, matched_task.name)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "blocked",
"reason": "missing_imsurl",
"filename": filename,
"task_id": task_id,
"task_name": matched_task.name,
"checked_at": datetime.now(UTC).isoformat(),
}
),
)
# Set ClickUp status to "error" so it's visible on the board
client.update_task_status(task_id, self.config.clickup.error_status)
self._notify(
f"Folder watcher: **{filename}** matched task **{matched_task.name}** "
f"but **IMSURL is empty**. Set the IMSURL field in ClickUp before "
f"the file can be processed.",
category="linkbuilding",
)
return
args = {
"xlsx_path": str(xlsx_path),
"project_name": matched_task.custom_fields.get("Keyword", "") or matched_task.name,
"money_site_url": money_site_url,
"custom_anchors": matched_task.custom_fields.get("CustomAnchors", "") or "",
"cli_flags": matched_task.custom_fields.get("CLIFlags", "") or "",
"clickup_task_id": task_id,
}
# Parse branded_plus_ratio
bp_raw = matched_task.custom_fields.get("BrandedPlusRatio", "")
if bp_raw:
with contextlib.suppress(ValueError, TypeError):
args["branded_plus_ratio"] = float(bp_raw)
try:
# Execute via tool registry
if hasattr(self.agent, "_tools") and self.agent._tools:
result = self.agent._tools.execute("run_cora_backlinks", args)
else:
result = "Error: tool registry not available"
if "Error" in result and "## Step" not in result:
# Pipeline failed
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": result[:500],
"failed_at": datetime.now(UTC).isoformat(),
}
),
)
client.update_task_status(task_id, self.config.clickup.error_status)
self._notify(
f"Folder watcher: pipeline **failed** for **{filename}**.\n"
f"Error: {result[:200]}",
category="linkbuilding",
)
else:
# Success — move file to processed/
processed_dir = xlsx_path.parent / "processed"
processed_dir.mkdir(exist_ok=True)
dest = processed_dir / filename
try:
shutil.move(str(xlsx_path), str(dest))
log.info("Moved %s to %s", filename, dest)
except OSError as e:
log.warning("Could not move %s to processed: %s", filename, e)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "completed",
"filename": filename,
"task_id": task_id,
"completed_at": datetime.now(UTC).isoformat(),
}
),
)
client.update_task_status(task_id, "complete")
self._notify(
f"Folder watcher: pipeline **completed** for **{filename}**.\n"
f"ClickUp task: {matched_task.name}",
category="linkbuilding",
)
except Exception as e:
log.error("Folder watcher pipeline error for %s: %s", filename, e)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": str(e)[:500],
"failed_at": datetime.now(UTC).isoformat(),
}
),
)
client.update_task_status(task_id, self.config.clickup.error_status)
def _match_xlsx_to_clickup(self, normalized_stem: str):
"""Find a ClickUp Link Building task whose Keyword matches the file stem.
Returns the matched ClickUpTask or None.
"""
from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
return None
try:
tasks = client.get_tasks_from_overall_lists(space_id)
except Exception as e:
log.warning("ClickUp query failed in _match_xlsx_to_clickup: %s", e)
return None
for task in tasks:
if task.task_type != "Link Building":
continue
lb_method = task.custom_fields.get("LB Method", "")
if lb_method and lb_method != "Cora Backlinks":
continue
keyword = task.custom_fields.get("Keyword", "")
if not keyword:
continue
keyword_norm = _normalize_for_match(str(keyword))
if _fuzzy_keyword_match(normalized_stem, keyword_norm):
return task
return None
# ── Content Folder Watcher ──
def _content_watch_loop(self):
"""Poll the content Cora inbox for new .xlsx files on a regular interval."""
interval = self.config.link_building.watch_interval_minutes * 60
# Wait before first scan to let other systems initialize
self._stop_event.wait(60)
while not self._stop_event.is_set():
try:
self._scan_content_folder()
self.db.kv_set(
"system:loop:content_watch:last_run", datetime.now(UTC).isoformat()
)
except Exception as e:
log.error("Content folder watcher error: %s", e)
self._interruptible_wait(interval)
def _scan_content_folder(self):
"""Scan the content Cora inbox for new .xlsx files and match to ClickUp tasks."""
inbox = Path(self.config.content.cora_inbox)
if not inbox.exists():
log.warning("Content Cora inbox does not exist: %s", inbox)
return
xlsx_files = sorted(inbox.glob("*.xlsx"))
if not xlsx_files:
log.debug("No .xlsx files in content Cora inbox")
return
for xlsx_path in xlsx_files:
filename = xlsx_path.name
# Skip Office temp/lock files
if filename.startswith("~$"):
continue
kv_key = f"content:watched:{filename}"
# Skip completed/failed; retry processing/blocked/unmatched
existing = self.db.kv_get(kv_key)
if existing:
try:
state = json.loads(existing)
if state.get("status") in ("completed", "failed"):
continue
if state.get("status") in ("processing", "blocked", "unmatched"):
log.info("Retrying '%s' state for %s", state["status"], filename)
self.db.kv_delete(kv_key)
except json.JSONDecodeError:
continue
log.info("Content watcher: new .xlsx found: %s", filename)
self._process_content_file(xlsx_path, kv_key)
def _process_content_file(self, xlsx_path: Path, kv_key: str):
"""Match a content Cora .xlsx to a ClickUp task and run create_content."""
filename = xlsx_path.name
stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ")
stem = re.sub(r"\s+", " ", stem).strip()
# Mark as processing
self.db.kv_set(
kv_key,
json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}),
)
# Try to find matching ClickUp task
matched_task = None
if self.config.clickup.enabled:
matched_task = self._match_xlsx_to_content_task(stem)
if not matched_task:
log.warning("No ClickUp content task match for '%s' — skipping", filename)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "unmatched",
"filename": filename,
"stem": stem,
"checked_at": datetime.now(UTC).isoformat(),
}
),
)
self._notify(
f"Content watcher: no ClickUp match for **{filename}**.\n"
f"Create a Content Creation or On Page Optimization task with Keyword "
f"matching '{stem}' to enable auto-processing.",
category="content",
)
return
task_id = matched_task.id
log.info("Matched '%s' to ClickUp task %s (%s)", filename, task_id, matched_task.name)
# Set ClickUp status to "automation underway"
client = self._get_clickup_client()
client.update_task_status(task_id, self.config.clickup.automation_status)
self._notify(
f"Content watcher: matched **{filename}** to ClickUp task "
f"**{matched_task.name}**.\nStarting content creation pipeline...",
category="content",
)
# Extract fields from the matched task
keyword = matched_task.custom_fields.get("Keyword", "") or matched_task.name
url = matched_task.custom_fields.get("IMSURL", "") or ""
cli_flags = matched_task.custom_fields.get("CLIFlags", "") or ""
args = {
"keyword": str(keyword),
"url": str(url),
"cli_flags": str(cli_flags),
"clickup_task_id": task_id,
}
try:
if hasattr(self.agent, "_tools") and self.agent._tools:
result = self.agent._tools.execute("create_content", args)
else:
result = "Error: tool registry not available"
if result.startswith("Error:"):
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": result[:500],
"failed_at": datetime.now(UTC).isoformat(),
}
),
)
self._notify(
f"Content watcher: pipeline **failed** for **{filename}**.\n"
f"Error: {result[:200]}",
category="content",
)
else:
# Success — move file to processed/
processed_dir = xlsx_path.parent / "processed"
processed_dir.mkdir(exist_ok=True)
dest = processed_dir / filename
try:
shutil.move(str(xlsx_path), str(dest))
log.info("Moved %s to %s", filename, dest)
except OSError as e:
log.warning("Could not move %s to processed: %s", filename, e)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "completed",
"filename": filename,
"task_id": task_id,
"completed_at": datetime.now(UTC).isoformat(),
}
),
)
self._notify(
f"Content watcher: pipeline **completed** for **{filename}**.\n"
f"ClickUp task: {matched_task.name}",
category="content",
)
except Exception as e:
log.error("Content watcher pipeline error for %s: %s", filename, e)
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": str(e)[:500],
"failed_at": datetime.now(UTC).isoformat(),
}
),
)
def _match_xlsx_to_content_task(self, normalized_stem: str):
"""Find a ClickUp content task whose Keyword matches the file stem.
Matches tasks with Work Category in ("Content Creation", "On Page Optimization").
Returns the matched ClickUpTask or None.
"""
from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
return None
try:
tasks = client.get_tasks_from_overall_lists(space_id)
except Exception as e:
log.warning("ClickUp query failed in _match_xlsx_to_content_task: %s", e)
return None
content_types = ("Content Creation", "On Page Optimization")
for task in tasks:
if task.task_type not in content_types:
continue
keyword = task.custom_fields.get("Keyword", "")
if not keyword:
continue
keyword_norm = _normalize_for_match(str(keyword))
if _fuzzy_keyword_match(normalized_stem, keyword_norm):
return task
return None