#!/usr/bin/env python3
"""
Test Block Prep — Extract Deficit Data for Test Block Generation

Reads existing content (from competitor_scraper.py output or plain text) and a
Cora XLSX report, then calculates all deficit metrics needed to
programmatically generate a test block.

Outputs structured JSON with:
- Word count vs target + deficit
- Distinct entity count vs target + deficit + list of missing entities
- Variation density vs target + deficit (Cora row 46)
- Entity density vs target + deficit (Cora row 47)
- LSI density vs target + deficit (Cora row 48)
- Heading structure deficits
- Template generation instructions (slots per sentence, sentence count, etc.)

Usage:
    uv run --with openpyxl python test_block_prep.py CONTENT CORA_XLSX [--format json|text]
"""

import argparse
import json
import math
import re
import sys
from pathlib import Path

# NOTE: `from cora_parser import CoraReport` is imported lazily inside
# run_prep() so that the pure text-analysis helpers in this module can be
# imported and reused without openpyxl / cora_parser being installed.


# ---------------------------------------------------------------------------
# Content parsing
# ---------------------------------------------------------------------------

def parse_scraper_content(file_path: str) -> dict:
    """Parse a competitor_scraper.py output file or plain text/markdown.

    Scraper output is recognized by its "--- HEADINGS ---" and
    "--- CONTENT ---" section markers; any other file is treated as plain
    text/markdown, with ATX-style (``#``) headings extracted.

    Args:
        file_path: Path to the content file (UTF-8).

    Returns:
        dict with keys: ``headings`` (list of ``{"level", "text"}``),
        ``content`` (str), ``word_count`` (int), ``title`` (str),
        ``meta_description`` (str).
    """
    text = Path(file_path).read_text(encoding="utf-8")
    result = {
        "headings": [],
        "content": "",
        "word_count": 0,
        "title": "",
        "meta_description": "",
    }

    if "--- HEADINGS ---" in text and "--- CONTENT ---" in text:
        headings_start = text.index("--- HEADINGS ---")
        content_start = text.index("--- CONTENT ---")

        # Metadata lines precede the headings section.
        for line in text[:headings_start].splitlines():
            if line.startswith("Title: "):
                result["title"] = line.removeprefix("Title: ").strip()
            elif line.startswith("Meta Description: "):
                result["meta_description"] = line.removeprefix("Meta Description: ").strip()

        # Headings section: one "H<level>: <text>" entry per line.
        headings_text = text[headings_start + len("--- HEADINGS ---"):content_start].strip()
        for line in headings_text.splitlines():
            match = re.match(r"H(\d):\s+(.+)", line.strip())
            if match:
                result["headings"].append({
                    "level": int(match.group(1)),
                    "text": match.group(2).strip(),
                })

        result["content"] = text[content_start + len("--- CONTENT ---"):].strip()
    else:
        # Plain text/markdown: the whole file is content; pull # headings.
        result["content"] = text.strip()
        for match in re.finditer(r"^(#{1,6})\s+(.+)$", text, re.MULTILINE):
            result["headings"].append({
                "level": len(match.group(1)),
                "text": match.group(2).strip(),
            })

    # A "word" is a run of ASCII letters/apostrophes (so "don't" is one
    # word). Simple heuristic; non-ASCII words are not counted.
    result["word_count"] = len(re.findall(r"[a-zA-Z']+", result["content"]))
    return result


# ---------------------------------------------------------------------------
# Counting functions
# ---------------------------------------------------------------------------

def _exact_pattern(term: str) -> "re.Pattern[str]":
    """Compile a case-insensitive whole-word pattern matching *term* literally."""
    return re.compile(r"\b" + re.escape(term) + r"\b", re.IGNORECASE)


def count_entity_mentions(text: str, entities: list[dict]) -> dict:
    """Count whole-word mentions of each Cora entity in *text*.

    Args:
        text: Content to search.
        entities: Cora entity dicts; only the ``"name"`` key is read here.

    Returns:
        dict with ``per_entity`` ({name: count}), ``total_mentions`` (sum over
        all entities), ``distinct_count`` (entities mentioned at least once).
    """
    per_entity = {}
    total_mentions = 0
    distinct_count = 0
    for entity in entities:
        name = entity["name"]
        count = len(_exact_pattern(name).findall(text))
        per_entity[name] = count
        total_mentions += count
        if count > 0:
            distinct_count += 1
    return {
        "per_entity": per_entity,
        "total_mentions": total_mentions,
        "distinct_count": distinct_count,
    }


def count_variation_mentions(text: str, variations: list[str]) -> dict:
    """Count whole-word mentions of each keyword variation in *text*.

    Returns:
        dict with ``per_variation`` ({variation: count}) and
        ``total_mentions``.
    """
    per_variation = {}
    total_mentions = 0
    for var in variations:
        count = len(_exact_pattern(var).findall(text))
        per_variation[var] = count
        total_mentions += count
    return {
        "per_variation": per_variation,
        "total_mentions": total_mentions,
    }


def count_lsi_mentions(text: str, lsi_keywords: list[dict]) -> dict:
    """Count mentions of each LSI keyword in *text*.

    Unlike entities/variations, multi-word LSI phrases are matched with
    ``\\s+`` between tokens so that line breaks or doubled spaces inside the
    content still count as a match.

    Args:
        text: Content to search.
        lsi_keywords: Cora LSI dicts; only the ``"keyword"`` key is read here.

    Returns:
        dict with ``per_keyword``, ``total_mentions``, ``distinct_count``.
    """
    per_keyword = {}
    total_mentions = 0
    distinct_count = 0
    for kw_data in lsi_keywords:
        keyword = kw_data["keyword"]
        escaped_tokens = [re.escape(tok) for tok in keyword.strip().split()]
        pattern = re.compile(r"\b" + r"\s+".join(escaped_tokens) + r"\b", re.IGNORECASE)
        count = len(pattern.findall(text))
        per_keyword[keyword] = count
        total_mentions += count
        if count > 0:
            distinct_count += 1
    return {
        "per_keyword": per_keyword,
        "total_mentions": total_mentions,
        "distinct_count": distinct_count,
    }


def count_terms_in_headings(
    headings: list[dict],
    entities: list[dict],
    variations: list[str],
) -> dict:
    """Count entity and variation mentions in heading text.

    Counts are taken per individual heading (not on joined heading text), so
    a multi-word term can never falsely match across the boundary between two
    adjacent headings. Patterns are compiled once and reused for the totals
    and the per-level breakdown.

    Returns:
        dict with ``entity_mentions_total``, ``variation_mentions_total``,
        and ``per_level`` ({"h2"/"h3": {count, entity_mentions,
        variation_mentions}}).
    """
    entity_patterns = [_exact_pattern(entity["name"]) for entity in entities]
    variation_patterns = [_exact_pattern(var) for var in variations]

    def _count(texts: list[str], patterns: list) -> int:
        # Sum of matches of every pattern in every individual heading text.
        return sum(len(p.findall(t)) for t in texts for p in patterns)

    all_texts = [h["text"] for h in headings]
    per_level = {}
    for level in (2, 3):
        level_texts = [h["text"] for h in headings if h["level"] == level]
        per_level[f"h{level}"] = {
            "count": len(level_texts),
            "entity_mentions": _count(level_texts, entity_patterns),
            "variation_mentions": _count(level_texts, variation_patterns),
        }
    return {
        "entity_mentions_total": _count(all_texts, entity_patterns),
        "variation_mentions_total": _count(all_texts, variation_patterns),
        "per_level": per_level,
    }


# ---------------------------------------------------------------------------
# Template instruction calculation
# ---------------------------------------------------------------------------

def calculate_template_instructions(
    current_words: int,
    current_entity_mentions: int,
    current_variation_mentions: int,
    target_entity_density: float,
    target_variation_density: float,
    distinct_entity_deficit: int,
    word_count_deficit: int,
) -> dict:
    """Calculate template parameters for the generator script.

    Figures out how many words the test block needs, how many slots per
    sentence, and how many sentences — so the LLM knows what to generate.

    Args:
        current_words: Word count of the existing content.
        current_entity_mentions: Total entity mentions already present.
        current_variation_mentions: Total variation mentions already present.
        target_entity_density: Target entity density as a decimal (not %).
        target_variation_density: Target variation density as a decimal.
        distinct_entity_deficit: Number of tracked entities with zero mentions
            still needed to hit the distinct-entity target.
        word_count_deficit: Words still needed to hit the word-count target.

    Returns:
        dict of template parameters (target_word_count, num_templates,
        slots_per_sentence, mentions needed, human-readable rationale).
    """
    AVG_WORDS_PER_SENTENCE = 15
    MAX_SLOTS = 5
    MIN_SLOTS = 2

    current_entity_density = current_entity_mentions / current_words if current_words > 0 else 0
    current_variation_density = current_variation_mentions / current_words if current_words > 0 else 0

    # Minimum test block size from the word-count deficit (floor of 150).
    min_words = max(word_count_deficit, 150)

    # Densest the block can be: MAX_SLOTS term mentions per sentence.
    max_internal = MAX_SLOTS / AVG_WORDS_PER_SENTENCE

    # Solving (cur_mentions + max_internal*W) / (cur_words + W) >= target
    # for the block size W gives:
    #   W >= (target*cur_words - cur_mentions) / (max_internal - target)
    # Only solvable when max_internal exceeds the target density.
    entity_deficit_pct = target_entity_density - current_entity_density
    if entity_deficit_pct > 0 and max_internal > target_entity_density:
        needed = target_entity_density * current_words - current_entity_mentions
        words_for_entity = math.ceil(needed / (max_internal - target_entity_density))
        min_words = max(min_words, words_for_entity)

    # Same closed-form solution for the variation density gap.
    var_deficit_pct = target_variation_density - current_variation_density
    if var_deficit_pct > 0 and max_internal > target_variation_density:
        needed = target_variation_density * current_words - current_variation_mentions
        words_for_var = math.ceil(needed / (max_internal - target_variation_density))
        min_words = max(min_words, words_for_var)

    # If only distinct entities are deficit (densities met), smaller block.
    # NOTE(review): this deliberately *overwrites* min_words, so a remaining
    # word-count deficit is ignored on this path — confirm that is intended.
    if entity_deficit_pct <= 0 and var_deficit_pct <= 0 and distinct_entity_deficit > 0:
        min_words = max(150, distinct_entity_deficit * AVG_WORDS_PER_SENTENCE)

    # Round up to the nearest 50 words.
    target_words = math.ceil(max(min_words, 150) / 50) * 50

    # Entity mentions the test block must contribute to hit the target
    # density over the combined (existing + new) word count.
    if target_entity_density > 0:
        total_needed = math.ceil(target_entity_density * (current_words + target_words))
        entity_mentions_needed = max(0, total_needed - current_entity_mentions)
    else:
        entity_mentions_needed = max(distinct_entity_deficit, 0)

    # Same for variation mentions.
    if target_variation_density > 0:
        total_needed = math.ceil(target_variation_density * (current_words + target_words))
        variation_mentions_needed = max(0, total_needed - current_variation_mentions)
    else:
        variation_mentions_needed = 0

    # Derive slots per sentence. target_sentences is always >= 1, and the
    # clamp keeps slots within [MIN_SLOTS, MAX_SLOTS].
    target_sentences = max(1, math.ceil(target_words / AVG_WORDS_PER_SENTENCE))
    total_slots = entity_mentions_needed + variation_mentions_needed
    slots_per_sentence = math.ceil(total_slots / target_sentences)
    slots_per_sentence = max(MIN_SLOTS, min(MAX_SLOTS, slots_per_sentence))

    # Number of templates: max of two drivers (with a floor of 5):
    # 1. Word deficit: sentences needed to fill the word gap.
    word_driven = math.ceil(target_words / AVG_WORDS_PER_SENTENCE)
    # 2. Entity deficit: sentences needed to introduce all missing entities
    #    (slots_per_sentence is always >= MIN_SLOTS, so division is safe).
    entity_driven = math.ceil(distinct_entity_deficit / slots_per_sentence)
    num_templates = max(word_driven, entity_driven, 5)

    return {
        "target_word_count": target_words,
        "num_templates": num_templates,
        "num_templates_reason": "word_deficit" if word_driven >= entity_driven else "entity_deficit",
        "slots_per_sentence": slots_per_sentence,
        "avg_words_per_template": AVG_WORDS_PER_SENTENCE,
        "entity_mentions_needed": entity_mentions_needed,
        "variation_mentions_needed": variation_mentions_needed,
        "rationale": (
            f"Need ~{entity_mentions_needed} entity mentions and "
            f"~{variation_mentions_needed} variation mentions "
            f"across ~{target_words} words. "
            f"Templates: {num_templates} (driven by {'word deficit' if word_driven >= entity_driven else 'entity deficit'}), "
            f"{slots_per_sentence} slots each."
        ),
    }


# ---------------------------------------------------------------------------
# Main prep function
# ---------------------------------------------------------------------------

def run_prep(content_path: str, cora_xlsx_path: str) -> dict:
    """Run the full test block prep analysis.

    Args:
        content_path: Path to scraper output or plain content file.
        cora_xlsx_path: Path to the Cora XLSX report.

    Returns:
        Structured deficit report (word count, entities, densities, headings,
        template instructions) ready for JSON or text rendering.
    """
    # Local import: keeps the text-analysis helpers above usable without
    # openpyxl / cora_parser installed.
    from cora_parser import CoraReport

    report = CoraReport(cora_xlsx_path)
    entities = report.get_entities()
    lsi_keywords = report.get_lsi_keywords()
    variations_list = report.get_variations_list()
    density_targets = report.get_density_targets()
    content_targets = report.get_content_targets()
    structure_targets = report.get_structure_targets()
    word_count_dist = report.get_word_count_distribution()

    # Parse existing content.
    parsed = parse_scraper_content(content_path)
    content_text = parsed["content"]
    current_words = parsed["word_count"]
    headings = parsed["headings"]

    # --- Word count: prefer the cluster target, fall back to the average ---
    cluster_target = word_count_dist.get("cluster_target", 0)
    wc_target = cluster_target if cluster_target else word_count_dist.get("average", 0)
    wc_deficit = max(0, wc_target - current_words)

    # --- Entity counts ---
    entity_data = count_entity_mentions(content_text, entities)
    distinct_target = content_targets.get("distinct_entities", {}).get("target", 0)
    distinct_deficit = max(0, distinct_target - entity_data["distinct_count"])

    # Entities with zero mentions, most relevant first.
    missing_entities = [
        {
            "name": entity["name"],
            "relevance": entity.get("relevance") or 0,
            "type": entity.get("type", ""),
        }
        for entity in entities
        if entity_data["per_entity"].get(entity["name"], 0) == 0
    ]
    missing_entities.sort(key=lambda e: e["relevance"], reverse=True)

    # --- Variation counts ---
    variation_data = count_variation_mentions(content_text, variations_list)

    # --- LSI counts ---
    lsi_data = count_lsi_mentions(content_text, lsi_keywords)

    # --- Density calculations (decimals, not percentages) ---
    cur_entity_d = entity_data["total_mentions"] / current_words if current_words else 0
    cur_var_d = variation_data["total_mentions"] / current_words if current_words else 0
    cur_lsi_d = lsi_data["total_mentions"] / current_words if current_words else 0
    tgt_entity_d = density_targets.get("entity_density", {}).get("avg") or 0
    tgt_var_d = density_targets.get("variation_density", {}).get("avg") or 0
    tgt_lsi_d = density_targets.get("lsi_density", {}).get("avg") or 0

    # --- Heading analysis ---
    heading_data = count_terms_in_headings(headings, entities, variations_list)
    h2_target = structure_targets.get("h2", {}).get("count", {}).get("target", 0)
    h3_target = structure_targets.get("h3", {}).get("count", {}).get("target", 0)
    h2_current = heading_data["per_level"].get("h2", {}).get("count", 0)
    h3_current = heading_data["per_level"].get("h3", {}).get("count", 0)
    all_h_var_target = structure_targets.get("all_h_tags", {}).get("variations", {}).get("target", 0)
    all_h_ent_target = structure_targets.get("all_h_tags", {}).get("entities", {}).get("target", 0)

    # --- Template instructions ---
    template_inst = calculate_template_instructions(
        current_words=current_words,
        current_entity_mentions=entity_data["total_mentions"],
        current_variation_mentions=variation_data["total_mentions"],
        target_entity_density=tgt_entity_d,
        target_variation_density=tgt_var_d,
        distinct_entity_deficit=distinct_deficit,
        word_count_deficit=wc_deficit,
    )

    return {
        "search_term": report.get_search_term(),
        "content_file": content_path,
        "word_count": {
            "current": current_words,
            "target": wc_target,
            "deficit": wc_deficit,
            "status": "meets_target" if wc_deficit == 0 else "below_target",
        },
        "distinct_entities": {
            "current": entity_data["distinct_count"],
            "target": distinct_target,
            "deficit": distinct_deficit,
            "total_tracked": len(entities),
            "missing_entities": missing_entities,
        },
        "entity_density": {
            "current_pct": round(cur_entity_d * 100, 2),
            "target_pct": round(tgt_entity_d * 100, 2),
            "deficit_pct": round(max(0, tgt_entity_d - cur_entity_d) * 100, 2),
            "current_mentions": entity_data["total_mentions"],
            "target_decimal": tgt_entity_d,
            "current_decimal": cur_entity_d,
            "status": "meets_target" if cur_entity_d >= tgt_entity_d else "below_target",
        },
        "variation_density": {
            "current_pct": round(cur_var_d * 100, 2),
            "target_pct": round(tgt_var_d * 100, 2),
            "deficit_pct": round(max(0, tgt_var_d - cur_var_d) * 100, 2),
            "current_mentions": variation_data["total_mentions"],
            "target_decimal": tgt_var_d,
            "current_decimal": cur_var_d,
            "status": "meets_target" if cur_var_d >= tgt_var_d else "below_target",
        },
        "lsi_density": {
            "current_pct": round(cur_lsi_d * 100, 2),
            "target_pct": round(tgt_lsi_d * 100, 2),
            "deficit_pct": round(max(0, tgt_lsi_d - cur_lsi_d) * 100, 2),
            "current_mentions": lsi_data["total_mentions"],
            "target_decimal": tgt_lsi_d,
            "current_decimal": cur_lsi_d,
            "status": "meets_target" if cur_lsi_d >= tgt_lsi_d else "below_target",
        },
        "headings": {
            "h2": {
                "current": h2_current,
                "target": h2_target,
                "deficit": max(0, h2_target - h2_current),
            },
            "h3": {
                "current": h3_current,
                "target": h3_target,
                "deficit": max(0, h3_target - h3_current),
            },
            "variations_in_headings": {
                "current": heading_data["variation_mentions_total"],
                "target": all_h_var_target,
                "deficit": max(0, all_h_var_target - heading_data["variation_mentions_total"]),
            },
            "entities_in_headings": {
                "current": heading_data["entity_mentions_total"],
                "target": all_h_ent_target,
                "deficit": max(0, all_h_ent_target - heading_data["entity_mentions_total"]),
            },
        },
        "template_instructions": template_inst,
    }


# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------

def format_text_report(data: dict) -> str:
    """Format prep data (the dict from run_prep) as a human-readable report."""
    lines = []
    sep = "=" * 65
    lines.append(sep)
    lines.append(f" TEST BLOCK PREP — {data['search_term']}")
    lines.append(sep)
    lines.append("")

    # Word count
    wc = data["word_count"]
    lines.append("WORD COUNT")
    lines.append(f" Current: {wc['current']} | Target: {wc['target']} | Deficit: {wc['deficit']} [{wc['status']}]")
    lines.append("")

    # Distinct entities (show at most 15 missing ones)
    de = data["distinct_entities"]
    lines.append("DISTINCT ENTITIES")
    lines.append(f" Current: {de['current']} | Target: {de['target']} | Deficit: {de['deficit']} (of {de['total_tracked']} tracked)")
    if de["missing_entities"]:
        lines.append(" Top missing (0->1):")
        for ent in de["missing_entities"][:15]:
            lines.append(f" - {ent['name']} (relevance: {ent['relevance']}, type: {ent['type']})")
        remaining = len(de["missing_entities"]) - 15
        if remaining > 0:
            lines.append(f" ... and {remaining} more")
    lines.append("")

    # Entity density
    ed = data["entity_density"]
    lines.append("ENTITY DENSITY (Cora row 47)")
    lines.append(f" Current: {ed['current_pct']}% | Target: {ed['target_pct']}% | Deficit: {ed['deficit_pct']}% [{ed['status']}]")
    lines.append(f" Current mentions: {ed['current_mentions']}")
    lines.append("")

    # Variation density
    vd = data["variation_density"]
    lines.append("VARIATION DENSITY (Cora row 46)")
    lines.append(f" Current: {vd['current_pct']}% | Target: {vd['target_pct']}% | Deficit: {vd['deficit_pct']}% [{vd['status']}]")
    lines.append(f" Current mentions: {vd['current_mentions']}")
    lines.append("")

    # LSI density
    ld = data["lsi_density"]
    lines.append("LSI DENSITY (Cora row 48)")
    lines.append(f" Current: {ld['current_pct']}% | Target: {ld['target_pct']}% | Deficit: {ld['deficit_pct']}% [{ld['status']}]")
    lines.append(f" Current mentions: {ld['current_mentions']}")
    lines.append("")

    # Headings
    hd = data["headings"]
    lines.append("HEADING DEFICITS")
    lines.append(f" H2: {hd['h2']['current']} current / {hd['h2']['target']} target -- deficit {hd['h2']['deficit']}")
    lines.append(f" H3: {hd['h3']['current']} current / {hd['h3']['target']} target -- deficit {hd['h3']['deficit']}")
    lines.append(f" Variations in headings: {hd['variations_in_headings']['current']} / {hd['variations_in_headings']['target']} -- deficit {hd['variations_in_headings']['deficit']}")
    lines.append(f" Entities in headings: {hd['entities_in_headings']['current']} / {hd['entities_in_headings']['target']} -- deficit {hd['entities_in_headings']['deficit']}")
    lines.append("")

    # Template instructions
    ti = data["template_instructions"]
    lines.append("TEMPLATE INSTRUCTIONS")
    lines.append(f" {ti['rationale']}")
    lines.append(f" >> Generate {ti['num_templates']} templates, ~{ti['avg_words_per_template']} words each, {ti['slots_per_sentence']} slots per template")
    lines.append("")
    lines.append(sep)
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: parse args, run the prep, emit json/text output."""
    parser = argparse.ArgumentParser(
        description="Extract deficit data for test block generation.",
    )
    parser.add_argument("content_path", help="Path to scraper output or content file")
    parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--output",
        "-o",
        default=None,
        help="Write output to file instead of stdout",
    )
    args = parser.parse_args()

    try:
        data = run_prep(args.content_path, args.cora_xlsx_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if args.format == "json":
        output = json.dumps(data, indent=2, default=str)
    else:
        output = format_text_report(data)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Written to {args.output}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()