diff --git a/cheddahbot/__main__.py b/cheddahbot/__main__.py index fe538bc..73cfbf9 100644 --- a/cheddahbot/__main__.py +++ b/cheddahbot/__main__.py @@ -121,6 +121,41 @@ def main(): except Exception as e: log.warning("Notification bus not available: %s", e) + # ntfy.sh push notifications + if notification_bus and config.ntfy.enabled: + try: + import os + + from .ntfy import NtfyChannel, NtfyNotifier + + ntfy_channels = [] + for ch_cfg in config.ntfy.channels: + topic = os.getenv(ch_cfg.topic_env_var, "") + if topic: + ntfy_channels.append( + NtfyChannel( + name=ch_cfg.name, + server=ch_cfg.server, + topic=topic, + categories=ch_cfg.categories, + include_patterns=ch_cfg.include_patterns, + exclude_patterns=ch_cfg.exclude_patterns, + priority=ch_cfg.priority, + tags=ch_cfg.tags, + ) + ) + else: + log.warning( + "ntfy channel '%s' skipped — env var %s not set", + ch_cfg.name, ch_cfg.topic_env_var, + ) + notifier = NtfyNotifier(ntfy_channels) + if notifier.enabled: + notification_bus.subscribe("ntfy", notifier.notify) + log.info("ntfy notifier subscribed to notification bus") + except Exception as e: + log.warning("ntfy notifier not available: %s", e) + # Scheduler (uses default agent) scheduler = None try: diff --git a/cheddahbot/config.py b/cheddahbot/config.py index 59dc432..bfcdf14 100644 --- a/cheddahbot/config.py +++ b/cheddahbot/config.py @@ -106,6 +106,24 @@ class ContentConfig: ) +@dataclass +class NtfyChannelConfig: + name: str = "" + topic_env_var: str = "" # env var name holding the topic string + server: str = "https://ntfy.sh" + categories: list[str] = field(default_factory=list) + include_patterns: list[str] = field(default_factory=list) + exclude_patterns: list[str] = field(default_factory=list) + priority: str = "high" # min / low / default / high / urgent + tags: str = "" # comma-separated emoji shortcodes + + +@dataclass +class NtfyConfig: + enabled: bool = False + channels: list[NtfyChannelConfig] = field(default_factory=list) + + @dataclass class AgentConfig: """Per-agent configuration for multi-agent support.""" @@ -138,6 +156,7 @@ class Config: autocora: AutoCoraConfig = field(default_factory=AutoCoraConfig) api_budget: ApiBudgetConfig = field(default_factory=ApiBudgetConfig) content: ContentConfig = field(default_factory=ContentConfig) + ntfy: NtfyConfig = field(default_factory=NtfyConfig) agents: list[AgentConfig] = field(default_factory=lambda: [AgentConfig()]) # Derived paths @@ -202,6 +221,20 @@ def load_config() -> Config: if hasattr(cfg.content, k): setattr(cfg.content, k, v) + # ntfy push notifications + if "ntfy" in data and isinstance(data["ntfy"], dict): + ntfy_data = data["ntfy"] + cfg.ntfy.enabled = ntfy_data.get("enabled", False) + if "channels" in ntfy_data and isinstance(ntfy_data["channels"], list): + cfg.ntfy.channels = [] + for ch_data in ntfy_data["channels"]: + if isinstance(ch_data, dict): + ch = NtfyChannelConfig() + for k, v in ch_data.items(): + if hasattr(ch, k): + setattr(ch, k, v) + cfg.ntfy.channels.append(ch) + # Multi-agent configs if "agents" in data and isinstance(data["agents"], list): cfg.agents = [] diff --git a/cheddahbot/ntfy.py b/cheddahbot/ntfy.py new file mode 100644 index 0000000..702954f --- /dev/null +++ b/cheddahbot/ntfy.py @@ -0,0 +1,104 @@ +"""ntfy.sh push notification sender. + +Subscribes to the NotificationBus and routes notifications to ntfy.sh +topics based on category and message-pattern matching. +""" + +from __future__ import annotations + +import logging +import re +import threading +from dataclasses import dataclass, field + +import httpx + +log = logging.getLogger(__name__) + + +@dataclass +class NtfyChannel: + """One ntfy topic with routing rules.""" + + name: str + server: str + topic: str + categories: list[str] + include_patterns: list[str] = field(default_factory=list) + exclude_patterns: list[str] = field(default_factory=list) + priority: str = "high" + tags: str = "" + + def accepts(self, message: str, category: str) -> bool: + """Return True if this channel should receive the notification.""" + if category not in self.categories: + return False + if self.exclude_patterns: + for pat in self.exclude_patterns: + if re.search(pat, message, re.IGNORECASE): + return False + if self.include_patterns: + return any( + re.search(pat, message, re.IGNORECASE) + for pat in self.include_patterns + ) + return True # no include_patterns = accept all matching categories + + +class NtfyNotifier: + """Posts notifications to ntfy.sh topics.""" + + def __init__(self, channels: list[NtfyChannel]): + self._channels = [ch for ch in channels if ch.topic] + if self._channels: + log.info( + "ntfy notifier initialized with %d channel(s): %s", + len(self._channels), + ", ".join(ch.name for ch in self._channels), + ) + + @property + def enabled(self) -> bool: + return bool(self._channels) + + def notify(self, message: str, category: str) -> None: + """Route a notification to matching ntfy channels. + + This is the callback signature expected by NotificationBus.subscribe(). + Each matching channel posts in a daemon thread so the notification + pipeline is never blocked. + """ + for channel in self._channels: + if channel.accepts(message, category): + t = threading.Thread( + target=self._post, + args=(channel, message, category), + daemon=True, + ) + t.start() + + def _post(self, channel: NtfyChannel, message: str, category: str) -> None: + """Send a notification to an ntfy topic. Fire-and-forget.""" + url = f"{channel.server.rstrip('/')}/{channel.topic}" + headers: dict[str, str] = { + "Title": f"CheddahBot [{category}]", + "Priority": channel.priority, + } + if channel.tags: + headers["Tags"] = channel.tags + try: + resp = httpx.post( + url, + content=message.encode("utf-8"), + headers=headers, + timeout=10.0, + ) + if resp.status_code >= 400: + log.warning( + "ntfy '%s' returned %d: %s", + channel.name, resp.status_code, resp.text[:200], + ) + else: + log.debug("ntfy notification sent to '%s'", channel.name) + except httpx.HTTPError as e: + log.warning("ntfy '%s' failed: %s", channel.name, e) diff --git a/config.yaml b/config.yaml index a00b48f..e31d36d 100644 --- a/config.yaml +++ b/config.yaml @@ -100,7 +100,7 @@ link_building: autocora: jobs_dir: "//PennQnap1/SHARE1/AutoCora/jobs" results_dir: "//PennQnap1/SHARE1/AutoCora/results" - poll_interval_minutes: 5 + poll_interval_minutes: 20 success_status: "running cora" error_status: "error" enabled: true @@ -111,6 +111,23 @@ content: cora_inbox: "Z:/content-cora-inbox" outline_dir: "Z:/content-outlines" +# ntfy.sh push notifications +ntfy: + enabled: true + channels: + - name: human_action + topic_env_var: NTFY_TOPIC_HUMAN_ACTION + categories: [clickup, autocora, linkbuilding, content] + include_patterns: ["completed", "SUCCESS", "copied to"] + priority: high + tags: white_check_mark + - name: errors + topic_env_var: NTFY_TOPIC_ERRORS + categories: [clickup, autocora, linkbuilding, content] + include_patterns: ["failed", "FAILURE", "skipped", "no ClickUp match", "copy failed", "IMSURL is empty"] + priority: urgent + tags: rotating_light + # Multi-agent configuration # Each agent gets its own personality, tool whitelist, and memory scope. # The first agent is the default. Omit this section for single-agent mode. diff --git a/tests/test_ntfy.py b/tests/test_ntfy.py new file mode 100644 index 0000000..cb9e764 --- /dev/null +++ b/tests/test_ntfy.py @@ -0,0 +1,290 @@ +"""Tests for the ntfy.sh push notification sender.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import httpx + +from cheddahbot.ntfy import NtfyChannel, NtfyNotifier + +# --------------------------------------------------------------------------- +# NtfyChannel routing +# --------------------------------------------------------------------------- + + +class TestNtfyChannel: + def test_accepts_matching_category_and_pattern(self): + ch = NtfyChannel( + name="human_action", + server="https://ntfy.sh", + topic="test-topic", + categories=["clickup", "autocora"], + include_patterns=["completed", "SUCCESS"], + ) + assert ch.accepts("ClickUp task completed: **Acme PR**", "clickup") is True + assert ch.accepts("AutoCora SUCCESS: **keyword**", "autocora") is True + + def test_rejects_wrong_category(self): + ch = NtfyChannel( + name="human_action", + server="https://ntfy.sh", + topic="test-topic", + categories=["clickup"], + include_patterns=["completed"], + ) + assert ch.accepts("Some autocora message completed", "autocora") is False + + def test_rejects_non_matching_pattern(self): + ch = NtfyChannel( + name="human_action", + server="https://ntfy.sh", + topic="test-topic", + categories=["clickup"], + include_patterns=["completed"], + ) + assert ch.accepts("Executing ClickUp task: **Acme PR**", "clickup") is False + + def test_no_include_patterns_accepts_all_in_category(self): + ch = NtfyChannel( + name="all_clickup", + server="https://ntfy.sh", + topic="test-topic", + categories=["clickup"], + ) + assert ch.accepts("Any message at all", "clickup") is True + + def test_exclude_patterns_take_priority(self): + ch = NtfyChannel( + name="test", + server="https://ntfy.sh", + topic="test-topic", + categories=["clickup"], + include_patterns=["task"], + exclude_patterns=["Executing"], + ) + assert ch.accepts("Executing ClickUp task", "clickup") is False + assert ch.accepts("ClickUp task completed", "clickup") is True + + def test_case_insensitive_patterns(self): + ch = NtfyChannel( + name="test", + server="https://ntfy.sh", + topic="test-topic", + categories=["autocora"], + include_patterns=["success"], + ) + assert ch.accepts("AutoCora SUCCESS: **kw**", "autocora") is True + + def test_empty_topic_filtered_by_notifier(self): + ch = NtfyChannel( + name="empty", server="https://ntfy.sh", topic="", + categories=["clickup"], + ) + notifier = NtfyNotifier([ch]) + assert notifier.enabled is False + + +# --------------------------------------------------------------------------- +# NtfyNotifier +# --------------------------------------------------------------------------- + + +class TestNtfyNotifier: + @patch("cheddahbot.ntfy.httpx.post") + def test_notify_posts_to_matching_channel(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="human_action", + server="https://ntfy.sh", + topic="my-topic", + categories=["clickup"], + include_patterns=["completed"], + ) + notifier = NtfyNotifier([ch]) + notifier.notify("ClickUp task completed: **Acme PR**", "clickup") + + # Wait for daemon thread + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[0][0] == "https://ntfy.sh/my-topic" + assert call_args[1]["headers"]["Title"] == "CheddahBot [clickup]" + assert call_args[1]["headers"]["Priority"] == "high" + + @patch("cheddahbot.ntfy.httpx.post") + def test_notify_skips_non_matching_channel(self, mock_post): + ch = NtfyChannel( + name="errors", + server="https://ntfy.sh", + topic="err-topic", + categories=["clickup"], + include_patterns=["failed"], + ) + notifier = NtfyNotifier([ch]) + notifier.notify("ClickUp task completed: **Acme PR**", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + mock_post.assert_not_called() + + @patch("cheddahbot.ntfy.httpx.post") + def test_notify_routes_to_multiple_channels(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch1 = NtfyChannel( + name="all", server="https://ntfy.sh", topic="all-topic", + categories=["clickup"], + ) + ch2 = NtfyChannel( + name="errors", server="https://ntfy.sh", topic="err-topic", + categories=["clickup"], include_patterns=["failed"], + ) + notifier = NtfyNotifier([ch1, ch2]) + notifier.notify("ClickUp task failed: **Acme**", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + assert mock_post.call_count == 2 + + @patch("cheddahbot.ntfy.httpx.post") + def test_webhook_error_is_swallowed(self, mock_post): + mock_post.side_effect = httpx.ConnectError("connection refused") + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], + ) + notifier = NtfyNotifier([ch]) + # Should not raise + notifier.notify("ClickUp task completed: **test**", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + @patch("cheddahbot.ntfy.httpx.post") + def test_4xx_is_logged_not_raised(self, mock_post): + mock_post.return_value = MagicMock(status_code=400, text="Bad Request") + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], + ) + notifier = NtfyNotifier([ch]) + notifier.notify("ClickUp task completed: **test**", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + def test_enabled_property(self): + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], + ) + assert NtfyNotifier([ch]).enabled is True + assert NtfyNotifier([]).enabled is False + + +# --------------------------------------------------------------------------- +# Post format +# --------------------------------------------------------------------------- + + +class TestPostFormat: + @patch("cheddahbot.ntfy.httpx.post") + def test_includes_tags_header(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], tags="white_check_mark", + ) + notifier = NtfyNotifier([ch]) + notifier.notify("task completed", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + headers = mock_post.call_args[1]["headers"] + assert headers["Tags"] == "white_check_mark" + + @patch("cheddahbot.ntfy.httpx.post") + def test_omits_tags_header_when_empty(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], tags="", + ) + notifier = NtfyNotifier([ch]) + notifier.notify("task completed", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + headers = mock_post.call_args[1]["headers"] + assert "Tags" not in headers + + @patch("cheddahbot.ntfy.httpx.post") + def test_custom_server_url(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="test", server="https://my-ntfy.example.com", + topic="topic", categories=["clickup"], + ) + notifier = NtfyNotifier([ch]) + notifier.notify("task completed", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + assert mock_post.call_args[0][0] == "https://my-ntfy.example.com/topic" + + @patch("cheddahbot.ntfy.httpx.post") + def test_message_sent_as_body(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], + ) + notifier = NtfyNotifier([ch]) + notifier.notify("Hello **world**", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + assert mock_post.call_args[1]["content"] == b"Hello **world**" + + @patch("cheddahbot.ntfy.httpx.post") + def test_priority_header(self, mock_post): + mock_post.return_value = MagicMock(status_code=200) + ch = NtfyChannel( + name="test", server="https://ntfy.sh", topic="topic", + categories=["clickup"], priority="urgent", + ) + notifier = NtfyNotifier([ch]) + notifier.notify("task completed", "clickup") + + import threading + for t in threading.enumerate(): + if t.daemon and t.is_alive(): + t.join(timeout=2) + + assert mock_post.call_args[1]["headers"]["Priority"] == "urgent"