CheddahBot/.claude/skills/content-researcher/scripts/test_block_validate.py

379 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Test Block Validator — Before/After Comparison
Runs the same deficit analysis from test_block_prep.py on:
1. Existing content alone (before)
2. Existing content + test block (after)
Produces a deterministic comparison showing exactly how each metric changed.
Usage:
uv run --with openpyxl python test_block_validate.py <content_path> <test_block_path> <cora_xlsx_path>
[--format json|text] [--output PATH]
"""
import argparse
import json
import re
import sys
from pathlib import Path
from cora_parser import CoraReport
from test_block_prep import (
parse_scraper_content,
count_entity_mentions,
count_variation_mentions,
count_lsi_mentions,
count_terms_in_headings,
)
def extract_test_block_text(file_path: str) -> str:
    """Return the plain-text content of a test block file.

    The hidden-block comment markers and all HTML tags are stripped, and
    markdown heading prefixes are removed, leaving bare text suitable for
    mention counting.
    """
    raw = Path(file_path).read_text(encoding="utf-8")
    # Drop the block markers entirely (no space substituted) before the
    # generic tag pass, so they vanish rather than leave whitespace.
    for marker in (
        "<!-- HIDDEN TEST BLOCK START -->",
        "<!-- HIDDEN TEST BLOCK END -->",
    ):
        raw = raw.replace(marker, "")
    # Replace any remaining HTML tags with a space so adjacent words
    # don't fuse together.
    cleaned = re.sub(r"<[^>]+>", " ", raw)
    # Strip leading markdown heading hashes, line by line.
    cleaned = re.sub(r"^#{1,6}\s+", "", cleaned, flags=re.MULTILINE)
    return cleaned.strip()
def extract_test_block_headings(file_path: str) -> list[dict]:
    """Extract heading structure from a test block file (HTML or markdown).

    Returns a list of ``{"level": int, "text": str}`` dicts in document
    order. HTML headings take precedence; markdown headings are parsed
    only when no HTML headings are found.
    """
    text = Path(file_path).read_text(encoding="utf-8")
    headings = []
    # HTML headings first. Allow attributes on the opening tag
    # (e.g. <h2 class="title">) and require the closing tag level to
    # match the opening one via the \1 backreference; DOTALL lets the
    # heading text span multiple lines.
    for match in re.finditer(
        r"<h([1-6])(?:\s[^>]*)?>(.+?)</h\1>",
        text,
        re.IGNORECASE | re.DOTALL,
    ):
        headings.append({
            "level": int(match.group(1)),
            "text": match.group(2).strip(),
        })
    # If no HTML headings, fall back to markdown-style headings.
    if not headings:
        for match in re.finditer(r"^(#{1,6})\s+(.+)$", text, re.MULTILINE):
            headings.append({
                "level": len(match.group(1)),
                "text": match.group(2).strip(),
            })
    return headings
def _word_total(text: str) -> int:
    """Count words as runs of ASCII letters/apostrophes (Cora's convention)."""
    return len(re.findall(r"[a-zA-Z']+", text))


def _density_section(before: dict, after: dict, before_words: int,
                     after_words: int, target: float) -> dict:
    """Build one before/after density comparison entry.

    *before*/*after* are mention-count dicts carrying a ``total_mentions``
    key; *target* is the target density as a fraction (not a percentage).
    """
    def density(mentions: int, words: int) -> float:
        # Guard against empty content to avoid ZeroDivisionError.
        return mentions / words if words > 0 else 0

    def pct(d: float) -> float:
        return round(d * 100, 2)

    b = before["total_mentions"]
    a = after["total_mentions"]
    return {
        "before_pct": pct(density(b, before_words)),
        "after_pct": pct(density(a, after_words)),
        "target_pct": pct(target),
        "before_mentions": b,
        "after_mentions": a,
        "delta_mentions": a - b,
        "before_status": "meets" if density(b, before_words) >= target else "below",
        "after_status": "meets" if density(a, after_words) >= target else "below",
    }


def run_validation(
    content_path: str,
    test_block_path: str,
    cora_xlsx_path: str,
) -> dict:
    """Run before/after validation of a test block against a Cora report.

    Compares the existing content alone ("before") against existing
    content plus the test block ("after") on word count, distinct
    entities, entity/variation/LSI density, and heading structure.

    Returns a dict keyed by metric, each entry holding before/after
    values, the Cora target, and a "meets"/"below" status.
    """
    report = CoraReport(cora_xlsx_path)
    entities = report.get_entities()
    lsi_keywords = report.get_lsi_keywords()
    variations_list = report.get_variations_list()
    density_targets = report.get_density_targets()
    content_targets = report.get_content_targets()
    structure_targets = report.get_structure_targets()
    word_count_dist = report.get_word_count_distribution()

    # --- Parse existing content ---
    parsed = parse_scraper_content(content_path)
    existing_text = parsed["content"]
    existing_headings = parsed["headings"]

    # --- Parse test block ---
    block_text = extract_test_block_text(test_block_path)
    block_headings = extract_test_block_headings(test_block_path)

    # --- Combined (existing + test block) ---
    combined_text = existing_text + "\n\n" + block_text
    combined_headings = existing_headings + block_headings

    # --- Word counts ---
    before_words = _word_total(existing_text)
    block_words = _word_total(block_text)
    after_words = _word_total(combined_text)

    # --- Mention counts, before and after ---
    before_ent = count_entity_mentions(existing_text, entities)
    after_ent = count_entity_mentions(combined_text, entities)
    before_var = count_variation_mentions(existing_text, variations_list)
    after_var = count_variation_mentions(combined_text, variations_list)
    before_lsi = count_lsi_mentions(existing_text, lsi_keywords)
    after_lsi = count_lsi_mentions(combined_text, lsi_keywords)

    # --- Heading term analysis ---
    before_hdg = count_terms_in_headings(existing_headings, entities, variations_list)
    after_hdg = count_terms_in_headings(combined_headings, entities, variations_list)

    # --- Targets; `or 0` / defaults guard against metrics missing from the report ---
    tgt_entity_d = density_targets.get("entity_density", {}).get("avg") or 0
    tgt_var_d = density_targets.get("variation_density", {}).get("avg") or 0
    tgt_lsi_d = density_targets.get("lsi_density", {}).get("avg") or 0
    distinct_target = content_targets.get("distinct_entities", {}).get("target", 0)
    cluster_target = word_count_dist.get("cluster_target", 0)
    # Prefer the cluster target; fall back to the average word count.
    wc_target = cluster_target if cluster_target else word_count_dist.get("average", 0)
    h2_target = structure_targets.get("h2", {}).get("count", {}).get("target", 0)
    h3_target = structure_targets.get("h3", {}).get("count", {}).get("target", 0)

    # Entities newly introduced by the block (0 mentions -> >0 mentions).
    new_entities = []
    for name, after_count in after_ent["per_entity"].items():
        before_count = before_ent["per_entity"].get(name, 0)
        if before_count == 0 and after_count > 0:
            new_entities.append(name)

    before_h2 = len([h for h in existing_headings if h["level"] == 2])
    after_h2 = len([h for h in combined_headings if h["level"] == 2])
    before_h3 = len([h for h in existing_headings if h["level"] == 3])
    after_h3 = len([h for h in combined_headings if h["level"] == 3])

    return {
        "search_term": report.get_search_term(),
        "test_block_words": block_words,
        "word_count": {
            "before": before_words,
            "after": after_words,
            "target": wc_target,
            "before_status": "meets" if before_words >= wc_target else "below",
            "after_status": "meets" if after_words >= wc_target else "below",
        },
        "distinct_entities": {
            "before": before_ent["distinct_count"],
            "after": after_ent["distinct_count"],
            "target": distinct_target,
            "new_0_to_1": len(new_entities),
            "new_entity_names": sorted(new_entities),
            "before_status": "meets" if before_ent["distinct_count"] >= distinct_target else "below",
            "after_status": "meets" if after_ent["distinct_count"] >= distinct_target else "below",
        },
        "entity_density": _density_section(
            before_ent, after_ent, before_words, after_words, tgt_entity_d
        ),
        "variation_density": _density_section(
            before_var, after_var, before_words, after_words, tgt_var_d
        ),
        "lsi_density": _density_section(
            before_lsi, after_lsi, before_words, after_words, tgt_lsi_d
        ),
        "headings": {
            "h2": {
                "before": before_h2,
                "after": after_h2,
                "target": h2_target,
            },
            "h3": {
                "before": before_h3,
                "after": after_h3,
                "target": h3_target,
            },
            "entities_in_headings": {
                "before": before_hdg["entity_mentions_total"],
                "after": after_hdg["entity_mentions_total"],
            },
            "variations_in_headings": {
                "before": before_hdg["variation_mentions_total"],
                "after": after_hdg["variation_mentions_total"],
            },
        },
    }
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def format_text_report(data: dict) -> str:
    """Format validation data as a human-readable before/after comparison.

    Expects the dict produced by ``run_validation`` and returns a
    fixed-width text table.
    """
    lines = []
    sep = "=" * 70
    lines.append(sep)
    lines.append(f" TEST BLOCK VALIDATION -- {data['search_term']}")
    lines.append(f" Test block added {data['test_block_words']} words")
    lines.append(sep)
    lines.append("")

    def status(s: str) -> str:
        """Map a 'meets'/'below' status to a compact indicator."""
        return "[OK]" if s == "meets" else "[!!]"

    # --- Metric table (before / after / target / status) ---
    wc = data["word_count"]
    lines.append(f" {'METRIC':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10} {'STATUS':>8}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10} {'-'*8}")
    lines.append(
        f" {'Word count':<30} {wc['before']:>10} {wc['after']:>10} "
        f"{wc['target']:>10} {status(wc['after_status']):>8}"
    )
    de = data["distinct_entities"]
    lines.append(
        f" {'Distinct entities':<30} {de['before']:>10} {de['after']:>10} "
        f"{de['target']:>10} {status(de['after_status']):>8}"
    )
    ed = data["entity_density"]
    lines.append(
        f" {'Entity density %':<30} {ed['before_pct']:>9}% {ed['after_pct']:>9}% "
        f"{ed['target_pct']:>9}% {status(ed['after_status']):>8}"
    )
    vd = data["variation_density"]
    lines.append(
        f" {'Variation density %':<30} {vd['before_pct']:>9}% {vd['after_pct']:>9}% "
        f"{vd['target_pct']:>9}% {status(vd['after_status']):>8}"
    )
    ld = data["lsi_density"]
    lines.append(
        f" {'LSI density %':<30} {ld['before_pct']:>9}% {ld['after_pct']:>9}% "
        f"{ld['target_pct']:>9}% {status(ld['after_status']):>8}"
    )
    lines.append("")

    # --- Mention counts; {:+d} renders the sign correctly for negative
    # deltas too (the old '+' + str(d) produced "+-3") ---
    lines.append(f" {'MENTION COUNTS':<30} {'BEFORE':>10} {'AFTER':>10} {'DELTA':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(
        f" {'Entity mentions':<30} {ed['before_mentions']:>10} "
        f"{ed['after_mentions']:>10} {ed['delta_mentions']:>+10d}"
    )
    lines.append(
        f" {'Variation mentions':<30} {vd['before_mentions']:>10} "
        f"{vd['after_mentions']:>10} {vd['delta_mentions']:>+10d}"
    )
    lines.append(
        f" {'LSI mentions':<30} {ld['before_mentions']:>10} "
        f"{ld['after_mentions']:>10} {ld['delta_mentions']:>+10d}"
    )
    lines.append("")

    # --- Heading structure ---
    hd = data["headings"]
    lines.append(f" {'HEADINGS':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(f" {'H2 count':<30} {hd['h2']['before']:>10} {hd['h2']['after']:>10} {hd['h2']['target']:>10}")
    lines.append(f" {'H3 count':<30} {hd['h3']['before']:>10} {hd['h3']['after']:>10} {hd['h3']['target']:>10}")
    lines.append(
        f" {'Entities in headings':<30} {hd['entities_in_headings']['before']:>10} "
        f"{hd['entities_in_headings']['after']:>10}"
    )
    lines.append(
        f" {'Variations in headings':<30} {hd['variations_in_headings']['before']:>10} "
        f"{hd['variations_in_headings']['after']:>10}"
    )
    lines.append("")

    # --- Entities newly introduced by the test block ---
    if de["new_entity_names"]:
        lines.append(f" NEW ENTITIES INTRODUCED (0->1): {de['new_0_to_1']}")
        for name in de["new_entity_names"]:
            lines.append(f" + {name}")
        lines.append("")

    lines.append(sep)
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse arguments, run validation, emit the report."""
    arg_parser = argparse.ArgumentParser(
        description="Validate a test block with before/after comparison.",
    )
    arg_parser.add_argument("content_path", help="Path to existing content (scraper output)")
    arg_parser.add_argument("test_block_path", help="Path to test block (.md or .html)")
    arg_parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    arg_parser.add_argument(
        "--format", choices=["json", "text"], default="text",
        help="Output format (default: text)",
    )
    arg_parser.add_argument(
        "--output", "-o", default=None,
        help="Write output to file instead of stdout",
    )
    args = arg_parser.parse_args()

    try:
        data = run_validation(args.content_path, args.test_block_path, args.cora_xlsx_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    output = (
        json.dumps(data, indent=2, default=str)
        if args.format == "json"
        else format_text_report(data)
    )

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Written to {args.output}", file=sys.stderr)
        return

    # Fall back to raw UTF-8 bytes when the console encoding rejects the
    # text (common on Windows code pages).
    try:
        print(output)
    except UnicodeEncodeError:
        sys.stdout.buffer.write(output.encode("utf-8"))


if __name__ == "__main__":
    main()