#!/usr/bin/env python3
"""Test Block Validator — Before/After Comparison

Runs the same deficit analysis from test_block_prep.py on:
1. Existing content alone (before)
2. Existing content + test block (after)

Produces a deterministic comparison showing exactly how each metric changed.

Usage:
    uv run --with openpyxl python test_block_validate.py [--format json|text] [--output PATH]
"""

import argparse
import json
import re
import sys
from pathlib import Path

from cora_parser import CoraReport
from test_block_prep import (
    parse_scraper_content,
    count_entity_mentions,
    count_variation_mentions,
    count_lsi_mentions,
    count_terms_in_headings,
)


def extract_test_block_text(file_path: str) -> str:
    """Read test block file and return the text content.

    Strips HTML tags and test block markers. Returns plain text for counting.
    """
    text = Path(file_path).read_text(encoding="utf-8")
    # NOTE(review): the original marker-stripping calls were literal no-ops
    # (`text.replace("", "")` twice) — the marker strings appear to have been
    # lost. Comment-style markers are removed by the tag regex below anyway.
    # TODO: confirm the exact marker strings and restore explicit stripping.
    # Remove HTML tags
    text = re.sub(r"<[^>]+>", " ", text)
    # Remove markdown heading markers
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)
    return text.strip()


def extract_test_block_headings(file_path: str) -> list[dict]:
    """Extract heading structure from test block (HTML or markdown).

    Returns a list of ``{"level": int, "text": str}`` dicts in document order.
    Markdown headings are only consulted when no HTML headings are found.
    """
    text = Path(file_path).read_text(encoding="utf-8")
    headings = []
    # Try HTML headings first.
    # BUG FIX: the pattern had degenerated to r"(.+?)" (no capture group 2,
    # and group 1 was non-numeric), so int(match.group(1)) raised ValueError
    # and match.group(2) raised IndexError. Reconstructed as a standard
    # <hN ...>...</hN> matcher; \1 backreference pairs open/close levels.
    for match in re.finditer(r"<h([1-6])[^>]*>(.+?)</h\1>", text, re.IGNORECASE):
        headings.append({
            "level": int(match.group(1)),
            "text": match.group(2).strip(),
        })
    # If no HTML headings, try markdown
    if not headings:
        for match in re.finditer(r"^(#{1,6})\s+(.+)$", text, re.MULTILINE):
            headings.append({
                "level": len(match.group(1)),
                "text": match.group(2).strip(),
            })
    return headings


def run_validation(
    content_path: str,
    test_block_path: str,
    cora_xlsx_path: str,
) -> dict:
    """Run before/after validation.

    Parameters:
        content_path: path to existing content (scraper output).
        test_block_path: path to the test block (.md or .html).
        cora_xlsx_path: path to the Cora XLSX report.

    Returns dict with: before, after, delta, targets, status.
    """
    report = CoraReport(cora_xlsx_path)
    entities = report.get_entities()
    lsi_keywords = report.get_lsi_keywords()
    variations_list = report.get_variations_list()
    density_targets = report.get_density_targets()
    content_targets = report.get_content_targets()
    structure_targets = report.get_structure_targets()
    word_count_dist = report.get_word_count_distribution()

    # --- Parse existing content ---
    parsed = parse_scraper_content(content_path)
    existing_text = parsed["content"]
    existing_headings = parsed["headings"]

    # --- Parse test block ---
    block_text = extract_test_block_text(test_block_path)
    block_headings = extract_test_block_headings(test_block_path)

    # --- Combined ---
    combined_text = existing_text + "\n\n" + block_text
    combined_headings = existing_headings + block_headings

    # --- Count words ---
    # Word = run of ASCII letters/apostrophes (matches test_block_prep's idea
    # of a word; digits and non-ASCII are intentionally not counted).
    def count_words(t: str) -> int:
        return len(re.findall(r"[a-zA-Z']+", t))

    before_words = count_words(existing_text)
    block_words = count_words(block_text)
    after_words = count_words(combined_text)

    # --- Count entities ---
    before_ent = count_entity_mentions(existing_text, entities)
    after_ent = count_entity_mentions(combined_text, entities)

    # --- Count variations ---
    before_var = count_variation_mentions(existing_text, variations_list)
    after_var = count_variation_mentions(combined_text, variations_list)

    # --- Count LSI ---
    before_lsi = count_lsi_mentions(existing_text, lsi_keywords)
    after_lsi = count_lsi_mentions(combined_text, lsi_keywords)

    # --- Heading analysis ---
    before_hdg = count_terms_in_headings(existing_headings, entities, variations_list)
    after_hdg = count_terms_in_headings(combined_headings, entities, variations_list)

    # --- Targets ---
    # `or 0` (not a .get default) so an explicit None avg also falls back to 0.
    tgt_entity_d = density_targets.get("entity_density", {}).get("avg") or 0
    tgt_var_d = density_targets.get("variation_density", {}).get("avg") or 0
    tgt_lsi_d = density_targets.get("lsi_density", {}).get("avg") or 0
    distinct_target = content_targets.get("distinct_entities", {}).get("target", 0)
    cluster_target = word_count_dist.get("cluster_target", 0)
    # Prefer the cluster target; fall back to the plain average when absent/0.
    wc_target = cluster_target if cluster_target else word_count_dist.get("average", 0)
    h2_target = structure_targets.get("h2", {}).get("count", {}).get("target", 0)
    h3_target = structure_targets.get("h3", {}).get("count", {}).get("target", 0)

    # --- Build comparison ---
    def density(mentions, words):
        # Mentions per word; guarded against empty content.
        return mentions / words if words > 0 else 0

    def pct(d):
        # Density as a percentage rounded to 2 decimal places.
        return round(d * 100, 2)

    # Find new 0->1 entities: absent before, present after adding the block.
    new_entities = []
    for name, after_count in after_ent["per_entity"].items():
        before_count = before_ent["per_entity"].get(name, 0)
        if before_count == 0 and after_count > 0:
            new_entities.append(name)

    before_h2 = len([h for h in existing_headings if h["level"] == 2])
    after_h2 = len([h for h in combined_headings if h["level"] == 2])
    before_h3 = len([h for h in existing_headings if h["level"] == 3])
    after_h3 = len([h for h in combined_headings if h["level"] == 3])

    return {
        "search_term": report.get_search_term(),
        "test_block_words": block_words,
        "word_count": {
            "before": before_words,
            "after": after_words,
            "target": wc_target,
            "before_status": "meets" if before_words >= wc_target else "below",
            "after_status": "meets" if after_words >= wc_target else "below",
        },
        "distinct_entities": {
            "before": before_ent["distinct_count"],
            "after": after_ent["distinct_count"],
            "target": distinct_target,
            "new_0_to_1": len(new_entities),
            "new_entity_names": sorted(new_entities),
            "before_status": "meets" if before_ent["distinct_count"] >= distinct_target else "below",
            "after_status": "meets" if after_ent["distinct_count"] >= distinct_target else "below",
        },
        "entity_density": {
            "before_pct": pct(density(before_ent["total_mentions"], before_words)),
            "after_pct": pct(density(after_ent["total_mentions"], after_words)),
            "target_pct": pct(tgt_entity_d),
            "before_mentions": before_ent["total_mentions"],
            "after_mentions": after_ent["total_mentions"],
            "delta_mentions": after_ent["total_mentions"] - before_ent["total_mentions"],
            "before_status": "meets" if density(before_ent["total_mentions"],
                                                before_words) >= tgt_entity_d else "below",
            "after_status": "meets" if density(after_ent["total_mentions"],
                                               after_words) >= tgt_entity_d else "below",
        },
        "variation_density": {
            "before_pct": pct(density(before_var["total_mentions"], before_words)),
            "after_pct": pct(density(after_var["total_mentions"], after_words)),
            "target_pct": pct(tgt_var_d),
            "before_mentions": before_var["total_mentions"],
            "after_mentions": after_var["total_mentions"],
            "delta_mentions": after_var["total_mentions"] - before_var["total_mentions"],
            "before_status": "meets" if density(before_var["total_mentions"],
                                                before_words) >= tgt_var_d else "below",
            "after_status": "meets" if density(after_var["total_mentions"],
                                               after_words) >= tgt_var_d else "below",
        },
        "lsi_density": {
            "before_pct": pct(density(before_lsi["total_mentions"], before_words)),
            "after_pct": pct(density(after_lsi["total_mentions"], after_words)),
            "target_pct": pct(tgt_lsi_d),
            "before_mentions": before_lsi["total_mentions"],
            "after_mentions": after_lsi["total_mentions"],
            "delta_mentions": after_lsi["total_mentions"] - before_lsi["total_mentions"],
            "before_status": "meets" if density(before_lsi["total_mentions"],
                                                before_words) >= tgt_lsi_d else "below",
            "after_status": "meets" if density(after_lsi["total_mentions"],
                                               after_words) >= tgt_lsi_d else "below",
        },
        "headings": {
            "h2": {"before": before_h2, "after": after_h2, "target": h2_target},
            "h3": {"before": before_h3, "after": after_h3, "target": h3_target},
            "entities_in_headings": {
                "before": before_hdg["entity_mentions_total"],
                "after": after_hdg["entity_mentions_total"],
            },
            "variations_in_headings": {
                "before": before_hdg["variation_mentions_total"],
                "after": after_hdg["variation_mentions_total"],
            },
        },
    }


# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------


def format_text_report(data: dict) -> str:
    """Format validation as a human-readable before/after comparison."""
    lines = []
    sep = "=" * 70
    lines.append(sep)
    lines.append(f" TEST BLOCK VALIDATION -- {data['search_term']}")
    lines.append(f" Test block added {data['test_block_words']} words")
    lines.append(sep)
    lines.append("")

    # Helper for status indicator
    def status(s):
        return "[OK]" if s == "meets" else "[!!]"

    # Word count
    wc = data["word_count"]
    lines.append(f" {'METRIC':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10} {'STATUS':>8}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10} {'-'*8}")
    lines.append(
        f" {'Word count':<30} {wc['before']:>10} {wc['after']:>10} "
        f"{wc['target']:>10} {status(wc['after_status']):>8}"
    )

    # Distinct entities
    de = data["distinct_entities"]
    lines.append(
        f" {'Distinct entities':<30} {de['before']:>10} {de['after']:>10} "
        f"{de['target']:>10} {status(de['after_status']):>8}"
    )

    # Entity density
    ed = data["entity_density"]
    lines.append(
        f" {'Entity density %':<30} {ed['before_pct']:>9}% {ed['after_pct']:>9}% "
        f"{ed['target_pct']:>9}% {status(ed['after_status']):>8}"
    )

    # Variation density
    vd = data["variation_density"]
    lines.append(
        f" {'Variation density %':<30} {vd['before_pct']:>9}% {vd['after_pct']:>9}% "
        f"{vd['target_pct']:>9}% {status(vd['after_status']):>8}"
    )

    # LSI density
    ld = data["lsi_density"]
    lines.append(
        f" {'LSI density %':<30} {ld['before_pct']:>9}% {ld['after_pct']:>9}% "
        f"{ld['target_pct']:>9}% {status(ld['after_status']):>8}"
    )
    lines.append("")

    # Mention counts
    lines.append(f" {'MENTION COUNTS':<30} {'BEFORE':>10} {'AFTER':>10} {'DELTA':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(
        f" {'Entity mentions':<30} {ed['before_mentions']:>10} "
        f"{ed['after_mentions']:>10} {'+' + str(ed['delta_mentions']):>10}"
    )
    lines.append(
        f" {'Variation mentions':<30} {vd['before_mentions']:>10} "
        f"{vd['after_mentions']:>10} {'+' + str(vd['delta_mentions']):>10}"
    )
    lines.append(
        f" {'LSI mentions':<30} {ld['before_mentions']:>10} "
        f"{ld['after_mentions']:>10} {'+' + str(ld['delta_mentions']):>10}"
    )
    lines.append("")

    # Headings
    hd = data["headings"]
    lines.append(f" {'HEADINGS':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(f" {'H2 count':<30} {hd['h2']['before']:>10} {hd['h2']['after']:>10} {hd['h2']['target']:>10}")
    lines.append(f" {'H3 count':<30} {hd['h3']['before']:>10} {hd['h3']['after']:>10} {hd['h3']['target']:>10}")
    lines.append(
        f" {'Entities in headings':<30} {hd['entities_in_headings']['before']:>10} "
        f"{hd['entities_in_headings']['after']:>10}"
    )
    lines.append(
        f" {'Variations in headings':<30} {hd['variations_in_headings']['before']:>10} "
        f"{hd['variations_in_headings']['after']:>10}"
    )
    lines.append("")

    # New entities
    de = data["distinct_entities"]
    if de["new_entity_names"]:
        lines.append(f" NEW ENTITIES INTRODUCED (0->1): {de['new_0_to_1']}")
        for name in de["new_entity_names"]:
            lines.append(f"   + {name}")
        lines.append("")

    lines.append(sep)
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main():
    """CLI entry point: parse args, run validation, emit JSON or text."""
    parser = argparse.ArgumentParser(
        description="Validate a test block with before/after comparison.",
    )
    parser.add_argument("content_path", help="Path to existing content (scraper output)")
    parser.add_argument("test_block_path", help="Path to test block (.md or .html)")
    parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--output",
        "-o",
        default=None,
        help="Write output to file instead of stdout",
    )
    args = parser.parse_args()

    try:
        data = run_validation(args.content_path, args.test_block_path, args.cora_xlsx_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if args.format == "json":
        # default=str so non-JSON-native values from the report still serialize.
        output = json.dumps(data, indent=2, default=str)
    else:
        output = format_text_report(data)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Written to {args.output}", file=sys.stderr)
    else:
        # Handle Windows encoding: fall back to raw UTF-8 bytes when the
        # console codec cannot represent the output.
        try:
            print(output)
        except UnicodeEncodeError:
            sys.stdout.buffer.write(output.encode("utf-8"))


if __name__ == "__main__":
    main()