CheddahBot/cheddahbot/agent.py

405 lines
15 KiB
Python

"""Core agent loop - the brain of CheddahBot."""
from __future__ import annotations
import base64
import json
import logging
import threading
import uuid
from collections.abc import Generator
from pathlib import Path
from .config import AgentConfig, Config
from .db import Database
from .llm import LLMAdapter
from .router import build_system_prompt, format_messages_for_llm
log = logging.getLogger(__name__)
MAX_TOOL_ITERATIONS = 15
_IMAGE_MIME = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp",
}
def _build_file_content_parts(files: list[str]) -> list[dict]:
"""Encode file attachments as content parts for the LLM message.
Images → base64 image_url parts; text files → inline text parts.
"""
parts: list[dict] = []
for file_path in files:
p = Path(file_path).resolve()
if not p.exists():
parts.append({"type": "text", "text": f"[File not found: {file_path}]"})
continue
suffix = p.suffix.lower()
if suffix in _IMAGE_MIME:
try:
data = base64.b64encode(p.read_bytes()).decode("utf-8")
mime = _IMAGE_MIME[suffix]
parts.append(
{
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{data}"},
}
)
except Exception as e:
parts.append({"type": "text", "text": f"[Error reading image {p.name}: {e}]"})
else:
try:
text = p.read_text(encoding="utf-8", errors="replace")
if len(text) > 10000:
text = text[:10000] + "\n... (truncated)"
parts.append({"type": "text", "text": f"[File: {p.name}]\n{text}"})
except Exception as e:
parts.append({"type": "text", "text": f"[Error reading {p.name}: {e}]"})
return parts
class Agent:
def __init__(
self,
config: Config,
db: Database,
llm: LLMAdapter,
agent_config: AgentConfig | None = None,
):
self.config = config
self.db = db
self.llm = llm
self.agent_config = agent_config or AgentConfig()
self.conv_id: str | None = None
self._memory = None # set by app after memory system init
self._tools = None # set by app after tool system init
self._skills_registry = None # set by app after skills init
@property
def name(self) -> str:
return self.agent_config.name
def set_memory(self, memory):
self._memory = memory
def set_tools(self, tools):
self._tools = tools
def set_skills_registry(self, registry):
self._skills_registry = registry
def ensure_conversation(self) -> str:
if not self.conv_id:
self.conv_id = uuid.uuid4().hex[:12]
self.db.create_conversation(self.conv_id, agent_name=self.name)
return self.conv_id
def new_conversation(self):
"""Reset conversation state. DB row is created lazily by ensure_conversation()."""
self.conv_id = None
def load_conversation(self, conv_id: str) -> list[dict]:
"""Load an existing conversation by ID. Returns message list."""
self.conv_id = conv_id
return self.db.get_messages(conv_id)
def respond(self, user_input: str, files: list | None = None) -> Generator[str, None, None]:
"""Process user input and yield streaming response text."""
conv_id = self.ensure_conversation()
# Auto-title early so it's set even if the generator is closed mid-stream
self._maybe_set_title(conv_id, user_input)
# Store user message
self.db.add_message(conv_id, "user", user_input)
# Build system prompt
memory_context = ""
if self._memory:
memory_context = self._memory.get_context(user_input)
# Apply tool whitelist from agent config
tool_filter = self.agent_config.tools
tools_schema = []
tools_description = ""
if self._tools:
tools_schema = self._tools.get_tools_schema(filter_names=tool_filter)
tools_description = self._tools.get_tools_description(filter_names=tool_filter)
skills_context = ""
if self._skills_registry:
skills_context = self._skills_registry.get_prompt_section(self.name)
# Use agent-specific personality file if configured
identity_dir = self.config.identity_dir
personality_file = self.agent_config.personality_file
if personality_file:
pf = Path(personality_file)
if pf.exists():
identity_dir = pf.parent
system_prompt = build_system_prompt(
identity_dir=identity_dir,
memory_context=memory_context,
tools_description=tools_description,
skills_context=skills_context,
)
# Load conversation history
history = self.db.get_messages(conv_id, limit=self.config.memory.max_context_messages)
messages = format_messages_for_llm(
system_prompt, history, self.config.memory.max_context_messages
)
# If files are attached, replace the last user message with multipart content
if files:
file_parts = _build_file_content_parts(files)
if file_parts:
# Find the last user message and convert to multipart
for i in range(len(messages) - 1, -1, -1):
if messages[i]["role"] == "user":
text_content = messages[i]["content"]
messages[i]["content"] = [
{"type": "text", "text": text_content},
*file_parts,
]
break
# Agent loop: LLM call → tool execution → repeat
seen_tool_calls: set[str] = set() # track (name, args_json) to prevent duplicates
for _iteration in range(MAX_TOOL_ITERATIONS):
full_response = ""
tool_calls = []
for chunk in self.llm.chat(messages, tools=tools_schema or None, stream=True):
if chunk["type"] == "text":
full_response += chunk["content"]
yield chunk["content"]
elif chunk["type"] == "tool_use":
tool_calls.append(chunk)
elif chunk["type"] == "usage":
if self.db:
self.db.log_api_usage(
model=chunk["model"],
provider="openrouter",
prompt_tokens=chunk["prompt_tokens"],
completion_tokens=chunk["completion_tokens"],
total_tokens=chunk["total_tokens"],
estimated_cost=chunk["estimated_cost"],
conv_id=conv_id,
agent_name=self.agent_config.name if self.agent_config else "default",
)
# If no tool calls, we're done
if not tool_calls:
if full_response:
self.db.add_message(
conv_id, "assistant", full_response, model=self.llm.current_model
)
break
# Filter out duplicate tool calls
unique_tool_calls = []
for tc in tool_calls:
call_key = f"{tc['name']}:{json.dumps(tc.get('input', {}), sort_keys=True)}"
if call_key in seen_tool_calls:
log.warning("Skipping duplicate tool call: %s", tc["name"])
continue
seen_tool_calls.add(call_key)
unique_tool_calls.append(tc)
if not unique_tool_calls:
# All tool calls were duplicates — force the model to respond
if full_response:
self.db.add_message(
conv_id, "assistant", full_response, model=self.llm.current_model
)
else:
yield "(I already have the information needed to answer.)"
break
# Store assistant message with tool calls
self.db.add_message(
conv_id,
"assistant",
full_response,
tool_calls=[{"name": tc["name"], "input": tc["input"]} for tc in unique_tool_calls],
model=self.llm.current_model,
)
# Execute tools
if self._tools:
# Resolve IDs once so assistant message and tool responses match
resolved_ids = [
tc.get("id") or f"call_{tc['name']}_{i}"
for i, tc in enumerate(unique_tool_calls)
]
# Build OpenAI-format assistant message with tool_calls
openai_tool_calls = [
{
"id": resolved_ids[i],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc.get("input", {})),
},
}
for i, tc in enumerate(unique_tool_calls)
]
messages.append(
{
"role": "assistant",
"content": full_response or None,
"tool_calls": openai_tool_calls,
}
)
for i, tc in enumerate(unique_tool_calls):
args = tc.get("input", {})
# Build a brief summary of the args for the user
arg_summary = ", ".join(
f"{k}={repr(v)[:60]}" for k, v in args.items()
)
if arg_summary:
yield f"\n\n*Calling {tc['name']}({arg_summary})...*\n"
else:
yield f"\n\n*Calling {tc['name']}...*\n"
try:
result = self._tools.execute(tc["name"], args)
except Exception as e:
result = f"Tool error: {e}"
log.info("Tool %s result: %s", tc["name"], result[:500])
self.db.add_message(conv_id, "tool", result, tool_result=tc["name"])
messages.append(
{
"role": "tool",
"tool_call_id": resolved_ids[i],
"content": result,
}
)
else:
# No tool system configured - just mention tool was requested
if full_response:
self.db.add_message(
conv_id, "assistant", full_response, model=self.llm.current_model
)
for tc in unique_tool_calls:
yield f"\n(Tool requested: {tc['name']} - tool system not yet initialized)\n"
break
else:
yield "\n(Reached maximum tool iterations)"
# Check if memory flush is needed
if self._memory:
msg_count = self.db.count_messages(conv_id)
if msg_count > self.config.memory.flush_threshold:
self._memory.auto_flush(conv_id)
def _maybe_set_title(self, conv_id: str, user_input: str):
"""Set conversation title from first user message if still 'New Chat'.
Sets a quick truncated fallback immediately, then fires a background
thread to generate a proper 5-8 word LLM summary.
"""
try:
current_title = self.db.get_conversation_title(conv_id)
if current_title and current_title != "New Chat":
return
# Immediate fallback: first line, truncated
fallback = user_input.split("\n", 1)[0].strip()
if len(fallback) > 50:
fallback = fallback[:47] + "..."
if fallback:
self.db.update_conversation_title(conv_id, fallback)
# Fire background LLM call to generate a better title
threading.Thread(
target=self._generate_title,
args=(conv_id, user_input),
daemon=True,
).start()
except Exception as e:
log.warning("Failed to set conversation title: %s", e)
def _generate_title(self, conv_id: str, user_input: str):
"""Background: ask the LLM for a 5-8 word conversation title."""
try:
prompt = user_input[:500] # cap input to keep it cheap
messages = [
{
"role": "system",
"content": (
"Generate a short 5-8 word title summarizing this conversation opener. "
"Reply with ONLY the title — no quotes, no punctuation at the end, no explanation."
),
},
{"role": "user", "content": prompt},
]
parts = []
for chunk in self.llm.chat(messages, tools=None, stream=False):
if chunk.get("type") == "text":
parts.append(chunk["content"])
title = "".join(parts).strip().strip('"').strip("'")
if title and len(title) <= 60:
self.db.update_conversation_title(conv_id, title)
log.debug("Generated conversation title: %s", title)
except Exception as e:
log.warning("Background title generation failed: %s", e)
def respond_to_prompt(self, prompt: str) -> str:
"""Non-streaming response for scheduled tasks / internal use."""
result_parts = []
for chunk in self.respond(prompt):
result_parts.append(chunk)
return "".join(result_parts)
def execute_task(
self,
prompt: str,
system_context: str = "",
tools: str = "",
model: str = "",
skip_permissions: bool = False,
) -> str:
"""Execute a task using the execution brain (Claude Code CLI).
Used by heartbeat, scheduler, and the delegate tool.
Logs the result to daily memory if available.
Args:
tools: Override Claude Code tool list (e.g. "Bash,Read,WebSearch").
model: Override the CLI model (e.g. "claude-sonnet-4.5").
skip_permissions: If True, run CLI with --dangerously-skip-permissions.
"""
log.info("Execution brain task: %s", prompt[:100])
kwargs: dict = {
"system_prompt": system_context,
"timeout": self.config.timeouts.execution_brain,
}
if tools:
kwargs["tools"] = tools
if model:
kwargs["model"] = model
if skip_permissions:
kwargs["skip_permissions"] = True
result = self.llm.execute(prompt, **kwargs)
# Log to daily memory
if self._memory:
try:
self._memory.log_daily(f"[Execution] {prompt[:200]}\n-> {result[:500]}")
except Exception as e:
log.warning("Failed to log execution to memory: %s", e)
return result