221 lines
8.1 KiB
Python
221 lines
8.1 KiB
Python
"""Adversarial fact-checker for press release outputs.

Runs a second Claude Code pass on generated PR text files to catch
factual errors. Treats all client-provided data (company name, titles,
URLs, topic) as ground truth and only corrects claims the PR inferred
or fabricated beyond what was given.

Graceful failure: any error returns the original text untouched.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Model alias passed to the claude CLI via --model for the fact-check pass.
FACT_CHECK_MODEL = "sonnet"
# Default per-PR subprocess timeout, in seconds.
FACT_CHECK_TIMEOUT = 300  # 5 minutes per PR
|
|
|
|
|
|
def build_fact_check_prompt(
    pr_text: str,
    company_name: str,
    url: str,
    topic: str,
    keyword: str,
) -> str:
    """Assemble the adversarial fact-checking prompt for one press release.

    Client-supplied fields are embedded as immutable ground truth the
    checker must not dispute; the PR body is appended between ---
    markers for review.
    """
    header = (
        "You are a factual accuracy reviewer for press releases. Your ONLY job is to "
        "find and correct statements that are factually wrong. You are NOT an editor.\n\n"
    )
    # Client data is correct by definition -- the checker may never "fix" it.
    ground_truth = (
        "GROUND TRUTH -- the following data was provided by the client and is correct "
        "by definition. Do NOT change, question, or 'correct' any of it, even if your "
        "web search suggests something different:\n"
        f" - Company name: {company_name}\n"
        f" - Target URL: {url}\n"
        f" - Topic: {topic}\n"
        f" - Keyword: {keyword}\n"
        " - Any person names, titles, quotes, or contact details in the PR\n"
        " - Any product names, service names, or brand names\n"
        " - The overall framing, angle, and tone of the PR\n\n"
    )
    checks = (
        "WHAT TO CHECK (use WebSearch/WebFetch to verify):\n"
        " - Industry statistics or market size claims\n"
        " - Historical dates or facts\n"
        " - Technical specifications not sourced from the client data\n"
        " - General knowledge claims (e.g. 'X is the leading cause of Y')\n"
        " - Geographic or regulatory facts\n\n"
    )
    rules = (
        "RULES:\n"
        " - ONLY fix actual factual errors -- wrong numbers, wrong dates, wrong facts\n"
        " - Do NOT add content, remove content, restructure, or 'improve' anything\n"
        " - Do NOT change tone, style, word choice, or sentence structure\n"
        " - Do NOT suggest additions or enhancements\n"
        " - Make the MINIMUM change needed to fix each error\n"
        " - Preserve the exact formatting, paragraph breaks, and headline\n\n"
    )
    # The [NO_ERRORS] / [CORRECTED] sentinels are parsed by apply_fact_check.
    output_format = (
        "OUTPUT FORMAT:\n"
        " - If you find NO factual errors: output exactly [NO_ERRORS] and nothing else\n"
        " - If you find errors: output [CORRECTED] on the first line, then the full "
        "corrected PR text (preserving all formatting), then a blank line, then "
        "CHANGES: followed by a numbered list of what you changed and why\n\n"
    )
    payload = f"Press release to review:\n---\n{pr_text}\n---"
    return header + ground_truth + checks + rules + output_format + payload
|
|
|
|
|
|
def apply_fact_check(raw_output: str, original_text: str) -> tuple[str, str, str]:
    """Interpret the fact-checker's raw stdout.

    Returns (text, status, changes) where status is one of "clean",
    "corrected", or "skipped". Any empty, unparseable, or suspicious
    output falls back to the untouched original text.
    """
    # Empty or whitespace-only output: nothing usable came back.
    if not raw_output or not raw_output.strip():
        return original_text, "skipped", ""

    output = raw_output.strip()

    # Checker reviewed the PR and found nothing wrong.
    if output.startswith("[NO_ERRORS]"):
        return original_text, "clean", ""

    # Neither sentinel present: treat as unparseable output.
    if not output.startswith("[CORRECTED]"):
        return original_text, "skipped", ""

    payload = output[len("[CORRECTED]"):].strip()

    # Separate the corrected PR text from the trailing change log, if any.
    text_part, sep, change_log = payload.partition("\nCHANGES:")
    if sep:
        corrected = text_part.strip()
        change_log = change_log.strip()
    else:
        corrected = payload
        change_log = ""

    if not corrected:
        return original_text, "skipped", ""

    # Safety valve: a genuine fact fix barely moves the word count, so a
    # delta above 15% means the checker rewrote the PR -- reject it.
    orig_wc = len(original_text.split())
    new_wc = len(corrected.split())
    if orig_wc > 0 and abs(new_wc - orig_wc) / orig_wc > 0.15:
        log.warning(
            "Fact-check rejected: word count changed too much "
            "(%d -> %d, %.0f%% delta)",
            orig_wc, new_wc, abs(new_wc - orig_wc) / orig_wc * 100,
        )
        return original_text, "skipped", "rejected -- word count delta too large"

    return corrected, "corrected", change_log
|
|
|
|
|
|
def fact_check_pr_files(
    output_files: list[Path],
    company_name: str,
    url: str,
    topic: str,
    keyword: str,
    timeout: int = FACT_CHECK_TIMEOUT,
) -> tuple[list[str], bool]:
    """Fact-check every .txt press release in *output_files*, in place.

    Each PR goes through a separate ``claude`` CLI invocation limited to
    web-research tools; corrected text is written back to the same file.

    Returns:
        (status_lines, any_failed) where status_lines is a list of
        human-readable results per PR, and any_failed is True if the
        fact-checker could not run on at least one PR.
    """
    claude_bin = shutil.which("claude")
    if not claude_bin:
        log.warning("Fact-check: claude CLI not found, skipping")
        return ["Fact-check: claude CLI not found, skipped"], True

    # PR files are the .txt files (the actual press releases); companion
    # files such as "Headlines Evaluation.md" are not fact-checked.
    pr_files = [path for path in output_files if path.suffix == ".txt"]
    if not pr_files:
        return [], False

    results: list[str] = []
    had_failure = False

    for index, pr_path in enumerate(pr_files):
        label = "PR " + chr(65 + index)  # PR A, PR B, etc.
        try:
            original = pr_path.read_text(encoding="utf-8")
            if not original.strip():
                # Empty file: nothing to check.
                continue

            prompt = build_fact_check_prompt(
                original, company_name, url, topic, keyword
            )

            # Limit the checker to research tools; bypassPermissions lets it
            # run unattended inside the pipeline.
            cmd = [
                claude_bin,
                "-p", prompt,
                "--output-format", "text",
                "--permission-mode", "bypassPermissions",
                "--allowedTools", "WebSearch,WebFetch",
                "--max-turns", "10",
                "--model", FACT_CHECK_MODEL,
            ]

            log.info("Fact-checking %s: %s", label, pr_path.name)
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=str(pr_path.parent),
            )

            if proc.returncode != 0:
                log.warning(
                    "Fact-check %s failed (exit %d): %s",
                    label, proc.returncode, (proc.stderr or "")[:500],
                )
                results.append(
                    f"Fact-check {label}: could not run -- manual review recommended"
                )
                had_failure = True
                continue

            checked_text, status, changes = apply_fact_check(proc.stdout, original)

            if status == "corrected":
                # Overwrite the PR with the corrected text.
                pr_path.write_text(checked_text, encoding="utf-8")
                log.info("Fact-check %s: corrections applied", label)
                entry = f"Fact-check {label}: corrections applied"
                if changes:
                    entry = f"{entry}\n {changes}"
                results.append(entry)
            elif status == "clean":
                log.info("Fact-check %s: no errors found", label)
                results.append(f"Fact-check {label}: no errors found")
            else:
                # "skipped": output could not be parsed or was rejected.
                log.warning("Fact-check %s: skipped (unparseable output)", label)
                results.append(
                    f"Fact-check {label}: could not run -- manual review recommended"
                )
                had_failure = True

        except subprocess.TimeoutExpired:
            log.warning("Fact-check %s timed out after %ds", label, timeout)
            results.append(
                f"Fact-check {label}: timed out -- manual review recommended"
            )
            had_failure = True
        except Exception as exc:
            # Graceful failure boundary: never let one PR abort the batch.
            log.warning("Fact-check %s error: %s", label, exc)
            results.append(
                f"Fact-check {label}: could not run -- manual review recommended"
            )
            had_failure = True

    return results, had_failure
|