502 lines
19 KiB
Python
502 lines
19 KiB
Python
"""Two-brain LLM adapter.
|
|
|
|
Chat Brain:
|
|
- OpenRouter / Ollama / LM Studio (OpenAI-compatible APIs)
|
|
- Full control over system prompt — Cheddah personality works here
|
|
- Claude models available via OpenRouter mapping
|
|
|
|
Execution Brain:
|
|
- Claude Code CLI (subprocess)
|
|
- Used for heartbeat, scheduled tasks, delegated system-level work
|
|
- Claude's built-in tools (Bash, Read, Edit, etc.) are a feature here
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from collections.abc import Generator
|
|
from dataclasses import dataclass
|
|
|
|
import httpx
|
|
|
|
# Module-level logger named after this module (standard logging convention).
log = logging.getLogger(__name__)
|
|
|
|
# Pricing per million tokens: (input_cost, output_cost) in USD
|
|
MODEL_PRICING: dict[str, tuple[float, float]] = {
|
|
"anthropic/claude-sonnet": (3.00, 15.00),
|
|
"anthropic/claude-opus": (5.00, 25.00),
|
|
"anthropic/claude-haiku": (0.80, 4.00),
|
|
"x-ai/grok-4.1-fast": (0.20, 0.50),
|
|
"google/gemini-3-flash": (0.50, 3.00),
|
|
"google/gemini-2.5-flash": (0.15, 0.60),
|
|
"openai/gpt-4o-mini": (0.15, 0.60),
|
|
"openai/gpt-5-nano": (0.10, 0.40),
|
|
"deepseek/deepseek-v3": (0.24, 0.38),
|
|
"minimax/minimax-m2.5": (0.30, 1.20),
|
|
"moonshotai/kimi-k2.5": (0.45, 2.20),
|
|
}
|
|
|
|
|
|
def _estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
|
|
"""Estimate cost in USD using prefix matching against MODEL_PRICING."""
|
|
for prefix, (input_rate, output_rate) in MODEL_PRICING.items():
|
|
if model.startswith(prefix):
|
|
return (prompt_tokens * input_rate + completion_tokens * output_rate) / 1_000_000
|
|
return 0.0
|
|
|
|
|
|
@dataclass
class ModelInfo:
    """One selectable chat model, as shown in the model dropdown."""

    id: str  # model identifier sent to the API (local models carry a "local/<provider>/" prefix)
    name: str  # human-readable display label (e.g. "[Ollama] llama3.2")
    provider: str  # "openrouter" | "ollama" | "lmstudio"
    context_length: int | None = None  # context window size, when known
|
|
|
|
|
|
# Claude model IDs → OpenRouter equivalents (for chat dropdown).
# Selecting a bare Claude ID in chat requires an OpenRouter key; the chat
# brain rewrites it to the "anthropic/..." form before sending the request.
CLAUDE_OPENROUTER_MAP = {
    "claude-sonnet-4.5": "anthropic/claude-sonnet-4.5",
    "claude-opus-4.6": "anthropic/claude-opus-4.6",
    "claude-haiku-4.5": "anthropic/claude-haiku-4.5",
}
|
|
|
|
|
|
def _provider_for(model_id: str, openrouter_key: str) -> str:
|
|
"""Determine which OpenAI-compatible provider to route a chat model to."""
|
|
if model_id.startswith("local/ollama/"):
|
|
return "ollama"
|
|
if model_id.startswith("local/lmstudio/"):
|
|
return "lmstudio"
|
|
# Everything else goes through OpenRouter (including mapped Claude models)
|
|
return "openrouter"
|
|
|
|
|
|
class LLMAdapter:
    """Two-brain LLM adapter.

    Chat brain: routes conversations through OpenAI-compatible HTTP APIs
    (OpenRouter, Ollama, LM Studio), with full control of the system prompt.
    Execution brain: shells out to the Claude Code CLI for tool-using,
    system-level work (heartbeat checks, scheduled tasks, delegation).
    """

    def __init__(
        self,
        default_model: str = "claude-sonnet-4.5",
        openrouter_key: str = "",
        ollama_url: str = "http://localhost:11434",
        lmstudio_url: str = "http://localhost:1234",
    ):
        self.current_model = default_model
        self.openrouter_key = openrouter_key
        # Strip trailing slashes so "/v1"-style paths can be appended safely.
        self.ollama_url = ollama_url.rstrip("/")
        self.lmstudio_url = lmstudio_url.rstrip("/")
        self._openai_mod = None  # lazily imported `openai` SDK module

    @property
    def provider(self) -> str:
        """Provider name ("openrouter" | "ollama" | "lmstudio") for the current model."""
        return _provider_for(self.current_model, self.openrouter_key)

    def switch_model(self, model_id: str):
        """Point the chat brain at a different model."""
        self.current_model = model_id
        log.info("Switched chat model to: %s (provider: %s)", model_id, self.provider)

    # ── Chat Brain (OpenAI-compatible only) ──

    def chat(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        stream: bool = True,
    ) -> Generator[dict, None, None]:
        """Chat brain: routes through OpenAI-compatible APIs only.

        Yields chunks: {"type": "text", "content": "..."} or {"type": "tool_use", ...},
        plus a final {"type": "usage", ...} chunk when token counts are available.
        If no usable provider is configured, yields a single explanatory text
        chunk and returns.
        """
        provider = self.provider
        model_id = self._resolve_model_id(provider)

        # If a Claude model ID was selected, map it to OpenRouter equivalent
        if model_id in CLAUDE_OPENROUTER_MAP:
            if self.openrouter_key:
                model_id = CLAUDE_OPENROUTER_MAP[model_id]
                provider = "openrouter"
            else:
                yield {
                    "type": "text",
                    "content": (
                        "To chat with Claude models, you need an OpenRouter API key "
                        "(set OPENROUTER_API_KEY in .env). Alternatively, select a local "
                        "model from Ollama or LM Studio."
                    ),
                }
                return

        # Check if provider is available
        if provider == "openrouter" and not self.openrouter_key:
            yield {
                "type": "text",
                "content": (
                    "No API key configured. To use cloud models:\n"
                    "1. Get an OpenRouter API key at https://openrouter.ai/keys\n"
                    "2. Set OPENROUTER_API_KEY in your .env file\n\n"
                    "Or install Ollama (free, local) and pull a model:\n"
                    "  ollama pull llama3.2"
                ),
            }
            return

        base_url, api_key = self._resolve_endpoint(provider)
        yield from self._chat_openai_sdk(messages, tools, stream, base_url, api_key, model_id)

    # ── Execution Brain (Claude Code CLI) ──

    def execute(
        self,
        prompt: str,
        system_prompt: str = "",
        working_dir: str | None = None,
        tools: str = "Bash,Read,Edit,Write,Glob,Grep",
        model: str | None = None,
        skip_permissions: bool = False,
    ) -> str:
        """Execution brain: calls Claude Code CLI with full tool access.

        Used for heartbeat checks, scheduled tasks, and delegated complex tasks.
        Returns the full result string (non-streaming); errors are returned as
        human-readable strings rather than raised.

        Args:
            prompt: Task text, piped to the CLI via stdin.
            system_prompt: Optional --system-prompt override for the CLI.
            working_dir: Working directory for the subprocess (None = inherit).
            tools: Comma-separated Claude Code tool names (default: standard set).
            model: Override the CLI model (e.g. "claude-sonnet-4.5").
            skip_permissions: If True, append --dangerously-skip-permissions to
                the CLI invocation (used for automated pipelines).
        """
        claude_bin = shutil.which("claude")
        if not claude_bin:
            return (
                "Error: `claude` CLI not found in PATH. "
                "Install Claude Code: npm install -g @anthropic-ai/claude-code"
            )

        # Pipe prompt through stdin to avoid Windows 8191-char command-line limit.
        cmd = [
            claude_bin,
            "-p",
            "--output-format",
            "json",
            "--tools",
            tools,
            "--allowedTools",
            tools,
        ]
        if model:
            cmd.extend(["--model", model])
        if system_prompt:
            cmd.extend(["--system-prompt", system_prompt])
        if skip_permissions:
            cmd.append("--dangerously-skip-permissions")

        log.debug("Execution brain cmd: %s", " ".join(cmd[:6]) + "...")

        # Strip CLAUDECODE env var so the subprocess doesn't think it's nested
        env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}

        try:
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                # NOTE(review): shell=True with a list arg is unusual;
                # presumably needed because `claude` ships as an npm .cmd
                # shim on Windows — confirm before changing.
                shell=(sys.platform == "win32"),
                cwd=working_dir,
                env=env,
            )
        except FileNotFoundError:
            return (
                "Error: `claude` CLI not found. "
                "Install Claude Code: npm install -g @anthropic-ai/claude-code"
            )

        try:
            stdout, stderr = proc.communicate(input=prompt, timeout=900)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed process so it doesn't linger as a zombie.
            proc.communicate()
            return "Error: Claude Code execution timed out after 15 minutes."

        if proc.returncode != 0:
            return f"Execution error: {stderr or 'unknown error'}"

        # CLI emits a JSON envelope with the answer under "result"; fall back
        # to raw stdout if the output isn't valid JSON.
        try:
            result = json.loads(stdout)
            text = result.get("result", "")
            if text:
                return text
            if result.get("is_error"):
                return f"Execution error: {result.get('result', 'unknown')}"
            return "(No output from execution brain)"
        except json.JSONDecodeError:
            return stdout.strip() if stdout.strip() else "(No output from execution brain)"

    def is_execution_brain_available(self) -> bool:
        """Check if the Claude Code CLI is available."""
        return shutil.which("claude") is not None

    # ── OpenAI-compatible SDK (OpenRouter / Ollama / LM Studio) ──

    def _chat_openai_sdk(
        self,
        messages: list[dict],
        tools: list[dict] | None,
        stream: bool,
        base_url: str,
        api_key: str,
        model_id: str,
    ) -> Generator[dict, None, None]:
        """Run one chat completion against an OpenAI-compatible endpoint.

        Retries transient failures (see _is_retryable_error) with exponential
        backoff, but only while nothing has been yielded yet — once content
        has streamed to the caller, a retry would duplicate output.
        """
        openai = self._get_openai()
        client = openai.OpenAI(base_url=base_url, api_key=api_key)

        kwargs: dict = {
            "model": model_id,
            "messages": messages,
            "stream": stream,
        }
        if stream:
            # Ask the server to append a usage chunk at the end of the stream.
            kwargs["stream_options"] = {"include_usage": True}
        if tools:
            kwargs["tools"] = tools

        max_retries = 2
        has_yielded = False

        for attempt in range(max_retries + 1):
            try:
                if stream:
                    response = client.chat.completions.create(**kwargs)
                    # Tool-call deltas arrive fragmented; accumulate by index.
                    tool_calls_accum: dict[int, dict] = {}
                    stream_usage = None
                    for chunk in response:
                        # Capture usage from the final stream chunk
                        if hasattr(chunk, "usage") and chunk.usage:
                            stream_usage = chunk.usage
                        delta = chunk.choices[0].delta if chunk.choices else None
                        if not delta:
                            continue
                        if delta.content:
                            has_yielded = True
                            yield {"type": "text", "content": delta.content}
                        if delta.tool_calls:
                            for tc in delta.tool_calls:
                                idx = tc.index
                                if idx not in tool_calls_accum:
                                    tool_calls_accum[idx] = {
                                        "id": tc.id or "",
                                        "name": tc.function.name
                                        if tc.function and tc.function.name
                                        else "",
                                        "arguments": "",
                                    }
                                if tc.function and tc.function.arguments:
                                    tool_calls_accum[idx]["arguments"] += tc.function.arguments
                                if tc.id:
                                    tool_calls_accum[idx]["id"] = tc.id

                    # Emit completed tool calls in index order.
                    for _, tc in sorted(tool_calls_accum.items()):
                        try:
                            args = json.loads(tc["arguments"])
                        except json.JSONDecodeError:
                            args = {}
                        yield {
                            "type": "tool_use",
                            "id": tc["id"],
                            "name": tc["name"],
                            "input": args,
                        }

                    # Yield usage chunk if available
                    if stream_usage:
                        pt = getattr(stream_usage, "prompt_tokens", 0) or 0
                        ct = getattr(stream_usage, "completion_tokens", 0) or 0
                        yield {
                            "type": "usage",
                            "model": model_id,
                            "prompt_tokens": pt,
                            "completion_tokens": ct,
                            "total_tokens": pt + ct,
                            "estimated_cost": _estimate_cost(model_id, pt, ct),
                        }
                else:
                    response = client.chat.completions.create(**kwargs)
                    msg = response.choices[0].message
                    if msg.content:
                        has_yielded = True
                        yield {"type": "text", "content": msg.content}
                    if msg.tool_calls:
                        for tc in msg.tool_calls:
                            try:
                                args = json.loads(tc.function.arguments)
                            except json.JSONDecodeError:
                                args = {}
                            yield {
                                "type": "tool_use",
                                "id": tc.id,
                                "name": tc.function.name,
                                "input": args,
                            }

                    # Yield usage chunk for non-streaming
                    if hasattr(response, "usage") and response.usage:
                        pt = response.usage.prompt_tokens or 0
                        ct = response.usage.completion_tokens or 0
                        yield {
                            "type": "usage",
                            "model": model_id,
                            "prompt_tokens": pt,
                            "completion_tokens": ct,
                            "total_tokens": pt + ct,
                            "estimated_cost": _estimate_cost(model_id, pt, ct),
                        }
                # Success — break out of retry loop
                return

            except Exception as e:
                if not has_yielded and attempt < max_retries and _is_retryable_error(e):
                    wait = 2**attempt
                    log.warning(
                        "Retryable LLM error (attempt %d/%d), retrying in %ds: %s",
                        attempt + 1,
                        max_retries + 1,
                        wait,
                        e,
                    )
                    time.sleep(wait)
                    continue
                yield {"type": "text", "content": _friendly_error(e, self.provider)}
                # BUG FIX: without this return, the retry loop would proceed
                # to the next attempt after the error was already reported,
                # re-issuing the request and duplicating output.
                return

    # ── Helpers ──

    def _resolve_endpoint(self, provider: str) -> tuple[str, str]:
        """Return (base_url, api_key) for an OpenAI-compatible provider."""
        if provider == "openrouter":
            return "https://openrouter.ai/api/v1", self.openrouter_key or "sk-placeholder"
        if provider == "ollama":
            # Static placeholder credential for the keyless local endpoint.
            return f"{self.ollama_url}/v1", "ollama"
        if provider == "lmstudio":
            return f"{self.lmstudio_url}/v1", "lm-studio"
        # Unknown provider names fall back to OpenRouter.
        return "https://openrouter.ai/api/v1", self.openrouter_key or "sk-placeholder"

    def _resolve_model_id(self, provider: str) -> str:
        """Strip the "local/<provider>/" prefix for local endpoints."""
        model = self.current_model
        if provider == "ollama" and model.startswith("local/ollama/"):
            return model.removeprefix("local/ollama/")
        if provider == "lmstudio" and model.startswith("local/lmstudio/"):
            return model.removeprefix("local/lmstudio/")
        return model

    def _get_openai(self):
        """Import and cache the `openai` SDK module on first use."""
        if self._openai_mod is None:
            import openai

            self._openai_mod = openai
        return self._openai_mod

    # ── Model Discovery ──

    def discover_local_models(self) -> list[ModelInfo]:
        """Probe local Ollama / LM Studio endpoints for installed models.

        Unreachable endpoints are silently skipped (best-effort discovery).
        """
        models = []
        # Ollama
        try:
            r = httpx.get(f"{self.ollama_url}/api/tags", timeout=3)
            if r.status_code == 200:
                for m in r.json().get("models", []):
                    models.append(
                        ModelInfo(
                            id=f"local/ollama/{m['name']}",
                            name=f"[Ollama] {m['name']}",
                            provider="ollama",
                        )
                    )
        except Exception:
            pass
        # LM Studio
        try:
            r = httpx.get(f"{self.lmstudio_url}/v1/models", timeout=3)
            if r.status_code == 200:
                for m in r.json().get("data", []):
                    models.append(
                        ModelInfo(
                            id=f"local/lmstudio/{m['id']}",
                            name=f"[LM Studio] {m['id']}",
                            provider="lmstudio",
                        )
                    )
        except Exception:
            pass
        return models

    def list_chat_models(self) -> list[ModelInfo]:
        """Return models available for the chat brain (no direct Claude SDK entries)."""
        models = []

        if self.openrouter_key:
            models.extend(
                [
                    # Anthropic (via OpenRouter — system prompts work correctly)
                    ModelInfo("anthropic/claude-sonnet-4.5", "Claude Sonnet 4.5", "openrouter"),
                    ModelInfo("anthropic/claude-opus-4.6", "Claude Opus 4.6", "openrouter"),
                    # Google
                    ModelInfo(
                        "google/gemini-3-flash-preview", "Gemini 3 Flash Preview", "openrouter"
                    ),
                    ModelInfo("google/gemini-2.5-flash", "Gemini 2.5 Flash", "openrouter"),
                    ModelInfo(
                        "google/gemini-2.5-flash-lite", "Gemini 2.5 Flash Lite", "openrouter"
                    ),
                    # OpenAI
                    ModelInfo("openai/gpt-5-nano", "GPT-5 Nano", "openrouter"),
                    ModelInfo("openai/gpt-4o-mini", "GPT-4o Mini", "openrouter"),
                    # DeepSeek / xAI / Others
                    ModelInfo("deepseek/deepseek-v3.2", "DeepSeek V3.2", "openrouter"),
                    ModelInfo("x-ai/grok-4.1-fast", "Grok 4.1 Fast", "openrouter"),
                    ModelInfo("moonshotai/kimi-k2.5", "Kimi K2.5", "openrouter"),
                    ModelInfo("minimax/minimax-m2.5", "MiniMax M2.5", "openrouter"),
                ]
            )

        models.extend(self.discover_local_models())
        return models

    def list_available_models(self) -> list[ModelInfo]:
        """Backwards-compatible alias for list_chat_models()."""
        return self.list_chat_models()
|
|
|
|
|
|
def _is_retryable_error(e: Exception) -> bool:
|
|
"""Return True for transient errors worth retrying (5xx, timeout, rate limit)."""
|
|
name = type(e).__name__
|
|
# openai library exceptions
|
|
if name in ("APITimeoutError", "InternalServerError", "RateLimitError", "APIConnectionError"):
|
|
return True
|
|
# Status-code based (works with openai.APIStatusError subclasses)
|
|
status = getattr(e, "status_code", None)
|
|
if status and status >= 500:
|
|
return True
|
|
return status == 429
|
|
|
|
|
|
def _friendly_error(e: Exception, provider: str) -> str:
|
|
"""Map common LLM exceptions to plain-English messages."""
|
|
name = type(e).__name__
|
|
if name == "AuthenticationError" or "401" in str(e):
|
|
return f"Authentication failed for {provider}. Please check your API key."
|
|
if name == "RateLimitError" or "429" in str(e):
|
|
return f"Rate limited by {provider}. Please wait a moment and try again."
|
|
if name in ("APITimeoutError", "APIConnectionError") or "timeout" in str(e).lower():
|
|
return (
|
|
f"Could not reach {provider} — the service may be down "
|
|
"or your connection is interrupted."
|
|
)
|
|
if name == "InternalServerError" or (getattr(e, "status_code", None) or 0) >= 500:
|
|
return f"{provider} returned a server error. Please try again shortly."
|
|
# Generic fallback — still friendlier than a raw traceback
|
|
log.error("LLM error (%s): %s", provider, e, exc_info=True)
|
|
return f"Something went wrong talking to {provider}. Check the logs for details."
|