158 lines
4.8 KiB
Python
158 lines
4.8 KiB
Python
"""Tests for the ClickUp chat tools (state machine transitions)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from cheddahbot.tools.clickup_tool import (
|
|
clickup_approve_task,
|
|
clickup_decline_task,
|
|
clickup_list_tasks,
|
|
clickup_task_status,
|
|
)
|
|
|
|
|
|
def _make_ctx(db):
    """Build the context mapping handed to the tools as ``ctx``.

    The mapping has a single entry, ``"db"``, holding the object passed in.
    """
    return dict(db=db)
|
|
|
|
|
|
def _seed_task(db, task_id, state, **overrides):
    """Write a JSON-encoded task state record into the kv store.

    The record starts from a fixed set of default fields. Every keyword in
    ``overrides`` replaces the default of the same name, or is appended as
    an extra field. The serialized record is stored through ``db.kv_set``
    under the key ``clickup:task:<task_id>:state``.
    """
    defaults = {
        "state": state,
        "clickup_task_id": task_id,
        "clickup_task_name": f"Task {task_id}",
        "task_type": "Press Release",
        "skill_name": "write_press_releases",
        "discovered_at": "2026-01-01T00:00:00",
        "started_at": None,
        "completed_at": None,
        "error": None,
        "deliverable_paths": [],
        "custom_fields": {},
    }
    # Later entries win, and existing keys keep their original position.
    record = {**defaults, **overrides}
    storage_key = f"clickup:task:{task_id}:state"
    db.kv_set(storage_key, json.dumps(record))
|
|
|
|
|
|
class TestClickupListTasks:
    """Checks for ``clickup_list_tasks``, with and without a status filter."""

    def test_empty_when_no_tasks(self, tmp_db):
        """Nothing seeded: the output carries the "No ClickUp tasks" message."""
        ctx = _make_ctx(tmp_db)

        output = clickup_list_tasks(ctx=ctx)

        assert "No ClickUp tasks" in output

    def test_lists_all_tracked_tasks(self, tmp_db):
        """Both seeded task ids show up in the unfiltered listing."""
        for task_id, state in (("a1", "discovered"), ("a2", "approved")):
            _seed_task(tmp_db, task_id, state)

        output = clickup_list_tasks(ctx=_make_ctx(tmp_db))

        for task_id in ("a1", "a2"):
            assert task_id in output
        # NOTE(review): meant as a check on the task count, but "2" already
        # occurs inside the id "a2", so this cannot fail on its own — confirm
        # the output format and tighten.
        assert "2" in output

    def test_filter_by_status(self, tmp_db):
        """Only the task whose state matches ``status`` is listed."""
        seeded = (("a1", "discovered"), ("a2", "approved"), ("a3", "completed"))
        for task_id, state in seeded:
            _seed_task(tmp_db, task_id, state)

        output = clickup_list_tasks(status="approved", ctx=_make_ctx(tmp_db))

        assert "a2" in output
        for excluded in ("a1", "a3"):
            assert excluded not in output

    def test_filter_returns_empty_message(self, tmp_db):
        """A filter that matches nothing yields the state-specific message."""
        _seed_task(tmp_db, "a1", "discovered")
        ctx = _make_ctx(tmp_db)

        output = clickup_list_tasks(status="completed", ctx=ctx)

        assert "No ClickUp tasks with state" in output
|
|
|
|
|
|
class TestClickupTaskStatus:
    """Checks for ``clickup_task_status`` on known and unknown task ids."""

    def test_shows_details(self, tmp_db):
        """Name, state, task type and start time all appear in the output."""
        _seed_task(tmp_db, "a1", "executing", started_at="2026-01-01T12:00:00")
        ctx = _make_ctx(tmp_db)

        output = clickup_task_status(task_id="a1", ctx=ctx)

        expected_fragments = (
            "Task a1",
            "executing",
            "Press Release",
            "2026-01-01T12:00:00",
        )
        for fragment in expected_fragments:
            assert fragment in output

    def test_unknown_task(self, tmp_db):
        """An id that was never seeded produces the "No tracked state" message."""
        output = clickup_task_status(task_id="nonexistent", ctx=_make_ctx(tmp_db))

        assert "No tracked state" in output

    def test_shows_error_when_failed(self, tmp_db):
        """The stored error text is included for a failed task."""
        _seed_task(tmp_db, "f1", "failed", error="API timeout")
        ctx = _make_ctx(tmp_db)

        output = clickup_task_status(task_id="f1", ctx=ctx)

        assert "API timeout" in output

    def test_shows_deliverables(self, tmp_db):
        """The first stored deliverable path is included for a completed task."""
        paths = ["/data/pr1.txt", "/data/pr2.txt"]
        _seed_task(tmp_db, "c1", "completed", deliverable_paths=paths)

        output = clickup_task_status(task_id="c1", ctx=_make_ctx(tmp_db))

        assert "/data/pr1.txt" in output
|
|
|
|
|
|
class TestClickupApproveTask:
    """Approval is the gate between 'discovered' and 'executing'.

    If this breaks, tasks requiring approval can never run.
    """

    def test_approves_awaiting_task(self, tmp_db):
        """An awaiting task is reported as approved and persisted as such."""
        _seed_task(tmp_db, "a1", "awaiting_approval")
        ctx = _make_ctx(tmp_db)

        output = clickup_approve_task(task_id="a1", ctx=ctx)

        assert "approved" in output.lower()

        # The transition must be written back to the kv store, not just reported.
        stored = json.loads(tmp_db.kv_get("clickup:task:a1:state"))
        assert stored["state"] == "approved"

    def test_rejects_non_awaiting_task(self, tmp_db):
        """A task that is not awaiting approval is refused and left untouched."""
        _seed_task(tmp_db, "a1", "executing")
        ctx = _make_ctx(tmp_db)

        output = clickup_approve_task(task_id="a1", ctx=ctx)

        assert "Cannot approve" in output

        # The refusal must not overwrite the stored state.
        stored = json.loads(tmp_db.kv_get("clickup:task:a1:state"))
        assert stored["state"] == "executing"

    def test_unknown_task(self, tmp_db):
        """An id that was never seeded produces the "No tracked state" message."""
        output = clickup_approve_task(task_id="nope", ctx=_make_ctx(tmp_db))

        assert "No tracked state" in output
|
|
|
|
|
|
class TestClickupDeclineTask:
    """Checks for ``clickup_decline_task`` on awaiting, other and unknown tasks."""

    def test_declines_awaiting_task(self, tmp_db):
        """An awaiting task is reported as declined and persisted as such."""
        _seed_task(tmp_db, "d1", "awaiting_approval")

        result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db))

        assert "declined" in result.lower()

        # Verify state changed in DB
        raw = tmp_db.kv_get("clickup:task:d1:state")
        state = json.loads(raw)
        assert state["state"] == "declined"

    def test_rejects_non_awaiting_task(self, tmp_db):
        """A task that is not awaiting approval is refused and left untouched."""
        _seed_task(tmp_db, "d1", "completed")

        result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db))

        assert "Cannot decline" in result

        # State should be unchanged: a refusal message alone does not prove
        # the stored record was left alone.
        raw = tmp_db.kv_get("clickup:task:d1:state")
        state = json.loads(raw)
        assert state["state"] == "completed"

    def test_unknown_task(self, tmp_db):
        """An id that was never seeded produces the "No tracked state" message."""
        result = clickup_decline_task(task_id="nope", ctx=_make_ctx(tmp_db))

        assert "No tracked state" in result
|