CheddahBot/cheddahbot/api.py

298 lines
8.2 KiB
Python

"""Dashboard API endpoints.
Provides JSON data for the standalone HTML dashboard.
All ClickUp data is cached for 5 minutes to avoid hammering the API.
"""
from __future__ import annotations
import json
import logging
import shutil
import threading
import time
from typing import TYPE_CHECKING
from fastapi import APIRouter
if TYPE_CHECKING:
from .agent_registry import AgentRegistry
from .config import Config
from .db import Database
# Module logger, named after this module's import path.
log = logging.getLogger(__name__)
# Every route declared in this module is registered under the "/api" prefix.
router = APIRouter(prefix="/api")
# Module-level refs wired at startup by create_api_router()
# They stay None until that call, which is why handlers test them before use.
_config: Config | None = None
_db: Database | None = None
_registry: AgentRegistry | None = None
# Simple in-memory cache for ClickUp data
# Maps a cache key to {"data": <payload>, "ts": <time.time() when stored>}.
_cache: dict[str, dict] = {}
# Held by the cache helpers and the clear endpoint while they touch _cache.
_cache_lock = threading.Lock()
# Maximum age, in seconds, at which a cache entry is still served.
CACHE_TTL = 300 # 5 minutes
def create_api_router(config: Config, db: Database, registry: AgentRegistry) -> APIRouter:
    """Bind the shared dependencies and hand back this module's router.

    The three arguments are stored in the module-level ``_config``, ``_db``
    and ``_registry`` slots that the endpoint handlers read.

    Returns:
        The module-level ``router`` object.
    """
    global _config, _db, _registry
    _config, _db, _registry = config, db, registry
    return router
def _get_cached(key: str) -> dict | None:
    """Look up ``key`` in the cache and return its payload if still fresh.

    Returns:
        The stored payload object itself (not a copy) when an entry exists
        and is younger than ``CACHE_TTL`` seconds; otherwise ``None``.
    """
    with _cache_lock:
        record = _cache.get(key)
        if not record:
            return None
        age = time.time() - record["ts"]
        return record["data"] if age < CACHE_TTL else None
def _set_cached(key: str, data):
    """Store ``data`` under ``key``, stamped with the current wall-clock time.

    Any existing entry for ``key`` is replaced.
    """
    with _cache_lock:
        _cache[key] = dict(data=data, ts=time.time())
def _get_clickup_client():
    """Build a ``ClickUpClient`` from the ClickUp section of the wired config.

    ``ClickUpClient`` is imported at call time rather than at module import.
    ``_config`` is dereferenced without a ``None`` check, so callers must have
    verified it is set before calling.
    """
    from .clickup import ClickUpClient

    clickup_cfg = _config.clickup
    return ClickUpClient(
        api_token=clickup_cfg.api_token,
        workspace_id=clickup_cfg.workspace_id,
        task_type_field_name=clickup_cfg.task_type_field_name,
    )
# ── Tasks ──
@router.get("/tasks")
async def get_tasks():
    """Return every ClickUp task in the configured space as a flat list.

    A cached payload younger than ``CACHE_TTL`` is returned directly, so
    callers receive the cached object itself and must copy before mutating
    it. Otherwise the tasks are fetched from the space named by
    ``_config.clickup.space_id``, flattened to plain dicts and cached.

    Returns:
        ``{"tasks": [...], "count": n}`` on success, or
        ``{"tasks": [], "error": "<message>"}`` when ClickUp is not
        configured or the fetch fails. Error payloads are never cached.
    """
    cached = _get_cached("tasks")
    if cached is not None:
        return cached
    if not _config or not _config.clickup.enabled:
        return {"tasks": [], "error": "ClickUp not configured"}

    client = None
    try:
        # Built inside the guarded region so that a failure while constructing
        # the client is reported through the same error payload as a failed
        # fetch instead of escaping as an unhandled exception.
        client = _get_clickup_client()
        # NOTE(review): this call is not awaited, so it presumably blocks the
        # event loop for the duration of the request — confirm whether it
        # should be offloaded to a worker thread.
        raw_tasks = client.get_tasks_from_space(_config.clickup.space_id)
        tasks = [
            {
                "id": t.id,
                "name": t.name,
                "status": t.status,
                "task_type": t.task_type,
                "url": t.url,
                "due_date": t.due_date,
                "list_name": t.list_name,
                "custom_fields": t.custom_fields,
            }
            for t in raw_tasks
        ]
        result = {"tasks": tasks, "count": len(tasks)}
        _set_cached("tasks", result)
        return result
    except Exception as e:
        # log.exception records the traceback alongside the message.
        log.exception("Failed to fetch tasks for dashboard: %s", e)
        return {"tasks": [], "error": str(e)}
    finally:
        # client is still None if construction itself failed.
        if client is not None:
            client.close()
@router.get("/tasks/by-company")
async def get_tasks_by_company():
    """Return all tasks bucketed by their ``Client`` custom field.

    Tasks whose ``Client`` value is missing or empty are placed under
    ``"Unassigned"``. Companies are ordered by descending task count;
    companies with equal counts keep their first-seen order.

    Returns:
        ``{"companies": [{"name": ..., "tasks": [...], "count": n}, ...]}``.
        When ``get_tasks`` reported a failure, its message is passed through
        under an additional ``"error"`` key so an upstream outage is not
        mistaken for an empty task list.
    """
    data = await get_tasks()
    by_company: dict[str, list] = {}
    for task in data.get("tasks", []):
        # Tolerate a task whose custom_fields is absent or None.
        fields = task.get("custom_fields") or {}
        company = fields.get("Client") or "Unassigned"
        by_company.setdefault(company, []).append(task)

    # Sort companies by task count descending (stable, so ties keep order).
    ranked = sorted(by_company.items(), key=lambda item: len(item[1]), reverse=True)
    response = {
        "companies": [
            {"name": name, "tasks": tasks, "count": len(tasks)}
            for name, tasks in ranked
        ]
    }
    if "error" in data:
        response["error"] = data["error"]
    return response
async def _task_type_report(task_type: str) -> dict:
    """Build the per-company report shared by the task-type endpoints.

    Selects the tasks whose ``task_type`` equals ``task_type``, attaches the
    stored KV state to each (when a database is wired), groups them by the
    ``Client`` custom field and tallies their statuses.

    Returns:
        ``{"total": n, "companies": [...], "status_counts": {...}}``, with
        companies ordered by descending task count. When ``get_tasks``
        reported a failure, its message is passed through under an
        additional ``"error"`` key.
    """
    data = await get_tasks()
    # get_tasks() may return the cached payload itself. Copy each task before
    # attaching kv_state so the extra key never leaks into the shared cache
    # (and from there into other endpoints' responses).
    selected = [
        dict(t) for t in data.get("tasks", []) if t["task_type"] == task_type
    ]

    # Merge KV state
    if _db:
        for task in selected:
            raw = _db.kv_get(f"clickup:task:{task['id']}:state")
            try:
                task["kv_state"] = json.loads(raw) if raw else None
            except json.JSONDecodeError:
                # An unparseable state is reported the same as no state.
                task["kv_state"] = None

    # Group by company
    by_company: dict[str, list] = {}
    for task in selected:
        # Tolerate a task whose custom_fields is absent or None.
        fields = task.get("custom_fields") or {}
        company = fields.get("Client") or "Unassigned"
        by_company.setdefault(company, []).append(task)

    ranked = sorted(by_company.items(), key=lambda item: len(item[1]), reverse=True)
    report = {
        "total": len(selected),
        "companies": [
            {"name": name, "tasks": tasks, "count": len(tasks)}
            for name, tasks in ranked
        ],
        "status_counts": _count_statuses(selected),
    }
    if "error" in data:
        report["error"] = data["error"]
    return report


@router.get("/tasks/link-building")
async def get_link_building_tasks():
    """Link building tasks with KV state merged in, grouped by company."""
    return await _task_type_report("Link Building")


@router.get("/tasks/press-releases")
async def get_press_release_tasks():
    """Press release tasks with KV state merged in, grouped by company."""
    return await _task_type_report("Press Release")
def _count_statuses(tasks: list[dict]) -> dict[str, int]:
    """Tally how many tasks carry each ``status`` value.

    A task with no ``"status"`` key is counted under ``"unknown"``. Keys
    appear in the order each status is first encountered.
    """
    tally: dict[str, int] = {}
    for task in tasks:
        status = task.get("status", "unknown")
        if status in tally:
            tally[status] += 1
        else:
            tally[status] = 1
    return tally
# ── Agents ──
@router.get("/agents")
async def get_agents():
    """List the configured agents as plain dicts.

    An agent without its own model falls back to ``_config.chat_model``, and
    one without a memory scope is reported as ``"shared"``. Returns an empty
    list when no config has been wired.
    """
    if not _config:
        return {"agents": []}
    return {
        "agents": [
            {
                "name": agent.name,
                "display_name": agent.display_name,
                "personality_file": agent.personality_file,
                "model": agent.model or _config.chat_model,
                "tools": agent.tools,
                "skills": agent.skills,
                "memory_scope": agent.memory_scope or "shared",
            }
            for agent in _config.agents
        ]
    }
# ── System ──
@router.get("/system/health")
async def get_system_health():
    """Report system health: brains, integrations and disk space.

    Returns:
        A dict with:
        ``execution_brain`` — whether a ``claude`` executable is on PATH;
        ``clickup_enabled``, ``chat_model``, ``execution_model`` — read from
        the wired config, or ``False`` / ``"unknown"`` when none is wired;
        ``disks`` — one entry per probed drive that could be read, with
        ``total_gb``, ``free_gb`` and ``percent_free``.
    """
    health = {
        "execution_brain": shutil.which("claude") is not None,
        "clickup_enabled": _config.clickup.enabled if _config else False,
        "chat_model": _config.chat_model if _config else "unknown",
        "execution_model": _config.default_model if _config else "unknown",
        "disks": [],
    }
    # Disk space (Windows drives)
    for drive in ["C:", "D:", "E:", "Z:"]:
        try:
            usage = shutil.disk_usage(drive + "/")
        except OSError:
            # Drive absent or unreadable (FileNotFoundError is an OSError).
            continue
        # A volume reporting zero total size would otherwise raise an
        # uncaught ZeroDivisionError and fail the whole health check.
        percent_free = round(usage.free / usage.total * 100, 1) if usage.total else 0.0
        health["disks"].append(
            {
                "drive": drive,
                "total_gb": round(usage.total / (1024**3), 1),
                "free_gb": round(usage.free / (1024**3), 1),
                "percent_free": percent_free,
            }
        )
    return health
@router.get("/system/notifications")
async def get_notifications():
    """Return up to 100 notifications from the DB, or none if no DB is wired.

    Notifications are requested via ``get_notifications_after(0, limit=100)``.
    """
    notes = _db.get_notifications_after(0, limit=100) if _db else []
    return {"notifications": notes}
@router.get("/system/kv-states")
async def get_kv_states():
    """Return every decodable ClickUp task state found in the KV store.

    Scans keys under the ``"clickup:task:"`` prefix and JSON-decodes each
    value. Entries that cannot be decoded are skipped (best effort) and a
    warning naming the key is logged.

    Returns:
        ``{"states": [...]}``; the list is empty when no DB is wired.
    """
    if not _db:
        return {"states": []}
    states = []
    for key, val in _db.kv_scan("clickup:task:"):
        try:
            states.append(json.loads(val))
        except (json.JSONDecodeError, TypeError):
            # Still best-effort, but leave a trace so corrupt entries can be
            # found; TypeError covers a value that is not str/bytes.
            log.warning("Skipping undecodable KV state at key %s", key)
    return {"states": states}
@router.post("/cache/clear")
async def clear_cache():
    """Empty the in-memory cache under its lock and report success."""
    with _cache_lock:
        _cache.clear()
    return dict(status="cleared")