Add API cost tracking and switch planner to Grok 4.1 Fast

Track per-call token usage and estimated costs across all OpenRouter models.
Switch planner agent from Claude Sonnet 4.6 ($3/$15 per M) to Grok 4.1 Fast
($0.20/$0.50 per M) for ~25x cost reduction. Add budget alerts, a dashboard
card, and a check_api_usage tool for visibility into spending.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-23 18:00:11 -06:00
parent ab2c313baa
commit 0b3ab904de
8 changed files with 264 additions and 1 deletion

View File

@ -186,6 +186,18 @@ class Agent:
yield chunk["content"]
elif chunk["type"] == "tool_use":
tool_calls.append(chunk)
elif chunk["type"] == "usage":
if self.db:
self.db.log_api_usage(
model=chunk["model"],
provider="openrouter",
prompt_tokens=chunk["prompt_tokens"],
completion_tokens=chunk["completion_tokens"],
total_tokens=chunk["total_tokens"],
estimated_cost=chunk["estimated_cost"],
conv_id=conv_id,
agent_name=self.agent_config.name if self.agent_config else "default",
)
# If no tool calls, we're done
if not tool_calls:

View File

@ -76,6 +76,12 @@ class LinkBuildingConfig:
default_branded_plus_ratio: float = 0.7
@dataclass
class ApiBudgetConfig:
    """Settings for monthly API spend alerts (consumed by check_api_usage and the dashboard)."""

    monthly_limit: float = 20.00  # USD - alert when exceeded
    alert_threshold: float = 0.8  # alert at 80% of limit
@dataclass
class AgentConfig:
"""Per-agent configuration for multi-agent support."""
@ -105,6 +111,7 @@ class Config:
press_advantage: PressAdvantageConfig = field(default_factory=PressAdvantageConfig)
email: EmailConfig = field(default_factory=EmailConfig)
link_building: LinkBuildingConfig = field(default_factory=LinkBuildingConfig)
api_budget: ApiBudgetConfig = field(default_factory=ApiBudgetConfig)
agents: list[AgentConfig] = field(default_factory=lambda: [AgentConfig()])
# Derived paths
@ -156,6 +163,10 @@ def load_config() -> Config:
for k, v in data["link_building"].items():
if hasattr(cfg.link_building, k):
setattr(cfg.link_building, k, v)
if "api_budget" in data and isinstance(data["api_budget"], dict):
for k, v in data["api_budget"].items():
if hasattr(cfg.api_budget, k):
setattr(cfg.api_budget, k, v)
# Multi-agent configs
if "agents" in data and isinstance(data["agents"], list):

View File

@ -72,6 +72,18 @@ class Database:
category TEXT NOT NULL DEFAULT 'clickup',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS api_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
model TEXT NOT NULL,
provider TEXT NOT NULL,
prompt_tokens INTEGER NOT NULL DEFAULT 0,
completion_tokens INTEGER NOT NULL DEFAULT 0,
total_tokens INTEGER NOT NULL DEFAULT 0,
estimated_cost REAL NOT NULL DEFAULT 0.0,
conv_id TEXT,
agent_name TEXT
);
""")
# Migration: add agent_name column to conversations (idempotent)
with contextlib.suppress(sqlite3.OperationalError):
@ -275,6 +287,84 @@ class Database:
).fetchall()
return [dict(r) for r in rows]
# -- API Usage --
def log_api_usage(
    self,
    model: str,
    provider: str,
    prompt_tokens: int,
    completion_tokens: int,
    total_tokens: int,
    estimated_cost: float,
    conv_id: str | None = None,
    agent_name: str | None = None,
):
    """Record one API call in the api_usage table, stamped with the current UTC time.

    Commits immediately so usage data survives even if the process dies
    mid-conversation.
    """
    values = (
        _now(),
        model,
        provider,
        prompt_tokens,
        completion_tokens,
        total_tokens,
        estimated_cost,
        conv_id,
        agent_name,
    )
    self._conn.execute(
        """INSERT INTO api_usage
            (timestamp, model, provider, prompt_tokens, completion_tokens,
             total_tokens, estimated_cost, conv_id, agent_name)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    self._conn.commit()
def get_api_usage_summary(self, days: int = 30) -> dict:
"""Return total tokens, total cost, and per-model breakdown for the period."""
cutoff = datetime.now(UTC).isoformat()[:10] # today
# Compute cutoff date
from datetime import timedelta
cutoff_dt = datetime.now(UTC) - timedelta(days=days)
cutoff = cutoff_dt.isoformat()
row = self._conn.execute(
"SELECT COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,"
" COALESCE(SUM(completion_tokens), 0) as completion_tokens,"
" COALESCE(SUM(total_tokens), 0) as total_tokens,"
" COALESCE(SUM(estimated_cost), 0.0) as total_cost"
" FROM api_usage WHERE timestamp >= ?",
(cutoff,),
).fetchone()
model_rows = self._conn.execute(
"SELECT model,"
" COALESCE(SUM(prompt_tokens), 0) as prompt_tokens,"
" COALESCE(SUM(completion_tokens), 0) as completion_tokens,"
" COALESCE(SUM(total_tokens), 0) as total_tokens,"
" COALESCE(SUM(estimated_cost), 0.0) as total_cost,"
" COUNT(*) as call_count"
" FROM api_usage WHERE timestamp >= ?"
" GROUP BY model ORDER BY total_cost DESC",
(cutoff,),
).fetchall()
return {
"prompt_tokens": row["prompt_tokens"],
"completion_tokens": row["completion_tokens"],
"total_tokens": row["total_tokens"],
"total_cost": row["total_cost"],
"by_model": [dict(r) for r in model_rows],
}
def get_api_usage_daily(self, days: int = 7) -> list[dict]:
"""Return daily totals for trending."""
from datetime import timedelta
cutoff_dt = datetime.now(UTC) - timedelta(days=days)
cutoff = cutoff_dt.isoformat()
rows = self._conn.execute(
"SELECT DATE(timestamp) as day,"
" COALESCE(SUM(total_tokens), 0) as total_tokens,"
" COALESCE(SUM(estimated_cost), 0.0) as total_cost,"
" COUNT(*) as call_count"
" FROM api_usage WHERE timestamp >= ?"
" GROUP BY DATE(timestamp) ORDER BY day ASC",
(cutoff,),
).fetchall()
return [dict(r) for r in rows]
def _now() -> str:
return datetime.now(UTC).isoformat()

View File

@ -27,6 +27,29 @@ import httpx
log = logging.getLogger(__name__)
# Pricing per million tokens: (input_cost, output_cost) in USD
MODEL_PRICING: dict[str, tuple[float, float]] = {
"anthropic/claude-sonnet": (3.00, 15.00),
"anthropic/claude-opus": (5.00, 25.00),
"anthropic/claude-haiku": (0.80, 4.00),
"x-ai/grok-4.1-fast": (0.20, 0.50),
"google/gemini-3-flash": (0.50, 3.00),
"google/gemini-2.5-flash": (0.15, 0.60),
"openai/gpt-4o-mini": (0.15, 0.60),
"openai/gpt-5-nano": (0.10, 0.40),
"deepseek/deepseek-v3": (0.24, 0.38),
"minimax/minimax-m2.5": (0.30, 1.20),
"moonshotai/kimi-k2.5": (0.45, 2.20),
}
def _estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
"""Estimate cost in USD using prefix matching against MODEL_PRICING."""
for prefix, (input_rate, output_rate) in MODEL_PRICING.items():
if model.startswith(prefix):
return (prompt_tokens * input_rate + completion_tokens * output_rate) / 1_000_000
return 0.0
@dataclass
class ModelInfo:
@ -232,6 +255,8 @@ class LLMAdapter:
"messages": messages,
"stream": stream,
}
if stream:
kwargs["stream_options"] = {"include_usage": True}
if tools:
kwargs["tools"] = tools
@ -243,7 +268,11 @@ class LLMAdapter:
if stream:
response = client.chat.completions.create(**kwargs)
tool_calls_accum: dict[int, dict] = {}
stream_usage = None
for chunk in response:
# Capture usage from the final stream chunk
if hasattr(chunk, "usage") and chunk.usage:
stream_usage = chunk.usage
delta = chunk.choices[0].delta if chunk.choices else None
if not delta:
continue
@ -277,6 +306,19 @@ class LLMAdapter:
"name": tc["name"],
"input": args,
}
# Yield usage chunk if available
if stream_usage:
pt = getattr(stream_usage, "prompt_tokens", 0) or 0
ct = getattr(stream_usage, "completion_tokens", 0) or 0
yield {
"type": "usage",
"model": model_id,
"prompt_tokens": pt,
"completion_tokens": ct,
"total_tokens": pt + ct,
"estimated_cost": _estimate_cost(model_id, pt, ct),
}
else:
response = client.chat.completions.create(**kwargs)
msg = response.choices[0].message
@ -295,6 +337,19 @@ class LLMAdapter:
"name": tc.function.name,
"input": args,
}
# Yield usage chunk for non-streaming
if hasattr(response, "usage") and response.usage:
pt = response.usage.prompt_tokens or 0
ct = response.usage.completion_tokens or 0
yield {
"type": "usage",
"model": model_id,
"prompt_tokens": pt,
"completion_tokens": ct,
"total_tokens": pt + ct,
"estimated_cost": _estimate_cost(model_id, pt, ct),
}
# Success — break out of retry loop
return

View File

@ -62,3 +62,54 @@ def report_issue(
log.info("Logged improvement request: %s", title)
return f"Logged improvement request: **{title}**. Bryan will see it on the next heartbeat."
@tool(
    "check_api_usage",
    "Check API token usage and estimated costs for the last N days",
    category="system",
)
def check_api_usage(days: int = 30, ctx: dict | None = None) -> str:
    """Build a markdown report: totals, budget status, per-model and daily breakdowns."""
    db = ctx.get("db") if ctx else None
    if not db:
        return "Error: database not available."

    usage = db.get_api_usage_summary(days)
    trend = db.get_api_usage_daily(min(days, 7))
    tokens_total = usage["total_tokens"]
    cost_total = usage["total_cost"]

    report = [
        f"## API Usage Report ({days}-day window)\n",
        f"**Total tokens:** {tokens_total:,}",
        f"**Estimated cost:** ${cost_total:.4f}",
    ]

    # Budget section — compares the window total against the configured
    # monthly limit and warns past the alert threshold.
    cfg = ctx.get("config") if ctx else None
    if cfg and hasattr(cfg, "api_budget"):
        limit = cfg.api_budget.monthly_limit
        pct = (cost_total / limit * 100) if limit > 0 else 0
        report.append(f"**Budget:** ${cost_total:.2f} / ${limit:.2f} ({pct:.1f}%)")
        if pct >= cfg.api_budget.alert_threshold * 100:
            report.append(f"\n**WARNING:** Spending is at {pct:.1f}% of monthly budget!")

    if usage["by_model"]:
        report.append("\n### By Model")
        report.extend(
            f"- **{m['model']}**: {m['total_tokens']:,} tokens, "
            f"${m['total_cost']:.4f}, {m['call_count']} calls"
            for m in usage["by_model"]
        )

    if trend:
        report.append("\n### Daily Trend (last 7 days)")
        report.extend(
            f"- {d['day']}: {d['total_tokens']:,} tokens, "
            f"${d['total_cost']:.4f}, {d['call_count']} calls"
            for d in trend
        )

    return "\n".join(report)

View File

@ -214,6 +214,12 @@ def create_ui(
elem_classes=["contain"],
)
# -- API Usage card --
api_usage_display = gr.Markdown(
value="*API Usage (30d):* loading...",
elem_classes=["contain"],
)
# -- Notification banner --
notification_display = gr.Markdown(
value="",
@ -505,6 +511,34 @@ def create_ui(
value="*Recent System events* | System Loop: waiting for first run..."
)
def poll_api_usage():
    """Refresh the dashboard card with 30-day API usage totals."""
    try:
        db = registry.default.db if registry.default else None
        if not db:
            return gr.update()
        stats = db.get_api_usage_summary(30)
        tokens = stats["total_tokens"]
        cost = stats["total_cost"]
        # Human-readable token count: 1.2M / 3.4K / 999
        if tokens >= 1_000_000:
            tok_str = f"{tokens / 1_000_000:.1f}M"
        elif tokens >= 1_000:
            tok_str = f"{tokens / 1_000:.1f}K"
        else:
            tok_str = str(tokens)
        budget_str = ""
        if hasattr(config, "api_budget"):
            budget_str = f" | Budget: ${cost:.2f} / ${config.api_budget.monthly_limit:.2f}"
        return gr.update(
            value=f"*API Usage (30d):* {tok_str} tokens | ${cost:.2f} est.{budget_str}"
        )
    except Exception:
        # Best-effort polling: a transient DB/UI error must not kill the
        # timer callback, so leave the card unchanged.
        return gr.update()
def on_force_pulse():
if not scheduler:
return gr.update(
@ -563,4 +597,8 @@ def create_ui(
loop_timer = gr.Timer(30)
loop_timer.tick(poll_loop_status, None, [loop_status])
# API usage polling timer (every 60 seconds)
api_timer = gr.Timer(60)
api_timer.tick(poll_api_usage, None, [api_usage_display])
return app

View File

@ -115,6 +115,11 @@ agents:
- name: planner
display_name: Planner
model: "anthropic/claude-sonnet-4.6"
model: "x-ai/grok-4.1-fast"
tools: [delegate_task, remember, search_memory, report_issue, web_search]
memory_scope: ""
# API budget alerts
api_budget:
monthly_limit: 20.00 # USD - alert when exceeded
alert_threshold: 0.8 # alert at 80% of limit

View File

@ -6,3 +6,4 @@ Things to proactively check on each heartbeat cycle:
- Review memory for any pending reminders that are due
- Check disk space (warn if < 10% free)
- Check memory/improvement_requests.md for pending items and notify Bryan with a summary
- Check API usage costs (check_api_usage tool) and alert Bryan if monthly spend exceeds budget threshold