Fix lint issues across link building files

- Remove f-prefix from strings with no placeholders
- Use list unpacking instead of concatenation
- Fix import sorting in test file
- Remove unused Path import
- Use contextlib.suppress instead of try/except/pass
- Wrap long lines to stay under 100 chars

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-19 20:13:37 -06:00
parent 0f4c77adc9
commit a1fc5a7c0f
6 changed files with 197 additions and 98 deletions

View File

@ -208,22 +208,22 @@ def main():
task_type_field_name=config.clickup.task_type_field_name,
)
try:
tasks = cu.get_tasks_from_space(
config.clickup.space_id, statuses=["to do"]
)
tasks = cu.get_tasks_from_space(config.clickup.space_id, statuses=["to do"])
for task in tasks:
if task.task_type != "Link Building":
continue
lb_method = task.custom_fields.get("LB Method", "")
if lb_method and lb_method != "Cora Backlinks":
continue
result["pending_cora_runs"].append({
result["pending_cora_runs"].append(
{
"keyword": task.custom_fields.get("Keyword", ""),
"url": task.custom_fields.get("IMSURL", ""),
"client": task.custom_fields.get("Client", ""),
"task_id": task.id,
"task_name": task.name,
})
}
)
finally:
cu.close()
except Exception as e:

View File

@ -323,9 +323,7 @@ class ClickUpClient:
log.info("Created custom field '%s' (%s) on list %s", name, field_type, list_id)
return result
def discover_field_filter(
self, list_id: str, field_name: str
) -> dict[str, Any] | None:
def discover_field_filter(self, list_id: str, field_name: str) -> dict[str, Any] | None:
"""Discover a custom field's UUID and dropdown option map.
Returns {"field_id": "<uuid>", "options": {"Press Release": "<opt_id>", ...}}

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import contextlib
import json
import logging
import re
@ -257,9 +258,7 @@ class Scheduler:
field_id = self._field_filter_cache["field_id"]
options = self._field_filter_cache["options"]
# Only include options that map to skills we have
matching_opt_ids = [
options[name] for name in skill_map if name in options
]
matching_opt_ids = [options[name] for name in skill_map if name in options]
if matching_opt_ids:
import json as _json
@ -335,9 +334,7 @@ class Scheduler:
self.db.kv_set(kv_key, json.dumps(state))
log.info("Executing ClickUp task: %s%s", task.name, tool_name)
self._notify(
f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`"
)
self._notify(f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`")
try:
# Build tool arguments from field mapping
@ -376,9 +373,7 @@ class Scheduler:
self.db.kv_set(kv_key, json.dumps(state))
client.update_task_status(task_id, self.config.clickup.review_status)
attach_note = (
f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else ""
)
attach_note = f"\n📎 {uploaded_count} file(s) attached." if uploaded_count else ""
comment = (
f"✅ CheddahBot completed this task.\n\n"
f"Skill: {tool_name}\n"
@ -478,12 +473,16 @@ class Scheduler:
def _process_watched_file(self, xlsx_path: Path, kv_key: str):
"""Try to match a watched .xlsx file to a ClickUp task and run the pipeline."""
filename = xlsx_path.name
# Normalize filename stem for matching (e.g., "precision-cnc-machining" → "precision cnc machining")
# Normalize filename stem for matching
# e.g., "precision-cnc-machining" → "precision cnc machining"
stem = xlsx_path.stem.lower().replace("-", " ").replace("_", " ")
stem = re.sub(r"\s+", " ", stem).strip()
# Mark as processing
self.db.kv_set(kv_key, json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}))
self.db.kv_set(
kv_key,
json.dumps({"status": "processing", "started_at": datetime.now(UTC).isoformat()}),
)
# Try to find matching ClickUp task
matched_task = None
@ -492,15 +491,21 @@ class Scheduler:
if not matched_task:
log.warning("No ClickUp task match for '%s' — skipping", filename)
self.db.kv_set(kv_key, json.dumps({
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "unmatched",
"filename": filename,
"stem": stem,
"checked_at": datetime.now(UTC).isoformat(),
}))
}
),
)
self._notify(
f"Folder watcher: no ClickUp match for **{filename}**.\n"
f"Create a Link Building task with Keyword matching '{stem}' to enable auto-processing.",
f"Create a Link Building task with Keyword "
f"matching '{stem}' to enable auto-processing.",
category="linkbuilding",
)
return
@ -527,10 +532,8 @@ class Scheduler:
# Parse branded_plus_ratio
bp_raw = matched_task.custom_fields.get("BrandedPlusRatio", "")
if bp_raw:
try:
with contextlib.suppress(ValueError, TypeError):
args["branded_plus_ratio"] = float(bp_raw)
except (ValueError, TypeError):
pass
try:
# Execute via tool registry
@ -541,13 +544,18 @@ class Scheduler:
if "Error" in result and "## Step" not in result:
# Pipeline failed
self.db.kv_set(kv_key, json.dumps({
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": result[:500],
"failed_at": datetime.now(UTC).isoformat(),
}))
}
),
)
self._notify(
f"Folder watcher: pipeline **failed** for **{filename}**.\n"
f"Error: {result[:200]}",
@ -564,12 +572,17 @@ class Scheduler:
except OSError as e:
log.warning("Could not move %s to processed: %s", filename, e)
self.db.kv_set(kv_key, json.dumps({
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "completed",
"filename": filename,
"task_id": task_id,
"completed_at": datetime.now(UTC).isoformat(),
}))
}
),
)
self._notify(
f"Folder watcher: pipeline **completed** for **{filename}**.\n"
f"ClickUp task: {matched_task.name}",
@ -578,13 +591,18 @@ class Scheduler:
except Exception as e:
log.error("Folder watcher pipeline error for %s: %s", filename, e)
self.db.kv_set(kv_key, json.dumps({
self.db.kv_set(
kv_key,
json.dumps(
{
"status": "failed",
"filename": filename,
"task_id": task_id,
"error": str(e)[:500],
"failed_at": datetime.now(UTC).isoformat(),
}))
}
),
)
def _match_xlsx_to_clickup(self, normalized_stem: str):
"""Find a ClickUp Link Building task whose Keyword matches the file stem.

View File

@ -31,12 +31,14 @@ def _get_blm_dir(ctx: dict | None) -> str:
return os.getenv("BLM_DIR", "E:/dev/Big-Link-Man")
def _run_blm_command(args: list[str], blm_dir: str, timeout: int = 1800) -> subprocess.CompletedProcess:
def _run_blm_command(
args: list[str], blm_dir: str, timeout: int = 1800
) -> subprocess.CompletedProcess:
"""Run a Big-Link-Man CLI command via subprocess.
Always injects -u/-p from BLM_USERNAME/BLM_PASSWORD env vars.
"""
cmd = ["uv", "run", "python", "main.py"] + args
cmd = ["uv", "run", "python", "main.py", *args]
# Inject credentials from env vars
username = os.getenv("BLM_USERNAME", "")
@ -521,7 +523,7 @@ def run_cora_backlinks(
project_id = ingest_parsed["project_id"]
job_file = ingest_parsed["job_file"]
output_parts.append(f"## Step 1: Ingest CORA Report")
output_parts.append("## Step 1: Ingest CORA Report")
output_parts.append(f"- Project: {project_name} (ID: {project_id})")
output_parts.append(f"- Keyword: {ingest_parsed['main_keyword']}")
output_parts.append(f"- Job file: {job_file}")
@ -529,7 +531,9 @@ def run_cora_backlinks(
if clickup_task_id:
_sync_clickup(
ctx, clickup_task_id, "ingest_done",
ctx,
clickup_task_id,
"ingest_done",
f"✅ CORA report ingested. Project ID: {project_id}. Job file: {job_file}",
)
@ -563,7 +567,7 @@ def run_cora_backlinks(
_fail_clickup_task(ctx, clickup_task_id, error)
return "\n".join(output_parts) + f"\n\nError: {error}"
output_parts.append(f"## Step 2: Generate Content Batch")
output_parts.append("## Step 2: Generate Content Batch")
output_parts.append(f"- Status: {'Success' if gen_parsed['success'] else 'Completed'}")
if gen_parsed["job_moved_to"]:
output_parts.append(f"- Job moved to: {gen_parsed['job_moved_to']}")
@ -583,7 +587,7 @@ def run_cora_backlinks(
output_parts.append("## ClickUp Sync")
output_parts.append(f"- Task `{clickup_task_id}` completed")
output_parts.append(f"- Status set to 'complete'")
output_parts.append("- Status set to 'complete'")
return "\n".join(output_parts)

View File

@ -43,6 +43,95 @@ def _set_status(ctx: dict | None, message: str) -> None:
ctx["db"].kv_set("pipeline:status", message)
def _fuzzy_company_match(name: str, candidate: str) -> bool:
"""Check if company_name fuzzy-matches a candidate string.
Tries exact match, then substring containment in both directions.
"""
if not name or not candidate:
return False
a, b = name.lower().strip(), candidate.lower().strip()
return a == b or a in b or b in a
def _find_clickup_task(ctx: dict, company_name: str) -> str:
"""Query ClickUp API for a matching press-release task.
Looks for "to do" tasks where Work Category == "Press Release" and
the Client custom field fuzzy-matches company_name.
If found: creates kv_store "executing" entry, moves to "in progress"
on ClickUp, and returns the task ID.
If not found: returns "" (tool runs without ClickUp sync).
"""
cu_client = _get_clickup_client(ctx)
if not cu_client:
return ""
config = ctx.get("config")
if not config or not config.clickup.space_id:
return ""
try:
tasks = cu_client.get_tasks_from_space(
config.clickup.space_id,
statuses=["to do"],
)
except Exception as e:
log.warning("ClickUp API query failed in _find_clickup_task: %s", e)
return ""
finally:
cu_client.close()
# Find a task with Work Category == "Press Release" and Client matching company_name
for task in tasks:
if task.task_type != "Press Release":
continue
client_field = task.custom_fields.get("Client", "")
if not (
_fuzzy_company_match(company_name, task.name)
or _fuzzy_company_match(company_name, client_field)
):
continue
# Found a match — create kv_store entry and move to "in progress"
task_id = task.id
now = datetime.now(UTC).isoformat()
state = {
"state": "executing",
"clickup_task_id": task_id,
"clickup_task_name": task.name,
"task_type": task.task_type,
"skill_name": "write_press_releases",
"discovered_at": now,
"started_at": now,
"completed_at": None,
"error": None,
"deliverable_paths": [],
"custom_fields": task.custom_fields,
}
db = ctx.get("db")
if db:
db.kv_set(f"clickup:task:{task_id}:state", json.dumps(state))
# Move to "in progress" on ClickUp
cu_client2 = _get_clickup_client(ctx)
if cu_client2:
try:
cu_client2.update_task_status(task_id, config.clickup.in_progress_status)
except Exception as e:
log.warning("Failed to update ClickUp status for %s: %s", task_id, e)
finally:
cu_client2.close()
log.info("Auto-matched ClickUp task %s for company '%s'", task_id, company_name)
return task_id
return ""
def _get_clickup_client(ctx: dict | None):
"""Create a ClickUpClient from tool context, or None if unavailable."""
if not ctx or not ctx.get("config") or not ctx["config"].clickup.enabled:
@ -414,6 +503,12 @@ def write_press_releases(
# clickup_task_id is injected via ctx by the ToolRegistry (never from LLM)
clickup_task_id = ctx.get("clickup_task_id", "")
# Fallback: auto-lookup from ClickUp API when invoked from chat (no task ID in ctx)
if not clickup_task_id and ctx.get("config"):
clickup_task_id = _find_clickup_task(ctx, company_name)
if clickup_task_id:
log.info("Chat-invoked PR: auto-linked to ClickUp task %s", clickup_task_id)
# ── ClickUp: set "in progress" and post starting comment ────────────
cu_client = None
if clickup_task_id:
@ -421,9 +516,7 @@ def write_press_releases(
if cu_client:
try:
config = ctx["config"]
cu_client.update_task_status(
clickup_task_id, config.clickup.in_progress_status
)
cu_client.update_task_status(clickup_task_id, config.clickup.in_progress_status)
cu_client.add_comment(
clickup_task_id,
f"🔄 CheddahBot starting press release creation.\n\n"
@ -604,7 +697,9 @@ def write_press_releases(
f"{uploaded_count} file(s) attached.\n"
f"Generating JSON-LD schemas next...",
)
log.info("ClickUp: uploaded %d attachments for task %s", uploaded_count, clickup_task_id)
log.info(
"ClickUp: uploaded %d attachments for task %s", uploaded_count, clickup_task_id
)
except Exception as e:
log.warning("ClickUp attachment upload failed for %s: %s", clickup_task_id, e)

View File

@ -4,7 +4,6 @@ from __future__ import annotations
import json
import subprocess
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
@ -22,7 +21,6 @@ from cheddahbot.tools.linkbuilding import (
scan_cora_folder,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@ -127,7 +125,11 @@ class TestParseIngestOutput:
assert result["job_file"] == ""
def test_project_with_special_chars(self):
stdout = "Success: Project 'O'Brien & Sons (LLC)' created (ID: 7)\nJob file created: jobs/obrien.json\n"
stdout = (
"Success: Project 'O'Brien & Sons (LLC)'"
" created (ID: 7)\n"
"Job file created: jobs/obrien.json\n"
)
result = _parse_ingest_output(stdout)
# Regex won't match greedy quote - that's ok, just verify no crash
assert result["job_file"] == "jobs/obrien.json"
@ -310,7 +312,9 @@ class TestRunCoraBacklinks:
assert "not found" in result
@patch("cheddahbot.tools.linkbuilding._run_blm_command")
def test_happy_path(self, mock_cmd, mock_ctx, tmp_path, ingest_success_stdout, generate_success_stdout):
def test_happy_path(
self, mock_cmd, mock_ctx, tmp_path, ingest_success_stdout, generate_success_stdout
):
xlsx = tmp_path / "test.xlsx"
xlsx.write_text("fake data")
@ -324,9 +328,7 @@ class TestRunCoraBacklinks:
)
mock_cmd.side_effect = [ingest_proc, gen_proc]
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test Project", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test Project", ctx=mock_ctx)
assert "Step 1: Ingest CORA Report" in result
assert "Step 2: Generate Content Batch" in result
@ -342,9 +344,7 @@ class TestRunCoraBacklinks:
args=[], returncode=1, stdout="Error: parsing failed", stderr="traceback"
)
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "Error" in result
assert "ingest-cora failed" in result
@ -361,9 +361,7 @@ class TestRunCoraBacklinks:
)
mock_cmd.side_effect = [ingest_proc, gen_proc]
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "Step 1: Ingest CORA Report" in result # Step 1 succeeded
assert "generate-batch failed" in result
@ -374,9 +372,7 @@ class TestRunCoraBacklinks:
mock_cmd.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=1800)
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "timed out" in result
@ -395,9 +391,7 @@ class TestBlmIngestCora:
assert "Error" in result
def test_file_not_found(self, mock_ctx):
result = blm_ingest_cora(
xlsx_path="/nonexistent.xlsx", project_name="Test", ctx=mock_ctx
)
result = blm_ingest_cora(xlsx_path="/nonexistent.xlsx", project_name="Test", ctx=mock_ctx)
assert "not found" in result
@patch("cheddahbot.tools.linkbuilding._run_blm_command")
@ -409,9 +403,7 @@ class TestBlmIngestCora:
args=[], returncode=0, stdout=ingest_success_stdout, stderr=""
)
result = blm_ingest_cora(
xlsx_path=str(xlsx), project_name="Test Project", ctx=mock_ctx
)
result = blm_ingest_cora(xlsx_path=str(xlsx), project_name="Test Project", ctx=mock_ctx)
assert "CORA ingest complete" in result
assert "ID: 42" in result
assert "jobs/test-project.json" in result
@ -425,9 +417,7 @@ class TestBlmIngestCora:
args=[], returncode=1, stdout="Error: bad file", stderr=""
)
result = blm_ingest_cora(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = blm_ingest_cora(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "Error" in result
assert "ingest-cora failed" in result
@ -585,9 +575,7 @@ class TestClickUpStateMachine:
)
mock_cmd.side_effect = [ingest_proc, gen_proc]
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "ClickUp Sync" in result
@ -598,9 +586,7 @@ class TestClickUpStateMachine:
@patch("cheddahbot.tools.linkbuilding._run_blm_command")
@patch("cheddahbot.tools.linkbuilding._get_clickup_client")
def test_pipeline_sets_failed_state(
self, mock_cu, mock_cmd, mock_ctx, tmp_path
):
def test_pipeline_sets_failed_state(self, mock_cu, mock_cmd, mock_ctx, tmp_path):
xlsx = tmp_path / "test.xlsx"
xlsx.write_text("fake")
@ -622,9 +608,7 @@ class TestClickUpStateMachine:
args=[], returncode=1, stdout="Error", stderr="crash"
)
result = run_cora_backlinks(
xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx
)
result = run_cora_backlinks(xlsx_path=str(xlsx), project_name="Test", ctx=mock_ctx)
assert "Error" in result
raw = mock_ctx["db"].kv_get("clickup:task:task_fail:state")