#!/usr/bin/env python3
"""Test Block Generator — programmatically assemble test blocks from templates.

Takes LLM-generated sentence templates (with {N} slots for body text) and
pre-written headings, plus an LLM-curated entity list, and assembles a test
block. Tracks aggregate densities in real-time and stops when targets are met.

The LLM handles all intelligence: filtering entities for topical relevance,
writing headings, creating body templates. This script handles all math:
slot filling, density tracking, stop conditions.

Usage:
    uv run --with openpyxl python test_block_generator.py \
        TEMPLATES_FILE PREP_JSON CORA_XLSX \
        --entities-file ENTITIES [--output-dir ./working/] [--min-sentences 5]
"""

import argparse
import json
import re
import sys
from pathlib import Path

from cora_parser import CoraReport

# Marker comments bracketing generated output so downstream tooling can find
# (and later remove) the test block.
# NOTE(review): the original marker text was lost to HTML-stripping in the
# source; these reconstructions should be confirmed against downstream
# consumers of test_block.md / test_block.html.
TEST_BLOCK_START = "<!-- TEST BLOCK START -->"
TEST_BLOCK_END = "<!-- TEST BLOCK END -->"


# ---------------------------------------------------------------------------
# Term selection
# ---------------------------------------------------------------------------

def load_entity_names(entities_file: str) -> list[str]:
    """Load LLM-curated entity names from file (one per line).

    Blank lines are skipped; surrounding whitespace is stripped.
    Exits the process with status 1 if the file does not exist
    (CLI helper, not a library function).
    """
    path = Path(entities_file)
    if not path.exists():
        print(f"Error: entities file not found: {path}", file=sys.stderr)
        sys.exit(1)
    names = []
    for line in path.read_text(encoding="utf-8").splitlines():
        name = line.strip()
        if name:
            names.append(name)
    return names


def build_term_queue(
    filtered_entity_names: list[str],
    variations: list[str],
) -> list[str]:
    """Build a flat priority-ordered term list.

    Order: filtered entities (LLM-curated, in provided order) -> keyword
    variations. Duplicates are dropped case-insensitively (first occurrence
    wins, original casing preserved).
    """
    terms: list[str] = []
    seen: set[str] = set()
    # 1. Filtered entities from LLM (already curated for topical relevance)
    for name in filtered_entity_names:
        if name.lower() not in seen:
            terms.append(name)
            seen.add(name.lower())
    # 2. Keyword variations
    for v in variations:
        if v.lower() not in seen:
            terms.append(v)
            seen.add(v.lower())
    return terms


# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------

class TestBlockGenerator:
    """Fills body templates with entity/variation terms, inserts pre-written
    headings, and tracks aggregate densities."""

    def __init__(
        self,
        cora_xlsx_path: str,
        prep_data: dict,
        filtered_entity_names: list[str],
    ):
        """Load the Cora report and prime counters from the prep JSON.

        Args:
            cora_xlsx_path: Path to the Cora XLSX report.
            prep_data: Parsed prep JSON (word_count / entity_density /
                variation_density / distinct_entities / headings sections).
            filtered_entity_names: LLM-curated entity names, priority order.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.prep = prep_data
        self.entities = self.report.get_entities()
        self.variations = self.report.get_variations_list()

        # Compile regex patterns for counting (built once, used per sentence).
        # Whole-word, case-insensitive matches.
        self.entity_patterns = {}
        for e in self.entities:
            name = e["name"]
            self.entity_patterns[name] = re.compile(
                r"\b" + re.escape(name) + r"\b", re.IGNORECASE
            )
        self.variation_patterns = {}
        for v in self.variations:
            self.variation_patterns[v] = re.compile(
                r"\b" + re.escape(v) + r"\b", re.IGNORECASE
            )

        # Build term queue from LLM-curated entity list
        self.term_queue = build_term_queue(filtered_entity_names, self.variations)
        self.term_idx = 0

        # Track which 0->1 entities have been introduced.
        # Use the full missing list from prep to track introductions accurately.
        missing = prep_data.get("distinct_entities", {}).get("missing_entities", [])
        self.missing_names = {e["name"] for e in missing}
        self.introduced = set()

        # Running totals for new content
        self.new_words = 0
        self.new_entity_mentions = 0
        self.new_variation_mentions = 0
        self.new_h2_count = 0
        self.new_h3_count = 0

        # Baseline from prep
        self.base_words = prep_data["word_count"]["current"]
        self.base_entity_mentions = prep_data["entity_density"]["current_mentions"]
        self.base_variation_mentions = prep_data["variation_density"]["current_mentions"]
        self.target_entity_d = prep_data["entity_density"]["target_decimal"]
        self.target_variation_d = prep_data["variation_density"]["target_decimal"]

    def pick_term(self, used_in_sentence: set) -> str:
        """Pick next term from the queue, skipping duplicates within a sentence.

        Advances a rotating index over ``term_queue``; if every term is
        already used in this sentence, returns the next one anyway.
        Falls back to a generic filler when the queue is empty.
        """
        if not self.term_queue:
            return "equipment"  # generic filler so templates still render
        used_lower = {u.lower() for u in used_in_sentence}
        for _ in range(len(self.term_queue)):
            term = self.term_queue[self.term_idx % len(self.term_queue)]
            self.term_idx = (self.term_idx + 1) % len(self.term_queue)
            if term.lower() not in used_lower:
                return term
        # All exhausted for this sentence, return next anyway
        term = self.term_queue[self.term_idx % len(self.term_queue)]
        self.term_idx = (self.term_idx + 1) % len(self.term_queue)
        return term

    def fill_template(self, template: str) -> str:
        """Fill a template's {N} slots with terms.

        Each slot occurrence is filled independently (duplicate slot numbers
        each get their own term); terms are not repeated within a sentence
        unless the queue runs out.
        """
        slots = re.findall(r"\{(\d+)\}", template)
        used = set()
        filled = template
        for slot_num in slots:
            term = self.pick_term(used)
            used.add(term)
            # Replace only the first remaining occurrence of this slot number
            filled = filled.replace(f"{{{slot_num}}}", term, 1)
        return filled

    def count_sentence(self, text: str) -> tuple[int, int, int]:
        """Count words, entity mentions, and variation mentions in text.

        Also tracks which 0->1 entities have been introduced.

        Returns:
            (word_count, entity_mentions, variation_mentions)
        """
        entity_mentions = 0
        for name, pattern in self.entity_patterns.items():
            count = len(pattern.findall(text))
            entity_mentions += count
            if count > 0 and name in self.missing_names:
                self.introduced.add(name)
        variation_mentions = 0
        for v, pattern in self.variation_patterns.items():
            variation_mentions += len(pattern.findall(text))
        # A "word" is a run of letters/apostrophes (matches prep's convention)
        words = len(re.findall(r"[a-zA-Z']+", text))
        return words, entity_mentions, variation_mentions

    def projected_density(self, metric: str) -> float:
        """Calculate projected density after current additions.

        Args:
            metric: "entity" or "variation"; anything else yields 0.0.
        """
        total_words = self.base_words + self.new_words
        if total_words == 0:
            return 0.0
        if metric == "entity":
            return (self.base_entity_mentions + self.new_entity_mentions) / total_words
        elif metric == "variation":
            return (self.base_variation_mentions + self.new_variation_mentions) / total_words
        return 0.0

    def targets_met(self, min_reached: bool) -> bool:
        """Check if all density targets are met and minimums reached.

        Requires: minimum sentence count, both density targets, the distinct
        0->1 entity deficit covered, and the word-count deficit covered.
        """
        if not min_reached:
            return False
        entity_ok = self.projected_density("entity") >= self.target_entity_d
        variation_ok = self.projected_density("variation") >= self.target_variation_d
        distinct_deficit = self.prep["distinct_entities"]["deficit"]
        distinct_ok = len(self.introduced) >= distinct_deficit
        wc_deficit = self.prep["word_count"]["deficit"]
        wc_ok = self.new_words >= wc_deficit
        return entity_ok and variation_ok and distinct_ok and wc_ok

    def generate(
        self,
        templates: list[str],
        min_sentences: int = 5,
    ) -> dict:
        """Generate the test block by filling body templates and inserting
        pre-written headings.

        Args:
            templates: List of template strings. Lines starting with "H2:"
                or "H3:" are pre-written headings (inserted as-is, no slot
                filling). Everything else is a body template with {N} slots.
            min_sentences: Minimum sentences before checking stop condition.

        Returns:
            Dict with "sentences" list and "stats" summary (or an "error"
            key when no body templates are present).
        """
        h2_headings = []
        h3_headings = []
        body_templates = []
        for t in templates:
            t = t.strip()
            if not t:
                continue
            if t.upper().startswith("H2:"):
                h2_headings.append(t[3:].strip())
            elif t.upper().startswith("H3:"):
                h3_headings.append(t[3:].strip())
            else:
                body_templates.append(t)

        if not body_templates:
            return {"error": "No body templates found", "sentences": [], "stats": {}}

        h2_needed = self.prep["headings"]["h2"]["deficit"]
        h3_needed = self.prep["headings"]["h3"]["deficit"]

        sentences = []
        count = 0
        body_idx = 0
        h2_idx = 0
        h3_idx = 0
        # Hard cap so a never-satisfiable target can't loop forever
        max_iter = max(len(body_templates) * 3, 60)

        for _ in range(max_iter):
            # Insert pre-written heading if deficit exists and we're at a
            # paragraph break (every 5th slot for H2, every 3rd for H3).
            if h2_needed > 0 and h2_headings and count % 5 == 0:
                text = h2_headings[h2_idx % len(h2_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h2_count += 1
                h2_needed -= 1
                h2_idx += 1
                sentences.append({"text": text, "type": "h2"})
                count += 1
                continue
            if h3_needed > 0 and h3_headings and count > 0 and count % 3 == 0:
                text = h3_headings[h3_idx % len(h3_headings)]
                w, e, v = self.count_sentence(text)
                self.new_words += w
                self.new_entity_mentions += e
                self.new_variation_mentions += v
                self.new_h3_count += 1
                h3_needed -= 1
                h3_idx += 1
                sentences.append({"text": text, "type": "h3"})
                count += 1
                continue

            # Body sentence — fill template slots
            tmpl = body_templates[body_idx % len(body_templates)]
            filled = self.fill_template(tmpl)
            w, e, v = self.count_sentence(filled)
            self.new_words += w
            self.new_entity_mentions += e
            self.new_variation_mentions += v
            body_idx += 1
            sentences.append({"text": filled, "type": "body"})
            count += 1

            if self.targets_met(count >= min_sentences):
                break

        return {
            "sentences": sentences,
            "stats": {
                "total_sentences": count,
                "new_words": self.new_words,
                "new_entity_mentions": self.new_entity_mentions,
                "new_variation_mentions": self.new_variation_mentions,
                "new_distinct_entities_introduced": len(self.introduced),
                "introduced_entities": sorted(self.introduced),
                "new_h2_count": self.new_h2_count,
                "new_h3_count": self.new_h3_count,
                "projected_entity_density_pct": round(
                    self.projected_density("entity") * 100, 2
                ),
                "projected_variation_density_pct": round(
                    self.projected_density("variation") * 100, 2
                ),
                "target_entity_density_pct": round(self.target_entity_d * 100, 2),
                "target_variation_density_pct": round(self.target_variation_d * 100, 2),
            },
        }


# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------

def format_markdown(sentences: list[dict]) -> str:
    """Convert sentence list to markdown with test block markers.

    Body sentences are grouped into paragraphs of up to 4; headings flush
    the pending paragraph first.
    """
    lines = [TEST_BLOCK_START, ""]
    paragraph = []
    for s in sentences:
        if s["type"] in ("h2", "h3"):
            # Flush paragraph before heading
            if paragraph:
                lines.append(" ".join(paragraph))
                lines.append("")
                paragraph = []
            prefix = "##" if s["type"] == "h2" else "###"
            lines.append(f"{prefix} {s['text']}")
            lines.append("")
        else:
            paragraph.append(s["text"])
            if len(paragraph) >= 4:
                lines.append(" ".join(paragraph))
                lines.append("")
                paragraph = []
    if paragraph:
        lines.append(" ".join(paragraph))
        lines.append("")
    lines.append(TEST_BLOCK_END)
    return "\n".join(lines)


def format_html(sentences: list[dict]) -> str:
    """Convert sentence list to HTML with test block markers.

    Same grouping rules as ``format_markdown``; paragraphs become <p>
    elements and headings become <h2>/<h3> elements.
    """
    lines = [TEST_BLOCK_START, ""]
    paragraph = []
    for s in sentences:
        if s["type"] in ("h2", "h3"):
            # Flush paragraph before heading
            if paragraph:
                lines.append("<p>" + " ".join(paragraph) + "</p>")
                lines.append("")
                paragraph = []
            tag = "h2" if s["type"] == "h2" else "h3"
            # Fixed: heading element was emitted without its closing tag
            lines.append(f"<{tag}>{s['text']}</{tag}>")
            lines.append("")
        else:
            paragraph.append(s["text"])
            if len(paragraph) >= 4:
                lines.append("<p>" + " ".join(paragraph) + "</p>")
                lines.append("")
                paragraph = []
    if paragraph:
        lines.append("<p>" + " ".join(paragraph) + "</p>")
        lines.append("")
    lines.append(TEST_BLOCK_END)
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """Parse CLI args, generate the test block, and write md/html/stats files."""
    parser = argparse.ArgumentParser(
        description="Generate a test block from templates and deficit data.",
    )
    parser.add_argument("templates_path", help="Path to templates file (one per line)")
    parser.add_argument("prep_json_path", help="Path to prep JSON from test_block_prep.py")
    parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    parser.add_argument(
        "--entities-file",
        required=True,
        help="Path to LLM-curated entity list (one name per line)",
    )
    parser.add_argument(
        "--output-dir",
        default="./working",
        help="Directory for output files (default: ./working)",
    )
    parser.add_argument(
        "--min-sentences",
        type=int,
        default=5,
        help="Minimum sentences before checking stop condition (default: 5)",
    )
    args = parser.parse_args()

    # Load inputs
    templates_path = Path(args.templates_path)
    if not templates_path.exists():
        print(f"Error: templates file not found: {templates_path}", file=sys.stderr)
        sys.exit(1)
    templates = [
        line.strip()
        for line in templates_path.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]

    prep_path = Path(args.prep_json_path)
    if not prep_path.exists():
        print(f"Error: prep JSON not found: {prep_path}", file=sys.stderr)
        sys.exit(1)
    prep_data = json.loads(prep_path.read_text(encoding="utf-8"))

    # Load LLM-curated entity list
    filtered_entity_names = load_entity_names(args.entities_file)

    # Generate
    gen = TestBlockGenerator(args.cora_xlsx_path, prep_data, filtered_entity_names)
    result = gen.generate(templates, min_sentences=args.min_sentences)

    if "error" in result and result["error"]:
        print(f"Error: {result['error']}", file=sys.stderr)
        sys.exit(1)

    # Write outputs
    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    md_path = out_dir / "test_block.md"
    html_path = out_dir / "test_block.html"
    stats_path = out_dir / "test_block_stats.json"

    md_content = format_markdown(result["sentences"])
    html_content = format_html(result["sentences"])
    md_path.write_text(md_content, encoding="utf-8")
    html_path.write_text(html_content, encoding="utf-8")
    stats_path.write_text(
        json.dumps(result["stats"], indent=2, default=str), encoding="utf-8"
    )

    # Print summary
    stats = result["stats"]
    print("Test block generated:")
    print(f"  Sentences: {stats['total_sentences']}")
    print(f"  Words: {stats['new_words']}")
    print(f"  Entity mentions: {stats['new_entity_mentions']}")
    print(f"  Variation mentions: {stats['new_variation_mentions']}")
    print(f"  New 0->1 entities: {stats['new_distinct_entities_introduced']}")
    print(f"  Projected entity density: {stats['projected_entity_density_pct']}%"
          f" (target: {stats['target_entity_density_pct']}%)")
    print(f"  Projected variation density: {stats['projected_variation_density_pct']}%"
          f" (target: {stats['target_variation_density_pct']}%)")
    print("\nFiles written:")
    print(f"  {md_path}")
    print(f"  {html_path}")
    print(f"  {stats_path}")


if __name__ == "__main__":
    main()