Initial implementation of CheddahBot personal AI assistant

Multi-model AI assistant with Gradio UI, persistent memory, 15+ tools,
and meta-tools for runtime tool creation. Routes Claude models through
Claude Code SDK (Max subscription), cloud models through OpenRouter,
and local models through Ollama/LM Studio.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-13 20:20:39 -06:00
commit 1b73cf2e5d
29 changed files with 2429 additions and 0 deletions

10
.gitignore vendored 100644
View File

@ -0,0 +1,10 @@
.env
__pycache__/
*.pyc
data/
memory/embeddings.db
memory/*.md
*.egg-info/
dist/
build/
.venv/

View File

@ -0,0 +1,3 @@
"""CheddahBot - Personal AI assistant."""
__version__ = "0.1.0"

View File

@ -0,0 +1,77 @@
"""Entry point: python -m cheddahbot"""
import logging
import sys
from .config import load_config
from .db import Database
from .llm import LLMAdapter
from .agent import Agent
from .ui import create_ui
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("cheddahbot")


def main():
    """Wire together config, storage, LLM adapter, optional subsystems, and the UI.

    Memory, tools, and the scheduler are best-effort: a failure in any of
    them is logged and the app still launches without that subsystem.
    """
    log.info("Loading configuration...")
    cfg = load_config()
    log.info("Initializing database...")
    database = Database(cfg.db_path)
    log.info("Initializing LLM adapter (default model: %s)...", cfg.default_model)
    adapter = LLMAdapter(
        default_model=cfg.default_model,
        openrouter_key=cfg.openrouter_api_key,
        ollama_url=cfg.ollama_url,
        lmstudio_url=cfg.lmstudio_url,
    )
    log.info("Creating agent...")
    agent = Agent(cfg, database, adapter)
    # Phase 2+: memory system (optional).
    try:
        from .memory import MemorySystem
        log.info("Initializing memory system...")
        memory = MemorySystem(cfg, database)
        agent.set_memory(memory)
    except Exception as e:
        log.warning("Memory system not available: %s", e)
    # Phase 3+: tool system (optional).
    try:
        from .tools import ToolRegistry
        log.info("Initializing tool system...")
        registry = ToolRegistry(cfg, database, agent)
        agent.set_tools(registry)
    except Exception as e:
        log.warning("Tool system not available: %s", e)
    # Phase 3+: background scheduler (optional).
    try:
        from .scheduler import Scheduler
        log.info("Starting scheduler...")
        scheduler = Scheduler(cfg, database, agent)
        scheduler.start()
    except Exception as e:
        log.warning("Scheduler not available: %s", e)
    log.info("Launching Gradio UI on %s:%s...", cfg.host, cfg.port)
    app, css = create_ui(agent, cfg, adapter)
    app.launch(
        server_name=cfg.host,
        server_port=cfg.port,
        pwa=True,
        show_error=True,
        css=css,
    )


if __name__ == "__main__":
    main()

134
cheddahbot/agent.py 100644
View File

@ -0,0 +1,134 @@
"""Core agent loop - the brain of CheddahBot."""
from __future__ import annotations
import json
import logging
import uuid
from typing import Generator
from .config import Config
from .db import Database
from .llm import LLMAdapter
from .router import build_system_prompt, format_messages_for_llm
log = logging.getLogger(__name__)
MAX_TOOL_ITERATIONS = 10
class Agent:
    """Core agent loop - the brain of CheddahBot.

    Flow per user turn: persist the user message -> build a system prompt
    (identity + memory + tool descriptions) -> stream LLM output -> execute
    any requested tools and feed results back -> repeat, for at most
    MAX_TOOL_ITERATIONS rounds.

    The memory and tool subsystems are optional and injected after
    construction via :meth:`set_memory` / :meth:`set_tools`.
    """

    def __init__(self, config: Config, db: Database, llm: LLMAdapter):
        self.config = config
        self.db = db
        self.llm = llm
        self.conv_id: str | None = None  # active conversation id; created lazily
        self._memory = None  # set by app after memory system init
        self._tools = None  # set by app after tool system init

    def set_memory(self, memory):
        """Attach the optional memory subsystem (late-bound by __main__)."""
        self._memory = memory

    def set_tools(self, tools):
        """Attach the optional tool registry (late-bound by __main__)."""
        self._tools = tools

    def ensure_conversation(self) -> str:
        """Return the active conversation id, creating and persisting one on first use."""
        if not self.conv_id:
            self.conv_id = uuid.uuid4().hex[:12]
            self.db.create_conversation(self.conv_id)
        return self.conv_id

    def new_conversation(self) -> str:
        """Start a fresh persisted conversation and make it the active one."""
        self.conv_id = uuid.uuid4().hex[:12]
        self.db.create_conversation(self.conv_id)
        return self.conv_id

    def respond(self, user_input: str, files: list | None = None) -> Generator[str, None, None]:
        """Process user input and yield streaming response text.

        Yields text chunks as they arrive, plus markdown notices around tool
        executions. Tool output is truncated to 2000 chars in the stream but
        persisted in full. ``files`` is accepted for UI interface parity but
        is not used here — presumably handled upstream; TODO confirm.
        """
        conv_id = self.ensure_conversation()
        # Store user message
        self.db.add_message(conv_id, "user", user_input)
        # Build system prompt from identity files + memory + tool descriptions.
        memory_context = ""
        if self._memory:
            memory_context = self._memory.get_context(user_input)
        tools_schema = []
        tools_description = ""
        if self._tools:
            tools_schema = self._tools.get_tools_schema()
            tools_description = self._tools.get_tools_description()
        system_prompt = build_system_prompt(
            identity_dir=self.config.identity_dir,
            memory_context=memory_context,
            tools_description=tools_description,
        )
        # Load conversation history
        history = self.db.get_messages(conv_id, limit=self.config.memory.max_context_messages)
        messages = format_messages_for_llm(system_prompt, history, self.config.memory.max_context_messages)
        # Agent loop: LLM call → tool execution → repeat
        for iteration in range(MAX_TOOL_ITERATIONS):
            full_response = ""
            tool_calls = []
            for chunk in self.llm.chat(messages, tools=tools_schema or None, stream=True):
                if chunk["type"] == "text":
                    full_response += chunk["content"]
                    yield chunk["content"]
                elif chunk["type"] == "tool_use":
                    tool_calls.append(chunk)
            # If no tool calls, we're done
            if not tool_calls:
                if full_response:
                    self.db.add_message(conv_id, "assistant", full_response, model=self.llm.current_model)
                break
            # Store assistant message with tool calls
            self.db.add_message(
                conv_id, "assistant", full_response,
                tool_calls=[{"name": tc["name"], "input": tc["input"]} for tc in tool_calls],
                model=self.llm.current_model,
            )
            # Execute tools
            if self._tools:
                messages.append({"role": "assistant", "content": full_response or "I'll use some tools to help with that."})
                for tc in tool_calls:
                    yield f"\n\n🔧 **Using tool: {tc['name']}**\n"
                    try:
                        result = self._tools.execute(tc["name"], tc.get("input", {}))
                    except Exception as e:
                        result = f"Tool error: {e}"
                    # Truncated preview in the stream; full result persisted below.
                    yield f"```\n{result[:2000]}\n```\n\n"
                    self.db.add_message(conv_id, "tool", result, tool_result=tc["name"])
                    # Tool results go back as user-role text because not every
                    # provider supports a native tool-result role.
                    messages.append({"role": "user", "content": f'[Tool "{tc["name"]}" result]\n{result}'})
            else:
                # No tool system configured - just mention tool was requested.
                # Bug fix: the assistant message was already stored above (with
                # its tool_calls metadata); the original stored it a second
                # time here, duplicating every such message in the DB.
                for tc in tool_calls:
                    yield f"\n(Tool requested: {tc['name']} - tool system not yet initialized)\n"
                break
        else:
            # for/else: only reached when the loop exhausted MAX_TOOL_ITERATIONS
            # without breaking (the model kept requesting tools every round).
            yield "\n(Reached maximum tool iterations)"
        # Check if memory flush is needed
        if self._memory:
            msg_count = self.db.count_messages(conv_id)
            if msg_count > self.config.memory.flush_threshold:
                self._memory.auto_flush(conv_id)

    def respond_to_prompt(self, prompt: str) -> str:
        """Non-streaming response for scheduled tasks / internal use."""
        result_parts = []
        for chunk in self.respond(prompt):
            result_parts.append(chunk)
        return "".join(result_parts)

View File

@ -0,0 +1,98 @@
"""Configuration loader: env vars → config.yaml → defaults."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
import yaml
from dotenv import load_dotenv
ROOT_DIR = Path(__file__).resolve().parent.parent
load_dotenv(ROOT_DIR / ".env")
@dataclass
class MemoryConfig:
    """Tunables for the 4-layer memory system (see memory.py)."""
    max_context_messages: int = 50  # max history messages loaded into the prompt
    flush_threshold: int = 40  # auto-flush a conversation once it exceeds this count
    embedding_model: str = "all-MiniLM-L6-v2"  # sentence-transformers model name
    search_top_k: int = 5  # semantic-search hits injected into the prompt
@dataclass
class SchedulerConfig:
    """Timing knobs for the background scheduler."""
    heartbeat_interval_minutes: int = 30  # presumably the periodic heartbeat cadence — confirm in scheduler.py
    poll_interval_seconds: int = 60  # presumably how often due tasks are polled — confirm in scheduler.py
@dataclass
class ShellConfig:
    """Safety settings for the shell-execution tool."""
    # Blocklist of dangerous commands (includes a fork bomb).
    # NOTE(review): blocklists are easily bypassed — treat as a speed bump,
    # not a security boundary.
    blocked_commands: list[str] = field(default_factory=lambda: ["rm -rf /", "format", ":(){:|:&};:"])
    require_approval: bool = False  # presumably gates commands behind user approval — confirm in tools
@dataclass
class Config:
    """Top-level application configuration.

    Populated by :func:`load_config` with precedence
    env vars > config.yaml > these defaults.
    """
    default_model: str = "claude-sonnet-4-20250514"
    host: str = "0.0.0.0"  # bind address for the Gradio server
    port: int = 7860
    ollama_url: str = "http://localhost:11434"
    lmstudio_url: str = "http://localhost:1234"
    openrouter_api_key: str = ""  # filled from OPENROUTER_API_KEY in load_config
    memory: MemoryConfig = field(default_factory=MemoryConfig)
    scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)
    shell: ShellConfig = field(default_factory=ShellConfig)
    # Derived paths (all anchored at the repo root next to this package)
    root_dir: Path = field(default_factory=lambda: ROOT_DIR)
    data_dir: Path = field(default_factory=lambda: ROOT_DIR / "data")
    identity_dir: Path = field(default_factory=lambda: ROOT_DIR / "identity")
    memory_dir: Path = field(default_factory=lambda: ROOT_DIR / "memory")
    skills_dir: Path = field(default_factory=lambda: ROOT_DIR / "skills")
    db_path: Path = field(default_factory=lambda: ROOT_DIR / "data" / "cheddahbot.db")
def load_config() -> Config:
    """Load config with precedence: env vars → config.yaml → dataclass defaults.

    Also ensures the data/memory/skills directories exist so later code can
    write to them unconditionally.

    Returns:
        A fully populated :class:`Config`.
    """
    cfg = Config()
    # 1. Overlay config.yaml (if present) onto the defaults.
    yaml_path = ROOT_DIR / "config.yaml"
    if yaml_path.exists():
        with open(yaml_path) as f:
            data = yaml.safe_load(f) or {}
        for key in ("default_model", "host", "port", "ollama_url", "lmstudio_url"):
            if key in data:
                setattr(cfg, key, data[key])
        # The three sub-config sections all have the same shape, so apply
        # them uniformly (previously three copy-pasted loops). Keys that the
        # sub-config does not declare are silently ignored.
        for section in ("memory", "scheduler", "shell"):
            values = data.get(section)
            if isinstance(values, dict):
                sub_cfg = getattr(cfg, section)
                for k, v in values.items():
                    if hasattr(sub_cfg, k):
                        setattr(sub_cfg, k, v)
    # 2. Env var overrides (CHEDDAH_ prefix) beat YAML.
    cfg.openrouter_api_key = os.getenv("OPENROUTER_API_KEY", "")
    if m := os.getenv("CHEDDAH_DEFAULT_MODEL"):
        cfg.default_model = m
    if h := os.getenv("CHEDDAH_HOST"):
        cfg.host = h
    if p := os.getenv("CHEDDAH_PORT"):
        cfg.port = int(p)
    # 3. Ensure writable directories exist.
    cfg.data_dir.mkdir(parents=True, exist_ok=True)
    (cfg.data_dir / "uploads").mkdir(exist_ok=True)
    (cfg.data_dir / "generated").mkdir(exist_ok=True)
    cfg.memory_dir.mkdir(parents=True, exist_ok=True)
    cfg.skills_dir.mkdir(parents=True, exist_ok=True)
    return cfg

187
cheddahbot/db.py 100644
View File

@ -0,0 +1,187 @@
"""SQLite persistence layer."""
from __future__ import annotations
import json
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
class Database:
def __init__(self, db_path: Path):
self._path = db_path
self._local = threading.local()
self._init_schema()
@property
def _conn(self) -> sqlite3.Connection:
if not hasattr(self._local, "conn"):
self._local.conn = sqlite3.connect(str(self._path))
self._local.conn.row_factory = sqlite3.Row
self._local.conn.execute("PRAGMA journal_mode=WAL")
self._local.conn.execute("PRAGMA foreign_keys=ON")
return self._local.conn
def _init_schema(self):
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
title TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conv_id TEXT NOT NULL REFERENCES conversations(id),
role TEXT NOT NULL,
content TEXT NOT NULL,
tool_calls TEXT,
tool_result TEXT,
model TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_messages_conv ON messages(conv_id, created_at);
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
prompt TEXT NOT NULL,
schedule TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
next_run TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS task_run_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER NOT NULL REFERENCES scheduled_tasks(id),
started_at TEXT NOT NULL,
finished_at TEXT,
result TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
""")
self._conn.commit()
# -- Conversations --
def create_conversation(self, conv_id: str, title: str = "New Chat") -> str:
now = _now()
self._conn.execute(
"INSERT INTO conversations (id, title, created_at, updated_at) VALUES (?, ?, ?, ?)",
(conv_id, title, now, now),
)
self._conn.commit()
return conv_id
def list_conversations(self, limit: int = 50) -> list[dict]:
rows = self._conn.execute(
"SELECT id, title, updated_at FROM conversations ORDER BY updated_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
# -- Messages --
def add_message(
self,
conv_id: str,
role: str,
content: str,
tool_calls: list | None = None,
tool_result: str | None = None,
model: str | None = None,
) -> int:
now = _now()
cur = self._conn.execute(
"""INSERT INTO messages (conv_id, role, content, tool_calls, tool_result, model, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
conv_id,
role,
content,
json.dumps(tool_calls) if tool_calls else None,
tool_result,
model,
now,
),
)
self._conn.execute(
"UPDATE conversations SET updated_at = ? WHERE id = ?", (now, conv_id)
)
self._conn.commit()
return cur.lastrowid
def get_messages(self, conv_id: str, limit: int = 100) -> list[dict]:
rows = self._conn.execute(
"""SELECT role, content, tool_calls, tool_result, model, created_at
FROM messages WHERE conv_id = ? ORDER BY created_at ASC LIMIT ?""",
(conv_id, limit),
).fetchall()
result = []
for r in rows:
msg = dict(r)
if msg["tool_calls"]:
msg["tool_calls"] = json.loads(msg["tool_calls"])
result.append(msg)
return result
def count_messages(self, conv_id: str) -> int:
row = self._conn.execute(
"SELECT COUNT(*) as cnt FROM messages WHERE conv_id = ?", (conv_id,)
).fetchone()
return row["cnt"]
# -- Scheduled Tasks --
def add_scheduled_task(self, name: str, prompt: str, schedule: str) -> int:
now = _now()
cur = self._conn.execute(
"INSERT INTO scheduled_tasks (name, prompt, schedule, created_at) VALUES (?, ?, ?, ?)",
(name, prompt, schedule, now),
)
self._conn.commit()
return cur.lastrowid
def get_due_tasks(self) -> list[dict]:
now = _now()
rows = self._conn.execute(
"SELECT * FROM scheduled_tasks WHERE enabled = 1 AND (next_run IS NULL OR next_run <= ?)",
(now,),
).fetchall()
return [dict(r) for r in rows]
def update_task_next_run(self, task_id: int, next_run: str):
self._conn.execute(
"UPDATE scheduled_tasks SET next_run = ? WHERE id = ?", (next_run, task_id)
)
self._conn.commit()
def log_task_run(self, task_id: int, result: str | None = None, error: str | None = None):
now = _now()
self._conn.execute(
"INSERT INTO task_run_logs (task_id, started_at, finished_at, result, error) VALUES (?, ?, ?, ?, ?)",
(task_id, now, now, result, error),
)
self._conn.commit()
# -- Key-Value Store --
def kv_set(self, key: str, value: str):
self._conn.execute(
"INSERT OR REPLACE INTO kv_store (key, value) VALUES (?, ?)", (key, value)
)
self._conn.commit()
def kv_get(self, key: str) -> str | None:
row = self._conn.execute("SELECT value FROM kv_store WHERE key = ?", (key,)).fetchone()
return row["value"] if row else None
def _now() -> str:
return datetime.now(timezone.utc).isoformat()

334
cheddahbot/llm.py 100644
View File

@ -0,0 +1,334 @@
"""Model-agnostic LLM adapter.
Routing:
- Claude models → Claude Code SDK (subprocess, uses Max subscription)
- Cloud models → OpenRouter (single API key, OpenAI-compatible)
- Local models → direct HTTP (Ollama / LM Studio, OpenAI-compatible)
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import sys
from dataclasses import dataclass
from typing import Generator
import httpx
log = logging.getLogger(__name__)
@dataclass
class ModelInfo:
id: str
name: str
provider: str # "claude" | "openrouter" | "ollama" | "lmstudio"
context_length: int | None = None
# Well-known Claude models that route through the SDK
CLAUDE_MODELS = {
"claude-sonnet-4-20250514",
"claude-opus-4-20250514",
"claude-haiku-4-20250514",
}
def _is_claude_model(model_id: str) -> bool:
return model_id in CLAUDE_MODELS or model_id.startswith("claude-")
def _provider_for(model_id: str, openrouter_key: str, ollama_url: str, lmstudio_url: str) -> str:
if _is_claude_model(model_id):
return "claude"
if model_id.startswith("local/ollama/"):
return "ollama"
if model_id.startswith("local/lmstudio/"):
return "lmstudio"
if openrouter_key:
return "openrouter"
return "openrouter"
class LLMAdapter:
    """Model-agnostic chat adapter.

    Routes by provider: Claude models via the ``claude`` CLI subprocess,
    everything else through an OpenAI-compatible HTTP endpoint
    (OpenRouter / Ollama / LM Studio). All chat output is yielded as dict
    chunks: ``{"type": "text", "content": ...}`` or
    ``{"type": "tool_use", "id": ..., "name": ..., "input": ...}``.
    """

    def __init__(
        self,
        default_model: str = "claude-sonnet-4-20250514",
        openrouter_key: str = "",
        ollama_url: str = "http://localhost:11434",
        lmstudio_url: str = "http://localhost:1234",
    ):
        self.current_model = default_model
        self.openrouter_key = openrouter_key
        # Normalize URLs so f"{url}/v1" concatenation below is safe.
        self.ollama_url = ollama_url.rstrip("/")
        self.lmstudio_url = lmstudio_url.rstrip("/")
        self._openai_mod = None  # lazy import (see _get_openai)

    @property
    def provider(self) -> str:
        # Re-derived on every access from the currently selected model id.
        return _provider_for(self.current_model, self.openrouter_key, self.ollama_url, self.lmstudio_url)

    def switch_model(self, model_id: str):
        """Select a different model; the provider follows from the id."""
        self.current_model = model_id
        log.info("Switched to model: %s (provider: %s)", model_id, self.provider)

    # ── Main entry point ──
    def chat(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        stream: bool = True,
    ) -> Generator[dict, None, None]:
        """Yield chunks: {"type": "text", "content": "..."} or {"type": "tool_use", ...}.

        NOTE(review): the Claude path runs the CLI to completion and yields
        one chunk — ``stream`` is effectively ignored there.
        """
        provider = self.provider
        if provider == "claude":
            yield from self._chat_claude_sdk(messages, tools, stream)
        else:
            base_url, api_key = self._resolve_endpoint(provider)
            model_id = self._resolve_model_id(provider)
            yield from self._chat_openai_sdk(messages, tools, stream, base_url, api_key, model_id)

    # ── Claude Code SDK (subprocess) ──
    def _chat_claude_sdk(
        self, messages: list[dict], tools: list[dict] | None, stream: bool
    ) -> Generator[dict, None, None]:
        """Run one turn through the ``claude`` CLI.

        System messages are passed via --system-prompt; the rest of the
        history is flattened into a single -p prompt string. ``tools`` is
        not forwarded — the CLI's own tools are explicitly disabled.
        """
        # Separate system prompt from user messages
        system_prompt = ""
        user_prompt_parts = []
        for m in messages:
            role = m.get("role", "user")
            content = m.get("content", "")
            if isinstance(content, list):
                # Multimodal content list: keep only the text parts.
                content = " ".join(c.get("text", "") for c in content if c.get("type") == "text")
            if role == "system":
                system_prompt += content + "\n"
            elif role == "assistant":
                user_prompt_parts.append(f"[Assistant]\n{content}")
            else:
                user_prompt_parts.append(content)
        user_prompt = "\n\n".join(user_prompt_parts)
        # Find claude CLI - on Windows needs .cmd extension for npm-installed binaries
        claude_bin = shutil.which("claude")
        if not claude_bin:
            yield {"type": "text", "content": "Error: `claude` CLI not found in PATH. Install Claude Code: npm install -g @anthropic-ai/claude-code"}
            return
        # `--tools ""` disables the CLI's built-in tools; CheddahBot supplies
        # its own. TODO confirm the flag syntax against the current CLI.
        cmd = [claude_bin, "-p", user_prompt, "--model", self.current_model,
               "--output-format", "json", "--tools", ""]
        if system_prompt.strip():
            cmd.extend(["--system-prompt", system_prompt.strip()])
        log.debug("Claude SDK using: %s", claude_bin)
        # Strip CLAUDECODE env var so the subprocess doesn't think it's nested
        env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}
        try:
            # NOTE(review): shell=True combined with a list argv is unusual;
            # apparently needed for npm .cmd shims on Windows — confirm.
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                shell=(sys.platform == "win32"),
                env=env,
            )
        except FileNotFoundError:
            yield {"type": "text", "content": "Error: `claude` CLI not found. Install Claude Code: npm install -g @anthropic-ai/claude-code"}
            return
        # NOTE(review): subprocess.TimeoutExpired from communicate() is not
        # caught here and would propagate to the caller — confirm intended.
        stdout, stderr = proc.communicate(timeout=120)
        if proc.returncode != 0:
            yield {"type": "text", "content": f"Claude SDK error: {stderr or 'unknown error'}"}
            return
        # --output-format json returns a single JSON object
        try:
            result = json.loads(stdout)
            text = result.get("result", "")
            if text:
                yield {"type": "text", "content": text}
            elif result.get("is_error"):
                yield {"type": "text", "content": f"Claude error: {result.get('result', 'unknown')}"}
            return
        except json.JSONDecodeError:
            # Fallback: treat as plain text
            if stdout.strip():
                yield {"type": "text", "content": stdout.strip()}

    # ── OpenAI-compatible SDK (OpenRouter / Ollama / LM Studio) ──
    def _chat_openai_sdk(
        self,
        messages: list[dict],
        tools: list[dict] | None,
        stream: bool,
        base_url: str,
        api_key: str,
        model_id: str,
    ) -> Generator[dict, None, None]:
        """One chat turn against any OpenAI-compatible endpoint.

        In streaming mode, tool-call deltas are accumulated per index and
        emitted as complete tool_use chunks after the text stream ends.
        Any exception is converted into a text chunk rather than raised.
        """
        openai = self._get_openai()
        client = openai.OpenAI(base_url=base_url, api_key=api_key)
        kwargs: dict = {
            "model": model_id,
            "messages": messages,
            "stream": stream,
        }
        if tools:
            kwargs["tools"] = tools
        try:
            if stream:
                response = client.chat.completions.create(**kwargs)
                # index -> partially assembled tool call (args arrive in pieces)
                tool_calls_accum: dict[int, dict] = {}
                for chunk in response:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if not delta:
                        continue
                    if delta.content:
                        yield {"type": "text", "content": delta.content}
                    if delta.tool_calls:
                        for tc in delta.tool_calls:
                            idx = tc.index
                            if idx not in tool_calls_accum:
                                tool_calls_accum[idx] = {
                                    "id": tc.id or "",
                                    "name": tc.function.name if tc.function and tc.function.name else "",
                                    "arguments": "",
                                }
                            if tc.function and tc.function.arguments:
                                tool_calls_accum[idx]["arguments"] += tc.function.arguments
                            if tc.id:
                                tool_calls_accum[idx]["id"] = tc.id
                # Emit completed tool calls in index order.
                for _, tc in sorted(tool_calls_accum.items()):
                    try:
                        args = json.loads(tc["arguments"])
                    except json.JSONDecodeError:
                        # Malformed/partial JSON args degrade to an empty input.
                        args = {}
                    yield {
                        "type": "tool_use",
                        "id": tc["id"],
                        "name": tc["name"],
                        "input": args,
                    }
            else:
                response = client.chat.completions.create(**kwargs)
                msg = response.choices[0].message
                if msg.content:
                    yield {"type": "text", "content": msg.content}
                if msg.tool_calls:
                    for tc in msg.tool_calls:
                        try:
                            args = json.loads(tc.function.arguments)
                        except json.JSONDecodeError:
                            args = {}
                        yield {
                            "type": "tool_use",
                            "id": tc.id,
                            "name": tc.function.name,
                            "input": args,
                        }
        except Exception as e:
            # Surface errors in-band so the UI shows them instead of crashing.
            yield {"type": "text", "content": f"LLM error ({self.provider}): {e}"}

    # ── Helpers ──
    def _resolve_endpoint(self, provider: str) -> tuple[str, str]:
        """Map a provider name to (base_url, api_key); OpenRouter is the fallback."""
        if provider == "openrouter":
            return "https://openrouter.ai/api/v1", self.openrouter_key or "sk-placeholder"
        elif provider == "ollama":
            # Local servers accept any non-empty key.
            return f"{self.ollama_url}/v1", "ollama"
        elif provider == "lmstudio":
            return f"{self.lmstudio_url}/v1", "lm-studio"
        return "https://openrouter.ai/api/v1", self.openrouter_key or "sk-placeholder"

    def _resolve_model_id(self, provider: str) -> str:
        """Strip the local/<provider>/ routing prefix for local servers."""
        model = self.current_model
        if provider == "ollama" and model.startswith("local/ollama/"):
            return model.removeprefix("local/ollama/")
        if provider == "lmstudio" and model.startswith("local/lmstudio/"):
            return model.removeprefix("local/lmstudio/")
        return model

    def _messages_to_prompt(self, messages: list[dict]) -> str:
        """Flatten messages into a single prompt string for Claude SDK -p flag.

        NOTE(review): unused within this class (the logic is duplicated
        inline in _chat_claude_sdk) — possibly kept for external callers.
        """
        parts = []
        for m in messages:
            role = m.get("role", "user")
            content = m.get("content", "")
            if isinstance(content, list):
                # multimodal - extract text parts
                content = " ".join(
                    c.get("text", "") for c in content if c.get("type") == "text"
                )
            if role == "system":
                parts.append(f"[System]\n{content}")
            elif role == "assistant":
                parts.append(f"[Assistant]\n{content}")
            else:
                parts.append(content)
        return "\n\n".join(parts)

    def _get_openai(self):
        """Import the openai package once, on first use."""
        if self._openai_mod is None:
            import openai
            self._openai_mod = openai
        return self._openai_mod

    # ── Model Discovery ──
    def discover_local_models(self) -> list[ModelInfo]:
        """Probe Ollama and LM Studio (3s timeout each); unreachable servers are skipped."""
        models = []
        # Ollama
        try:
            r = httpx.get(f"{self.ollama_url}/api/tags", timeout=3)
            if r.status_code == 200:
                for m in r.json().get("models", []):
                    models.append(ModelInfo(
                        id=f"local/ollama/{m['name']}",
                        name=f"[Ollama] {m['name']}",
                        provider="ollama",
                    ))
        except Exception:
            pass
        # LM Studio
        try:
            r = httpx.get(f"{self.lmstudio_url}/v1/models", timeout=3)
            if r.status_code == 200:
                for m in r.json().get("data", []):
                    models.append(ModelInfo(
                        id=f"local/lmstudio/{m['id']}",
                        name=f"[LM Studio] {m['id']}",
                        provider="lmstudio",
                    ))
        except Exception:
            pass
        return models

    def list_available_models(self) -> list[ModelInfo]:
        """Return all available models across all providers.

        Claude models are always listed; the OpenRouter entries are a fixed
        curated set shown only when an API key is configured; local models
        come from live discovery.
        """
        models = [
            ModelInfo("claude-sonnet-4-20250514", "Claude Sonnet 4", "claude"),
            ModelInfo("claude-opus-4-20250514", "Claude Opus 4", "claude"),
            ModelInfo("claude-haiku-4-20250514", "Claude Haiku 4", "claude"),
        ]
        if self.openrouter_key:
            models.extend([
                ModelInfo("openai/gpt-4o", "GPT-4o", "openrouter"),
                ModelInfo("openai/gpt-4o-mini", "GPT-4o Mini", "openrouter"),
                ModelInfo("google/gemini-2.0-flash-001", "Gemini 2.0 Flash", "openrouter"),
                ModelInfo("google/gemini-2.5-pro-preview", "Gemini 2.5 Pro", "openrouter"),
                ModelInfo("mistralai/mistral-large", "Mistral Large", "openrouter"),
                ModelInfo("meta-llama/llama-3.3-70b-instruct", "Llama 3.3 70B", "openrouter"),
            ])
        models.extend(self.discover_local_models())
        return models

115
cheddahbot/media.py 100644
View File

@ -0,0 +1,115 @@
"""Audio/video processing: STT, TTS, video frame extraction."""
from __future__ import annotations
import asyncio
import logging
import subprocess
import tempfile
from pathlib import Path
log = logging.getLogger(__name__)
# ── Speech-to-Text ──
def transcribe_audio(audio_path: str | Path) -> str:
    """Transcribe an audio file to text.

    Tries local whisper first (no API key needed), then falls back to the
    hosted OpenAI Whisper API. Returns "" for a missing file and a
    placeholder string when every backend fails.
    (The original docstring stated the opposite order — corrected.)
    """
    audio_path = Path(audio_path)
    if not audio_path.exists():
        return ""
    # Try local whisper first (no API key needed)
    try:
        return _transcribe_local(audio_path)
    except ImportError:
        # whisper package not installed — fall through to the API silently.
        pass
    except Exception as e:
        log.warning("Local whisper failed: %s", e)
    # Fallback: try OpenAI API
    try:
        return _transcribe_openai_api(audio_path)
    except Exception as e:
        log.warning("OpenAI whisper API failed: %s", e)
    return f"(Could not transcribe audio from {audio_path.name})"
def _transcribe_local(audio_path: Path) -> str:
    """Transcribe with the locally installed whisper package.

    Raises ImportError when whisper is not installed (the caller treats
    that as "try the next backend").
    """
    import whisper
    transcription = whisper.load_model("base").transcribe(str(audio_path))
    return transcription.get("text", "").strip()
def _transcribe_openai_api(audio_path: Path) -> str:
    """Upload the file to OpenAI's hosted whisper-1 model.

    Raises ValueError when neither OPENAI_API_KEY nor OPENROUTER_API_KEY
    is set.
    """
    import os
    import openai
    api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        raise ValueError("No API key for Whisper")
    client = openai.OpenAI(api_key=api_key)
    with open(audio_path, "rb") as audio_file:
        response = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    return response.text.strip()
# ── Text-to-Speech ──
def text_to_speech(text: str, output_path: str | Path | None = None, voice: str = "en-US-AriaNeural") -> Path:
    """Convert *text* to speech using edge-tts (free, no API key).

    When *output_path* is None, a temporary .mp3 path is allocated. If
    edge-tts is not installed, the file is written with a plain-text
    placeholder instead of audio, and the path is still returned.
    """
    if output_path is None:
        import os
        # Fixed: tempfile.mktemp() is deprecated and racy (the returned name
        # can be claimed by another process before use); mkstemp() creates
        # the file atomically. We close the fd and let edge-tts write to it.
        fd, tmp_name = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        output_path = Path(tmp_name)
    else:
        output_path = Path(output_path)
    try:
        import edge_tts
        async def _generate():
            communicate = edge_tts.Communicate(text, voice)
            await communicate.save(str(output_path))
        asyncio.run(_generate())
        return output_path
    except ImportError:
        log.warning("edge-tts not installed. Run: pip install edge-tts")
        # Write a placeholder so the caller still gets a readable file.
        output_path.write_text("TTS not available", encoding="utf-8")
        return output_path
# ── Video Frame Extraction ──
def extract_video_frames(video_path: str | Path, max_frames: int = 5) -> list[Path]:
    """Extract up to *max_frames* evenly spaced frames from a video using ffmpeg.

    Frames are written as JPEGs into a fresh temporary directory (the caller
    is responsible for cleanup). Returns an empty list when the file is
    missing, ffmpeg/ffprobe are not installed, or extraction fails.
    """
    video_path = Path(video_path)
    if not video_path.exists():
        return []
    output_dir = Path(tempfile.mkdtemp(prefix="cheddah_frames_"))
    try:
        # Get video duration
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", str(video_path)],
            capture_output=True, text=True, timeout=10,
        )
        # Fall back to 10s when ffprobe prints nothing (e.g. odd containers).
        duration = float(result.stdout.strip()) if result.stdout.strip() else 10.0
        # Seconds between sampled frames; floor of 1s avoids fps > 1.
        interval = max(duration / (max_frames + 1), 1.0)
        # Extract frames
        subprocess.run(
            ["ffmpeg", "-i", str(video_path), "-vf", f"fps=1/{interval}",
            "-frames:v", str(max_frames), str(output_dir / "frame_%03d.jpg")],
            capture_output=True, timeout=30,
        )
        frames = sorted(output_dir.glob("frame_*.jpg"))
        return frames
    except FileNotFoundError:
        log.warning("ffmpeg/ffprobe not found. Video analysis requires ffmpeg.")
        return []
    except Exception as e:
        log.warning("Video frame extraction failed: %s", e)
        return []

View File

@ -0,0 +1,231 @@
"""4-layer memory system.
Layers:
1. Identity - SOUL.md + USER.md (handled by router.py)
2. Long-term - memory/MEMORY.md (learned facts, decisions)
3. Daily logs - memory/YYYY-MM-DD.md (timestamped entries)
4. Semantic - memory/embeddings.db (vector search over all memory)
"""
from __future__ import annotations
import logging
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
from .config import Config
from .db import Database
log = logging.getLogger(__name__)
class MemorySystem:
    """Long-term + daily-log + semantic memory (layers 2-4 of the 4-layer design).

    Markdown files under ``memory_dir`` are the source of truth; a SQLite
    table of embedding vectors provides semantic recall. When
    sentence-transformers is unavailable, search degrades to keyword
    matching over the markdown files.
    """

    def __init__(self, config: Config, db: Database):
        self.config = config
        self.db = db
        self.memory_dir = config.memory_dir
        self._embedder = None  # sentence-transformers model, loaded lazily
        self._embed_lock = threading.Lock()  # guards the one-time embedder load
        self._embed_db_path = self.memory_dir / "embeddings.db"
        self._init_embed_db()

    # ── Public API ──
    def get_context(self, query: str) -> str:
        """Build memory context string for the system prompt.

        Combines the (truncated) long-term memory, today's log, and the top
        semantic matches for *query*. Returns "" when nothing is available.
        """
        parts = []
        # Long-term memory
        lt = self._read_long_term()
        if lt:
            parts.append(f"## Long-Term Memory\n{lt}")
        # Today's log
        today_log = self._read_daily_log()
        if today_log:
            parts.append(f"## Today's Log\n{today_log}")
        # Semantic search results
        if query:
            results = self.search(query, top_k=self.config.memory.search_top_k)
            if results:
                formatted = "\n".join(f"- {r['text']}" for r in results)
                parts.append(f"## Related Memories\n{formatted}")
        return "\n\n".join(parts) if parts else ""

    def remember(self, text: str):
        """Save a fact/instruction to long-term memory (MEMORY.md) and index it."""
        memory_path = self.memory_dir / "MEMORY.md"
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
        entry = f"\n- [{timestamp}] {text}\n"
        if memory_path.exists():
            content = memory_path.read_text(encoding="utf-8")
        else:
            content = "# Long-Term Memory\n"
        content += entry
        memory_path.write_text(content, encoding="utf-8")
        self._index_text(text, f"memory:long_term:{timestamp}")
        log.info("Saved to long-term memory: %s", text[:80])

    def log_daily(self, text: str):
        """Append a timestamped entry to today's (UTC) daily log and index it."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        log_path = self.memory_dir / f"{today}.md"
        timestamp = datetime.now(timezone.utc).strftime("%H:%M")
        if log_path.exists():
            content = log_path.read_text(encoding="utf-8")
        else:
            content = f"# Daily Log - {today}\n"
        content += f"\n- [{timestamp}] {text}\n"
        log_path.write_text(content, encoding="utf-8")
        self._index_text(text, f"daily:{today}:{timestamp}")

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Semantic search over all indexed memory.

        Returns dicts with "id", "text", "score"; falls back to keyword
        matching when the embedding model cannot be loaded.
        """
        embedder = self._get_embedder()
        if embedder is None:
            return self._fallback_search(query, top_k)
        query_vec = embedder.encode([query])[0]
        return self._vector_search(query_vec, top_k)

    def auto_flush(self, conv_id: str):
        """Summarize old messages and move to daily log.

        NOTE(review): the "summary" is currently a truncated transcript —
        no LLM summarization happens here yet.
        """
        messages = self.db.get_messages(conv_id, limit=200)
        if len(messages) < self.config.memory.flush_threshold:
            return
        # Take older messages for summarization
        to_summarize = messages[:-10]  # keep last 10 in context
        text_block = "\n".join(
            f"{m['role']}: {m['content'][:200]}" for m in to_summarize
            if m.get("content")
        )
        summary = f"Conversation summary ({len(to_summarize)} messages): {text_block[:1000]}"
        self.log_daily(summary)
        log.info("Auto-flushed %d messages to daily log", len(to_summarize))

    def reindex_all(self):
        """Rebuild the embedding index from every markdown file in memory_dir."""
        self._clear_embeddings()
        for path in self.memory_dir.glob("*.md"):
            content = path.read_text(encoding="utf-8")
            for i, line in enumerate(content.split("\n")):
                line = line.strip().lstrip("- ")
                if len(line) > 10:  # skip headers, blanks, and noise lines
                    self._index_text(line, f"file:{path.name}:L{i}")
        log.info("Reindexed all memory files")

    # ── Private: Long-term memory ──
    def _read_long_term(self) -> str:
        """Tail of MEMORY.md (last 2000 chars), or "" when absent."""
        path = self.memory_dir / "MEMORY.md"
        if path.exists():
            content = path.read_text(encoding="utf-8")
            # Return last 2000 chars to keep prompt manageable
            return content[-2000:] if len(content) > 2000 else content
        return ""

    def _read_daily_log(self) -> str:
        """Tail of today's log (last 1500 chars), or "" when absent."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        path = self.memory_dir / f"{today}.md"
        if path.exists():
            content = path.read_text(encoding="utf-8")
            return content[-1500:] if len(content) > 1500 else content
        return ""

    # ── Private: Embedding system ──
    def _init_embed_db(self):
        """Create the embeddings table (idempotent); uses a short-lived connection."""
        conn = sqlite3.connect(str(self._embed_db_path))
        conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                id TEXT PRIMARY KEY,
                text TEXT NOT NULL,
                vector BLOB NOT NULL
            )
        """)
        conn.commit()
        conn.close()

    def _get_embedder(self):
        """Lazily load the sentence-transformers model (double-checked locking).

        Returns None — and thereby disables semantic search — when the
        package or model is unavailable.
        """
        if self._embedder is not None:
            return self._embedder
        with self._embed_lock:
            if self._embedder is not None:
                return self._embedder
            try:
                from sentence_transformers import SentenceTransformer
                model_name = self.config.memory.embedding_model
                log.info("Loading embedding model: %s", model_name)
                self._embedder = SentenceTransformer(model_name)
                return self._embedder
            except ImportError:
                log.warning("sentence-transformers not installed; semantic search disabled")
                return None
            except Exception as e:
                log.warning("Failed to load embedding model: %s", e)
                return None

    def _index_text(self, text: str, doc_id: str):
        """Embed *text* and upsert it under *doc_id*; no-op without an embedder."""
        embedder = self._get_embedder()
        if embedder is None:
            return
        vec = embedder.encode([text])[0]
        conn = sqlite3.connect(str(self._embed_db_path))
        conn.execute(
            "INSERT OR REPLACE INTO embeddings (id, text, vector) VALUES (?, ?, ?)",
            (doc_id, text, vec.tobytes()),
        )
        conn.commit()
        conn.close()

    def _vector_search(self, query_vec: np.ndarray, top_k: int) -> list[dict]:
        """Brute-force cosine-similarity search over every stored vector.

        NOTE(review): stored blobs are decoded as float32 — this assumes the
        embedder emits float32 vectors (true for sentence-transformers
        defaults); confirm if the embedding model is ever changed.
        """
        conn = sqlite3.connect(str(self._embed_db_path))
        rows = conn.execute("SELECT id, text, vector FROM embeddings").fetchall()
        conn.close()
        if not rows:
            return []
        scored = []
        for doc_id, text, vec_bytes in rows:
            vec = np.frombuffer(vec_bytes, dtype=np.float32)
            # Cosine similarity; the +1e-8 guards against zero-norm vectors.
            sim = float(np.dot(query_vec, vec) / (np.linalg.norm(query_vec) * np.linalg.norm(vec) + 1e-8))
            scored.append({"id": doc_id, "text": text, "score": sim})
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]

    def _clear_embeddings(self):
        """Delete every row from the embeddings table."""
        conn = sqlite3.connect(str(self._embed_db_path))
        conn.execute("DELETE FROM embeddings")
        conn.commit()
        conn.close()

    def _fallback_search(self, query: str, top_k: int) -> list[dict]:
        """Simple keyword search when embeddings are unavailable."""
        results = []
        query_lower = query.lower()
        for path in self.memory_dir.glob("*.md"):
            try:
                content = path.read_text(encoding="utf-8")
            except Exception:
                continue
            for line in content.split("\n"):
                stripped = line.strip().lstrip("- ")
                if len(stripped) > 10 and query_lower in stripped.lower():
                    results.append({"id": path.name, "text": stripped, "score": 1.0})
                    if len(results) >= top_k:
                        return results
        return results

View File

@ -0,0 +1 @@
# Reserved for future custom providers

View File

@ -0,0 +1,67 @@
"""Message formatting and system prompt construction."""
from __future__ import annotations
from pathlib import Path
def build_system_prompt(
    identity_dir: Path,
    memory_context: str = "",
    tools_description: str = "",
) -> str:
    """Build the system prompt from identity files + memory + tools.

    Sections are joined with a horizontal-rule separator; missing identity
    files and empty context strings are simply omitted.
    """
    sections: list[str] = []

    # Identity files are optional; include whichever exist, in fixed order.
    for filename in ("SOUL.md", "USER.md"):
        candidate = identity_dir / filename
        if candidate.exists():
            sections.append(candidate.read_text(encoding="utf-8").strip())

    # Context injected by the memory subsystem, if any.
    if memory_context:
        sections.append(f"# Relevant Memory\n{memory_context}")

    # Tool catalogue, if any tools are registered.
    if tools_description:
        sections.append(f"# Available Tools\n{tools_description}")

    # Core behavioral instructions always come last.
    sections.append(
        "# Instructions\n"
        "- Use tools when they would help answer the user's request.\n"
        "- If you learn something important about the user, save it to memory.\n"
        "- Be concise but thorough. Don't pad responses unnecessarily.\n"
        "- When uncertain, ask for clarification.\n"
        "- Reference memories naturally when relevant."
    )
    return "\n\n---\n\n".join(sections)
def format_messages_for_llm(
    system_prompt: str,
    history: list[dict],
    max_messages: int = 50,
) -> list[dict]:
    """Format conversation history into LLM message format.

    Prepends the system prompt, keeps only the most recent *max_messages*
    history entries, relays tool results as user turns, and drops entries
    with unrecognized roles.
    """
    formatted = [{"role": "system", "content": system_prompt}]
    # Negative slicing handles both short and long histories uniformly.
    for entry in history[-max_messages:]:
        role = entry.get("role", "user")
        content = entry.get("content", "")
        if role == "tool":
            # Tool output is relayed as a user turn with an explicit marker.
            formatted.append({"role": "user", "content": f"[Tool Result]\n{content}"})
        elif role in ("user", "assistant", "system"):
            formatted.append({"role": role, "content": content})
        # Any other role is dropped silently, matching prior behavior.
    return formatted

View File

@ -0,0 +1,118 @@
"""Task scheduler with heartbeat support."""
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from croniter import croniter
if TYPE_CHECKING:
from .agent import Agent
from .config import Config
from .db import Database
log = logging.getLogger(__name__)
HEARTBEAT_OK = "HEARTBEAT_OK"
class Scheduler:
    """Background scheduler: runs cron/one-shot tasks and periodic heartbeats.

    Two daemon threads are used: one polls the database for due tasks, the
    other feeds the HEARTBEAT.md checklist to the agent at a fixed interval.
    """

    def __init__(self, config: Config, db: Database, agent: Agent):
        self.config = config
        self.db = db
        self.agent = agent
        # Shared stop flag; also doubles as an interruptible sleep via wait().
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._heartbeat_thread: threading.Thread | None = None

    def start(self):
        """Start the scheduler and heartbeat threads."""
        self._thread = threading.Thread(target=self._poll_loop, daemon=True, name="scheduler")
        self._thread.start()
        self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True, name="heartbeat")
        self._heartbeat_thread.start()
        log.info("Scheduler started (poll=%ds, heartbeat=%dm)",
                 self.config.scheduler.poll_interval_seconds,
                 self.config.scheduler.heartbeat_interval_minutes)

    def stop(self):
        # Signals both loops to exit; threads are daemons and are not joined.
        # NOTE(review): callers needing a clean shutdown may want a join here.
        self._stop_event.set()

    # ── Scheduled Tasks ──
    def _poll_loop(self):
        # Poll for due tasks until stop() is called; errors are logged, not fatal.
        while not self._stop_event.is_set():
            try:
                self._run_due_tasks()
            except Exception as e:
                log.error("Scheduler poll error: %s", e)
            # Event.wait() sleeps for the interval but wakes immediately on stop().
            self._stop_event.wait(self.config.scheduler.poll_interval_seconds)

    def _run_due_tasks(self):
        """Run every due task, log its result, and compute its next run time."""
        tasks = self.db.get_due_tasks()
        for task in tasks:
            try:
                log.info("Running scheduled task: %s", task["name"])
                result = self.agent.respond_to_prompt(task["prompt"])
                # Result is truncated to keep the task-run log bounded.
                self.db.log_task_run(task["id"], result=result[:2000])
                # Calculate next run
                schedule = task["schedule"]
                if schedule.startswith("once:"):
                    # One-time task, disable it
                    # NOTE(review): reaches into the Database's private _conn;
                    # a dedicated db method would keep the API boundary clean.
                    self.db._conn.execute(
                        "UPDATE scheduled_tasks SET enabled = 0 WHERE id = ?", (task["id"],)
                    )
                    self.db._conn.commit()
                else:
                    # Cron schedule - calculate next run
                    now = datetime.now(timezone.utc)
                    cron = croniter(schedule, now)
                    next_run = cron.get_next(datetime)
                    self.db.update_task_next_run(task["id"], next_run.isoformat())
            except Exception as e:
                # One failing task must not block the rest of the batch.
                log.error("Task '%s' failed: %s", task["name"], e)
                self.db.log_task_run(task["id"], error=str(e))

    # ── Heartbeat ──
    def _heartbeat_loop(self):
        interval = self.config.scheduler.heartbeat_interval_minutes * 60
        # Wait a bit before first heartbeat
        self._stop_event.wait(60)
        while not self._stop_event.is_set():
            try:
                self._run_heartbeat()
            except Exception as e:
                log.error("Heartbeat error: %s", e)
            self._stop_event.wait(interval)

    def _run_heartbeat(self):
        """Run the HEARTBEAT.md checklist through the agent, logging any action."""
        heartbeat_path = self.config.identity_dir / "HEARTBEAT.md"
        if not heartbeat_path.exists():
            return
        checklist = heartbeat_path.read_text(encoding="utf-8")
        prompt = (
            f"HEARTBEAT CHECK. Review this checklist and take action if needed.\n"
            f"If nothing needs attention, respond with exactly: {HEARTBEAT_OK}\n\n"
            f"{checklist}"
        )
        result = self.agent.respond_to_prompt(prompt)
        # The sentinel appearing anywhere in the reply counts as "nothing to do".
        if HEARTBEAT_OK in result:
            log.debug("Heartbeat: all clear")
        else:
            log.info("Heartbeat action taken: %s", result[:200])
            # Log to daily log
            # NOTE(review): uses the agent's private _memory attribute — assumes
            # set_memory() has run; confirm against Agent's public API.
            if self.agent._memory:
                self.agent._memory.log_daily(f"[Heartbeat] {result[:500]}")

View File

@ -0,0 +1,63 @@
"""Skill registry with @skill decorator and loader."""
from __future__ import annotations
import importlib.util
import logging
from pathlib import Path
from typing import Callable
log = logging.getLogger(__name__)
# Module-level registry: skill name -> SkillDef, populated by the @skill decorator.
_SKILLS: dict[str, "SkillDef"] = {}

class SkillDef:
    """Container for a registered skill: name, description, and its callable."""

    def __init__(self, name: str, description: str, func: Callable):
        self.name = name                # unique skill identifier
        self.description = description  # human-readable summary
        self.func = func                # the callable implementing the skill
def skill(name: str, description: str):
    """Decorator to register a skill."""
    def register(func: Callable) -> Callable:
        # Record the skill under its name; the wrapped function is unchanged.
        _SKILLS[name] = SkillDef(name, description, func)
        return func
    return register
def load_skill(path: Path):
    """Dynamically load a skill module from a .py file.

    Executing the module runs its @skill decorators, which register the
    skills. Raises ImportError when a module spec cannot be created — the
    original silently returned, hiding broken skill files from callers
    (discover_skills and build_skill already catch exceptions).
    """
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot create module spec for {path}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    log.info("Loaded skill from %s", path)
def discover_skills(skills_dir: Path):
    """Load all .py files from the skills directory."""
    if not skills_dir.exists():
        return
    for candidate in skills_dir.glob("*.py"):
        # Files prefixed with "_" are treated as private and skipped.
        if candidate.name.startswith("_"):
            continue
        try:
            load_skill(candidate)
        except Exception as e:
            # One broken skill file must not prevent the others from loading.
            log.warning("Failed to load skill %s: %s", candidate.name, e)
def list_skills() -> list[SkillDef]:
    """Return all registered skills as a list (dict insertion order)."""
    return list(_SKILLS.values())
def run_skill(name: str, **kwargs) -> str:
    """Execute a registered skill by name; always returns a user-facing string."""
    entry = _SKILLS.get(name)
    if entry is None:
        return f"Unknown skill: {name}"
    try:
        outcome = entry.func(**kwargs)
    except Exception as e:
        # Errors are reported as text rather than propagated to the caller.
        return f"Skill error: {e}"
    return str(outcome) if outcome is not None else "Done."

View File

@ -0,0 +1,164 @@
"""Tool registry with @tool decorator and auto-discovery."""
from __future__ import annotations
import importlib
import inspect
import json
import logging
import pkgutil
from pathlib import Path
from typing import Any, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from ..agent import Agent
from ..config import Config
from ..db import Database
log = logging.getLogger(__name__)
# Global tool registry
# Maps tool name -> ToolDef; populated by the @tool decorator at import time.
_TOOLS: dict[str, ToolDef] = {}

class ToolDef:
    """Metadata for a registered tool."""

    def __init__(self, name: str, description: str, func: Callable, category: str = "general"):
        self.name = name
        self.description = description
        self.func = func
        self.category = category  # used to group tools in the prompt listing
        # JSON-schema-ish parameter block derived from the function signature.
        self.parameters = _extract_params(func)

    def to_openai_schema(self) -> dict:
        """Convert to OpenAI function-calling format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": self.parameters["properties"],
                    "required": self.parameters["required"],
                },
            },
        }
def tool(name: str, description: str, category: str = "general"):
    """Decorator to register a tool function."""
    def register(func: Callable) -> Callable:
        definition = ToolDef(name, description, func, category)
        _TOOLS[name] = definition
        # Attach the metadata so the function itself can be introspected later.
        func._tool_def = definition
        return func
    return register
def _extract_params(func: Callable) -> dict:
"""Extract parameter schema from function signature and type hints."""
sig = inspect.signature(func)
properties = {}
required = []
for param_name, param in sig.parameters.items():
if param_name in ("self", "ctx"):
continue
prop: dict[str, Any] = {}
annotation = param.annotation
if annotation == str or annotation == inspect.Parameter.empty:
prop["type"] = "string"
elif annotation == int:
prop["type"] = "integer"
elif annotation == float:
prop["type"] = "number"
elif annotation == bool:
prop["type"] = "boolean"
elif annotation == list:
prop["type"] = "array"
prop["items"] = {"type": "string"}
else:
prop["type"] = "string"
# Check for description in docstring (simple parsing)
prop["description"] = f"Parameter: {param_name}"
properties[param_name] = prop
if param.default is inspect.Parameter.empty:
required.append(param_name)
return {"properties": properties, "required": required}
class ToolRegistry:
    """Runtime tool registry with execution and schema generation.

    Tools live in the module-level _TOOLS dict; constructing the registry
    triggers auto-discovery of every module in this package.
    """

    def __init__(self, config: "Config", db: "Database", agent: "Agent"):
        self.config = config
        self.db = db
        self.agent = agent
        self._discover_tools()

    def _discover_tools(self):
        """Auto-import all modules in the tools/ package.

        Importing a module runs its @tool decorators, which populate _TOOLS.
        """
        tools_dir = Path(__file__).parent
        for _, module_name, _ in pkgutil.iter_modules([str(tools_dir)]):
            if module_name.startswith("_"):
                continue
            try:
                importlib.import_module(f".{module_name}", package=__package__)
                log.info("Loaded tool module: %s", module_name)
            except Exception as e:
                # One broken module must not disable the whole tool system.
                log.warning("Failed to load tool module %s: %s", module_name, e)

    def get_tools_schema(self) -> list[dict]:
        """Get all tools in OpenAI function-calling format."""
        return [t.to_openai_schema() for t in _TOOLS.values()]

    def get_tools_description(self) -> str:
        """Human-readable tool list for the system prompt, grouped by category."""
        lines = []
        by_cat: dict[str, list[ToolDef]] = {}
        for t in _TOOLS.values():
            by_cat.setdefault(t.category, []).append(t)
        for cat, tools in sorted(by_cat.items()):
            lines.append(f"\n### {cat.title()}")
            for t in tools:
                params = ", ".join(t.parameters["required"])
                lines.append(f"- **{t.name}**({params}): {t.description}")
        return "\n".join(lines)

    def execute(self, name: str, args: dict) -> str:
        """Execute a tool by name and return the result as a string.

        Never raises: unknown tools and tool exceptions are reported as text.
        """
        if name not in _TOOLS:
            return f"Unknown tool: {name}"
        tool_def = _TOOLS[name]
        try:
            # Work on a copy so injecting ctx does not mutate the caller's dict
            # (the original wrote "ctx" into the args object the caller passed in).
            call_args = dict(args)
            sig = inspect.signature(tool_def.func)
            if "ctx" in sig.parameters:
                call_args["ctx"] = {
                    "config": self.config,
                    "db": self.db,
                    "agent": self.agent,
                    "memory": self.agent._memory,
                }
            result = tool_def.func(**call_args)
            return str(result) if result is not None else "Done."
        except Exception as e:
            log.error("Tool %s failed: %s", name, e, exc_info=True)
            return f"Tool error: {e}"

    def register_external(self, tool_def: ToolDef):
        """Register a dynamically created tool."""
        _TOOLS[tool_def.name] = tool_def
        log.info("Registered external tool: %s", tool_def.name)

View File

@ -0,0 +1,49 @@
"""Meta-skill: create multi-step skills at runtime."""
from __future__ import annotations
import textwrap
from pathlib import Path
from . import tool
@tool("build_skill", "Create a new multi-step skill from a description", category="meta")
def build_skill(name: str, description: str, steps: str, ctx: dict = None) -> str:
"""Generate a new skill and save it to the skills directory.
Args:
name: Skill name (snake_case)
description: What the skill does
steps: Python code implementing the skill steps (must use @skill decorator)
"""
if not name.isidentifier():
return f"Invalid skill name: {name}. Must be a valid Python identifier."
if not ctx or not ctx.get("config"):
return "Config context not available."
skills_dir = ctx["config"].skills_dir
skills_dir.mkdir(parents=True, exist_ok=True)
module_code = textwrap.dedent(f'''\
"""Auto-generated skill: {description}"""
from __future__ import annotations
from cheddahbot.skills import skill
{steps}
''')
file_path = skills_dir / f"{name}.py"
if file_path.exists():
return f"Skill '{name}' already exists. Choose a different name."
file_path.write_text(module_code, encoding="utf-8")
# Try to load it
try:
from cheddahbot.skills import load_skill
load_skill(file_path)
return f"Skill '{name}' created at {file_path}"
except Exception as e:
return f"Skill created at {file_path} but failed to load: {e}"

View File

@ -0,0 +1,48 @@
"""Meta-tool: dynamically create new tools at runtime."""
from __future__ import annotations
import importlib
import textwrap
from pathlib import Path
from . import tool
@tool("build_tool", "Create a new tool from a description. The agent writes Python code with @tool decorator.", category="meta")
def build_tool(name: str, description: str, code: str, ctx: dict = None) -> str:
"""Generate a new tool module and hot-load it.
Args:
name: Tool name (snake_case)
description: What the tool does
code: Full Python code for the tool function (must use @tool decorator)
"""
if not name.isidentifier():
return f"Invalid tool name: {name}. Must be a valid Python identifier."
# Wrap code in a module with the import
module_code = textwrap.dedent(f'''\
"""Auto-generated tool: {description}"""
from __future__ import annotations
from . import tool
{code}
''')
# Write to tools directory
tools_dir = Path(__file__).parent
file_path = tools_dir / f"{name}.py"
if file_path.exists():
return f"Tool module '{name}' already exists. Choose a different name."
file_path.write_text(module_code, encoding="utf-8")
# Hot-import the new module
try:
importlib.import_module(f".{name}", package=__package__)
return f"Tool '{name}' created and loaded successfully at {file_path}"
except Exception as e:
# Clean up on failure
file_path.unlink(missing_ok=True)
return f"Failed to load tool '{name}': {e}"

View File

@ -0,0 +1,58 @@
"""Calendar/reminder tools: schedule tasks, set reminders."""
from __future__ import annotations
from datetime import datetime, timezone
from . import tool
@tool("remember_this", "Save an important fact or instruction to long-term memory", category="memory")
def remember_this(text: str, ctx: dict = None) -> str:
if ctx and ctx.get("memory"):
ctx["memory"].remember(text)
return f"Saved to memory: {text}"
return "Memory system not available"
@tool("search_memory", "Search through saved memories", category="memory")
def search_memory(query: str, ctx: dict = None) -> str:
if ctx and ctx.get("memory"):
results = ctx["memory"].search(query)
if results:
return "\n".join(f"- [{r.get('score', 0):.2f}] {r['text']}" for r in results)
return "No matching memories found."
return "Memory system not available"
@tool("log_note", "Add a timestamped note to today's daily log", category="memory")
def log_note(text: str, ctx: dict = None) -> str:
if ctx and ctx.get("memory"):
ctx["memory"].log_daily(text)
return f"Logged: {text}"
return "Memory system not available"
@tool("schedule_task", "Schedule a recurring or one-time task", category="scheduling")
def schedule_task(name: str, prompt: str, schedule: str, ctx: dict = None) -> str:
"""Schedule a task. Schedule format: cron expression or 'once:YYYY-MM-DDTHH:MM'."""
if ctx and ctx.get("db"):
task_id = ctx["db"].add_scheduled_task(name, prompt, schedule)
return f"Scheduled task '{name}' (id={task_id}) with schedule: {schedule}"
return "Database not available"
@tool("list_tasks", "List all scheduled tasks", category="scheduling")
def list_tasks(ctx: dict = None) -> str:
if ctx and ctx.get("db"):
tasks = ctx["db"]._conn.execute(
"SELECT id, name, schedule, enabled, next_run FROM scheduled_tasks ORDER BY id"
).fetchall()
if not tasks:
return "No scheduled tasks."
lines = []
for t in tasks:
status = "enabled" if t["enabled"] else "disabled"
lines.append(f"[{t['id']}] {t['name']} - {t['schedule']} ({status})")
return "\n".join(lines)
return "Database not available"

View File

@ -0,0 +1,44 @@
"""Python code execution tool (sandboxed via subprocess)."""
from __future__ import annotations
import subprocess
import sys
import tempfile
from pathlib import Path
from . import tool
@tool("run_python", "Execute Python code and return the output", category="code")
def run_python(code: str, timeout: int = 30) -> str:
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
f.write(code)
f.flush()
tmp_path = f.name
try:
result = subprocess.run(
[sys.executable, tmp_path],
capture_output=True,
text=True,
timeout=min(timeout, 60),
encoding="utf-8",
errors="replace",
)
output = ""
if result.stdout:
output += result.stdout
if result.stderr:
output += f"\n[stderr]\n{result.stderr}"
if result.returncode != 0:
output += f"\n[exit code: {result.returncode}]"
if len(output) > 10000:
output = output[:10000] + "\n... (truncated)"
return output.strip() or "(no output)"
except subprocess.TimeoutExpired:
return f"Execution timed out after {timeout}s"
except Exception as e:
return f"Execution error: {e}"
finally:
Path(tmp_path).unlink(missing_ok=True)

View File

@ -0,0 +1,87 @@
"""Data processing tools: CSV/JSON operations."""
from __future__ import annotations
import csv
import io
import json
from pathlib import Path
from . import tool
@tool("read_csv", "Read a CSV file and return summary or specific rows", category="data")
def read_csv(path: str, max_rows: int = 20) -> str:
p = Path(path).resolve()
if not p.exists():
return f"File not found: {path}"
try:
with open(p, newline="", encoding="utf-8-sig") as f:
reader = csv.reader(f)
rows = []
for i, row in enumerate(reader):
rows.append(row)
if i >= max_rows:
break
if not rows:
return "Empty CSV file."
# Format as table
header = rows[0]
lines = [" | ".join(header), " | ".join("---" for _ in header)]
for row in rows[1:]:
lines.append(" | ".join(str(c)[:50] for c in row))
result = "\n".join(lines)
total_line_count = sum(1 for _ in open(p, encoding="utf-8-sig"))
if total_line_count > max_rows + 1:
result += f"\n\n... ({total_line_count - 1} total rows, showing first {max_rows})"
return result
except Exception as e:
return f"Error reading CSV: {e}"
@tool("read_json", "Read and pretty-print a JSON file", category="data")
def read_json(path: str) -> str:
p = Path(path).resolve()
if not p.exists():
return f"File not found: {path}"
try:
data = json.loads(p.read_text(encoding="utf-8"))
formatted = json.dumps(data, indent=2, ensure_ascii=False)
if len(formatted) > 15000:
formatted = formatted[:15000] + "\n... (truncated)"
return formatted
except Exception as e:
return f"Error reading JSON: {e}"
@tool("query_json", "Extract data from a JSON file using a dot-notation path", category="data")
def query_json(path: str, json_path: str) -> str:
"""json_path example: 'data.users.0.name' or 'results.*.id'"""
p = Path(path).resolve()
if not p.exists():
return f"File not found: {path}"
try:
data = json.loads(p.read_text(encoding="utf-8"))
result = _navigate(data, json_path.split("."))
return json.dumps(result, indent=2, ensure_ascii=False) if not isinstance(result, str) else result
except Exception as e:
return f"Error: {e}"
def _navigate(data, parts: list[str]):
for part in parts:
if part == "*" and isinstance(data, list):
return data
elif isinstance(data, dict):
data = data.get(part, f"Key '{part}' not found")
elif isinstance(data, list):
try:
data = data[int(part)]
except (ValueError, IndexError):
return f"Invalid index '{part}'"
else:
return f"Cannot navigate into {type(data).__name__}"
return data

View File

@ -0,0 +1,96 @@
"""File operation tools: read, write, edit, search."""
from __future__ import annotations
import os
from pathlib import Path
from . import tool
@tool("read_file", "Read the contents of a file", category="files")
def read_file(path: str) -> str:
p = Path(path).resolve()
if not p.exists():
return f"File not found: {path}"
if not p.is_file():
return f"Not a file: {path}"
try:
content = p.read_text(encoding="utf-8", errors="replace")
if len(content) > 50000:
return content[:50000] + f"\n\n... (truncated, {len(content)} total chars)"
return content
except Exception as e:
return f"Error reading file: {e}"
@tool("write_file", "Write content to a file (creates or overwrites)", category="files")
def write_file(path: str, content: str) -> str:
p = Path(path).resolve()
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content, encoding="utf-8")
return f"Written {len(content)} chars to {p}"
@tool("edit_file", "Replace text in a file (first occurrence)", category="files")
def edit_file(path: str, old_text: str, new_text: str) -> str:
p = Path(path).resolve()
if not p.exists():
return f"File not found: {path}"
content = p.read_text(encoding="utf-8")
if old_text not in content:
return f"Text not found in {path}"
content = content.replace(old_text, new_text, 1)
p.write_text(content, encoding="utf-8")
return f"Replaced text in {p}"
@tool("list_directory", "List files and folders in a directory", category="files")
def list_directory(path: str = ".") -> str:
p = Path(path).resolve()
if not p.is_dir():
return f"Not a directory: {path}"
entries = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))
lines = []
for e in entries[:200]:
prefix = "📁 " if e.is_dir() else "📄 "
size = ""
if e.is_file():
s = e.stat().st_size
if s > 1_000_000:
size = f" ({s / 1_000_000:.1f} MB)"
elif s > 1000:
size = f" ({s / 1000:.1f} KB)"
else:
size = f" ({s} B)"
lines.append(f"{prefix}{e.name}{size}")
return "\n".join(lines) if lines else "(empty directory)"
@tool("search_files", "Search for files matching a glob pattern", category="files")
def search_files(pattern: str, directory: str = ".") -> str:
p = Path(directory).resolve()
matches = list(p.glob(pattern))[:100]
if not matches:
return f"No files matching '{pattern}' in {directory}"
return "\n".join(str(m) for m in matches)
@tool("search_in_files", "Search for text content across files", category="files")
def search_in_files(query: str, directory: str = ".", extension: str = "") -> str:
p = Path(directory).resolve()
pattern = f"**/*{extension}" if extension else "**/*"
results = []
for f in p.glob(pattern):
if not f.is_file() or f.stat().st_size > 1_000_000:
continue
try:
content = f.read_text(encoding="utf-8", errors="ignore")
for i, line in enumerate(content.split("\n"), 1):
if query.lower() in line.lower():
results.append(f"{f}:{i}: {line.strip()[:200]}")
if len(results) >= 50:
return "\n".join(results) + "\n... (truncated)"
except Exception:
continue
return "\n".join(results) if results else f"No matches for '{query}'"

View File

@ -0,0 +1,41 @@
"""Image analysis tool - delegates to vision-capable LLM."""
from __future__ import annotations
import base64
from pathlib import Path
from . import tool
@tool("analyze_image", "Describe or analyze an image file", category="media")
def analyze_image(path: str, question: str = "Describe this image in detail.", ctx: dict = None) -> str:
p = Path(path).resolve()
if not p.exists():
return f"Image not found: {path}"
suffix = p.suffix.lower()
mime_map = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".gif": "image/gif", ".webp": "image/webp", ".bmp": "image/bmp"}
mime = mime_map.get(suffix, "image/png")
try:
data = base64.b64encode(p.read_bytes()).decode("utf-8")
except Exception as e:
return f"Error reading image: {e}"
if ctx and ctx.get("agent"):
agent = ctx["agent"]
messages = [
{"role": "user", "content": [
{"type": "text", "text": question},
{"type": "image_url", "image_url": {"url": f"data:{mime};base64,{data}"}},
]},
]
result_parts = []
for chunk in agent.llm.chat(messages, stream=False):
if chunk["type"] == "text":
result_parts.append(chunk["content"])
return "".join(result_parts) or "Could not analyze image."
return "Agent context not available for image analysis."

View File

@ -0,0 +1,53 @@
"""Shell command execution tool with safety checks."""
from __future__ import annotations
import subprocess
import sys
from . import tool
# Commands that are always blocked
# Substring patterns matched against the lowercased command (see run_command).
BLOCKED_PATTERNS = [
    "rm -rf /",          # recursive delete from the filesystem root
    "format c:",         # Windows drive format
    ":(){:|:&};:",       # classic shell fork bomb
    "dd if=/dev/zero",   # overwrite a device/file with zeros
    "mkfs.",             # create a new filesystem (destroys existing data)
    "> /dev/sda",        # redirect output directly over a raw disk
]
@tool("run_command", "Execute a shell command and return output", category="shell")
def run_command(command: str, timeout: int = 30) -> str:
# Safety check
cmd_lower = command.lower().strip()
for pattern in BLOCKED_PATTERNS:
if pattern in cmd_lower:
return f"Blocked: command matches dangerous pattern '{pattern}'"
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=min(timeout, 120),
encoding="utf-8",
errors="replace",
)
output = ""
if result.stdout:
output += result.stdout
if result.stderr:
output += f"\n[stderr]\n{result.stderr}"
if result.returncode != 0:
output += f"\n[exit code: {result.returncode}]"
if len(output) > 10000:
output = output[:10000] + "\n... (truncated)"
return output.strip() or "(no output)"
except subprocess.TimeoutExpired:
return f"Command timed out after {timeout}s"
except Exception as e:
return f"Error running command: {e}"

View File

@ -0,0 +1,64 @@
"""Web tools: search, fetch URL, scrape."""
from __future__ import annotations
import httpx
from bs4 import BeautifulSoup
from . import tool
@tool("web_search", "Search the web using DuckDuckGo", category="web")
def web_search(query: str, max_results: int = 5) -> str:
try:
# Use DuckDuckGo HTML search (no API key needed)
r = httpx.get(
"https://html.duckduckgo.com/html/",
params={"q": query},
headers={"User-Agent": "Mozilla/5.0 (compatible; CheddahBot/1.0)"},
timeout=15,
follow_redirects=True,
)
soup = BeautifulSoup(r.text, "html.parser")
results = []
for item in soup.select(".result")[:max_results]:
title_el = item.select_one(".result__title a")
snippet_el = item.select_one(".result__snippet")
if title_el:
title = title_el.get_text(strip=True)
url = title_el.get("href", "")
snippet = snippet_el.get_text(strip=True) if snippet_el else ""
results.append(f"**{title}**\n{url}\n{snippet}")
return "\n\n".join(results) if results else "No results found."
except Exception as e:
return f"Search error: {e}"
@tool("fetch_url", "Fetch and extract text content from a URL", category="web")
def fetch_url(url: str) -> str:
try:
r = httpx.get(
url,
headers={"User-Agent": "Mozilla/5.0 (compatible; CheddahBot/1.0)"},
timeout=20,
follow_redirects=True,
)
content_type = r.headers.get("content-type", "")
if "html" in content_type:
soup = BeautifulSoup(r.text, "html.parser")
# Remove script/style elements
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
# Collapse whitespace
lines = [l.strip() for l in text.split("\n") if l.strip()]
text = "\n".join(lines)
if len(text) > 15000:
text = text[:15000] + "\n... (truncated)"
return text
elif "json" in content_type:
return r.text[:15000]
else:
return r.text[:5000]
except Exception as e:
return f"Fetch error: {e}"

195
cheddahbot/ui.py 100644
View File

@ -0,0 +1,195 @@
"""Gradio interface for CheddahBot."""
from __future__ import annotations
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import gradio as gr
if TYPE_CHECKING:
from .agent import Agent
from .config import Config
from .llm import LLMAdapter
log = logging.getLogger(__name__)
_CSS = """
.contain { max-width: 900px; margin: auto; }
footer { display: none !important; }
"""
def create_ui(agent: Agent, config: Config, llm: LLMAdapter) -> gr.Blocks:
"""Build and return the Gradio app."""
available_models = llm.list_available_models()
model_choices = [(m.name, m.id) for m in available_models]
current_model = llm.current_model
with gr.Blocks(title="CheddahBot") as app:
gr.Markdown("# CheddahBot", elem_classes=["contain"])
with gr.Row(elem_classes=["contain"]):
model_dropdown = gr.Dropdown(
choices=model_choices,
value=current_model,
label="Model",
interactive=True,
scale=3,
)
refresh_btn = gr.Button("Refresh", scale=0, min_width=70)
new_chat_btn = gr.Button("New Chat", scale=1, variant="secondary")
chatbot = gr.Chatbot(
label="Chat",
height=500,
buttons=["copy"],
elem_classes=["contain"],
)
with gr.Row(elem_classes=["contain"]):
msg_input = gr.MultimodalTextbox(
placeholder="Type a message... (attach files, use mic, or camera)",
show_label=False,
scale=4,
sources=["upload", "microphone"],
)
# -- Voice Chat Mode --
with gr.Accordion("Voice Chat", open=False, elem_classes=["contain"]):
gr.Markdown("Record audio and get a spoken response.")
voice_input = gr.Audio(sources=["microphone"], type="filepath", label="Speak")
voice_output = gr.Audio(type="filepath", label="Response", autoplay=True)
voice_status = gr.Textbox(label="Transcript", interactive=False)
# -- Accordion sections --
with gr.Accordion("Conversation History", open=False, elem_classes=["contain"]):
conv_list = gr.Dataframe(
headers=["ID", "Title", "Last Updated"],
label="Past Conversations",
interactive=False,
)
load_conv_btn = gr.Button("Load Selected")
with gr.Accordion("Settings", open=False, elem_classes=["contain"]):
gr.Markdown(
"Edit `identity/SOUL.md` to change the agent's personality.\n\n"
"Edit `identity/USER.md` to update your profile.\n\n"
"Edit `config.yaml` for advanced settings."
)
# -- Event handlers --
def on_model_change(model_id):
llm.switch_model(model_id)
return f"Switched to {model_id}"
def on_refresh_models():
models = llm.list_available_models()
choices = [(m.name, m.id) for m in models]
return gr.update(choices=choices, value=llm.current_model)
def on_new_chat():
agent.new_conversation()
return [], _load_conversations()
def _load_conversations():
convs = agent.db.list_conversations()
return [[c["id"], c["title"], c["updated_at"][:19]] for c in convs]
def on_user_message(message, chat_history):
# Extract text and files from MultimodalTextbox
if isinstance(message, dict):
text = message.get("text", "")
files = message.get("files", [])
else:
text = str(message)
files = []
if not text and not files:
yield chat_history, gr.update(value=None)
return
# Handle audio files - transcribe them
processed_files = []
for f in files:
fpath = f if isinstance(f, str) else f.get("path", f.get("name", ""))
if fpath and Path(fpath).suffix.lower() in (".wav", ".mp3", ".ogg", ".webm", ".m4a"):
try:
from .media import transcribe_audio
transcript = transcribe_audio(fpath)
if transcript:
text = f"{text}\n[Voice message]: {transcript}" if text else f"[Voice message]: {transcript}"
continue
except Exception as e:
log.warning("Audio transcription failed: %s", e)
processed_files.append(fpath)
# Add user message
user_display = text
if processed_files:
file_names = [Path(f).name for f in processed_files]
user_display += f"\n[Attached: {', '.join(file_names)}]"
chat_history = chat_history + [{"role": "user", "content": user_display}]
yield chat_history, gr.update(value=None)
# Stream assistant response
response_text = ""
chat_history = chat_history + [{"role": "assistant", "content": ""}]
for chunk in agent.respond(text, files=processed_files):
response_text += chunk
chat_history[-1] = {"role": "assistant", "content": response_text}
yield chat_history, gr.update(value=None)
def on_voice_chat(audio_path):
    """Handle voice chat: transcribe -> respond -> TTS.

    Returns ``(audio_output_path_or_None, status_text)``.  All failures
    are reported through the status string rather than raised, since the
    result feeds Gradio output components directly.
    """
    if not audio_path:
        return None, "No audio received."
    try:
        from .media import transcribe_audio, text_to_speech
        # Transcribe the recorded audio to text.
        transcript = transcribe_audio(audio_path)
        if not transcript:
            return None, "Could not transcribe audio."
        # Get the assistant's reply for the transcript.
        response = agent.respond_to_prompt(transcript)
        # Synthesize speech; ensure the output directory exists first so
        # TTS does not fail on a fresh data dir.
        output_path = config.data_dir / "generated" / "voice_response.mp3"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        text_to_speech(response, output_path)
        return str(output_path), f"You said: {transcript}\n\nResponse: {response}"
    except Exception as e:
        return None, f"Voice chat error: {e}"
# -- Wire events --
# Model selector: switching updates the LLM adapter; the returned status
# string is discarded (outputs=None).
model_dropdown.change(on_model_change, [model_dropdown], None)
# Re-scan backends and repopulate the model dropdown in place.
refresh_btn.click(on_refresh_models, None, [model_dropdown])
# New chat clears the chatbot and refreshes the sidebar conversation table.
new_chat_btn.click(on_new_chat, None, [chatbot, conv_list])
# Submitting the multimodal box streams the reply (generator handler) and
# clears the input box via the second output.
msg_input.submit(
    on_user_message,
    [msg_input, chatbot],
    [chatbot, msg_input],
)
# Voice tab: when recording stops, run transcribe -> respond -> TTS.
voice_input.stop_recording(
    on_voice_chat,
    [voice_input],
    [voice_output, voice_status],
)
# Load conversation list on app start
app.load(_load_conversations, None, [conv_list])
return app, _CSS

32
config.yaml 100644
View File

@ -0,0 +1,32 @@
# CheddahBot Configuration
# Default model to use on startup
default_model: "claude-sonnet-4-20250514"
# Gradio server settings
host: "0.0.0.0"
port: 7860
# Memory settings
memory:
max_context_messages: 50 # Messages kept in context window
flush_threshold: 40 # Trigger summary when messages exceed this
embedding_model: "all-MiniLM-L6-v2"
search_top_k: 5 # Number of semantic search results
# Scheduler settings
scheduler:
heartbeat_interval_minutes: 30
poll_interval_seconds: 60
# Local model endpoints (auto-detected)
ollama_url: "http://localhost:11434"
lmstudio_url: "http://localhost:1234"
# Safety settings
shell:
blocked_commands:
- "rm -rf /"
- "format"
- ":(){:|:&};:"
require_approval: false # If true, shell commands need user confirmation

View File

@ -0,0 +1,7 @@
# Heartbeat Checklist
Things to proactively check on each heartbeat cycle:
- Check if any scheduled tasks failed and need retry
- Review memory for any pending reminders that are due
- Check disk space (warn if < 10% free)

20
identity/SOUL.md 100644
View File

@ -0,0 +1,20 @@
# Soul
You are Cheddah, a sharp and resourceful AI assistant.
## Personality
- Direct, no-nonsense, but warm
- You use humor when appropriate
- You're proactive - suggest things before being asked
- You remember what the user tells you and reference it naturally
- You adapt your communication style to match the user's preferences
## Boundaries
- Never pretend to be human
- Be honest about uncertainty - say "I don't know" when you don't
- Don't make up facts or hallucinate information
- Ask for clarification rather than guessing on important decisions
## Quirks
- You occasionally use the word "cheddah" as slang for money/value
- You appreciate efficiency and elegant solutions

14
identity/USER.md 100644
View File

@ -0,0 +1,14 @@
# User Profile
## Identity
- Name: (your name here)
- How to address: (first name, nickname, etc.)
## Context
- Technical level: (beginner/intermediate/advanced)
- Primary language: Python
- Working on: (current projects)
## Preferences
- Communication style: (concise/detailed)
- (anything else you want the agent to know)

19
requirements.txt 100644
View File

@ -0,0 +1,19 @@
# Core
gradio>=5.0
openai>=1.30
pyyaml>=6.0
python-dotenv>=1.0
# Memory
sentence-transformers>=3.0
numpy>=1.24
# Web tools
httpx>=0.27
beautifulsoup4>=4.12
# Scheduling
croniter>=2.0
# Audio
edge-tts>=6.1