Add test suite for ClickUp integration APIs

51 tests covering the core API surfaces:
- ClickUpTask parsing (dropdown resolution, status normalization, fallbacks)
- ClickUpClient HTTP layer (mocked with respx, graceful error handling)
- Database kv_scan and notifications methods
- NotificationBus pub/sub (independent cursors, fault isolation)
- Chat tools state machine (approve/decline transitions, guards)

Also adds pyproject.toml with modern Python tooling (uv, pytest, ruff,
respx) and proper test/coverage configuration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-16 07:34:34 -06:00
parent 7864ca2461
commit fa064be7c9
7 changed files with 830 additions and 0 deletions

78
pyproject.toml 100644
View File

@ -0,0 +1,78 @@
[project]
name = "cheddahbot"
version = "0.1.0"
description = "Personal AI assistant with ClickUp integration"
requires-python = ">=3.11"
dependencies = [
"gradio>=5.0",
"openai>=1.30",
"pyyaml>=6.0",
"python-dotenv>=1.0",
"sentence-transformers>=3.0",
"numpy>=1.24",
"httpx>=0.27",
"beautifulsoup4>=4.12",
"croniter>=2.0",
"edge-tts>=6.1",
]
[build-system]
requires = ["uv_build>=0.9,<1"]
build-backend = "uv_build"
[tool.uv.build-backend]
module-root = ""
[dependency-groups]
dev = [{include-group = "lint"}, {include-group = "test"}]
lint = ["ruff"]
test = ["pytest", "pytest-cov", "respx"]
[tool.uv]
default-groups = ["dev", "test"]
[tool.pytest]
testpaths = ["tests"]
addopts = [
"-ra",
"--strict-markers",
"--strict-config",
"--cov=cheddahbot",
"--cov-report=term-missing",
]
markers = [
"integration: requires live ClickUp API token",
]
[tool.coverage.run]
branch = true
source = ["cheddahbot"]
omit = [
"*/__main__.py",
"*/conftest.py",
"*/ui.py",
"*/media.py",
"*/llm.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"if __name__ == .__main__.:",
]
show_missing = true
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM", "RUF"]
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = [
"S101",
"PLR2004",
"ANN",
]

View File

71
tests/conftest.py 100644
View File

@ -0,0 +1,71 @@
"""Shared test fixtures."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from cheddahbot.db import Database
@pytest.fixture()
def tmp_db(tmp_path):
"""Provide a fresh in-memory-like SQLite database."""
db_path = tmp_path / "test.db"
return Database(db_path)
@pytest.fixture()
def sample_clickup_task_data():
"""Raw ClickUp API response for a single task."""
return {
"id": "abc123",
"name": "Write PR for Acme Corp",
"description": "Draft a press release about the new product launch.",
"status": {"status": "to do", "type": "open"},
"url": "https://app.clickup.com/t/abc123",
"list": {"id": "list_1", "name": "Content Tasks"},
"custom_fields": [
{
"id": "cf_1",
"name": "Task Type",
"type": "drop_down",
"value": 0,
"type_config": {
"options": [
{"id": "opt_1", "name": "Press Release", "orderindex": 0},
{"id": "opt_2", "name": "Link Building", "orderindex": 1},
]
},
},
{
"id": "cf_2",
"name": "Company",
"type": "short_text",
"value": "Acme Corp",
},
{
"id": "cf_3",
"name": "Priority Level",
"type": "drop_down",
"value": None,
"type_config": {"options": []},
},
],
}
@pytest.fixture()
def sample_clickup_task_no_type():
"""ClickUp task with no Task Type custom field."""
return {
"id": "def456",
"name": "Random admin task",
"description": "",
"status": {"status": "to do", "type": "open"},
"url": "https://app.clickup.com/t/def456",
"list": {"id": "list_2", "name": "Admin"},
"custom_fields": [],
}

View File

@ -0,0 +1,302 @@
"""Tests for the ClickUp REST API client and task parsing."""
from __future__ import annotations
import httpx
import pytest
import respx
from cheddahbot.clickup import BASE_URL, ClickUpClient, ClickUpTask
# ── ClickUpTask.from_api ──
class TestClickUpTaskParsing:
def test_parses_basic_fields(self, sample_clickup_task_data):
task = ClickUpTask.from_api(sample_clickup_task_data)
assert task.id == "abc123"
assert task.name == "Write PR for Acme Corp"
assert task.status == "to do"
assert task.description == "Draft a press release about the new product launch."
assert task.url == "https://app.clickup.com/t/abc123"
assert task.list_id == "list_1"
assert task.list_name == "Content Tasks"
def test_resolves_dropdown_by_orderindex(self, sample_clickup_task_data):
task = ClickUpTask.from_api(sample_clickup_task_data)
assert task.task_type == "Press Release"
assert task.custom_fields["Task Type"] == "Press Release"
def test_preserves_text_custom_fields(self, sample_clickup_task_data):
task = ClickUpTask.from_api(sample_clickup_task_data)
assert task.custom_fields["Company"] == "Acme Corp"
def test_handles_null_dropdown_value(self, sample_clickup_task_data):
task = ClickUpTask.from_api(sample_clickup_task_data)
assert task.custom_fields["Priority Level"] is None
def test_no_custom_fields(self, sample_clickup_task_no_type):
task = ClickUpTask.from_api(sample_clickup_task_no_type)
assert task.task_type == ""
assert task.custom_fields == {}
def test_custom_task_type_field_name(self, sample_clickup_task_data):
"""When task_type_field_name doesn't match, task_type should be empty."""
task = ClickUpTask.from_api(sample_clickup_task_data, task_type_field_name="Work Type")
assert task.task_type == ""
# But custom fields still parsed
assert task.custom_fields["Task Type"] == "Press Release"
def test_status_normalized_to_lowercase(self):
data = {
"id": "x",
"name": "Test",
"status": {"status": "In Progress", "type": "active"},
"custom_fields": [],
}
task = ClickUpTask.from_api(data)
assert task.status == "in progress"
def test_missing_description_defaults_empty(self):
data = {
"id": "x",
"name": "Test",
"status": {"status": "open", "type": "open"},
"custom_fields": [],
}
task = ClickUpTask.from_api(data)
assert task.description == ""
def test_dropdown_resolved_by_id_fallback(self):
"""When orderindex doesn't match, fall back to id matching."""
data = {
"id": "x",
"name": "Test",
"status": {"status": "open", "type": "open"},
"custom_fields": [
{
"id": "cf_1",
"name": "Task Type",
"type": "drop_down",
"value": "opt_2", # string id, not int orderindex
"type_config": {
"options": [
{"id": "opt_1", "name": "Press Release", "orderindex": 0},
{"id": "opt_2", "name": "Link Building", "orderindex": 1},
]
},
}
],
}
task = ClickUpTask.from_api(data)
assert task.custom_fields["Task Type"] == "Link Building"
assert task.task_type == "Link Building"
# ── ClickUpClient ──
class TestClickUpClient:
@respx.mock
def test_get_tasks(self):
respx.get(f"{BASE_URL}/list/list_1/task").mock(
return_value=httpx.Response(
200,
json={
"tasks": [
{
"id": "t1",
"name": "Task One",
"status": {"status": "to do", "type": "open"},
"custom_fields": [],
},
{
"id": "t2",
"name": "Task Two",
"status": {"status": "to do", "type": "open"},
"custom_fields": [],
},
]
},
)
)
client = ClickUpClient(api_token="pk_test_123")
tasks = client.get_tasks("list_1", statuses=["to do"])
assert len(tasks) == 2
assert tasks[0].id == "t1"
assert tasks[1].name == "Task Two"
client.close()
@respx.mock
def test_get_tasks_from_space(self):
# Mock folders endpoint
respx.get(f"{BASE_URL}/space/space_1/folder").mock(
return_value=httpx.Response(
200,
json={
"folders": [
{
"id": "f1",
"lists": [{"id": "list_a"}, {"id": "list_b"}],
}
]
},
)
)
# Mock folderless lists
respx.get(f"{BASE_URL}/space/space_1/list").mock(
return_value=httpx.Response(200, json={"lists": [{"id": "list_c"}]})
)
# Mock task responses for each list
for list_id in ("list_a", "list_b", "list_c"):
respx.get(f"{BASE_URL}/list/{list_id}/task").mock(
return_value=httpx.Response(
200,
json={
"tasks": [
{
"id": f"t_{list_id}",
"name": f"Task from {list_id}",
"status": {"status": "to do", "type": "open"},
"custom_fields": [],
}
]
},
)
)
client = ClickUpClient(api_token="pk_test_123")
tasks = client.get_tasks_from_space("space_1", statuses=["to do"])
assert len(tasks) == 3
task_ids = {t.id for t in tasks}
assert task_ids == {"t_list_a", "t_list_b", "t_list_c"}
client.close()
@respx.mock
def test_update_task_status(self):
respx.put(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(200, json={})
)
client = ClickUpClient(api_token="pk_test_123")
result = client.update_task_status("t1", "in progress")
assert result is True
request = respx.calls.last.request
assert b'"status"' in request.content
assert b'"in progress"' in request.content
client.close()
@respx.mock
def test_update_task_status_failure(self):
respx.put(f"{BASE_URL}/task/t1").mock(
return_value=httpx.Response(403, json={"err": "forbidden"})
)
client = ClickUpClient(api_token="pk_test_123")
result = client.update_task_status("t1", "done")
assert result is False
client.close()
@respx.mock
def test_add_comment(self):
respx.post(f"{BASE_URL}/task/t1/comment").mock(
return_value=httpx.Response(200, json={})
)
client = ClickUpClient(api_token="pk_test_123")
result = client.add_comment("t1", "CheddahBot completed this task.")
assert result is True
request = respx.calls.last.request
assert b"CheddahBot completed" in request.content
client.close()
@respx.mock
def test_add_comment_failure(self):
respx.post(f"{BASE_URL}/task/t1/comment").mock(
return_value=httpx.Response(500, json={"err": "server error"})
)
client = ClickUpClient(api_token="pk_test_123")
result = client.add_comment("t1", "Hello")
assert result is False
client.close()
@respx.mock
def test_get_custom_fields(self):
respx.get(f"{BASE_URL}/list/list_1/field").mock(
return_value=httpx.Response(
200,
json={
"fields": [
{"id": "cf_1", "name": "Task Type", "type": "drop_down"},
{"id": "cf_2", "name": "Company", "type": "short_text"},
]
},
)
)
client = ClickUpClient(api_token="pk_test_123")
fields = client.get_custom_fields("list_1")
assert len(fields) == 2
assert fields[0]["name"] == "Task Type"
client.close()
@respx.mock
def test_auth_header_sent(self):
respx.get(f"{BASE_URL}/list/list_1/task").mock(
return_value=httpx.Response(200, json={"tasks": []})
)
client = ClickUpClient(api_token="pk_secret_token")
client.get_tasks("list_1")
request = respx.calls.last.request
assert request.headers["authorization"] == "pk_secret_token"
client.close()
@respx.mock
def test_get_tasks_from_space_handles_folder_error(self):
"""If folders endpoint fails, still fetch folderless lists."""
respx.get(f"{BASE_URL}/space/space_1/folder").mock(
return_value=httpx.Response(403, json={"err": "forbidden"})
)
respx.get(f"{BASE_URL}/space/space_1/list").mock(
return_value=httpx.Response(200, json={"lists": [{"id": "list_x"}]})
)
respx.get(f"{BASE_URL}/list/list_x/task").mock(
return_value=httpx.Response(
200,
json={
"tasks": [
{
"id": "t_x",
"name": "Task X",
"status": {"status": "to do", "type": "open"},
"custom_fields": [],
}
]
},
)
)
client = ClickUpClient(api_token="pk_test")
tasks = client.get_tasks_from_space("space_1")
assert len(tasks) == 1
assert tasks[0].id == "t_x"
client.close()

View File

@ -0,0 +1,159 @@
"""Tests for the ClickUp chat tools (state machine transitions)."""
from __future__ import annotations
import json
import pytest
from cheddahbot.tools.clickup_tool import (
clickup_approve_task,
clickup_decline_task,
clickup_list_tasks,
clickup_task_status,
)
def _make_ctx(db):
return {"db": db}
def _seed_task(db, task_id, state, **overrides):
"""Insert a task state into kv_store."""
data = {
"state": state,
"clickup_task_id": task_id,
"clickup_task_name": f"Task {task_id}",
"task_type": "Press Release",
"skill_name": "write_press_releases",
"discovered_at": "2026-01-01T00:00:00",
"started_at": None,
"completed_at": None,
"error": None,
"deliverable_paths": [],
"custom_fields": {},
}
data.update(overrides)
db.kv_set(f"clickup:task:{task_id}:state", json.dumps(data))
class TestClickupListTasks:
def test_empty_when_no_tasks(self, tmp_db):
result = clickup_list_tasks(ctx=_make_ctx(tmp_db))
assert "No ClickUp tasks" in result
def test_lists_all_tracked_tasks(self, tmp_db):
_seed_task(tmp_db, "a1", "discovered")
_seed_task(tmp_db, "a2", "approved")
result = clickup_list_tasks(ctx=_make_ctx(tmp_db))
assert "a1" in result
assert "a2" in result
assert "2" in result # count
def test_filter_by_status(self, tmp_db):
_seed_task(tmp_db, "a1", "discovered")
_seed_task(tmp_db, "a2", "approved")
_seed_task(tmp_db, "a3", "completed")
result = clickup_list_tasks(status="approved", ctx=_make_ctx(tmp_db))
assert "a2" in result
assert "a1" not in result
assert "a3" not in result
def test_filter_returns_empty_message(self, tmp_db):
_seed_task(tmp_db, "a1", "discovered")
result = clickup_list_tasks(status="completed", ctx=_make_ctx(tmp_db))
assert "No ClickUp tasks with state" in result
class TestClickupTaskStatus:
def test_shows_details(self, tmp_db):
_seed_task(tmp_db, "a1", "executing", started_at="2026-01-01T12:00:00")
result = clickup_task_status(task_id="a1", ctx=_make_ctx(tmp_db))
assert "Task a1" in result
assert "executing" in result
assert "Press Release" in result
assert "2026-01-01T12:00:00" in result
def test_unknown_task(self, tmp_db):
result = clickup_task_status(task_id="nonexistent", ctx=_make_ctx(tmp_db))
assert "No tracked state" in result
def test_shows_error_when_failed(self, tmp_db):
_seed_task(tmp_db, "f1", "failed", error="API timeout")
result = clickup_task_status(task_id="f1", ctx=_make_ctx(tmp_db))
assert "API timeout" in result
def test_shows_deliverables(self, tmp_db):
_seed_task(tmp_db, "c1", "completed", deliverable_paths=["/data/pr1.txt", "/data/pr2.txt"])
result = clickup_task_status(task_id="c1", ctx=_make_ctx(tmp_db))
assert "/data/pr1.txt" in result
class TestClickupApproveTask:
"""Approval is the gate between 'discovered' and 'executing'.
If this breaks, tasks requiring approval can never run."""
def test_approves_awaiting_task(self, tmp_db):
_seed_task(tmp_db, "a1", "awaiting_approval")
result = clickup_approve_task(task_id="a1", ctx=_make_ctx(tmp_db))
assert "approved" in result.lower()
# Verify state changed in DB
raw = tmp_db.kv_get("clickup:task:a1:state")
state = json.loads(raw)
assert state["state"] == "approved"
def test_rejects_non_awaiting_task(self, tmp_db):
_seed_task(tmp_db, "a1", "executing")
result = clickup_approve_task(task_id="a1", ctx=_make_ctx(tmp_db))
assert "Cannot approve" in result
# State should be unchanged
raw = tmp_db.kv_get("clickup:task:a1:state")
state = json.loads(raw)
assert state["state"] == "executing"
def test_unknown_task(self, tmp_db):
result = clickup_approve_task(task_id="nope", ctx=_make_ctx(tmp_db))
assert "No tracked state" in result
class TestClickupDeclineTask:
def test_declines_awaiting_task(self, tmp_db):
_seed_task(tmp_db, "d1", "awaiting_approval")
result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db))
assert "declined" in result.lower()
raw = tmp_db.kv_get("clickup:task:d1:state")
state = json.loads(raw)
assert state["state"] == "declined"
def test_rejects_non_awaiting_task(self, tmp_db):
_seed_task(tmp_db, "d1", "completed")
result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db))
assert "Cannot decline" in result
def test_unknown_task(self, tmp_db):
result = clickup_decline_task(task_id="nope", ctx=_make_ctx(tmp_db))
assert "No tracked state" in result

112
tests/test_db.py 100644
View File

@ -0,0 +1,112 @@
"""Tests for the Database kv_scan and notifications methods."""
from __future__ import annotations
import json
class TestKvScan:
"""kv_scan is used to find all tracked ClickUp task states efficiently.
If it breaks, the scheduler can't find tasks to execute or recover."""
def test_returns_matching_pairs(self, tmp_db):
tmp_db.kv_set("clickup:task:abc:state", '{"state": "discovered"}')
tmp_db.kv_set("clickup:task:def:state", '{"state": "approved"}')
tmp_db.kv_set("other:key", "unrelated")
results = tmp_db.kv_scan("clickup:task:")
assert len(results) == 2
keys = {k for k, _ in results}
assert keys == {"clickup:task:abc:state", "clickup:task:def:state"}
def test_returns_empty_on_no_match(self, tmp_db):
tmp_db.kv_set("other:key", "value")
results = tmp_db.kv_scan("clickup:")
assert results == []
def test_prefix_is_exact_not_substring(self, tmp_db):
"""'click' should not match 'clickup:' prefix."""
tmp_db.kv_set("clickup:task:1:state", "data")
tmp_db.kv_set("clicked:something", "other")
results = tmp_db.kv_scan("clickup:")
assert len(results) == 1
assert results[0][0] == "clickup:task:1:state"
def test_values_are_returned_correctly(self, tmp_db):
state = json.dumps({"state": "completed", "task_name": "Test"})
tmp_db.kv_set("clickup:task:x:state", state)
results = tmp_db.kv_scan("clickup:task:x:")
assert len(results) == 1
parsed = json.loads(results[0][1])
assert parsed["state"] == "completed"
assert parsed["task_name"] == "Test"
class TestNotifications:
"""Notifications back the NotificationBus. If these break, no UI
gets informed about ClickUp task discoveries, completions, or failures."""
def test_add_and_retrieve(self, tmp_db):
nid = tmp_db.add_notification("Task discovered", "clickup")
assert nid >= 1
notifs = tmp_db.get_notifications_after(0)
assert len(notifs) == 1
assert notifs[0]["message"] == "Task discovered"
assert notifs[0]["category"] == "clickup"
def test_after_id_filters_correctly(self, tmp_db):
id1 = tmp_db.add_notification("First", "clickup")
id2 = tmp_db.add_notification("Second", "clickup")
id3 = tmp_db.add_notification("Third", "clickup")
# Should only get notifications after id1
notifs = tmp_db.get_notifications_after(id1)
assert len(notifs) == 2
assert notifs[0]["message"] == "Second"
assert notifs[1]["message"] == "Third"
def test_after_latest_returns_empty(self, tmp_db):
id1 = tmp_db.add_notification("Only one", "clickup")
notifs = tmp_db.get_notifications_after(id1)
assert notifs == []
def test_limit_is_respected(self, tmp_db):
for i in range(10):
tmp_db.add_notification(f"Msg {i}", "clickup")
notifs = tmp_db.get_notifications_after(0, limit=3)
assert len(notifs) == 3
def test_default_category(self, tmp_db):
tmp_db.add_notification("No category specified")
notifs = tmp_db.get_notifications_after(0)
assert notifs[0]["category"] == "clickup"
def test_created_at_is_populated(self, tmp_db):
tmp_db.add_notification("Timestamped")
notifs = tmp_db.get_notifications_after(0)
assert notifs[0]["created_at"] is not None
assert len(notifs[0]["created_at"]) > 10 # ISO format
def test_ids_are_monotonically_increasing(self, tmp_db):
id1 = tmp_db.add_notification("A")
id2 = tmp_db.add_notification("B")
id3 = tmp_db.add_notification("C")
assert id1 < id2 < id3

View File

@ -0,0 +1,108 @@
"""Tests for the NotificationBus pub/sub layer."""
from __future__ import annotations
from cheddahbot.notifications import NotificationBus
class TestNotificationBus:
"""The bus is the single point of contact between the scheduler and all UIs.
If push/subscribe/get_pending break, no UI learns about ClickUp events."""
def test_push_dispatches_to_subscriber(self, tmp_db):
bus = NotificationBus(tmp_db)
received = []
bus.subscribe("test", lambda msg, cat: received.append((msg, cat)))
bus.push("Task completed", "clickup")
assert len(received) == 1
assert received[0] == ("Task completed", "clickup")
def test_push_persists_to_db(self, tmp_db):
bus = NotificationBus(tmp_db)
bus.push("Persisted message", "clickup")
notifs = tmp_db.get_notifications_after(0)
assert len(notifs) == 1
assert notifs[0]["message"] == "Persisted message"
def test_get_pending_drains_messages(self, tmp_db):
bus = NotificationBus(tmp_db)
bus.subscribe("ui", lambda msg, cat: None)
bus.push("Msg 1")
bus.push("Msg 2")
pending = bus.get_pending("ui")
assert len(pending) == 2
assert pending[0] == "Msg 1"
assert pending[1] == "Msg 2"
# Second call should return empty (already consumed)
assert bus.get_pending("ui") == []
def test_multiple_listeners_get_independent_delivery(self, tmp_db):
bus = NotificationBus(tmp_db)
bus.subscribe("gradio", lambda msg, cat: None)
bus.subscribe("discord", lambda msg, cat: None)
bus.push("Shared notification")
gradio_pending = bus.get_pending("gradio")
discord_pending = bus.get_pending("discord")
assert len(gradio_pending) == 1
assert len(discord_pending) == 1
# Each drains independently
assert bus.get_pending("gradio") == []
assert bus.get_pending("discord") == []
def test_unsubscribe_removes_listener(self, tmp_db):
bus = NotificationBus(tmp_db)
received = []
bus.subscribe("test", lambda msg, cat: received.append(msg))
bus.push("Before unsub")
bus.unsubscribe("test")
bus.push("After unsub")
assert len(received) == 1
assert received[0] == "Before unsub"
def test_get_pending_for_unknown_listener(self, tmp_db):
bus = NotificationBus(tmp_db)
bus.push("Exists in DB")
# Unknown listener starts at cursor 0, so gets everything
pending = bus.get_pending("unknown")
assert len(pending) == 1
def test_subscriber_only_gets_new_notifications(self, tmp_db):
"""Subscribing after push should not deliver old notifications."""
bus = NotificationBus(tmp_db)
bus.push("Old message")
bus.subscribe("late_joiner", lambda msg, cat: None)
bus.push("New message")
pending = bus.get_pending("late_joiner")
assert len(pending) == 1
assert pending[0] == "New message"
def test_callback_error_doesnt_break_other_listeners(self, tmp_db):
bus = NotificationBus(tmp_db)
received = []
def bad_callback(msg, cat):
raise RuntimeError("broken listener")
bus.subscribe("broken", bad_callback)
bus.subscribe("healthy", lambda msg, cat: received.append(msg))
bus.push("Test message")
assert len(received) == 1
assert received[0] == "Test message"