# CheddahBot/.claude/skills/content-researcher/scripts/entity_optimizer.py
#!/usr/bin/env python3
"""
Entity Optimizer — Cora Entity Analysis for Content Drafts
Counts Cora-defined entities in a markdown content draft and recommends
additions based on relevance and deficit data from a Cora XLSX report.
Usage:
uv run --with openpyxl python entity_optimizer.py <draft_path> <cora_xlsx_path> [--format json|text] [--top-n 30]
Options:
--format Output format: json or text (default: text)
--top-n Number of top recommendations to show (default: 30)
"""
import argparse
import json
import re
import sys
from pathlib import Path
from cora_parser import CoraReport
class EntityOptimizer:
    """Analyzes a content draft against Cora entity targets and recommends additions.

    Typical usage:
        optimizer = EntityOptimizer("cora_report.xlsx")
        analysis = optimizer.analyze_draft("draft.md")
    """

    def __init__(self, cora_xlsx_path: str):
        """Load entity targets from a Cora XLSX report.

        Args:
            cora_xlsx_path: Path to the Cora SEO XLSX file.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.entities = self.report.get_entities()
        self.search_term = self.report.get_search_term()
        # Populated after analyze_draft() is called.
        self.draft_text = ""
        # Each section: {"heading": str, "level": int, "text": str}
        self.sections: list[dict] = []
        # entity name -> {"total": int, "per_section": {heading: count}}
        self.entity_counts: dict = {}

    @staticmethod
    def _entity_pattern(name: str):
        """Compile a case-insensitive whole-token pattern for an entity name.

        Lookaround boundaries are used instead of ``\\b`` so that names that
        start or end with a non-word character (e.g. "C++", ".NET") still
        match as standalone tokens; a trailing ``\\b`` after punctuation
        requires a following word character and silently misses such names.
        """
        return re.compile(r"(?<!\w)" + re.escape(name) + r"(?!\w)", re.IGNORECASE)

    def analyze_draft(self, draft_path: str) -> dict:
        """Run a full analysis of a content draft against Cora entity targets.

        Args:
            draft_path: Path to a markdown content draft file.

        Returns:
            dict with keys: summary, entity_counts, deficits,
            recommendations, section_density.

        Raises:
            FileNotFoundError: If draft_path does not exist.
        """
        path = Path(draft_path)
        if not path.exists():
            raise FileNotFoundError(f"Draft file not found: {draft_path}")
        self.draft_text = path.read_text(encoding="utf-8")
        self.sections = self._parse_sections(self.draft_text)
        self.entity_counts = self.count_entities(self.draft_text)
        deficits = self.calculate_deficits()
        # Pass the deficits we just computed so they are not recalculated.
        recommendations = self.recommend_additions(deficits)
        section_density = self._section_density()
        # Build summary stats.
        entities_found = sum(
            1 for counts in self.entity_counts.values() if counts["total"] > 0
        )
        entities_with_deficit = sum(1 for d in deficits if d["remaining_deficit"] > 0)
        summary = {
            "search_term": self.search_term,
            "total_entities_tracked": len(self.entities),
            "entities_found_in_draft": entities_found,
            "entities_with_deficit": entities_with_deficit,
            "total_sections": len(self.sections),
        }
        return {
            "summary": summary,
            "entity_counts": self.entity_counts,
            "deficits": deficits,
            "recommendations": recommendations,
            "section_density": section_density,
        }

    def count_entities(self, text: str) -> dict:
        """Count occurrences of each Cora entity in the text, total and per section.

        Uses case-insensitive matching anchored at word boundaries (via
        lookarounds) so partial matches inside larger words are excluded.

        Args:
            text: The full draft text.

        Returns:
            dict mapping entity name to {"total": int, "per_section": {heading: int}}
        """
        counts: dict = {}
        sections = self.sections if self.sections else self._parse_sections(text)
        for entity in self.entities:
            name = entity["name"]
            if not name:
                # A blank name would compile to a zero-width pattern that
                # matches between every character; skip it.
                continue
            pattern = self._entity_pattern(name)
            per_section = {}
            for section in sections:
                section_count = len(pattern.findall(section["text"]))
                if section_count > 0:
                    per_section[section["heading"]] = section_count
            counts[name] = {
                "total": len(pattern.findall(text)),
                "per_section": per_section,
            }
        return counts

    def calculate_deficits(self) -> list[dict]:
        """Calculate which entities are still below their Cora deficit target.

        Compares the count found in the draft against the deficit value from
        the Cora report. An entity with a Cora deficit of 20 and a draft count
        of 5 has a remaining deficit of 15.

        Returns:
            List of dicts with: name, relevance, correlation, cora_deficit,
            draft_count, remaining_deficit — sorted by remaining_deficit descending.
        """
        deficits = []
        for entity in self.entities:
            name = entity["name"]
            # `or 0` normalizes None values coming from the XLSX report.
            cora_deficit = entity.get("deficit") or 0
            draft_count = self.entity_counts.get(name, {}).get("total", 0)
            remaining = max(0, cora_deficit - draft_count)
            deficits.append({
                "name": name,
                "relevance": entity.get("relevance") or 0,
                "correlation": entity.get("correlation") or 0,
                "cora_deficit": cora_deficit,
                "draft_count": draft_count,
                "remaining_deficit": remaining,
            })
        deficits.sort(key=lambda d: d["remaining_deficit"], reverse=True)
        return deficits

    def recommend_additions(self, deficits=None) -> list[dict]:
        """Generate prioritized recommendations for entity additions.

        Priority is calculated as relevance * remaining_deficit, so entities
        that are both highly relevant and far below target rank highest.
        Each recommendation includes suggested sections where the entity
        could naturally be added, based on where related entities already appear.

        Args:
            deficits: Optional precomputed result of calculate_deficits();
                when omitted the deficits are computed on the fly.

        Returns:
            List of recommendation dicts sorted by priority descending. Each dict
            has: name, relevance, correlation, cora_deficit, draft_count,
            remaining_deficit, priority, suggested_sections.
        """
        if deficits is None:
            deficits = self.calculate_deficits()
        recommendations = []
        for entry in deficits:
            remaining = entry["remaining_deficit"]
            if remaining <= 0:
                # Target already met in the draft; nothing to recommend.
                continue
            relevance = entry["relevance"]
            recommendations.append({
                "name": entry["name"],
                "relevance": relevance,
                "correlation": entry["correlation"],
                "cora_deficit": entry["cora_deficit"],
                "draft_count": entry["draft_count"],
                "remaining_deficit": remaining,
                "priority": round(relevance * remaining, 4),
                "suggested_sections": self._suggest_sections(entry["name"]),
            })
        recommendations.sort(key=lambda r: r["priority"], reverse=True)
        return recommendations

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _parse_sections(self, text: str) -> list[dict]:
        """Split markdown text into sections by headings.

        Each section captures the heading text, heading level, and the body
        text under that heading (up to the next heading of any level).
        A virtual "Introduction" section is created for content before the
        first heading; a document with no headings becomes one
        "Full Document" section.

        Returns:
            list of {"heading": str, "level": int, "text": str}
        """
        heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
        matches = list(heading_pattern.finditer(text))
        sections = []
        if matches:
            # Content before the first heading becomes the Introduction section.
            intro_text = text[:matches[0].start()].strip()
            if intro_text:
                sections.append({
                    "heading": "Introduction",
                    "level": 0,
                    "text": intro_text,
                })
        else:
            # No headings at all — treat the entire text as one section.
            return [{
                "heading": "Full Document",
                "level": 0,
                "text": text,
            }]
        for i, match in enumerate(matches):
            level = len(match.group(1))
            heading = match.group(2).strip()
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            sections.append({
                "heading": heading,
                "level": level,
                "text": text[start:end].strip(),
            })
        return sections

    def _suggest_sections(self, entity_name: str) -> list[str]:
        """Suggest sections where an entity could naturally be added.

        Strategy: find sections that already contain other entities from the
        same Cora report. Sections with higher concentrations of related
        entities are better candidates because the topic is contextually aligned.
        If no sections have related entities, return all sections with
        non-trivial content (more than 20 words) as general candidates.

        Args:
            entity_name: The entity to find placement for.

        Returns:
            List of section heading strings, ordered by relevance.
        """
        if not self.sections:
            return []
        # Score each section by how many mentions of *other* entities it has.
        section_scores = []
        for section in self.sections:
            heading = section["heading"]
            other_entity_count = 0
            for name, counts in self.entity_counts.items():
                if name.lower() == entity_name.lower():
                    continue
                if heading in counts.get("per_section", {}):
                    other_entity_count += counts["per_section"][heading]
            if other_entity_count > 0:
                section_scores.append((heading, other_entity_count))
        # Sort by entity richness descending.
        section_scores.sort(key=lambda x: x[1], reverse=True)
        if section_scores:
            return [heading for heading, _score in section_scores]
        # Fallback: return all sections with non-trivial content.
        return [
            s["heading"]
            for s in self.sections
            if len(s["text"].split()) > 20
        ]

    def _section_density(self) -> list[dict]:
        """Calculate per-section entity density.

        Returns:
            List of dicts with: heading, level, word_count, entities_found,
            entity_mentions, density_per_100_words (mentions per 100 words).
        """
        densities = []
        for section in self.sections:
            heading = section["heading"]
            word_count = len(section["text"].split())
            entities_found = 0
            total_mentions = 0
            for counts in self.entity_counts.values():
                section_count = counts.get("per_section", {}).get(heading, 0)
                if section_count > 0:
                    entities_found += 1
                    total_mentions += section_count
            density = round((total_mentions / word_count) * 100, 2) if word_count > 0 else 0.0
            densities.append({
                "heading": heading,
                "level": section["level"],
                "word_count": word_count,
                "entities_found": entities_found,
                "entity_mentions": total_mentions,
                "density_per_100_words": density,
            })
        return densities
# ------------------------------------------------------------------
# Output formatting
# ------------------------------------------------------------------
def format_text_report(analysis: dict, top_n: int = 30) -> str:
    """Render the analysis result as a human-readable text report."""
    summary = analysis["summary"]
    recommendations = analysis["recommendations"]
    out: list[str] = []
    add = out.append

    # Header banner.
    add("=" * 70)
    add(" ENTITY OPTIMIZATION REPORT")
    if summary.get("search_term"):
        add(f" Target keyword: {summary['search_term']}")
    add("=" * 70)
    add("")

    # High-level counters.
    add("SUMMARY")
    add("-" * 40)
    add(f" Total entities tracked: {summary['total_entities_tracked']}")
    add(f" Entities found in draft: {summary['entities_found_in_draft']}")
    add(f" Entities with deficit: {summary['entities_with_deficit']}")
    add(f" Total sections in draft: {summary['total_sections']}")
    add("")

    # Prioritized recommendations, capped at top_n.
    visible = recommendations[:top_n]
    add(f"TOP {min(top_n, len(recommendations))} RECOMMENDATIONS (sorted by priority)")
    add("-" * 70)
    if not visible:
        add(" No entity deficits found — the draft covers all targets.")
    for rank, rec in enumerate(visible, 1):
        placements = rec["suggested_sections"]
        sections_str = ", ".join(placements[:3]) if placements else "any section"
        add(
            f" {rank:>3}. Entity '{rec['name']}' found {rec['draft_count']} times, "
            f"target deficit is {rec['cora_deficit']}. "
            f"Remaining: {rec['remaining_deficit']}. "
            f"Priority: {rec['priority']}"
        )
        add(f" Relevance: {rec['relevance']} | Correlation: {rec['correlation']}")
        add(f" Suggested sections: [{sections_str}]")
    add("")

    # Per-section density table.
    add("PER-SECTION ENTITY DENSITY")
    add("-" * 70)
    add(f" {'Section':<40} {'Words':>6} {'Entities':>9} {'Mentions':>9} {'Density':>8}")
    add(f" {'-' * 40} {'-' * 6} {'-' * 9} {'-' * 9} {'-' * 8}")
    for row in analysis["section_density"]:
        label = (" " * row["level"] if row["level"] > 0 else "") + row["heading"]
        if len(label) > 38:
            # Truncate long headings so the table columns stay aligned.
            label = label[:35] + "..."
        add(
            f" {label:<40} {row['word_count']:>6} {row['entities_found']:>9} "
            f"{row['entity_mentions']:>9} {row['density_per_100_words']:>7.2f}%"
        )
    add("")
    add("=" * 70)
    return "\n".join(out)
def format_json_report(analysis: dict, top_n: int = 30) -> str:
    """Serialize the analysis result as machine-readable JSON.

    Only the first top_n recommendations are emitted; every other
    analysis key passes through unchanged.
    """
    # Key order matches the text report's reading order.
    key_order = ("summary", "recommendations", "section_density", "entity_counts", "deficits")
    payload = {key: analysis[key] for key in key_order}
    payload["recommendations"] = analysis["recommendations"][:top_n]
    return json.dumps(payload, indent=2, default=str)
# ------------------------------------------------------------------
# CLI entry point
# ------------------------------------------------------------------
def main():
    """CLI entry point: parse arguments, run the analysis, print the report."""
    arg_parser = argparse.ArgumentParser(
        description="Analyze a content draft against Cora entity targets and recommend additions.",
        usage="uv run --with openpyxl python entity_optimizer.py <draft_path> <cora_xlsx_path> [options]",
    )
    arg_parser.add_argument("draft_path", help="Path to the markdown content draft")
    arg_parser.add_argument("cora_xlsx_path", help="Path to the Cora SEO XLSX report")
    arg_parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    arg_parser.add_argument(
        "--top-n",
        type=int,
        default=30,
        help="Number of top recommendations to display (default: 30)",
    )
    opts = arg_parser.parse_args()

    try:
        analysis = EntityOptimizer(opts.cora_xlsx_path).analyze_draft(opts.draft_path)
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    except Exception as exc:
        # Anything else (bad XLSX, parse failure) gets a generic wrapper.
        print(f"Error analyzing draft: {exc}", file=sys.stderr)
        sys.exit(1)

    # Choose the renderer, then print a single report to stdout.
    renderer = format_json_report if opts.format == "json" else format_text_report
    print(renderer(analysis, top_n=opts.top_n))


if __name__ == "__main__":
    main()