992 lines
36 KiB
Python
992 lines
36 KiB
Python
"""Press-release pipeline tool.
|
||
|
||
Autonomous workflow:
|
||
1. Generate 7 compliant headlines (chat brain)
|
||
2. AI judge picks the 2 best (chat brain)
|
||
3. Write 2 full press releases (execution brain × 2)
|
||
4. Generate 2 JSON-LD schemas (execution brain × 2, Sonnet + WebSearch)
|
||
5. Save 4 files, return cost summary
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from ..docx_export import text_to_docx
|
||
from ..press_advantage import PressAdvantageClient
|
||
from . import tool
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Paths
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_ROOT_DIR = Path(__file__).resolve().parent.parent.parent
|
||
_SKILLS_DIR = _ROOT_DIR / "skills"
|
||
_DATA_DIR = _ROOT_DIR / "data"
|
||
_OUTPUT_DIR = _DATA_DIR / "generated" / "press_releases"
|
||
_COMPANIES_FILE = _SKILLS_DIR / "companies.md"
|
||
_HEADLINES_FILE = _SKILLS_DIR / "headlines.md"
|
||
|
||
SONNET_CLI_MODEL = "sonnet"
|
||
|
||
|
||
def _set_status(ctx: dict | None, message: str) -> None:
|
||
"""Write pipeline progress to the DB so the UI can poll it."""
|
||
if ctx and "db" in ctx:
|
||
ctx["db"].kv_set("pipeline:status", message)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _load_skill(filename: str) -> str:
|
||
"""Read a markdown skill file from the skills/ directory."""
|
||
path = _SKILLS_DIR / filename
|
||
if not path.exists():
|
||
raise FileNotFoundError(f"Skill file not found: {path}")
|
||
return path.read_text(encoding="utf-8")
|
||
|
||
|
||
def _load_file_if_exists(path: Path) -> str:
|
||
"""Read a file if it exists, return empty string otherwise."""
|
||
if path.exists():
|
||
return path.read_text(encoding="utf-8")
|
||
return ""
|
||
|
||
|
||
def _slugify(text: str) -> str:
|
||
"""Turn a headline into a filesystem-safe slug."""
|
||
text = text.lower().strip()
|
||
text = re.sub(r"[^\w\s-]", "", text)
|
||
text = re.sub(r"[\s_]+", "-", text)
|
||
return text[:60].strip("-")
|
||
|
||
|
||
def _word_count(text: str) -> int:
|
||
return len(text.split())
|
||
|
||
|
||
def _chat_call(agent, messages: list[dict]) -> str:
|
||
"""Make a non-streaming chat-brain call and return the full text."""
|
||
parts: list[str] = []
|
||
for chunk in agent.llm.chat(messages, tools=None, stream=False):
|
||
if chunk["type"] == "text":
|
||
parts.append(chunk["content"])
|
||
return "".join(parts)
|
||
|
||
|
||
def _clean_pr_output(raw: str, headline: str) -> str:
|
||
"""Clean execution brain output to just the press release text.
|
||
|
||
Strategy: find the headline we asked for in the output, take everything
|
||
from that point forward. Strip any markdown formatting artifacts.
|
||
"""
|
||
# Normalize the headline for matching
|
||
headline_lower = headline.strip().lower()
|
||
|
||
lines = raw.strip().splitlines()
|
||
|
||
# Try to find the exact headline in the output
|
||
pr_start = None
|
||
for i, line in enumerate(lines):
|
||
clean_line = re.sub(r"\*\*", "", line).strip().lower()
|
||
if clean_line == headline_lower:
|
||
pr_start = i
|
||
break
|
||
|
||
# Fallback: find a line that contains most of the headline words
|
||
if pr_start is None:
|
||
headline_words = set(headline_lower.split())
|
||
for i, line in enumerate(lines):
|
||
clean_line = re.sub(r"\*\*", "", line).strip().lower()
|
||
line_words = set(clean_line.split())
|
||
# If >70% of headline words are in this line, it's probably the headline
|
||
if len(headline_words & line_words) >= len(headline_words) * 0.7:
|
||
pr_start = i
|
||
break
|
||
|
||
# If we still can't find it, just take the whole output
|
||
if pr_start is None:
|
||
pr_start = 0
|
||
|
||
# Rebuild from the headline forward
|
||
result_lines = []
|
||
for line in lines[pr_start:]:
|
||
# Strip markdown formatting
|
||
line = re.sub(r"\*\*", "", line)
|
||
line = re.sub(r"^#{1,6}\s+", "", line)
|
||
result_lines.append(line)
|
||
|
||
result = "\n".join(result_lines).strip()
|
||
|
||
# Remove trailing horizontal rules
|
||
result = re.sub(r"\n---\s*$", "", result).strip()
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Prompt builders
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_headline_prompt(topic: str, company_name: str, url: str,
|
||
lsi_terms: str, headlines_ref: str) -> str:
|
||
"""Build the prompt for Step 1: generate 7 headlines."""
|
||
prompt = (
|
||
f"Generate exactly 7 unique press release headline options for the following.\n\n"
|
||
f"Topic: {topic}\n"
|
||
f"Company: {company_name}\n"
|
||
)
|
||
if url:
|
||
prompt += f"Reference URL: {url}\n"
|
||
if lsi_terms:
|
||
prompt += f"LSI terms to consider: {lsi_terms}\n"
|
||
|
||
prompt += (
|
||
"\nRules for EVERY headline:\n"
|
||
"- Maximum 70 characters (including spaces)\n"
|
||
"- Title case\n"
|
||
"- News-focused, not promotional\n"
|
||
"- NO location/geographic keywords\n"
|
||
"- NO superlatives (best, top, leading, #1)\n"
|
||
"- NO questions\n"
|
||
"- NO colons — colons are considered lower quality\n"
|
||
"- Must contain an actual news announcement\n"
|
||
)
|
||
|
||
if headlines_ref:
|
||
prompt += (
|
||
"\nHere are examples of high-quality headlines to use as reference "
|
||
"for tone, structure, and length:\n\n"
|
||
f"{headlines_ref}\n"
|
||
)
|
||
|
||
prompt += (
|
||
"\nReturn ONLY a numbered list (1-7), one headline per line. "
|
||
"No commentary, no character counts, just the headlines."
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _build_judge_prompt(headlines: str, headlines_ref: str) -> str:
|
||
"""Build the prompt for Step 2: pick the 2 best headlines."""
|
||
prompt = (
|
||
"You are judging press release headlines for Press Advantage distribution. "
|
||
"Pick the 2 best headlines from the candidates below.\n\n"
|
||
"DISQUALIFY any headline that:\n"
|
||
"- Contains a colon\n"
|
||
"- Contains location/geographic keywords\n"
|
||
"- Contains superlatives (best, top, leading, #1)\n"
|
||
"- Is a question\n"
|
||
"- Exceeds 70 characters\n"
|
||
"- Implies a NEW product launch when none exists (avoid 'launches', "
|
||
"'introduces', 'unveils', 'announces new' unless the topic is genuinely new)\n\n"
|
||
"PREFER headlines that:\n"
|
||
"- Match the tone and structure of the reference examples below\n"
|
||
"- Use action verbs like 'Highlights', 'Expands', 'Strengthens', "
|
||
"'Reinforces', 'Delivers', 'Adds'\n"
|
||
"- Describe what the company DOES or OFFERS, not what it just invented\n"
|
||
"- Read like a real news wire headline, not a product announcement\n\n"
|
||
f"Candidates:\n{headlines}\n\n"
|
||
)
|
||
|
||
if headlines_ref:
|
||
prompt += (
|
||
"Reference headlines (these scored 77+ on quality — match their style):\n"
|
||
f"{headlines_ref}\n\n"
|
||
)
|
||
|
||
prompt += (
|
||
"Return ONLY the 2 best headlines, one per line, exactly as written in the candidates. "
|
||
"No numbering, no commentary."
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _derive_anchor_phrase(company_name: str, topic: str) -> str:
|
||
"""Derive a 'brand + keyword' anchor phrase from company name and topic.
|
||
|
||
Examples:
|
||
("Advanced Industrial", "PEEK machining") -> "Advanced Industrial PEEK machining"
|
||
("Metal Craft", "custom metal fabrication") -> "Metal Craft custom metal fabrication"
|
||
"""
|
||
# Clean up topic: strip leading articles, lowercase
|
||
keyword = topic.strip()
|
||
return f"{company_name} {keyword}"
|
||
|
||
|
||
def _find_anchor_in_text(text: str, anchor: str) -> bool:
|
||
"""Check if the anchor phrase exists in the text (case-insensitive)."""
|
||
return anchor.lower() in text.lower()
|
||
|
||
|
||
def _fuzzy_find_anchor(text: str, company_name: str, topic: str) -> str | None:
|
||
"""Try to find a close match for the brand+keyword anchor in the text.
|
||
|
||
Looks for the company name followed by topic-related words within
|
||
a reasonable proximity (same sentence).
|
||
"""
|
||
text_lower = text.lower()
|
||
company_lower = company_name.lower()
|
||
|
||
# Extract key words from topic (skip short/common words)
|
||
stop_words = {"a", "an", "the", "and", "or", "for", "in", "on", "of", "to", "with", "is", "are"}
|
||
topic_words = [w for w in topic.lower().split() if w not in stop_words and len(w) > 2]
|
||
|
||
if not topic_words:
|
||
return None
|
||
|
||
# Find all positions of company name in text
|
||
start = 0
|
||
while True:
|
||
pos = text_lower.find(company_lower, start)
|
||
if pos == -1:
|
||
break
|
||
|
||
# Look at the surrounding context (next 80 chars after company name)
|
||
context_start = pos
|
||
context_end = min(pos + len(company_name) + 80, len(text))
|
||
context = text[context_start:context_end]
|
||
|
||
# Check if any topic keyword appears near the company name
|
||
context_lower = context.lower()
|
||
for word in topic_words:
|
||
if word in context_lower:
|
||
# Extract the phrase from company name to end of the keyword match
|
||
word_pos = context_lower.find(word)
|
||
phrase_end = word_pos + len(word)
|
||
candidate = context[:phrase_end].strip()
|
||
# Clean: stop at sentence boundaries
|
||
for sep in (".", ",", ";", "\n"):
|
||
if sep in candidate[len(company_name):]:
|
||
break
|
||
else:
|
||
return candidate
|
||
|
||
start = pos + 1
|
||
|
||
return None
|
||
|
||
|
||
def _build_pr_prompt(headline: str, topic: str, company_name: str,
|
||
url: str, lsi_terms: str, required_phrase: str,
|
||
skill_text: str, companies_file: str,
|
||
anchor_phrase: str = "") -> str:
|
||
"""Build the prompt for Step 3: write one full press release."""
|
||
prompt = (
|
||
f"{skill_text}\n\n"
|
||
"---\n\n"
|
||
f"Write a press release using the headline below. "
|
||
f"Follow every rule in the skill instructions above.\n\n"
|
||
f"Headline: {headline}\n"
|
||
f"Topic: {topic}\n"
|
||
f"Company: {company_name}\n"
|
||
)
|
||
if url:
|
||
prompt += f"Reference URL (fetch for context): {url}\n"
|
||
if lsi_terms:
|
||
prompt += f"LSI terms to integrate: {lsi_terms}\n"
|
||
if required_phrase:
|
||
prompt += f'Required phrase (use exactly once): "{required_phrase}"\n'
|
||
|
||
if anchor_phrase:
|
||
prompt += (
|
||
f'\nANCHOR TEXT REQUIREMENT: You MUST include the exact phrase '
|
||
f'"{anchor_phrase}" somewhere naturally in the body of the press '
|
||
f'release. This phrase will be used as anchor text for an SEO link. '
|
||
f'Work it into a sentence where it reads naturally — for example: '
|
||
f'"As a {anchor_phrase.split(company_name, 1)[-1].strip()} provider, '
|
||
f'{company_name}..." or "{anchor_phrase} continues to...".\n'
|
||
)
|
||
|
||
if companies_file:
|
||
prompt += (
|
||
f"\nCompany directory — look up the executive name and title for {company_name}. "
|
||
f"If the company is NOT listed below, use 'a company spokesperson' for quotes "
|
||
f"instead of making up a name:\n"
|
||
f"{companies_file}\n"
|
||
)
|
||
|
||
prompt += (
|
||
"\nTarget 600-750 words. Minimum 575, maximum 800.\n\n"
|
||
"CRITICAL OUTPUT RULES:\n"
|
||
"- Output ONLY the press release text\n"
|
||
"- Start with the headline on the first line, then the body\n"
|
||
"- Do NOT include any commentary, reasoning, notes, or explanations\n"
|
||
"- Do NOT use markdown formatting (no **, no ##, no ---)\n"
|
||
"- Do NOT prefix with 'Here is the press release' or similar\n"
|
||
"- The very first line of your output must be the headline"
|
||
)
|
||
return prompt
|
||
|
||
|
||
def _build_schema_prompt(pr_text: str, company_name: str, url: str,
|
||
skill_text: str) -> str:
|
||
"""Build the prompt for Step 4: generate JSON-LD schema for one PR."""
|
||
prompt = (
|
||
f"{skill_text}\n\n"
|
||
"---\n\n"
|
||
"Generate a NewsArticle JSON-LD schema for the press release below. "
|
||
"Follow every rule in the skill instructions above. "
|
||
"Use WebSearch to find Wikipedia URLs for each entity.\n\n"
|
||
"CRITICAL OUTPUT RULES:\n"
|
||
"- Output ONLY valid JSON\n"
|
||
"- No markdown fences, no commentary, no explanations\n"
|
||
"- The very first character of your output must be {\n"
|
||
)
|
||
prompt += (
|
||
f"\nCompany name: {company_name}\n\n"
|
||
f"Press release text:\n{pr_text}"
|
||
)
|
||
return prompt
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main tool
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@tool(
|
||
"write_press_releases",
|
||
description=(
|
||
"Full autonomous press-release pipeline. Generates 7 headlines, "
|
||
"AI-picks the best 2, writes 2 complete press releases (600-750 words each), "
|
||
"generates JSON-LD schema for each, and saves all files. "
|
||
"Returns both press releases, both schemas, file paths, and a cost summary. "
|
||
"Use when the user asks to write, create, or draft a press release."
|
||
),
|
||
category="content",
|
||
)
|
||
def write_press_releases(
|
||
topic: str,
|
||
company_name: str,
|
||
url: str = "",
|
||
lsi_terms: str = "",
|
||
required_phrase: str = "",
|
||
ctx: dict = None,
|
||
) -> str:
|
||
"""Run the full press-release pipeline and return results + cost summary."""
|
||
if not ctx or "agent" not in ctx:
|
||
return "Error: press release tool requires agent context."
|
||
|
||
agent = ctx["agent"]
|
||
|
||
# Load skill prompts
|
||
try:
|
||
pr_skill = _load_skill("press_release_prompt.md")
|
||
schema_skill = _load_skill("press-release-schema.md")
|
||
except FileNotFoundError as e:
|
||
return f"Error: {e}"
|
||
|
||
# Load reference files
|
||
companies_file = _load_file_if_exists(_COMPANIES_FILE)
|
||
headlines_ref = _load_file_if_exists(_HEADLINES_FILE)
|
||
|
||
# Ensure output directory (company subfolder)
|
||
company_slug = _slugify(company_name)
|
||
output_dir = _OUTPUT_DIR / company_slug
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
|
||
cost_log: list[dict] = []
|
||
|
||
# ── Step 1: Generate 7 headlines (chat brain) ─────────────────────────
|
||
log.info("[PR Pipeline] Step 1/4: Generating 7 headlines for %s...", company_name)
|
||
_set_status(ctx, f"Step 1/4: Generating 7 headlines for {company_name}...")
|
||
step_start = time.time()
|
||
headline_prompt = _build_headline_prompt(topic, company_name, url, lsi_terms, headlines_ref)
|
||
messages = [
|
||
{"role": "system", "content": "You are a senior press-release headline writer."},
|
||
{"role": "user", "content": headline_prompt},
|
||
]
|
||
headlines_raw = _chat_call(agent, messages)
|
||
cost_log.append({
|
||
"step": "1. Generate 7 headlines",
|
||
"model": agent.llm.current_model,
|
||
"elapsed_s": round(time.time() - step_start, 1),
|
||
})
|
||
|
||
if not headlines_raw.strip():
|
||
return "Error: headline generation returned empty result."
|
||
|
||
# Save all 7 headline candidates to file
|
||
slug_base = _slugify(f"{company_name}-{topic}")
|
||
headlines_file = output_dir / f"{slug_base}_{today}_headlines.txt"
|
||
headlines_file.write_text(headlines_raw.strip(), encoding="utf-8")
|
||
|
||
# ── Step 2: AI judge picks best 2 (chat brain) ───────────────────────
|
||
log.info("[PR Pipeline] Step 2/4: AI judge selecting best 2 headlines...")
|
||
_set_status(ctx, "Step 2/4: AI judge selecting best 2 headlines...")
|
||
step_start = time.time()
|
||
judge_prompt = _build_judge_prompt(headlines_raw, headlines_ref)
|
||
messages = [
|
||
{"role": "system", "content": "You are a senior PR editor."},
|
||
{"role": "user", "content": judge_prompt},
|
||
]
|
||
judge_result = _chat_call(agent, messages)
|
||
cost_log.append({
|
||
"step": "2. Judge picks best 2",
|
||
"model": agent.llm.current_model,
|
||
"elapsed_s": round(time.time() - step_start, 1),
|
||
})
|
||
|
||
# Parse the two winning headlines
|
||
winners = [line.strip().lstrip("0123456789.-) ") for line in judge_result.strip().splitlines() if line.strip()]
|
||
if len(winners) < 2:
|
||
all_headlines = [line.strip().lstrip("0123456789.-) ") for line in headlines_raw.strip().splitlines() if line.strip()]
|
||
winners = all_headlines[:2] if len(all_headlines) >= 2 else [all_headlines[0], all_headlines[0]] if all_headlines else ["Headline A", "Headline B"]
|
||
winners = winners[:2]
|
||
|
||
# ── Step 3: Write 2 press releases (execution brain × 2) ─────────────
|
||
log.info("[PR Pipeline] Step 3/4: Writing 2 press releases...")
|
||
anchor_phrase = _derive_anchor_phrase(company_name, topic)
|
||
pr_texts: list[str] = []
|
||
pr_files: list[str] = []
|
||
docx_files: list[str] = []
|
||
anchor_warnings: list[str] = []
|
||
for i, headline in enumerate(winners):
|
||
log.info("[PR Pipeline] Writing PR %d/2: %s", i + 1, headline[:60])
|
||
_set_status(ctx, f"Step 3/4: Writing press release {i+1}/2 — {headline[:60]}...")
|
||
step_start = time.time()
|
||
pr_prompt = _build_pr_prompt(
|
||
headline, topic, company_name, url, lsi_terms,
|
||
required_phrase, pr_skill, companies_file,
|
||
anchor_phrase=anchor_phrase,
|
||
)
|
||
exec_tools = "Bash,Read,Edit,Write,Glob,Grep,WebFetch"
|
||
raw_result = agent.execute_task(pr_prompt, tools=exec_tools)
|
||
elapsed = round(time.time() - step_start, 1)
|
||
cost_log.append({
|
||
"step": f"3{chr(97+i)}. Write PR '{headline[:40]}...'",
|
||
"model": "execution-brain (default)",
|
||
"elapsed_s": elapsed,
|
||
})
|
||
|
||
# Clean output: find the headline, strip preamble and markdown
|
||
clean_result = _clean_pr_output(raw_result, headline)
|
||
pr_texts.append(clean_result)
|
||
|
||
# Validate word count
|
||
wc = _word_count(clean_result)
|
||
if wc < 575 or wc > 800:
|
||
log.warning("PR %d word count %d outside 575-800 range", i + 1, wc)
|
||
|
||
# Validate anchor phrase
|
||
if _find_anchor_in_text(clean_result, anchor_phrase):
|
||
log.info("PR %d contains anchor phrase '%s'", i + 1, anchor_phrase)
|
||
else:
|
||
fuzzy = _fuzzy_find_anchor(clean_result, company_name, topic)
|
||
if fuzzy:
|
||
log.info("PR %d: exact anchor not found, fuzzy match: '%s'", i + 1, fuzzy)
|
||
anchor_warnings.append(
|
||
f"PR {chr(65+i)}: Exact anchor phrase \"{anchor_phrase}\" not found. "
|
||
f"Closest match: \"{fuzzy}\" — you may want to adjust before submitting."
|
||
)
|
||
else:
|
||
log.warning("PR %d: anchor phrase '%s' NOT found", i + 1, anchor_phrase)
|
||
anchor_warnings.append(
|
||
f"PR {chr(65+i)}: Anchor phrase \"{anchor_phrase}\" NOT found in the text. "
|
||
f"You'll need to manually add it before submitting to PA."
|
||
)
|
||
|
||
# Save PR to file
|
||
slug = _slugify(headline)
|
||
filename = f"{slug}_{today}.txt"
|
||
filepath = output_dir / filename
|
||
filepath.write_text(clean_result, encoding="utf-8")
|
||
pr_files.append(str(filepath))
|
||
|
||
# Also save as .docx for Google Docs import
|
||
docx_path = output_dir / f"{slug}_{today}.docx"
|
||
text_to_docx(clean_result, docx_path)
|
||
docx_files.append(str(docx_path))
|
||
|
||
# ── Step 4: Generate 2 JSON-LD schemas (Sonnet + WebSearch) ───────────
|
||
log.info("[PR Pipeline] Step 4/4: Generating 2 JSON-LD schemas...")
|
||
schema_texts: list[str] = []
|
||
schema_files: list[str] = []
|
||
for i, pr_text in enumerate(pr_texts):
|
||
log.info("[PR Pipeline] Schema %d/2 for: %s", i + 1, winners[i][:60])
|
||
_set_status(ctx, f"Step 4/4: Generating schema {i+1}/2...")
|
||
step_start = time.time()
|
||
schema_prompt = _build_schema_prompt(pr_text, company_name, url, schema_skill)
|
||
exec_tools = "WebSearch,WebFetch"
|
||
result = agent.execute_task(
|
||
schema_prompt,
|
||
tools=exec_tools,
|
||
model=SONNET_CLI_MODEL,
|
||
)
|
||
elapsed = round(time.time() - step_start, 1)
|
||
cost_log.append({
|
||
"step": f"4{chr(97+i)}. Schema for PR {i+1}",
|
||
"model": SONNET_CLI_MODEL,
|
||
"elapsed_s": elapsed,
|
||
})
|
||
|
||
# Extract clean JSON and force correct mainEntityOfPage
|
||
schema_json = _extract_json(result)
|
||
if schema_json:
|
||
try:
|
||
schema_obj = json.loads(schema_json)
|
||
if url:
|
||
schema_obj["mainEntityOfPage"] = url
|
||
schema_json = json.dumps(schema_obj, indent=2)
|
||
except json.JSONDecodeError:
|
||
log.warning("Schema %d is not valid JSON", i + 1)
|
||
schema_texts.append(schema_json or result)
|
||
|
||
# Save schema to file
|
||
slug = _slugify(winners[i])
|
||
filename = f"{slug}_{today}_schema.json"
|
||
filepath = output_dir / filename
|
||
filepath.write_text(schema_json or result, encoding="utf-8")
|
||
schema_files.append(str(filepath))
|
||
|
||
# ── Build final output ────────────────────────────────────────────────
|
||
_set_status(ctx, "") # Clear status — pipeline complete
|
||
total_elapsed = sum(c["elapsed_s"] for c in cost_log)
|
||
log.info("[PR Pipeline] Complete for %s — %.0fs total", company_name, total_elapsed)
|
||
output_parts = []
|
||
|
||
for i in range(2):
|
||
label = chr(65 + i) # A, B
|
||
wc = _word_count(pr_texts[i])
|
||
output_parts.append(f"## Press Release {label}: {winners[i]}")
|
||
output_parts.append(f"**Word count:** {wc}")
|
||
output_parts.append(f"**File:** `{pr_files[i]}`")
|
||
output_parts.append(f"**Docx:** `{docx_files[i]}`\n")
|
||
output_parts.append(pr_texts[i])
|
||
output_parts.append("\n---\n")
|
||
output_parts.append(f"### Schema {label}")
|
||
output_parts.append(f"**File:** `{schema_files[i]}`\n")
|
||
output_parts.append(f"```json\n{schema_texts[i]}\n```")
|
||
output_parts.append("\n---\n")
|
||
|
||
# Anchor text warnings
|
||
if anchor_warnings:
|
||
output_parts.append("## Anchor Text Warnings\n")
|
||
output_parts.append(f"Required anchor phrase: **\"{anchor_phrase}\"**\n")
|
||
for warning in anchor_warnings:
|
||
output_parts.append(f"- {warning}")
|
||
output_parts.append("")
|
||
|
||
# Cost summary table
|
||
output_parts.append("## Cost Summary\n")
|
||
output_parts.append("| Step | Model | Time (s) |")
|
||
output_parts.append("|------|-------|----------|")
|
||
for c in cost_log:
|
||
output_parts.append(f"| {c['step']} | {c['model']} | {c['elapsed_s']} |")
|
||
output_parts.append(f"| **Total** | | **{round(total_elapsed, 1)}** |")
|
||
|
||
return "\n".join(output_parts)
|
||
|
||
|
||
def _parse_company_org_ids(companies_text: str) -> dict[str, int]:
|
||
"""Parse companies.md and return {company_name_lower: pa_org_id}."""
|
||
mapping: dict[str, int] = {}
|
||
current_company = ""
|
||
for line in companies_text.splitlines():
|
||
line = line.strip()
|
||
if line.startswith("## "):
|
||
current_company = line[3:].strip()
|
||
elif line.startswith("- **PA Org ID:**") and current_company:
|
||
try:
|
||
org_id = int(line.split(":**")[1].strip())
|
||
mapping[current_company.lower()] = org_id
|
||
except (ValueError, IndexError):
|
||
pass
|
||
return mapping
|
||
|
||
|
||
def _parse_company_data(companies_text: str) -> dict[str, dict]:
|
||
"""Parse companies.md and return full company data keyed by lowercase name.
|
||
|
||
Returns dict like: {"advanced industrial": {"org_id": 19634, "website": "...", "gbp": "..."}}
|
||
"""
|
||
companies: dict[str, dict] = {}
|
||
current_company = ""
|
||
current_data: dict = {}
|
||
for line in companies_text.splitlines():
|
||
line = line.strip()
|
||
if line.startswith("## "):
|
||
if current_company and current_data:
|
||
companies[current_company.lower()] = current_data
|
||
current_company = line[3:].strip()
|
||
current_data = {"name": current_company}
|
||
elif current_company:
|
||
if line.startswith("- **PA Org ID:**"):
|
||
try:
|
||
current_data["org_id"] = int(line.split(":**")[1].strip())
|
||
except (ValueError, IndexError):
|
||
pass
|
||
elif line.startswith("- **Website:**"):
|
||
current_data["website"] = line.split(":**")[1].strip()
|
||
elif line.startswith("- **GBP:**"):
|
||
current_data["gbp"] = line.split(":**")[1].strip()
|
||
|
||
# Don't forget the last company
|
||
if current_company and current_data:
|
||
companies[current_company.lower()] = current_data
|
||
|
||
return companies
|
||
|
||
|
||
def _fuzzy_match_company(name: str, candidates: dict[str, int]) -> int | None:
|
||
"""Try to match a company name against the org ID mapping.
|
||
|
||
Tries exact match first, then substring containment in both directions.
|
||
"""
|
||
name_lower = name.lower().strip()
|
||
|
||
# Exact match
|
||
if name_lower in candidates:
|
||
return candidates[name_lower]
|
||
|
||
# Substring: input contains a known company name, or vice versa
|
||
for key, org_id in candidates.items():
|
||
if key in name_lower or name_lower in key:
|
||
return org_id
|
||
|
||
return None
|
||
|
||
|
||
def _fuzzy_match_company_data(name: str, candidates: dict[str, dict]) -> dict | None:
|
||
"""Try to match a company name against company data.
|
||
|
||
Same fuzzy logic as _fuzzy_match_company but returns the full data dict.
|
||
"""
|
||
name_lower = name.lower().strip()
|
||
|
||
# Exact match
|
||
if name_lower in candidates:
|
||
return candidates[name_lower]
|
||
|
||
# Substring: input contains a known company name, or vice versa
|
||
for key, data in candidates.items():
|
||
if key in name_lower or name_lower in key:
|
||
return data
|
||
|
||
return None
|
||
|
||
|
||
def _text_to_html(text: str, links: list[dict] | None = None) -> str:
|
||
"""Convert plain text to HTML with link injection.
|
||
|
||
Args:
|
||
text: Plain text press release body.
|
||
links: List of dicts with 'url' and 'anchor' keys. Each anchor's first
|
||
occurrence in the text is wrapped in an <a> tag.
|
||
|
||
Returns:
|
||
HTML string with <p> tags and injected links.
|
||
"""
|
||
# Inject anchor text links before paragraph splitting
|
||
if links:
|
||
for link in links:
|
||
anchor = link.get("anchor", "")
|
||
url = link.get("url", "")
|
||
if anchor and url:
|
||
# Replace first occurrence only
|
||
html_link = f'<a href="{url}">{anchor}</a>'
|
||
text = text.replace(anchor, html_link, 1)
|
||
|
||
# Split into paragraphs on double newlines
|
||
paragraphs = re.split(r"\n\s*\n", text.strip())
|
||
|
||
html_parts = []
|
||
for para in paragraphs:
|
||
# Collapse internal newlines to spaces within a paragraph
|
||
para = re.sub(r"\s*\n\s*", " ", para).strip()
|
||
if not para:
|
||
continue
|
||
|
||
# Convert bare URLs to links (skip already-linked ones)
|
||
para = re.sub(
|
||
r'(?<!href=")(?<!">)(https?://\S+)',
|
||
r'<a href="\1">\1</a>',
|
||
para,
|
||
)
|
||
|
||
html_parts.append(f"<p>{para}</p>")
|
||
|
||
return "\n".join(html_parts)
|
||
|
||
|
||
def _extract_json(text: str) -> str | None:
|
||
"""Try to pull a JSON object out of LLM output (strip fences, prose, etc)."""
|
||
stripped = text.strip()
|
||
if stripped.startswith("{"):
|
||
try:
|
||
json.loads(stripped)
|
||
return stripped
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Strip markdown fences
|
||
fence_match = re.search(r"```(?:json)?\s*\n?([\s\S]*?)\n?```", text)
|
||
if fence_match:
|
||
candidate = fence_match.group(1).strip()
|
||
try:
|
||
json.loads(candidate)
|
||
return candidate
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Last resort: find first { to last }
|
||
start = text.find("{")
|
||
end = text.rfind("}")
|
||
if start != -1 and end != -1 and end > start:
|
||
candidate = text[start:end + 1]
|
||
try:
|
||
json.loads(candidate)
|
||
return candidate
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
return None # noqa: RET501
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Submit tool
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _resolve_branded_url(branded_url: str, company_data: dict | None) -> str:
|
||
"""Resolve the branded link URL.
|
||
|
||
- "GBP" (case-insensitive) → look up GBP from company data
|
||
- A real URL → use as-is
|
||
- Empty → fall back to company website
|
||
"""
|
||
if branded_url.strip().upper() == "GBP":
|
||
if company_data and company_data.get("gbp"):
|
||
return company_data["gbp"]
|
||
log.warning("GBP shortcut used but no GBP URL in companies.md")
|
||
return ""
|
||
|
||
if branded_url.strip():
|
||
return branded_url.strip()
|
||
|
||
# Fallback to homepage
|
||
if company_data and company_data.get("website"):
|
||
return company_data["website"]
|
||
|
||
return ""
|
||
|
||
|
||
def _build_links(
|
||
pr_text: str,
|
||
company_name: str,
|
||
topic: str,
|
||
target_url: str,
|
||
branded_url_resolved: str,
|
||
) -> tuple[list[dict], list[str]]:
|
||
"""Build the link list for HTML injection and return (links, warnings).
|
||
|
||
Link 1: brand+keyword anchor → target_url (IMSURL)
|
||
Link 2: company name anchor → branded_url (SocialURL / homepage / GBP)
|
||
"""
|
||
links: list[dict] = []
|
||
warnings: list[str] = []
|
||
|
||
# Link 1: brand+keyword → target_url
|
||
if target_url:
|
||
anchor_phrase = _derive_anchor_phrase(company_name, topic)
|
||
if _find_anchor_in_text(pr_text, anchor_phrase):
|
||
links.append({"url": target_url, "anchor": anchor_phrase})
|
||
else:
|
||
# Try fuzzy match
|
||
fuzzy = _fuzzy_find_anchor(pr_text, company_name, topic)
|
||
if fuzzy:
|
||
links.append({"url": target_url, "anchor": fuzzy})
|
||
warnings.append(
|
||
f"Brand+keyword link: exact phrase \"{anchor_phrase}\" not found. "
|
||
f"Used fuzzy match: \"{fuzzy}\""
|
||
)
|
||
else:
|
||
warnings.append(
|
||
f"Brand+keyword link: anchor phrase \"{anchor_phrase}\" NOT found in PR text. "
|
||
f"Link to {target_url} could not be injected — add it manually in PA."
|
||
)
|
||
|
||
# Link 2: branded → social/homepage/GBP
|
||
if branded_url_resolved:
|
||
# Use company name as anchor — it will always be in the PR
|
||
if _find_anchor_in_text(pr_text, company_name):
|
||
links.append({"url": branded_url_resolved, "anchor": company_name})
|
||
else:
|
||
warnings.append(
|
||
f"Branded link: company name \"{company_name}\" not found in PR text. "
|
||
f"Link to {branded_url_resolved} could not be injected."
|
||
)
|
||
|
||
return links, warnings
|
||
|
||
|
||
@tool(
|
||
"submit_press_release",
|
||
description=(
|
||
"Submit a press release to Press Advantage as a draft. Takes the PR text "
|
||
"(or file path), headline, company name, target URL (IMSURL), and branded "
|
||
"URL (SocialURL). Auto-constructs SEO links: brand+keyword anchor → target "
|
||
"URL, company name → branded URL. If branded_url is 'GBP', uses the Google "
|
||
"Business Profile URL from companies.md. Converts to HTML, resolves the PA "
|
||
"organization ID, and creates a draft for review. Will NOT auto-publish."
|
||
),
|
||
category="content",
|
||
)
|
||
def submit_press_release(
|
||
headline: str,
|
||
company_name: str,
|
||
target_url: str = "",
|
||
branded_url: str = "",
|
||
topic: str = "",
|
||
pr_text: str = "",
|
||
file_path: str = "",
|
||
description: str = "",
|
||
ctx: dict = None,
|
||
) -> str:
|
||
"""Submit a finished press release to Press Advantage as a draft."""
|
||
# --- Get config ---
|
||
if not ctx or "config" not in ctx:
|
||
return "Error: submit_press_release requires agent context."
|
||
|
||
config = ctx["config"]
|
||
api_key = config.press_advantage.api_key
|
||
if not api_key:
|
||
return (
|
||
"Error: PRESS_ADVANTAGE_API key not configured. "
|
||
"Set the PRESS_ADVANTAGE_API environment variable in .env."
|
||
)
|
||
|
||
# --- Get PR text ---
|
||
if not pr_text and file_path:
|
||
path = Path(file_path)
|
||
if not path.exists():
|
||
return f"Error: file not found: {file_path}"
|
||
pr_text = path.read_text(encoding="utf-8")
|
||
|
||
if not pr_text:
|
||
return "Error: provide either pr_text or file_path with the press release content."
|
||
|
||
# --- Validate word count ---
|
||
wc = _word_count(pr_text)
|
||
if wc < 550:
|
||
return (
|
||
f"Error: press release is only {wc} words. "
|
||
f"Press Advantage requires at least 550 words. Please expand the content."
|
||
)
|
||
|
||
# --- Derive topic from headline if not provided ---
|
||
if not topic:
|
||
topic = headline
|
||
for part in [company_name, "Inc.", "LLC", "Corp.", "Ltd.", "Limited", "Inc"]:
|
||
topic = topic.replace(part, "").strip()
|
||
topic = re.sub(r"\s+", " ", topic).strip(" -\u2013\u2014,")
|
||
|
||
# --- Load company data ---
|
||
companies_text = _load_file_if_exists(_COMPANIES_FILE)
|
||
company_all = _parse_company_data(companies_text)
|
||
company_data = _fuzzy_match_company_data(company_name, company_all)
|
||
|
||
# --- Look up PA org ID ---
|
||
org_id = company_data.get("org_id") if company_data else None
|
||
|
||
# Fallback: try live API lookup
|
||
if org_id is None:
|
||
log.info("Org ID not found in companies.md for '%s', trying live API...", company_name)
|
||
org_mapping = _parse_company_org_ids(companies_text)
|
||
org_id = _fuzzy_match_company(company_name, org_mapping)
|
||
|
||
if org_id is None:
|
||
try:
|
||
client = PressAdvantageClient(api_key)
|
||
try:
|
||
orgs = client.get_organizations()
|
||
api_mapping: dict[str, int] = {}
|
||
for org in orgs:
|
||
org_name = org.get("name", "")
|
||
oid = org.get("id")
|
||
if org_name and oid:
|
||
api_mapping[org_name.lower()] = int(oid)
|
||
org_id = _fuzzy_match_company(company_name, api_mapping)
|
||
finally:
|
||
client.close()
|
||
except Exception as e:
|
||
log.warning("Failed to fetch orgs from PA API: %s", e)
|
||
|
||
if org_id is None:
|
||
return (
|
||
f"Error: could not find Press Advantage organization for '{company_name}'. "
|
||
f"Add a 'PA Org ID' entry to skills/companies.md or check the company name."
|
||
)
|
||
|
||
# --- Build links ---
|
||
branded_url_resolved = _resolve_branded_url(branded_url, company_data)
|
||
link_list, link_warnings = _build_links(
|
||
pr_text, company_name, topic, target_url, branded_url_resolved,
|
||
)
|
||
|
||
# --- Convert to HTML ---
|
||
html_body = _text_to_html(pr_text, link_list)
|
||
|
||
# --- Auto-generate description if not provided ---
|
||
if not description:
|
||
keyword = headline
|
||
for part in [company_name, "Inc.", "LLC", "Corp.", "Ltd.", "Limited", "Inc"]:
|
||
keyword = keyword.replace(part, "").strip()
|
||
keyword = re.sub(r"\s+", " ", keyword).strip(" -\u2013\u2014,")
|
||
description = f"{company_name} - {keyword}" if keyword else company_name
|
||
|
||
# --- Submit to PA ---
|
||
log.info("Submitting PR to Press Advantage: org=%d, title='%s'", org_id, headline[:60])
|
||
client = PressAdvantageClient(api_key)
|
||
try:
|
||
result = client.create_release(
|
||
org_id=org_id,
|
||
title=headline,
|
||
body=html_body,
|
||
description=description,
|
||
distribution="standard",
|
||
schedule_distribution="false",
|
||
)
|
||
except Exception as e:
|
||
return f"Error submitting to Press Advantage: {e}"
|
||
finally:
|
||
client.close()
|
||
|
||
# --- Format response ---
|
||
release_id = result.get("id", "unknown")
|
||
status = result.get("state", result.get("status", "draft"))
|
||
|
||
output_parts = [
|
||
"Press release submitted to Press Advantage as a DRAFT.\n",
|
||
f"- **Release ID:** {release_id}",
|
||
f"- **Status:** {status}",
|
||
f"- **Organization:** {company_name} (ID: {org_id})",
|
||
f"- **Title:** {headline}",
|
||
f"- **Word count:** {wc}",
|
||
f"- **Links injected:** {len(link_list)}",
|
||
]
|
||
|
||
if link_list:
|
||
output_parts.append("\n**Links:**")
|
||
for link in link_list:
|
||
output_parts.append(f" - \"{link['anchor']}\" → {link['url']}")
|
||
|
||
if link_warnings:
|
||
output_parts.append("\n**Link warnings:**")
|
||
for warning in link_warnings:
|
||
output_parts.append(f" - {warning}")
|
||
|
||
output_parts.append(
|
||
"\n**Next step:** Review and approve in the Press Advantage dashboard before publishing."
|
||
)
|
||
return "\n".join(output_parts)
|