# Source: CheddahBot/.claude/skills/content-researcher/scripts/test_block_generator.py
#!/usr/bin/env python3
"""
Test Block Generator — Programmatically Assemble Test Blocks from Templates
Takes LLM-generated sentence templates (with {N} slots for body text) and
pre-written headings, plus an LLM-curated entity list, and assembles a test
block. Tracks aggregate densities in real-time and stops when targets are met.
The LLM handles all intelligence: filtering entities for topical relevance,
writing headings, creating body templates. This script handles all math:
slot filling, density tracking, stop conditions.
Usage:
uv run --with openpyxl python test_block_generator.py <templates_path> <prep_json_path> <cora_xlsx_path>
--entities-file <path> [--output-dir ./working/] [--min-sentences 5]
"""
import argparse
import json
import re
import sys
from pathlib import Path
from cora_parser import CoraReport
# ---------------------------------------------------------------------------
# Term selection
# ---------------------------------------------------------------------------
def load_entity_names(entities_file: str) -> list[str]:
    """Load LLM-curated entity names from file (one per line).

    Blank lines are skipped and surrounding whitespace is trimmed. If the
    file does not exist, prints an error to stderr and exits with status 1.
    """
    source = Path(entities_file)
    if not source.exists():
        print(f"Error: entities file not found: {source}", file=sys.stderr)
        sys.exit(1)
    raw_lines = source.read_text(encoding="utf-8").splitlines()
    # Strip each line; keep only non-empty results, preserving file order.
    return [stripped for stripped in (ln.strip() for ln in raw_lines) if stripped]
def build_term_queue(
    filtered_entity_names: list[str],
    variations: list[str],
) -> list[str]:
    """Build a flat priority-ordered term list.

    Order: filtered entities (LLM-curated, in provided order) -> keyword
    variations. Duplicates are removed case-insensitively; the first
    spelling encountered wins.
    """
    ordered: list[str] = []
    taken: set[str] = set()
    # Entities come first, then variations — a single pass over the
    # concatenation preserves that priority ordering.
    for term in (*filtered_entity_names, *variations):
        key = term.lower()
        if key not in taken:
            taken.add(key)
            ordered.append(term)
    return ordered
# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------
class TestBlockGenerator:
    """Fills body templates with entity/variation terms, inserts pre-written
    headings, and tracks aggregate densities.

    All term/heading intelligence comes from the caller (LLM-curated inputs);
    this class only does slot filling, counting, and stop-condition math.
    """

    def __init__(self, cora_xlsx_path: str, prep_data: dict, filtered_entity_names: list[str]) -> None:
        """Load the Cora report, compile counting regexes, and seed totals.

        Args:
            cora_xlsx_path: Path to the Cora XLSX report.
            prep_data: Prep JSON dict (word_count / entity_density /
                variation_density / distinct_entities / headings sections).
            filtered_entity_names: LLM-curated entity names, in priority order.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.prep = prep_data
        self.entities = self.report.get_entities()
        self.variations = self.report.get_variations_list()
        # Compile regex patterns for counting (built once, used per sentence).
        # Word-boundary anchored, case-insensitive whole-term matches.
        self.entity_patterns = {}
        for e in self.entities:
            name = e["name"]
            self.entity_patterns[name] = re.compile(
                r"\b" + re.escape(name) + r"\b", re.IGNORECASE
            )
        self.variation_patterns = {}
        for v in self.variations:
            self.variation_patterns[v] = re.compile(
                r"\b" + re.escape(v) + r"\b", re.IGNORECASE
            )
        # Build term queue from LLM-curated entity list; term_idx is the
        # round-robin cursor into that queue.
        self.term_queue = build_term_queue(filtered_entity_names, self.variations)
        self.term_idx = 0
        # Track which 0->1 entities have been introduced.
        # Use the full missing list from prep to track introductions accurately.
        missing = prep_data.get("distinct_entities", {}).get("missing_entities", [])
        self.missing_names = {e["name"] for e in missing}
        self.introduced = set()
        # Running totals for new content (generated text only; baselines below).
        self.new_words = 0
        self.new_entity_mentions = 0
        self.new_variation_mentions = 0
        self.new_h2_count = 0
        self.new_h3_count = 0
        # Baseline from prep — existing-page counts that projections add onto.
        self.base_words = prep_data["word_count"]["current"]
        self.base_entity_mentions = prep_data["entity_density"]["current_mentions"]
        self.base_variation_mentions = prep_data["variation_density"]["current_mentions"]
        self.target_entity_d = prep_data["entity_density"]["target_decimal"]
        self.target_variation_d = prep_data["variation_density"]["target_decimal"]

    def pick_term(self, used_in_sentence: set) -> str:
        """Pick next term from the queue, skipping duplicates within a sentence.

        Args:
            used_in_sentence: Terms already placed in the current sentence
                (compared case-insensitively).

        Returns:
            The chosen term. Falls back to "equipment" if the queue is empty,
            or returns the next queued term anyway if every term was already
            used in this sentence.
        """
        if not self.term_queue:
            return "equipment"
        used_lower = {u.lower() for u in used_in_sentence}
        # One full pass over the queue, advancing the round-robin cursor.
        for _ in range(len(self.term_queue)):
            term = self.term_queue[self.term_idx % len(self.term_queue)]
            self.term_idx = (self.term_idx + 1) % len(self.term_queue)
            if term.lower() not in used_lower:
                return term
        # All exhausted for this sentence, return next anyway
        term = self.term_queue[self.term_idx % len(self.term_queue)]
        self.term_idx = (self.term_idx + 1) % len(self.term_queue)
        return term

    def fill_template(self, template: str) -> str:
        """Fill a template's {N} slots with terms.

        Each {N} occurrence is replaced (leftmost-first, one at a time) with
        the next round-robin term; terms are not repeated within a sentence
        unless the queue is exhausted.
        """
        slots = re.findall(r"\{(\d+)\}", template)
        used = set()
        filled = template
        for slot_num in slots:
            term = self.pick_term(used)
            used.add(term)
            # count=1 so duplicate slot numbers each get their own term.
            filled = filled.replace(f"{{{slot_num}}}", term, 1)
        return filled

    def count_sentence(self, text: str) -> tuple[int, int, int]:
        """Count words, entity mentions, and variation mentions in text.

        Also tracks which 0->1 entities have been introduced.

        Returns:
            (word_count, entity_mentions, variation_mentions)

        Note: words are counted as runs of ASCII letters/apostrophes, so
        digits and non-ASCII characters do not contribute to the word count.
        """
        entity_mentions = 0
        for name, pattern in self.entity_patterns.items():
            count = len(pattern.findall(text))
            entity_mentions += count
            if count > 0 and name in self.missing_names:
                self.introduced.add(name)
        variation_mentions = 0
        for v, pattern in self.variation_patterns.items():
            variation_mentions += len(pattern.findall(text))
        words = len(re.findall(r"[a-zA-Z']+", text))
        return words, entity_mentions, variation_mentions

    def projected_density(self, metric: str) -> float:
        """Calculate projected density after current additions.

        Args:
            metric: "entity" or "variation"; any other value yields 0.0.

        Returns:
            (base mentions + new mentions) / (base words + new words),
            or 0.0 when the total word count is zero.
        """
        total_words = self.base_words + self.new_words
        if total_words == 0:
            return 0.0
        if metric == "entity":
            return (self.base_entity_mentions + self.new_entity_mentions) / total_words
        elif metric == "variation":
            return (self.base_variation_mentions + self.new_variation_mentions) / total_words
        return 0.0

    def targets_met(self, min_reached: bool) -> bool:
        """Check if all density targets are met and minimums reached.

        All four must hold: entity density, variation density, distinct
        0->1 entity deficit, and word-count deficit. Always False until
        the caller signals the sentence minimum has been reached.
        """
        if not min_reached:
            return False
        entity_ok = self.projected_density("entity") >= self.target_entity_d
        variation_ok = self.projected_density("variation") >= self.target_variation_d
        distinct_deficit = self.prep["distinct_entities"]["deficit"]
        distinct_ok = len(self.introduced) >= distinct_deficit
        wc_deficit = self.prep["word_count"]["deficit"]
        wc_ok = self.new_words >= wc_deficit
        return entity_ok and variation_ok and distinct_ok and wc_ok

    def generate(
        self,
        templates: list[str],
        min_sentences: int = 5,
    ) -> dict:
        """Generate the test block by filling body templates and inserting
        pre-written headings.

        Args:
            templates: List of template strings. Lines starting with "H2:" or
                "H3:" are pre-written headings (inserted as-is, no slot filling).
                Everything else is a body template with {N} slots.
            min_sentences: Minimum sentences before checking stop condition.

        Returns:
            Dict with "sentences" list and "stats" summary. If no body
            templates are present, returns an "error" key instead.
        """
        # Partition input lines into heading pools and body templates.
        h2_headings = []
        h3_headings = []
        body_templates = []
        for t in templates:
            t = t.strip()
            if not t:
                continue
            if t.upper().startswith("H2:"):
                h2_headings.append(t[3:].strip())
            elif t.upper().startswith("H3:"):
                h3_headings.append(t[3:].strip())
            else:
                body_templates.append(t)
        if not body_templates:
            return {"error": "No body templates found", "sentences": [], "stats": {}}
        h2_needed = self.prep["headings"]["h2"]["deficit"]
        h3_needed = self.prep["headings"]["h3"]["deficit"]
        sentences = []
        count = 0
        body_idx = 0
        h2_idx = 0
        h3_idx = 0
        # Hard iteration cap so the loop terminates even if targets are
        # unreachable with the given templates.
        max_iter = max(len(body_templates) * 3, 60)
        for _ in range(max_iter):
            # Insert pre-written heading if deficit exists and we're at a paragraph break
            if h2_needed > 0 and h2_headings and count % 5 == 0:
                text = h2_headings[h2_idx % len(h2_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h2_count += 1
                h2_needed -= 1
                h2_idx += 1
                sentences.append({"text": text, "type": "h2"})
                count += 1
                continue
            if h3_needed > 0 and h3_headings and count > 0 and count % 3 == 0:
                text = h3_headings[h3_idx % len(h3_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h3_count += 1
                h3_needed -= 1
                h3_idx += 1
                sentences.append({"text": text, "type": "h3"})
                count += 1
                continue
            # Body sentence — fill template slots
            tmpl = body_templates[body_idx % len(body_templates)]
            filled = self.fill_template(tmpl)
            w, e, v = self.count_sentence(filled)
            self.new_words += w
            self.new_entity_mentions += e
            self.new_variation_mentions += v
            body_idx += 1
            sentences.append({"text": filled, "type": "body"})
            count += 1
            # Stop condition is only evaluated after body sentences, not
            # after headings (headings always fall through via `continue`).
            if self.targets_met(count >= min_sentences):
                break
        return {
            "sentences": sentences,
            "stats": {
                "total_sentences": count,
                "new_words": self.new_words,
                "new_entity_mentions": self.new_entity_mentions,
                "new_variation_mentions": self.new_variation_mentions,
                "new_distinct_entities_introduced": len(self.introduced),
                "introduced_entities": sorted(self.introduced),
                "new_h2_count": self.new_h2_count,
                "new_h3_count": self.new_h3_count,
                "projected_entity_density_pct": round(
                    self.projected_density("entity") * 100, 2
                ),
                "projected_variation_density_pct": round(
                    self.projected_density("variation") * 100, 2
                ),
                "target_entity_density_pct": round(self.target_entity_d * 100, 2),
                "target_variation_density_pct": round(self.target_variation_d * 100, 2),
            },
        }
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def format_markdown(sentences: list[dict]) -> str:
    """Convert sentence list to markdown with test block markers.

    Body sentences are grouped into paragraphs of up to four; headings
    flush the pending paragraph and become ##/### lines.
    """
    out: list[str] = ["<!-- HIDDEN TEST BLOCK START -->", ""]
    pending: list[str] = []

    def flush() -> None:
        # Emit the buffered paragraph (if any) followed by a blank line.
        if pending:
            out.append(" ".join(pending))
            out.append("")
            pending.clear()

    for item in sentences:
        kind = item["type"]
        if kind in ("h2", "h3"):
            flush()
            marker = "##" if kind == "h2" else "###"
            out.append(f"{marker} {item['text']}")
            out.append("")
        else:
            pending.append(item["text"])
            if len(pending) >= 4:
                flush()
    flush()
    out.append("<!-- HIDDEN TEST BLOCK END -->")
    return "\n".join(out)
def format_html(sentences: list[dict]) -> str:
    """Convert sentence list to HTML with test block markers.

    Body sentences are grouped into <p> paragraphs of up to four; headings
    flush the pending paragraph and become <h2>/<h3> elements.

    NOTE(review): text is interpolated into HTML without escaping — assumed
    safe because headings/bodies are generated upstream; confirm if inputs
    can ever contain < or &.
    """
    out: list[str] = ["<!-- HIDDEN TEST BLOCK START -->", ""]
    pending: list[str] = []

    def flush() -> None:
        # Emit the buffered paragraph (if any) followed by a blank line.
        if pending:
            out.append("<p>" + " ".join(pending) + "</p>")
            out.append("")
            pending.clear()

    for item in sentences:
        kind = item["type"]
        if kind in ("h2", "h3"):
            flush()
            tag = "h2" if kind == "h2" else "h3"
            out.append(f"<{tag}>{item['text']}</{tag}>")
            out.append("")
        else:
            pending.append(item["text"])
            if len(pending) >= 4:
                flush()
    flush()
    out.append("<!-- HIDDEN TEST BLOCK END -->")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: load inputs, generate the test block, write outputs.

    Reads a templates file, the prep JSON from test_block_prep.py, a Cora
    XLSX report, and an LLM-curated entity list; writes test_block.md,
    test_block.html, and test_block_stats.json into --output-dir, then
    prints a summary. Exits with status 1 on missing inputs or generator
    errors.
    """
    parser = argparse.ArgumentParser(
        description="Generate a test block from templates and deficit data.",
    )
    parser.add_argument("templates_path", help="Path to templates file (one per line)")
    parser.add_argument("prep_json_path", help="Path to prep JSON from test_block_prep.py")
    parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    parser.add_argument(
        "--entities-file", required=True,
        help="Path to LLM-curated entity list (one name per line)",
    )
    parser.add_argument(
        "--output-dir", default="./working",
        help="Directory for output files (default: ./working)",
    )
    parser.add_argument(
        "--min-sentences", type=int, default=5,
        help="Minimum sentences before checking stop condition (default: 5)",
    )
    args = parser.parse_args()

    # Load inputs
    templates_path = Path(args.templates_path)
    if not templates_path.exists():
        print(f"Error: templates file not found: {templates_path}", file=sys.stderr)
        sys.exit(1)
    templates = [
        line.strip()
        for line in templates_path.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]

    prep_path = Path(args.prep_json_path)
    if not prep_path.exists():
        print(f"Error: prep JSON not found: {prep_path}", file=sys.stderr)
        sys.exit(1)
    prep_data = json.loads(prep_path.read_text(encoding="utf-8"))

    # Load LLM-curated entity list
    filtered_entity_names = load_entity_names(args.entities_file)

    # Generate
    gen = TestBlockGenerator(args.cora_xlsx_path, prep_data, filtered_entity_names)
    result = gen.generate(templates, min_sentences=args.min_sentences)
    # .get() handles both "key absent" and "key present but falsy" in one check.
    if result.get("error"):
        print(f"Error: {result['error']}", file=sys.stderr)
        sys.exit(1)

    # Write outputs
    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    md_path = out_dir / "test_block.md"
    html_path = out_dir / "test_block.html"
    stats_path = out_dir / "test_block_stats.json"
    md_content = format_markdown(result["sentences"])
    html_content = format_html(result["sentences"])
    md_path.write_text(md_content, encoding="utf-8")
    html_path.write_text(html_content, encoding="utf-8")
    stats_path.write_text(
        json.dumps(result["stats"], indent=2, default=str), encoding="utf-8"
    )

    # Print summary (constant lines are plain strings, not f-strings — F541)
    stats = result["stats"]
    print("Test block generated:")
    print(f" Sentences: {stats['total_sentences']}")
    print(f" Words: {stats['new_words']}")
    print(f" Entity mentions: {stats['new_entity_mentions']}")
    print(f" Variation mentions: {stats['new_variation_mentions']}")
    print(f" New 0->1 entities: {stats['new_distinct_entities_introduced']}")
    print(f" Projected entity density: {stats['projected_entity_density_pct']}%"
          f" (target: {stats['target_entity_density_pct']}%)")
    print(f" Projected variation density: {stats['projected_variation_density_pct']}%"
          f" (target: {stats['target_variation_density_pct']}%)")
    print("\nFiles written:")
    print(f" {md_path}")
    print(f" {html_path}")
    print(f" {stats_path}")


if __name__ == "__main__":
    main()