#!/usr/bin/env python3
"""
Test Block Validator — Before/After Comparison

Runs the same deficit analysis from test_block_prep.py on:
1. Existing content alone (before)
2. Existing content + test block (after)

Produces a deterministic comparison showing exactly how each metric changed.

Usage:
    uv run --with openpyxl python test_block_validate.py <content_path> <test_block_path> <cora_xlsx_path>
        [--format json|text] [--output PATH]
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from cora_parser import CoraReport
|
|
from test_block_prep import (
|
|
parse_scraper_content,
|
|
count_entity_mentions,
|
|
count_variation_mentions,
|
|
count_lsi_mentions,
|
|
count_terms_in_headings,
|
|
)
|
|
|
|
|
|
def extract_test_block_text(file_path: str) -> str:
    """Return the plain-text content of a test block file.

    Strips the hidden test block comment markers, all HTML tags, and
    markdown heading markers, leaving bare text suitable for term counting.
    """
    raw = Path(file_path).read_text(encoding="utf-8")

    # Drop the hidden test block comment markers.
    for marker in ("<!-- HIDDEN TEST BLOCK START -->", "<!-- HIDDEN TEST BLOCK END -->"):
        raw = raw.replace(marker, "")

    # Replace HTML tags with a space so adjacent words stay separated.
    without_tags = re.sub(r"<[^>]+>", " ", raw)

    # Strip leading markdown heading markers (e.g. "## Title" -> "Title").
    plain = re.sub(r"^#{1,6}\s+", "", without_tags, flags=re.MULTILINE)

    return plain.strip()
|
|
|
|
|
|
def extract_test_block_headings(file_path: str) -> list[dict]:
    """Extract heading structure from a test block (HTML or markdown).

    Returns a list of {"level": int, "text": str} dicts in document order.
    HTML headings are preferred; markdown headings are used only as a
    fallback when no HTML headings are found.
    """
    text = Path(file_path).read_text(encoding="utf-8")
    headings: list[dict] = []

    # HTML headings first. Allow attributes (e.g. <h2 class="x">), restrict
    # levels to 1-6, and require the closing tag level to match via \1
    # (the old pattern rejected attributes and accepted mismatched levels).
    for match in re.finditer(r"<h([1-6])[^>]*>(.+?)</h\1>", text, re.IGNORECASE):
        headings.append({
            "level": int(match.group(1)),
            "text": match.group(2).strip(),
        })

    # Fall back to markdown (# through ######) when no HTML headings exist.
    if not headings:
        for match in re.finditer(r"^(#{1,6})\s+(.+)$", text, re.MULTILINE):
            headings.append({
                "level": len(match.group(1)),
                "text": match.group(2).strip(),
            })

    return headings
|
|
|
|
|
|
def run_validation(
    content_path: str,
    test_block_path: str,
    cora_xlsx_path: str,
) -> dict:
    """Run before/after validation of a test block against Cora targets.

    Compares the existing content alone ("before") with the existing
    content plus the test block ("after") on word count, distinct
    entities, entity/variation/LSI density, and heading structure.

    Args:
        content_path: Path to existing content (scraper output).
        test_block_path: Path to the test block (.md or .html).
        cora_xlsx_path: Path to the Cora XLSX report.

    Returns:
        Dict with keys: search_term, test_block_words, word_count,
        distinct_entities, entity_density, variation_density,
        lsi_density, headings.
    """
    report = CoraReport(cora_xlsx_path)
    entities = report.get_entities()
    lsi_keywords = report.get_lsi_keywords()
    variations_list = report.get_variations_list()
    density_targets = report.get_density_targets()
    content_targets = report.get_content_targets()
    structure_targets = report.get_structure_targets()
    word_count_dist = report.get_word_count_distribution()

    # --- Parse existing content ---
    parsed = parse_scraper_content(content_path)
    existing_text = parsed["content"]
    existing_headings = parsed["headings"]

    # --- Parse test block ---
    block_text = extract_test_block_text(test_block_path)
    block_headings = extract_test_block_headings(test_block_path)

    # --- Combined (after) view ---
    combined_text = existing_text + "\n\n" + block_text
    combined_headings = existing_headings + block_headings

    def count_words(t: str) -> int:
        # Words are runs of letters/apostrophes (same rule as test_block_prep).
        return len(re.findall(r"[a-zA-Z']+", t))

    before_words = count_words(existing_text)
    block_words = count_words(block_text)
    after_words = count_words(combined_text)

    # --- Count mentions, before vs. combined ---
    before_ent = count_entity_mentions(existing_text, entities)
    after_ent = count_entity_mentions(combined_text, entities)
    before_var = count_variation_mentions(existing_text, variations_list)
    after_var = count_variation_mentions(combined_text, variations_list)
    before_lsi = count_lsi_mentions(existing_text, lsi_keywords)
    after_lsi = count_lsi_mentions(combined_text, lsi_keywords)

    # --- Heading analysis ---
    before_hdg = count_terms_in_headings(existing_headings, entities, variations_list)
    after_hdg = count_terms_in_headings(combined_headings, entities, variations_list)

    # --- Targets (missing/None values default to 0) ---
    tgt_entity_d = density_targets.get("entity_density", {}).get("avg") or 0
    tgt_var_d = density_targets.get("variation_density", {}).get("avg") or 0
    tgt_lsi_d = density_targets.get("lsi_density", {}).get("avg") or 0
    distinct_target = content_targets.get("distinct_entities", {}).get("target", 0)
    cluster_target = word_count_dist.get("cluster_target", 0)
    # Prefer the cluster target; fall back to the average word count.
    wc_target = cluster_target if cluster_target else word_count_dist.get("average", 0)
    h2_target = structure_targets.get("h2", {}).get("count", {}).get("target", 0)
    h3_target = structure_targets.get("h3", {}).get("count", {}).get("target", 0)

    def density(mentions: int, words: int) -> float:
        # Mentions-per-word ratio; 0 for empty text avoids ZeroDivisionError.
        return mentions / words if words > 0 else 0

    def pct(d: float) -> float:
        return round(d * 100, 2)

    def density_section(before: dict, after: dict, target: float) -> dict:
        # Shared shape for the entity/variation/LSI density comparisons.
        before_m = before["total_mentions"]
        after_m = after["total_mentions"]
        return {
            "before_pct": pct(density(before_m, before_words)),
            "after_pct": pct(density(after_m, after_words)),
            "target_pct": pct(target),
            "before_mentions": before_m,
            "after_mentions": after_m,
            "delta_mentions": after_m - before_m,
            "before_status": "meets" if density(before_m, before_words) >= target else "below",
            "after_status": "meets" if density(after_m, after_words) >= target else "below",
        }

    # Entities newly introduced by the test block (0 mentions -> >0).
    new_entities = [
        name
        for name, after_count in after_ent["per_entity"].items()
        if before_ent["per_entity"].get(name, 0) == 0 and after_count > 0
    ]

    before_h2 = sum(1 for h in existing_headings if h["level"] == 2)
    after_h2 = sum(1 for h in combined_headings if h["level"] == 2)
    before_h3 = sum(1 for h in existing_headings if h["level"] == 3)
    after_h3 = sum(1 for h in combined_headings if h["level"] == 3)

    return {
        "search_term": report.get_search_term(),
        "test_block_words": block_words,
        "word_count": {
            "before": before_words,
            "after": after_words,
            "target": wc_target,
            "before_status": "meets" if before_words >= wc_target else "below",
            "after_status": "meets" if after_words >= wc_target else "below",
        },
        "distinct_entities": {
            "before": before_ent["distinct_count"],
            "after": after_ent["distinct_count"],
            "target": distinct_target,
            "new_0_to_1": len(new_entities),
            "new_entity_names": sorted(new_entities),
            "before_status": "meets" if before_ent["distinct_count"] >= distinct_target else "below",
            "after_status": "meets" if after_ent["distinct_count"] >= distinct_target else "below",
        },
        "entity_density": density_section(before_ent, after_ent, tgt_entity_d),
        "variation_density": density_section(before_var, after_var, tgt_var_d),
        "lsi_density": density_section(before_lsi, after_lsi, tgt_lsi_d),
        "headings": {
            "h2": {
                "before": before_h2,
                "after": after_h2,
                "target": h2_target,
            },
            "h3": {
                "before": before_h3,
                "after": after_h3,
                "target": h3_target,
            },
            "entities_in_headings": {
                "before": before_hdg["entity_mentions_total"],
                "after": after_hdg["entity_mentions_total"],
            },
            "variations_in_headings": {
                "before": before_hdg["variation_mentions_total"],
                "after": after_hdg["variation_mentions_total"],
            },
        },
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def format_text_report(data: dict) -> str:
    """Format validation results as a human-readable before/after comparison.

    Args:
        data: The dict produced by run_validation().

    Returns:
        A multi-line aligned text table (metrics, mention counts,
        headings, and any newly introduced entities).
    """
    lines = []
    sep = "=" * 70

    lines.append(sep)
    lines.append(f" TEST BLOCK VALIDATION -- {data['search_term']}")
    lines.append(f" Test block added {data['test_block_words']} words")
    lines.append(sep)
    lines.append("")

    def status(s):
        # "meets" -> [OK]; anything else (i.e. "below") -> [!!]
        return "[OK]" if s == "meets" else "[!!]"

    # --- Metrics vs. targets ---
    wc = data["word_count"]
    lines.append(f" {'METRIC':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10} {'STATUS':>8}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10} {'-'*8}")

    lines.append(
        f" {'Word count':<30} {wc['before']:>10} {wc['after']:>10} "
        f"{wc['target']:>10} {status(wc['after_status']):>8}"
    )

    de = data["distinct_entities"]
    lines.append(
        f" {'Distinct entities':<30} {de['before']:>10} {de['after']:>10} "
        f"{de['target']:>10} {status(de['after_status']):>8}"
    )

    ed = data["entity_density"]
    lines.append(
        f" {'Entity density %':<30} {ed['before_pct']:>9}% {ed['after_pct']:>9}% "
        f"{ed['target_pct']:>9}% {status(ed['after_status']):>8}"
    )

    vd = data["variation_density"]
    lines.append(
        f" {'Variation density %':<30} {vd['before_pct']:>9}% {vd['after_pct']:>9}% "
        f"{vd['target_pct']:>9}% {status(vd['after_status']):>8}"
    )

    ld = data["lsi_density"]
    lines.append(
        f" {'LSI density %':<30} {ld['before_pct']:>9}% {ld['after_pct']:>9}% "
        f"{ld['target_pct']:>9}% {status(ld['after_status']):>8}"
    )

    lines.append("")

    # --- Mention counts with signed deltas ---
    # Use {:+d} so negative deltas render as "-2", not "+-2" as the old
    # "'+' + str(delta)" concatenation produced.
    lines.append(f" {'MENTION COUNTS':<30} {'BEFORE':>10} {'AFTER':>10} {'DELTA':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(
        f" {'Entity mentions':<30} {ed['before_mentions']:>10} "
        f"{ed['after_mentions']:>10} {ed['delta_mentions']:>+10d}"
    )
    lines.append(
        f" {'Variation mentions':<30} {vd['before_mentions']:>10} "
        f"{vd['after_mentions']:>10} {vd['delta_mentions']:>+10d}"
    )
    lines.append(
        f" {'LSI mentions':<30} {ld['before_mentions']:>10} "
        f"{ld['after_mentions']:>10} {ld['delta_mentions']:>+10d}"
    )
    lines.append("")

    # --- Heading structure ---
    hd = data["headings"]
    lines.append(f" {'HEADINGS':<30} {'BEFORE':>10} {'AFTER':>10} {'TARGET':>10}")
    lines.append(f" {'-'*30} {'-'*10} {'-'*10} {'-'*10}")
    lines.append(f" {'H2 count':<30} {hd['h2']['before']:>10} {hd['h2']['after']:>10} {hd['h2']['target']:>10}")
    lines.append(f" {'H3 count':<30} {hd['h3']['before']:>10} {hd['h3']['after']:>10} {hd['h3']['target']:>10}")
    lines.append(
        f" {'Entities in headings':<30} {hd['entities_in_headings']['before']:>10} "
        f"{hd['entities_in_headings']['after']:>10}"
    )
    lines.append(
        f" {'Variations in headings':<30} {hd['variations_in_headings']['before']:>10} "
        f"{hd['variations_in_headings']['after']:>10}"
    )
    lines.append("")

    # --- New entities introduced by the test block ---
    if de["new_entity_names"]:
        lines.append(f" NEW ENTITIES INTRODUCED (0->1): {de['new_0_to_1']}")
        for name in de["new_entity_names"]:
            lines.append(f" + {name}")
        lines.append("")

    lines.append(sep)

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """CLI entry point: parse arguments, run validation, emit the report."""
    arg_parser = argparse.ArgumentParser(
        description="Validate a test block with before/after comparison.",
    )
    arg_parser.add_argument("content_path", help="Path to existing content (scraper output)")
    arg_parser.add_argument("test_block_path", help="Path to test block (.md or .html)")
    arg_parser.add_argument("cora_xlsx_path", help="Path to Cora XLSX report")
    arg_parser.add_argument(
        "--format", choices=["json", "text"], default="text",
        help="Output format (default: text)",
    )
    arg_parser.add_argument(
        "--output", "-o", default=None,
        help="Write output to file instead of stdout",
    )
    opts = arg_parser.parse_args()

    try:
        result = run_validation(opts.content_path, opts.test_block_path, opts.cora_xlsx_path)
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    if opts.format == "json":
        rendered = json.dumps(result, indent=2, default=str)
    else:
        rendered = format_text_report(result)

    if opts.output:
        Path(opts.output).write_text(rendered, encoding="utf-8")
        print(f"Written to {opts.output}", file=sys.stderr)
        return

    try:
        print(rendered)
    except UnicodeEncodeError:
        # Some Windows consoles default to a non-UTF-8 codepage; write the
        # raw UTF-8 bytes instead of failing.
        sys.stdout.buffer.write(rendered.encode("utf-8"))


if __name__ == "__main__":
    main()
|