"""Tests for the simplified ClickUp scheduler flow."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from unittest.mock import MagicMock

from cheddahbot.clickup import ClickUpTask
from cheddahbot.scheduler import Scheduler

# ── Fixtures ──

# Skill-map entry used for the "Press Release" task type in these tests.
_PR_MAPPING = {
    "tool": "write_press_releases",
    "auto_execute": True,
    "field_mapping": {
        "topic": "task_name",
        "company_name": "Customer",
    },
}

# Offset (in milliseconds) added to "now" to build a due date one day ahead.
_ONE_DAY_MS = 86_400_000


@dataclass
class _FakeClickUpConfig:
    """Fake ClickUp config section handed to the Scheduler under test."""

    api_token: str = "pk_test"
    workspace_id: str = "ws_1"
    space_id: str = "space_1"
    poll_interval_minutes: int = 20
    poll_statuses: list[str] = field(default_factory=lambda: ["to do"])
    review_status: str = "internal review"
    in_progress_status: str = "in progress"
    automation_status: str = "automation underway"
    error_status: str = "error"
    task_type_field_name: str = "Work Category"
    default_auto_execute: bool = True
    # Only "Press Release" is mapped; any other task type counts as unmapped.
    skill_map: dict = field(default_factory=lambda: {"Press Release": _PR_MAPPING})
    enabled: bool = True


@dataclass
class _FakeSchedulerConfig:
    """Fake scheduler config section handed to the Scheduler under test."""

    poll_interval_seconds: int = 60
    heartbeat_interval_minutes: int = 30


@dataclass
class _FakeConfig:
    """Top-level fake config bundling the ClickUp and scheduler sections."""

    clickup: _FakeClickUpConfig = field(default_factory=_FakeClickUpConfig)
    scheduler: _FakeSchedulerConfig = field(default_factory=_FakeSchedulerConfig)
    identity_dir: str = "identity"


def _make_task(
    task_id: str,
    name: str,
    task_type: str,
    due_date: str = "",
    custom_fields: dict | None = None,
) -> ClickUpTask:
    """Build a ClickUpTask for testing.

    The task always has status "to do"; a missing ``custom_fields`` becomes
    an empty dict.
    """
    return ClickUpTask(
        id=task_id,
        name=name,
        status="to do",
        task_type=task_type,
        due_date=due_date,
        custom_fields=custom_fields or {},
    )


def _now_ms() -> int:
    """Return the current UTC time as integer epoch milliseconds."""
    return int(datetime.now(UTC).timestamp() * 1000)


def _due_tomorrow() -> str:
    """Return a due date one day from now as a stringified epoch-ms value."""
    return str(_now_ms() + _ONE_DAY_MS)


_FIELDS = {"Customer": "Acme"}


# ── Tests ──


class TestPollClickup:
    """Test the simplified _poll_clickup method."""

    def _make_mock_client(self, tasks=None, field_filter=None):
        """Create a mock ClickUp client with proper return types."""
        mock_client = MagicMock()
        mock_client.get_tasks_from_space.return_value = tasks or []
        mock_client.get_list_ids_from_space.return_value = {"list_1"}
        mock_client.discover_field_filter.return_value = field_filter
        return mock_client

    def test_skips_task_with_no_space_id(self, tmp_db):
        """Polling with an empty space_id should return without touching tasks."""
        config = _FakeConfig()
        config.clickup.space_id = ""
        agent = MagicMock()
        scheduler = Scheduler(config, tmp_db, agent)
        # Inject a mock client so the "nothing happened" claim is observable.
        scheduler._clickup_client = self._make_mock_client()

        # Should not raise — just log warning and return
        scheduler._poll_clickup()

        scheduler._clickup_client.get_tasks_from_space.assert_not_called()
        scheduler._clickup_client.update_task_status.assert_not_called()

    def test_skips_task_with_empty_skill_map(self, tmp_db):
        """Polling with an empty skill_map should return without touching tasks."""
        config = _FakeConfig()
        config.clickup.skill_map = {}
        agent = MagicMock()
        scheduler = Scheduler(config, tmp_db, agent)
        # Inject a mock client so the "nothing happened" claim is observable.
        scheduler._clickup_client = self._make_mock_client()

        # Should skip without calling any API
        scheduler._poll_clickup()

        scheduler._clickup_client.get_tasks_from_space.assert_not_called()
        scheduler._clickup_client.update_task_status.assert_not_called()

    def test_skips_task_with_no_due_date(self, tmp_db):
        """Tasks with no due date should be skipped."""
        config = _FakeConfig()
        agent = MagicMock()
        scheduler = Scheduler(config, tmp_db, agent)

        task = _make_task(
            "t1",
            "PR for Acme",
            "Press Release",
            due_date="",
        )
        scheduler._clickup_client = self._make_mock_client(
            tasks=[task],
        )

        scheduler._poll_clickup()

        scheduler._clickup_client.update_task_status.assert_not_called()

    def test_skips_unmapped_task_type(self, tmp_db):
        """Tasks with unmapped task_type should be skipped."""
        config = _FakeConfig()
        agent = MagicMock()
        scheduler = Scheduler(config, tmp_db, agent)

        task = _make_task(
            "t1",
            "Content task",
            "Content Creation",
            due_date=_due_tomorrow(),
        )
        scheduler._clickup_client = self._make_mock_client(
            tasks=[task],
        )

        scheduler._poll_clickup()

        scheduler._clickup_client.update_task_status.assert_not_called()


class TestExecuteTask:
    """Test the simplified _execute_task method."""

    def test_success_flow(self, tmp_db):
        """Successful execution: tool called, automation underway set."""
        config = _FakeConfig()
        agent = MagicMock()
        agent._tools = MagicMock()
        agent._tools.execute.return_value = "Pipeline completed successfully"
        scheduler = Scheduler(config, tmp_db, agent)

        mock_client = MagicMock()
        mock_client.update_task_status.return_value = True
        scheduler._clickup_client = mock_client

        task = _make_task(
            "t1",
            "PR for Acme",
            "Press Release",
            due_date=_due_tomorrow(),
            custom_fields=_FIELDS,
        )

        scheduler._execute_task(task)

        mock_client.update_task_status.assert_any_call(
            "t1",
            "automation underway",
        )
        agent._tools.execute.assert_called_once()

    def test_failure_flow(self, tmp_db):
        """Failed: error comment posted, status set to 'error'."""
        config = _FakeConfig()
        agent = MagicMock()
        agent._tools = MagicMock()
        # Make the tool call raise so the scheduler's failure path runs.
        agent._tools.execute.side_effect = RuntimeError("API timeout")
        scheduler = Scheduler(config, tmp_db, agent)

        mock_client = MagicMock()
        mock_client.update_task_status.return_value = True
        mock_client.add_comment.return_value = True
        scheduler._clickup_client = mock_client

        task = _make_task(
            "t1",
            "PR for Acme",
            "Press Release",
            due_date=_due_tomorrow(),
            custom_fields=_FIELDS,
        )

        scheduler._execute_task(task)

        mock_client.update_task_status.assert_any_call("t1", "error")
        mock_client.add_comment.assert_called_once()
        # Second positional argument of add_comment is read as the comment body.
        comment_text = mock_client.add_comment.call_args[0][1]
        assert "failed" in comment_text.lower()


class TestFieldFilterDiscovery:
    """Test _discover_field_filter caching."""

    def test_caches_after_first_call(self, tmp_db):
        """The field filter is discovered on the first poll and reused after."""
        config = _FakeConfig()
        agent = MagicMock()
        scheduler = Scheduler(config, tmp_db, agent)

        mock_client = MagicMock()
        mock_client.get_list_ids_from_space.return_value = {"list_1"}
        mock_client.discover_field_filter.return_value = {
            "field_id": "cf_wc",
            "options": {"Press Release": "opt_pr"},
        }
        mock_client.get_tasks_from_space.return_value = []
        scheduler._clickup_client = mock_client

        # First poll should discover
        scheduler._poll_clickup()
        assert scheduler._field_filter_cache is not None
        assert scheduler._field_filter_cache["field_id"] == "cf_wc"

        # Second poll reuses cache
        mock_client.discover_field_filter.reset_mock()
        scheduler._poll_clickup()
        mock_client.discover_field_filter.assert_not_called()