"""Gradio interface for CheddahBot."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
import gradio as gr
if TYPE_CHECKING:
from .agent_registry import AgentRegistry
from .config import Config
from .llm import LLMAdapter
from .notifications import NotificationBus
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Extra markup handed to ``gr.Blocks(head=...)`` in create_ui; currently just a
# newline, i.e. nothing is injected into the page head.
_HEAD = """
"""
# Custom stylesheet handed to ``gr.Blocks(css=...)`` in create_ui. It hides the
# Gradio footer, styles the notification banner, the sidebar conversation
# buttons (``conv-btn``) and the agent radio (``agent-radio``), and applies
# layout tweaks for viewports up to 768px wide.
_CSS = """
footer { display: none !important; }
.notification-banner {
background: #1a1a2e;
border: 1px solid #16213e;
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 8px;
font-size: 0.9em;
}
/* Sidebar conversation buttons */
.conv-btn {
text-align: left !important;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
padding: 8px 12px !important;
margin: 2px 0 !important;
font-size: 0.85em !important;
min-height: 44px !important;
width: 100% !important;
}
.conv-btn.active {
border-color: var(--color-accent) !important;
background: var(--color-accent-soft) !important;
}
/* Agent radio: larger touch targets */
.agent-radio .wrap {
gap: 4px !important;
}
.agent-radio .wrap label {
min-height: 44px !important;
padding: 8px 12px !important;
display: flex;
align-items: center;
}
/* Mobile optimizations */
@media (max-width: 768px) {
.gradio-container { padding: 4px !important; }
/* 16px base font on chat messages to prevent iOS zoom on focus */
.chatbot .message-row .message { font-size: 16px !important; }
/* Chat container: scrollable, no zoom-stuck overflow */
.chatbot {
overflow-y: auto !important;
-webkit-overflow-scrolling: touch;
height: calc(100dvh - 200px) !important;
max-height: none !important;
}
/* Tighten up header/status bar spacing */
.gradio-container > .main > .wrap { gap: 8px !important; }
/* Keep input area pinned at the bottom, never overlapping chat */
.gradio-container > .main {
display: flex;
flex-direction: column;
height: 100dvh;
}
.gradio-container > .main > .wrap:last-child {
position: sticky;
bottom: 0;
background: var(--background-fill-primary);
padding-bottom: env(safe-area-inset-bottom, 8px);
z-index: 10;
}
/* Input box: prevent tiny text that triggers zoom */
.multimodal-textbox textarea,
.multimodal-textbox input {
font-size: 16px !important;
}
/* Reduce model dropdown row padding */
.contain .gr-row { gap: 4px !important; }
/* Prevent horizontal overflow on narrow viewports */
.gradio-container { overflow-x: hidden !important; max-width: 100vw !important; }
}
"""
def _messages_to_chatbot(messages: list[dict]) -> list[dict]:
"""Convert DB messages to Gradio chatbot format, skipping tool messages."""
chatbot_msgs = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role in ("user", "assistant") and content:
chatbot_msgs.append({"role": role, "content": content})
return chatbot_msgs
def create_ui(
    registry: AgentRegistry,
    config: Config,
    llm: LLMAdapter,
    notification_bus: NotificationBus | None = None,
    scheduler=None,
) -> gr.Blocks:
    """Build and return the Gradio app.

    Args:
        registry: Source of the selectable agents. Each agent is used for
            conversation storage (``agent.db``) and for generating replies.
        config: Application config; read for the ClickUp status line and,
            when present, the ``api_budget`` monthly limit.
        llm: Adapter used to list chat models, switch the active model and
            report whether the execution brain is available.
        notification_bus: Optional bus polled every 10 seconds for pending
            ``"gradio"`` notifications. Without it no notification timer is
            created.
        scheduler: Optional scheduler supplying loop timestamps and the
            "Force Pulse" actions. Without it no loop-status timer is created.

    Returns:
        The assembled ``gr.Blocks`` app. It is not launched here.
    """
    available_models = llm.list_chat_models()
    model_choices = [(m.name, m.id) for m in available_models]
    current_model = llm.current_model
    exec_status = "available" if llm.is_execution_brain_available() else "unavailable"
    clickup_status = "enabled" if config.clickup.enabled else "disabled"

    # (display_name, internal_name) pairs for the agent Radio.
    agent_choices = []
    for name in registry.list_agents():
        agent = registry.get(name)
        display = agent.agent_config.display_name if agent else name
        agent_choices.append((display, name))
    default_agent_name = registry.default_name

    # Shared prefix of every value written to the loop-status line.
    loop_prefix = "*Recent System events* | System Loop: "

    with gr.Blocks(title="CheddahBot", fill_width=True, css=_CSS, head=_HEAD) as app:
        # -- State --
        active_agent_name = gr.State(default_agent_name)
        conv_list_state = gr.State([])  # list of {id, title, updated_at}
        browser_state = gr.BrowserState(
            {"agent_name": default_agent_name, "conv_id": None},
            storage_key="cheddahbot_session",
        )

        # -- Sidebar --
        with gr.Sidebar(label="CheddahBot", open=True, width=280):
            gr.Markdown("### Agents")
            agent_selector = gr.Radio(
                choices=agent_choices,
                value=default_agent_name,
                label="Active Agent",
                interactive=True,
                elem_classes=["agent-radio"],
            )
            gr.Markdown("---")
            new_chat_btn = gr.Button("+ New Chat", variant="secondary", size="sm")
            gr.Markdown("### History")

            # Re-rendered whenever conv_list_state changes. ``chatbot`` and
            # ``on_load_conversation`` are defined further down; they are only
            # looked up when this function actually runs.
            @gr.render(inputs=[conv_list_state])
            def render_conv_list(convs):
                if not convs:
                    gr.Markdown("*No conversations yet*")
                    return
                for conv in convs:
                    btn = gr.Button(
                        conv["title"] or "New Chat",
                        elem_classes=["conv-btn"],
                        variant="secondary",
                        size="sm",
                        key=f"conv-{conv['id']}",
                    )
                    btn.click(
                        on_load_conversation,
                        inputs=[gr.State(conv["id"]), active_agent_name],
                        outputs=[chatbot, conv_list_state, browser_state],
                    )

        # -- Main area --
        gr.Markdown("# CheddahBot", elem_classes=["contain"])
        gr.Markdown(
            f"*Chat Brain:* `{current_model}` | "
            f"*Execution Brain (Claude Code CLI):* `{exec_status}` | "
            f"*ClickUp:* `{clickup_status}`",
            elem_classes=["contain"],
        )

        # -- API Usage card --
        api_usage_display = gr.Markdown(
            value="*API Usage (30d):* loading...",
            elem_classes=["contain"],
        )

        # -- Notification banner --
        notification_display = gr.Markdown(
            value="",
            visible=False,
            elem_classes=["contain", "notification-banner"],
        )

        # -- System Loop Monitoring --
        with gr.Row(elem_classes=["contain"]):
            loop_status = gr.Markdown(
                value="*Recent System events* | System Loop: waiting for first run...",
            )
            force_pulse_btn = gr.Button(
                "Force Pulse",
                variant="secondary",
                size="sm",
                scale=0,
                min_width=110,
            )

        with gr.Row(elem_classes=["contain"]):
            model_dropdown = gr.Dropdown(
                choices=model_choices,
                value=current_model,
                label="Model",
                interactive=True,
                allow_custom_value=True,
                scale=3,
            )
            refresh_btn = gr.Button("Refresh", scale=0, min_width=70)

        chatbot = gr.Chatbot(
            label="Chat",
            height=500,
            buttons=["copy"],
            elem_classes=["contain"],
        )
        pipeline_status = gr.Markdown(
            value="",
            visible=False,
            elem_classes=["contain"],
        )
        with gr.Row(elem_classes=["contain"]):
            msg_input = gr.MultimodalTextbox(
                placeholder="Type a message... (attach files, use mic, or camera)",
                show_label=False,
                scale=4,
                sources=["upload", "microphone"],
            )

        # -- Helper functions --
        def _get_agent(agent_name: str):
            """Get agent by name, falling back to default."""
            return registry.get(agent_name) or registry.default

        def _refresh_conv_list(agent_name: str) -> list[dict]:
            """Load conversation list for the given agent."""
            agent = _get_agent(agent_name)
            if not agent:
                return []
            return agent.db.list_conversations(limit=50, agent_name=agent_name)

        # -- Event handlers --
        def on_model_change(model_id):
            """Switch the LLM adapter to the model picked in the dropdown."""
            llm.switch_model(model_id)

        def on_refresh_models():
            """Re-query the model list and refresh the dropdown choices."""
            models = llm.list_chat_models()
            choices = [(m.name, m.id) for m in models]
            return gr.update(choices=choices, value=llm.current_model)

        def on_agent_switch(agent_name, browser):
            """Switch to a different agent: clear chat, refresh history."""
            agent = _get_agent(agent_name)
            if agent:
                agent.new_conversation()
            convs = _refresh_conv_list(agent_name)
            # Copy so the incoming browser-state dict is never mutated in place.
            browser = dict(browser) if browser else {}
            browser["agent_name"] = agent_name
            browser["conv_id"] = agent.conv_id if agent else None
            return agent_name, [], convs, browser

        def on_new_chat(agent_name, browser):
            """Create a new conversation on the active agent."""
            agent = _get_agent(agent_name)
            if agent:
                agent.new_conversation()
            convs = _refresh_conv_list(agent_name)
            browser = dict(browser) if browser else {}
            browser["conv_id"] = agent.conv_id if agent else None
            return [], convs, browser

        def on_load_conversation(conv_id, agent_name):
            """Load an existing conversation into the chatbot."""
            agent = _get_agent(agent_name)
            if not agent:
                return [], [], {"agent_name": agent_name, "conv_id": None}
            messages = agent.load_conversation(conv_id)
            chatbot_msgs = _messages_to_chatbot(messages)
            convs = _refresh_conv_list(agent_name)
            return chatbot_msgs, convs, {"agent_name": agent_name, "conv_id": conv_id}

        def on_user_message(message, chat_history, agent_name):
            """Handle a submitted message and stream the agent's reply.

            Generator yielding ``(chat_history, textbox_update, conv_list)``
            tuples matching the ``[chatbot, msg_input, conv_list_state]``
            outputs. Audio attachments are transcribed and folded into the
            text; all other attachments are passed to the agent as files.
            """
            chat_history = chat_history or []

            # Extract text and files from MultimodalTextbox
            if isinstance(message, dict):
                # ``or`` also covers keys that are present but set to None.
                text = message.get("text") or ""
                files = message.get("files") or []
            else:
                # A None payload must not turn into the literal text "None".
                text = "" if message is None else str(message)
                files = []
            if not text and not files:
                yield chat_history, gr.update(value=None), gr.update()
                return

            # Handle audio files - transcribe them
            processed_files = []
            for item in files:
                fpath = _attachment_path(item)
                if not fpath:
                    # Entry without a usable path: nothing to attach.
                    continue
                if Path(fpath).suffix.lower() in _AUDIO_SUFFIXES:
                    try:
                        from .media import transcribe_audio

                        transcript = transcribe_audio(fpath)
                    except Exception as e:
                        log.warning("Audio transcription failed: %s", e)
                        transcript = None
                    if transcript:
                        text = (
                            f"{text}\n[Voice message]: {transcript}"
                            if text
                            else f"[Voice message]: {transcript}"
                        )
                        continue
                # Non-audio file, or audio that could not be transcribed:
                # hand the file itself to the agent.
                processed_files.append(fpath)

            if not text and not processed_files:
                # Every attachment turned out to be unusable; nothing to send.
                yield chat_history, gr.update(value=None), gr.update()
                return

            # Add user message
            user_display = text
            if processed_files:
                file_names = [Path(f).name for f in processed_files]
                user_display += f"\n[Attached: {', '.join(file_names)}]"
            chat_history = [*chat_history, {"role": "user", "content": user_display}]
            yield chat_history, gr.update(value=None), gr.update()

            # Stream assistant response
            agent = _get_agent(agent_name)
            try:
                response_text = ""
                assistant_added = False
                title_refreshed = False
                for chunk in agent.respond(text, files=processed_files):
                    response_text += chunk
                    if not assistant_added:
                        chat_history = [
                            *chat_history,
                            {"role": "assistant", "content": response_text},
                        ]
                        assistant_added = True
                    else:
                        chat_history[-1] = {"role": "assistant", "content": response_text}
                    # Refresh conv list on first chunk so sidebar shows
                    # the updated title immediately (title is set at the
                    # start of agent.respond before streaming begins).
                    if not title_refreshed:
                        title_refreshed = True
                        convs = _refresh_conv_list(agent_name)
                        yield chat_history, gr.update(value=None), convs
                    else:
                        yield chat_history, gr.update(value=None), gr.update()

                # If no response came through, show a fallback
                if not response_text:
                    chat_history = [
                        *chat_history,
                        {
                            "role": "assistant",
                            "content": "(No response received from model)",
                        },
                    ]
                    yield chat_history, gr.update(value=None), gr.update()
            except Exception as e:
                # UI boundary: report the failure in the chat instead of
                # letting the generator die with no visible feedback.
                log.exception("Error in agent.respond: %s", e)
                error_msg = f"Sorry, something went wrong: {e}"
                # If last message is an empty assistant bubble, replace it
                if (
                    chat_history
                    and chat_history[-1].get("role") == "assistant"
                    and not chat_history[-1].get("content")
                ):
                    chat_history[-1] = {"role": "assistant", "content": error_msg}
                else:
                    chat_history = [
                        *chat_history,
                        {"role": "assistant", "content": error_msg},
                    ]
                yield chat_history, gr.update(value=None), gr.update()

            # Refresh conversation list after message (title/updated_at may have changed)
            convs = _refresh_conv_list(agent_name)
            yield chat_history, gr.update(value=None), convs

        def on_app_load(browser):
            """Restore session from browser localStorage on page load."""
            browser = browser or {}
            agent_name = browser.get("agent_name", default_agent_name)
            conv_id = browser.get("conv_id")

            # Validate agent exists
            agent = registry.get(agent_name)
            if not agent:
                agent_name = default_agent_name
                agent = registry.default

            chatbot_msgs = []
            if conv_id and agent:
                try:
                    messages = agent.load_conversation(conv_id)
                except Exception:
                    # A stale id in browser storage must not break page load;
                    # start with an empty chat and forget the id.
                    log.warning(
                        "Could not restore conversation %s", conv_id, exc_info=True
                    )
                    conv_id = None
                else:
                    chatbot_msgs = _messages_to_chatbot(messages or [])

            convs = _refresh_conv_list(agent_name)
            new_browser = {"agent_name": agent_name, "conv_id": conv_id}
            return agent_name, agent_name, chatbot_msgs, convs, new_browser

        def poll_pipeline_status(agent_name):
            """Pipeline status indicator (no longer used — kept for UI timer)."""
            return gr.update(value="", visible=False)

        def poll_notifications():
            """Poll the notification bus for pending messages."""
            if not notification_bus:
                return gr.update(value="", visible=False)
            messages = notification_bus.get_pending("gradio")
            if not messages:
                return gr.update()  # No change
            # Format as markdown; show the last 5 notifications at most.
            banner = "\n\n".join(f"**Notification:** {msg}" for msg in messages[-5:])
            return gr.update(value=banner, visible=True)

        def poll_loop_status():
            """Poll scheduler loop timestamps and format for display."""
            if not scheduler:
                return gr.update()

            from datetime import UTC
            from datetime import datetime as _dt

            now = _dt.now(UTC)
            parts = []
            for name, iso_ts in scheduler.get_loop_timestamps().items():
                if not iso_ts:
                    continue
                try:
                    stamp = _dt.fromisoformat(iso_ts)
                except (TypeError, ValueError):
                    # Not an ISO-8601 string: show the raw value instead.
                    parts.append(f"{name}: {iso_ts}")
                    continue
                if stamp.tzinfo is None:
                    # Subtracting a naive datetime from an aware one raises
                    # TypeError. NOTE(review): naive values are assumed to be
                    # UTC because they are compared against now(UTC) —
                    # confirm against the scheduler.
                    stamp = stamp.replace(tzinfo=UTC)
                secs = int((now - stamp).total_seconds())
                parts.append(f"{name}: {_humanize_age(secs)}")

            if parts:
                return gr.update(value=f"{loop_prefix}{' | '.join(parts)}")
            return gr.update(value=f"{loop_prefix}waiting for first run...")

        def poll_api_usage():
            """Poll API usage stats for the dashboard card."""
            try:
                db = registry.default.db if registry.default else None
                if not db:
                    return gr.update()
                summary = db.get_api_usage_summary(30)
                # Totals may come back as None when nothing has been recorded.
                total_tokens = summary["total_tokens"] or 0
                total_cost = summary["total_cost"] or 0.0
                budget_str = ""
                if hasattr(config, "api_budget"):
                    limit = config.api_budget.monthly_limit
                    budget_str = f" | Budget: ${total_cost:.2f} / ${limit:.2f}"
                label = (
                    f"*API Usage (30d):* {_humanize_tokens(total_tokens)} tokens"
                    f" | ${total_cost:.2f} est.{budget_str}"
                )
                return gr.update(value=label)
            except Exception as exc:
                # Best effort: keep the previous card content, but leave a
                # trace so a permanently failing poll can be diagnosed.
                log.warning("API usage poll failed: %s", exc)
                return gr.update()

        def on_force_pulse():
            """Trigger an immediate scheduler heartbeat and poll."""
            if not scheduler:
                return gr.update(value=f"{loop_prefix}scheduler not available")
            scheduler.force_heartbeat()
            scheduler.force_poll()
            return gr.update(value=f"{loop_prefix}**Force pulse sent!**")

        # -- Wire events --
        force_pulse_btn.click(on_force_pulse, None, [loop_status])
        model_dropdown.change(on_model_change, [model_dropdown], None)
        refresh_btn.click(on_refresh_models, None, [model_dropdown])
        agent_selector.change(
            on_agent_switch,
            [agent_selector, browser_state],
            [active_agent_name, chatbot, conv_list_state, browser_state],
        )
        new_chat_btn.click(
            on_new_chat,
            [active_agent_name, browser_state],
            [chatbot, conv_list_state, browser_state],
        )
        msg_input.submit(
            on_user_message,
            [msg_input, chatbot, active_agent_name],
            [chatbot, msg_input, conv_list_state],
        )

        # Restore session on page load
        app.load(
            on_app_load,
            inputs=[browser_state],
            outputs=[
                active_agent_name,
                agent_selector,
                chatbot,
                conv_list_state,
                browser_state,
            ],
        )

        # Pipeline status polling timer (every 10 seconds)
        status_timer = gr.Timer(10)
        status_timer.tick(poll_pipeline_status, [active_agent_name], [pipeline_status])

        # Notification polling timer (every 10 seconds)
        if notification_bus:
            notification_bus.subscribe("gradio", lambda msg, cat: None)  # Register listener
            timer = gr.Timer(10)
            timer.tick(poll_notifications, None, [notification_display])

        # System loop status polling timer (every 30 seconds)
        if scheduler:
            loop_timer = gr.Timer(30)
            loop_timer.tick(poll_loop_status, None, [loop_status])

        # API usage polling timer (every 60 seconds)
        api_timer = gr.Timer(60)
        api_timer.tick(poll_api_usage, None, [api_usage_display])

    return app


# -- Private helpers used by create_ui --

# File suffixes treated as voice messages and sent through transcription.
_AUDIO_SUFFIXES = (".wav", ".mp3", ".ogg", ".webm", ".m4a")


def _attachment_path(item) -> str:
    """Return the path of one uploaded-file entry, or ``""`` if it has none."""
    if isinstance(item, str):
        return item
    if isinstance(item, dict):
        # ``or`` skips keys that are present but empty/None.
        return item.get("path") or item.get("name") or ""
    # Defensive: accept objects that expose a ``path`` attribute.
    return str(getattr(item, "path", "") or "")


def _humanize_age(secs: int) -> str:
    """Format an age in seconds as ``"12s ago"``, ``"3m ago"`` or ``"1h 5m ago"``."""
    secs = max(secs, 0)  # clock skew must not produce negative ages
    if secs < 60:
        return f"{secs}s ago"
    if secs < 3600:
        return f"{secs // 60}m ago"
    return f"{secs // 3600}h {(secs % 3600) // 60}m ago"


def _humanize_tokens(total: int) -> str:
    """Format a token count as a plain number, ``"1.2K"`` or ``"3.4M"``."""
    if total >= 1_000_000:
        return f"{total / 1_000_000:.1f}M"
    if total >= 1_000:
        return f"{total / 1_000:.1f}K"
    return str(total)