diff --git a/cheddahbot/scheduler.py b/cheddahbot/scheduler.py index 4e4ce7b..47ecfd6 100644 --- a/cheddahbot/scheduler.py +++ b/cheddahbot/scheduler.py @@ -1,12 +1,14 @@ -"""Task scheduler with heartbeat and ClickUp polling support.""" +"""Task scheduler with heartbeat, ClickUp polling, and folder watch support.""" from __future__ import annotations import json import logging import re +import shutil import threading from datetime import UTC, datetime +from pathlib import Path from typing import TYPE_CHECKING from croniter import croniter @@ -31,6 +33,9 @@ def _extract_docx_paths(result: str) -> list[str]: class Scheduler: + # Tasks due within this window are eligible for execution + DUE_DATE_WINDOW_WEEKS = 3 + def __init__( self, config: Config, @@ -46,7 +51,9 @@ class Scheduler: self._thread: threading.Thread | None = None self._heartbeat_thread: threading.Thread | None = None self._clickup_thread: threading.Thread | None = None + self._folder_watch_thread: threading.Thread | None = None self._clickup_client = None + self._field_filter_cache: dict | None = None def start(self): """Start the scheduler, heartbeat, and ClickUp threads.""" @@ -70,6 +77,21 @@ class Scheduler: else: log.info("ClickUp integration disabled (no API token)") + # Start folder watcher if configured + watch_folder = self.config.link_building.watch_folder + if watch_folder: + self._folder_watch_thread = threading.Thread( + target=self._folder_watch_loop, daemon=True, name="folder-watch" + ) + self._folder_watch_thread.start() + log.info( + "Folder watcher started (folder=%s, interval=%dm)", + watch_folder, + self.config.link_building.watch_interval_minutes, + ) + else: + log.info("Folder watcher disabled (no watch_folder configured)") + log.info( "Scheduler started (poll=%ds, heartbeat=%dm)", self.config.scheduler.poll_interval_seconds, @@ -175,165 +197,159 @@ class Scheduler: # Wait before first poll to let other systems initialize self._stop_event.wait(30) - # On startup, recover orphaned 
executing tasks - self._recover_orphaned_tasks() - while not self._stop_event.is_set(): try: self._poll_clickup() - self._execute_approved_tasks() except Exception as e: log.error("ClickUp poll error: %s", e) self._stop_event.wait(interval) - def _recover_orphaned_tasks(self): - """Reset tasks stuck in 'executing' state (from crash/restart) to 'approved'.""" - pairs = self.db.kv_scan("clickup:task:") - for key, value in pairs: - if not key.endswith(":state"): - continue - try: - state = json.loads(value) - if state.get("state") == "executing": - task_id = state["clickup_task_id"] - log.warning("Recovering orphaned executing task: %s", task_id) - state["state"] = "approved" - state["error"] = None - self.db.kv_set(key, json.dumps(state)) - except (json.JSONDecodeError, KeyError): - pass + def _discover_field_filter(self, client): + """Discover and cache the Work Category field UUID + option map.""" + space_id = self.config.clickup.space_id + list_ids = client.get_list_ids_from_space(space_id) + if not list_ids: + log.warning("No lists found in space %s — cannot discover field filter", space_id) + return None + + # Use the first list to discover field metadata + first_list = next(iter(list_ids)) + field_name = self.config.clickup.task_type_field_name + result = client.discover_field_filter(first_list, field_name) + if result: + log.info( + "Discovered field filter for '%s': field_id=%s, options=%s", + field_name, + result["field_id"], + list(result["options"].keys()), + ) + else: + log.warning( + "Field '%s' not found in list %s — falling back to client-side filtering", + field_name, + first_list, + ) + return result def _poll_clickup(self): - """Discover new tasks from ClickUp and process them.""" + """Poll ClickUp for eligible tasks and execute them immediately.""" client = self._get_clickup_client() space_id = self.config.clickup.space_id if not space_id: log.warning("ClickUp space_id not configured, skipping poll") return + skill_map = 
self.config.clickup.skill_map + if not skill_map: + log.debug("No skill_map configured, skipping ClickUp poll") + return + + # Discover field filter on first poll + if self._field_filter_cache is None: + self._field_filter_cache = self._discover_field_filter(client) or {} + + # Build API filters + now_ms = int(datetime.now(UTC).timestamp() * 1000) + due_date_lt = now_ms + (self.DUE_DATE_WINDOW_WEEKS * 7 * 24 * 60 * 60 * 1000) + + custom_fields_filter = None + if self._field_filter_cache and self._field_filter_cache.get("options"): + field_id = self._field_filter_cache["field_id"] + options = self._field_filter_cache["options"] + # Only include options that map to skills we have + matching_opt_ids = [ + options[name] for name in skill_map if name in options + ] + if matching_opt_ids: + import json as _json + + custom_fields_filter = _json.dumps( + [{"field_id": field_id, "operator": "ANY", "value": matching_opt_ids}] + ) + tasks = client.get_tasks_from_space( space_id, statuses=self.config.clickup.poll_statuses, + due_date_lt=due_date_lt, + custom_fields=custom_fields_filter, ) - # Load active task IDs to avoid re-processing - active_raw = self.db.kv_get("clickup:active_task_ids") - active_ids: set[str] = set(json.loads(active_raw)) if active_raw else set() - for task in tasks: - if task.id in active_ids: - continue # Already tracked - self._process_clickup_task(task, active_ids) + # Skip tasks already processed in kv_store + raw = self.db.kv_get(f"clickup:task:{task.id}:state") + if raw: + try: + existing = json.loads(raw) + if existing.get("state") in ("executing", "completed", "failed"): + continue + except json.JSONDecodeError: + pass - # Save updated active IDs - self.db.kv_set("clickup:active_task_ids", json.dumps(list(active_ids))) + # Client-side verify: Work Category must be in skill_map + if task.task_type not in skill_map: + continue - def _process_clickup_task(self, task, active_ids: set[str]): - """Discover a new ClickUp task, map to skill, decide 
action.""" + # Client-side verify: due_date must exist and be within window + if not task.due_date: + continue + try: + task_due_ms = int(task.due_date) + if task_due_ms > due_date_lt: + continue + except (ValueError, TypeError): + continue - now = datetime.now(UTC).isoformat() + self._execute_task(task) + + def _execute_task(self, task): + """Execute a single ClickUp task immediately.""" skill_map = self.config.clickup.skill_map + mapping = skill_map.get(task.task_type, {}) + tool_name = mapping.get("tool", "") + if not tool_name: + log.warning("No tool in skill_map for type '%s'", task.task_type) + return - # Build state object + task_id = task.id + kv_key = f"clickup:task:{task_id}:state" + now = datetime.now(UTC).isoformat() + client = self._get_clickup_client() + + # Build state object — starts at "executing" state = { - "state": "discovered", - "clickup_task_id": task.id, + "state": "executing", + "clickup_task_id": task_id, "clickup_task_name": task.name, "task_type": task.task_type, - "skill_name": None, + "skill_name": tool_name, "discovered_at": now, - "started_at": None, + "started_at": now, "completed_at": None, "error": None, "deliverable_paths": [], "custom_fields": task.custom_fields, } - # Try to map task type to a skill - mapping = skill_map.get(task.task_type) - if not mapping: - state["state"] = "unmapped" - self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) - active_ids.add(task.id) - self._notify( - f"New ClickUp task discovered but no skill mapping found.\n" - f"Task: **{task.name}** (Type: {task.task_type or 'none'})\n" - f"Configure a skill_map entry in config.yaml to handle this task type." 
- ) - log.info("Unmapped ClickUp task: %s (type=%s)", task.name, task.task_type) - return - - tool_name = mapping.get("tool", "") - auto_execute = mapping.get("auto_execute", self.config.clickup.default_auto_execute) - state["skill_name"] = tool_name - - if auto_execute: - state["state"] = "approved" - self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) - active_ids.add(task.id) - self._notify( - f"New ClickUp task auto-approved for execution.\n" - f"Task: **{task.name}** → Skill: `{tool_name}`" - ) - log.info("Auto-approved ClickUp task: %s → %s", task.name, tool_name) - else: - state["state"] = "awaiting_approval" - self.db.kv_set(f"clickup:task:{task.id}:state", json.dumps(state)) - active_ids.add(task.id) - self._notify( - f"New ClickUp task needs your approval.\n" - f"Task: **{task.name}** → Skill: `{tool_name}`\n" - f'Use `clickup_approve_task("{task.id}")` to approve or ' - f'`clickup_decline_task("{task.id}")` to decline.' - ) - log.info("ClickUp task awaiting approval: %s → %s", task.name, tool_name) - - def _execute_approved_tasks(self): - """Scan for approved tasks and execute them.""" - pairs = self.db.kv_scan("clickup:task:") - for key, value in pairs: - if not key.endswith(":state"): - continue - try: - state = json.loads(value) - except json.JSONDecodeError: - continue - - if state.get("state") != "approved": - continue - - self._execute_clickup_task(state, key) - - def _execute_clickup_task(self, state: dict, kv_key: str): - """Execute a single approved ClickUp task.""" - task_id = state["clickup_task_id"] - task_name = state["clickup_task_name"] - skill_name = state["skill_name"] - now = datetime.now(UTC).isoformat() - - log.info("Executing ClickUp task: %s → %s", task_name, skill_name) - - # Update state to executing - state["state"] = "executing" - state["started_at"] = now + # Move to "in progress" on ClickUp immediately + client.update_task_status(task_id, self.config.clickup.in_progress_status) self.db.kv_set(kv_key, 
json.dumps(state)) - client = self._get_clickup_client() + log.info("Executing ClickUp task: %s → %s", task.name, tool_name) + self._notify( + f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`" + ) try: # Build tool arguments from field mapping args = self._build_tool_args(state) - - # Pass clickup_task_id so the tool can handle its own ClickUp sync - # (status updates, comments, attachments) if it supports it. args["clickup_task_id"] = task_id # Execute the skill via the tool registry if hasattr(self.agent, "_tools") and self.agent._tools: - result = self.agent._tools.execute(skill_name, args) + result = self.agent._tools.execute(tool_name, args) else: result = self.agent.execute_task( - f"Execute the '{skill_name}' tool for ClickUp task '{task_name}'. " + f"Execute the '{tool_name}' tool for ClickUp task '{task.name}'. " f"Task description: {state.get('custom_fields', {})}" ) @@ -341,17 +357,11 @@ class Scheduler: tool_handled_sync = "## ClickUp Sync" in result if tool_handled_sync: - # Tool did its own status updates, comments, and attachments. - # Just update the kv_store state. state["state"] = "completed" state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) else: - # Tool doesn't handle sync — scheduler does it (fallback path). 
- # Set status to "in progress" (tool didn't do it) - client.update_task_status(task_id, self.config.clickup.in_progress_status) - - # Extract and upload any docx deliverables + # Scheduler handles sync (fallback path) docx_paths = _extract_docx_paths(result) state["deliverable_paths"] = docx_paths uploaded_count = 0 @@ -361,46 +371,45 @@ class Scheduler: else: log.warning("Failed to upload %s for task %s", path, task_id) - # Success state["state"] = "completed" state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) - # Update ClickUp client.update_task_status(task_id, self.config.clickup.review_status) attach_note = ( f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else "" ) comment = ( f"✅ CheddahBot completed this task.\n\n" - f"Skill: {skill_name}\n" + f"Skill: {tool_name}\n" f"Result:\n{result[:3000]}{attach_note}" ) client.add_comment(task_id, comment) self._notify( - f"ClickUp task completed: **{task_name}**\n" - f"Skill: `{skill_name}` | Status set to '{self.config.clickup.review_status}'" + f"ClickUp task completed: **{task.name}**\n" + f"Skill: `{tool_name}` | Status set to '{self.config.clickup.review_status}'" ) - log.info("ClickUp task completed: %s", task_name) + log.info("ClickUp task completed: %s", task.name) except Exception as e: - # Failure + # Failure — move back to "to do" on ClickUp state["state"] = "failed" state["error"] = str(e) state["completed_at"] = datetime.now(UTC).isoformat() self.db.kv_set(kv_key, json.dumps(state)) - # Comment the error on ClickUp client.add_comment( task_id, f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}" ) + # Move back to "to do" so it can be retried after reset + client.update_task_status(task_id, "to do") self._notify( - f"ClickUp task failed: **{task_name}**\n" - f"Skill: `{skill_name}` | Error: {str(e)[:200]}" + f"ClickUp task failed: **{task.name}**\n" + f"Skill: `{tool_name}` | Error: {str(e)[:200]}" ) - log.error("ClickUp task 
failed: %s — %s", task_name, e) + log.error("ClickUp task failed: %s — %s", task.name, e) def _build_tool_args(self, state: dict) -> dict: """Build tool arguments from ClickUp task fields using the field mapping.""" @@ -420,3 +429,195 @@ class Scheduler: args[tool_param] = state.get("custom_fields", {}).get(source, "") return args + + # ── Folder Watcher ── + + def _folder_watch_loop(self): + """Poll the watch folder for new .xlsx files on a regular interval.""" + interval = self.config.link_building.watch_interval_minutes * 60 + + # Wait before first scan to let other systems initialize + self._stop_event.wait(60) + + while not self._stop_event.is_set(): + try: + self._scan_watch_folder() + except Exception as e: + log.error("Folder watcher error: %s", e) + self._stop_event.wait(interval) + + def _scan_watch_folder(self): + """Scan the watch folder for new .xlsx files and match to ClickUp tasks.""" + watch_folder = Path(self.config.link_building.watch_folder) + if not watch_folder.exists(): + log.warning("Watch folder does not exist: %s", watch_folder) + return + + xlsx_files = sorted(watch_folder.glob("*.xlsx")) + if not xlsx_files: + log.debug("No .xlsx files in watch folder") + return + + for xlsx_path in xlsx_files: + filename = xlsx_path.name + kv_key = f"linkbuilding:watched:{filename}" + + # Skip already processed files + existing = self.db.kv_get(kv_key) + if existing: + try: + state = json.loads(existing) + if state.get("status") in ("completed", "processing", "failed"): + continue + except json.JSONDecodeError: + continue + + log.info("Folder watcher: new .xlsx found: %s", filename) + self._process_watched_file(xlsx_path, kv_key) + + def _process_watched_file(self, xlsx_path: Path, kv_key: str): + """Try to match a watched .xlsx file to a ClickUp task and run the pipeline.""" + filename = xlsx_path.name + # Normalize filename stem for matching (e.g., "precision-cnc-machining" → "precision cnc machining") + stem = xlsx_path.stem.lower().replace("-", " 
").replace("_", " ") + stem = re.sub(r"\s+", " ", stem).strip() + + # Mark as processing + self.db.kv_set(kv_key, json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()})) + + # Try to find matching ClickUp task + matched_task = None + if self.config.clickup.enabled: + matched_task = self._match_xlsx_to_clickup(stem) + + if not matched_task: + log.warning("No ClickUp task match for '%s' — skipping", filename) + self.db.kv_set(kv_key, json.dumps({ + "status": "unmatched", + "filename": filename, + "stem": stem, + "checked_at": datetime.now(UTC).isoformat(), + })) + self._notify( + f"Folder watcher: no ClickUp match for **{filename}**.\n" + f"Create a Link Building task with Keyword matching '{stem}' to enable auto-processing.", + category="linkbuilding", + ) + return + + # Extract tool args from matched task + task_id = matched_task.id + log.info("Matched '%s' to ClickUp task %s (%s)", filename, task_id, matched_task.name) + self._notify( + f"Folder watcher: matched **{filename}** to ClickUp task **{matched_task.name}**.\n" + f"Starting Cora Backlinks pipeline...", + category="linkbuilding", + ) + + # Build tool args from the matched task's custom fields + args = { + "xlsx_path": str(xlsx_path), + "project_name": matched_task.name, + "money_site_url": matched_task.custom_fields.get("IMSURL", ""), + "custom_anchors": matched_task.custom_fields.get("CustomAnchors", "") or "", + "cli_flags": matched_task.custom_fields.get("CLIFlags", "") or "", + "clickup_task_id": task_id, + } + + # Parse branded_plus_ratio + bp_raw = matched_task.custom_fields.get("BrandedPlusRatio", "") + if bp_raw: + try: + args["branded_plus_ratio"] = float(bp_raw) + except (ValueError, TypeError): + pass + + try: + # Execute via tool registry + if hasattr(self.agent, "_tools") and self.agent._tools: + result = self.agent._tools.execute("run_cora_backlinks", args) + else: + result = "Error: tool registry not available" + + if "Error" in result and "## Step" not in result: 
+ # Pipeline failed + self.db.kv_set(kv_key, json.dumps({ + "status": "failed", + "filename": filename, + "task_id": task_id, + "error": result[:500], + "failed_at": datetime.now(UTC).isoformat(), + })) + self._notify( + f"Folder watcher: pipeline **failed** for **{filename}**.\n" + f"Error: {result[:200]}", + category="linkbuilding", + ) + else: + # Success — move file to processed/ + processed_dir = xlsx_path.parent / "processed" + processed_dir.mkdir(exist_ok=True) + dest = processed_dir / filename + try: + shutil.move(str(xlsx_path), str(dest)) + log.info("Moved %s to %s", filename, dest) + except OSError as e: + log.warning("Could not move %s to processed: %s", filename, e) + + self.db.kv_set(kv_key, json.dumps({ + "status": "completed", + "filename": filename, + "task_id": task_id, + "completed_at": datetime.now(UTC).isoformat(), + })) + self._notify( + f"Folder watcher: pipeline **completed** for **{filename}**.\n" + f"ClickUp task: {matched_task.name}", + category="linkbuilding", + ) + + except Exception as e: + log.error("Folder watcher pipeline error for %s: %s", filename, e) + self.db.kv_set(kv_key, json.dumps({ + "status": "failed", + "filename": filename, + "task_id": task_id, + "error": str(e)[:500], + "failed_at": datetime.now(UTC).isoformat(), + })) + + def _match_xlsx_to_clickup(self, normalized_stem: str): + """Find a ClickUp Link Building task whose Keyword matches the file stem. + + Returns the matched ClickUpTask or None. 
+ """ + from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match + + client = self._get_clickup_client() + space_id = self.config.clickup.space_id + if not space_id: + return None + + try: + tasks = client.get_tasks_from_space(space_id, statuses=["to do"]) + except Exception as e: + log.warning("ClickUp query failed in _match_xlsx_to_clickup: %s", e) + return None + + for task in tasks: + if task.task_type != "Link Building": + continue + + lb_method = task.custom_fields.get("LB Method", "") + if lb_method and lb_method != "Cora Backlinks": + continue + + keyword = task.custom_fields.get("Keyword", "") + if not keyword: + continue + + keyword_norm = _normalize_for_match(str(keyword)) + if _fuzzy_keyword_match(normalized_stem, keyword_norm): + return task + + return None