# CheddahBot — cheddahbot/agent.py
"""Core agent loop - the brain of CheddahBot."""
from __future__ import annotations
import base64
import json
import logging
import uuid
from collections.abc import Generator
from pathlib import Path
from .config import AgentConfig, Config
from .db import Database
from .llm import LLMAdapter
from .router import build_system_prompt, format_messages_for_llm
log = logging.getLogger(__name__)

# Safety cap on LLM -> tool -> LLM round trips per user turn; when the model
# keeps requesting tools past this, respond() bails out with a notice.
MAX_TOOL_ITERATIONS = 5

# File suffixes treated as images: attachments with these suffixes are sent
# to the LLM as base64 data URLs; everything else is inlined as text.
_IMAGE_MIME = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
}
def _build_file_content_parts(files: list[str]) -> list[dict]:
"""Encode file attachments as content parts for the LLM message.
Images → base64 image_url parts; text files → inline text parts.
"""
parts: list[dict] = []
for file_path in files:
p = Path(file_path).resolve()
if not p.exists():
parts.append({"type": "text", "text": f"[File not found: {file_path}]"})
continue
suffix = p.suffix.lower()
if suffix in _IMAGE_MIME:
try:
data = base64.b64encode(p.read_bytes()).decode("utf-8")
mime = _IMAGE_MIME[suffix]
parts.append({
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{data}"},
})
except Exception as e:
parts.append({"type": "text", "text": f"[Error reading image {p.name}: {e}]"})
else:
try:
text = p.read_text(encoding="utf-8", errors="replace")
if len(text) > 10000:
text = text[:10000] + "\n... (truncated)"
parts.append({"type": "text", "text": f"[File: {p.name}]\n{text}"})
except Exception as e:
parts.append({"type": "text", "text": f"[Error reading {p.name}: {e}]"})
return parts
class Agent:
    """Core agent: orchestrates LLM calls, tool execution, and memory.

    Constructed with its core collaborators (config, database, LLM adapter);
    the memory, tool, and skills subsystems are injected afterwards by the
    application via the ``set_*`` methods.
    """

    def __init__(
        self,
        config: Config,
        db: Database,
        llm: LLMAdapter,
        agent_config: AgentConfig | None = None,
    ):
        self.config = config
        self.db = db
        self.llm = llm
        self.agent_config = agent_config or AgentConfig()
        self.conv_id: str | None = None  # current conversation id, created lazily
        self._memory = None  # set by app after memory system init
        self._tools = None  # set by app after tool system init
        self._skills_registry = None  # set by app after skills init

    @property
    def name(self) -> str:
        """Agent display name, taken from its config."""
        return self.agent_config.name

    def set_memory(self, memory) -> None:
        """Inject the memory subsystem (called by the app after init)."""
        self._memory = memory

    def set_tools(self, tools) -> None:
        """Inject the tool subsystem (called by the app after init)."""
        self._tools = tools

    def set_skills_registry(self, registry) -> None:
        """Inject the skills registry (called by the app after init)."""
        self._skills_registry = registry

    def ensure_conversation(self) -> str:
        """Return the current conversation id, creating one if none exists."""
        if not self.conv_id:
            self.conv_id = uuid.uuid4().hex[:12]
            self.db.create_conversation(self.conv_id)
        return self.conv_id

    def new_conversation(self) -> str:
        """Start a fresh conversation (discarding the current id) and return it."""
        self.conv_id = uuid.uuid4().hex[:12]
        self.db.create_conversation(self.conv_id)
        return self.conv_id

    def respond(self, user_input: str, files: list | None = None) -> Generator[str, None, None]:
        """Process user input and yield streaming response text.

        Runs the agent loop: call the LLM, execute any requested tools,
        append the results, and repeat, up to MAX_TOOL_ITERATIONS rounds.
        Duplicate tool calls (same name + same arguments) within one turn
        are skipped. All messages are persisted to the database as we go.

        Args:
            user_input: The user's message text.
            files: Optional list of attachment file paths; encoded via
                _build_file_content_parts and merged into the user message.

        Yields:
            Response text chunks (plus tool-usage status lines) as they arrive.
        """
        conv_id = self.ensure_conversation()
        # Store user message
        self.db.add_message(conv_id, "user", user_input)

        # Gather context for the system prompt: memory, tools, skills.
        memory_context = ""
        if self._memory:
            memory_context = self._memory.get_context(user_input)
        # Apply tool whitelist from agent config
        tool_filter = self.agent_config.tools
        tools_schema = []
        tools_description = ""
        if self._tools:
            tools_schema = self._tools.get_tools_schema(filter_names=tool_filter)
            tools_description = self._tools.get_tools_description(filter_names=tool_filter)
        skills_context = ""
        if self._skills_registry:
            skills_context = self._skills_registry.get_prompt_section(self.name)

        # Use agent-specific personality file if configured.
        identity_dir = self.config.identity_dir
        personality_file = self.agent_config.personality_file
        if personality_file:
            pf = Path(personality_file)
            if pf.exists():
                identity_dir = pf.parent

        system_prompt = build_system_prompt(
            identity_dir=identity_dir,
            memory_context=memory_context,
            tools_description=tools_description,
            skills_context=skills_context,
        )

        # Load conversation history and format it for the LLM.
        history = self.db.get_messages(conv_id, limit=self.config.memory.max_context_messages)
        messages = format_messages_for_llm(
            system_prompt, history, self.config.memory.max_context_messages
        )

        # If files are attached, replace the last user message with multipart content.
        if files:
            file_parts = _build_file_content_parts(files)
            if file_parts:
                # Find the last user message and convert it to multipart.
                for i in range(len(messages) - 1, -1, -1):
                    if messages[i]["role"] == "user":
                        text_content = messages[i]["content"]
                        messages[i]["content"] = [
                            {"type": "text", "text": text_content},
                            *file_parts,
                        ]
                        break

        # Agent loop: LLM call -> tool execution -> repeat.
        seen_tool_calls: set[str] = set()  # "name:args_json" keys to prevent duplicates
        for _iteration in range(MAX_TOOL_ITERATIONS):
            full_response = ""
            tool_calls = []
            for chunk in self.llm.chat(messages, tools=tools_schema or None, stream=True):
                if chunk["type"] == "text":
                    full_response += chunk["content"]
                    yield chunk["content"]
                elif chunk["type"] == "tool_use":
                    tool_calls.append(chunk)

            # No tool calls: the model has finished responding.
            if not tool_calls:
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                break

            # Filter out duplicate tool calls.
            unique_tool_calls = []
            for tc in tool_calls:
                call_key = f"{tc['name']}:{json.dumps(tc.get('input', {}), sort_keys=True)}"
                if call_key in seen_tool_calls:
                    log.warning("Skipping duplicate tool call: %s", tc["name"])
                    continue
                seen_tool_calls.add(call_key)
                unique_tool_calls.append(tc)
            if not unique_tool_calls:
                # All tool calls were duplicates — force the model to respond
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                else:
                    yield "(I already have the information needed to answer.)"
                break

            # Store assistant message with tool calls.
            self.db.add_message(
                conv_id,
                "assistant",
                full_response,
                tool_calls=[{"name": tc["name"], "input": tc["input"]} for tc in unique_tool_calls],
                model=self.llm.current_model,
            )

            # Execute tools
            if self._tools:
                # Build OpenAI-format assistant message with tool_calls.
                # The ids are generated here ONCE and reused below when the
                # tool-result messages are appended: previously the fallback
                # ids diverged ("call_<name>_<i>" vs "call_<name>"), so a
                # tool result could reference an id the assistant message
                # never declared, breaking OpenAI-style tool-call pairing.
                openai_tool_calls = [
                    {
                        "id": tc.get("id", f"call_{tc['name']}_{i}"),
                        "type": "function",
                        "function": {
                            "name": tc["name"],
                            "arguments": json.dumps(tc.get("input", {})),
                        },
                    }
                    for i, tc in enumerate(unique_tool_calls)
                ]
                messages.append({
                    "role": "assistant",
                    "content": full_response or None,
                    "tool_calls": openai_tool_calls,
                })
                for tc, otc in zip(unique_tool_calls, openai_tool_calls):
                    yield f"\n\n**Using tool: {tc['name']}**\n"
                    try:
                        result = self._tools.execute(tc["name"], tc.get("input", {}))
                    except Exception as e:
                        result = f"Tool error: {e}"
                    yield f"```\n{result[:2000]}\n```\n\n"
                    self.db.add_message(conv_id, "tool", result, tool_result=tc["name"])
                    messages.append({
                        "role": "tool",
                        "tool_call_id": otc["id"],  # must match the assistant tool_calls id
                        "content": result,
                    })
            else:
                # No tool system configured - just mention tool was requested
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                for tc in unique_tool_calls:
                    yield f"\n(Tool requested: {tc['name']} - tool system not yet initialized)\n"
                break
        else:
            # for-loop exhausted without break: iteration cap reached.
            yield "\n(Reached maximum tool iterations)"

        # Check if memory flush is needed.
        if self._memory:
            msg_count = self.db.count_messages(conv_id)
            if msg_count > self.config.memory.flush_threshold:
                self._memory.auto_flush(conv_id)

    def respond_to_prompt(self, prompt: str) -> str:
        """Non-streaming response for scheduled tasks / internal use."""
        result_parts = []
        for chunk in self.respond(prompt):
            result_parts.append(chunk)
        return "".join(result_parts)

    def execute_task(
        self,
        prompt: str,
        system_context: str = "",
        tools: str = "",
        model: str = "",
    ) -> str:
        """Execute a task using the execution brain (Claude Code CLI).

        Used by heartbeat, scheduler, and the delegate tool.
        Logs the result to daily memory if available.

        Args:
            prompt: The task prompt to execute.
            system_context: Extra system-prompt text passed to the CLI.
            tools: Override Claude Code tool list (e.g. "Bash,Read,WebSearch").
            model: Override the CLI model (e.g. "claude-sonnet-4.5").

        Returns:
            The CLI execution result text.
        """
        log.info("Execution brain task: %s", prompt[:100])
        kwargs: dict = {"system_prompt": system_context}
        if tools:
            kwargs["tools"] = tools
        if model:
            kwargs["model"] = model
        result = self.llm.execute(prompt, **kwargs)
        # Log to daily memory (best-effort: a memory failure must not lose the result).
        if self._memory:
            try:
                self._memory.log_daily(f"[Execution] {prompt[:200]}\n{result[:500]}")
            except Exception as e:
                log.warning("Failed to log execution to memory: %s", e)
        return result