"""Dashboard API endpoints.

Provides JSON data for the standalone HTML dashboard.
All ClickUp data is cached for 5 minutes to avoid hammering the API.
"""

from __future__ import annotations

import json
import logging
import shutil
import threading
import time
from collections import Counter
from typing import TYPE_CHECKING

from fastapi import APIRouter

if TYPE_CHECKING:
    from .agent_registry import AgentRegistry
    from .config import Config
    from .db import Database

log = logging.getLogger(__name__)

router = APIRouter(prefix="/api")

# Module-level refs wired at startup by create_api_router()
_config: Config | None = None
_db: Database | None = None
_registry: AgentRegistry | None = None

# Simple in-memory cache for ClickUp data
_cache: dict[str, dict] = {}
_cache_lock = threading.Lock()
CACHE_TTL = 300  # 5 minutes

# Substrings that mark a link-building task status as closed.
_CLOSED_STATUS_KEYWORDS = ("complete", "closed", "done")

# Exact statuses excluded from the "need_cora" group (automation already
# touched these tasks).
_AUTOMATION_TOUCHED_STATUSES = frozenset(
    {
        "error",
        "automation underway",
        "complete",
        "closed",
        "done",
        "internal review",
    }
)

# KV "state" values that count as "no meaningful progress yet".
_EARLY_KV_STATES = frozenset({"", "approved", "awaiting_approval"})

# Sort sentinel for tasks without a usable due date: pushes them to the end.
_NO_DUE_DATE = 9999999999999


def create_api_router(
    config: Config, db: Database, registry: AgentRegistry
) -> APIRouter:
    """Wire dependencies and return the router.

    Stores ``config``, ``db`` and ``registry`` in module-level globals that
    the endpoint functions read, then returns the shared module-level router.
    """
    global _config, _db, _registry
    _config = config
    _db = db
    _registry = registry
    return router


def _get_cached(key: str) -> dict | None:
    """Return the cached payload for ``key``, or None if missing or expired.

    The returned object is the cached one itself (not a copy), so callers
    must not mutate it.
    """
    with _cache_lock:
        entry = _cache.get(key)
        if entry and time.time() - entry["ts"] < CACHE_TTL:
            return entry["data"]
    return None


def _set_cached(key: str, data: dict) -> None:
    """Store ``data`` under ``key`` together with the current timestamp."""
    with _cache_lock:
        _cache[key] = {"data": data, "ts": time.time()}


def _get_clickup_client():
    """Build a ClickUp client from the wired config.

    Callers must check that ``_config`` is set before calling this, and are
    responsible for calling ``close()`` on the returned client.
    """
    # Imported at call time rather than at module import.
    from .clickup import ClickUpClient

    return ClickUpClient(
        api_token=_config.clickup.api_token,
        workspace_id=_config.clickup.workspace_id,
        task_type_field_name=_config.clickup.task_type_field_name,
    )


def _task_to_dict(t) -> dict:
    """Convert a ClickUp task object into the dict shape the dashboard uses."""
    return {
        "id": t.id,
        "name": t.name,
        "status": t.status,
        "task_type": t.task_type,
        "url": t.url,
        "due_date": t.due_date,
        "date_done": t.date_done,
        "list_name": t.list_name,
        "tags": t.tags,
        "custom_fields": t.custom_fields,
    }


def _attach_kv_state(tasks: list[dict]) -> None:
    """Set ``task["kv_state"]`` on each task from the KV store, in place.

    Does nothing when no database is wired (tasks then get no ``kv_state``
    key at all). A missing or undecodable value yields ``None``.
    """
    if not _db:
        return
    for task in tasks:
        raw = _db.kv_get(f"clickup:task:{task['id']}:state")
        state = None
        if raw:
            try:
                state = json.loads(raw)
            except (ValueError, TypeError):
                # Covers json.JSONDecodeError (a ValueError) and values that
                # are not str/bytes.
                state = None
        task["kv_state"] = state


def _group_by_company(tasks: list[dict]) -> list[dict]:
    """Group tasks by their "Customer" custom field.

    Tasks with no customer are filed under "Unassigned". Groups are ordered
    by task count, descending; ties keep first-seen order because the sort
    is stable.
    """
    by_company: dict[str, list] = {}
    for task in tasks:
        company = task["custom_fields"].get("Customer") or "Unassigned"
        by_company.setdefault(company, []).append(task)
    ranked = sorted(
        by_company.items(), key=lambda item: len(item[1]), reverse=True
    )
    return [
        {"name": name, "tasks": group, "count": len(group)}
        for name, group in ranked
    ]


def _to_int(value, default):
    """Return ``int(value)``, or ``default`` if the value cannot be parsed."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


# ── Tasks ──


@router.get("/tasks")
async def get_tasks():
    """Return all ClickUp tasks from the configured space as a flat list.

    Successful results are cached under "tasks"; error responses are not
    cached. Returns ``{"tasks": [...], "count": n}`` on success, or
    ``{"tasks": [], "error": msg}`` when ClickUp is not configured or the
    fetch fails.
    """
    cached = _get_cached("tasks")
    if cached is not None:
        return cached

    if not _config or not _config.clickup.enabled:
        return {"tasks": [], "error": "ClickUp not configured"}

    # NOTE(review): the client call below is not awaited, so it presumably
    # blocks the event loop for the duration of the request — confirm.
    client = _get_clickup_client()
    try:
        raw_tasks = client.get_tasks_from_overall_lists(_config.clickup.space_id)
        tasks = [_task_to_dict(t) for t in raw_tasks]
        result = {"tasks": tasks, "count": len(tasks)}
        _set_cached("tasks", result)
        return result
    except Exception as e:
        # Endpoint boundary: report the failure to the dashboard, don't 500.
        log.error("Failed to fetch tasks for dashboard: %s", e)
        return {"tasks": [], "error": str(e)}
    finally:
        client.close()


@router.get("/tasks/by-company")
async def get_tasks_by_company():
    """Tasks grouped by the "Customer" custom field."""
    data = await get_tasks()
    return {"companies": _group_by_company(data.get("tasks", []))}


@router.get("/tasks/link-building")
async def get_link_building_tasks():
    """Link building tasks with KV state merged in.

    Fetches tasks including closed ones, keeps those whose task type is
    "Link Building", and returns focused groups (``need_cora``,
    ``recently_completed``, ``in_progress_not_started``) plus per-company
    and per-status breakdowns of the active tasks. Successful results are
    cached under "lb_tasks".
    """
    cached = _get_cached("lb_tasks")
    if cached is not None:
        return cached

    if not _config or not _config.clickup.enabled:
        return {"total": 0, "companies": [], "status_counts": {}}

    client = _get_clickup_client()
    try:
        raw_tasks = client.get_tasks_from_overall_lists(
            _config.clickup.space_id, include_closed=True
        )
    except Exception as e:
        log.error("Failed to fetch LB tasks: %s", e)
        return {"total": 0, "companies": [], "status_counts": {}, "error": str(e)}
    finally:
        client.close()

    lb_tasks = [
        _task_to_dict(t) for t in raw_tasks if t.task_type == "Link Building"
    ]
    _attach_kv_state(lb_tasks)

    result = _summarize_link_building(lb_tasks)
    _set_cached("lb_tasks", result)
    return result


def _summarize_link_building(lb_tasks: list[dict]) -> dict:
    """Build the link-building dashboard payload from task dicts."""
    # Active = status contains none of the closed keywords.
    active_lb = [
        t
        for t in lb_tasks
        if not any(kw in t["status"] for kw in _CLOSED_STATUS_KEYWORDS)
    ]

    # need_cora: open LB tasks where LB Method = "Cora Backlinks",
    # excluding tasks that automation already touched.
    need_cora = [
        t
        for t in active_lb
        if t["custom_fields"].get("LB Method") == "Cora Backlinks"
        and t["status"] not in _AUTOMATION_TOUCHED_STATUSES
    ]
    # Earliest due date first; a missing or unparseable due date sorts last
    # instead of failing the whole request.
    need_cora.sort(
        key=lambda t: _to_int(t.get("due_date") or _NO_DUE_DATE, _NO_DUE_DATE)
    )

    # recently_completed: closed/complete tasks with date_done in last 7 days
    seven_days_ago_ms = (time.time() - 7 * 86400) * 1000
    recently_completed = []
    for t in lb_tasks:
        s = t["status"]
        if not (s.endswith("complete") or "closed" in s or "done" in s):
            continue
        # Falsy or unparseable date_done values are skipped.
        done_at = _to_int(t["date_done"] or None, None)
        if done_at is not None and done_at >= seven_days_ago_ms:
            recently_completed.append(t)
    recently_completed.sort(
        key=lambda t: _to_int(t.get("date_done") or "0", 0), reverse=True
    )

    # in_progress_not_started: status "in progress" but no meaningful KV state
    in_progress_not_started = []
    for t in lb_tasks:
        if t["status"] != "in progress":
            continue
        kv = t.get("kv_state")
        # A KV value that is not a JSON object carries no usable state.
        if not isinstance(kv, dict) or kv.get("state", "") in _EARLY_KV_STATES:
            in_progress_not_started.append(t)

    return {
        "total": len(active_lb),
        "need_cora": need_cora,
        "recently_completed": recently_completed,
        "in_progress_not_started": in_progress_not_started,
        "companies": _group_by_company(active_lb),
        "status_counts": _count_statuses(active_lb),
    }


@router.get("/tasks/press-releases")
async def get_press_release_tasks():
    """Press release tasks with KV state merged in."""
    data = await get_tasks()
    # Shallow-copy each task: the dicts from get_tasks() may be the cached
    # ones, and attaching kv_state to them would leak into /tasks responses.
    pr_tasks = [
        dict(t) for t in data.get("tasks", []) if t["task_type"] == "Press Release"
    ]
    _attach_kv_state(pr_tasks)

    return {
        "total": len(pr_tasks),
        "companies": _group_by_company(pr_tasks),
        "status_counts": _count_statuses(pr_tasks),
    }


def _count_statuses(tasks: list[dict]) -> dict[str, int]:
    """Count tasks per status; tasks without a status count as "unknown"."""
    return dict(Counter(t.get("status", "unknown") for t in tasks))


# ── Agents ──


@router.get("/agents")
async def get_agents():
    """Agent configurations.

    Falls back to the config's chat model when an agent has no model, and
    to "shared" when it has no memory scope.
    """
    if not _config:
        return {"agents": []}

    agents = [
        {
            "name": ac.name,
            "display_name": ac.display_name,
            "personality_file": ac.personality_file,
            "model": ac.model or _config.chat_model,
            "tools": ac.tools,
            "skills": ac.skills,
            "memory_scope": ac.memory_scope or "shared",
        }
        for ac in _config.agents
    ]
    return {"agents": agents}


# ── System ──


@router.get("/system/health")
async def get_system_health():
    """System health: disk space, brains, integrations."""
    health = {
        "execution_brain": shutil.which("claude") is not None,
        "clickup_enabled": _config.clickup.enabled if _config else False,
        "chat_model": _config.chat_model if _config else "unknown",
        "execution_model": _config.default_model if _config else "unknown",
        "disks": [],
    }

    # Disk space (Windows drives); drives that cannot be queried are skipped.
    for drive in ["C:", "D:", "E:", "Z:"]:
        try:
            usage = shutil.disk_usage(drive + "/")
        except OSError:  # includes FileNotFoundError
            continue
        gib = 1024**3
        # Guard against a zero-size volume to avoid ZeroDivisionError.
        percent_free = (
            round(usage.free / usage.total * 100, 1) if usage.total else 0.0
        )
        health["disks"].append(
            {
                "drive": drive,
                "total_gb": round(usage.total / gib, 1),
                "free_gb": round(usage.free / gib, 1),
                "percent_free": percent_free,
            }
        )

    return health


@router.get("/system/notifications")
async def get_notifications():
    """Recent notifications from the DB (latest 50, newest first)."""
    if not _db:
        return {"notifications": []}

    # NOTE(review): reaches into the private ``_conn`` attribute of the
    # Database wrapper; ``dict(r)`` assumes rows are mapping-like — confirm.
    rows = _db._conn.execute(
        "SELECT id, message, category, created_at FROM notifications"
        " ORDER BY id DESC LIMIT 50"
    ).fetchall()
    return {"notifications": [dict(r) for r in rows]}


@router.get("/system/kv-states")
async def get_kv_states():
    """All ClickUp task states from KV store.

    Values that cannot be decoded as JSON are skipped.
    """
    if not _db:
        return {"states": []}

    states = []
    for _key, val in _db.kv_scan("clickup:task:"):
        try:
            states.append(json.loads(val))
        except (ValueError, TypeError):
            # Best-effort: ignore undecodable entries.
            continue

    return {"states": states}


@router.post("/cache/clear")
async def clear_cache():
    """Clear the ClickUp data cache."""
    with _cache_lock:
        _cache.clear()
    return {"status": "cleared"}