From e8df7a975020ca0540f939265ee9fb8b5ff5f0ba Mon Sep 17 00:00:00 2001 From: PeninsulaInd Date: Wed, 18 Mar 2026 17:04:19 -0500 Subject: [PATCH] Add get_active_tasks tool to show what's running before restart Thread-safe active execution registry in Scheduler tracks which tool functions are currently blocking. New get_active_tasks chat tool reads this registry plus loop timestamps to report running tasks, durations, loop health, and a safe-to-restart verdict. Co-Authored-By: Claude Opus 4.6 (1M context) --- cheddahbot/__main__.py | 3 ++ cheddahbot/scheduler.py | 31 +++++++++++++ cheddahbot/tools/__init__.py | 2 + cheddahbot/tools/clickup_tool.py | 77 ++++++++++++++++++++++++++++++++ tests/test_clickup_tools.py | 36 +++++++++++++++ tests/test_scheduler.py | 59 ++++++++++++++++++++++++ 6 files changed, 208 insertions(+) diff --git a/cheddahbot/__main__.py b/cheddahbot/__main__.py index ad4b3d3..61ff6ba 100644 --- a/cheddahbot/__main__.py +++ b/cheddahbot/__main__.py @@ -181,6 +181,9 @@ def main(): log.info("Starting scheduler...") scheduler = Scheduler(config, db, default_agent, notification_bus=notification_bus) scheduler.start() + # Inject scheduler into tool context so get_active_tasks can read it + if tools: + tools.scheduler = scheduler except Exception as e: log.warning("Scheduler not available: %s", e) diff --git a/cheddahbot/scheduler.py b/cheddahbot/scheduler.py index 09be27d..8e0418a 100644 --- a/cheddahbot/scheduler.py +++ b/cheddahbot/scheduler.py @@ -72,6 +72,8 @@ class Scheduler: "cora_distribute": None, "briefing": None, } + self._active_executions: dict[str, dict] = {} + self._active_lock = threading.Lock() def start(self): """Start the scheduler, heartbeat, and ClickUp threads.""" @@ -214,6 +216,26 @@ class Scheduler: """Return last_run timestamps for all loops (in-memory).""" return dict(self._loop_timestamps) + def _register_execution(self, task_id: str, name: str, tool_name: str) -> None: + """Register a task as actively executing.""" + with self._active_lock: + self._active_executions[task_id] = { + "name": name, + "tool": tool_name, + "started_at": datetime.now(UTC), + "thread": threading.current_thread().name, + } + + def _unregister_execution(self, task_id: str) -> None: + """Remove a task from the active executions registry.""" + with self._active_lock: + self._active_executions.pop(task_id, None) + + def get_active_executions(self) -> dict[str, dict]: + """Return a snapshot of currently executing tasks.""" + with self._active_lock: + return dict(self._active_executions) + # ── Scheduled Tasks ── def _poll_loop(self): @@ -454,6 +476,7 @@ class Scheduler: log.info("Executing ClickUp task: %s → %s", task.name, tool_name) self._notify(f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`") + self._register_execution(task_id, task.name, tool_name) try: # args already built during validation above args["clickup_task_id"] = task_id @@ -502,6 +525,8 @@ class Scheduler: f"Skill: `{tool_name}` | Error: {str(e)[:200]}" ) log.error("ClickUp task failed: %s — %s", task.name, e) + finally: + self._unregister_execution(task_id) def _recover_stale_tasks(self): """Reset tasks stuck in 'automation underway' for too long. @@ -788,6 +813,7 @@ class Scheduler: with contextlib.suppress(ValueError, TypeError): args["branded_plus_ratio"] = float(bp_raw) + self._register_execution(task_id, matched_task.name, "run_cora_backlinks") try: # Execute via tool registry if hasattr(self.agent, "_tools") and self.agent._tools: @@ -822,6 +848,8 @@ class Scheduler: except Exception as e: log.error("Folder watcher pipeline error for %s: %s", filename, e) client.update_task_status(task_id, self.config.clickup.error_status) + finally: + self._unregister_execution(task_id) def _match_xlsx_to_clickup(self, normalized_stem: str): """Find a ClickUp Link Building task whose Keyword matches the file stem. @@ -954,6 +982,7 @@ class Scheduler: "clickup_task_id": task_id, } + self._register_execution(task_id, matched_task.name, "create_content") try: if hasattr(self.agent, "_tools") and self.agent._tools: result = self.agent._tools.execute("create_content", args) @@ -985,6 +1014,8 @@ class Scheduler: except Exception as e: log.error("Content watcher pipeline error for %s: %s", filename, e) + finally: + self._unregister_execution(task_id) def _match_xlsx_to_content_task(self, normalized_stem: str): """Find a ClickUp content task whose Keyword matches the file stem. diff --git a/cheddahbot/tools/__init__.py b/cheddahbot/tools/__init__.py index 2b8be51..4417d02 100644 --- a/cheddahbot/tools/__init__.py +++ b/cheddahbot/tools/__init__.py @@ -105,6 +105,7 @@ class ToolRegistry: self.db = db self.agent = agent self.agent_registry = None # set after multi-agent setup + self.scheduler = None # set after scheduler creation self._discover_tools() def _discover_tools(self): @@ -158,6 +159,7 @@ class ToolRegistry: "agent": self.agent, "memory": self.agent._memory, "agent_registry": self.agent_registry, + "scheduler": self.scheduler, } # Pass scheduler-injected metadata through ctx (not LLM-visible) if "clickup_task_id" in args: diff --git a/cheddahbot/tools/clickup_tool.py b/cheddahbot/tools/clickup_tool.py index 25007b1..137d891 100644 --- a/cheddahbot/tools/clickup_tool.py +++ b/cheddahbot/tools/clickup_tool.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +from datetime import UTC, datetime from . import tool @@ -284,3 +285,79 @@ def clickup_reset_task(task_id: str, ctx: dict | None = None) -> str: f"Task '{task_id}' reset to '{reset_status}'. " f"It will be picked up on the next scheduler poll." ) + + +def _format_duration(delta) -> str: + """Format a timedelta as a human-readable duration string.""" + total_seconds = int(delta.total_seconds()) + hours, remainder = divmod(total_seconds, 3600) + minutes, seconds = divmod(remainder, 60) + if hours: + return f"{hours}h {minutes}m {seconds}s" + if minutes: + return f"{minutes}m {seconds}s" + return f"{seconds}s" + + +def _format_ago(iso_str: str | None) -> str: + """Format an ISO timestamp as 'Xm ago' relative to now.""" + if not iso_str: + return "never" + try: + ts = datetime.fromisoformat(iso_str) + delta = datetime.now(UTC) - ts + total_seconds = int(delta.total_seconds()) + if total_seconds < 60: + return f"{total_seconds}s ago" + minutes = total_seconds // 60 + if minutes < 60: + return f"{minutes}m ago" + hours = minutes // 60 + return f"{hours}h {minutes % 60}m ago" + except (ValueError, TypeError): + return "unknown" + + +@tool( + "get_active_tasks", + "Show what CheddahBot is actively executing right now. " + "Reports running tasks, loop health, and whether it's safe to restart.", + category="clickup", +) +def get_active_tasks(ctx: dict | None = None) -> str: + """Show actively running scheduler tasks and loop health.""" + scheduler = ctx.get("scheduler") if ctx else None + if not scheduler: + return "Scheduler not available — cannot check active executions." + + now = datetime.now(UTC) + lines = [] + + # Active executions + active = scheduler.get_active_executions() + if active: + lines.append(f"**Active Executions ({len(active)}):**") + for task_id, info in active.items(): + duration = _format_duration(now - info["started_at"]) + lines.append( + f"- **{info['name']}** — `{info['tool']}` — " + f"running {duration} ({info['thread']} thread)" + ) + else: + lines.append("**No tasks actively executing.**") + + # Loop health + timestamps = scheduler.get_loop_timestamps() + lines.append("") + lines.append("**Loop Health:**") + for loop_name, ts in timestamps.items(): + lines.append(f"- {loop_name}: last ran {_format_ago(ts)}") + + # Safe to restart? + lines.append("") + if active: + lines.append(f"**Safe to restart: No** ({len(active)} task(s) actively running)") + else: + lines.append("**Safe to restart: Yes**") + + return "\n".join(lines) diff --git a/tests/test_clickup_tools.py b/tests/test_clickup_tools.py index 6afd1eb..b257bca 100644 --- a/tests/test_clickup_tools.py +++ b/tests/test_clickup_tools.py @@ -10,6 +10,7 @@ from cheddahbot.tools.clickup_tool import ( clickup_query_tasks, clickup_reset_task, clickup_task_status, + get_active_tasks, ) @@ -142,3 +143,38 @@ class TestClickupResetTask: result = clickup_reset_task(task_id="t1", ctx=_make_ctx()) assert "Error" in result + + +class TestGetActiveTasks: + def test_no_scheduler(self): + result = get_active_tasks(ctx={"config": MagicMock()}) + assert "not available" in result.lower() + + def test_nothing_running(self): + scheduler = MagicMock() + scheduler.get_active_executions.return_value = {} + scheduler.get_loop_timestamps.return_value = {"clickup": None, "folder_watch": None} + + result = get_active_tasks(ctx={"scheduler": scheduler}) + assert "No tasks actively executing" in result + assert "Safe to restart: Yes" in result + + def test_tasks_running(self): + from datetime import UTC, datetime, timedelta + + scheduler = MagicMock() + scheduler.get_active_executions.return_value = { + "t1": { + "name": "Press Release for Acme", + "tool": "write_press_releases", + "started_at": datetime.now(UTC) - timedelta(minutes=5), + "thread": "clickup_thread", + } + } + scheduler.get_loop_timestamps.return_value = {"clickup": datetime.now(UTC).isoformat()} + + result = get_active_tasks(ctx={"scheduler": scheduler}) + assert "Active Executions (1)" in result + assert "Press Release for Acme" in result + assert "write_press_releases" in result + assert "Safe to restart: No" in result diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 31e09a8..427dc6a 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -232,3 +232,62 @@ class TestFieldFilterDiscovery: mock_client.discover_field_filter.reset_mock() scheduler._poll_clickup() mock_client.discover_field_filter.assert_not_called() + + +class TestActiveExecutions: + """Test the active execution registry.""" + + def test_register_and_get(self, tmp_db): + config = _FakeConfig() + scheduler = Scheduler(config, tmp_db, MagicMock()) + + scheduler._register_execution("t1", "Task One", "write_press_releases") + active = scheduler.get_active_executions() + + assert "t1" in active + assert active["t1"]["name"] == "Task One" + assert active["t1"]["tool"] == "write_press_releases" + assert "started_at" in active["t1"] + assert "thread" in active["t1"] + + def test_unregister(self, tmp_db): + config = _FakeConfig() + scheduler = Scheduler(config, tmp_db, MagicMock()) + + scheduler._register_execution("t1", "Task One", "write_press_releases") + scheduler._unregister_execution("t1") + assert scheduler.get_active_executions() == {} + + def test_unregister_nonexistent_is_noop(self, tmp_db): + config = _FakeConfig() + scheduler = Scheduler(config, tmp_db, MagicMock()) + + # Should not raise + scheduler._unregister_execution("nonexistent") + assert scheduler.get_active_executions() == {} + + def test_multiple_executions(self, tmp_db): + config = _FakeConfig() + scheduler = Scheduler(config, tmp_db, MagicMock()) + + scheduler._register_execution("t1", "Task One", "write_press_releases") + scheduler._register_execution("t2", "Task Two", "run_cora_backlinks") + active = scheduler.get_active_executions() + + assert len(active) == 2 + assert "t1" in active + assert "t2" in active + + def test_get_returns_snapshot(self, tmp_db): + """get_active_executions returns a copy, not a reference.""" + config = _FakeConfig() + scheduler = Scheduler(config, tmp_db, MagicMock()) + + scheduler._register_execution("t1", "Task One", "tool_a") + snapshot = scheduler.get_active_executions() + scheduler._unregister_execution("t1") + + # Snapshot should still have t1 + assert "t1" in snapshot + # But live state should be empty + assert scheduler.get_active_executions() == {}