Add daily morning briefing via ntfy push notifications

Sends a summary at 6:30 AM weekdays / 8:00 AM weekends (Central) with
Cora reports needing action, outlines to approve, PRs to review, and
errors — grouped by customer. Cora tasks cross-reference network share
folders to show file pipeline status. No LLM API credits burned.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/customer-field-migration
PeninsulaInd 2026-03-04 22:15:49 -06:00
parent 45ab4c1b33
commit 6b5d0ac71e
2 changed files with 221 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import threading
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo
from croniter import croniter
@ -49,7 +50,10 @@ class Scheduler:
self._autocora_thread: threading.Thread | None = None
self._content_watch_thread: threading.Thread | None = None
self._cora_distribute_thread: threading.Thread | None = None
self._briefing_thread: threading.Thread | None = None
self._force_autocora = threading.Event()
self._force_briefing = threading.Event()
self._last_briefing_date: str | None = None
self._clickup_client = None
self._field_filter_cache: dict | None = None
self._loop_timestamps: dict[str, str | None] = {
@ -60,6 +64,7 @@ class Scheduler:
"autocora": None,
"content_watch": None,
"cora_distribute": None,
"briefing": None,
}
def start(self):
@ -142,6 +147,16 @@ class Scheduler:
else:
log.info("Cora distribution watcher disabled (no cora_human_inbox configured)")
# Start morning briefing loop if ClickUp is configured
if self.config.clickup.enabled:
self._briefing_thread = threading.Thread(
target=self._briefing_loop, daemon=True, name="briefing"
)
self._briefing_thread.start()
log.info("Morning briefing loop started")
else:
log.info("Morning briefing disabled (ClickUp not configured)")
log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds,
@ -184,6 +199,11 @@ class Scheduler:
"""Wake the AutoCora poll loop immediately."""
self._force_autocora.set()
def force_briefing(self):
"""Force the morning briefing to send now (ignores schedule/dedup)."""
self._last_briefing_date = None
self._force_briefing.set()
def get_loop_timestamps(self) -> dict[str, str | None]:
"""Return last_run timestamps for all loops (in-memory)."""
return dict(self._loop_timestamps)
@ -1111,3 +1131,199 @@ class Scheduler:
f"Matched tasks: {', '.join(matched_names)}",
category="autocora",
)
# ── Morning Briefing ──
_CENTRAL = ZoneInfo("America/Chicago")
def _briefing_loop(self):
"""Check every 60s if it's time to send the morning briefing."""
# Wait before first check
self._stop_event.wait(30)
while not self._stop_event.is_set():
try:
forced = self._force_briefing.is_set()
if forced:
self._force_briefing.clear()
now = datetime.now(self._CENTRAL)
today = now.strftime("%Y-%m-%d")
is_weekday = now.weekday() < 5 # Mon=0 .. Fri=4
target_hour = 6 if is_weekday else 8
target_minute = 30 if is_weekday else 0
at_briefing_time = (
now.hour == target_hour and now.minute == target_minute
)
already_sent = self._last_briefing_date == today
if forced or (at_briefing_time and not already_sent):
self._send_morning_briefing()
self._last_briefing_date = today
self._loop_timestamps["briefing"] = datetime.now(UTC).isoformat()
except Exception as e:
log.error("Briefing loop error: %s", e)
self._interruptible_wait(60, self._force_briefing)
def _send_morning_briefing(self):
"""Build and send the morning briefing notification."""
client = self._get_clickup_client()
space_id = self.config.clickup.space_id
if not space_id:
log.warning("Briefing skipped — no space_id configured")
return
# Query tasks in the 4 relevant statuses
briefing_statuses = [
"running cora",
"outline review",
self.config.clickup.pr_review_status, # "pr needs review"
self.config.clickup.error_status, # "error"
]
try:
tasks = client.get_tasks_from_overall_lists(
space_id, statuses=briefing_statuses
)
except Exception as e:
log.error("Briefing: ClickUp query failed: %s", e)
return
# Bucket tasks by status
cora_tasks = []
outline_tasks = []
pr_tasks = []
error_tasks = []
for t in tasks:
if t.status == "running cora":
cora_tasks.append(t)
elif t.status == "outline review":
outline_tasks.append(t)
elif t.status == self.config.clickup.pr_review_status:
pr_tasks.append(t)
elif t.status == self.config.clickup.error_status:
error_tasks.append(t)
# If nothing needs attention, send a short all-clear
if not any([cora_tasks, outline_tasks, pr_tasks, error_tasks]):
self._notify("Morning briefing: All clear — nothing needs attention.", category="briefing")
return
lines = ["Morning Briefing", ""]
# Section 1: Cora reports
if cora_tasks:
cora_statuses = self._check_cora_file_status(cora_tasks)
lines.append(f"CORA REPORTS ({len(cora_tasks)})")
by_customer = self._group_by_customer(cora_tasks)
for customer, ctasks in by_customer.items():
lines.append(f" {customer}:")
for t in ctasks:
status_note = cora_statuses.get(t.id, "")
suffix = f"{status_note}" if status_note else ""
lines.append(f" - {t.name}{suffix}")
lines.append("")
# Section 2: Outlines to approve
if outline_tasks:
lines.append(f"OUTLINES TO APPROVE ({len(outline_tasks)})")
by_customer = self._group_by_customer(outline_tasks)
for customer, ctasks in by_customer.items():
lines.append(f" {customer}:")
for t in ctasks:
lines.append(f" - {t.name}")
lines.append("")
# Section 3: PRs to review
if pr_tasks:
lines.append(f"PRs TO REVIEW ({len(pr_tasks)})")
by_customer = self._group_by_customer(pr_tasks)
for customer, ctasks in by_customer.items():
lines.append(f" {customer}:")
for t in ctasks:
lines.append(f" - {t.name}")
lines.append("")
# Section 4: Errors
if error_tasks:
lines.append(f"ERRORS ({len(error_tasks)})")
by_customer = self._group_by_customer(error_tasks)
for customer, ctasks in by_customer.items():
lines.append(f" {customer}:")
for t in ctasks:
lines.append(f" - {t.name}")
lines.append("")
message = "\n".join(lines).rstrip()
self._notify(message, category="briefing")
log.info("Morning briefing sent (%d cora, %d outlines, %d PRs, %d errors)",
len(cora_tasks), len(outline_tasks), len(pr_tasks), len(error_tasks))
def _group_by_customer(self, tasks) -> dict[str, list]:
"""Group tasks by their Customer custom field."""
groups: dict[str, list] = {}
for t in tasks:
customer = t.custom_fields.get("Customer", "") or "Unknown"
groups.setdefault(str(customer), []).append(t)
return groups
def _check_cora_file_status(self, cora_tasks) -> dict[str, str]:
"""For each 'running cora' task, check where its xlsx sits on the network.
Returns a dict of task_id human-readable status note.
"""
from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match
# Collect xlsx filenames from all relevant folders
folders = {
"cora_human": Path(self.config.autocora.cora_human_inbox),
"cora_human_processed": Path(self.config.autocora.cora_human_inbox) / "processed",
"lb_inbox": Path(self.config.link_building.watch_folder),
"lb_processed": Path(self.config.link_building.watch_folder) / "processed",
"content_inbox": Path(self.config.content.cora_inbox),
"content_processed": Path(self.config.content.cora_inbox) / "processed",
}
# Build a map: normalized_stem → set of folder keys
file_locations: dict[str, set[str]] = {}
for folder_key, folder_path in folders.items():
if not folder_path.exists():
continue
for xlsx in folder_path.glob("*.xlsx"):
if xlsx.name.startswith("~$"):
continue
stem = xlsx.stem.lower().replace("-", " ").replace("_", " ")
stem = re.sub(r"\s+", " ", stem).strip()
file_locations.setdefault(stem, set()).add(folder_key)
# Match each task's keyword against the file stems
result: dict[str, str] = {}
for task in cora_tasks:
keyword = task.custom_fields.get("Keyword", "") or task.name
keyword_norm = _normalize_for_match(str(keyword))
# Find which folders have a matching file
matched_folders: set[str] = set()
for stem, locs in file_locations.items():
if _fuzzy_keyword_match(keyword_norm, stem):
matched_folders.update(locs)
if not matched_folders:
result[task.id] = "needs cora.xlsx from worker machine"
elif matched_folders & {"lb_inbox", "content_inbox"}:
result[task.id] = "queued for processing"
elif matched_folders & {"lb_processed", "content_processed"}:
result[task.id] = "pipeline complete"
elif "cora_human" in matched_folders:
result[task.id] = "waiting for distribution"
elif "cora_human_processed" in matched_folders:
result[task.id] = "distributed, waiting for pipeline"
else:
result[task.id] = "needs cora.xlsx from worker machine"
return result

View File

@ -127,6 +127,11 @@ ntfy:
include_patterns: ["failed", "FAILURE", "skipped", "no ClickUp match", "copy failed", "IMSURL is empty"]
priority: urgent
tags: rotating_light
- name: daily_briefing
topic_env_var: NTFY_TOPIC_DAILY_BRIEFING
categories: [briefing]
priority: high
tags: clipboard
# Multi-agent configuration
# Each agent gets its own personality, tool whitelist, and memory scope.