# Source: CheddahBot/.claude/skills/content-researcher/scripts/lsi_optimizer.py
# (415 lines, 14 KiB, Python)

"""
LSI Keyword Optimizer
Counts Cora-defined LSI keywords in a content draft and recommends additions.
Reads LSI targets from a Cora XLSX report via cora_parser.CoraReport, then
scans a markdown draft to measure per-keyword usage and calculate deficits.
Recommendations are prioritized by |correlation| x deficit so the most
ranking-impactful gaps surface first.
Usage:
uv run --with openpyxl python lsi_optimizer.py <draft_path> <cora_xlsx_path> \
[--format json|text] [--min-correlation 0.2] [--top-n 50]
"""
import argparse
import json
import re
import sys
from pathlib import Path
from cora_parser import CoraReport
class LSIOptimizer:
    """Analyzes a content draft against Cora LSI keyword targets."""

    # Shared cache of compiled keyword regexes. _section_coverage() rescans
    # every keyword once per section, so without a cache each pattern is
    # recompiled O(sections) times per run. The keyword set is bounded by the
    # Cora report, so unbounded growth is not a concern.
    _pattern_cache: dict[str, re.Pattern] = {}

    def __init__(self, cora_xlsx_path: str):
        """Load LSI keyword targets from a Cora XLSX report.

        Args:
            cora_xlsx_path: Path to the Cora SEO report XLSX file.
        """
        self.report = CoraReport(cora_xlsx_path)
        self.lsi_keywords = self.report.get_lsi_keywords()
        self.draft_text = ""
        self.sections: list[dict] = []
        self._keyword_counts: dict[str, int] = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def analyze_draft(self, draft_path: str) -> dict:
        """Run full LSI analysis on a markdown draft.

        Args:
            draft_path: Path to a markdown content draft.

        Returns:
            Analysis dict with keys: summary, keyword_counts, deficits,
            recommendations, section_coverage.

        Raises:
            FileNotFoundError: If draft_path does not exist.
        """
        path = Path(draft_path)
        if not path.exists():
            raise FileNotFoundError(f"Draft file not found: {draft_path}")
        self.draft_text = path.read_text(encoding="utf-8")
        self.sections = self._parse_sections(self.draft_text)
        self._keyword_counts = self.count_lsi_keywords(self.draft_text)

        deficits = self.calculate_deficits()
        recommendations = self.recommend_additions()
        section_coverage = self._section_coverage()

        total_tracked = len(self.lsi_keywords)
        found_in_draft = sum(1 for c in self._keyword_counts.values() if c > 0)
        with_deficit = len(deficits)
        return {
            "summary": {
                "total_lsi_tracked": total_tracked,
                "found_in_draft": found_in_draft,
                "with_deficit": with_deficit,
                "fully_satisfied": total_tracked - with_deficit,
            },
            "keyword_counts": self._keyword_counts,
            "deficits": deficits,
            "recommendations": recommendations,
            "section_coverage": section_coverage,
        }

    def count_lsi_keywords(self, text: str) -> dict[str, int]:
        """Count occurrences of each LSI keyword in the given text.

        Uses word-boundary-aware regex matching so multi-word phrases like
        "part that" are matched correctly and case-insensitively.

        Args:
            text: The content string to scan.

        Returns:
            Dict mapping keyword string to its occurrence count.
        """
        counts: dict[str, int] = {}
        for kw_data in self.lsi_keywords:
            keyword = kw_data["keyword"]
            pattern = self._keyword_pattern(keyword)
            counts[keyword] = len(pattern.findall(text))
        return counts

    def calculate_deficits(self) -> list[dict]:
        """Identify LSI keywords whose draft count is below the Cora target.

        A keyword has a deficit when the Cora report indicates a positive
        deficit value (target minus current usage in the report) AND the
        draft count has not yet closed that gap.

        Returns:
            List of dicts with: keyword, draft_count, target, deficit,
            spearmans, pearsons, best_of_both. Only keywords with
            remaining deficit > 0 are included.
        """
        deficits = []
        for kw_data in self.lsi_keywords:
            keyword = kw_data["keyword"]
            cora_deficit = kw_data.get("deficit") or 0
            if cora_deficit <= 0:
                continue
            # The Cora deficit is based on the original page. The draft may
            # have added some occurrences, so we re-compute: how many more
            # are still needed?
            cora_current = kw_data.get("current_count") or 0
            target = cora_current + cora_deficit
            draft_count = self._keyword_counts.get(keyword, 0)
            remaining_deficit = target - draft_count
            if remaining_deficit <= 0:
                continue
            deficits.append({
                "keyword": keyword,
                "draft_count": draft_count,
                "target": target,
                "deficit": remaining_deficit,
                "spearmans": kw_data.get("spearmans"),
                "pearsons": kw_data.get("pearsons"),
                "best_of_both": kw_data.get("best_of_both"),
            })
        return deficits

    def recommend_additions(
        self,
        min_correlation: float = 0.0,
        top_n: int = 0,
    ) -> list[dict]:
        """Produce a prioritized list of LSI keyword additions.

        Priority score = abs(best_of_both) x deficit. Keywords with higher
        correlation to ranking AND larger deficits sort to the top.

        Args:
            min_correlation: Only include keywords whose
                abs(best_of_both) >= this threshold.
            top_n: Limit to top N results (0 = no limit).

        Returns:
            Sorted list of dicts with: keyword, priority, deficit,
            draft_count, target, best_of_both, spearmans, pearsons.
        """
        deficits = self.calculate_deficits()
        recommendations = []
        for d in deficits:
            # best_of_both may be missing from the report (None); treat a
            # missing correlation as zero rather than crashing on abs(None).
            bob = d["best_of_both"]
            correlation = abs(bob) if bob is not None else 0.0
            if correlation < min_correlation:
                continue
            priority = correlation * d["deficit"]
            recommendations.append({
                "keyword": d["keyword"],
                "priority": round(priority, 4),
                "deficit": d["deficit"],
                "draft_count": d["draft_count"],
                "target": d["target"],
                "best_of_both": d["best_of_both"],
                "spearmans": d["spearmans"],
                "pearsons": d["pearsons"],
            })
        recommendations.sort(key=lambda r: r["priority"], reverse=True)
        if top_n > 0:
            recommendations = recommendations[:top_n]
        return recommendations

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @classmethod
    def _keyword_pattern(cls, keyword: str) -> re.Pattern:
        """Build (and cache) a word-boundary-aware regex for an LSI keyword.

        Handles multi-word phrases by joining escaped tokens with flexible
        whitespace. Case-insensitive. Compiled patterns are memoized in
        _pattern_cache to avoid recompiling during per-section scans.
        """
        pattern = cls._pattern_cache.get(keyword)
        if pattern is None:
            tokens = keyword.strip().split()
            escaped = [re.escape(t) for t in tokens]
            # Allow flexible whitespace between tokens in multi-word phrases
            pattern = re.compile(
                r"\b" + r"\s+".join(escaped) + r"\b", re.IGNORECASE
            )
            cls._pattern_cache[keyword] = pattern
        return pattern

    @staticmethod
    def _parse_sections(text: str) -> list[dict]:
        """Split markdown text into sections by headings.

        Returns list of dicts with: heading, level, content.
        The content before the first heading gets heading="(intro)".
        """
        heading_re = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
        matches = list(heading_re.finditer(text))
        sections: list[dict] = []
        if not matches:
            # No headings — treat entire text as one section
            sections.append({
                "heading": "(intro)",
                "level": 0,
                "content": text,
            })
            return sections
        # Content before first heading
        if matches[0].start() > 0:
            intro = text[: matches[0].start()]
            if intro.strip():
                sections.append({
                    "heading": "(intro)",
                    "level": 0,
                    "content": intro,
                })
        for i, match in enumerate(matches):
            level = len(match.group(1))
            heading = match.group(2).strip()
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            sections.append({
                "heading": heading,
                "level": level,
                "content": text[start:end],
            })
        return sections

    def _section_coverage(self) -> list[dict]:
        """Calculate LSI keyword coverage per section.

        Returns list of dicts with: heading, level, total_keywords_found,
        keyword_details (list of keyword/count pairs present in that section).
        """
        coverage = []
        for section in self.sections:
            section_counts = self.count_lsi_keywords(section["content"])
            found = {kw: cnt for kw, cnt in section_counts.items() if cnt > 0}
            coverage.append({
                "heading": section["heading"],
                "level": section["level"],
                "total_keywords_found": len(found),
                "keyword_details": [
                    {"keyword": kw, "count": cnt}
                    for kw, cnt in sorted(
                        found.items(), key=lambda x: x[1], reverse=True
                    )
                ],
            })
        return coverage
# ----------------------------------------------------------------------
# Output formatting
# ----------------------------------------------------------------------
def format_text_report(analysis: dict) -> str:
    """Format the analysis dict as a human-readable text report."""
    lines: list[str] = []
    summary = analysis["summary"]
    # --- Summary ---
    lines.append("=" * 60)
    lines.append(" LSI KEYWORD OPTIMIZATION REPORT")
    lines.append("=" * 60)
    lines.append("")
    lines.append(f" Total LSI keywords tracked : {summary['total_lsi_tracked']}")
    lines.append(f" Found in draft : {summary['found_in_draft']}")
    lines.append(f" With deficit (need more) : {summary['with_deficit']}")
    lines.append(f" Fully satisfied : {summary['fully_satisfied']}")
    lines.append("")
    # --- Top Recommendations ---
    recs = analysis["recommendations"]
    if recs:
        lines.append("-" * 60)
        lines.append(" TOP RECOMMENDATIONS (sorted by priority)")
        lines.append("-" * 60)
        lines.append("")
        lines.append(
            f" {'#':<4} {'Keyword':<30} {'Priority':>9} "
            f"{'Deficit':>8} {'Draft':>6} {'Target':>7} {'Corr':>7}"
        )
        # BUG FIX: the original multiplied the EMPTY string (''*4), so the
        # underline row under the column headers rendered as blank spaces.
        # It must repeat dashes to match each column's width.
        lines.append(
            f" {'-'*4} {'-'*30} {'-'*9} {'-'*8} {'-'*6} {'-'*7} {'-'*7}"
        )
        for i, rec in enumerate(recs, 1):
            corr = rec["best_of_both"]
            corr_str = f"{corr:.3f}" if corr is not None else "N/A"
            keyword_display = rec["keyword"]
            # Truncate long keywords so the table columns stay aligned.
            if len(keyword_display) > 28:
                keyword_display = keyword_display[:25] + "..."
            lines.append(
                f" {i:<4} {keyword_display:<30} {rec['priority']:>9.4f} "
                f"{rec['deficit']:>8} {rec['draft_count']:>6} "
                f"{rec['target']:>7} {corr_str:>7}"
            )
        lines.append("")
    else:
        lines.append(" No recommendations — all LSI targets met or no deficits found.")
        lines.append("")
    # --- Section Coverage ---
    sections = analysis["section_coverage"]
    if sections:
        lines.append("-" * 60)
        lines.append(" PER-SECTION LSI COVERAGE")
        lines.append("-" * 60)
        lines.append("")
        for sec in sections:
            # Indent sub-sections by heading level for a tree-like view.
            indent = " " * (sec["level"] + 1)
            heading = sec["heading"]
            kw_count = sec["total_keywords_found"]
            lines.append(f"{indent}{heading} ({kw_count} LSI keyword{'s' if kw_count != 1 else ''})")
            if sec["keyword_details"]:
                # Show at most 10 keywords per section to keep output compact.
                for detail in sec["keyword_details"][:10]:
                    lines.append(f"{indent} - \"{detail['keyword']}\" x{detail['count']}")
                remaining = len(sec["keyword_details"]) - 10
                if remaining > 0:
                    lines.append(f"{indent} ... and {remaining} more")
        lines.append("")
    lines.append("=" * 60)
    return "\n".join(lines)
# ----------------------------------------------------------------------
# CLI entry point
# ----------------------------------------------------------------------
def main():
    """CLI entry point: parse arguments, run the analysis, print the report."""
    parser = argparse.ArgumentParser(
        description="Analyze a content draft against Cora LSI keyword targets.",
    )
    parser.add_argument("draft_path", help="Path to the markdown content draft")
    parser.add_argument("cora_xlsx_path", help="Path to the Cora SEO XLSX report")
    parser.add_argument(
        "--format",
        choices=["json", "text"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--min-correlation",
        type=float,
        default=0.2,
        help="Minimum |correlation| to include in recommendations (default: 0.2)",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=50,
        help="Limit recommendations to top N (default: 50, 0 = unlimited)",
    )
    args = parser.parse_args()

    # Both the XLSX load and the draft read can raise FileNotFoundError;
    # either way, report the missing file on stderr and exit with status 1.
    try:
        optimizer = LSIOptimizer(args.cora_xlsx_path)
        analysis = optimizer.analyze_draft(args.draft_path)
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    # Re-filter the recommendations with the CLI thresholds (analyze_draft
    # computed them unfiltered).
    analysis["recommendations"] = optimizer.recommend_additions(
        min_correlation=args.min_correlation,
        top_n=args.top_n,
    )

    if args.format == "json":
        print(json.dumps(analysis, indent=2, default=str))
    else:
        print(format_text_report(analysis))


if __name__ == "__main__":
    main()