"""Two-phase content creation pipeline tool. Phase 1: Research competitors + generate outline → save → stop for human review. Phase 2: Human approves/edits outline → tool picks it up → writes full content. The content-researcher skill in the execution brain is triggered by keywords like "service page", "content optimization", "SEO content", etc. """ from __future__ import annotations import json import logging import re from datetime import UTC, datetime from pathlib import Path from . import tool log = logging.getLogger(__name__) _ROOT_DIR = Path(__file__).resolve().parent.parent.parent _DATA_DIR = _ROOT_DIR / "data" _LOCAL_CONTENT_DIR = _DATA_DIR / "generated" / "content" EXEC_TOOLS = "Bash,Read,Edit,Write,Glob,Grep,WebSearch,WebFetch" # --------------------------------------------------------------------------- # ClickUp helpers # --------------------------------------------------------------------------- def _get_clickup_client(ctx: dict | None): """Create a ClickUpClient from tool context, or None if unavailable.""" if not ctx or not ctx.get("config") or not ctx["config"].clickup.enabled: return None try: from ..clickup import ClickUpClient config = ctx["config"] return ClickUpClient( api_token=config.clickup.api_token, workspace_id=config.clickup.workspace_id, task_type_field_name=config.clickup.task_type_field_name, ) except Exception as e: log.warning("Could not create ClickUp client: %s", e) return None def _sync_clickup_start(ctx: dict | None, task_id: str) -> None: """Move ClickUp task to 'automation underway'.""" if not task_id or not ctx: return client = _get_clickup_client(ctx) if not client: return try: config = ctx["config"] client.update_task_status(task_id, config.clickup.automation_status) except Exception as e: log.warning("Failed to set ClickUp start status for %s: %s", task_id, e) finally: client.close() def _sync_clickup_outline_ready(ctx: dict | None, task_id: str, outline_path: str) -> None: """Post outline comment and move ClickUp task to 'outline review'.""" if not task_id or not ctx: return client = _get_clickup_client(ctx) if not client: return try: client.add_comment( task_id, f"📝 CheddahBot generated a content outline.\n\n" f"Outline saved to: `{outline_path}`\n\n" f"Please review and edit the outline, then move this task to " f"**outline approved** to trigger the full content write.", ) client.update_task_status(task_id, "outline review") except Exception as e: log.warning("Failed to sync outline-ready for %s: %s", task_id, e) finally: client.close() def _sync_clickup_complete(ctx: dict | None, task_id: str, content_path: str) -> None: """Post completion comment and move ClickUp task to 'internal review'.""" if not task_id or not ctx: return client = _get_clickup_client(ctx) if not client: return try: config = ctx["config"] client.add_comment( task_id, f"✅ CheddahBot completed the content.\n\n" f"Final content saved to: `{content_path}`\n\n" f"Ready for internal review.", ) client.update_task_status(task_id, config.clickup.review_status) except Exception as e: log.warning("Failed to sync completion for %s: %s", task_id, e) finally: client.close() def _sync_clickup_fail(ctx: dict | None, task_id: str, error: str) -> None: """Post error comment and move ClickUp task to 'error'.""" if not task_id or not ctx: return client = _get_clickup_client(ctx) if not client: return try: config = ctx["config"] client.add_comment( task_id, f"❌ CheddahBot failed during content creation.\n\nError: {error[:2000]}", ) client.update_task_status(task_id, config.clickup.error_status) except Exception as e: log.warning("Failed to sync failure for %s: %s", task_id, e) finally: client.close() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _slugify(text: str) -> str: """Turn text into a filesystem-safe slug.""" text = text.lower().strip() text = re.sub(r"[^\w\s-]", "", text) text = re.sub(r"[\s_]+", "-", text) return text[:80].strip("-") def _find_cora_report(keyword: str, cora_inbox: str) -> str: """Fuzzy-match a Cora .xlsx report by keyword. Match priority: exact filename match > substring > word overlap. Skips Office temp files (~$...). Returns the path string, or "" if not found. """ if not cora_inbox or not keyword: return "" inbox = Path(cora_inbox) if not inbox.exists(): return "" xlsx_files = [f for f in inbox.glob("*.xlsx") if not f.name.startswith("~$")] if not xlsx_files: return "" keyword_lower = keyword.lower().strip() keyword_words = set(keyword_lower.split()) # Pass 1: exact stem match for f in xlsx_files: if f.stem.lower().strip() == keyword_lower: return str(f) # Pass 2: keyword is substring of filename (or vice versa) for f in xlsx_files: stem = f.stem.lower().strip() if keyword_lower in stem or stem in keyword_lower: return str(f) # Pass 3: word overlap (at least half the keyword words) best_match = "" best_overlap = 0 for f in xlsx_files: stem_words = set(f.stem.lower().replace("-", " ").replace("_", " ").split()) overlap = len(keyword_words & stem_words) if overlap > best_overlap and overlap >= max(1, len(keyword_words) // 2): best_overlap = overlap best_match = str(f) return best_match def _save_content(content: str, keyword: str, filename: str, config) -> str: """Save content to the outline directory (network path with local fallback). Returns the actual path used. """ slug = _slugify(keyword) if not slug: slug = "unknown" # Try primary (network) path if config.content.outline_dir: primary = Path(config.content.outline_dir) / slug try: primary.mkdir(parents=True, exist_ok=True) out_path = primary / filename out_path.write_text(content, encoding="utf-8") return str(out_path) except OSError as e: log.warning("Network path unavailable (%s), falling back to local: %s", primary, e) # Fallback to local local = _LOCAL_CONTENT_DIR / slug local.mkdir(parents=True, exist_ok=True) out_path = local / filename out_path.write_text(content, encoding="utf-8") return str(out_path) # --------------------------------------------------------------------------- # Prompt builders # --------------------------------------------------------------------------- def _build_phase1_prompt( url: str, keyword: str, content_type: str, cora_path: str, capabilities_default: str, ) -> str: """Build the Phase 1 prompt that triggers the content-researcher skill.""" parts = [ f"Research, outline, and draft an optimized {content_type} for {url} " f"targeting keyword '{keyword}'. This is an SEO content optimization project.", ] if cora_path: parts.append( f"\nA Cora SEO report is available at: {cora_path}\n" f"Read this report to extract keyword targets, entity requirements, " f"and competitive analysis data." ) if capabilities_default: parts.append( f'\nWhen asked about company capabilities, respond with: "{capabilities_default}"' ) parts.append( "\nDeliver the outline as a complete markdown document with sections, " "headings, entity targets, and keyword placement notes." ) return "\n".join(parts) def _build_phase2_prompt( url: str, keyword: str, outline_text: str, cora_path: str, ) -> str: """Build the Phase 2 prompt for writing full content from an approved outline.""" parts = [ f"Write full SEO-optimized content based on this approved outline for {url} " f"targeting '{keyword}'. This is the content writing phase of a " f"content optimization project.", f"\n## Approved Outline\n\n{outline_text}", ] if cora_path: parts.append( f"\nThe Cora SEO report is at: {cora_path}\n" f"Use it for keyword density targets and entity optimization." ) parts.append( "\nWrite publication-ready content following the outline structure. " "Include all entity targets and keyword placements as noted in the outline." ) return "\n".join(parts) # --------------------------------------------------------------------------- # Main tool # --------------------------------------------------------------------------- @tool( "create_content", "Two-phase SEO content creation: Phase 1 researches + outlines, Phase 2 writes " "full content from the approved outline. Auto-detects phase from kv_store state.", category="content", ) def create_content( url: str, keyword: str, content_type: str = "service page", ctx: dict | None = None, ) -> str: """Create SEO content in two phases with human review between them. Args: url: Target page URL (e.g. "https://example.com/services/plumbing"). keyword: Primary target keyword (e.g. "plumbing services"). content_type: Type of content — "service page", "blog post", etc. """ if not url or not keyword: return "Error: Both 'url' and 'keyword' are required." if not ctx or "agent" not in ctx: return "Error: Tool context with agent is required." agent = ctx["agent"] config = ctx.get("config") db = ctx.get("db") task_id = ctx.get("clickup_task_id", "") kv_key = f"clickup:task:{task_id}:state" if task_id else "" # Determine phase from kv_store state phase = 1 existing_state = {} if kv_key and db: raw = db.kv_get(kv_key) if raw: try: existing_state = json.loads(raw) if existing_state.get("state") == "outline_review": phase = 2 except json.JSONDecodeError: pass # Find Cora report cora_inbox = config.content.cora_inbox if config else "" cora_path = _find_cora_report(keyword, cora_inbox) if cora_path: log.info("Found Cora report for '%s': %s", keyword, cora_path) capabilities_default = config.content.company_capabilities_default if config else "" if phase == 1: return _run_phase1( agent=agent, config=config, db=db, ctx=ctx, task_id=task_id, kv_key=kv_key, url=url, keyword=keyword, content_type=content_type, cora_path=cora_path, capabilities_default=capabilities_default, ) else: return _run_phase2( agent=agent, config=config, db=db, ctx=ctx, task_id=task_id, kv_key=kv_key, url=url, keyword=keyword, cora_path=cora_path, existing_state=existing_state, ) # --------------------------------------------------------------------------- # Phase 1: Research + Outline # --------------------------------------------------------------------------- def _run_phase1( *, agent, config, db, ctx, task_id: str, kv_key: str, url: str, keyword: str, content_type: str, cora_path: str, capabilities_default: str, ) -> str: now = datetime.now(UTC).isoformat() # ClickUp: move to automation underway if task_id: _sync_clickup_start(ctx, task_id) prompt = _build_phase1_prompt(url, keyword, content_type, cora_path, capabilities_default) log.info("Phase 1 — researching + outlining for '%s' (%s)", keyword, url) try: result = agent.execute_task( prompt, tools=EXEC_TOOLS, skip_permissions=True, ) except Exception as e: error_msg = f"Phase 1 execution failed: {e}" log.error(error_msg) if task_id: _update_kv_state(db, kv_key, "failed", error=str(e)) _sync_clickup_fail(ctx, task_id, str(e)) return f"Error: {error_msg}" if result.startswith("Error:"): if task_id: _update_kv_state(db, kv_key, "failed", error=result) _sync_clickup_fail(ctx, task_id, result) return result # Save the outline outline_path = _save_content(result, keyword, "outline.md", config) log.info("Outline saved to: %s", outline_path) # Update kv_store if kv_key and db: state = { "state": "outline_review", "clickup_task_id": task_id, "url": url, "keyword": keyword, "content_type": content_type, "cora_path": cora_path, "outline_path": outline_path, "phase1_completed_at": now, "completed_at": None, "error": None, } db.kv_set(kv_key, json.dumps(state)) # ClickUp: move to outline review if task_id: _sync_clickup_outline_ready(ctx, task_id, outline_path) return ( f"## Phase 1 Complete — Outline Ready for Review\n\n" f"**Keyword:** {keyword}\n" f"**URL:** {url}\n" f"**Outline saved to:** `{outline_path}`\n\n" f"Please review and edit the outline. When ready, move the ClickUp task " f"to **outline approved** to trigger Phase 2 (full content writing).\n\n" f"---\n\n{result}\n\n" f"## ClickUp Sync\nPhase 1 complete. Status: outline review." ) # --------------------------------------------------------------------------- # Phase 2: Write Full Content # --------------------------------------------------------------------------- def _run_phase2( *, agent, config, db, ctx, task_id: str, kv_key: str, url: str, keyword: str, cora_path: str, existing_state: dict, ) -> str: # Read the (possibly edited) outline outline_path = existing_state.get("outline_path", "") outline_text = "" if outline_path: try: outline_text = Path(outline_path).read_text(encoding="utf-8") except OSError as e: log.warning("Could not read outline at %s: %s", outline_path, e) if not outline_text: return ( "Error: Could not read the outline file. " f"Expected at: {outline_path or '(no path saved)'}" ) # Use saved cora_path from state if we don't have one now if not cora_path: cora_path = existing_state.get("cora_path", "") # ClickUp: move to automation underway if task_id: _sync_clickup_start(ctx, task_id) prompt = _build_phase2_prompt(url, keyword, outline_text, cora_path) log.info("Phase 2 — writing full content for '%s' (%s)", keyword, url) try: result = agent.execute_task( prompt, tools=EXEC_TOOLS, skip_permissions=True, ) except Exception as e: error_msg = f"Phase 2 execution failed: {e}" log.error(error_msg) if task_id: _update_kv_state(db, kv_key, "failed", error=str(e)) _sync_clickup_fail(ctx, task_id, str(e)) return f"Error: {error_msg}" if result.startswith("Error:"): if task_id: _update_kv_state(db, kv_key, "failed", error=result) _sync_clickup_fail(ctx, task_id, result) return result # Save final content content_path = _save_content(result, keyword, "final-content.md", config) log.info("Final content saved to: %s", content_path) # Update kv_store if kv_key and db: now = datetime.now(UTC).isoformat() state = existing_state.copy() state["state"] = "completed" state["content_path"] = content_path state["completed_at"] = now state["error"] = None db.kv_set(kv_key, json.dumps(state)) # ClickUp: move to internal review if task_id: _sync_clickup_complete(ctx, task_id, content_path) return ( f"## Phase 2 Complete — Content Written\n\n" f"**Keyword:** {keyword}\n" f"**URL:** {url}\n" f"**Content saved to:** `{content_path}`\n\n" f"---\n\n{result}\n\n" f"## ClickUp Sync\nPhase 2 complete. Status: internal review." ) # --------------------------------------------------------------------------- # Continue content (chat-initiated Phase 2) # --------------------------------------------------------------------------- @tool( "continue_content", "Resume content creation for a keyword that has an approved outline. " "Runs Phase 2 (full content writing) for a previously outlined keyword.", category="content", ) def continue_content( keyword: str, ctx: dict | None = None, ) -> str: """Resume content writing for a keyword with an approved outline. Args: keyword: The keyword to continue writing content for. """ if not keyword: return "Error: 'keyword' is required." if not ctx or "agent" not in ctx or "db" not in ctx: return "Error: Tool context with agent and db is required." db = ctx["db"] config = ctx.get("config") # Scan kv_store for outline_review entries matching keyword entries = db.kv_scan("clickup:task:") keyword_lower = keyword.lower().strip() for key, raw in entries: try: state = json.loads(raw) except (json.JSONDecodeError, TypeError): continue if state.get("state") != "outline_review": continue if state.get("keyword", "").lower().strip() == keyword_lower: # Found a matching entry — run Phase 2 task_id = state.get("clickup_task_id", "") kv_key = key url = state.get("url", "") cora_path = state.get("cora_path", "") return _run_phase2( agent=ctx["agent"], config=config, db=db, ctx=ctx, task_id=task_id, kv_key=kv_key, url=url, keyword=keyword, cora_path=cora_path, existing_state=state, ) return ( f"No outline awaiting review found for keyword '{keyword}'. " f"Use create_content to start Phase 1 first." ) # --------------------------------------------------------------------------- # KV state helper # --------------------------------------------------------------------------- def _update_kv_state(db, kv_key: str, state_val: str, error: str = "") -> None: """Update kv_store state without losing existing data.""" if not db or not kv_key: return raw = db.kv_get(kv_key) try: state = json.loads(raw) if raw else {} except json.JSONDecodeError: state = {} state["state"] = state_val if error: state["error"] = error[:2000] state["completed_at"] = datetime.now(UTC).isoformat() db.kv_set(kv_key, json.dumps(state))