736 lines
27 KiB
Python
736 lines
27 KiB
Python
"""Press-release pipeline tool.
|
||
|
||
Autonomous workflow:
  1. Generate 7 compliant headlines (chat brain)
  2. AI judge picks the 2 best (chat brain)
  3. Write 2 full press releases (execution brain × 2)
  4. Generate 2 JSON-LD schemas (execution brain × 2, Sonnet + WebSearch)
  5. Save the headline candidates, 2 press releases (.txt and .docx), and
     2 schema files; return a cost summary
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from ..docx_export import text_to_docx
|
||
from ..press_advantage import PressAdvantageClient
|
||
from . import tool
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Paths
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_ROOT_DIR = Path(__file__).resolve().parent.parent.parent
|
||
_SKILLS_DIR = _ROOT_DIR / "skills"
|
||
_DATA_DIR = _ROOT_DIR / "data"
|
||
_OUTPUT_DIR = _DATA_DIR / "generated" / "press_releases"
|
||
_COMPANIES_FILE = _SKILLS_DIR / "companies.md"
|
||
_HEADLINES_FILE = _SKILLS_DIR / "headlines.md"
|
||
|
||
SONNET_CLI_MODEL = "sonnet"
|
||
|
||
|
||
def _set_status(ctx: dict | None, message: str) -> None:
|
||
"""Write pipeline progress to the DB so the UI can poll it."""
|
||
if ctx and "db" in ctx:
|
||
ctx["db"].kv_set("pipeline:status", message)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _load_skill(filename: str) -> str:
    """Return the UTF-8 text of ``filename`` inside the skills directory.

    Raises:
        FileNotFoundError: if no such path exists under ``_SKILLS_DIR``.
    """
    skill_path = _SKILLS_DIR.joinpath(filename)
    if skill_path.exists():
        return skill_path.read_text(encoding="utf-8")
    raise FileNotFoundError(f"Skill file not found: {skill_path}")
|
||
|
||
|
||
def _load_file_if_exists(path: Path) -> str:
|
||
"""Read a file if it exists, return empty string otherwise."""
|
||
if path.exists():
|
||
return path.read_text(encoding="utf-8")
|
||
return ""
|
||
|
||
|
||
def _slugify(text: str) -> str:
|
||
"""Turn a headline into a filesystem-safe slug."""
|
||
text = text.lower().strip()
|
||
text = re.sub(r"[^\w\s-]", "", text)
|
||
text = re.sub(r"[\s_]+", "-", text)
|
||
return text[:60].strip("-")
|
||
|
||
|
||
def _word_count(text: str) -> int:
|
||
return len(text.split())
|
||
|
||
|
||
def _chat_call(agent, messages: list[dict]) -> str:
|
||
"""Make a non-streaming chat-brain call and return the full text."""
|
||
parts: list[str] = []
|
||
for chunk in agent.llm.chat(messages, tools=None, stream=False):
|
||
if chunk["type"] == "text":
|
||
parts.append(chunk["content"])
|
||
return "".join(parts)
|
||
|
||
|
||
def _clean_pr_output(raw: str, headline: str) -> str:
|
||
"""Clean execution brain output to just the press release text.
|
||
|
||
Strategy: find the headline we asked for in the output, take everything
|
||
from that point forward. Strip any markdown formatting artifacts.
|
||
"""
|
||
# Normalize the headline for matching
|
||
headline_lower = headline.strip().lower()
|
||
|
||
lines = raw.strip().splitlines()
|
||
|
||
# Try to find the exact headline in the output
|
||
pr_start = None
|
||
for i, line in enumerate(lines):
|
||
clean_line = re.sub(r"\*\*", "", line).strip().lower()
|
||
if clean_line == headline_lower:
|
||
pr_start = i
|
||
break
|
||
|
||
# Fallback: find a line that contains most of the headline words
|
||
if pr_start is None:
|
||
headline_words = set(headline_lower.split())
|
||
for i, line in enumerate(lines):
|
||
clean_line = re.sub(r"\*\*", "", line).strip().lower()
|
||
line_words = set(clean_line.split())
|
||
# If >70% of headline words are in this line, it's probably the headline
|
||
if len(headline_words & line_words) >= len(headline_words) * 0.7:
|
||
pr_start = i
|
||
break
|
||
|
||
# If we still can't find it, just take the whole output
|
||
if pr_start is None:
|
||
pr_start = 0
|
||
|
||
# Rebuild from the headline forward
|
||
result_lines = []
|
||
for line in lines[pr_start:]:
|
||
# Strip markdown formatting
|
||
line = re.sub(r"\*\*", "", line)
|
||
line = re.sub(r"^#{1,6}\s+", "", line)
|
||
result_lines.append(line)
|
||
|
||
result = "\n".join(result_lines).strip()
|
||
|
||
# Remove trailing horizontal rules
|
||
result = re.sub(r"\n---\s*$", "", result).strip()
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Prompt builders
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_headline_prompt(topic: str, company_name: str, url: str,
|
||
lsi_terms: str, headlines_ref: str) -> str:
|
||
"""Build the prompt for Step 1: generate 7 headlines."""
|
||
prompt = (
|
||
f"Generate exactly 7 unique press release headline options for the following.\n\n"
|
||
f"Topic: {topic}\n"
|
||
f"Company: {company_name}\n"
|
||
)
|
||
if url:
|
||
prompt += f"Reference URL: {url}\n"
|
||
if lsi_terms:
|
||
prompt += f"LSI terms to consider: {lsi_terms}\n"
|
||
|
||
prompt += (
|
||
"\nRules for EVERY headline:\n"
|
||
"- Maximum 70 characters (including spaces)\n"
|
||
"- Title case\n"
|
||
"- News-focused, not promotional\n"
|
||
"- NO location/geographic keywords\n"
|
||
"- NO superlatives (best, top, leading, #1)\n"
|
||
"- NO questions\n"
|
||
"- NO colons — colons are considered lower quality\n"
|
||
"- Must contain an actual news announcement\n"
|
||
)
|
||
|
||
if headlines_ref:
|
||
prompt += (
|
||
"\nHere are examples of high-quality headlines to use as reference "
|
||
"for tone, structure, and length:\n\n"
|
||
f"{headlines_ref}\n"
|
||
)
|
||
|
||
prompt += (
|
||
"\nReturn ONLY a numbered list (1-7), one headline per line. "
|
||
"No commentary, no character counts, just the headlines."
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _build_judge_prompt(headlines: str, headlines_ref: str) -> str:
|
||
"""Build the prompt for Step 2: pick the 2 best headlines."""
|
||
prompt = (
|
||
"You are judging press release headlines for Press Advantage distribution. "
|
||
"Pick the 2 best headlines from the candidates below.\n\n"
|
||
"DISQUALIFY any headline that:\n"
|
||
"- Contains a colon\n"
|
||
"- Contains location/geographic keywords\n"
|
||
"- Contains superlatives (best, top, leading, #1)\n"
|
||
"- Is a question\n"
|
||
"- Exceeds 70 characters\n"
|
||
"- Implies a NEW product launch when none exists (avoid 'launches', "
|
||
"'introduces', 'unveils', 'announces new' unless the topic is genuinely new)\n\n"
|
||
"PREFER headlines that:\n"
|
||
"- Match the tone and structure of the reference examples below\n"
|
||
"- Use action verbs like 'Highlights', 'Expands', 'Strengthens', "
|
||
"'Reinforces', 'Delivers', 'Adds'\n"
|
||
"- Describe what the company DOES or OFFERS, not what it just invented\n"
|
||
"- Read like a real news wire headline, not a product announcement\n\n"
|
||
f"Candidates:\n{headlines}\n\n"
|
||
)
|
||
|
||
if headlines_ref:
|
||
prompt += (
|
||
"Reference headlines (these scored 77+ on quality — match their style):\n"
|
||
f"{headlines_ref}\n\n"
|
||
)
|
||
|
||
prompt += (
|
||
"Return ONLY the 2 best headlines, one per line, exactly as written in the candidates. "
|
||
"No numbering, no commentary."
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _build_pr_prompt(headline: str, topic: str, company_name: str,
|
||
url: str, lsi_terms: str, required_phrase: str,
|
||
skill_text: str, companies_file: str) -> str:
|
||
"""Build the prompt for Step 3: write one full press release."""
|
||
prompt = (
|
||
f"{skill_text}\n\n"
|
||
"---\n\n"
|
||
f"Write a press release using the headline below. "
|
||
f"Follow every rule in the skill instructions above.\n\n"
|
||
f"Headline: {headline}\n"
|
||
f"Topic: {topic}\n"
|
||
f"Company: {company_name}\n"
|
||
)
|
||
if url:
|
||
prompt += f"Reference URL (fetch for context): {url}\n"
|
||
if lsi_terms:
|
||
prompt += f"LSI terms to integrate: {lsi_terms}\n"
|
||
if required_phrase:
|
||
prompt += f'Required phrase (use exactly once): "{required_phrase}"\n'
|
||
|
||
if companies_file:
|
||
prompt += (
|
||
f"\nCompany directory — look up the executive name and title for {company_name}. "
|
||
f"If the company is NOT listed below, use 'a company spokesperson' for quotes "
|
||
f"instead of making up a name:\n"
|
||
f"{companies_file}\n"
|
||
)
|
||
|
||
prompt += (
|
||
"\nTarget 600-750 words. Minimum 575, maximum 800.\n\n"
|
||
"CRITICAL OUTPUT RULES:\n"
|
||
"- Output ONLY the press release text\n"
|
||
"- Start with the headline on the first line, then the body\n"
|
||
"- Do NOT include any commentary, reasoning, notes, or explanations\n"
|
||
"- Do NOT use markdown formatting (no **, no ##, no ---)\n"
|
||
"- Do NOT prefix with 'Here is the press release' or similar\n"
|
||
"- The very first line of your output must be the headline"
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _build_schema_prompt(pr_text: str, company_name: str, url: str,
|
||
skill_text: str) -> str:
|
||
"""Build the prompt for Step 4: generate JSON-LD schema for one PR."""
|
||
prompt = (
|
||
f"{skill_text}\n\n"
|
||
"---\n\n"
|
||
"Generate a NewsArticle JSON-LD schema for the press release below. "
|
||
"Follow every rule in the skill instructions above. "
|
||
"Use WebSearch to find Wikipedia URLs for each entity.\n\n"
|
||
"CRITICAL OUTPUT RULES:\n"
|
||
"- Output ONLY valid JSON\n"
|
||
"- No markdown fences, no commentary, no explanations\n"
|
||
"- The very first character of your output must be {\n"
|
||
)
|
||
prompt += (
|
||
f"\nCompany name: {company_name}\n\n"
|
||
f"Press release text:\n{pr_text}"
|
||
)
|
||
return prompt
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main tool
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# A leading list marker: "1.", "2)", "-", "*" or a bullet, followed by whitespace.
_LIST_MARKER_RE = re.compile(r"^\s*(?:\d+[.)]|[-*\u2022])\s+")


def _parse_headline_lines(raw: str) -> list[str]:
    """Split LLM output into headlines, one per non-empty line.

    Only a genuine list marker is removed from the start of each line, so
    a headline that itself begins with a number ("2024 Report ...") keeps
    its digits.
    """
    headlines = []
    for line in raw.strip().splitlines():
        cleaned = _LIST_MARKER_RE.sub("", line.strip()).strip()
        if cleaned:
            headlines.append(cleaned)
    return headlines


@tool(
    "write_press_releases",
    description=(
        "Full autonomous press-release pipeline. Generates 7 headlines, "
        "AI-picks the best 2, writes 2 complete press releases (600-750 words each), "
        "generates JSON-LD schema for each, and saves all files. "
        "Returns both press releases, both schemas, file paths, and a cost summary. "
        "Use when the user asks to write, create, or draft a press release."
    ),
    category="content",
)
def write_press_releases(
    topic: str,
    company_name: str,
    url: str = "",
    lsi_terms: str = "",
    required_phrase: str = "",
    ctx: dict = None,
) -> str:
    """Run the full press-release pipeline and return results + cost summary.

    Args:
        topic: Subject of the press release.
        company_name: Company the release is about; also names the output
            subfolder.
        url: Optional reference URL. When given it is also written into each
            schema's ``mainEntityOfPage``.
        lsi_terms: Optional LSI terms passed to the headline and PR prompts.
        required_phrase: Optional phrase the PR prompt asks to use exactly once.
        ctx: Tool context. Must contain ``"agent"``; an optional ``"db"`` entry
            receives progress messages.

    Returns:
        A markdown report with both press releases, both schemas, the saved
        file paths and a per-step timing table, or a string starting with
        ``"Error:"`` when the context, a skill file or the headline step is
        missing/empty.
    """
    if not ctx or "agent" not in ctx:
        return "Error: press release tool requires agent context."

    agent = ctx["agent"]

    # Load skill prompts
    try:
        pr_skill = _load_skill("press_release_prompt.md")
        schema_skill = _load_skill("press-release-schema.md")
    except FileNotFoundError as e:
        return f"Error: {e}"

    # Load reference files
    companies_file = _load_file_if_exists(_COMPANIES_FILE)
    headlines_ref = _load_file_if_exists(_HEADLINES_FILE)

    # Ensure output directory (company subfolder)
    company_slug = _slugify(company_name)
    output_dir = _OUTPUT_DIR / company_slug
    output_dir.mkdir(parents=True, exist_ok=True)
    today = datetime.now().strftime("%Y-%m-%d")

    cost_log: list[dict] = []

    try:
        # ── Step 1: Generate 7 headlines (chat brain) ─────────────────────
        log.info("[PR Pipeline] Step 1/4: Generating 7 headlines for %s...", company_name)
        _set_status(ctx, f"Step 1/4: Generating 7 headlines for {company_name}...")
        # monotonic() is unaffected by wall-clock adjustments mid-run.
        step_start = time.monotonic()
        headline_prompt = _build_headline_prompt(
            topic, company_name, url, lsi_terms, headlines_ref
        )
        messages = [
            {"role": "system", "content": "You are a senior press-release headline writer."},
            {"role": "user", "content": headline_prompt},
        ]
        headlines_raw = _chat_call(agent, messages)
        cost_log.append({
            "step": "1. Generate 7 headlines",
            "model": agent.llm.current_model,
            "elapsed_s": round(time.monotonic() - step_start, 1),
        })

        if not headlines_raw.strip():
            return "Error: headline generation returned empty result."

        # Save all 7 headline candidates to file
        slug_base = _slugify(f"{company_name}-{topic}")
        headlines_file = output_dir / f"{slug_base}_{today}_headlines.txt"
        headlines_file.write_text(headlines_raw.strip(), encoding="utf-8")

        # ── Step 2: AI judge picks best 2 (chat brain) ───────────────────
        log.info("[PR Pipeline] Step 2/4: AI judge selecting best 2 headlines...")
        _set_status(ctx, "Step 2/4: AI judge selecting best 2 headlines...")
        step_start = time.monotonic()
        judge_prompt = _build_judge_prompt(headlines_raw, headlines_ref)
        messages = [
            {"role": "system", "content": "You are a senior PR editor."},
            {"role": "user", "content": judge_prompt},
        ]
        judge_result = _chat_call(agent, messages)
        cost_log.append({
            "step": "2. Judge picks best 2",
            "model": agent.llm.current_model,
            "elapsed_s": round(time.monotonic() - step_start, 1),
        })

        # Parse the two winning headlines; fall back to the raw candidates
        # when the judge returned fewer than two usable lines.
        winners = _parse_headline_lines(judge_result)
        if len(winners) < 2:
            candidates = _parse_headline_lines(headlines_raw)
            if len(candidates) >= 2:
                winners = candidates[:2]
            elif candidates:
                winners = [candidates[0], candidates[0]]
            else:
                winners = ["Headline A", "Headline B"]
        winners = winners[:2]

        # One slug per winner, made unique so two headlines that slugify
        # identically (e.g. the duplicate fallback) never share file names.
        slugs: list[str] = []
        for headline in winners:
            slug = _slugify(headline)
            if slug in slugs:
                slug = f"{slug}-{len(slugs) + 1}"
            slugs.append(slug)

        # ── Step 3: Write 2 press releases (execution brain × 2) ─────────
        log.info("[PR Pipeline] Step 3/4: Writing 2 press releases...")
        pr_texts: list[str] = []
        pr_files: list[str] = []
        docx_files: list[str] = []
        for i, headline in enumerate(winners):
            log.info("[PR Pipeline] Writing PR %d/2: %s", i + 1, headline[:60])
            _set_status(ctx, f"Step 3/4: Writing press release {i+1}/2 — {headline[:60]}...")
            step_start = time.monotonic()
            pr_prompt = _build_pr_prompt(
                headline, topic, company_name, url, lsi_terms,
                required_phrase, pr_skill, companies_file,
            )
            exec_tools = "Bash,Read,Edit,Write,Glob,Grep,WebFetch"
            raw_result = agent.execute_task(pr_prompt, tools=exec_tools)
            elapsed = round(time.monotonic() - step_start, 1)
            cost_log.append({
                "step": f"3{chr(97+i)}. Write PR '{headline[:40]}...'",
                "model": "execution-brain (default)",
                "elapsed_s": elapsed,
            })

            # Clean output: find the headline, strip preamble and markdown
            clean_result = _clean_pr_output(raw_result, headline)
            pr_texts.append(clean_result)

            # Validate word count (warn only; the release is still saved)
            wc = _word_count(clean_result)
            if wc < 575 or wc > 800:
                log.warning("PR %d word count %d outside 575-800 range", i + 1, wc)

            # Save PR to file
            filepath = output_dir / f"{slugs[i]}_{today}.txt"
            filepath.write_text(clean_result, encoding="utf-8")
            pr_files.append(str(filepath))

            # Also save as .docx for Google Docs import
            docx_path = output_dir / f"{slugs[i]}_{today}.docx"
            text_to_docx(clean_result, docx_path)
            docx_files.append(str(docx_path))

        # ── Step 4: Generate 2 JSON-LD schemas (Sonnet + WebSearch) ───────
        log.info("[PR Pipeline] Step 4/4: Generating 2 JSON-LD schemas...")
        schema_texts: list[str] = []
        schema_files: list[str] = []
        for i, pr_text in enumerate(pr_texts):
            log.info("[PR Pipeline] Schema %d/2 for: %s", i + 1, winners[i][:60])
            _set_status(ctx, f"Step 4/4: Generating schema {i+1}/2...")
            step_start = time.monotonic()
            schema_prompt = _build_schema_prompt(pr_text, company_name, url, schema_skill)
            exec_tools = "WebSearch,WebFetch"
            result = agent.execute_task(
                schema_prompt,
                tools=exec_tools,
                model=SONNET_CLI_MODEL,
            )
            elapsed = round(time.monotonic() - step_start, 1)
            cost_log.append({
                "step": f"4{chr(97+i)}. Schema for PR {i+1}",
                "model": SONNET_CLI_MODEL,
                "elapsed_s": elapsed,
            })

            # Extract clean JSON and force correct mainEntityOfPage
            schema_json = _extract_json(result)
            if schema_json:
                try:
                    schema_obj = json.loads(schema_json)
                    # Only a JSON object can carry the mainEntityOfPage key.
                    if url and isinstance(schema_obj, dict):
                        schema_obj["mainEntityOfPage"] = url
                    schema_json = json.dumps(schema_obj, indent=2)
                except json.JSONDecodeError:
                    log.warning("Schema %d is not valid JSON", i + 1)
            # Fall back to the raw model output when no JSON could be extracted.
            schema_texts.append(schema_json or result)

            # Save schema to file
            filepath = output_dir / f"{slugs[i]}_{today}_schema.json"
            filepath.write_text(schema_json or result, encoding="utf-8")
            schema_files.append(str(filepath))

        # ── Build final output ────────────────────────────────────────────
        total_elapsed = sum(c["elapsed_s"] for c in cost_log)
        log.info("[PR Pipeline] Complete for %s — %.0fs total", company_name, total_elapsed)
        output_parts = []

        for i, headline in enumerate(winners):
            label = chr(65 + i)  # A, B
            wc = _word_count(pr_texts[i])
            output_parts.append(f"## Press Release {label}: {headline}")
            output_parts.append(f"**Word count:** {wc}")
            output_parts.append(f"**File:** `{pr_files[i]}`")
            output_parts.append(f"**Docx:** `{docx_files[i]}`\n")
            output_parts.append(pr_texts[i])
            output_parts.append("\n---\n")
            output_parts.append(f"### Schema {label}")
            output_parts.append(f"**File:** `{schema_files[i]}`\n")
            output_parts.append(f"```json\n{schema_texts[i]}\n```")
            output_parts.append("\n---\n")

        # Cost summary table
        output_parts.append("## Cost Summary\n")
        output_parts.append("| Step | Model | Time (s) |")
        output_parts.append("|------|-------|----------|")
        for c in cost_log:
            output_parts.append(f"| {c['step']} | {c['model']} | {c['elapsed_s']} |")
        output_parts.append(f"| **Total** | | **{round(total_elapsed, 1)}** |")

        return "\n".join(output_parts)
    finally:
        # Clear the status on success, early return and failure alike so the
        # UI never keeps showing a stale "Step n/4" message.
        _set_status(ctx, "")
|
||
|
||
|
||
def _parse_company_org_ids(companies_text: str) -> dict[str, int]:
|
||
"""Parse companies.md and return {company_name_lower: pa_org_id}."""
|
||
mapping: dict[str, int] = {}
|
||
current_company = ""
|
||
for line in companies_text.splitlines():
|
||
line = line.strip()
|
||
if line.startswith("## "):
|
||
current_company = line[3:].strip()
|
||
elif line.startswith("- **PA Org ID:**") and current_company:
|
||
try:
|
||
org_id = int(line.split(":**")[1].strip())
|
||
mapping[current_company.lower()] = org_id
|
||
except (ValueError, IndexError):
|
||
pass
|
||
return mapping
|
||
|
||
|
||
def _fuzzy_match_company(name: str, candidates: dict[str, int]) -> int | None:
|
||
"""Try to match a company name against the org ID mapping.
|
||
|
||
Tries exact match first, then substring containment in both directions.
|
||
"""
|
||
name_lower = name.lower().strip()
|
||
|
||
# Exact match
|
||
if name_lower in candidates:
|
||
return candidates[name_lower]
|
||
|
||
# Substring: input contains a known company name, or vice versa
|
||
for key, org_id in candidates.items():
|
||
if key in name_lower or name_lower in key:
|
||
return org_id
|
||
|
||
return None
|
||
|
||
|
||
def _text_to_html(text: str, links: list[dict] | None = None) -> str:
|
||
"""Convert plain text to HTML with link injection.
|
||
|
||
Args:
|
||
text: Plain text press release body.
|
||
links: List of dicts with 'url' and 'anchor' keys. Each anchor's first
|
||
occurrence in the text is wrapped in an <a> tag.
|
||
|
||
Returns:
|
||
HTML string with <p> tags and injected links.
|
||
"""
|
||
# Inject anchor text links before paragraph splitting
|
||
if links:
|
||
for link in links:
|
||
anchor = link.get("anchor", "")
|
||
url = link.get("url", "")
|
||
if anchor and url:
|
||
# Replace first occurrence only
|
||
html_link = f'<a href="{url}">{anchor}</a>'
|
||
text = text.replace(anchor, html_link, 1)
|
||
|
||
# Split into paragraphs on double newlines
|
||
paragraphs = re.split(r"\n\s*\n", text.strip())
|
||
|
||
html_parts = []
|
||
for para in paragraphs:
|
||
# Collapse internal newlines to spaces within a paragraph
|
||
para = re.sub(r"\s*\n\s*", " ", para).strip()
|
||
if not para:
|
||
continue
|
||
|
||
# Convert bare URLs to links (skip already-linked ones)
|
||
para = re.sub(
|
||
r'(?<!href=")(?<!">)(https?://\S+)',
|
||
r'<a href="\1">\1</a>',
|
||
para,
|
||
)
|
||
|
||
html_parts.append(f"<p>{para}</p>")
|
||
|
||
return "\n".join(html_parts)
|
||
|
||
|
||
def _extract_json(text: str) -> str | None:
|
||
"""Try to pull a JSON object out of LLM output (strip fences, prose, etc)."""
|
||
stripped = text.strip()
|
||
if stripped.startswith("{"):
|
||
try:
|
||
json.loads(stripped)
|
||
return stripped
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Strip markdown fences
|
||
fence_match = re.search(r"```(?:json)?\s*\n?([\s\S]*?)\n?```", text)
|
||
if fence_match:
|
||
candidate = fence_match.group(1).strip()
|
||
try:
|
||
json.loads(candidate)
|
||
return candidate
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Last resort: find first { to last }
|
||
start = text.find("{")
|
||
end = text.rfind("}")
|
||
if start != -1 and end != -1 and end > start:
|
||
candidate = text[start:end + 1]
|
||
try:
|
||
json.loads(candidate)
|
||
return candidate
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
return None # noqa: RET501
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Submit tool
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@tool(
    "submit_press_release",
    description=(
        "Submit a press release to Press Advantage as a draft. Takes the PR text "
        "(or file path), headline, company name, and links to inject. Converts to "
        "HTML, resolves the PA organization ID, and creates a draft release for "
        "review. The release will NOT auto-publish — Bryan must review and approve "
        "it in the PA dashboard."
    ),
    category="content",
)
def submit_press_release(
    headline: str,
    company_name: str,
    links: str = "",
    pr_text: str = "",
    file_path: str = "",
    description: str = "",
    ctx: dict = None,
) -> str:
    """Submit a finished press release to Press Advantage as a draft.

    Args:
        headline: Release title.
        company_name: Company used to resolve the Press Advantage org ID,
            first from skills/companies.md and then from the live API.
        links: Optional JSON array of ``{"url": ..., "anchor": ...}`` objects
            to inject into the body.
        pr_text: Press release text. Takes precedence over ``file_path``.
        file_path: Path to a UTF-8 text file, read when ``pr_text`` is empty.
        description: Optional description; derived from the headline when empty.
        ctx: Tool context. Must contain ``"config"``.

    Returns:
        A markdown summary of the created draft, or a string starting with
        ``"Error"`` describing why nothing was submitted.
    """
    # --- Get config ---
    if not ctx or "config" not in ctx:
        return "Error: submit_press_release requires agent context."

    config = ctx["config"]
    api_key = config.press_advantage.api_key
    if not api_key:
        return (
            "Error: PRESS_ADVANTAGE_API key not configured. "
            "Set the PRESS_ADVANTAGE_API environment variable in .env."
        )

    # --- Get PR text ---
    if not pr_text and file_path:
        path = Path(file_path)
        # is_file() also rejects directories, which exists() would let through
        # only for read_text() to raise.
        if not path.is_file():
            return f"Error: file not found: {file_path}"
        pr_text = path.read_text(encoding="utf-8")

    if not pr_text:
        return "Error: provide either pr_text or file_path with the press release content."

    # --- Validate word count ---
    wc = _word_count(pr_text)
    if wc < 550:
        return (
            f"Error: press release is only {wc} words. "
            f"Press Advantage requires at least 550 words. Please expand the content."
        )

    # --- Parse links ---
    links_error = (
        "Error: 'links' must be a valid JSON array, "
        "e.g. '[{\"url\": \"...\", \"anchor\": \"...\"}]'"
    )
    link_list: list[dict] = []
    if links:
        try:
            parsed_links = json.loads(links)
        except json.JSONDecodeError:
            return links_error
        # Valid JSON is not enough: the HTML step needs a list of objects.
        if not isinstance(parsed_links, list) or not all(
            isinstance(item, dict) for item in parsed_links
        ):
            return links_error
        link_list = parsed_links

    # --- Convert to HTML ---
    html_body = _text_to_html(pr_text, link_list)

    # --- Look up PA org ID ---
    companies_text = _load_file_if_exists(_COMPANIES_FILE)
    org_mapping = _parse_company_org_ids(companies_text)
    org_id = _fuzzy_match_company(company_name, org_mapping)

    # Fallback: try live API lookup
    if org_id is None:
        log.info("Org ID not found in companies.md for '%s', trying live API...", company_name)
        try:
            client = PressAdvantageClient(api_key)
            try:
                orgs = client.get_organizations()
                # Build a mapping from API results and try fuzzy match
                api_mapping: dict[str, int] = {}
                for org in orgs:
                    org_name = org.get("name", "")
                    oid = org.get("id")
                    if org_name and oid:
                        api_mapping[org_name.lower()] = int(oid)
                org_id = _fuzzy_match_company(company_name, api_mapping)
            finally:
                client.close()
        except Exception as e:
            # Best effort: a failed lookup falls through to the error below.
            log.warning("Failed to fetch orgs from PA API: %s", e)

    if org_id is None:
        return (
            f"Error: could not find Press Advantage organization for '{company_name}'. "
            f"Add a 'PA Org ID' entry to skills/companies.md or check the company name."
        )

    # --- Auto-generate description if not provided ---
    if not description:
        # Extract a keyword from the headline (drop the company name, take remaining key phrase)
        keyword = headline
        if company_name:
            keyword = keyword.replace(company_name, "")
        # Remove corporate suffixes only as whole words, so "Inc" is not cut
        # out of words such as "Increases".
        keyword = re.sub(
            r"(?<!\w)(?:Inc\.?|LLC|Corp\.|Ltd\.|Limited)(?!\w)", "", keyword
        )
        # Clean up and take first meaningful chunk
        keyword = re.sub(r"\s+", " ", keyword).strip(" -\u2013\u2014,")
        description = f"{company_name} - {keyword}" if keyword else company_name

    # --- Submit to PA ---
    log.info("Submitting PR to Press Advantage: org=%d, title='%s'", org_id, headline[:60])
    client = PressAdvantageClient(api_key)
    try:
        result = client.create_release(
            org_id=org_id,
            title=headline,
            body=html_body,
            description=description,
            distribution="standard",
            schedule_distribution="false",
        )
    except Exception as e:
        return f"Error submitting to Press Advantage: {e}"
    finally:
        client.close()

    # --- Format response ---
    release_id = result.get("id", "unknown")
    status = result.get("state", result.get("status", "draft"))
    return (
        f"Press release submitted to Press Advantage as a DRAFT.\n\n"
        f"- **Release ID:** {release_id}\n"
        f"- **Status:** {status}\n"
        f"- **Organization:** {company_name} (ID: {org_id})\n"
        f"- **Title:** {headline}\n"
        f"- **Word count:** {wc}\n"
        f"- **Links injected:** {len(link_list)}\n\n"
        f"**Next step:** Review and approve in the Press Advantage dashboard before publishing."
    )
|