#!/usr/bin/env python3
"""
Test Block Generator — Programmatically Assemble Test Blocks from Templates

Takes LLM-generated sentence templates (with {N} slots for body text) and
pre-written headings, plus an LLM-curated entity list, and assembles a test
block. Tracks aggregate densities in real time and stops when targets are met.

The LLM handles all intelligence: filtering entities for topical relevance,
writing headings, and creating body templates. This script handles all math:
slot filling, density tracking, and stop conditions.

Usage:
    uv run --with openpyxl python test_block_generator.py \
        <templates_path> <prep_json_path> <cora_xlsx_path> \
        --entities-file <path> [--output-dir ./working/] [--min-sentences 5]
"""

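# The templates file is plain text, one template per line. A purely illustrative
# example (hypothetical wording; real headings and body templates come from the
# LLM prep step):
#
#   H2: Choosing the Right Equipment
#   H3: Common Planning Questions
#   Many teams rely on {1} and {2} to keep coverage consistent.
#   A thorough review of {1} usually starts with {2}.
#
# "H2:"/"H3:" lines are inserted verbatim; every other line is treated as a body
# template whose {N} slots are filled from the term queue.
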
import argparse
import json
import re
import sys
from pathlib import Path

from cora_parser import CoraReport


# ---------------------------------------------------------------------------
# Term selection
# ---------------------------------------------------------------------------

def load_entity_names(entities_file: str) -> list[str]:
    """Load LLM-curated entity names from file (one per line)."""
    path = Path(entities_file)
    if not path.exists():
        print(f"Error: entities file not found: {path}", file=sys.stderr)
        sys.exit(1)

    names = []
    for line in path.read_text(encoding="utf-8").splitlines():
        name = line.strip()
        if name:
            names.append(name)
    return names


def build_term_queue(
    filtered_entity_names: list[str],
    variations: list[str],
) -> list[str]:
    """Build a flat priority-ordered term list.

    Order: filtered entities (LLM-curated, in provided order) -> keyword variations.
    """
    terms = []
    seen = set()

    # 1. Filtered entities from LLM (already curated for topical relevance)
    for name in filtered_entity_names:
        if name.lower() not in seen:
            terms.append(name)
            seen.add(name.lower())

    # 2. Keyword variations
    for v in variations:
        if v.lower() not in seen:
            terms.append(v)
            seen.add(v.lower())

    return terms
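
# A quick illustration with hypothetical inputs: curated entities keep their
# order and come first, and duplicates are dropped case-insensitively:
#
#   build_term_queue(["Pump Seal", "Bearing"], ["pump seal", "bearings"])
#   -> ["Pump Seal", "Bearing", "bearings"]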


# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------

class TestBlockGenerator:
    """Fills body templates with entity/variation terms, inserts pre-written
    headings, and tracks aggregate densities."""

    def __init__(self, cora_xlsx_path: str, prep_data: dict, filtered_entity_names: list[str]):
        self.report = CoraReport(cora_xlsx_path)
        self.prep = prep_data
        self.entities = self.report.get_entities()
        self.variations = self.report.get_variations_list()

        # Compile regex patterns for counting (built once, used per sentence)
        self.entity_patterns = {}
        for e in self.entities:
            name = e["name"]
            self.entity_patterns[name] = re.compile(
                r"\b" + re.escape(name) + r"\b", re.IGNORECASE
            )

        self.variation_patterns = {}
        for v in self.variations:
            self.variation_patterns[v] = re.compile(
                r"\b" + re.escape(v) + r"\b", re.IGNORECASE
            )

        # Build term queue from LLM-curated entity list
        self.term_queue = build_term_queue(filtered_entity_names, self.variations)
        self.term_idx = 0

        # Track which 0->1 entities have been introduced.
        # Use the full missing list from prep to track introductions accurately.
        missing = prep_data.get("distinct_entities", {}).get("missing_entities", [])
        self.missing_names = {e["name"] for e in missing}
        self.introduced = set()

        # Running totals for new content
        self.new_words = 0
        self.new_entity_mentions = 0
        self.new_variation_mentions = 0
        self.new_h2_count = 0
        self.new_h3_count = 0

        # Baseline from prep
        self.base_words = prep_data["word_count"]["current"]
        self.base_entity_mentions = prep_data["entity_density"]["current_mentions"]
        self.base_variation_mentions = prep_data["variation_density"]["current_mentions"]
        self.target_entity_d = prep_data["entity_density"]["target_decimal"]
        self.target_variation_d = prep_data["variation_density"]["target_decimal"]

    def pick_term(self, used_in_sentence: set) -> str:
        """Pick the next term from the queue, skipping duplicates within a sentence."""
        if not self.term_queue:
            return "equipment"

        used_lower = {u.lower() for u in used_in_sentence}
        for _ in range(len(self.term_queue)):
            term = self.term_queue[self.term_idx % len(self.term_queue)]
            self.term_idx = (self.term_idx + 1) % len(self.term_queue)
            if term.lower() not in used_lower:
                return term

        # Every queue term is already used in this sentence; return the next one anyway
        term = self.term_queue[self.term_idx % len(self.term_queue)]
        self.term_idx = (self.term_idx + 1) % len(self.term_queue)
        return term

    def fill_template(self, template: str) -> str:
        """Fill a template's {N} slots with terms."""
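        # Example (hypothetical queue state): "Enable {1} alongside {2}." might
        # become "Enable Pump Seal alongside bearings." Each slot receives the
        # next queue term that has not already been used in this sentence.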
        slots = re.findall(r"\{(\d+)\}", template)
        used = set()
        filled = template

        for slot_num in slots:
            term = self.pick_term(used)
            used.add(term)
            filled = filled.replace(f"{{{slot_num}}}", term, 1)

        return filled

    def count_sentence(self, text: str) -> tuple[int, int, int]:
        """Count words, entity mentions, and variation mentions in text.

        Also tracks which 0->1 entities have been introduced.
        Returns: (word_count, entity_mentions, variation_mentions)
        """
        entity_mentions = 0
        for name, pattern in self.entity_patterns.items():
            count = len(pattern.findall(text))
            entity_mentions += count
            if count > 0 and name in self.missing_names:
                self.introduced.add(name)

        variation_mentions = 0
        for v, pattern in self.variation_patterns.items():
            variation_mentions += len(pattern.findall(text))

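        # Word count here means alphabetic tokens (apostrophes allowed);
        # digits and standalone punctuation are not counted.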
        words = len(re.findall(r"[a-zA-Z']+", text))
        return words, entity_mentions, variation_mentions

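    # Projected density = (baseline mentions + new mentions) / (baseline words + new words),
    # i.e. the density the page would have once the generated block is appended.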
    def projected_density(self, metric: str) -> float:
        """Calculate projected density after current additions."""
        total_words = self.base_words + self.new_words
        if total_words == 0:
            return 0.0

        if metric == "entity":
            return (self.base_entity_mentions + self.new_entity_mentions) / total_words
        elif metric == "variation":
            return (self.base_variation_mentions + self.new_variation_mentions) / total_words
        return 0.0

    def targets_met(self, min_reached: bool) -> bool:
        """Check if all density targets are met and minimums reached."""
        if not min_reached:
            return False

        entity_ok = self.projected_density("entity") >= self.target_entity_d
        variation_ok = self.projected_density("variation") >= self.target_variation_d

        distinct_deficit = self.prep["distinct_entities"]["deficit"]
        distinct_ok = len(self.introduced) >= distinct_deficit

        wc_deficit = self.prep["word_count"]["deficit"]
        wc_ok = self.new_words >= wc_deficit

        return entity_ok and variation_ok and distinct_ok and wc_ok

    def generate(
        self,
        templates: list[str],
        min_sentences: int = 5,
    ) -> dict:
        """Generate the test block by filling body templates and inserting
        pre-written headings.

        Args:
            templates: List of template strings. Lines starting with "H2:" or
                "H3:" are pre-written headings (inserted as-is, no slot filling).
                Everything else is a body template with {N} slots.
            min_sentences: Minimum sentences before checking the stop condition.

        Returns:
            Dict with a "sentences" list and a "stats" summary.
        """
        h2_headings = []
        h3_headings = []
        body_templates = []

        for t in templates:
            t = t.strip()
            if not t:
                continue
            if t.upper().startswith("H2:"):
                h2_headings.append(t[3:].strip())
            elif t.upper().startswith("H3:"):
                h3_headings.append(t[3:].strip())
            else:
                body_templates.append(t)

        if not body_templates:
            return {"error": "No body templates found", "sentences": [], "stats": {}}

        h2_needed = self.prep["headings"]["h2"]["deficit"]
        h3_needed = self.prep["headings"]["h3"]["deficit"]

        sentences = []
        count = 0
        body_idx = 0
        h2_idx = 0
        h3_idx = 0
        max_iter = max(len(body_templates) * 3, 60)

        for _ in range(max_iter):
            # Insert a pre-written heading while a deficit remains and we're at a
            # paragraph break (H2 on every 5th item, H3 on every 3rd)
            if h2_needed > 0 and h2_headings and count % 5 == 0:
                text = h2_headings[h2_idx % len(h2_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h2_count += 1
                h2_needed -= 1
                h2_idx += 1
                sentences.append({"text": text, "type": "h2"})
                count += 1
                continue

            if h3_needed > 0 and h3_headings and count > 0 and count % 3 == 0:
                text = h3_headings[h3_idx % len(h3_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h3_count += 1
                h3_needed -= 1
                h3_idx += 1
                sentences.append({"text": text, "type": "h3"})
                count += 1
                continue

            # Body sentence — fill template slots
            tmpl = body_templates[body_idx % len(body_templates)]
            filled = self.fill_template(tmpl)
            w, e, v = self.count_sentence(filled)
            self.new_words += w
            self.new_entity_mentions += e
            self.new_variation_mentions += v
            body_idx += 1
            sentences.append({"text": filled, "type": "body"})
            count += 1

            if self.targets_met(count >= min_sentences):
                break

        return {
            "sentences": sentences,
            "stats": {
                "total_sentences": count,
                "new_words": self.new_words,
                "new_entity_mentions": self.new_entity_mentions,
                "new_variation_mentions": self.new_variation_mentions,
                "new_distinct_entities_introduced": len(self.introduced),
                "introduced_entities": sorted(self.introduced),
                "new_h2_count": self.new_h2_count,
                "new_h3_count": self.new_h3_count,
                "projected_entity_density_pct": round(
                    self.projected_density("entity") * 100, 2
                ),
                "projected_variation_density_pct": round(
                    self.projected_density("variation") * 100, 2
                ),
                "target_entity_density_pct": round(self.target_entity_d * 100, 2),
                "target_variation_density_pct": round(self.target_variation_d * 100, 2),
            },
        }


# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------

def format_markdown(sentences: list[dict]) -> str:
    """Convert sentence list to markdown with test block markers."""
    lines = ["<!-- HIDDEN TEST BLOCK START -->", ""]
    paragraph = []

    for s in sentences:
        if s["type"] in ("h2", "h3"):
            # Flush paragraph before heading
            if paragraph:
                lines.append(" ".join(paragraph))
                lines.append("")
                paragraph = []
            prefix = "##" if s["type"] == "h2" else "###"
            lines.append(f"{prefix} {s['text']}")
            lines.append("")
        else:
            paragraph.append(s["text"])
            if len(paragraph) >= 4:
                lines.append(" ".join(paragraph))
                lines.append("")
                paragraph = []

    if paragraph:
        lines.append(" ".join(paragraph))
        lines.append("")

    lines.append("<!-- HIDDEN TEST BLOCK END -->")
    return "\n".join(lines)
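
# Illustrative output shape (hypothetical sentences; body sentences are grouped
# into paragraphs of up to four, and a heading always starts a new paragraph):
#
#   <!-- HIDDEN TEST BLOCK START -->
#
#   ## Example H2 Heading
#
#   Sentence one. Sentence two. Sentence three. Sentence four.
#
#   ### Example H3 Heading
#
#   Sentence five.
#
#   <!-- HIDDEN TEST BLOCK END -->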


def format_html(sentences: list[dict]) -> str:
    """Convert sentence list to HTML with test block markers."""
    lines = ["<!-- HIDDEN TEST BLOCK START -->", ""]
    paragraph = []

    for s in sentences:
        if s["type"] in ("h2", "h3"):
            if paragraph:
                lines.append("<p>" + " ".join(paragraph) + "</p>")
                lines.append("")
                paragraph = []
            tag = "h2" if s["type"] == "h2" else "h3"
            lines.append(f"<{tag}>{s['text']}</{tag}>")
            lines.append("")
        else:
            paragraph.append(s["text"])
            if len(paragraph) >= 4:
                lines.append("<p>" + " ".join(paragraph) + "</p>")
                lines.append("")
                paragraph = []

    if paragraph:
        lines.append("<p>" + " ".join(paragraph) + "</p>")
        lines.append("")

    lines.append("<!-- HIDDEN TEST BLOCK END -->")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    parser = argparse.ArgumentParser(
        description="Generate a test block from templates and deficit data.",
    )
    parser.add_argument("templates_path", help="Path to templates file (one per line)")
    parser.add_argument("prep_json_path", help="Path to prep JSON from test_block_prep.py")
    parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    parser.add_argument(
        "--entities-file", required=True,
        help="Path to LLM-curated entity list (one name per line)",
    )
    parser.add_argument(
        "--output-dir", default="./working",
        help="Directory for output files (default: ./working)",
    )
    parser.add_argument(
        "--min-sentences", type=int, default=5,
        help="Minimum sentences before checking stop condition (default: 5)",
    )
    args = parser.parse_args()

    # Load inputs
    templates_path = Path(args.templates_path)
    if not templates_path.exists():
        print(f"Error: templates file not found: {templates_path}", file=sys.stderr)
        sys.exit(1)

    templates = [
        line.strip()
        for line in templates_path.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]

    prep_path = Path(args.prep_json_path)
    if not prep_path.exists():
        print(f"Error: prep JSON not found: {prep_path}", file=sys.stderr)
        sys.exit(1)

    prep_data = json.loads(prep_path.read_text(encoding="utf-8"))

    # Load LLM-curated entity list
    filtered_entity_names = load_entity_names(args.entities_file)

    # Generate
    gen = TestBlockGenerator(args.cora_xlsx_path, prep_data, filtered_entity_names)
    result = gen.generate(templates, min_sentences=args.min_sentences)

    if "error" in result and result["error"]:
        print(f"Error: {result['error']}", file=sys.stderr)
        sys.exit(1)

    # Write outputs
    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    md_path = out_dir / "test_block.md"
    html_path = out_dir / "test_block.html"
    txt_path = out_dir / "test_block.txt"
    stats_path = out_dir / "test_block_stats.json"

    md_content = format_markdown(result["sentences"])
    html_content = format_html(result["sentences"])

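    # Note: the .txt output below receives the same HTML markup as
    # test_block.html; only the file extension differs.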
    md_path.write_text(md_content, encoding="utf-8")
    html_path.write_text(html_content, encoding="utf-8")
    txt_path.write_text(html_content, encoding="utf-8")
    stats_path.write_text(
        json.dumps(result["stats"], indent=2, default=str), encoding="utf-8"
    )

    # Print summary
    stats = result["stats"]
    print("Test block generated:")
    print(f"  Sentences: {stats['total_sentences']}")
    print(f"  Words: {stats['new_words']}")
    print(f"  Entity mentions: {stats['new_entity_mentions']}")
    print(f"  Variation mentions: {stats['new_variation_mentions']}")
    print(f"  New 0->1 entities: {stats['new_distinct_entities_introduced']}")
    print(f"  Projected entity density: {stats['projected_entity_density_pct']}%"
          f" (target: {stats['target_entity_density_pct']}%)")
    print(f"  Projected variation density: {stats['projected_variation_density_pct']}%"
          f" (target: {stats['target_variation_density_pct']}%)")
    print("\nFiles written:")
    print(f"  {md_path}")
    print(f"  {html_path}")
    print(f"  {txt_path}")
    print(f"  {stats_path}")


if __name__ == "__main__":
    main()