Add get_active_tasks tool to show what's running before restart
Thread-safe active execution registry in Scheduler tracks which tool functions are currently blocking. New get_active_tasks chat tool reads this registry plus loop timestamps to report running tasks, durations, loop health, and a safe-to-restart verdict. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>master
parent
eae55fd714
commit
e8df7a9750
|
|
@ -181,6 +181,9 @@ def main():
|
|||
log.info("Starting scheduler...")
|
||||
scheduler = Scheduler(config, db, default_agent, notification_bus=notification_bus)
|
||||
scheduler.start()
|
||||
# Inject scheduler into tool context so get_active_tasks can read it
|
||||
if tools:
|
||||
tools.scheduler = scheduler
|
||||
except Exception as e:
|
||||
log.warning("Scheduler not available: %s", e)
|
||||
|
||||
|
|
|
|||
|
|
@ -72,6 +72,8 @@ class Scheduler:
|
|||
"cora_distribute": None,
|
||||
"briefing": None,
|
||||
}
|
||||
self._active_executions: dict[str, dict] = {}
|
||||
self._active_lock = threading.Lock()
|
||||
|
||||
def start(self):
|
||||
"""Start the scheduler, heartbeat, and ClickUp threads."""
|
||||
|
|
@ -214,6 +216,26 @@ class Scheduler:
|
|||
"""Return last_run timestamps for all loops (in-memory)."""
|
||||
return dict(self._loop_timestamps)
|
||||
|
||||
def _register_execution(self, task_id: str, name: str, tool_name: str) -> None:
|
||||
"""Register a task as actively executing."""
|
||||
with self._active_lock:
|
||||
self._active_executions[task_id] = {
|
||||
"name": name,
|
||||
"tool": tool_name,
|
||||
"started_at": datetime.now(UTC),
|
||||
"thread": threading.current_thread().name,
|
||||
}
|
||||
|
||||
def _unregister_execution(self, task_id: str) -> None:
|
||||
"""Remove a task from the active executions registry."""
|
||||
with self._active_lock:
|
||||
self._active_executions.pop(task_id, None)
|
||||
|
||||
def get_active_executions(self) -> dict[str, dict]:
|
||||
"""Return a snapshot of currently executing tasks."""
|
||||
with self._active_lock:
|
||||
return dict(self._active_executions)
|
||||
|
||||
# ── Scheduled Tasks ──
|
||||
|
||||
def _poll_loop(self):
|
||||
|
|
@ -454,6 +476,7 @@ class Scheduler:
|
|||
log.info("Executing ClickUp task: %s → %s", task.name, tool_name)
|
||||
self._notify(f"Executing ClickUp task: **{task.name}** → Skill: `{tool_name}`")
|
||||
|
||||
self._register_execution(task_id, task.name, tool_name)
|
||||
try:
|
||||
# args already built during validation above
|
||||
args["clickup_task_id"] = task_id
|
||||
|
|
@ -502,6 +525,8 @@ class Scheduler:
|
|||
f"Skill: `{tool_name}` | Error: {str(e)[:200]}"
|
||||
)
|
||||
log.error("ClickUp task failed: %s — %s", task.name, e)
|
||||
finally:
|
||||
self._unregister_execution(task_id)
|
||||
|
||||
def _recover_stale_tasks(self):
|
||||
"""Reset tasks stuck in 'automation underway' for too long.
|
||||
|
|
@ -788,6 +813,7 @@ class Scheduler:
|
|||
with contextlib.suppress(ValueError, TypeError):
|
||||
args["branded_plus_ratio"] = float(bp_raw)
|
||||
|
||||
self._register_execution(task_id, matched_task.name, "run_cora_backlinks")
|
||||
try:
|
||||
# Execute via tool registry
|
||||
if hasattr(self.agent, "_tools") and self.agent._tools:
|
||||
|
|
@ -822,6 +848,8 @@ class Scheduler:
|
|||
except Exception as e:
|
||||
log.error("Folder watcher pipeline error for %s: %s", filename, e)
|
||||
client.update_task_status(task_id, self.config.clickup.error_status)
|
||||
finally:
|
||||
self._unregister_execution(task_id)
|
||||
|
||||
def _match_xlsx_to_clickup(self, normalized_stem: str):
|
||||
"""Find a ClickUp Link Building task whose Keyword matches the file stem.
|
||||
|
|
@ -954,6 +982,7 @@ class Scheduler:
|
|||
"clickup_task_id": task_id,
|
||||
}
|
||||
|
||||
self._register_execution(task_id, matched_task.name, "create_content")
|
||||
try:
|
||||
if hasattr(self.agent, "_tools") and self.agent._tools:
|
||||
result = self.agent._tools.execute("create_content", args)
|
||||
|
|
@ -985,6 +1014,8 @@ class Scheduler:
|
|||
|
||||
except Exception as e:
|
||||
log.error("Content watcher pipeline error for %s: %s", filename, e)
|
||||
finally:
|
||||
self._unregister_execution(task_id)
|
||||
|
||||
def _match_xlsx_to_content_task(self, normalized_stem: str):
|
||||
"""Find a ClickUp content task whose Keyword matches the file stem.
|
||||
|
|
|
|||
|
|
@ -105,6 +105,7 @@ class ToolRegistry:
|
|||
self.db = db
|
||||
self.agent = agent
|
||||
self.agent_registry = None # set after multi-agent setup
|
||||
self.scheduler = None # set after scheduler creation
|
||||
self._discover_tools()
|
||||
|
||||
def _discover_tools(self):
|
||||
|
|
@ -158,6 +159,7 @@ class ToolRegistry:
|
|||
"agent": self.agent,
|
||||
"memory": self.agent._memory,
|
||||
"agent_registry": self.agent_registry,
|
||||
"scheduler": self.scheduler,
|
||||
}
|
||||
# Pass scheduler-injected metadata through ctx (not LLM-visible)
|
||||
if "clickup_task_id" in args:
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from . import tool
|
||||
|
||||
|
|
@ -284,3 +285,79 @@ def clickup_reset_task(task_id: str, ctx: dict | None = None) -> str:
|
|||
f"Task '{task_id}' reset to '{reset_status}'. "
|
||||
f"It will be picked up on the next scheduler poll."
|
||||
)
|
||||
|
||||
|
||||
def _format_duration(delta) -> str:
|
||||
"""Format a timedelta as a human-readable duration string."""
|
||||
total_seconds = int(delta.total_seconds())
|
||||
hours, remainder = divmod(total_seconds, 3600)
|
||||
minutes, seconds = divmod(remainder, 60)
|
||||
if hours:
|
||||
return f"{hours}h {minutes}m {seconds}s"
|
||||
if minutes:
|
||||
return f"{minutes}m {seconds}s"
|
||||
return f"{seconds}s"
|
||||
|
||||
|
||||
def _format_ago(iso_str: str | None) -> str:
|
||||
"""Format an ISO timestamp as 'Xm ago' relative to now."""
|
||||
if not iso_str:
|
||||
return "never"
|
||||
try:
|
||||
ts = datetime.fromisoformat(iso_str)
|
||||
delta = datetime.now(UTC) - ts
|
||||
total_seconds = int(delta.total_seconds())
|
||||
if total_seconds < 60:
|
||||
return f"{total_seconds}s ago"
|
||||
minutes = total_seconds // 60
|
||||
if minutes < 60:
|
||||
return f"{minutes}m ago"
|
||||
hours = minutes // 60
|
||||
return f"{hours}h {minutes % 60}m ago"
|
||||
except (ValueError, TypeError):
|
||||
return "unknown"
|
||||
|
||||
|
||||
@tool(
|
||||
"get_active_tasks",
|
||||
"Show what CheddahBot is actively executing right now. "
|
||||
"Reports running tasks, loop health, and whether it's safe to restart.",
|
||||
category="clickup",
|
||||
)
|
||||
def get_active_tasks(ctx: dict | None = None) -> str:
|
||||
"""Show actively running scheduler tasks and loop health."""
|
||||
scheduler = ctx.get("scheduler") if ctx else None
|
||||
if not scheduler:
|
||||
return "Scheduler not available — cannot check active executions."
|
||||
|
||||
now = datetime.now(UTC)
|
||||
lines = []
|
||||
|
||||
# Active executions
|
||||
active = scheduler.get_active_executions()
|
||||
if active:
|
||||
lines.append(f"**Active Executions ({len(active)}):**")
|
||||
for task_id, info in active.items():
|
||||
duration = _format_duration(now - info["started_at"])
|
||||
lines.append(
|
||||
f"- **{info['name']}** — `{info['tool']}` — "
|
||||
f"running {duration} ({info['thread']} thread)"
|
||||
)
|
||||
else:
|
||||
lines.append("**No tasks actively executing.**")
|
||||
|
||||
# Loop health
|
||||
timestamps = scheduler.get_loop_timestamps()
|
||||
lines.append("")
|
||||
lines.append("**Loop Health:**")
|
||||
for loop_name, ts in timestamps.items():
|
||||
lines.append(f"- {loop_name}: last ran {_format_ago(ts)}")
|
||||
|
||||
# Safe to restart?
|
||||
lines.append("")
|
||||
if active:
|
||||
lines.append(f"**Safe to restart: No** ({len(active)} task(s) actively running)")
|
||||
else:
|
||||
lines.append("**Safe to restart: Yes**")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from cheddahbot.tools.clickup_tool import (
|
|||
clickup_query_tasks,
|
||||
clickup_reset_task,
|
||||
clickup_task_status,
|
||||
get_active_tasks,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -142,3 +143,38 @@ class TestClickupResetTask:
|
|||
|
||||
result = clickup_reset_task(task_id="t1", ctx=_make_ctx())
|
||||
assert "Error" in result
|
||||
|
||||
|
||||
class TestGetActiveTasks:
|
||||
def test_no_scheduler(self):
|
||||
result = get_active_tasks(ctx={"config": MagicMock()})
|
||||
assert "not available" in result.lower()
|
||||
|
||||
def test_nothing_running(self):
|
||||
scheduler = MagicMock()
|
||||
scheduler.get_active_executions.return_value = {}
|
||||
scheduler.get_loop_timestamps.return_value = {"clickup": None, "folder_watch": None}
|
||||
|
||||
result = get_active_tasks(ctx={"scheduler": scheduler})
|
||||
assert "No tasks actively executing" in result
|
||||
assert "Safe to restart: Yes" in result
|
||||
|
||||
def test_tasks_running(self):
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
scheduler = MagicMock()
|
||||
scheduler.get_active_executions.return_value = {
|
||||
"t1": {
|
||||
"name": "Press Release for Acme",
|
||||
"tool": "write_press_releases",
|
||||
"started_at": datetime.now(UTC) - timedelta(minutes=5),
|
||||
"thread": "clickup_thread",
|
||||
}
|
||||
}
|
||||
scheduler.get_loop_timestamps.return_value = {"clickup": datetime.now(UTC).isoformat()}
|
||||
|
||||
result = get_active_tasks(ctx={"scheduler": scheduler})
|
||||
assert "Active Executions (1)" in result
|
||||
assert "Press Release for Acme" in result
|
||||
assert "write_press_releases" in result
|
||||
assert "Safe to restart: No" in result
|
||||
|
|
|
|||
|
|
@ -232,3 +232,62 @@ class TestFieldFilterDiscovery:
|
|||
mock_client.discover_field_filter.reset_mock()
|
||||
scheduler._poll_clickup()
|
||||
mock_client.discover_field_filter.assert_not_called()
|
||||
|
||||
|
||||
class TestActiveExecutions:
|
||||
"""Test the active execution registry."""
|
||||
|
||||
def test_register_and_get(self, tmp_db):
|
||||
config = _FakeConfig()
|
||||
scheduler = Scheduler(config, tmp_db, MagicMock())
|
||||
|
||||
scheduler._register_execution("t1", "Task One", "write_press_releases")
|
||||
active = scheduler.get_active_executions()
|
||||
|
||||
assert "t1" in active
|
||||
assert active["t1"]["name"] == "Task One"
|
||||
assert active["t1"]["tool"] == "write_press_releases"
|
||||
assert "started_at" in active["t1"]
|
||||
assert "thread" in active["t1"]
|
||||
|
||||
def test_unregister(self, tmp_db):
|
||||
config = _FakeConfig()
|
||||
scheduler = Scheduler(config, tmp_db, MagicMock())
|
||||
|
||||
scheduler._register_execution("t1", "Task One", "write_press_releases")
|
||||
scheduler._unregister_execution("t1")
|
||||
assert scheduler.get_active_executions() == {}
|
||||
|
||||
def test_unregister_nonexistent_is_noop(self, tmp_db):
|
||||
config = _FakeConfig()
|
||||
scheduler = Scheduler(config, tmp_db, MagicMock())
|
||||
|
||||
# Should not raise
|
||||
scheduler._unregister_execution("nonexistent")
|
||||
assert scheduler.get_active_executions() == {}
|
||||
|
||||
def test_multiple_executions(self, tmp_db):
|
||||
config = _FakeConfig()
|
||||
scheduler = Scheduler(config, tmp_db, MagicMock())
|
||||
|
||||
scheduler._register_execution("t1", "Task One", "write_press_releases")
|
||||
scheduler._register_execution("t2", "Task Two", "run_cora_backlinks")
|
||||
active = scheduler.get_active_executions()
|
||||
|
||||
assert len(active) == 2
|
||||
assert "t1" in active
|
||||
assert "t2" in active
|
||||
|
||||
def test_get_returns_snapshot(self, tmp_db):
|
||||
"""get_active_executions returns a copy, not a reference."""
|
||||
config = _FakeConfig()
|
||||
scheduler = Scheduler(config, tmp_db, MagicMock())
|
||||
|
||||
scheduler._register_execution("t1", "Task One", "tool_a")
|
||||
snapshot = scheduler.get_active_executions()
|
||||
scheduler._unregister_execution("t1")
|
||||
|
||||
# Snapshot should still have t1
|
||||
assert "t1" in snapshot
|
||||
# But live state should be empty
|
||||
assert scheduler.get_active_executions() == {}
|
||||
|
|
|
|||
Loading…
Reference in New Issue