Reformat code and update ClickUp tools to reset pattern

- Ruff format: consistent dict/call wrapping in agent.py, db.py,
  skills.py, delegate.py
- Replace clickup_approve_task/clickup_decline_task with
  clickup_reset_task/clickup_reset_all (simpler state machine)
- Add kv_delete() method to Database
- Add due_date and field filter tests to test_clickup.py
- Update test_clickup_tools.py for new reset tools

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-19 20:44:40 -06:00
parent a1fc5a7c0f
commit 916bec8c0e
7 changed files with 190 additions and 106 deletions

View File

@ -45,10 +45,12 @@ def _build_file_content_parts(files: list[str]) -> list[dict]:
try: try:
data = base64.b64encode(p.read_bytes()).decode("utf-8") data = base64.b64encode(p.read_bytes()).decode("utf-8")
mime = _IMAGE_MIME[suffix] mime = _IMAGE_MIME[suffix]
parts.append({ parts.append(
"type": "image_url", {
"image_url": {"url": f"data:{mime};base64,{data}"}, "type": "image_url",
}) "image_url": {"url": f"data:{mime};base64,{data}"},
}
)
except Exception as e: except Exception as e:
parts.append({"type": "text", "text": f"[Error reading image {p.name}: {e}]"}) parts.append({"type": "text", "text": f"[Error reading image {p.name}: {e}]"})
else: else:
@ -233,11 +235,13 @@ class Agent:
} }
for i, tc in enumerate(unique_tool_calls) for i, tc in enumerate(unique_tool_calls)
] ]
messages.append({ messages.append(
"role": "assistant", {
"content": full_response or None, "role": "assistant",
"tool_calls": openai_tool_calls, "content": full_response or None,
}) "tool_calls": openai_tool_calls,
}
)
for tc in unique_tool_calls: for tc in unique_tool_calls:
yield f"\n\n**Using tool: {tc['name']}**\n" yield f"\n\n**Using tool: {tc['name']}**\n"
@ -248,11 +252,13 @@ class Agent:
yield f"```\n{result[:2000]}\n```\n\n" yield f"```\n{result[:2000]}\n```\n\n"
self.db.add_message(conv_id, "tool", result, tool_result=tc["name"]) self.db.add_message(conv_id, "tool", result, tool_result=tc["name"])
messages.append({ messages.append(
"role": "tool", {
"tool_call_id": tc.get("id", f"call_{tc['name']}"), "role": "tool",
"content": result, "tool_call_id": tc.get("id", f"call_{tc['name']}"),
}) "content": result,
}
)
else: else:
# No tool system configured - just mention tool was requested # No tool system configured - just mention tool was requested
if full_response: if full_response:

View File

@ -94,9 +94,7 @@ class Database:
self._conn.commit() self._conn.commit()
return conv_id return conv_id
def list_conversations( def list_conversations(self, limit: int = 50, agent_name: str | None = None) -> list[dict]:
self, limit: int = 50, agent_name: str | None = None
) -> list[dict]:
if agent_name: if agent_name:
rows = self._conn.execute( rows = self._conn.execute(
"SELECT id, title, updated_at, agent_name FROM conversations" "SELECT id, title, updated_at, agent_name FROM conversations"
@ -229,6 +227,11 @@ class Database:
).fetchall() ).fetchall()
return [(r["key"], r["value"]) for r in rows] return [(r["key"], r["value"]) for r in rows]
def kv_delete(self, key: str):
"""Delete a key from the kv_store."""
self._conn.execute("DELETE FROM kv_store WHERE key = ?", (key,))
self._conn.commit()
# -- Notifications -- # -- Notifications --
def add_notification(self, message: str, category: str = "clickup") -> int: def add_notification(self, message: str, category: str = "clickup") -> int:

View File

@ -116,10 +116,7 @@ class SkillRegistry:
for skill in self._skills.values(): for skill in self._skills.values():
if skill.agents and agent_name not in skill.agents: if skill.agents and agent_name not in skill.agents:
continue continue
parts.append( parts.append(f"### Skill: {skill.name}\n{skill.description}\n")
f"### Skill: {skill.name}\n"
f"{skill.description}\n"
)
if not parts: if not parts:
return "" return ""

View File

@ -95,7 +95,7 @@ def clickup_query_tasks(status: str = "", task_type: str = "", ctx: dict | None
@tool( @tool(
"clickup_list_tasks", "clickup_list_tasks",
"List ClickUp tasks that Cheddah is tracking. Optionally filter by internal state " "List ClickUp tasks that Cheddah is tracking. Optionally filter by internal state "
"(discovered, awaiting_approval, executing, completed, failed, declined, unmapped).", "(executing, completed, failed).",
category="clickup", category="clickup",
) )
def clickup_list_tasks(status: str = "", ctx: dict | None = None) -> str: def clickup_list_tasks(status: str = "", ctx: dict | None = None) -> str:
@ -164,55 +164,42 @@ def clickup_task_status(task_id: str, ctx: dict | None = None) -> str:
@tool( @tool(
"clickup_approve_task", "clickup_reset_task",
"Approve a ClickUp task that is waiting for permission to execute.", "Reset a ClickUp task's internal tracking state so it can be retried on the next poll. "
"Use this when a task has failed or completed and you want to re-run it.",
category="clickup", category="clickup",
) )
def clickup_approve_task(task_id: str, ctx: dict | None = None) -> str: def clickup_reset_task(task_id: str, ctx: dict | None = None) -> str:
"""Approve a task in awaiting_approval state.""" """Delete the kv_store state for a single task so it can be retried."""
db = ctx["db"] db = ctx["db"]
key = f"clickup:task:{task_id}:state" key = f"clickup:task:{task_id}:state"
raw = db.kv_get(key) raw = db.kv_get(key)
if not raw: if not raw:
return f"No tracked state found for task ID '{task_id}'." return f"No tracked state found for task ID '{task_id}'. Nothing to reset."
try: db.kv_delete(key)
state = json.loads(raw) return f"Task '{task_id}' state cleared. It will be picked up on the next scheduler poll."
except json.JSONDecodeError:
return f"Corrupted state data for task '{task_id}'."
if state.get("state") != "awaiting_approval":
current = state.get("state")
return f"Task '{task_id}' is in state '{current}', not 'awaiting_approval'. Cannot approve."
state["state"] = "approved"
db.kv_set(key, json.dumps(state))
name = state.get("clickup_task_name", task_id)
return f"Task '{name}' approved for execution. It will run on the next scheduler cycle."
@tool( @tool(
"clickup_decline_task", "clickup_reset_all",
"Decline a ClickUp task that is waiting for permission to execute.", "Clear ALL internal ClickUp task tracking state. Use this to wipe the slate clean "
"so all eligible tasks can be retried on the next poll cycle.",
category="clickup", category="clickup",
) )
def clickup_decline_task(task_id: str, ctx: dict | None = None) -> str: def clickup_reset_all(ctx: dict | None = None) -> str:
"""Decline a task in awaiting_approval state.""" """Delete all clickup task states and legacy active_ids from kv_store."""
db = ctx["db"] db = ctx["db"]
key = f"clickup:task:{task_id}:state" states = _get_clickup_states(db)
raw = db.kv_get(key) count = 0
if not raw: for task_id in states:
return f"No tracked state found for task ID '{task_id}'." db.kv_delete(f"clickup:task:{task_id}:state")
count += 1
try: # Also clean up legacy active_ids key
state = json.loads(raw) if db.kv_get("clickup:active_task_ids"):
except json.JSONDecodeError: db.kv_delete("clickup:active_task_ids")
return f"Corrupted state data for task '{task_id}'."
if state.get("state") != "awaiting_approval": return (
current = state.get("state") f"Cleared {count} task state(s) from tracking. Next poll will re-discover eligible tasks."
return f"Task '{task_id}' is in state '{current}', not 'awaiting_approval'. Cannot decline." )
state["state"] = "declined"
db.kv_set(key, json.dumps(state))
return f"Task '{state.get('clickup_task_name', task_id)}' has been declined."

View File

@ -48,9 +48,7 @@ def delegate_task(task_description: str, ctx: dict | None = None) -> str:
), ),
category="system", category="system",
) )
def delegate_to_agent( def delegate_to_agent(agent_name: str, task_description: str, ctx: dict | None = None) -> str:
agent_name: str, task_description: str, ctx: dict | None = None
) -> str:
"""Delegate a task to another agent by name.""" """Delegate a task to another agent by name."""
if not ctx or "agent_registry" not in ctx: if not ctx or "agent_registry" not in ctx:
return "Error: delegate_to_agent requires agent_registry in context." return "Error: delegate_to_agent requires agent_registry in context."
@ -71,7 +69,9 @@ def delegate_to_agent(
log.info( log.info(
"Delegating to agent '%s' (depth %d): %s", "Delegating to agent '%s' (depth %d): %s",
agent_name, depth + 1, task_description[:100], agent_name,
depth + 1,
task_description[:100],
) )
_delegation_depth.value = depth + 1 _delegation_depth.value = depth + 1

View File

@ -72,6 +72,38 @@ class TestClickUpTaskParsing:
task = ClickUpTask.from_api(data) task = ClickUpTask.from_api(data)
assert task.description == "" assert task.description == ""
def test_parses_due_date(self):
data = {
"id": "x",
"name": "Test",
"status": {"status": "open", "type": "open"},
"due_date": "1740000000000",
"custom_fields": [],
}
task = ClickUpTask.from_api(data)
assert task.due_date == "1740000000000"
def test_due_date_defaults_empty_when_null(self):
data = {
"id": "x",
"name": "Test",
"status": {"status": "open", "type": "open"},
"due_date": None,
"custom_fields": [],
}
task = ClickUpTask.from_api(data)
assert task.due_date == ""
def test_due_date_defaults_empty_when_missing(self):
data = {
"id": "x",
"name": "Test",
"status": {"status": "open", "type": "open"},
"custom_fields": [],
}
task = ClickUpTask.from_api(data)
assert task.due_date == ""
def test_dropdown_resolved_by_id_fallback(self): def test_dropdown_resolved_by_id_fallback(self):
"""When orderindex doesn't match, fall back to id matching.""" """When orderindex doesn't match, fall back to id matching."""
data = { data = {
@ -333,3 +365,72 @@ class TestClickUpClient:
assert len(tasks) == 1 assert len(tasks) == 1
assert tasks[0].id == "t_x" assert tasks[0].id == "t_x"
client.close() client.close()
@respx.mock
def test_get_tasks_with_due_date_and_custom_fields(self):
"""Verify due_date_lt and custom_fields are passed as query params."""
route = respx.get(f"{BASE_URL}/list/list_1/task").mock(
return_value=httpx.Response(200, json={"tasks": []})
)
client = ClickUpClient(api_token="pk_test")
client.get_tasks(
"list_1",
statuses=["to do"],
due_date_lt=1740000000000,
custom_fields='[{"field_id":"cf_1","operator":"ANY","value":["opt_1"]}]',
)
request = route.calls.last.request
url_str = str(request.url)
assert "due_date_lt=1740000000000" in url_str
assert "custom_fields=" in url_str
client.close()
@respx.mock
def test_discover_field_filter_found(self):
respx.get(f"{BASE_URL}/list/list_1/field").mock(
return_value=httpx.Response(
200,
json={
"fields": [
{
"id": "cf_wc",
"name": "Work Category",
"type": "drop_down",
"type_config": {
"options": [
{"id": "opt_pr", "name": "Press Release", "orderindex": 0},
{"id": "opt_lb", "name": "Link Building", "orderindex": 1},
]
},
},
{"id": "cf_other", "name": "Company", "type": "short_text"},
]
},
)
)
client = ClickUpClient(api_token="pk_test")
result = client.discover_field_filter("list_1", "Work Category")
assert result is not None
assert result["field_id"] == "cf_wc"
assert result["options"]["Press Release"] == "opt_pr"
assert result["options"]["Link Building"] == "opt_lb"
client.close()
@respx.mock
def test_discover_field_filter_not_found(self):
respx.get(f"{BASE_URL}/list/list_1/field").mock(
return_value=httpx.Response(
200,
json={"fields": [{"id": "cf_other", "name": "Company", "type": "short_text"}]},
)
)
client = ClickUpClient(api_token="pk_test")
result = client.discover_field_filter("list_1", "Work Category")
assert result is None
client.close()

View File

@ -1,13 +1,13 @@
"""Tests for the ClickUp chat tools (state machine transitions).""" """Tests for the ClickUp chat tools."""
from __future__ import annotations from __future__ import annotations
import json import json
from cheddahbot.tools.clickup_tool import ( from cheddahbot.tools.clickup_tool import (
clickup_approve_task,
clickup_decline_task,
clickup_list_tasks, clickup_list_tasks,
clickup_reset_all,
clickup_reset_task,
clickup_task_status, clickup_task_status,
) )
@ -100,58 +100,48 @@ class TestClickupTaskStatus:
assert "/data/pr1.txt" in result assert "/data/pr1.txt" in result
class TestClickupApproveTask: class TestClickupResetTask:
"""Approval is the gate between 'discovered' and 'executing'. def test_resets_failed_task(self, tmp_db):
If this breaks, tasks requiring approval can never run.""" _seed_task(tmp_db, "f1", "failed")
def test_approves_awaiting_task(self, tmp_db): result = clickup_reset_task(task_id="f1", ctx=_make_ctx(tmp_db))
_seed_task(tmp_db, "a1", "awaiting_approval")
result = clickup_approve_task(task_id="a1", ctx=_make_ctx(tmp_db)) assert "cleared" in result.lower()
assert tmp_db.kv_get("clickup:task:f1:state") is None
assert "approved" in result.lower() def test_resets_completed_task(self, tmp_db):
_seed_task(tmp_db, "c1", "completed")
# Verify state changed in DB result = clickup_reset_task(task_id="c1", ctx=_make_ctx(tmp_db))
raw = tmp_db.kv_get("clickup:task:a1:state")
state = json.loads(raw)
assert state["state"] == "approved"
def test_rejects_non_awaiting_task(self, tmp_db): assert "cleared" in result.lower()
_seed_task(tmp_db, "a1", "executing") assert tmp_db.kv_get("clickup:task:c1:state") is None
result = clickup_approve_task(task_id="a1", ctx=_make_ctx(tmp_db))
assert "Cannot approve" in result
# State should be unchanged
raw = tmp_db.kv_get("clickup:task:a1:state")
state = json.loads(raw)
assert state["state"] == "executing"
def test_unknown_task(self, tmp_db): def test_unknown_task(self, tmp_db):
result = clickup_approve_task(task_id="nope", ctx=_make_ctx(tmp_db)) result = clickup_reset_task(task_id="nope", ctx=_make_ctx(tmp_db))
assert "No tracked state" in result assert "Nothing to reset" in result
class TestClickupDeclineTask: class TestClickupResetAll:
def test_declines_awaiting_task(self, tmp_db): def test_clears_all_states(self, tmp_db):
_seed_task(tmp_db, "d1", "awaiting_approval") _seed_task(tmp_db, "a1", "completed")
_seed_task(tmp_db, "a2", "failed")
_seed_task(tmp_db, "a3", "executing")
result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db)) result = clickup_reset_all(ctx=_make_ctx(tmp_db))
assert "declined" in result.lower() assert "3" in result
assert tmp_db.kv_get("clickup:task:a1:state") is None
assert tmp_db.kv_get("clickup:task:a2:state") is None
assert tmp_db.kv_get("clickup:task:a3:state") is None
raw = tmp_db.kv_get("clickup:task:d1:state") def test_clears_legacy_active_ids(self, tmp_db):
state = json.loads(raw) tmp_db.kv_set("clickup:active_task_ids", json.dumps(["a1", "a2"]))
assert state["state"] == "declined"
def test_rejects_non_awaiting_task(self, tmp_db): clickup_reset_all(ctx=_make_ctx(tmp_db))
_seed_task(tmp_db, "d1", "completed")
result = clickup_decline_task(task_id="d1", ctx=_make_ctx(tmp_db)) assert tmp_db.kv_get("clickup:active_task_ids") is None
assert "Cannot decline" in result def test_empty_returns_zero(self, tmp_db):
result = clickup_reset_all(ctx=_make_ctx(tmp_db))
def test_unknown_task(self, tmp_db): assert "0" in result
result = clickup_decline_task(task_id="nope", ctx=_make_ctx(tmp_db))
assert "No tracked state" in result