Add Cora report distribution watcher to route xlsx to pipeline inboxes

New watcher thread scans Z:/Cora-For-Human for post-macro Cora xlsx files,
matches them to ClickUp tasks by keyword, and copies to the appropriate
pipeline inbox (Z:/cora-inbox for Link Building, Z:/content-cora-inbox for
Content/OPO). Fixes an issue where a Cora report shared by both pipelines
left one pipeline's tasks stuck in the "automation underway" status forever.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/customer-field-migration
PeninsulaInd 2026-03-03 13:07:03 -06:00
parent 84c81b6df4
commit 236b64c11c
7 changed files with 402 additions and 23 deletions

View File

@ -43,6 +43,7 @@ class ClickUpConfig:
poll_interval_minutes: int = 20 poll_interval_minutes: int = 20
poll_statuses: list[str] = field(default_factory=lambda: ["to do"]) poll_statuses: list[str] = field(default_factory=lambda: ["to do"])
review_status: str = "internal review" review_status: str = "internal review"
pr_review_status: str = "pr needs review"
in_progress_status: str = "in progress" in_progress_status: str = "in progress"
automation_status: str = "automation underway" automation_status: str = "automation underway"
error_status: str = "error" error_status: str = "error"
@ -87,6 +88,7 @@ class AutoCoraConfig:
cora_categories: list[str] = field( cora_categories: list[str] = field(
default_factory=lambda: ["Content Creation", "On Page Optimization", "Link Building"] default_factory=lambda: ["Content Creation", "On Page Optimization", "Link Building"]
) )
cora_human_inbox: str = "" # e.g. "Z:/Cora-For-Human"
@dataclass @dataclass

View File

@ -23,6 +23,7 @@ log = logging.getLogger(__name__)
HEARTBEAT_OK = "HEARTBEAT_OK" HEARTBEAT_OK = "HEARTBEAT_OK"
class Scheduler: class Scheduler:
# Tasks due within this window are eligible for execution # Tasks due within this window are eligible for execution
DUE_DATE_WINDOW_WEEKS = 3 DUE_DATE_WINDOW_WEEKS = 3
@ -47,6 +48,7 @@ class Scheduler:
self._folder_watch_thread: threading.Thread | None = None self._folder_watch_thread: threading.Thread | None = None
self._autocora_thread: threading.Thread | None = None self._autocora_thread: threading.Thread | None = None
self._content_watch_thread: threading.Thread | None = None self._content_watch_thread: threading.Thread | None = None
self._cora_distribute_thread: threading.Thread | None = None
self._force_autocora = threading.Event() self._force_autocora = threading.Event()
self._clickup_client = None self._clickup_client = None
self._field_filter_cache: dict | None = None self._field_filter_cache: dict | None = None
@ -57,6 +59,7 @@ class Scheduler:
"folder_watch": None, "folder_watch": None,
"autocora": None, "autocora": None,
"content_watch": None, "content_watch": None,
"cora_distribute": None,
} }
def start(self): def start(self):
@ -124,6 +127,21 @@ class Scheduler:
else: else:
log.info("Content folder watcher disabled (no cora_inbox configured)") log.info("Content folder watcher disabled (no cora_inbox configured)")
# Start Cora distribution watcher if configured
cora_human_inbox = self.config.autocora.cora_human_inbox
if cora_human_inbox:
self._cora_distribute_thread = threading.Thread(
target=self._cora_distribute_loop, daemon=True, name="cora-distribute"
)
self._cora_distribute_thread.start()
log.info(
"Cora distribution watcher started (folder=%s, interval=%dm)",
cora_human_inbox,
self.config.link_building.watch_interval_minutes,
)
else:
log.info("Cora distribution watcher disabled (no cora_human_inbox configured)")
log.info( log.info(
"Scheduler started (poll=%ds, heartbeat=%dm)", "Scheduler started (poll=%ds, heartbeat=%dm)",
self.config.scheduler.poll_interval_seconds, self.config.scheduler.poll_interval_seconds,
@ -179,9 +197,7 @@ class Scheduler:
self._loop_timestamps["poll"] = datetime.now(UTC).isoformat() self._loop_timestamps["poll"] = datetime.now(UTC).isoformat()
except Exception as e: except Exception as e:
log.error("Scheduler poll error: %s", e) log.error("Scheduler poll error: %s", e)
self._interruptible_wait( self._interruptible_wait(self.config.scheduler.poll_interval_seconds, self._force_poll)
self.config.scheduler.poll_interval_seconds, self._force_poll
)
def _run_due_tasks(self): def _run_due_tasks(self):
tasks = self.db.get_due_tasks() tasks = self.db.get_due_tasks()
@ -430,18 +446,12 @@ class Scheduler:
) )
client.update_task_status(task_id, self.config.clickup.error_status) client.update_task_status(task_id, self.config.clickup.error_status)
self._notify( self._notify(f"ClickUp task skipped: **{task.name}**\nReason: {result[:200]}")
f"ClickUp task skipped: **{task.name}**\n"
f"Reason: {result[:200]}"
)
log.info("ClickUp task skipped: %s%s", task.name, result[:200]) log.info("ClickUp task skipped: %s%s", task.name, result[:200])
return return
# Tool handled its own ClickUp sync — just log success # Tool handled its own ClickUp sync — just log success
self._notify( self._notify(f"ClickUp task completed: **{task.name}**\nSkill: `{tool_name}`")
f"ClickUp task completed: **{task.name}**\n"
f"Skill: `{tool_name}`"
)
log.info("ClickUp task completed: %s", task.name) log.info("ClickUp task completed: %s", task.name)
except Exception as e: except Exception as e:
@ -470,9 +480,7 @@ class Scheduler:
automation_status = self.config.clickup.automation_status automation_status = self.config.clickup.automation_status
try: try:
stale_tasks = client.get_tasks_from_space( stale_tasks = client.get_tasks_from_space(space_id, statuses=[automation_status])
space_id, statuses=[automation_status]
)
except Exception as e: except Exception as e:
log.warning("Failed to query stale tasks: %s", e) log.warning("Failed to query stale tasks: %s", e)
return return
@ -494,7 +502,10 @@ class Scheduler:
reset_status = poll_sts[0] if poll_sts else "to do" reset_status = poll_sts[0] if poll_sts else "to do"
log.warning( log.warning(
"Recovering stale task %s (%s) — stuck in '%s' for %.1f hours", "Recovering stale task %s (%s) — stuck in '%s' for %.1f hours",
task.id, task.name, automation_status, age_ms / 3_600_000, task.id,
task.name,
automation_status,
age_ms / 3_600_000,
) )
client.update_task_status(task.id, reset_status) client.update_task_status(task.id, reset_status)
client.add_comment( client.add_comment(
@ -593,9 +604,7 @@ class Scheduler:
if client and task_ids: if client and task_ids:
for tid in task_ids: for tid in task_ids:
client.update_task_status(tid, autocora.success_status) client.update_task_status(tid, autocora.success_status)
client.add_comment( client.add_comment(tid, f"Cora report completed for keyword: {keyword}")
tid, f"Cora report completed for keyword: {keyword}"
)
self._notify( self._notify(
f"AutoCora SUCCESS: **{keyword}** — " f"AutoCora SUCCESS: **{keyword}** — "
@ -968,3 +977,137 @@ class Scheduler:
return task return task
return None return None
# ── Cora Distribution Watcher ──
def _cora_distribute_loop(self):
    """Background loop: periodically scan the human Cora inbox and route new reports.

    Runs until the scheduler's stop event is set. Errors raised by a scan are
    logged and the loop keeps going; the "cora_distribute" loop timestamp is
    only refreshed after a scan that completed without raising.
    """
    # The distributor shares the link-building watcher's interval (minutes -> seconds).
    pause_seconds = self.config.link_building.watch_interval_minutes * 60

    # Hold off on the first scan so the other subsystems can finish starting up.
    self._stop_event.wait(60)

    while True:
        if self._stop_event.is_set():
            break
        try:
            self._scan_cora_human_inbox()
            finished_at = datetime.now(UTC).isoformat()
            self._loop_timestamps["cora_distribute"] = finished_at
        except Exception as exc:
            log.error("Cora distribution watcher error: %s", exc)
        self._interruptible_wait(pause_seconds)
def _scan_cora_human_inbox(self):
    """Look for new .xlsx reports in the human Cora inbox and hand each one off.

    A file is passed to ``_distribute_cora_file`` unless it is an Office lock
    file (name starts with "~$") or a file with the same name already sits in
    the inbox's ``processed/`` subfolder. Files are visited in sorted order.
    """
    inbox_dir = Path(self.config.autocora.cora_human_inbox)
    if not inbox_dir.exists():
        log.warning("Cora human inbox does not exist: %s", inbox_dir)
        return

    candidates = sorted(inbox_dir.glob("*.xlsx"))
    if not candidates:
        log.debug("No .xlsx files in Cora human inbox")
        return

    # Names already archived under processed/ count as handled.
    archive_dir = inbox_dir / "processed"
    if archive_dir.exists():
        already_handled = {done.name for done in archive_dir.glob("*.xlsx")}
    else:
        already_handled = set()

    for candidate in candidates:
        name = candidate.name
        if name.startswith("~$") or name in already_handled:
            continue
        log.info("Cora distributor: new .xlsx found: %s", name)
        self._distribute_cora_file(candidate)
def _distribute_cora_file(self, xlsx_path: Path):
    """Match a Cora .xlsx to ClickUp tasks and copy it to the right pipeline inboxes.

    The file stem (lower-cased, with dashes, underscores and runs of
    whitespace collapsed to single spaces) is fuzzy-matched against the
    "Keyword" custom field of every task returned by
    ``get_tasks_from_overall_lists``. A matching "Link Building" task routes
    the file to ``link_building.watch_folder``; a matching "Content Creation"
    or "On Page Optimization" task routes it to ``content.cora_inbox``. One
    file may be copied to both.

    The original is moved to ``processed/`` next to it only after at least
    one copy succeeded. In every other case (ClickUp disabled, no space id,
    query failure, no matching task, no inbox configured for the matched
    pipelines, copy error) the file is left where it is so that a later scan
    picks it up again.

    Args:
        xlsx_path: Path of the .xlsx report inside the human Cora inbox.
    """
    from .tools.linkbuilding import _fuzzy_keyword_match, _normalize_for_match

    filename = xlsx_path.name
    stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ")
    stem = re.sub(r"\s+", " ", stem).strip()

    if not self.config.clickup.enabled:
        log.warning("Cora distributor: ClickUp disabled, cannot match '%s'", filename)
        return

    client = self._get_clickup_client()
    space_id = self.config.clickup.space_id
    if not space_id:
        # Previously a silent return; say why the file is not being routed.
        log.warning("Cora distributor: no ClickUp space_id, cannot match '%s'", filename)
        return

    try:
        tasks = client.get_tasks_from_overall_lists(space_id)
    except Exception as e:
        log.warning("ClickUp query failed in _distribute_cora_file: %s", e)
        return

    # Find ALL matching tasks across all types
    has_lb = False
    has_content = False
    matched_names = []
    for task in tasks:
        keyword = task.custom_fields.get("Keyword", "")
        if not keyword:
            continue
        keyword_norm = _normalize_for_match(str(keyword))
        if not _fuzzy_keyword_match(stem, keyword_norm):
            continue
        matched_names.append(task.name)
        if task.task_type == "Link Building":
            has_lb = True
        elif task.task_type in ("Content Creation", "On Page Optimization"):
            has_content = True

    if not has_lb and not has_content:
        log.warning("No ClickUp task match for '%s' — leaving in inbox", filename)
        self._notify(
            f"Cora distributor: no ClickUp match for **{filename}**.\n"
            f"Create a task with Keyword matching '{stem}' to enable distribution.",
            category="autocora",
        )
        return

    # Resolve which inboxes the file must reach. A matched pipeline without a
    # configured inbox is reported instead of being skipped silently.
    wanted = (
        (has_lb, "link building", self.config.link_building.watch_folder),
        (has_content, "content", self.config.content.cora_inbox),
    )
    destinations = []
    for matched, label, folder in wanted:
        if not matched:
            continue
        if not folder:
            log.warning(
                "Cora distributor: %s task matched '%s' but no inbox is configured",
                label,
                filename,
            )
            continue
        destinations.append((label, Path(folder)))

    if not destinations:
        # Nothing can be copied, so the file must NOT be archived: moving it
        # to processed/ here would drop the report without any pipeline
        # ever receiving it.
        log.warning(
            "Cora distributor: no pipeline inbox configured for '%s' — leaving in inbox",
            filename,
        )
        self._notify(
            f"Cora distributor: **{filename}** matched tasks but no pipeline inbox "
            f"is configured for them — file left in inbox.\n"
            f"Matched tasks: {', '.join(matched_names)}",
            category="autocora",
        )
        return

    # Copy to the appropriate pipeline inboxes
    copied_to = []
    try:
        for label, dest_dir in destinations:
            dest_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(xlsx_path), str(dest_dir / filename))
            copied_to.append(f"{label} ({dest_dir})")
    except OSError as e:
        log.error("Cora distributor: copy failed for %s: %s", filename, e)
        self._notify(
            f"Cora distributor: **copy failed** for **{filename}**.\nError: {e}",
            category="autocora",
        )
        return

    # Move original to processed/. Creating the folder is part of the same
    # best-effort step: a failure here is logged, not raised, because the
    # copies above already succeeded.
    processed_dir = xlsx_path.parent / "processed"
    try:
        processed_dir.mkdir(exist_ok=True)
        shutil.move(str(xlsx_path), str(processed_dir / filename))
    except OSError as e:
        log.warning("Could not move %s to processed: %s", filename, e)

    log.info("Cora distributor: %s -> %s", filename, ", ".join(copied_to))
    self._notify(
        f"Cora distributor: **{filename}** copied to {', '.join(copied_to)}.\n"
        f"Matched tasks: {', '.join(matched_names)}",
        category="autocora",
    )

View File

@ -862,13 +862,13 @@ def write_press_releases(
) )
cu_client.add_comment(clickup_task_id, comment) cu_client.add_comment(clickup_task_id, comment)
# Set status to internal review # Set status to pr needs review
cu_client.update_task_status(clickup_task_id, config.clickup.review_status) cu_client.update_task_status(clickup_task_id, config.clickup.pr_review_status)
output_parts.append("\n## ClickUp Sync\n") output_parts.append("\n## ClickUp Sync\n")
output_parts.append(f"- Task `{clickup_task_id}` updated") output_parts.append(f"- Task `{clickup_task_id}` updated")
output_parts.append(f"- {uploaded_count} file(s) uploaded") output_parts.append(f"- {uploaded_count} file(s) uploaded")
output_parts.append(f"- Status set to '{config.clickup.review_status}'") output_parts.append(f"- Status set to '{config.clickup.pr_review_status}'")
log.info("ClickUp sync complete for task %s", clickup_task_id) log.info("ClickUp sync complete for task %s", clickup_task_id)
except Exception as e: except Exception as e:

View File

@ -44,6 +44,7 @@ clickup:
poll_interval_minutes: 20 # 3x per hour poll_interval_minutes: 20 # 3x per hour
poll_statuses: ["to do", "outline approved"] poll_statuses: ["to do", "outline approved"]
review_status: "internal review" review_status: "internal review"
pr_review_status: "pr needs review"
in_progress_status: "in progress" in_progress_status: "in progress"
automation_status: "automation underway" automation_status: "automation underway"
error_status: "error" error_status: "error"
@ -102,6 +103,7 @@ autocora:
success_status: "running cora" success_status: "running cora"
error_status: "error" error_status: "error"
enabled: true enabled: true
cora_human_inbox: "Z:/Cora-For-Human"
# Content creation settings # Content creation settings
content: content:

View File

@ -11,7 +11,8 @@ These are the ClickUp task statuses that CheddahBot reads and writes:
| `running cora` | CheddahBot (AutoCora) | Cora report is being generated by external worker | | `running cora` | CheddahBot (AutoCora) | Cora report is being generated by external worker |
| `outline review` | CheddahBot (Content) | Phase 1 outline is ready for human review | | `outline review` | CheddahBot (Content) | Phase 1 outline is ready for human review |
| `outline approved` | Human | Human reviewed the outline, ready for Phase 2 | | `outline approved` | Human | Human reviewed the outline, ready for Phase 2 |
| `internal review` | CheddahBot | Bot finished, deliverables ready for human review | | `pr needs review` | CheddahBot (Press Release) | Press release pipeline finished, PRs ready for human review |
| `internal review` | CheddahBot (Content/OPT) | Content/OPT pipeline finished, deliverables ready for human review |
| `complete` | CheddahBot (Link Building) | Pipeline fully done | | `complete` | CheddahBot (Link Building) | Pipeline fully done |
| `error` | CheddahBot | Something failed, needs attention | | `error` | CheddahBot | Something failed, needs attention |
| `in progress` | (configured but not used in automation) | — | | `in progress` | (configured but not used in automation) | — |

View File

@ -206,7 +206,7 @@ Before finalizing, verify:
3. Include 1-2 executive quotes for human perspective 3. Include 1-2 executive quotes for human perspective
4. Provide context about the company/organization 4. Provide context about the company/organization
5. Explain significance and impact 5. Explain significance and impact
6. End with company boilerplate and contact information 6. Do NOT include an "About" section or company boilerplate — Press Advantage adds this automatically
7. Write in inverted pyramid style - can be cut from bottom up 7. Write in inverted pyramid style - can be cut from bottom up
## Tone Guidelines ## Tone Guidelines

View File

@ -0,0 +1,231 @@
"""Tests for the Cora distribution watcher (scheduler._distribute_cora_file)."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from unittest.mock import MagicMock, patch
from cheddahbot.config import AutoCoraConfig, Config, ContentConfig, LinkBuildingConfig
@dataclass
class FakeTask:
    """Minimal ClickUp task stub for distribution tests.

    Only carries the attributes the tests in this module populate; every
    field has an empty default so a test can set just what it needs.
    """

    # Task title.
    name: str = ""
    # Task type string, e.g. "Link Building" or "Content Creation".
    task_type: str = ""
    # Custom-field mapping; the tests populate its "Keyword" entry.
    custom_fields: dict = field(default_factory=dict)
def _make_scheduler(tmp_path, *, lb_folder="", content_inbox="", human_inbox=""):
    """Return a Scheduler wired to the given folders, with db and agent mocked out.

    ClickUp is switched on with placeholder credentials so that the
    distributor gets as far as asking the (patched) client for tasks.
    """
    from cheddahbot.scheduler import Scheduler

    cfg = Config()
    cfg.link_building = LinkBuildingConfig(watch_folder=lb_folder)
    cfg.content = ContentConfig(cora_inbox=content_inbox)
    cfg.autocora = AutoCoraConfig(cora_human_inbox=human_inbox, enabled=True)

    clickup_cfg = cfg.clickup
    clickup_cfg.enabled = True
    clickup_cfg.space_id = "sp1"
    clickup_cfg.api_token = "tok"

    return Scheduler(config=cfg, db=MagicMock(), agent=MagicMock())
# Custom fields shared by the fake tasks below; the tests expect this keyword
# to match the default dummy file name ("ac-drive-repair.xlsx").
KW_FIELDS = {"Keyword": "ac drive repair"}
def _drop_xlsx(folder: Path, name: str = "ac-drive-repair.xlsx") -> Path:
"""Create a dummy xlsx file in the given folder."""
folder.mkdir(parents=True, exist_ok=True)
p = folder / name
p.write_bytes(b"fake-xlsx-data")
return p
# ── Distribution logic tests ──
def test_distribute_lb_only(tmp_path):
    """Only a Link Building task matches → the report lands in the LB inbox alone."""
    human_dir, lb_dir, content_dir = (tmp_path / sub for sub in ("human", "lb", "content"))
    report = _drop_xlsx(human_dir)
    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(lb_dir),
        content_inbox=str(content_dir),
        human_inbox=str(human_dir),
    )
    matching = [FakeTask(name="LB task", task_type="Link Building", custom_fields=KW_FIELDS)]

    with patch.object(scheduler, "_get_clickup_client") as get_client:
        get_client.return_value.get_tasks_from_overall_lists.return_value = matching
        scheduler._distribute_cora_file(report)

    report_name = report.name
    assert (lb_dir / report_name).exists()
    assert not (content_dir / report_name).exists()
    # The original has been archived, not just copied.
    assert (human_dir / "processed" / report_name).exists()
    assert not report.exists()
def test_distribute_content_only(tmp_path):
    """Only a Content Creation task matches → the report lands in the content inbox alone."""
    human_dir, lb_dir, content_dir = (tmp_path / sub for sub in ("human", "lb", "content"))
    report = _drop_xlsx(human_dir)
    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(lb_dir),
        content_inbox=str(content_dir),
        human_inbox=str(human_dir),
    )
    matching = [FakeTask(name="CC task", task_type="Content Creation", custom_fields=KW_FIELDS)]

    with patch.object(scheduler, "_get_clickup_client") as get_client:
        get_client.return_value.get_tasks_from_overall_lists.return_value = matching
        scheduler._distribute_cora_file(report)

    report_name = report.name
    assert not (lb_dir / report_name).exists()
    assert (content_dir / report_name).exists()
    assert (human_dir / "processed" / report_name).exists()
def test_distribute_mixed(tmp_path):
    """LB and Content tasks share the keyword → the report is copied to both inboxes."""
    human_dir, lb_dir, content_dir = (tmp_path / sub for sub in ("human", "lb", "content"))
    report = _drop_xlsx(human_dir)
    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(lb_dir),
        content_inbox=str(content_dir),
        human_inbox=str(human_dir),
    )
    # Same keyword, one task per pipeline.
    matching = [
        FakeTask(name=task_name, task_type=task_type, custom_fields=KW_FIELDS)
        for task_name, task_type in (
            ("LB task", "Link Building"),
            ("CC task", "Content Creation"),
        )
    ]

    with patch.object(scheduler, "_get_clickup_client") as get_client:
        get_client.return_value.get_tasks_from_overall_lists.return_value = matching
        scheduler._distribute_cora_file(report)

    report_name = report.name
    assert (lb_dir / report_name).exists()
    assert (content_dir / report_name).exists()
    assert (human_dir / "processed" / report_name).exists()
def test_distribute_no_match(tmp_path):
    """ClickUp returns no tasks → the report stays put and nothing is archived."""
    human_dir, lb_dir, content_dir = (tmp_path / sub for sub in ("human", "lb", "content"))
    report = _drop_xlsx(human_dir)
    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(lb_dir),
        content_inbox=str(content_dir),
        human_inbox=str(human_dir),
    )

    with patch.object(scheduler, "_get_clickup_client") as get_client:
        get_client.return_value.get_tasks_from_overall_lists.return_value = []
        scheduler._distribute_cora_file(report)

    assert report.exists()  # Still in inbox
    assert not (human_dir / "processed" / report.name).exists()
def test_distribute_opo_task(tmp_path):
    """An On Page Optimization match is routed like content: content inbox only."""
    human_dir, lb_dir, content_dir = (tmp_path / sub for sub in ("human", "lb", "content"))
    report = _drop_xlsx(human_dir)
    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(lb_dir),
        content_inbox=str(content_dir),
        human_inbox=str(human_dir),
    )
    opo_task = FakeTask(name="OPO task", task_type="On Page Optimization", custom_fields=KW_FIELDS)

    with patch.object(scheduler, "_get_clickup_client") as get_client:
        get_client.return_value.get_tasks_from_overall_lists.return_value = [opo_task]
        scheduler._distribute_cora_file(report)

    report_name = report.name
    assert not (lb_dir / report_name).exists()
    assert (content_dir / report_name).exists()
# ── Scan tests ──
def test_scan_skips_processed(tmp_path):
    """A file whose name already exists under processed/ is not distributed again."""
    human_dir = tmp_path / "human"
    # Same file name at the top level and inside processed/.
    for folder in (human_dir, human_dir / "processed"):
        _drop_xlsx(folder)

    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(tmp_path / "lb"),
        content_inbox=str(tmp_path / "content"),
        human_inbox=str(human_dir),
    )

    with patch.object(scheduler, "_distribute_cora_file") as distribute:
        scheduler._scan_cora_human_inbox()

    distribute.assert_not_called()
def test_scan_skips_temp_files(tmp_path):
    """Office lock files (names starting with "~$") are never distributed."""
    human_dir = tmp_path / "human"
    _drop_xlsx(human_dir, name="~$ac-drive-repair.xlsx")

    scheduler = _make_scheduler(
        tmp_path,
        lb_folder=str(tmp_path / "lb"),
        content_inbox=str(tmp_path / "content"),
        human_inbox=str(human_dir),
    )

    with patch.object(scheduler, "_distribute_cora_file") as distribute:
        scheduler._scan_cora_human_inbox()

    distribute.assert_not_called()
def test_scan_empty_inbox(tmp_path):
    """Scanning an existing but empty inbox distributes nothing."""
    human_dir = tmp_path / "human"
    human_dir.mkdir()
    scheduler = _make_scheduler(tmp_path, human_inbox=str(human_dir))

    with patch.object(scheduler, "_distribute_cora_file") as distribute:
        scheduler._scan_cora_human_inbox()

    distribute.assert_not_called()
def test_distribute_copy_failure_no_move(tmp_path):
    """If copy fails, original is NOT moved to processed."""
    human = tmp_path / "human"
    # Keep the destination under tmp_path: the distributor mkdir()s the
    # destination folder (parents=True) before it copies, so a path such as
    # "/nonexistent/..." would either be created on the real filesystem when
    # the tests run with enough privileges, or fail in mkdir and never reach
    # the copy failure this test is about.
    lb = tmp_path / "lb"
    xlsx = _drop_xlsx(human)
    sched = _make_scheduler(tmp_path, lb_folder=str(lb), human_inbox=str(human))
    tasks = [FakeTask(name="LB task", task_type="Link Building", custom_fields=KW_FIELDS)]
    with (
        patch.object(sched, "_get_clickup_client") as mock_client,
        patch(
            "cheddahbot.scheduler.shutil.copy2", side_effect=OSError("network down")
        ) as mock_copy,
    ):
        mock_client.return_value.get_tasks_from_overall_lists.return_value = tasks
        sched._distribute_cora_file(xlsx)
    # The injected failure is what stopped the distribution.
    mock_copy.assert_called_once()
    assert xlsx.exists()  # Original untouched
    assert not (human / "processed" / xlsx.name).exists()
    assert not (lb / xlsx.name).exists()