456 lines
16 KiB
Python
456 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Entity Optimizer — Cora Entity Analysis for Content Drafts
|
|
|
|
Counts Cora-defined entities in a markdown content draft and recommends
|
|
additions based on relevance and deficit data from a Cora XLSX report.
|
|
|
|
Usage:
|
|
uv run --with openpyxl python entity_optimizer.py <draft_path> <cora_xlsx_path> [--format json|text] [--top-n 30]
|
|
|
|
Options:
|
|
--format Output format: json or text (default: text)
|
|
--top-n Number of top recommendations to show (default: 30)
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from cora_parser import CoraReport
|
|
|
|
|
|
class EntityOptimizer:
    """Analyze a content draft against Cora entity targets and recommend additions.

    Typical flow: construct with a Cora XLSX path, then call analyze_draft()
    with a markdown file. Intermediate state (sections, entity_counts) is
    cached on the instance so the helper methods can reuse it.
    """

    def __init__(self, cora_xlsx_path: str):
        """Load entity targets from a Cora XLSX report.

        Args:
            cora_xlsx_path: Path to the Cora SEO XLSX file.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.entities = self.report.get_entities()
        self.search_term = self.report.get_search_term()

        # Populated after analyze_draft() is called.
        self.draft_text = ""
        # list of {"heading": str, "level": int, "text": str}
        self.sections: list[dict] = []
        # entity name -> {"total": int, "per_section": {heading: count}}
        self.entity_counts: dict = {}

    def analyze_draft(self, draft_path: str) -> dict:
        """Run a full analysis of a content draft against Cora entity targets.

        Args:
            draft_path: Path to a markdown content draft file.

        Returns:
            dict with keys: summary, entity_counts, deficits, recommendations,
            section_density.

        Raises:
            FileNotFoundError: If draft_path does not exist.
        """
        path = Path(draft_path)
        if not path.exists():
            raise FileNotFoundError(f"Draft file not found: {draft_path}")

        self.draft_text = path.read_text(encoding="utf-8")
        self.sections = self._parse_sections(self.draft_text)
        self.entity_counts = self.count_entities(self.draft_text)
        deficits = self.calculate_deficits()
        recommendations = self.recommend_additions()
        section_density = self._section_density()

        # Summary stats for quick triage of the draft's entity coverage.
        entities_found = sum(
            1 for counts in self.entity_counts.values() if counts["total"] > 0
        )
        entities_with_deficit = sum(1 for d in deficits if d["remaining_deficit"] > 0)

        summary = {
            "search_term": self.search_term,
            "total_entities_tracked": len(self.entities),
            "entities_found_in_draft": entities_found,
            "entities_with_deficit": entities_with_deficit,
            "total_sections": len(self.sections),
        }

        return {
            "summary": summary,
            "entity_counts": self.entity_counts,
            "deficits": deficits,
            "recommendations": recommendations,
            "section_density": section_density,
        }

    @staticmethod
    def _entity_pattern(name: str) -> "re.Pattern":
        """Compile a case-insensitive whole-word pattern for an entity name.

        Lookarounds are used instead of plain ``\\b`` because ``\\b`` fails
        for entity names whose first or last character is not a word
        character (e.g. "C++" or ".NET"): there is no word boundary between
        "+" and a following space, so ``\\bC\\+\\+\\b`` never matches and
        the entity silently counts as 0. ``(?<!\\w)...(?!\\w)`` behaves
        identically to ``\\b`` for alphanumeric-edged names.
        """
        return re.compile(r"(?<!\w)" + re.escape(name) + r"(?!\w)", re.IGNORECASE)

    def count_entities(self, text: str) -> dict:
        """Count occurrences of each Cora entity in the text, total and per section.

        Matching is case-insensitive and anchored at word edges so partial
        matches inside larger words are excluded.

        Args:
            text: The full draft text.

        Returns:
            dict mapping entity name to {"total": int, "per_section": {heading: int}}

        Note:
            per_section is keyed by heading text; if the draft repeats a
            heading, counts for those sections collapse into one key.
        """
        counts = {}
        # Reuse parsed sections when analyze_draft() already populated them.
        sections = self.sections if self.sections else self._parse_sections(text)

        for entity in self.entities:
            name = entity["name"]
            pattern = self._entity_pattern(name)

            total = len(pattern.findall(text))

            per_section = {}
            for section in sections:
                section_count = len(pattern.findall(section["text"]))
                if section_count > 0:
                    per_section[section["heading"]] = section_count

            counts[name] = {
                "total": total,
                "per_section": per_section,
            }

        return counts

    def calculate_deficits(self) -> list[dict]:
        """Calculate which entities are still below their Cora deficit target.

        Compares the count found in the draft against the deficit value from
        the Cora report. An entity with a Cora deficit of 20 and a draft
        count of 5 has a remaining deficit of 15.

        Returns:
            List of dicts with: name, relevance, correlation, cora_deficit,
            draft_count, remaining_deficit — sorted by remaining_deficit
            descending (ties keep the report's original entity order, since
            list.sort is stable).
        """
        deficits = []
        for entity in self.entities:
            name = entity["name"]
            # `or 0` normalizes missing/None values from the report to zero.
            cora_deficit = entity.get("deficit") or 0
            draft_count = self.entity_counts.get(name, {}).get("total", 0)
            remaining = max(0, cora_deficit - draft_count)

            deficits.append({
                "name": name,
                "relevance": entity.get("relevance") or 0,
                "correlation": entity.get("correlation") or 0,
                "cora_deficit": cora_deficit,
                "draft_count": draft_count,
                "remaining_deficit": remaining,
            })

        deficits.sort(key=lambda d: d["remaining_deficit"], reverse=True)
        return deficits

    def recommend_additions(self) -> list[dict]:
        """Generate prioritized recommendations for entity additions.

        Priority is calculated as relevance * remaining_deficit, so entities
        that are both highly relevant and far below target rank highest.
        Each recommendation includes suggested sections where the entity
        could naturally be added, based on where related entities already
        appear.

        Returns:
            List of recommendation dicts sorted by priority descending. Each
            dict has: name, relevance, correlation, cora_deficit,
            draft_count, remaining_deficit, priority, suggested_sections.
        """
        deficits = self.calculate_deficits()
        recommendations = []

        for deficit_entry in deficits:
            # Entities already at or above target need no recommendation.
            if deficit_entry["remaining_deficit"] <= 0:
                continue

            relevance = deficit_entry["relevance"]
            remaining = deficit_entry["remaining_deficit"]
            priority = relevance * remaining

            suggested = self._suggest_sections(deficit_entry["name"])

            recommendations.append({
                "name": deficit_entry["name"],
                "relevance": relevance,
                "correlation": deficit_entry["correlation"],
                "cora_deficit": deficit_entry["cora_deficit"],
                "draft_count": deficit_entry["draft_count"],
                "remaining_deficit": remaining,
                "priority": round(priority, 4),
                "suggested_sections": suggested,
            })

        recommendations.sort(key=lambda r: r["priority"], reverse=True)
        return recommendations

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _parse_sections(self, text: str) -> list[dict]:
        """Split markdown text into sections by ATX headings (#, ##, ...).

        Each section captures the heading text, heading level, and the body
        text under that heading (up to the next heading of any level —
        nested subsections are NOT merged into their parent's text).

        A virtual "Introduction" section is created for content before the
        first heading; a draft with no headings becomes one "Full Document"
        section.

        Returns:
            list of {"heading": str, "level": int, "text": str}
        """
        heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
        matches = list(heading_pattern.finditer(text))

        sections = []

        # Content before the first heading becomes the Introduction section.
        if matches:
            intro_text = text[:matches[0].start()].strip()
            if intro_text:
                sections.append({
                    "heading": "Introduction",
                    "level": 0,
                    "text": intro_text,
                })
        else:
            # No headings at all — treat the entire text as one section.
            return [{
                "heading": "Full Document",
                "level": 0,
                "text": text,
            }]

        for i, match in enumerate(matches):
            level = len(match.group(1))  # number of '#' characters
            heading = match.group(2).strip()
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            body = text[start:end].strip()

            sections.append({
                "heading": heading,
                "level": level,
                "text": body,
            })

        return sections

    def _suggest_sections(self, entity_name: str) -> list[str]:
        """Suggest sections where an entity could naturally be added.

        Strategy: find sections that already contain other entities from the
        same Cora report. Sections with higher concentrations of related
        entities are better candidates because the topic is contextually
        aligned.

        If no sections have related entities, return all sections with
        non-trivial content (> 20 words) as general candidates.

        Args:
            entity_name: The entity to find placement for.

        Returns:
            List of section heading strings, ordered by relevance.
        """
        if not self.sections:
            return []

        # Score each section: total mentions of all *other* entities there.
        section_scores = []
        for section in self.sections:
            heading = section["heading"]
            other_entity_count = 0
            for name, counts in self.entity_counts.items():
                if name.lower() == entity_name.lower():
                    continue
                if heading in counts.get("per_section", {}):
                    other_entity_count += counts["per_section"][heading]

            if other_entity_count > 0:
                section_scores.append((heading, other_entity_count))

        # Sort by entity richness descending.
        section_scores.sort(key=lambda x: x[1], reverse=True)

        if section_scores:
            return [heading for heading, _score in section_scores]

        # Fallback: return all sections with non-trivial content.
        return [
            s["heading"]
            for s in self.sections
            if len(s["text"].split()) > 20
        ]

    def _section_density(self) -> list[dict]:
        """Calculate per-section entity density.

        Returns:
            List of dicts with: heading, level, word_count, entities_found,
            entity_mentions, density_per_100_words (mentions per 100 words,
            rounded to 2 decimals; 0.0 for empty sections).
        """
        densities = []
        for section in self.sections:
            heading = section["heading"]
            word_count = len(section["text"].split())
            entities_found = 0
            total_mentions = 0

            for name, counts in self.entity_counts.items():
                section_count = counts.get("per_section", {}).get(heading, 0)
                if section_count > 0:
                    entities_found += 1
                    total_mentions += section_count

            density = round((total_mentions / word_count) * 100, 2) if word_count > 0 else 0.0

            densities.append({
                "heading": heading,
                "level": section["level"],
                "word_count": word_count,
                "entities_found": entities_found,
                "entity_mentions": total_mentions,
                "density_per_100_words": density,
            })

        return densities
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Output formatting
|
|
# ------------------------------------------------------------------
|
|
|
|
def format_text_report(analysis: dict, top_n: int = 30) -> str:
    """Render the analysis result as a human-readable text report."""
    summary = analysis["summary"]
    out: list[str] = []

    # Header banner.
    out.append("=" * 70)
    out.append(" ENTITY OPTIMIZATION REPORT")
    if summary.get("search_term"):
        out.append(f" Target keyword: {summary['search_term']}")
    out.extend(["=" * 70, ""])

    # Draft-level summary figures.
    out.extend([
        "SUMMARY",
        "-" * 40,
        f" Total entities tracked: {summary['total_entities_tracked']}",
        f" Entities found in draft: {summary['entities_found_in_draft']}",
        f" Entities with deficit: {summary['entities_with_deficit']}",
        f" Total sections in draft: {summary['total_sections']}",
        "",
    ])

    # Prioritized recommendations, capped at top_n.
    recommendations = analysis["recommendations"]
    shown = recommendations[:top_n]

    out.append(f"TOP {min(top_n, len(recommendations))} RECOMMENDATIONS (sorted by priority)")
    out.append("-" * 70)

    if not shown:
        out.append(" No entity deficits found — the draft covers all targets.")
    else:
        for rank, rec in enumerate(shown, 1):
            suggestions = rec["suggested_sections"]
            sections_str = ", ".join(suggestions[:3]) if suggestions else "any section"
            out.append(
                f" {rank:>3}. Entity '{rec['name']}' found {rec['draft_count']} times, "
                f"target deficit is {rec['cora_deficit']}. "
                f"Remaining: {rec['remaining_deficit']}. "
                f"Priority: {rec['priority']}"
            )
            out.append(
                f" Relevance: {rec['relevance']} | Correlation: {rec['correlation']}"
            )
            out.append(
                f" Suggested sections: [{sections_str}]"
            )
            out.append("")

    # Per-section density table.
    out.append("PER-SECTION ENTITY DENSITY")
    out.append("-" * 70)
    out.append(f" {'Section':<40} {'Words':>6} {'Entities':>9} {'Mentions':>9} {'Density':>8}")
    out.append(f" {'-' * 40} {'-' * 6} {'-' * 9} {'-' * 9} {'-' * 8}")

    for sd in analysis["section_density"]:
        # Indent nested headings by their level; truncate long labels.
        label = (" " * sd["level"] if sd["level"] > 0 else "") + sd["heading"]
        if len(label) > 38:
            label = label[:35] + "..."
        out.append(
            f" {label:<40} {sd['word_count']:>6} {sd['entities_found']:>9} "
            f"{sd['entity_mentions']:>9} {sd['density_per_100_words']:>7.2f}%"
        )

    out.extend(["", "=" * 70])
    return "\n".join(out)
|
|
|
|
|
|
def format_json_report(analysis: dict, top_n: int = 30) -> str:
    """Render the analysis result as machine-readable JSON.

    Recommendations are trimmed to the top_n entries; all other sections
    are emitted in full. Non-JSON-native values fall back to str().
    """
    trimmed = dict(analysis)
    trimmed["recommendations"] = analysis["recommendations"][:top_n]
    key_order = ("summary", "recommendations", "section_density", "entity_counts", "deficits")
    payload = {key: trimmed[key] for key in key_order}
    return json.dumps(payload, indent=2, default=str)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: parse arguments, analyze, print a report.

    Exits with status 1 (and a message on stderr) if the draft file is
    missing or analysis fails for any other reason.
    """
    arg_parser = argparse.ArgumentParser(
        description="Analyze a content draft against Cora entity targets and recommend additions.",
        usage="uv run --with openpyxl python entity_optimizer.py <draft_path> <cora_xlsx_path> [options]",
    )
    arg_parser.add_argument("draft_path", help="Path to the markdown content draft")
    arg_parser.add_argument("cora_xlsx_path", help="Path to the Cora SEO XLSX report")
    arg_parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    arg_parser.add_argument(
        "--top-n",
        type=int,
        default=30,
        help="Number of top recommendations to display (default: 30)",
    )

    opts = arg_parser.parse_args()

    try:
        optimizer = EntityOptimizer(opts.cora_xlsx_path)
        analysis = optimizer.analyze_draft(opts.draft_path)
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    except Exception as exc:
        print(f"Error analyzing draft: {exc}", file=sys.stderr)
        sys.exit(1)

    # Dispatch to the requested output formatter.
    formatter = format_json_report if opts.format == "json" else format_text_report
    print(formatter(analysis, top_n=opts.top_n))
|
|
|
|
|
|
# Invoke the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|