CheddahBot/cheddahbot/tools/press_release.py

"""Press-release pipeline tool.
Autonomous workflow:
1. Generate 7 compliant headlines (chat brain)
2. AI judge picks the 2 best (chat brain)
3. Write 2 full press releases (execution brain × 2)
4. Generate 2 JSON-LD schemas (execution brain × 2, Sonnet + WebSearch)
5. Save 4 files, return cost summary
"""
from __future__ import annotations
import json
import logging
import re
import time
from datetime import datetime
from pathlib import Path
from ..docx_export import text_to_docx
from ..press_advantage import PressAdvantageClient
from . import tool
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
_ROOT_DIR = Path(__file__).resolve().parent.parent.parent
_SKILLS_DIR = _ROOT_DIR / "skills"
_DATA_DIR = _ROOT_DIR / "data"
_OUTPUT_DIR = _DATA_DIR / "generated" / "press_releases"
_COMPANIES_FILE = _SKILLS_DIR / "companies.md"
_HEADLINES_FILE = _SKILLS_DIR / "headlines.md"
SONNET_CLI_MODEL = "sonnet"
def _set_status(ctx: dict | None, message: str) -> None:
    """Write pipeline progress to the DB so the UI can poll it."""
    if ctx and "db" in ctx:
        ctx["db"].kv_set("pipeline:status", message)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_skill(filename: str) -> str:
    """Read a markdown skill file from the skills/ directory."""
    path = _SKILLS_DIR / filename
    if not path.exists():
        raise FileNotFoundError(f"Skill file not found: {path}")
    return path.read_text(encoding="utf-8")
def _load_file_if_exists(path: Path) -> str:
    """Read a file if it exists, return empty string otherwise."""
    if path.exists():
        return path.read_text(encoding="utf-8")
    return ""
def _slugify(text: str) -> str:
    """Turn a headline into a filesystem-safe slug."""
    text = text.lower().strip()
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[\s_]+", "-", text)
    return text[:60].strip("-")
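
# Illustrative example (assumed input): _slugify("Acme Corp Expands 24/7 Support!")
# returns "acme-corp-expands-247-support": lowercased, punctuation dropped,
# whitespace collapsed to hyphens, capped at 60 characters.
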
def _word_count(text: str) -> int:
    return len(text.split())
def _chat_call(agent, messages: list[dict]) -> str:
    """Make a non-streaming chat-brain call and return the full text."""
    parts: list[str] = []
    for chunk in agent.llm.chat(messages, tools=None, stream=False):
        if chunk["type"] == "text":
            parts.append(chunk["content"])
    return "".join(parts)
def _clean_pr_output(raw: str, headline: str) -> str:
    """Clean execution brain output to just the press release text.

    Strategy: find the headline we asked for in the output, take everything
    from that point forward. Strip any markdown formatting artifacts.
    """
    # Normalize the headline for matching
    headline_lower = headline.strip().lower()
    lines = raw.strip().splitlines()
    # Try to find the exact headline in the output
    pr_start = None
    for i, line in enumerate(lines):
        clean_line = re.sub(r"\*\*", "", line).strip().lower()
        if clean_line == headline_lower:
            pr_start = i
            break
    # Fallback: find a line that contains most of the headline words
    if pr_start is None:
        headline_words = set(headline_lower.split())
        for i, line in enumerate(lines):
            clean_line = re.sub(r"\*\*", "", line).strip().lower()
            line_words = set(clean_line.split())
            # If >70% of headline words are in this line, it's probably the headline
            if len(headline_words & line_words) >= len(headline_words) * 0.7:
                pr_start = i
                break
    # If we still can't find it, just take the whole output
    if pr_start is None:
        pr_start = 0
    # Rebuild from the headline forward
    result_lines = []
    for line in lines[pr_start:]:
        # Strip markdown formatting
        line = re.sub(r"\*\*", "", line)
        line = re.sub(r"^#{1,6}\s+", "", line)
        result_lines.append(line)
    result = "\n".join(result_lines).strip()
    # Remove trailing horizontal rules
    result = re.sub(r"\n---\s*$", "", result).strip()
    return result
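
# Illustrative example (assumed output): given a headline of "Acme Corp Expands
# Onsite Support" and raw model output like
#   "Here is the press release:\n\n**Acme Corp Expands Onsite Support**\nACME, a provider of ..."
# the preamble line is dropped and the bold markers stripped, so the cleaned result
# begins with the plain headline line.
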
# ---------------------------------------------------------------------------
# Prompt builders
# ---------------------------------------------------------------------------
def _build_headline_prompt(topic: str, company_name: str, url: str,
                           lsi_terms: str, headlines_ref: str) -> str:
    """Build the prompt for Step 1: generate 7 headlines."""
    prompt = (
        f"Generate exactly 7 unique press release headline options for the following.\n\n"
        f"Topic: {topic}\n"
        f"Company: {company_name}\n"
    )
    if url:
        prompt += f"Reference URL: {url}\n"
    if lsi_terms:
        prompt += f"LSI terms to consider: {lsi_terms}\n"
    prompt += (
        "\nRules for EVERY headline:\n"
        "- Maximum 70 characters (including spaces)\n"
        "- Title case\n"
        "- News-focused, not promotional\n"
        "- NO location/geographic keywords\n"
        "- NO superlatives (best, top, leading, #1)\n"
        "- NO questions\n"
        "- NO colons — colons are considered lower quality\n"
        "- Must contain an actual news announcement\n"
    )
    if headlines_ref:
        prompt += (
            "\nHere are examples of high-quality headlines to use as reference "
            "for tone, structure, and length:\n\n"
            f"{headlines_ref}\n"
        )
    prompt += (
        "\nReturn ONLY a numbered list (1-7), one headline per line. "
        "No commentary, no character counts, just the headlines."
    )
    return prompt
def _build_judge_prompt(headlines: str, headlines_ref: str) -> str:
    """Build the prompt for Step 2: pick the 2 best headlines."""
    prompt = (
        "You are judging press release headlines for Press Advantage distribution. "
        "Pick the 2 best headlines from the candidates below.\n\n"
        "DISQUALIFY any headline that:\n"
        "- Contains a colon\n"
        "- Contains location/geographic keywords\n"
        "- Contains superlatives (best, top, leading, #1)\n"
        "- Is a question\n"
        "- Exceeds 70 characters\n"
        "- Implies a NEW product launch when none exists (avoid 'launches', "
        "'introduces', 'unveils', 'announces new' unless the topic is genuinely new)\n\n"
        "PREFER headlines that:\n"
        "- Match the tone and structure of the reference examples below\n"
        "- Use action verbs like 'Highlights', 'Expands', 'Strengthens', "
        "'Reinforces', 'Delivers', 'Adds'\n"
        "- Describe what the company DOES or OFFERS, not what it just invented\n"
        "- Read like a real news wire headline, not a product announcement\n\n"
        f"Candidates:\n{headlines}\n\n"
    )
    if headlines_ref:
        prompt += (
            "Reference headlines (these scored 77+ on quality — match their style):\n"
            f"{headlines_ref}\n\n"
        )
    prompt += (
        "Return ONLY the 2 best headlines, one per line, exactly as written in the candidates. "
        "No numbering, no commentary."
    )
    return prompt
def _build_pr_prompt(headline: str, topic: str, company_name: str,
                     url: str, lsi_terms: str, required_phrase: str,
                     skill_text: str, companies_file: str) -> str:
    """Build the prompt for Step 3: write one full press release."""
    prompt = (
        f"{skill_text}\n\n"
        "---\n\n"
        f"Write a press release using the headline below. "
        f"Follow every rule in the skill instructions above.\n\n"
        f"Headline: {headline}\n"
        f"Topic: {topic}\n"
        f"Company: {company_name}\n"
    )
    if url:
        prompt += f"Reference URL (fetch for context): {url}\n"
    if lsi_terms:
        prompt += f"LSI terms to integrate: {lsi_terms}\n"
    if required_phrase:
        prompt += f'Required phrase (use exactly once): "{required_phrase}"\n'
    if companies_file:
        prompt += (
            f"\nCompany directory — look up the executive name and title for {company_name}. "
            f"If the company is NOT listed below, use 'a company spokesperson' for quotes "
            f"instead of making up a name:\n"
            f"{companies_file}\n"
        )
    prompt += (
        "\nTarget 600-750 words. Minimum 575, maximum 800.\n\n"
        "CRITICAL OUTPUT RULES:\n"
        "- Output ONLY the press release text\n"
        "- Start with the headline on the first line, then the body\n"
        "- Do NOT include any commentary, reasoning, notes, or explanations\n"
        "- Do NOT use markdown formatting (no **, no ##, no ---)\n"
        "- Do NOT prefix with 'Here is the press release' or similar\n"
        "- The very first line of your output must be the headline"
    )
    return prompt
def _build_schema_prompt(pr_text: str, company_name: str, url: str,
                         skill_text: str) -> str:
    """Build the prompt for Step 4: generate JSON-LD schema for one PR."""
    prompt = (
        f"{skill_text}\n\n"
        "---\n\n"
        "Generate a NewsArticle JSON-LD schema for the press release below. "
        "Follow every rule in the skill instructions above. "
        "Use WebSearch to find Wikipedia URLs for each entity.\n\n"
        "CRITICAL OUTPUT RULES:\n"
        "- Output ONLY valid JSON\n"
        "- No markdown fences, no commentary, no explanations\n"
        "- The very first character of your output must be {\n"
    )
    prompt += (
        f"\nCompany name: {company_name}\n\n"
        f"Press release text:\n{pr_text}"
    )
    return prompt
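
# Illustrative shape of the JSON-LD the schema step is expected to produce. The exact
# fields are dictated by skills/press-release-schema.md (not shown here); only
# "NewsArticle" and "mainEntityOfPage" are referenced elsewhere in this module:
#   {
#     "@context": "https://schema.org",
#     "@type": "NewsArticle",
#     "headline": "...",
#     "mainEntityOfPage": "https://example.com/reference-page",
#     ...
#   }
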
# ---------------------------------------------------------------------------
# Main tool
# ---------------------------------------------------------------------------
@tool(
    "write_press_releases",
    description=(
        "Full autonomous press-release pipeline. Generates 7 headlines, "
        "AI-picks the best 2, writes 2 complete press releases (600-750 words each), "
        "generates JSON-LD schema for each, and saves all files. "
        "Returns both press releases, both schemas, file paths, and a cost summary. "
        "Use when the user asks to write, create, or draft a press release."
    ),
    category="content",
)
def write_press_releases(
    topic: str,
    company_name: str,
    url: str = "",
    lsi_terms: str = "",
    required_phrase: str = "",
    ctx: dict | None = None,
) -> str:
"""Run the full press-release pipeline and return results + cost summary."""
if not ctx or "agent" not in ctx:
return "Error: press release tool requires agent context."
agent = ctx["agent"]
# Load skill prompts
try:
pr_skill = _load_skill("press_release_prompt.md")
schema_skill = _load_skill("press-release-schema.md")
except FileNotFoundError as e:
return f"Error: {e}"
# Load reference files
companies_file = _load_file_if_exists(_COMPANIES_FILE)
headlines_ref = _load_file_if_exists(_HEADLINES_FILE)
# Ensure output directory (company subfolder)
company_slug = _slugify(company_name)
output_dir = _OUTPUT_DIR / company_slug
output_dir.mkdir(parents=True, exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
cost_log: list[dict] = []
# ── Step 1: Generate 7 headlines (chat brain) ─────────────────────────
log.info("[PR Pipeline] Step 1/4: Generating 7 headlines for %s...", company_name)
_set_status(ctx, f"Step 1/4: Generating 7 headlines for {company_name}...")
step_start = time.time()
headline_prompt = _build_headline_prompt(topic, company_name, url, lsi_terms, headlines_ref)
messages = [
{"role": "system", "content": "You are a senior press-release headline writer."},
{"role": "user", "content": headline_prompt},
]
headlines_raw = _chat_call(agent, messages)
cost_log.append({
"step": "1. Generate 7 headlines",
"model": agent.llm.current_model,
"elapsed_s": round(time.time() - step_start, 1),
})
if not headlines_raw.strip():
return "Error: headline generation returned empty result."
# Save all 7 headline candidates to file
slug_base = _slugify(f"{company_name}-{topic}")
headlines_file = output_dir / f"{slug_base}_{today}_headlines.txt"
headlines_file.write_text(headlines_raw.strip(), encoding="utf-8")
# ── Step 2: AI judge picks best 2 (chat brain) ───────────────────────
log.info("[PR Pipeline] Step 2/4: AI judge selecting best 2 headlines...")
_set_status(ctx, "Step 2/4: AI judge selecting best 2 headlines...")
step_start = time.time()
judge_prompt = _build_judge_prompt(headlines_raw, headlines_ref)
messages = [
{"role": "system", "content": "You are a senior PR editor."},
{"role": "user", "content": judge_prompt},
]
judge_result = _chat_call(agent, messages)
cost_log.append({
"step": "2. Judge picks best 2",
"model": agent.llm.current_model,
"elapsed_s": round(time.time() - step_start, 1),
})
# Parse the two winning headlines
winners = [line.strip().lstrip("0123456789.-) ") for line in judge_result.strip().splitlines() if line.strip()]
if len(winners) < 2:
all_headlines = [line.strip().lstrip("0123456789.-) ") for line in headlines_raw.strip().splitlines() if line.strip()]
winners = all_headlines[:2] if len(all_headlines) >= 2 else [all_headlines[0], all_headlines[0]] if all_headlines else ["Headline A", "Headline B"]
winners = winners[:2]
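    # Note: lstrip() above takes a *set* of characters, so any run of leading digits,
    # periods, dashes, parentheses, or spaces is removed. That strips list markers like
    # "1. " or "2) ", but a headline that genuinely begins with a number (for example
    # "2025 Outlook ...") would also lose that leading number.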
    # ── Step 3: Write 2 press releases (execution brain × 2) ─────────────
    log.info("[PR Pipeline] Step 3/4: Writing 2 press releases...")
    pr_texts: list[str] = []
    pr_files: list[str] = []
    docx_files: list[str] = []
    for i, headline in enumerate(winners):
        log.info("[PR Pipeline] Writing PR %d/2: %s", i + 1, headline[:60])
        _set_status(ctx, f"Step 3/4: Writing press release {i+1}/2 — {headline[:60]}...")
        step_start = time.time()
        pr_prompt = _build_pr_prompt(
            headline, topic, company_name, url, lsi_terms,
            required_phrase, pr_skill, companies_file,
        )
        exec_tools = "Bash,Read,Edit,Write,Glob,Grep,WebFetch"
        raw_result = agent.execute_task(pr_prompt, tools=exec_tools)
        elapsed = round(time.time() - step_start, 1)
        cost_log.append({
            "step": f"3{chr(97+i)}. Write PR '{headline[:40]}...'",
            "model": "execution-brain (default)",
            "elapsed_s": elapsed,
        })

        # Clean output: find the headline, strip preamble and markdown
        clean_result = _clean_pr_output(raw_result, headline)
        pr_texts.append(clean_result)

        # Validate word count
        wc = _word_count(clean_result)
        if wc < 575 or wc > 800:
            log.warning("PR %d word count %d outside 575-800 range", i + 1, wc)

        # Save PR to file
        slug = _slugify(headline)
        filename = f"{slug}_{today}.txt"
        filepath = output_dir / filename
        filepath.write_text(clean_result, encoding="utf-8")
        pr_files.append(str(filepath))

        # Also save as .docx for Google Docs import
        docx_path = output_dir / f"{slug}_{today}.docx"
        text_to_docx(clean_result, docx_path)
        docx_files.append(str(docx_path))
    # ── Step 4: Generate 2 JSON-LD schemas (Sonnet + WebSearch) ───────────
    log.info("[PR Pipeline] Step 4/4: Generating 2 JSON-LD schemas...")
    schema_texts: list[str] = []
    schema_files: list[str] = []
    for i, pr_text in enumerate(pr_texts):
        log.info("[PR Pipeline] Schema %d/2 for: %s", i + 1, winners[i][:60])
        _set_status(ctx, f"Step 4/4: Generating schema {i+1}/2...")
        step_start = time.time()
        schema_prompt = _build_schema_prompt(pr_text, company_name, url, schema_skill)
        exec_tools = "WebSearch,WebFetch"
        result = agent.execute_task(
            schema_prompt,
            tools=exec_tools,
            model=SONNET_CLI_MODEL,
        )
        elapsed = round(time.time() - step_start, 1)
        cost_log.append({
            "step": f"4{chr(97+i)}. Schema for PR {i+1}",
            "model": SONNET_CLI_MODEL,
            "elapsed_s": elapsed,
        })

        # Extract clean JSON and force correct mainEntityOfPage
        schema_json = _extract_json(result)
        if schema_json:
            try:
                schema_obj = json.loads(schema_json)
                if url:
                    schema_obj["mainEntityOfPage"] = url
                schema_json = json.dumps(schema_obj, indent=2)
            except json.JSONDecodeError:
                log.warning("Schema %d is not valid JSON", i + 1)
        schema_texts.append(schema_json or result)

        # Save schema to file
        slug = _slugify(winners[i])
        filename = f"{slug}_{today}_schema.json"
        filepath = output_dir / filename
        filepath.write_text(schema_json or result, encoding="utf-8")
        schema_files.append(str(filepath))
    # ── Build final output ────────────────────────────────────────────────
    _set_status(ctx, "")  # Clear status — pipeline complete
    total_elapsed = sum(c["elapsed_s"] for c in cost_log)
log.info("[PR Pipeline] Complete for %s%.0fs total", company_name, total_elapsed)
    output_parts = []
    for i in range(2):
        label = chr(65 + i)  # A, B
        wc = _word_count(pr_texts[i])
        output_parts.append(f"## Press Release {label}: {winners[i]}")
        output_parts.append(f"**Word count:** {wc}")
        output_parts.append(f"**File:** `{pr_files[i]}`")
        output_parts.append(f"**Docx:** `{docx_files[i]}`\n")
        output_parts.append(pr_texts[i])
        output_parts.append("\n---\n")
        output_parts.append(f"### Schema {label}")
        output_parts.append(f"**File:** `{schema_files[i]}`\n")
        output_parts.append(f"```json\n{schema_texts[i]}\n```")
        output_parts.append("\n---\n")

    # Cost summary table
    output_parts.append("## Cost Summary\n")
    output_parts.append("| Step | Model | Time (s) |")
    output_parts.append("|------|-------|----------|")
    for c in cost_log:
        output_parts.append(f"| {c['step']} | {c['model']} | {c['elapsed_s']} |")
    output_parts.append(f"| **Total** | | **{round(total_elapsed, 1)}** |")
    return "\n".join(output_parts)
def _parse_company_org_ids(companies_text: str) -> dict[str, int]:
    """Parse companies.md and return {company_name_lower: pa_org_id}."""
    mapping: dict[str, int] = {}
    current_company = ""
    for line in companies_text.splitlines():
        line = line.strip()
        if line.startswith("## "):
            current_company = line[3:].strip()
        elif line.startswith("- **PA Org ID:**") and current_company:
            try:
                org_id = int(line.split(":**")[1].strip())
                mapping[current_company.lower()] = org_id
            except (ValueError, IndexError):
                pass
    return mapping
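
# Expected companies.md shape, inferred from the parser above (the company name and
# org ID below are placeholders): a "## Company Name" heading followed by a
# "- **PA Org ID:** <number>" bullet, e.g.
#
#   ## Acme Corp
#   - **PA Org ID:** 12345
#
# yields {"acme corp": 12345}.
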
def _fuzzy_match_company(name: str, candidates: dict[str, int]) -> int | None:
    """Try to match a company name against the org ID mapping.

    Tries exact match first, then substring containment in both directions.
    """
    name_lower = name.lower().strip()
    # Exact match
    if name_lower in candidates:
        return candidates[name_lower]
    # Substring: input contains a known company name, or vice versa
    for key, org_id in candidates.items():
        if key in name_lower or name_lower in key:
            return org_id
    return None
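
# Illustrative example (assumed values): _fuzzy_match_company("Acme Corp LLC",
# {"acme corp": 12345}) returns 12345 via the substring check, since "acme corp"
# is contained in "acme corp llc".
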
def _text_to_html(text: str, links: list[dict] | None = None) -> str:
    """Convert plain text to HTML with link injection.

    Args:
        text: Plain text press release body.
        links: List of dicts with 'url' and 'anchor' keys. Each anchor's first
            occurrence in the text is wrapped in an <a> tag.

    Returns:
        HTML string with <p> tags and injected links.
    """
    # Inject anchor text links before paragraph splitting
    if links:
        for link in links:
            anchor = link.get("anchor", "")
            url = link.get("url", "")
            if anchor and url:
                # Replace first occurrence only
                html_link = f'<a href="{url}">{anchor}</a>'
                text = text.replace(anchor, html_link, 1)
    # Split into paragraphs on double newlines
    paragraphs = re.split(r"\n\s*\n", text.strip())
    html_parts = []
    for para in paragraphs:
        # Collapse internal newlines to spaces within a paragraph
        para = re.sub(r"\s*\n\s*", " ", para).strip()
        if not para:
            continue
        # Convert bare URLs to links (skip already-linked ones)
        para = re.sub(
            r'(?<!href=")(?<!">)(https?://\S+)',
            r'<a href="\1">\1</a>',
            para,
        )
        html_parts.append(f"<p>{para}</p>")
    return "\n".join(html_parts)
def _extract_json(text: str) -> str | None:
    """Try to pull a JSON object out of LLM output (strip fences, prose, etc)."""
    stripped = text.strip()
    if stripped.startswith("{"):
        try:
            json.loads(stripped)
            return stripped
        except json.JSONDecodeError:
            pass
    # Strip markdown fences
    fence_match = re.search(r"```(?:json)?\s*\n?([\s\S]*?)\n?```", text)
    if fence_match:
        candidate = fence_match.group(1).strip()
        try:
            json.loads(candidate)
            return candidate
        except json.JSONDecodeError:
            pass
    # Last resort: find first { to last }
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end != -1 and end > start:
        candidate = text[start:end + 1]
        try:
            json.loads(candidate)
            return candidate
        except json.JSONDecodeError:
            pass
    return None  # noqa: RET501
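
# Illustrative example (assumed input):
#   _extract_json('Here is the schema:\n```json\n{"@type": "NewsArticle"}\n```')
# returns '{"@type": "NewsArticle"}' via the fenced-block branch; prose containing
# no JSON object at all returns None.
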
# ---------------------------------------------------------------------------
# Submit tool
# ---------------------------------------------------------------------------
@tool(
    "submit_press_release",
    description=(
        "Submit a press release to Press Advantage as a draft. Takes the PR text "
        "(or file path), headline, company name, and links to inject. Converts to "
        "HTML, resolves the PA organization ID, and creates a draft release for "
        "review. The release will NOT auto-publish — Bryan must review and approve "
        "it in the PA dashboard."
    ),
    category="content",
)
def submit_press_release(
    headline: str,
    company_name: str,
    links: str = "",
    pr_text: str = "",
    file_path: str = "",
    description: str = "",
    ctx: dict | None = None,
) -> str:
"""Submit a finished press release to Press Advantage as a draft."""
# --- Get config ---
if not ctx or "config" not in ctx:
return "Error: submit_press_release requires agent context."
config = ctx["config"]
api_key = config.press_advantage.api_key
if not api_key:
return (
"Error: PRESS_ADVANTAGE_API key not configured. "
"Set the PRESS_ADVANTAGE_API environment variable in .env."
)
# --- Get PR text ---
if not pr_text and file_path:
path = Path(file_path)
if not path.exists():
return f"Error: file not found: {file_path}"
pr_text = path.read_text(encoding="utf-8")
if not pr_text:
return "Error: provide either pr_text or file_path with the press release content."
# --- Validate word count ---
wc = _word_count(pr_text)
if wc < 550:
return (
f"Error: press release is only {wc} words. "
f"Press Advantage requires at least 550 words. Please expand the content."
)
# --- Parse links ---
link_list: list[dict] = []
if links:
try:
link_list = json.loads(links)
except json.JSONDecodeError:
return "Error: 'links' must be a valid JSON array, e.g. '[{\"url\": \"...\", \"anchor\": \"...\"}]'"
# --- Convert to HTML ---
html_body = _text_to_html(pr_text, link_list)
# --- Look up PA org ID ---
companies_text = _load_file_if_exists(_COMPANIES_FILE)
org_mapping = _parse_company_org_ids(companies_text)
org_id = _fuzzy_match_company(company_name, org_mapping)
# Fallback: try live API lookup
if org_id is None:
log.info("Org ID not found in companies.md for '%s', trying live API...", company_name)
try:
client = PressAdvantageClient(api_key)
try:
orgs = client.get_organizations()
# Build a mapping from API results and try fuzzy match
api_mapping: dict[str, int] = {}
for org in orgs:
org_name = org.get("name", "")
oid = org.get("id")
if org_name and oid:
api_mapping[org_name.lower()] = int(oid)
org_id = _fuzzy_match_company(company_name, api_mapping)
finally:
client.close()
except Exception as e:
log.warning("Failed to fetch orgs from PA API: %s", e)
if org_id is None:
return (
f"Error: could not find Press Advantage organization for '{company_name}'. "
f"Add a 'PA Org ID' entry to skills/companies.md or check the company name."
)
    # --- Auto-generate description if not provided ---
    if not description:
        # Extract a keyword from the headline (drop the company name, take remaining key phrase)
        keyword = headline
        for part in [company_name, "Inc.", "LLC", "Corp.", "Ltd.", "Limited", "Inc"]:
            keyword = keyword.replace(part, "").strip()
        # Clean up and take first meaningful chunk
        keyword = re.sub(r"\s+", " ", keyword).strip(" -\u2013\u2014,")
        description = f"{company_name} - {keyword}" if keyword else company_name
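    # Illustrative example (assumed values): headline "Acme Corp Expands Onsite Support"
    # with company_name "Acme Corp" gives keyword "Expands Onsite Support" and
    # description "Acme Corp - Expands Onsite Support".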
    # --- Submit to PA ---
    log.info("Submitting PR to Press Advantage: org=%d, title='%s'", org_id, headline[:60])
    client = PressAdvantageClient(api_key)
    try:
        result = client.create_release(
            org_id=org_id,
            title=headline,
            body=html_body,
            description=description,
            distribution="standard",
            schedule_distribution="false",
        )
    except Exception as e:
        return f"Error submitting to Press Advantage: {e}"
    finally:
        client.close()

    # --- Format response ---
    release_id = result.get("id", "unknown")
    status = result.get("state", result.get("status", "draft"))
    return (
        f"Press release submitted to Press Advantage as a DRAFT.\n\n"
        f"- **Release ID:** {release_id}\n"
        f"- **Status:** {status}\n"
        f"- **Organization:** {company_name} (ID: {org_id})\n"
        f"- **Title:** {headline}\n"
        f"- **Word count:** {wc}\n"
        f"- **Links injected:** {len(link_list)}\n\n"
        f"**Next step:** Review and approve in the Press Advantage dashboard before publishing."
    )