"""Task scheduler with heartbeat, ClickUp polling, and folder watch support.""" from __future__ import annotations import contextlib import json import logging import re import shutil import threading from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING from croniter import croniter if TYPE_CHECKING: from .agent import Agent from .config import Config from .db import Database from .notifications import NotificationBus log = logging.getLogger(__name__) HEARTBEAT_OK = "HEARTBEAT_OK" # Matches **Docx:** `path/to/file.docx` patterns in tool output _DOCX_PATH_RE = re.compile(r"\*\*Docx:\*\*\s*`([^`]+\.docx)`") def _extract_docx_paths(result: str) -> list[str]: """Extract .docx file paths from a tool result string.""" return _DOCX_PATH_RE.findall(result) class Scheduler: # Tasks due within this window are eligible for execution DUE_DATE_WINDOW_WEEKS = 3 def __init__( self, config: Config, db: Database, agent: Agent, notification_bus: NotificationBus | None = None, ): self.config = config self.db = db self.agent = agent self.notification_bus = notification_bus self._stop_event = threading.Event() self._thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None self._clickup_thread: threading.Thread | None = None self._folder_watch_thread: threading.Thread | None = None self._clickup_client = None self._field_filter_cache: dict | None = None def start(self): """Start the scheduler, heartbeat, and ClickUp threads.""" self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler") self._thread.start() self._heartbeat_thread = threading.Thread( target=self._heartbeat_loop, daemon=True, name="heartbeat" ) self._heartbeat_thread.start() # Start ClickUp polling if configured if self.config.clickup.enabled: self._clickup_thread = threading.Thread( target=self._clickup_loop, daemon=True, name="clickup" ) self._clickup_thread.start() log.info( "ClickUp polling started (interval=%dm)", self.config.clickup.poll_interval_minutes ) else: log.info("ClickUp integration disabled (no API token)") # Start folder watcher if configured watch_folder = self.config.link_building.watch_folder if watch_folder: self._folder_watch_thread = threading.Thread( target=self._folder_watch_loop, daemon=True, name="folder-watch" ) self._folder_watch_thread.start() log.info( "Folder watcher started (folder=%s, interval=%dm)", watch_folder, self.config.link_building.watch_interval_minutes, ) else: log.info("Folder watcher disabled (no watch_folder configured)") log.info( "Scheduler started (poll=%ds, heartbeat=%dm)", self.config.scheduler.poll_interval_seconds, self.config.scheduler.heartbeat_interval_minutes, ) def stop(self): self._stop_event.set() if self._clickup_client: self._clickup_client.close() def _notify(self, message: str, category: str = "clickup"): """Push a notification through the bus if available.""" if self.notification_bus: self.notification_bus.push(message, category) else: log.info("Notification [%s]: %s", category, message) # ── Scheduled Tasks ── def _poll_loop(self): while not self._stop_event.is_set(): try: self._run_due_tasks() except Exception as e: log.error("Scheduler poll error: %s", e) self._stop_event.wait(self.config.scheduler.poll_interval_seconds) def _run_due_tasks(self): tasks = self.db.get_due_tasks() for task in tasks: try: log.info("Running scheduled task: %s", task["name"]) result = self.agent.execute_task(task["prompt"]) self.db.log_task_run(task["id"], result=result[:2000]) # Calculate next run schedule = task["schedule"] if schedule.startswith("once:"): # One-time task, disable it self.db.disable_task(task["id"]) else: # Cron schedule - calculate next run now = datetime.now(UTC) cron = croniter(schedule, now) next_run = cron.get_next(datetime) self.db.update_task_next_run(task["id"], next_run.isoformat()) except Exception as e: log.error("Task '%s' failed: %s", task["name"], e) self.db.log_task_run(task["id"], error=str(e)) # ── Heartbeat ── def _heartbeat_loop(self): interval = self.config.scheduler.heartbeat_interval_minutes * 60 # Wait a bit before first heartbeat self._stop_event.wait(60) while not self._stop_event.is_set(): try: self._run_heartbeat() except Exception as e: log.error("Heartbeat error: %s", e) self._stop_event.wait(interval) def _run_heartbeat(self): heartbeat_path = self.config.identity_dir / "HEARTBEAT.md" if not heartbeat_path.exists(): return checklist = heartbeat_path.read_text(encoding="utf-8") prompt = ( f"HEARTBEAT CHECK. Review this checklist and take action if needed.\n" f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n" f"{checklist}" ) result = self.agent.execute_task(prompt, system_context=checklist) if HEARTBEAT_OK in result: log.debug("Heartbeat: all clear") else: log.info("Heartbeat action taken: %s", result[:200]) # ── ClickUp Integration ── def _get_clickup_client(self): """Lazy-init the ClickUp API client.""" if self._clickup_client is None: from .clickup import ClickUpClient self._clickup_client = ClickUpClient( api_token=self.config.clickup.api_token, workspace_id=self.config.clickup.workspace_id, task_type_field_name=self.config.clickup.task_type_field_name, ) return self._clickup_client def _clickup_loop(self): """Poll ClickUp for tasks on a regular interval.""" interval = self.config.clickup.poll_interval_minutes * 60 # Wait before first poll to let other systems initialize self._stop_event.wait(30) while not self._stop_event.is_set(): try: self._poll_clickup() except Exception as e: log.error("ClickUp poll error: %s", e) self._stop_event.wait(interval) def _discover_field_filter(self, client): """Discover and cache the Work Category field UUID + option map.""" space_id = self.config.clickup.space_id list_ids = client.get_list_ids_from_space(space_id) if not list_ids: log.warning("No lists found in space %s — cannot discover field filter", space_id) return None # Use the first list to discover field metadata first_list = next(iter(list_ids)) field_name = self.config.clickup.task_type_field_name result = client.discover_field_filter(first_list, field_name) if result: log.info( "Discovered field filter for '%s': field_id=%s, options=%s", field_name, result["field_id"], list(result["options"].keys()), ) else: log.warning( "Field '%s' not found in list %s — falling back to client-side filtering", field_name, first_list, ) return result def _poll_clickup(self): """Poll ClickUp for eligible tasks and execute them immediately.""" client = self._get_clickup_client() space_id = self.config.clickup.space_id if not space_id: log.warning("ClickUp space_id not configured, skipping poll") return skill_map = self.config.clickup.skill_map if not skill_map: log.debug("No skill_map configured, skipping ClickUp poll") return # Discover field filter on first poll if self._field_filter_cache is None: self._field_filter_cache = self._discover_field_filter(client) or {} # Build API filters now_ms = int(datetime.now(UTC).timestamp() * 1000) due_date_lt = now_ms + (self.DUE_DATE_WINDOW_WEEKS * 7 * 24 * 60 * 60 * 1000) custom_fields_filter = None if self._field_filter_cache and self._field_filter_cache.get("options"): field_id = self._field_filter_cache["field_id"] options = self._field_filter_cache["options"] # Only include options that map to skills we have matching_opt_ids = [options[name] for name in skill_map if name in options] if matching_opt_ids: import json as _json custom_fields_filter = _json.dumps( [{"field_id": field_id, "operator": "ANY", "value": matching_opt_ids}] ) tasks = client.get_tasks_from_space( space_id, statuses=self.config.clickup.poll_statuses, due_date_lt=due_date_lt, custom_fields=custom_fields_filter, ) for task in tasks: # Skip tasks already processed in kv_store raw = self.db.kv_get(f"clickup:task:{task.id}:state") if raw: try: existing = json.loads(raw) if existing.get("state") in ("executing", "completed", "failed"): continue except json.JSONDecodeError: pass # Client-side verify: Work Category must be in skill_map if task.task_type not in skill_map: continue # Respect auto_execute flag — skip tasks that require manual trigger mapping = skill_map[task.task_type] if not mapping.get("auto_execute", False): log.debug( "Skipping task '%s' (type=%s): auto_execute is false", task.name, task.task_type, ) continue # Client-side verify: due_date must exist and be within window if not task.due_date: continue try: task_due_ms = int(task.due_date) if task_due_ms > due_date_lt: continue except (ValueError, TypeError): continue self._execute_task(task) def _execute_task(self, task): """Execute a single ClickUp task immediately.""" skill_map = self.config.clickup.skill_map mapping = skill_map.get(task.task_type, {}) tool_name = mapping.get("tool", "") if not tool_name: log.warning("No tool in skill_map for type '%s'", task.task_type) return task_id = task.id kv_key = f"clickup:task:{task_id}:state" now = datetime.now(UTC).isoformat() client = self._get_clickup_client() # Build state object — starts at "executing" state = { "state": "executing", "clickup_task_id": task_id, "clickup_task_name": task.name, "task_type": task.task_type, "skill_name": tool_name, "discovered_at": now, "started_at": now, "completed_at": None, "error": None, "deliverable_paths": [], "custom_fields": task.custom_fields, } # Move to "automation underway" on ClickUp immediately client.update_task_status(task_id, self.config.clickup.automation_status) self.db.kv_set(kv_key, json.dumps(state)) log.info("Executing ClickUp task: %s → %s", task.name, tool_name) self._notify(f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`") try: # Build tool arguments from field mapping args = self._build_tool_args(state) args["clickup_task_id"] = task_id # Execute the skill via the tool registry if hasattr(self.agent, "_tools") and self.agent._tools: result = self.agent._tools.execute(tool_name, args) else: result = self.agent.execute_task( f"Execute the '{tool_name}' tool for ClickUp task '{task.name}'. " f"Task description: {state.get('custom_fields', {})}" ) # Check if the tool skipped or reported an error without doing work if result.startswith("Skipped:") or result.startswith("Error:"): state["state"] = "failed" state["error"] = result[:500] state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) client.add_comment( task_id, f"⚠️ CheddahBot could not execute this task.\n\n{result[:2000]}", ) # Move to "error" so Bryan can see what happened client.update_task_status(task_id, self.config.clickup.error_status) self._notify( f"ClickUp task skipped: **{task.name}**\n" f"Reason: {result[:200]}" ) log.info("ClickUp task skipped: %s — %s", task.name, result[:200]) return # Check if the tool already handled ClickUp sync internally tool_handled_sync = "## ClickUp Sync" in result if tool_handled_sync: state["state"] = "completed" state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) else: # Scheduler handles sync (fallback path) docx_paths = _extract_docx_paths(result) state["deliverable_paths"] = docx_paths uploaded_count = 0 for path in docx_paths: if client.upload_attachment(task_id, path): uploaded_count += 1 else: log.warning("Failed to upload %s for task %s", path, task_id) state["state"] = "completed" state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) client.update_task_status(task_id, self.config.clickup.review_status) attach_note = f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else "" comment = ( f"✅ CheddahBot completed this task.\n\n" f"Skill: {tool_name}\n" f"Result:\n{result[:3000]}{attach_note}" ) client.add_comment(task_id, comment) self._notify( f"ClickUp task completed: **{task.name}**\n" f"Skill: `{tool_name}` | Status set to '{self.config.clickup.review_status}'" ) log.info("ClickUp task completed: %s", task.name) except Exception as e: # Failure — move back to "to do" on ClickUp state["state"] = "failed" state["error"] = str(e) state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) client.add_comment( task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}" ) # Move to "error" so Bryan can see what happened client.update_task_status(task_id, self.config.clickup.error_status) self._notify( f"ClickUp task failed: **{task.name}**\n" f"Skill: `{tool_name}` | Error: {str(e)[:200]}" ) log.error("ClickUp task failed: %s — %s", task.name, e) def _build_tool_args(self, state: dict) -> dict: """Build tool arguments from ClickUp task fields using the field mapping.""" skill_map = self.config.clickup.skill_map task_type = state.get("task_type", "") mapping = skill_map.get(task_type, {}) field_mapping = mapping.get("field_mapping", {}) args = {} for tool_param, source in field_mapping.items(): if source == "task_name": args[tool_param] = state.get("clickup_task_name", "") elif source == "task_description": args[tool_param] = state.get("custom_fields", {}).get("description", "") else: # Look up custom field by name args[tool_param] = state.get("custom_fields", {}).get(source, "") return args # ── Folder Watcher ── def _folder_watch_loop(self): """Poll the watch folder for new .xlsx files on a regular interval.""" interval = self.config.link_building.watch_interval_minutes * 60 # Wait before first scan to let other systems initialize self._stop_event.wait(60) while not self._stop_event.is_set(): try: self._scan_watch_folder() except Exception as e: log.error("Folder watcher error: %s", e) self._stop_event.wait(interval) def _scan_watch_folder(self): """Scan the watch folder for new .xlsx files and match to ClickUp tasks.""" watch_folder = Path(self.config.link_building.watch_folder) if not watch_folder.exists(): log.warning("Watch folder does not exist: %s", watch_folder) return xlsx_files = sorted(watch_folder.glob("*.xlsx")) if not xlsx_files: log.debug("No .xlsx files in watch folder") return for xlsx_path in xlsx_files: filename = xlsx_path.name # Skip Office temp/lock files (e.g. ~$insert_molding.xlsx) if filename.startswith("~$"): continue kv_key = f"linkbuilding:watched:{filename}" # Skip completed/failed; retry "processing" (killed run) and "blocked" (missing field) existing = self.db.kv_get(kv_key) if existing: try: state = json.loads(existing) if state.get("status") in ("completed", "failed"): continue if state.get("status") in ("processing", "blocked", "unmatched"): log.info("Retrying '%s' state for %s", state["status"], filename) self.db.kv_delete(kv_key) except json.JSONDecodeError: continue log.info("Folder watcher: new .xlsx found: %s", filename) self._process_watched_file(xlsx_path, kv_key) def _process_watched_file(self, xlsx_path: Path, kv_key: str): """Try to match a watched .xlsx file to a ClickUp task and run the pipeline.""" filename = xlsx_path.name # Normalize filename stem for matching # e.g., "precision-cnc-machining" → "precision cnc machining" stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ") stem = re.sub(r"\s+", " ", stem).strip() # Mark as processing self.db.kv_set( kv_key, json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}), ) # Try to find matching ClickUp task matched_task = None if self.config.clickup.enabled: matched_task = self._match_xlsx_to_clickup(stem) if not matched_task: log.warning("No ClickUp task match for '%s' — skipping", filename) self.db.kv_set( kv_key, json.dumps( { "status": "unmatched", "filename": filename, "stem": stem, "checked_at": datetime.now(UTC).isoformat(), } ), ) self._notify( f"Folder watcher: no ClickUp match for **{filename}**.\n" f"Create a Link Building task with Keyword " f"matching '{stem}' to enable auto-processing.", category="linkbuilding", ) return # Extract tool args from matched task task_id = matched_task.id log.info("Matched '%s' to ClickUp task %s (%s)", filename, task_id, matched_task.name) # Set ClickUp status to "automation underway" client = self._get_clickup_client() client.update_task_status(task_id, self.config.clickup.automation_status) self._notify( f"Folder watcher: matched **{filename}** to ClickUp task **{matched_task.name}**.\n" f"Starting Cora Backlinks pipeline...", category="linkbuilding", ) # Build tool args from the matched task's custom fields money_site_url = matched_task.custom_fields.get("IMSURL", "") or "" if not money_site_url: log.warning("Task %s (%s) missing IMSURL — skipping", task_id, matched_task.name) self.db.kv_set( kv_key, json.dumps( { "status": "blocked", "reason": "missing_imsurl", "filename": filename, "task_id": task_id, "task_name": matched_task.name, "checked_at": datetime.now(UTC).isoformat(), } ), ) # Set ClickUp status to "error" so it's visible on the board client.update_task_status(task_id, self.config.clickup.error_status) self._notify( f"Folder watcher: **{filename}** matched task **{matched_task.name}** " f"but **IMSURL is empty**. Set the IMSURL field in ClickUp before " f"the file can be processed.", category="linkbuilding", ) return args = { "xlsx_path": str(xlsx_path), "project_name": matched_task.name, "money_site_url": money_site_url, "custom_anchors": matched_task.custom_fields.get("CustomAnchors", "") or "", "cli_flags": matched_task.custom_fields.get("CLIFlags", "") or "", "clickup_task_id": task_id, } # Parse branded_plus_ratio bp_raw = matched_task.custom_fields.get("BrandedPlusRatio", "") if bp_raw: with contextlib.suppress(ValueError, TypeError): args["branded_plus_ratio"] = float(bp_raw) try: # Execute via tool registry if hasattr(self.agent, "_tools") and self.agent._tools: result = self.agent._tools.execute("run_cora_backlinks", args) else: result = "Error: tool registry not available" if "Error" in result and "## Step" not in result: # Pipeline failed self.db.kv_set( kv_key, json.dumps( { "status": "failed", "filename": filename, "task_id": task_id, "error": result[:500], "failed_at": datetime.now(UTC).isoformat(), } ), ) client.update_task_status(task_id, self.config.clickup.error_status) self._notify( f"Folder watcher: pipeline **failed** for **{filename}**.\n" f"Error: {result[:200]}", category="linkbuilding", ) else: # Success — move file to processed/ processed_dir = xlsx_path.parent / "processed" processed_dir.mkdir(exist_ok=True) dest = processed_dir / filename try: shutil.move(str(xlsx_path), str(dest)) log.info("Moved %s to %s", filename, dest) except OSError as e: log.warning("Could not move %s to processed: %s", filename, e) self.db.kv_set( kv_key, json.dumps( { "status": "completed", "filename": filename, "task_id": task_id, "completed_at": datetime.now(UTC).isoformat(), } ), ) client.update_task_status(task_id, "complete") self._notify( f"Folder watcher: pipeline **completed** for **{filename}**.\n" f"ClickUp task: {matched_task.name}", category="linkbuilding", ) except Exception as e: log.error("Folder watcher pipeline error for %s: %s", filename, e) self.db.kv_set( kv_key, json.dumps( { "status": "failed", "filename": filename, "task_id": task_id, "error": str(e)[:500], "failed_at": datetime.now(UTC).isoformat(), } ), ) client.update_task_status(task_id, self.config.clickup.error_status) def _match_xlsx_to_clickup(self, normalized_stem: str): """Find a ClickUp Link Building task whose Keyword matches the file stem. Returns the matched ClickUpTask or None. """ from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match client = self._get_clickup_client() space_id = self.config.clickup.space_id if not space_id: return None try: tasks = client.get_tasks_from_overall_lists(space_id) except Exception as e: log.warning("ClickUp query failed in _match_xlsx_to_clickup: %s", e) return None for task in tasks: if task.task_type != "Link Building": continue lb_method = task.custom_fields.get("LB Method", "") if lb_method and lb_method != "Cora Backlinks": continue keyword = task.custom_fields.get("Keyword", "") if not keyword: continue keyword_norm = _normalize_for_match(str(keyword)) if _fuzzy_keyword_match(normalized_stem, keyword_norm): return task return None