"""
LSI Keyword Optimizer

Counts Cora-defined LSI keywords in a content draft and recommends additions.

Reads LSI targets from a Cora XLSX report via cora_parser.CoraReport, then
scans a markdown draft to measure per-keyword usage and calculate deficits.
Recommendations are prioritized by |correlation| x deficit so the most
ranking-impactful gaps surface first.

Usage:
    uv run --with openpyxl python lsi_optimizer.py <draft_path> <cora_xlsx_path> \
        [--format json|text] [--min-correlation 0.2] [--top-n 50]
"""

import argparse
import json
import re
import sys
from pathlib import Path

from cora_parser import CoraReport


class LSIOptimizer:
    """Analyzes a content draft against Cora LSI keyword targets."""

    def __init__(self, cora_xlsx_path: str):
        """Load LSI keyword targets from a Cora XLSX report.

        Args:
            cora_xlsx_path: Path to the Cora SEO report XLSX file.
        """
        self.report = CoraReport(cora_xlsx_path)
        # Each entry is read as a dict; this class uses the keys "keyword",
        # "deficit", "current_count", "spearmans", "pearsons", "best_of_both".
        self.lsi_keywords = self.report.get_lsi_keywords()
        self.draft_text = ""
        self.sections: list[dict] = []
        self._keyword_counts: dict[str, int] = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def analyze_draft(self, draft_path: str) -> dict:
        """Run full LSI analysis on a markdown draft.

        Reads the draft, stores its text, sections and per-keyword counts on
        the instance, and assembles the analysis result.

        Args:
            draft_path: Path to a markdown content draft.

        Returns:
            Analysis dict with keys: summary, keyword_counts, deficits,
            recommendations, section_coverage.

        Raises:
            FileNotFoundError: If ``draft_path`` does not exist.
        """
        path = Path(draft_path)
        if not path.exists():
            raise FileNotFoundError(f"Draft file not found: {draft_path}")

        self.draft_text = path.read_text(encoding="utf-8")
        self.sections = self._parse_sections(self.draft_text)
        # calculate_deficits() reads self._keyword_counts, so the counts must
        # be stored before deficits and recommendations are computed.
        self._keyword_counts = self.count_lsi_keywords(self.draft_text)

        deficits = self.calculate_deficits()
        # Unfiltered here (default arguments); callers may re-run
        # recommend_additions() with their own thresholds.
        recommendations = self.recommend_additions()
        section_coverage = self._section_coverage()

        total_tracked = len(self.lsi_keywords)
        found_in_draft = sum(1 for c in self._keyword_counts.values() if c > 0)
        with_deficit = len(deficits)

        return {
            "summary": {
                "total_lsi_tracked": total_tracked,
                "found_in_draft": found_in_draft,
                "with_deficit": with_deficit,
                "fully_satisfied": total_tracked - with_deficit,
            },
            "keyword_counts": self._keyword_counts,
            "deficits": deficits,
            "recommendations": recommendations,
            "section_coverage": section_coverage,
        }

    def count_lsi_keywords(self, text: str) -> dict[str, int]:
        """Count occurrences of each LSI keyword in the given text.

        Uses the boundary-aware, case-insensitive pattern built by
        ``_keyword_pattern`` so multi-word phrases like "part that" are
        matched as whole phrases.

        Args:
            text: The content string to scan.

        Returns:
            Dict mapping keyword string to its occurrence count. Every tracked
            keyword gets an entry, including those with a count of 0.
        """
        counts: dict[str, int] = {}
        for kw_data in self.lsi_keywords:
            keyword = kw_data["keyword"]
            pattern = self._keyword_pattern(keyword)
            counts[keyword] = len(pattern.findall(text))
        return counts

    def calculate_deficits(self) -> list[dict]:
        """Identify LSI keywords whose draft count is below the Cora target.

        A keyword has a deficit when the Cora report indicates a positive
        deficit value AND the draft count (from the most recent
        ``analyze_draft`` call) has not yet closed that gap.

        Returns:
            List of dicts with: keyword, draft_count, target, deficit,
            spearmans, pearsons, best_of_both. Only keywords with remaining
            deficit > 0 are included.
        """
        deficits = []
        for kw_data in self.lsi_keywords:
            keyword = kw_data["keyword"]
            # A missing or None deficit is treated as 0 (nothing to add).
            cora_deficit = kw_data.get("deficit") or 0
            if cora_deficit <= 0:
                continue

            # The Cora deficit is based on the original page. The draft may
            # have added some occurrences, so we re-compute: how many more
            # are still needed?
            cora_current = kw_data.get("current_count") or 0
            target = cora_current + cora_deficit
            draft_count = self._keyword_counts.get(keyword, 0)
            remaining_deficit = target - draft_count
            if remaining_deficit <= 0:
                continue

            deficits.append({
                "keyword": keyword,
                "draft_count": draft_count,
                "target": target,
                "deficit": remaining_deficit,
                "spearmans": kw_data.get("spearmans"),
                "pearsons": kw_data.get("pearsons"),
                "best_of_both": kw_data.get("best_of_both"),
            })
        return deficits

    def recommend_additions(
        self,
        min_correlation: float = 0.0,
        top_n: int = 0,
    ) -> list[dict]:
        """Produce a prioritized list of LSI keyword additions.

        Priority score = abs(best_of_both) x deficit. Keywords with higher
        correlation to ranking AND larger deficits sort to the top.

        Args:
            min_correlation: Only include keywords whose abs(best_of_both)
                >= this threshold.
            top_n: Limit to top N results (0 = no limit).

        Returns:
            Sorted list of dicts with: keyword, priority, deficit,
            draft_count, target, best_of_both, spearmans, pearsons.
        """
        recommendations = []
        for d in self.calculate_deficits():
            # A missing (None) or zero correlation counts as 0.0.
            correlation = abs(d["best_of_both"]) if d["best_of_both"] else 0.0
            if correlation < min_correlation:
                continue
            priority = correlation * d["deficit"]
            recommendations.append({
                "keyword": d["keyword"],
                "priority": round(priority, 4),
                "deficit": d["deficit"],
                "draft_count": d["draft_count"],
                "target": d["target"],
                "best_of_both": d["best_of_both"],
                "spearmans": d["spearmans"],
                "pearsons": d["pearsons"],
            })

        recommendations.sort(key=lambda r: r["priority"], reverse=True)
        if top_n > 0:
            recommendations = recommendations[:top_n]
        return recommendations

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _keyword_pattern(keyword: str) -> re.Pattern:
        """Build a boundary-aware regex for an LSI keyword.

        Handles multi-word phrases by joining escaped tokens with flexible
        whitespace. Case-insensitive.

        Boundaries are expressed as "not adjacent to a word character"
        lookarounds rather than ``\\b``. ``\\b`` requires a word/non-word
        transition, so it can never match next to a keyword edge that is
        itself punctuation (e.g. "c++" followed by a space). For keywords
        whose edges are word characters the two forms are equivalent.

        Args:
            keyword: The keyword or phrase to match.

        Returns:
            Compiled pattern. An empty or whitespace-only keyword yields a
            pattern that never matches, so it is counted as 0 occurrences.
        """
        tokens = keyword.strip().split()
        if not tokens:
            # "(?!)" is an always-failing assertion; the old r"\b\b" fallback
            # matched at every word boundary in the text.
            return re.compile(r"(?!)")
        # Allow flexible whitespace between tokens in multi-word phrases
        phrase = r"\s+".join(re.escape(t) for t in tokens)
        return re.compile(rf"(?<!\w){phrase}(?!\w)", re.IGNORECASE)

    @staticmethod
    def _parse_sections(text: str) -> list[dict]:
        """Split markdown text into sections by headings.

        Headings are lines starting with 1-6 ``#`` characters followed by
        whitespace and text. Section content excludes the heading line itself.

        Args:
            text: Full markdown text of the draft.

        Returns:
            List of dicts with: heading, level, content. Non-blank content
            before the first heading gets heading="(intro)" and level 0. Text
            without any heading is returned as a single "(intro)" section.
        """
        heading_re = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
        matches = list(heading_re.finditer(text))
        sections: list[dict] = []

        if not matches:
            # No headings — treat entire text as one section
            sections.append({
                "heading": "(intro)",
                "level": 0,
                "content": text,
            })
            return sections

        # Content before first heading
        if matches[0].start() > 0:
            intro = text[: matches[0].start()]
            if intro.strip():
                sections.append({
                    "heading": "(intro)",
                    "level": 0,
                    "content": intro,
                })

        for i, match in enumerate(matches):
            level = len(match.group(1))
            heading = match.group(2).strip()
            # A section runs from the end of its heading line to the start of
            # the next heading (or the end of the text for the last one).
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            sections.append({
                "heading": heading,
                "level": level,
                "content": text[start:end],
            })
        return sections

    def _section_coverage(self) -> list[dict]:
        """Calculate LSI keyword coverage per section.

        Returns:
            List of dicts with: heading, level, total_keywords_found,
            keyword_details (list of keyword/count pairs present in that
            section, sorted by count descending).
        """
        coverage = []
        for section in self.sections:
            section_counts = self.count_lsi_keywords(section["content"])
            found = {kw: cnt for kw, cnt in section_counts.items() if cnt > 0}
            coverage.append({
                "heading": section["heading"],
                "level": section["level"],
                "total_keywords_found": len(found),
                "keyword_details": [
                    {"keyword": kw, "count": cnt}
                    for kw, cnt in sorted(
                        found.items(), key=lambda x: x[1], reverse=True
                    )
                ],
            })
        return coverage


# ----------------------------------------------------------------------
# Output formatting
# ----------------------------------------------------------------------

def format_text_report(analysis: dict) -> str:
    """Format the analysis dict as a human-readable text report.

    Args:
        analysis: Dict with at least the keys "summary", "recommendations"
            and "section_coverage", shaped as returned by
            ``LSIOptimizer.analyze_draft``.

    Returns:
        The report as a single newline-joined string with three parts:
        summary, recommendations table, and per-section coverage.
    """
    lines: list[str] = []
    summary = analysis["summary"]

    # --- Summary ---
    lines.append("=" * 60)
    lines.append(" LSI KEYWORD OPTIMIZATION REPORT")
    lines.append("=" * 60)
    lines.append("")
    lines.append(f" Total LSI keywords tracked : {summary['total_lsi_tracked']}")
    lines.append(f" Found in draft : {summary['found_in_draft']}")
    lines.append(f" With deficit (need more) : {summary['with_deficit']}")
    lines.append(f" Fully satisfied : {summary['fully_satisfied']}")
    lines.append("")

    # --- Top Recommendations ---
    recs = analysis["recommendations"]
    if recs:
        lines.append("-" * 60)
        lines.append(" TOP RECOMMENDATIONS (sorted by priority)")
        lines.append("-" * 60)
        lines.append("")
        lines.append(
            f" {'#':<4} {'Keyword':<30} {'Priority':>9} "
            f"{'Deficit':>8} {'Draft':>6} {'Target':>7} {'Corr':>7}"
        )
        lines.append(f" {'—'*4} {'—'*30} {'—'*9} {'—'*8} {'—'*6} {'—'*7} {'—'*7}")
        for i, rec in enumerate(recs, 1):
            corr = rec["best_of_both"]
            corr_str = f"{corr:.3f}" if corr is not None else "N/A"
            keyword_display = rec["keyword"]
            # Truncate long keywords so the table columns stay aligned.
            if len(keyword_display) > 28:
                keyword_display = keyword_display[:25] + "..."
            lines.append(
                f" {i:<4} {keyword_display:<30} {rec['priority']:>9.4f} "
                f"{rec['deficit']:>8} {rec['draft_count']:>6} "
                f"{rec['target']:>7} {corr_str:>7}"
            )
        lines.append("")
    else:
        lines.append(" No recommendations — all LSI targets met or no deficits found.")
        lines.append("")

    # --- Section Coverage ---
    sections = analysis["section_coverage"]
    if sections:
        lines.append("-" * 60)
        lines.append(" PER-SECTION LSI COVERAGE")
        lines.append("-" * 60)
        lines.append("")
        for sec in sections:
            # Deeper heading levels are indented further.
            indent = " " * (sec["level"] + 1)
            heading = sec["heading"]
            kw_count = sec["total_keywords_found"]
            lines.append(f"{indent}{heading} ({kw_count} LSI keyword{'s' if kw_count != 1 else ''})")
            if sec["keyword_details"]:
                # Show at most 10 keywords per section, then a remainder note.
                for detail in sec["keyword_details"][:10]:
                    lines.append(f"{indent} - \"{detail['keyword']}\" x{detail['count']}")
                remaining = len(sec["keyword_details"]) - 10
                if remaining > 0:
                    lines.append(f"{indent} ... and {remaining} more")
        lines.append("")

    lines.append("=" * 60)
    return "\n".join(lines)


# ----------------------------------------------------------------------
# CLI entry point
# ----------------------------------------------------------------------

def main():
    """Parse CLI arguments, run the analysis, and print the report.

    Exits with status 1 and a message on stderr when loading the Cora report
    or the draft raises FileNotFoundError.
    """
    parser = argparse.ArgumentParser(
        description="Analyze a content draft against Cora LSI keyword targets.",
    )
    parser.add_argument(
        "draft_path",
        help="Path to the markdown content draft",
    )
    parser.add_argument(
        "cora_xlsx_path",
        help="Path to the Cora SEO XLSX report",
    )
    parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--min-correlation",
        type=float,
        default=0.2,
        help="Minimum |correlation| to include in recommendations (default: 0.2)",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=50,
        help="Limit recommendations to top N (default: 50, 0 = unlimited)",
    )
    args = parser.parse_args()

    try:
        optimizer = LSIOptimizer(args.cora_xlsx_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        analysis = optimizer.analyze_draft(args.draft_path)
    except FileNotFoundError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    # Apply CLI filters to recommendations
    analysis["recommendations"] = optimizer.recommend_additions(
        min_correlation=args.min_correlation,
        top_n=args.top_n,
    )

    if args.format == "json":
        print(json.dumps(analysis, indent=2, default=str))
    else:
        print(format_text_report(analysis))


if __name__ == "__main__":
    main()