# CheddahBot/cheddahbot/agent.py
"""Core agent loop - the brain of CheddahBot."""
from __future__ import annotations
import json
import logging
import uuid
from collections.abc import Generator
from .config import Config
from .db import Database
from .llm import LLMAdapter
from .router import build_system_prompt, format_messages_for_llm
log = logging.getLogger(__name__)
MAX_TOOL_ITERATIONS = 5
class Agent:
    """Core agent: orchestrates LLM calls, tool execution, and memory.

    The agent owns the conversation lifecycle (ids + persistence via ``db``),
    builds the system prompt from identity files / memory / tool descriptions,
    and runs the LLM → tool → LLM loop, streaming text chunks to the caller.
    Memory and tool subsystems are attached after construction by the app
    (see :meth:`set_memory` / :meth:`set_tools`).
    """

    def __init__(self, config: Config, db: Database, llm: LLMAdapter):
        self.config = config
        self.db = db
        self.llm = llm
        # Active conversation id; lazily created on first respond().
        self.conv_id: str | None = None
        self._memory = None  # set by app after memory system init
        self._tools = None  # set by app after tool system init

    def set_memory(self, memory) -> None:
        """Attach the memory subsystem (called by the app after init)."""
        self._memory = memory

    def set_tools(self, tools) -> None:
        """Attach the tool subsystem (called by the app after init)."""
        self._tools = tools

    def ensure_conversation(self) -> str:
        """Return the current conversation id, creating one on first use."""
        if not self.conv_id:
            self.conv_id = uuid.uuid4().hex[:12]
            self.db.create_conversation(self.conv_id)
        return self.conv_id

    def new_conversation(self) -> str:
        """Start a fresh conversation, persist it, and return its id."""
        self.conv_id = uuid.uuid4().hex[:12]
        self.db.create_conversation(self.conv_id)
        return self.conv_id

    def respond(self, user_input: str, files: list | None = None) -> Generator[str, None, None]:
        """Process user input and yield streaming response text.

        Runs up to ``MAX_TOOL_ITERATIONS`` rounds of: stream an LLM reply,
        execute any requested tools, feed results back as context. Duplicate
        tool calls (same name + args) are skipped to break request loops.

        Args:
            user_input: The user's message.
            files: Reserved for attachments; currently unused here.

        Yields:
            Text chunks (assistant text, tool banners, tool output fences).
        """
        conv_id = self.ensure_conversation()
        # Store user message
        self.db.add_message(conv_id, "user", user_input)
        # Build system prompt
        memory_context = ""
        if self._memory:
            memory_context = self._memory.get_context(user_input)
        tools_schema = []
        tools_description = ""
        if self._tools:
            tools_schema = self._tools.get_tools_schema()
            tools_description = self._tools.get_tools_description()
        system_prompt = build_system_prompt(
            identity_dir=self.config.identity_dir,
            memory_context=memory_context,
            tools_description=tools_description,
        )
        # Load conversation history
        history = self.db.get_messages(conv_id, limit=self.config.memory.max_context_messages)
        messages = format_messages_for_llm(
            system_prompt, history, self.config.memory.max_context_messages
        )
        # Agent loop: LLM call → tool execution → repeat
        seen_tool_calls: set[str] = set()  # track (name, args_json) to prevent duplicates
        for _iteration in range(MAX_TOOL_ITERATIONS):
            full_response = ""
            tool_calls = []
            for chunk in self.llm.chat(messages, tools=tools_schema or None, stream=True):
                if chunk["type"] == "text":
                    full_response += chunk["content"]
                    yield chunk["content"]
                elif chunk["type"] == "tool_use":
                    tool_calls.append(chunk)
            # If no tool calls, we're done
            if not tool_calls:
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                break
            # Filter out duplicate tool calls
            unique_tool_calls = []
            for tc in tool_calls:
                # sort_keys makes the key stable regardless of arg ordering
                call_key = f"{tc['name']}:{json.dumps(tc.get('input', {}), sort_keys=True)}"
                if call_key in seen_tool_calls:
                    log.warning("Skipping duplicate tool call: %s", tc["name"])
                    continue
                seen_tool_calls.add(call_key)
                unique_tool_calls.append(tc)
            if not unique_tool_calls:
                # All tool calls were duplicates — force the model to respond
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                else:
                    yield "(I already have the information needed to answer.)"
                break
            # Store assistant message with tool calls.
            # NOTE: use .get("input", {}) to match the dedup/execute paths above —
            # a malformed tool_use chunk without "input" must not KeyError here.
            self.db.add_message(
                conv_id,
                "assistant",
                full_response,
                tool_calls=[
                    {"name": tc["name"], "input": tc.get("input", {})}
                    for tc in unique_tool_calls
                ],
                model=self.llm.current_model,
            )
            # Execute tools
            if self._tools:
                messages.append(
                    {
                        "role": "assistant",
                        "content": full_response or "I'll use some tools to help with that.",
                    }
                )
                for tc in unique_tool_calls:
                    yield f"\n\n**Using tool: {tc['name']}**\n"
                    try:
                        result = self._tools.execute(tc["name"], tc.get("input", {}))
                    except Exception as e:
                        result = f"Tool error: {e}"
                    # Cap displayed output; full result still goes to the DB/LLM.
                    yield f"```\n{result[:2000]}\n```\n\n"
                    self.db.add_message(conv_id, "tool", result, tool_result=tc["name"])
                    messages.append(
                        {"role": "user", "content": f'[Tool "{tc["name"]}" result]\n{result}'}
                    )
            else:
                # No tool system configured - just mention tool was requested
                if full_response:
                    self.db.add_message(
                        conv_id, "assistant", full_response, model=self.llm.current_model
                    )
                for tc in unique_tool_calls:
                    yield f"\n(Tool requested: {tc['name']} - tool system not yet initialized)\n"
                break
        else:
            yield "\n(Reached maximum tool iterations)"
        # Check if memory flush is needed
        if self._memory:
            msg_count = self.db.count_messages(conv_id)
            if msg_count > self.config.memory.flush_threshold:
                self._memory.auto_flush(conv_id)

    def respond_to_prompt(self, prompt: str) -> str:
        """Non-streaming response for scheduled tasks / internal use."""
        # Drain the streaming generator and concatenate chunks.
        return "".join(self.respond(prompt))

    def execute_task(
        self,
        prompt: str,
        system_context: str = "",
        tools: str = "",
        model: str = "",
    ) -> str:
        """Execute a task using the execution brain (Claude Code CLI).

        Used by heartbeat, scheduler, and the delegate tool.
        Logs the result to daily memory if available.

        Args:
            prompt: The task prompt to execute.
            system_context: Extra system prompt passed to the CLI.
            tools: Override Claude Code tool list (e.g. "Bash,Read,WebSearch").
            model: Override the CLI model (e.g. "claude-sonnet-4.5").

        Returns:
            The execution brain's textual result.
        """
        log.info("Execution brain task: %s", prompt[:100])
        kwargs: dict = {"system_prompt": system_context}
        if tools:
            kwargs["tools"] = tools
        if model:
            kwargs["model"] = model
        result = self.llm.execute(prompt, **kwargs)
        # Log to daily memory (best-effort; never fail the task on logging)
        if self._memory:
            try:
                self._memory.log_daily(f"[Execution] {prompt[:200]}\n{result[:500]}")
            except Exception as e:
                log.warning("Failed to log execution to memory: %s", e)
        return result