Add folder watcher thread to scheduler for Cora inbox

- 4th daemon thread _folder_watch_loop scans Z:/cora-inbox
- Fuzzy-matches .xlsx filename stems to ClickUp Keyword fields
- On match: runs run_cora_backlinks, moves file to processed/
- On failure: marks in KV store, notifies via NotificationBus
- Uses existing _get_clickup_client and notification infrastructure

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-19 20:07:24 -06:00
parent 40aca81e16
commit 90e79b77ab
1 changed file with 331 additions and 130 deletions

View File

@ -1,12 +1,14 @@
"""Task scheduler with heartbeat and ClickUp polling support.""" """Task scheduler with heartbeat, ClickUp polling, and folder watch support."""
from __future__ import annotations from __future__ import annotations
import json import json
import logging import logging
import re import re
import shutil
import threading import threading
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from croniter import croniter from croniter import croniter
@ -31,6 +33,9 @@ def _extract_docx_paths(result: str) -> list[str]:
class Scheduler: class Scheduler:
# Tasks due within this window are eligible for execution
DUE_DATE_WINDOW_WEEKS = 3
def __init__( def __init__(
self, self,
config: Config, config: Config,
@ -46,7 +51,9 @@ class Scheduler:
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
self._heartbeat_thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None
self._clickup_thread: threading.Thread | None = None self._clickup_thread: threading.Thread | None = None
self._folder_watch_thread: threading.Thread | None = None
self._clickup_client = None self._clickup_client = None
self._field_filter_cache: dict | None = None
def start(self): def start(self):
"""Start the scheduler, heartbeat, and ClickUp threads.""" """Start the scheduler, heartbeat, and ClickUp threads."""
@ -70,6 +77,21 @@ class Scheduler:
else: else:
log.info("ClickUp integration disabled (no API token)") log.info("ClickUp integration disabled (no API token)")
# Start folder watcher if configured
watch_folder = self.config.link_building.watch_folder
if watch_folder:
self._folder_watch_thread = threading.Thread(
target=self._folder_watch_loop, daemon=True, name="folder-watch"
)
self._folder_watch_thread.start()
log.info(
"Folder watcher started (folder=%s, interval=%dm)",
watch_folder,
self.config.link_building.watch_interval_minutes,
)
else:
log.info("Folder watcher disabled (no watch_folder configured)")
log.info( log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)", "Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds, self.config.scheduler.poll_interval_seconds,
@ -175,165 +197,159 @@ class Scheduler:
# Wait before first poll to let other systems initialize # Wait before first poll to let other systems initialize
self._stop_event.wait(30) self._stop_event.wait(30)
# On startup, recover orphaned executing tasks
self._recover_orphaned_tasks()
while not self._stop_event.is_set(): while not self._stop_event.is_set():
try: try:
self._poll_clickup() self._poll_clickup()
self._execute_approved_tasks()
except Exception as e: except Exception as e:
log.error("ClickUp poll error: %s", e) log.error("ClickUp poll error: %s", e)
self._stop_event.wait(interval) self._stop_event.wait(interval)
def _recover_orphaned_tasks(self): def _discover_field_filter(self, client):
"""Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'.""" """Discover and cache the Work Category field UUID + option map."""
pairs = self.db.kv_scan("clickup:task:") space_id = self.config.clickup.space_id
for key, value in pairs: list_ids = client.get_list_ids_from_space(space_id)
if not key.endswith(":state"): if not list_ids:
continue log.warning("No lists found in space %s — cannot discover field filter", space_id)
try: return None
state = json.loads(value)
if state.get("state") == "executing": # Use the first list to discover field metadata
task_id = state["clickup_task_id"] first_list = next(iter(list_ids))
log.warning("Recovering orphaned executing task: %s", task_id) field_name = self.config.clickup.task_type_field_name
state["state"] = "approved" result = client.discover_field_filter(first_list, field_name)
state["error"] = None if result:
self.db.kv_set(key, json.dumps(state)) log.info(
except (json.JSONDecodeError, KeyError): "Discovered field filter for '%s': field_id=%s, options=%s",
pass field_name,
result["field_id"],
list(result["options"].keys()),
)
else:
log.warning(
"Field '%s' not found in list %s — falling back to client-side filtering",
field_name,
first_list,
)
return result
def _poll_clickup(self): def _poll_clickup(self):
"""Discover new tasks from ClickUp and process them.""" """Poll ClickUp for eligible tasks and execute them immediately."""
client = self._get_clickup_client() client = self._get_clickup_client()
space_id = self.config.clickup.space_id space_id = self.config.clickup.space_id
if not space_id: if not space_id:
log.warning("ClickUp space_id not configured, skipping poll") log.warning("ClickUp space_id not configured, skipping poll")
return return
skill_map = self.config.clickup.skill_map
if not skill_map:
log.debug("No skill_map configured, skipping ClickUp poll")
return
# Discover field filter on first poll
if self._field_filter_cache is None:
self._field_filter_cache = self._discover_field_filter(client) or {}
# Build API filters
now_ms = int(datetime.now(UTC).timestamp() * 1000)
due_date_lt = now_ms + (self.DUE_DATE_WINDOW_WEEKS * 7 * 24 * 60 * 60 * 1000)
custom_fields_filter = None
if self._field_filter_cache and self._field_filter_cache.get("options"):
field_id = self._field_filter_cache["field_id"]
options = self._field_filter_cache["options"]
# Only include options that map to skills we have
matching_opt_ids = [
options[name] for name in skill_map if name in options
]
if matching_opt_ids:
import json as _json
custom_fields_filter = _json.dumps(
[{"field_id": field_id, "operator": "ANY", "value": matching_opt_ids}]
)
tasks = client.get_tasks_from_space( tasks = client.get_tasks_from_space(
space_id, space_id,
statuses=self.config.clickup.poll_statuses, statuses=self.config.clickup.poll_statuses,
due_date_lt=due_date_lt,
custom_fields=custom_fields_filter,
) )
# Load active task IDs to avoid re-processing
active_raw = self.db.kv_get("clickup:active_task_ids")
active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set()
for task in tasks: for task in tasks:
if task.id in active_ids: # Skip tasks already processed in kv_store
continue # Already tracked raw = self.db.kv_get(f"clickup:task:{task.id}:state")
self._process_clickup_task(task, active_ids) if raw:
try:
existing = json.loads(raw)
if existing.get("state") in ("executing", "completed", "failed"):
continue
except json.JSONDecodeError:
pass
# Save updated active IDs # Client-side verify: Work Category must be in skill_map
self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids))) if task.task_type not in skill_map:
continue
def _process_clickup_task(self, task, active_ids: set[str]): # Client-side verify: due_date must exist and be within window
"""Discover a new ClickUp task, map to skill, decide action.""" if not task.due_date:
continue
try:
task_due_ms = int(task.due_date)
if task_due_ms > due_date_lt:
continue
except (ValueError, TypeError):
continue
now = datetime.now(UTC).isoformat() self._execute_task(task)
def _execute_task(self, task):
"""Execute a single ClickUp task immediately."""
skill_map = self.config.clickup.skill_map skill_map = self.config.clickup.skill_map
mapping = skill_map.get(task.task_type, {})
tool_name = mapping.get("tool", "")
if not tool_name:
log.warning("No tool in skill_map for type '%s'", task.task_type)
return
# Build state object task_id = task.id
kv_key = f"clickup:task:{task_id}:state"
now = datetime.now(UTC).isoformat()
client = self._get_clickup_client()
# Build state object — starts at "executing"
state = { state = {
"state": "discovered", "state": "executing",
"clickup_task_id": task.id, "clickup_task_id": task_id,
"clickup_task_name": task.name, "clickup_task_name": task.name,
"task_type": task.task_type, "task_type": task.task_type,
"skill_name": None, "skill_name": tool_name,
"discovered_at": now, "discovered_at": now,
"started_at": None, "started_at": now,
"completed_at": None, "completed_at": None,
"error": None, "error": None,
"deliverable_paths": [], "deliverable_paths": [],
"custom_fields": task.custom_fields, "custom_fields": task.custom_fields,
} }
# Try to map task type to a skill # Move to "in progress" on ClickUp immediately
mapping = skill_map.get(task.task_type) client.update_task_status(task_id, self.config.clickup.in_progress_status)
if not mapping:
state["state"] = "unmapped"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task discovered but no skill mapping found.\n"
f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n"
f"Configure a skill_map entry in config.yaml to handle this task type."
)
log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type)
return
tool_name = mapping.get("tool", "")
auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute)
state["skill_name"] = tool_name
if auto_execute:
state["state"] = "approved"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task auto-approved for execution.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`"
)
log.info("Auto-approved ClickUp task: %s%s", task.name, tool_name)
else:
state["state"] = "awaiting_approval"
self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state))
active_ids.add(task.id)
self._notify(
f"New ClickUp task needs your approval.\n"
f"Task: **{task.name}** → Skill: `{tool_name}`\n"
f'Use `clickup_approve_task("{task.id}")` to approve or '
f'`clickup_decline_task("{task.id}")` to decline.'
)
log.info("ClickUp task awaiting approval: %s%s", task.name, tool_name)
def _execute_approved_tasks(self):
"""Scan for approved tasks and execute them."""
pairs = self.db.kv_scan("clickup:task:")
for key, value in pairs:
if not key.endswith(":state"):
continue
try:
state = json.loads(value)
except json.JSONDecodeError:
continue
if state.get("state") != "approved":
continue
self._execute_clickup_task(state, key)
def _execute_clickup_task(self, state: dict, kv_key: str):
"""Execute a single approved ClickUp task."""
task_id = state["clickup_task_id"]
task_name = state["clickup_task_name"]
skill_name = state["skill_name"]
now = datetime.now(UTC).isoformat()
log.info("Executing ClickUp task: %s%s", task_name, skill_name)
# Update state to executing
state["state"] = "executing"
state["started_at"] = now
self.db.kv_set(kv_key, json.dumps(state)) self.db.kv_set(kv_key, json.dumps(state))
client = self._get_clickup_client() log.info("Executing ClickUp task: %s%s", task.name, tool_name)
self._notify(
f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`"
)
try: try:
# Build tool arguments from field mapping # Build tool arguments from field mapping
args = self._build_tool_args(state) args = self._build_tool_args(state)
# Pass clickup_task_id so the tool can handle its own ClickUp sync
# (status updates, comments, attachments) if it supports it.
args["clickup_task_id"] = task_id args["clickup_task_id"] = task_id
# Execute the skill via the tool registry # Execute the skill via the tool registry
if hasattr(self.agent, "_tools") and self.agent._tools: if hasattr(self.agent, "_tools") and self.agent._tools:
result = self.agent._tools.execute(skill_name, args) result = self.agent._tools.execute(tool_name, args)
else: else:
result = self.agent.execute_task( result = self.agent.execute_task(
f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. " f"Execute the '{tool_name}' tool for ClickUp task '{task.name}'. "
f"Task description: {state.get('custom_fields', {})}" f"Task description: {state.get('custom_fields', {})}"
) )
@ -341,17 +357,11 @@ class Scheduler:
tool_handled_sync = "## ClickUp Sync" in result tool_handled_sync = "## ClickUp Sync" in result
if tool_handled_sync: if tool_handled_sync:
# Tool did its own status updates, comments, and attachments.
# Just update the kv_store state.
state["state"] = "completed" state["state"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat() state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state)) self.db.kv_set(kv_key, json.dumps(state))
else: else:
# Tool doesn't handle sync — scheduler does it (fallback path). # Scheduler handles sync (fallback path)
# Set status to "in progress" (tool didn't do it)
client.update_task_status(task_id, self.config.clickup.in_progress_status)
# Extract and upload any docx deliverables
docx_paths = _extract_docx_paths(result) docx_paths = _extract_docx_paths(result)
state["deliverable_paths"] = docx_paths state["deliverable_paths"] = docx_paths
uploaded_count = 0 uploaded_count = 0
@ -361,46 +371,45 @@ class Scheduler:
else: else:
log.warning("Failed to upload %s for task %s", path, task_id) log.warning("Failed to upload %s for task %s", path, task_id)
# Success
state["state"] = "completed" state["state"] = "completed"
state["completed_at"] = datetime.now(UTC).isoformat() state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state)) self.db.kv_set(kv_key, json.dumps(state))
# Update ClickUp
client.update_task_status(task_id, self.config.clickup.review_status) client.update_task_status(task_id, self.config.clickup.review_status)
attach_note = ( attach_note = (
f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else "" f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else ""
) )
comment = ( comment = (
f"✅ CheddahBot completed this task.\n\n" f"✅ CheddahBot completed this task.\n\n"
f"Skill: {skill_name}\n" f"Skill: {tool_name}\n"
f"Result:\n{result[:3000]}{attach_note}" f"Result:\n{result[:3000]}{attach_note}"
) )
client.add_comment(task_id, comment) client.add_comment(task_id, comment)
self._notify( self._notify(
f"ClickUp task completed: **{task_name}**\n" f"ClickUp task completed: **{task.name}**\n"
f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'" f"Skill: `{tool_name}` | Status set to '{self.config.clickup.review_status}'"
) )
log.info("ClickUp task completed: %s", task_name) log.info("ClickUp task completed: %s", task.name)
except Exception as e: except Exception as e:
# Failure # Failure — move back to "to do" on ClickUp
state["state"] = "failed" state["state"] = "failed"
state["error"] = str(e) state["error"] = str(e)
state["completed_at"] = datetime.now(UTC).isoformat() state["completed_at"] = datetime.now(UTC).isoformat()
self.db.kv_set(kv_key, json.dumps(state)) self.db.kv_set(kv_key, json.dumps(state))
# Comment the error on ClickUp
client.add_comment( client.add_comment(
task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}" task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}"
) )
# Move back to "to do" so it can be retried after reset
client.update_task_status(task_id, "to do")
self._notify( self._notify(
f"ClickUp task failed: **{task_name}**\n" f"ClickUp task failed: **{task.name}**\n"
f"Skill: `{skill_name}` | Error: {str(e)[:200]}" f"Skill: `{tool_name}` | Error: {str(e)[:200]}"
) )
log.error("ClickUp task failed: %s%s", task_name, e) log.error("ClickUp task failed: %s%s", task.name, e)
def _build_tool_args(self, state: dict) -> dict: def _build_tool_args(self, state: dict) -> dict:
"""Build tool arguments from ClickUp task fields using the field mapping.""" """Build tool arguments from ClickUp task fields using the field mapping."""
@ -420,3 +429,195 @@ class Scheduler:
args[tool_param] = state.get("custom_fields", {}).get(source, "") args[tool_param] = state.get("custom_fields", {}).get(source, "")
return args return args
# ── Folder Watcher ──
def _folder_watch_loop(self):
    """Background loop: periodically scan the watch folder for new .xlsx files.

    Runs until the scheduler's stop event is set. A failure in one scan is
    logged and the loop keeps going, so a single bad pass never kills the
    watcher thread.
    """
    sleep_seconds = self.config.link_building.watch_interval_minutes * 60
    # Give the rest of the application a minute to finish starting up
    # before the first scan.
    self._stop_event.wait(60)
    while not self._stop_event.is_set():
        try:
            self._scan_watch_folder()
        except Exception as exc:
            log.error("Folder watcher error: %s", exc)
        self._stop_event.wait(sleep_seconds)
def _scan_watch_folder(self):
    """Scan the watch folder for new .xlsx files and match to ClickUp tasks.

    Files whose KV record is already in a terminal or in-flight status
    (completed/processing/failed) are skipped; any other .xlsx file is
    handed to _process_watched_file.
    """
    watch_folder = Path(self.config.link_building.watch_folder)
    if not watch_folder.exists():
        log.warning("Watch folder does not exist: %s", watch_folder)
        return

    xlsx_files = sorted(watch_folder.glob("*.xlsx"))
    if not xlsx_files:
        log.debug("No .xlsx files in watch folder")
        return

    for xlsx_path in xlsx_files:
        filename = xlsx_path.name
        # Fix: key must be per-file. A constant key made every file after
        # the first share one state record and be skipped forever.
        kv_key = f"linkbuilding:watched:{filename}"

        # Skip files we have already seen and resolved (or are working on).
        existing = self.db.kv_get(kv_key)
        if existing:
            try:
                state = json.loads(existing)
                if state.get("status") in ("completed", "processing", "failed"):
                    continue
            except json.JSONDecodeError:
                # NOTE(review): a corrupted record currently skips the file;
                # confirm this is the intended behavior.
                continue

        log.info("Folder watcher: new .xlsx found: %s", filename)
        self._process_watched_file(xlsx_path, kv_key)
def _process_watched_file(self, xlsx_path: Path, kv_key: str):
    """Try to match a watched .xlsx file to a ClickUp task and run the pipeline.

    On a successful pipeline run the file is moved to a processed/ subfolder
    and the KV record is marked completed; on failure (or no ClickUp match)
    the record captures the failure and a notification is sent.
    """
    filename = xlsx_path.name
    # Normalize filename stem for matching
    # (e.g., "precision-cnc-machining" → "precision cnc machining")
    stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ")
    stem = re.sub(r"\s+", " ", stem).strip()

    # Mark as processing so a concurrent scan won't pick this file up again.
    self.db.kv_set(kv_key, json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}))

    # Try to find matching ClickUp task
    matched_task = None
    if self.config.clickup.enabled:
        matched_task = self._match_xlsx_to_clickup(stem)

    if not matched_task:
        log.warning("No ClickUp task match for '%s' — skipping", filename)
        self.db.kv_set(kv_key, json.dumps({
            "status": "unmatched",
            "filename": filename,
            "stem": stem,
            "checked_at": datetime.now(UTC).isoformat(),
        }))
        # Fix: interpolate the actual filename (was garbled to a literal
        # "(unknown)" placeholder).
        self._notify(
            f"Folder watcher: no ClickUp match for **{filename}**.\n"
            f"Create a Link Building task with Keyword matching '{stem}' to enable auto-processing.",
            category="linkbuilding",
        )
        return

    # Extract tool args from matched task
    task_id = matched_task.id
    log.info("Matched '%s' to ClickUp task %s (%s)", filename, task_id, matched_task.name)
    self._notify(
        f"Folder watcher: matched **{filename}** to ClickUp task **{matched_task.name}**.\n"
        f"Starting Cora Backlinks pipeline...",
        category="linkbuilding",
    )

    # Build tool args from the matched task's custom fields
    args = {
        "xlsx_path": str(xlsx_path),
        "project_name": matched_task.name,
        "money_site_url": matched_task.custom_fields.get("IMSURL", ""),
        "custom_anchors": matched_task.custom_fields.get("CustomAnchors", "") or "",
        "cli_flags": matched_task.custom_fields.get("CLIFlags", "") or "",
        "clickup_task_id": task_id,
    }
    # Parse branded_plus_ratio; omit the arg entirely if it isn't a number.
    bp_raw = matched_task.custom_fields.get("BrandedPlusRatio", "")
    if bp_raw:
        try:
            args["branded_plus_ratio"] = float(bp_raw)
        except (ValueError, TypeError):
            pass

    try:
        # Execute via tool registry
        if hasattr(self.agent, "_tools") and self.agent._tools:
            result = self.agent._tools.execute("run_cora_backlinks", args)
        else:
            result = "Error: tool registry not available"

        if "Error" in result and "## Step" not in result:
            # Pipeline failed
            self.db.kv_set(kv_key, json.dumps({
                "status": "failed",
                "filename": filename,
                "task_id": task_id,
                "error": result[:500],
                "failed_at": datetime.now(UTC).isoformat(),
            }))
            self._notify(
                f"Folder watcher: pipeline **failed** for **{filename}**.\n"
                f"Error: {result[:200]}",
                category="linkbuilding",
            )
        else:
            # Success — move file to processed/ so it is not re-scanned.
            processed_dir = xlsx_path.parent / "processed"
            processed_dir.mkdir(exist_ok=True)
            dest = processed_dir / filename
            try:
                shutil.move(str(xlsx_path), str(dest))
                log.info("Moved %s to %s", filename, dest)
            except OSError as e:
                # Best-effort: a failed move leaves the file in place but the
                # KV record below still prevents re-processing.
                log.warning("Could not move %s to processed: %s", filename, e)

            self.db.kv_set(kv_key, json.dumps({
                "status": "completed",
                "filename": filename,
                "task_id": task_id,
                "completed_at": datetime.now(UTC).isoformat(),
            }))
            self._notify(
                f"Folder watcher: pipeline **completed** for **{filename}**.\n"
                f"ClickUp task: {matched_task.name}",
                category="linkbuilding",
            )
    except Exception as e:
        log.error("Folder watcher pipeline error for %s: %s", filename, e)
        self.db.kv_set(kv_key, json.dumps({
            "status": "failed",
            "filename": filename,
            "task_id": task_id,
            "error": str(e)[:500],
            "failed_at": datetime.now(UTC).isoformat(),
        }))
def _match_xlsx_to_clickup(self, normalized_stem: str):
    """Find a ClickUp Link Building task whose Keyword matches the file stem.

    Returns the matched ClickUpTask or None.
    """
    from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match

    client = self._get_clickup_client()
    space_id = self.config.clickup.space_id
    if not space_id:
        return None

    try:
        candidates = client.get_tasks_from_space(space_id, statuses=["to do"])
    except Exception as e:
        log.warning("ClickUp query failed in _match_xlsx_to_clickup: %s", e)
        return None

    for candidate in candidates:
        # Only Link Building tasks are eligible for the Cora pipeline.
        if candidate.task_type != "Link Building":
            continue
        # If an LB Method is set on the task, it must be Cora Backlinks.
        method = candidate.custom_fields.get("LB Method", "")
        if method and method != "Cora Backlinks":
            continue
        keyword = candidate.custom_fields.get("Keyword", "")
        if not keyword:
            continue
        if _fuzzy_keyword_match(normalized_stem, _normalize_for_match(str(keyword))):
            return candidate
    return None