diff --git a/cheddahbot/scheduler.py b/cheddahbot/scheduler.py index b1a59e2..89b23a3 100644 --- a/cheddahbot/scheduler.py +++ b/cheddahbot/scheduler.py @@ -10,6 +10,7 @@ import threading from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING +from zoneinfo import ZoneInfo from croniter import croniter @@ -49,7 +50,10 @@ class Scheduler: self._autocora_thread: threading.Thread | None = None self._content_watch_thread: threading.Thread | None = None self._cora_distribute_thread: threading.Thread | None = None + self._briefing_thread: threading.Thread | None = None self._force_autocora = threading.Event() + self._force_briefing = threading.Event() + self._last_briefing_date: str | None = None self._clickup_client = None self._field_filter_cache: dict | None = None self._loop_timestamps: dict[str, str | None] = { @@ -60,6 +64,7 @@ class Scheduler: "autocora": None, "content_watch": None, "cora_distribute": None, + "briefing": None, } def start(self): @@ -142,6 +147,16 @@ class Scheduler: else: log.info("Cora distribution watcher disabled (no cora_human_inbox configured)") + # Start morning briefing loop if ClickUp is configured + if self.config.clickup.enabled: + self._briefing_thread = threading.Thread( + target=self._briefing_loop, daemon=True, name="briefing" + ) + self._briefing_thread.start() + log.info("Morning briefing loop started") + else: + log.info("Morning briefing disabled (ClickUp not configured)") + log.info( "Scheduler started (poll=%ds, heartbeat=%dm)", self.config.scheduler.poll_interval_seconds, @@ -184,6 +199,11 @@ class Scheduler: """Wake the AutoCora poll loop immediately.""" self._force_autocora.set() + def force_briefing(self): + """Force the morning briefing to send now (ignores schedule/dedup).""" + self._last_briefing_date = None + self._force_briefing.set() + def get_loop_timestamps(self) -> dict[str, str | None]: """Return last_run timestamps for all loops (in-memory).""" return dict(self._loop_timestamps) @@ -1111,3 +1131,199 @@ class Scheduler: f"Matched tasks: {', '.join(matched_names)}", category="autocora", ) + + # ── Morning Briefing ── + + _CENTRAL = ZoneInfo("America/Chicago") + + def _briefing_loop(self): + """Check every 60s if it's time to send the morning briefing.""" + # Wait before first check + self._stop_event.wait(30) + + while not self._stop_event.is_set(): + try: + forced = self._force_briefing.is_set() + if forced: + self._force_briefing.clear() + + now = datetime.now(self._CENTRAL) + today = now.strftime("%Y-%m-%d") + is_weekday = now.weekday() < 5 # Mon=0 .. Fri=4 + + target_hour = 6 if is_weekday else 8 + target_minute = 30 if is_weekday else 0 + + at_briefing_time = ( + now.hour == target_hour and now.minute == target_minute + ) + already_sent = self._last_briefing_date == today + + if forced or (at_briefing_time and not already_sent): + self._send_morning_briefing() + self._last_briefing_date = today + self._loop_timestamps["briefing"] = datetime.now(UTC).isoformat() + + except Exception as e: + log.error("Briefing loop error: %s", e) + + self._interruptible_wait(60, self._force_briefing) + + def _send_morning_briefing(self): + """Build and send the morning briefing notification.""" + client = self._get_clickup_client() + space_id = self.config.clickup.space_id + if not space_id: + log.warning("Briefing skipped — no space_id configured") + return + + # Query tasks in the 4 relevant statuses + briefing_statuses = [ + "running cora", + "outline review", + self.config.clickup.pr_review_status, # "pr needs review" + self.config.clickup.error_status, # "error" + ] + + try: + tasks = client.get_tasks_from_overall_lists( + space_id, statuses=briefing_statuses + ) + except Exception as e: + log.error("Briefing: ClickUp query failed: %s", e) + return + + # Bucket tasks by status + cora_tasks = [] + outline_tasks = [] + pr_tasks = [] + error_tasks = [] + + for t in tasks: + if t.status == "running cora": + cora_tasks.append(t) + elif t.status == "outline review": + outline_tasks.append(t) + elif t.status == self.config.clickup.pr_review_status: + pr_tasks.append(t) + elif t.status == self.config.clickup.error_status: + error_tasks.append(t) + + # If nothing needs attention, send a short all-clear + if not any([cora_tasks, outline_tasks, pr_tasks, error_tasks]): + self._notify("Morning briefing: All clear — nothing needs attention.", category="briefing") + return + + lines = ["Morning Briefing", ""] + + # Section 1: Cora reports + if cora_tasks: + cora_statuses = self._check_cora_file_status(cora_tasks) + lines.append(f"CORA REPORTS ({len(cora_tasks)})") + by_customer = self._group_by_customer(cora_tasks) + for customer, ctasks in by_customer.items(): + lines.append(f" {customer}:") + for t in ctasks: + status_note = cora_statuses.get(t.id, "") + suffix = f" — {status_note}" if status_note else "" + lines.append(f" - {t.name}{suffix}") + lines.append("") + + # Section 2: Outlines to approve + if outline_tasks: + lines.append(f"OUTLINES TO APPROVE ({len(outline_tasks)})") + by_customer = self._group_by_customer(outline_tasks) + for customer, ctasks in by_customer.items(): + lines.append(f" {customer}:") + for t in ctasks: + lines.append(f" - {t.name}") + lines.append("") + + # Section 3: PRs to review + if pr_tasks: + lines.append(f"PRs TO REVIEW ({len(pr_tasks)})") + by_customer = self._group_by_customer(pr_tasks) + for customer, ctasks in by_customer.items(): + lines.append(f" {customer}:") + for t in ctasks: + lines.append(f" - {t.name}") + lines.append("") + + # Section 4: Errors + if error_tasks: + lines.append(f"ERRORS ({len(error_tasks)})") + by_customer = self._group_by_customer(error_tasks) + for customer, ctasks in by_customer.items(): + lines.append(f" {customer}:") + for t in ctasks: + lines.append(f" - {t.name}") + lines.append("") + + message = "\n".join(lines).rstrip() + self._notify(message, category="briefing") + log.info("Morning briefing sent (%d cora, %d outlines, %d PRs, %d errors)", + len(cora_tasks), len(outline_tasks), len(pr_tasks), len(error_tasks)) + + def _group_by_customer(self, tasks) -> dict[str, list]: + """Group tasks by their Customer custom field.""" + groups: dict[str, list] = {} + for t in tasks: + customer = t.custom_fields.get("Customer", "") or "Unknown" + groups.setdefault(str(customer), []).append(t) + return groups + + def _check_cora_file_status(self, cora_tasks) -> dict[str, str]: + """For each 'running cora' task, check where its xlsx sits on the network. + + Returns a dict of task_id → human-readable status note. + """ + from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match + + # Collect xlsx filenames from all relevant folders + folders = { + "cora_human": Path(self.config.autocora.cora_human_inbox), + "cora_human_processed": Path(self.config.autocora.cora_human_inbox) / "processed", + "lb_inbox": Path(self.config.link_building.watch_folder), + "lb_processed": Path(self.config.link_building.watch_folder) / "processed", + "content_inbox": Path(self.config.content.cora_inbox), + "content_processed": Path(self.config.content.cora_inbox) / "processed", + } + + # Build a map: normalized_stem → set of folder keys + file_locations: dict[str, set[str]] = {} + for folder_key, folder_path in folders.items(): + if not folder_path.exists(): + continue + for xlsx in folder_path.glob("*.xlsx"): + if xlsx.name.startswith("~$"): + continue + stem = xlsx.stem.lower().replace("-", " ").replace("_", " ") + stem = re.sub(r"\s+", " ", stem).strip() + file_locations.setdefault(stem, set()).add(folder_key) + + # Match each task's keyword against the file stems + result: dict[str, str] = {} + for task in cora_tasks: + keyword = task.custom_fields.get("Keyword", "") or task.name + keyword_norm = _normalize_for_match(str(keyword)) + + # Find which folders have a matching file + matched_folders: set[str] = set() + for stem, locs in file_locations.items(): + if _fuzzy_keyword_match(keyword_norm, stem): + matched_folders.update(locs) + + if not matched_folders: + result[task.id] = "needs cora.xlsx from worker machine" + elif matched_folders & {"lb_inbox", "content_inbox"}: + result[task.id] = "queued for processing" + elif matched_folders & {"lb_processed", "content_processed"}: + result[task.id] = "pipeline complete" + elif "cora_human" in matched_folders: + result[task.id] = "waiting for distribution" + elif "cora_human_processed" in matched_folders: + result[task.id] = "distributed, waiting for pipeline" + else: + result[task.id] = "needs cora.xlsx from worker machine" + + return result diff --git a/config.yaml b/config.yaml index e37106d..bf9524a 100644 --- a/config.yaml +++ b/config.yaml @@ -127,6 +127,11 @@ ntfy: include_patterns: ["failed", "FAILURE", "skipped", "no ClickUp match", "copy failed", "IMSURL is empty"] priority: urgent tags: rotating_light + - name: daily_briefing + topic_env_var: NTFY_TOPIC_DAILY_BRIEFING + categories: [briefing] + priority: high + tags: clipboard # Multi-agent configuration # Each agent gets its own personality, tool whitelist, and memory scope.