#!/usr/bin/env python3
"""
Entity Optimizer — Cora Entity Analysis for Content Drafts

Counts Cora-defined entities in a markdown content draft and recommends
additions based on relevance and deficit data from a Cora XLSX report.

Usage:
    uv run --with openpyxl python entity_optimizer.py draft.md cora.xlsx [--format json|text] [--top-n 30]

Options:
    --format   Output format: json or text (default: text)
    --top-n    Number of top recommendations to show (default: 30)
"""

import argparse
import json
import re
import sys
from pathlib import Path

from cora_parser import CoraReport


class EntityOptimizer:
    """Analyzes a content draft against Cora entity targets and recommends additions."""

    def __init__(self, cora_xlsx_path: str):
        """Load entity targets from a Cora XLSX report.

        Args:
            cora_xlsx_path: Path to the Cora SEO XLSX file.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.entities = self.report.get_entities()
        self.search_term = self.report.get_search_term()

        # Populated after analyze_draft() is called
        self.draft_text = ""
        self.sections = []  # list of {"heading": str, "level": int, "text": str}
        self.entity_counts = {}  # entity name -> {"total": int, "per_section": {heading: count}}

    @staticmethod
    def _entity_pattern(name: str) -> "re.Pattern":
        """Compile a case-insensitive whole-word pattern for an entity name.

        Uses lookarounds instead of plain word-boundary anchors so entities
        that start or end with non-word characters (e.g. "C++", ".NET",
        "node.js") still match as whole words. A trailing word-boundary
        anchor after a symbol like "+" requires an adjacent word character,
        which made such entities silently never match.
        """
        return re.compile(r"(?<!\w)" + re.escape(name) + r"(?!\w)", re.IGNORECASE)

    def analyze_draft(self, draft_path: str) -> dict:
        """Run a full analysis of a content draft against Cora entity targets.

        Args:
            draft_path: Path to a markdown content draft file.

        Returns:
            dict with keys: summary, entity_counts, deficits,
            recommendations, section_density

        Raises:
            FileNotFoundError: If draft_path does not exist.
        """
        path = Path(draft_path)
        if not path.exists():
            raise FileNotFoundError(f"Draft file not found: {draft_path}")

        self.draft_text = path.read_text(encoding="utf-8")
        # Sections must be parsed before counting so per-section counts and
        # density calculations all see the same section list.
        self.sections = self._parse_sections(self.draft_text)
        self.entity_counts = self.count_entities(self.draft_text)

        deficits = self.calculate_deficits()
        recommendations = self.recommend_additions()
        section_density = self._section_density()

        # Build summary stats
        entities_found = sum(
            1 for name, counts in self.entity_counts.items() if counts["total"] > 0
        )
        entities_with_deficit = sum(1 for d in deficits if d["remaining_deficit"] > 0)

        summary = {
            "search_term": self.search_term,
            "total_entities_tracked": len(self.entities),
            "entities_found_in_draft": entities_found,
            "entities_with_deficit": entities_with_deficit,
            "total_sections": len(self.sections),
        }

        return {
            "summary": summary,
            "entity_counts": self.entity_counts,
            "deficits": deficits,
            "recommendations": recommendations,
            "section_density": section_density,
        }

    def count_entities(self, text: str) -> dict:
        """Count occurrences of each Cora entity in the text, total and per section.

        Uses case-insensitive matching anchored at word edges so partial
        matches inside larger words are excluded.

        Args:
            text: The full draft text.

        Returns:
            dict mapping entity name to {"total": int, "per_section": {heading: int}}
        """
        counts = {}
        sections = self.sections if self.sections else self._parse_sections(text)

        for entity in self.entities:
            name = entity["name"]

            # Guard: an empty/blank name compiles to a pattern that matches
            # at every word edge and would wildly inflate counts.
            if not name or not name.strip():
                counts[name] = {"total": 0, "per_section": {}}
                continue

            pattern = self._entity_pattern(name)
            total = len(pattern.findall(text))

            per_section = {}
            for section in sections:
                section_count = len(pattern.findall(section["text"]))
                if section_count > 0:
                    # NOTE(review): keyed by heading text — if a draft repeats
                    # a heading, the later section's count overwrites the
                    # earlier one. Confirm drafts never repeat headings, or
                    # switch the key to a section index (JSON schema change).
                    per_section[section["heading"]] = section_count

            counts[name] = {
                "total": total,
                "per_section": per_section,
            }

        return counts

    def calculate_deficits(self) -> list[dict]:
        """Calculate which entities are still below their Cora deficit target.

        Compares the count found in the draft against the deficit value from
        the Cora report. An entity with a Cora deficit of 20 and a draft
        count of 5 has a remaining deficit of 15.

        Returns:
            List of dicts with: name, relevance, correlation, cora_deficit,
            draft_count, remaining_deficit — sorted by remaining_deficit
            descending.
        """
        deficits = []
        for entity in self.entities:
            name = entity["name"]
            # `or 0` normalizes None values coming from the XLSX parser.
            cora_deficit = entity.get("deficit") or 0
            draft_count = self.entity_counts.get(name, {}).get("total", 0)
            remaining = max(0, cora_deficit - draft_count)

            deficits.append({
                "name": name,
                "relevance": entity.get("relevance") or 0,
                "correlation": entity.get("correlation") or 0,
                "cora_deficit": cora_deficit,
                "draft_count": draft_count,
                "remaining_deficit": remaining,
            })

        deficits.sort(key=lambda d: d["remaining_deficit"], reverse=True)
        return deficits

    def recommend_additions(self) -> list[dict]:
        """Generate prioritized recommendations for entity additions.

        Priority is calculated as relevance * remaining_deficit, so entities
        that are both highly relevant and far below target rank highest.
        Each recommendation includes suggested sections where the entity
        could naturally be added, based on where related entities already
        appear.

        Returns:
            List of recommendation dicts sorted by priority descending.
            Each dict has: name, relevance, correlation, cora_deficit,
            draft_count, remaining_deficit, priority, suggested_sections.
        """
        deficits = self.calculate_deficits()
        recommendations = []

        for deficit_entry in deficits:
            # Entities already at or above target need no recommendation.
            if deficit_entry["remaining_deficit"] <= 0:
                continue

            relevance = deficit_entry["relevance"]
            remaining = deficit_entry["remaining_deficit"]
            priority = relevance * remaining
            suggested = self._suggest_sections(deficit_entry["name"])

            recommendations.append({
                "name": deficit_entry["name"],
                "relevance": relevance,
                "correlation": deficit_entry["correlation"],
                "cora_deficit": deficit_entry["cora_deficit"],
                "draft_count": deficit_entry["draft_count"],
                "remaining_deficit": remaining,
                "priority": round(priority, 4),
                "suggested_sections": suggested,
            })

        recommendations.sort(key=lambda r: r["priority"], reverse=True)
        return recommendations

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _parse_sections(self, text: str) -> list[dict]:
        """Split markdown text into sections by headings.

        Each section captures the heading text, heading level, and the body
        text under that heading (up to the next heading of equal or higher
        level). A virtual "Introduction" section is created for content
        before the first heading.

        Returns:
            list of {"heading": str, "level": int, "text": str}
        """
        heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
        matches = list(heading_pattern.finditer(text))

        sections = []

        # Content before the first heading becomes the Introduction section
        if matches:
            intro_text = text[:matches[0].start()].strip()
            if intro_text:
                sections.append({
                    "heading": "Introduction",
                    "level": 0,
                    "text": intro_text,
                })
        else:
            # No headings at all — treat the entire text as one section
            return [{
                "heading": "Full Document",
                "level": 0,
                "text": text,
            }]

        for i, match in enumerate(matches):
            level = len(match.group(1))
            heading = match.group(2).strip()
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            body = text[start:end].strip()

            sections.append({
                "heading": heading,
                "level": level,
                "text": body,
            })

        return sections

    def _suggest_sections(self, entity_name: str) -> list[str]:
        """Suggest sections where an entity could naturally be added.

        Strategy: find sections that already contain other entities from the
        same Cora report. Sections with higher concentrations of related
        entities are better candidates because the topic is contextually
        aligned. If no sections have related entities, return all non-empty
        sections as general candidates.

        Args:
            entity_name: The entity to find placement for.

        Returns:
            List of section heading strings, ordered by relevance.
        """
        if not self.sections:
            return []

        # Build a score for each section: count how many other entities appear there
        section_scores = []
        for section in self.sections:
            heading = section["heading"]
            other_entity_count = 0

            for name, counts in self.entity_counts.items():
                if name.lower() == entity_name.lower():
                    continue
                if heading in counts.get("per_section", {}):
                    other_entity_count += counts["per_section"][heading]

            if other_entity_count > 0:
                section_scores.append((heading, other_entity_count))

        # Sort by entity richness descending
        section_scores.sort(key=lambda x: x[1], reverse=True)

        if section_scores:
            return [heading for heading, _score in section_scores]

        # Fallback: return all sections with non-trivial content
        return [
            s["heading"]
            for s in self.sections
            if len(s["text"].split()) > 20
        ]

    def _section_density(self) -> list[dict]:
        """Calculate per-section entity density.

        Returns:
            List of dicts with: heading, level, word_count, entities_found,
            entity_mentions, density (mentions per 100 words).
        """
        densities = []

        for section in self.sections:
            heading = section["heading"]
            word_count = len(section["text"].split())

            entities_found = 0
            total_mentions = 0
            for name, counts in self.entity_counts.items():
                section_count = counts.get("per_section", {}).get(heading, 0)
                if section_count > 0:
                    entities_found += 1
                    total_mentions += section_count

            # Guard against empty sections to avoid division by zero.
            density = round((total_mentions / word_count) * 100, 2) if word_count > 0 else 0.0

            densities.append({
                "heading": heading,
                "level": section["level"],
                "word_count": word_count,
                "entities_found": entities_found,
                "entity_mentions": total_mentions,
                "density_per_100_words": density,
            })

        return densities


# ------------------------------------------------------------------
# Output formatting
# ------------------------------------------------------------------

def format_text_report(analysis: dict, top_n: int = 30) -> str:
    """Format the analysis result as a human-readable text report."""
    lines = []
    summary = analysis["summary"]

    # --- Header ---
    lines.append("=" * 70)
    lines.append(" ENTITY OPTIMIZATION REPORT")
    if summary.get("search_term"):
        lines.append(f" Target keyword: {summary['search_term']}")
    lines.append("=" * 70)
    lines.append("")

    # --- Summary ---
    lines.append("SUMMARY")
    lines.append("-" * 40)
    lines.append(f" Total entities tracked: {summary['total_entities_tracked']}")
    lines.append(f" Entities found in draft: {summary['entities_found_in_draft']}")
    lines.append(f" Entities with deficit: {summary['entities_with_deficit']}")
    lines.append(f" Total sections in draft: {summary['total_sections']}")
    lines.append("")

    # --- Top Recommendations ---
    recommendations = analysis["recommendations"]
    shown = recommendations[:top_n]
    lines.append(f"TOP {min(top_n, len(recommendations))} RECOMMENDATIONS (sorted by priority)")
    lines.append("-" * 70)
    if not shown:
        lines.append(" No entity deficits found — the draft covers all targets.")
    else:
        for i, rec in enumerate(shown, 1):
            sections_str = ", ".join(rec["suggested_sections"][:3]) if rec["suggested_sections"] else "any section"
            lines.append(
                f" {i:>3}. Entity '{rec['name']}' found {rec['draft_count']} times, "
                f"target deficit is {rec['cora_deficit']}. "
                f"Remaining: {rec['remaining_deficit']}. "
                f"Priority: {rec['priority']}"
            )
            lines.append(
                f" Relevance: {rec['relevance']} | Correlation: {rec['correlation']}"
            )
            lines.append(
                f" Suggested sections: [{sections_str}]"
            )
    lines.append("")

    # --- Per-Section Entity Density ---
    lines.append("PER-SECTION ENTITY DENSITY")
    lines.append("-" * 70)
    lines.append(f" {'Section':<40} {'Words':>6} {'Entities':>9} {'Mentions':>9} {'Density':>8}")
    lines.append(f" {'-' * 40} {'-' * 6} {'-' * 9} {'-' * 9} {'-' * 8}")
    for sd in analysis["section_density"]:
        # Indent sub-headings by their markdown level for a visual hierarchy.
        indent = " " * sd["level"] if sd["level"] > 0 else ""
        heading_display = indent + sd["heading"]
        if len(heading_display) > 38:
            heading_display = heading_display[:35] + "..."
        lines.append(
            f" {heading_display:<40} {sd['word_count']:>6} {sd['entities_found']:>9} "
            f"{sd['entity_mentions']:>9} {sd['density_per_100_words']:>7.2f}%"
        )
    lines.append("")
    lines.append("=" * 70)

    return "\n".join(lines)


def format_json_report(analysis: dict, top_n: int = 30) -> str:
    """Format the analysis result as machine-readable JSON."""
    output = {
        "summary": analysis["summary"],
        "recommendations": analysis["recommendations"][:top_n],
        "section_density": analysis["section_density"],
        "entity_counts": analysis["entity_counts"],
        "deficits": analysis["deficits"],
    }
    # default=str is a safety net for any non-JSON-native values the Cora
    # parser might surface (e.g. Decimal cells from openpyxl).
    return json.dumps(output, indent=2, default=str)


# ------------------------------------------------------------------
# CLI entry point
# ------------------------------------------------------------------

def main():
    """Parse CLI arguments, run the analysis, and print the report."""
    parser = argparse.ArgumentParser(
        description="Analyze a content draft against Cora entity targets and recommend additions.",
        # The custom usage string previously omitted the two required
        # positionals, making argparse's error output misleading.
        usage="uv run --with openpyxl python entity_optimizer.py draft_path cora_xlsx_path [options]",
    )
    parser.add_argument(
        "draft_path",
        help="Path to the markdown content draft",
    )
    parser.add_argument(
        "cora_xlsx_path",
        help="Path to the Cora SEO XLSX report",
    )
    parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=30,
        help="Number of top recommendations to display (default: 30)",
    )

    args = parser.parse_args()

    try:
        optimizer = EntityOptimizer(args.cora_xlsx_path)
        analysis = optimizer.analyze_draft(args.draft_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report and exit non-zero rather than traceback.
        print(f"Error analyzing draft: {e}", file=sys.stderr)
        sys.exit(1)

    if args.format == "json":
        print(format_json_report(analysis, top_n=args.top_n))
    else:
        print(format_text_report(analysis, top_n=args.top_n))


if __name__ == "__main__":
    main()