# CheddahBot/cheddahbot/api.py
"""Dashboard API endpoints.
Provides JSON data for the standalone HTML dashboard.
All ClickUp data is cached for 5 minutes to avoid hammering the API.
"""
from __future__ import annotations
import json
import logging
import shutil
import threading
import time
from typing import TYPE_CHECKING
from fastapi import APIRouter
if TYPE_CHECKING:
from .agent_registry import AgentRegistry
from .config import Config
from .db import Database
log = logging.getLogger(__name__)
router = APIRouter(prefix="/api")
# Module-level refs wired at startup by create_api_router().
# They stay None until wiring happens; every endpoint guards on that.
_config: Config | None = None
_db: Database | None = None
_registry: AgentRegistry | None = None
# Simple in-memory cache for ClickUp data.
# Each entry is {"data": <payload>, "ts": <epoch seconds>}; see _get_cached/_set_cached.
_cache: dict[str, dict] = {}
# Serializes all reads/writes of _cache.
_cache_lock = threading.Lock()
CACHE_TTL = 300  # seconds (5 minutes) before a cache entry is considered stale
def create_api_router(
    config: Config, db: Database, registry: AgentRegistry
) -> APIRouter:
    """Wire dependencies into module-level refs and return the shared router."""
    global _config, _db, _registry
    _config, _db, _registry = config, db, registry
    return router
def _get_cached(key: str) -> dict | None:
    """Return the cached payload for *key*, or None if absent or expired.

    Expired entries are evicted on access so the cache cannot grow
    without bound between explicit /cache/clear calls.
    """
    with _cache_lock:
        entry = _cache.get(key)
        if entry is None:
            return None
        if time.time() - entry["ts"] < CACHE_TTL:
            return entry["data"]
        # Stale — drop it so the payload's memory is reclaimed promptly.
        del _cache[key]
        return None
def _set_cached(key: str, data):
    """Store *data* under *key*, stamped with the current time."""
    now = time.time()
    with _cache_lock:
        _cache[key] = {"data": data, "ts": now}
def _get_clickup_client():
    """Build a fresh ClickUpClient from the wired config.

    Imported lazily so the module loads even when ClickUp is unused.
    """
    from .clickup import ClickUpClient

    cu = _config.clickup
    return ClickUpClient(
        api_token=cu.api_token,
        workspace_id=cu.workspace_id,
        task_type_field_name=cu.task_type_field_name,
    )
# ── Tasks ──
@router.get("/tasks")
async def get_tasks():
    """All ClickUp tasks grouped by work category."""
    cached = _get_cached("tasks")
    if cached is not None:
        return cached
    if not _config or not _config.clickup.enabled:
        return {"tasks": [], "error": "ClickUp not configured"}
    client = _get_clickup_client()
    try:
        # Serialize each task object into a plain dict of the fields the
        # dashboard renders.
        fields = (
            "id", "name", "status", "task_type", "url", "due_date",
            "date_done", "list_name", "tags", "custom_fields",
        )
        tasks = [
            {field: getattr(t, field) for field in fields}
            for t in client.get_tasks_from_overall_lists(_config.clickup.space_id)
        ]
        result = {"tasks": tasks, "count": len(tasks)}
        _set_cached("tasks", result)
        return result
    except Exception as e:
        log.error("Failed to fetch tasks for dashboard: %s", e)
        return {"tasks": [], "error": str(e)}
    finally:
        client.close()
@router.get("/tasks/by-company")
async def get_tasks_by_company():
    """Tasks grouped by the "Customer" custom field.

    Tasks with no Customer value fall into an "Unassigned" bucket.
    Companies are returned sorted by task count, largest first.
    """
    data = await get_tasks()
    by_company: dict[str, list] = {}
    for task in data.get("tasks", []):
        company = task["custom_fields"].get("Customer") or "Unassigned"
        by_company.setdefault(company, []).append(task)
    # Sort companies by task count descending
    sorted_companies = sorted(
        by_company.items(), key=lambda item: len(item[1]), reverse=True
    )
    return {
        "companies": [
            {"name": name, "tasks": tasks, "count": len(tasks)}
            for name, tasks in sorted_companies
        ]
    }
def _serialize_task(t) -> dict:
    """Flatten a ClickUp task object into the JSON-safe dict the dashboard uses."""
    return {
        "id": t.id,
        "name": t.name,
        "status": t.status,
        "task_type": t.task_type,
        "url": t.url,
        "due_date": t.due_date,
        "date_done": t.date_done,
        "list_name": t.list_name,
        "tags": t.tags,
        "custom_fields": t.custom_fields,
    }


def _attach_kv_state(tasks: list[dict]) -> None:
    """Merge each task's persisted KV state into its dict, in place.

    Sets task["kv_state"] to the decoded JSON dict, or None when the key
    is absent or holds invalid JSON. No-op when _db is not wired (tasks
    then simply lack the "kv_state" key; readers use .get()).
    """
    if not _db:
        return
    for task in tasks:
        raw = _db.kv_get(f"clickup:task:{task['id']}:state")
        state = None
        if raw:
            try:
                state = json.loads(raw)
            except json.JSONDecodeError:
                state = None
        task["kv_state"] = state


@router.get("/tasks/link-building")
async def get_link_building_tasks():
    """Link building tasks with KV state merged in.

    Response groups:
      - need_cora: active tasks whose "LB Method" is "Cora Backlinks"
      - recently_completed: finished tasks with date_done in the last 7 days
      - in_progress_not_started: "in progress" tasks with no meaningful KV state
      - companies: active tasks grouped by the Customer custom field
    Cached for CACHE_TTL seconds under "lb_tasks".
    """
    cached = _get_cached("lb_tasks")
    if cached is not None:
        return cached
    if not _config or not _config.clickup.enabled:
        return {"total": 0, "companies": [], "status_counts": {}}
    client = _get_clickup_client()
    try:
        raw_tasks = client.get_tasks_from_overall_lists(
            _config.clickup.space_id, include_closed=True
        )
    except Exception as e:
        log.error("Failed to fetch LB tasks: %s", e)
        return {"total": 0, "companies": [], "status_counts": {}, "error": str(e)}
    finally:
        client.close()
    lb_tasks = [
        _serialize_task(t) for t in raw_tasks if t.task_type == "Link Building"
    ]
    _attach_kv_state(lb_tasks)
    # -- Build focused groups --
    # Active = status contains none of the closed-ish keywords (substring
    # match, so e.g. "task complete" also counts as closed).
    closed_statuses = {"complete", "closed", "done"}
    active_lb = [
        t for t in lb_tasks
        if not any(kw in t["status"] for kw in closed_statuses)
    ]
    # need_cora: open LB tasks where LB Method = "Cora Backlinks"
    need_cora = [
        t for t in active_lb
        if t["custom_fields"].get("LB Method") == "Cora Backlinks"
    ]
    # recently_completed: finished tasks with date_done (epoch ms, as a
    # string) inside the last 7 days, newest first.
    seven_days_ago_ms = (time.time() - 7 * 86400) * 1000
    recently_completed = []
    for t in lb_tasks:
        s = t["status"]
        # NOTE(review): this "finished" predicate (endswith) differs subtly
        # from the active filter above (substring) — presumed intentional.
        if not (s.endswith("complete") or "closed" in s or "done" in s):
            continue
        if t["date_done"]:
            try:
                if int(t["date_done"]) >= seven_days_ago_ms:
                    recently_completed.append(t)
            except (ValueError, TypeError):
                pass
    recently_completed.sort(key=lambda t: int(t.get("date_done") or "0"), reverse=True)
    # in_progress_not_started: status "in progress" but no meaningful KV state
    early_states = {"", "approved", "awaiting_approval"}
    in_progress_not_started = []
    for t in lb_tasks:
        if t["status"] != "in progress":
            continue
        kv = t.get("kv_state")
        if kv is None or kv.get("state", "") in early_states:
            in_progress_not_started.append(t)
    by_company: dict[str, list] = {}
    for task in active_lb:
        company = task["custom_fields"].get("Customer") or "Unassigned"
        by_company.setdefault(company, []).append(task)
    result = {
        "total": len(active_lb),
        "need_cora": need_cora,
        "recently_completed": recently_completed,
        "in_progress_not_started": in_progress_not_started,
        "companies": [
            {"name": name, "tasks": tasks, "count": len(tasks)}
            for name, tasks in sorted(by_company.items(), key=lambda x: -len(x[1]))
        ],
        "status_counts": _count_statuses(active_lb),
    }
    _set_cached("lb_tasks", result)
    return result
@router.get("/tasks/press-releases")
async def get_press_release_tasks():
    """Press release tasks with KV state merged in."""
    data = await get_tasks()
    pr_tasks = [
        task for task in data.get("tasks", []) if task["task_type"] == "Press Release"
    ]
    # Merge each task's persisted state from the KV store (None when the
    # key is missing or its value is not valid JSON).
    if _db:
        for task in pr_tasks:
            raw = _db.kv_get(f"clickup:task:{task['id']}:state")
            state = None
            if raw:
                try:
                    state = json.loads(raw)
                except json.JSONDecodeError:
                    state = None
            task["kv_state"] = state
    grouped: dict[str, list] = {}
    for task in pr_tasks:
        key = task["custom_fields"].get("Customer") or "Unassigned"
        grouped.setdefault(key, []).append(task)
    ranked = sorted(grouped.items(), key=lambda item: len(item[1]), reverse=True)
    return {
        "total": len(pr_tasks),
        "companies": [
            {"name": name, "tasks": tasks, "count": len(tasks)}
            for name, tasks in ranked
        ],
        "status_counts": _count_statuses(pr_tasks),
    }
def _count_statuses(tasks: list[dict]) -> dict[str, int]:
counts: dict[str, int] = {}
for t in tasks:
s = t.get("status", "unknown")
counts[s] = counts.get(s, 0) + 1
return counts
# ── Agents ──
@router.get("/agents")
async def get_agents():
    """Agent configurations."""
    if not _config:
        return {"agents": []}
    return {
        "agents": [
            {
                "name": ac.name,
                "display_name": ac.display_name,
                "personality_file": ac.personality_file,
                # Per-agent model override, else the global chat model.
                "model": ac.model or _config.chat_model,
                "tools": ac.tools,
                "skills": ac.skills,
                "memory_scope": ac.memory_scope or "shared",
            }
            for ac in _config.agents
        ]
    }
# ── System ──
@router.get("/system/health")
async def get_system_health():
    """System health: disk space, brains, integrations.

    Returns a dict with:
      - execution_brain: whether the ``claude`` CLI is on PATH
      - clickup_enabled / chat_model / execution_model: pulled from config
        (safe fallbacks when config is not wired yet)
      - disks: usage stats for a fixed set of Windows drive letters
    """
    health = {
        "execution_brain": shutil.which("claude") is not None,
        "clickup_enabled": _config.clickup.enabled if _config else False,
        "chat_model": _config.chat_model if _config else "unknown",
        "execution_model": _config.default_model if _config else "unknown",
        "disks": [],
    }
    # Disk space (Windows drives). Drives that don't exist are skipped.
    for drive in ["C:", "D:", "E:", "Z:"]:
        try:
            usage = shutil.disk_usage(drive + "/")
        except OSError:  # FileNotFoundError is already a subclass of OSError
            continue
        health["disks"].append(
            {
                "drive": drive,
                "total_gb": round(usage.total / (1024**3), 1),
                "free_gb": round(usage.free / (1024**3), 1),
                "percent_free": round(usage.free / usage.total * 100, 1),
            }
        )
    return health
@router.get("/system/notifications")
async def get_notifications():
    """Recent notifications from the DB."""
    if not _db:
        return {"notifications": []}
    return {"notifications": _db.get_notifications_after(0, limit=100)}
@router.get("/system/kv-states")
async def get_kv_states():
    """All ClickUp task states from KV store."""
    if not _db:
        return {"states": []}
    states = []
    # Entries that fail to parse as JSON are skipped rather than surfaced.
    for _key, val in _db.kv_scan("clickup:task:"):
        try:
            states.append(json.loads(val))
        except json.JSONDecodeError:
            continue
    return {"states": states}
@router.post("/cache/clear")
async def clear_cache():
    """Clear the ClickUp data cache.

    Forces the next /tasks and /tasks/link-building requests to re-fetch
    live data from ClickUp instead of serving cached responses.
    """
    with _cache_lock:
        _cache.clear()
    return {"status": "cleared"}