Add pipeline status box to show PR tool progress in the UI

The press release pipeline takes 2-4 minutes per run with no feedback.
This adds a DB-polled status box (gr.Timer every 3s) that shows the
current step (e.g. "Step 3/4: Writing press release 1/2 — headline...")
and auto-hides when the pipeline completes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-16 16:41:33 -06:00
parent 7153e65ae6
commit 7864ec6f17
2 changed files with 42 additions and 5 deletions

View File

@ -35,6 +35,12 @@ _HEADLINES_FILE = _SKILLS_DIR / "headlines.md"
SONNET_CLI_MODEL = "sonnet" SONNET_CLI_MODEL = "sonnet"
def _set_status(ctx: dict | None, message: str) -> None:
"""Write pipeline progress to the DB so the UI can poll it."""
if ctx and "db" in ctx:
ctx["db"].kv_set("pipeline:status", message)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -305,13 +311,17 @@ def write_press_releases(
companies_file = _load_file_if_exists(_COMPANIES_FILE) companies_file = _load_file_if_exists(_COMPANIES_FILE)
headlines_ref = _load_file_if_exists(_HEADLINES_FILE) headlines_ref = _load_file_if_exists(_HEADLINES_FILE)
# Ensure output directory # Ensure output directory (company subfolder)
_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) company_slug = _slugify(company_name)
output_dir = _OUTPUT_DIR / company_slug
output_dir.mkdir(parents=True, exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d") today = datetime.now().strftime("%Y-%m-%d")
cost_log: list[dict] = [] cost_log: list[dict] = []
# ── Step 1: Generate 7 headlines (chat brain) ───────────────────────── # ── Step 1: Generate 7 headlines (chat brain) ─────────────────────────
log.info("[PR Pipeline] Step 1/4: Generating 7 headlines for %s...", company_name)
_set_status(ctx, f"Step 1/4: Generating 7 headlines for {company_name}...")
step_start = time.time() step_start = time.time()
headline_prompt = _build_headline_prompt(topic, company_name, url, lsi_terms, headlines_ref) headline_prompt = _build_headline_prompt(topic, company_name, url, lsi_terms, headlines_ref)
messages = [ messages = [
@ -330,10 +340,12 @@ def write_press_releases(
# Save all 7 headline candidates to file # Save all 7 headline candidates to file
slug_base = _slugify(f"{company_name}-{topic}") slug_base = _slugify(f"{company_name}-{topic}")
headlines_file = _OUTPUT_DIR / f"{slug_base}_{today}_headlines.txt" headlines_file = output_dir / f"{slug_base}_{today}_headlines.txt"
headlines_file.write_text(headlines_raw.strip(), encoding="utf-8") headlines_file.write_text(headlines_raw.strip(), encoding="utf-8")
# ── Step 2: AI judge picks best 2 (chat brain) ─────────────────────── # ── Step 2: AI judge picks best 2 (chat brain) ───────────────────────
log.info("[PR Pipeline] Step 2/4: AI judge selecting best 2 headlines...")
_set_status(ctx, "Step 2/4: AI judge selecting best 2 headlines...")
step_start = time.time() step_start = time.time()
judge_prompt = _build_judge_prompt(headlines_raw, headlines_ref) judge_prompt = _build_judge_prompt(headlines_raw, headlines_ref)
messages = [ messages = [
@ -355,9 +367,12 @@ def write_press_releases(
winners = winners[:2] winners = winners[:2]
# ── Step 3: Write 2 press releases (execution brain × 2) ───────────── # ── Step 3: Write 2 press releases (execution brain × 2) ─────────────
log.info("[PR Pipeline] Step 3/4: Writing 2 press releases...")
pr_texts: list[str] = [] pr_texts: list[str] = []
pr_files: list[str] = [] pr_files: list[str] = []
for i, headline in enumerate(winners): for i, headline in enumerate(winners):
log.info("[PR Pipeline] Writing PR %d/2: %s", i + 1, headline[:60])
_set_status(ctx, f"Step 3/4: Writing press release {i+1}/2 — {headline[:60]}...")
step_start = time.time() step_start = time.time()
pr_prompt = _build_pr_prompt( pr_prompt = _build_pr_prompt(
headline, topic, company_name, url, lsi_terms, headline, topic, company_name, url, lsi_terms,
@ -384,14 +399,17 @@ def write_press_releases(
# Save PR to file # Save PR to file
slug = _slugify(headline) slug = _slugify(headline)
filename = f"{slug}_{today}.txt" filename = f"{slug}_{today}.txt"
filepath = _OUTPUT_DIR / filename filepath = output_dir / filename
filepath.write_text(clean_result, encoding="utf-8") filepath.write_text(clean_result, encoding="utf-8")
pr_files.append(str(filepath)) pr_files.append(str(filepath))
# ── Step 4: Generate 2 JSON-LD schemas (Sonnet + WebSearch) ─────────── # ── Step 4: Generate 2 JSON-LD schemas (Sonnet + WebSearch) ───────────
log.info("[PR Pipeline] Step 4/4: Generating 2 JSON-LD schemas...")
schema_texts: list[str] = [] schema_texts: list[str] = []
schema_files: list[str] = [] schema_files: list[str] = []
for i, pr_text in enumerate(pr_texts): for i, pr_text in enumerate(pr_texts):
log.info("[PR Pipeline] Schema %d/2 for: %s", i + 1, winners[i][:60])
_set_status(ctx, f"Step 4/4: Generating schema {i+1}/2...")
step_start = time.time() step_start = time.time()
schema_prompt = _build_schema_prompt(pr_text, company_name, url, schema_skill) schema_prompt = _build_schema_prompt(pr_text, company_name, url, schema_skill)
exec_tools = "WebSearch,WebFetch" exec_tools = "WebSearch,WebFetch"
@ -422,12 +440,14 @@ def write_press_releases(
# Save schema to file # Save schema to file
slug = _slugify(winners[i]) slug = _slugify(winners[i])
filename = f"{slug}_{today}_schema.json" filename = f"{slug}_{today}_schema.json"
filepath = _OUTPUT_DIR / filename filepath = output_dir / filename
filepath.write_text(schema_json or result, encoding="utf-8") filepath.write_text(schema_json or result, encoding="utf-8")
schema_files.append(str(filepath)) schema_files.append(str(filepath))
# ── Build final output ──────────────────────────────────────────────── # ── Build final output ────────────────────────────────────────────────
_set_status(ctx, "") # Clear status — pipeline complete
total_elapsed = sum(c["elapsed_s"] for c in cost_log) total_elapsed = sum(c["elapsed_s"] for c in cost_log)
log.info("[PR Pipeline] Complete for %s%.0fs total", company_name, total_elapsed)
output_parts = [] output_parts = []
for i in range(2): for i in range(2):

View File

@ -77,6 +77,12 @@ def create_ui(agent: Agent, config: Config, llm: LLMAdapter,
elem_classes=["contain"], elem_classes=["contain"],
) )
pipeline_status = gr.Markdown(
value="",
visible=False,
elem_classes=["contain"],
)
with gr.Row(elem_classes=["contain"]): with gr.Row(elem_classes=["contain"]):
msg_input = gr.MultimodalTextbox( msg_input = gr.MultimodalTextbox(
placeholder="Type a message... (attach files, use mic, or camera)", placeholder="Type a message... (attach files, use mic, or camera)",
@ -210,6 +216,13 @@ def create_ui(agent: Agent, config: Config, llm: LLMAdapter,
except Exception as e: except Exception as e:
return None, f"Voice chat error: {e}" return None, f"Voice chat error: {e}"
def poll_pipeline_status():
"""Poll the DB for pipeline progress updates."""
status = agent.db.kv_get("pipeline:status")
if status:
return gr.update(value=f"{status}", visible=True)
return gr.update(value="", visible=False)
def poll_notifications(): def poll_notifications():
"""Poll the notification bus for pending messages.""" """Poll the notification bus for pending messages."""
if not notification_bus: if not notification_bus:
@ -244,6 +257,10 @@ def create_ui(agent: Agent, config: Config, llm: LLMAdapter,
[voice_output, voice_status], [voice_output, voice_status],
) )
# Pipeline status polling timer (every 3 seconds)
status_timer = gr.Timer(3)
status_timer.tick(poll_pipeline_status, None, [pipeline_status])
# Notification polling timer (every 10 seconds) # Notification polling timer (every 10 seconds)
if notification_bus: if notification_bus:
notification_bus.subscribe("gradio", lambda msg, cat: None) # Register listener notification_bus.subscribe("gradio", lambda msg, cat: None) # Register listener