"""ntfy.sh push notification sender. Subscribes to the NotificationBus and routes notifications to ntfy.sh topics based on category and message-pattern matching. """ from __future__ import annotations import logging import re import threading from dataclasses import dataclass, field import httpx log = logging.getLogger(__name__) @dataclass class NtfyChannel: """One ntfy topic with routing rules.""" name: str server: str topic: str categories: list[str] include_patterns: list[str] = field(default_factory=list) exclude_patterns: list[str] = field(default_factory=list) priority: str = "high" tags: str = "" def accepts(self, message: str, category: str) -> bool: """Return True if this channel should receive the notification.""" if category not in self.categories: return False if self.exclude_patterns: for pat in self.exclude_patterns: if re.search(pat, message, re.IGNORECASE): return False if self.include_patterns: return any( re.search(pat, message, re.IGNORECASE) for pat in self.include_patterns ) return True # no include_patterns = accept all matching categories class NtfyNotifier: """Posts notifications to ntfy.sh topics.""" def __init__(self, channels: list[NtfyChannel]): self._channels = [ch for ch in channels if ch.topic] if self._channels: log.info( "ntfy notifier initialized with %d channel(s): %s", len(self._channels), ", ".join(ch.name for ch in self._channels), ) @property def enabled(self) -> bool: return bool(self._channels) def notify(self, message: str, category: str) -> None: """Route a notification to matching ntfy channels. This is the callback signature expected by NotificationBus.subscribe(). Each matching channel posts in a daemon thread so the notification pipeline is never blocked. """ for channel in self._channels: if channel.accepts(message, category): t = threading.Thread( target=self._post, args=(channel, message, category), daemon=True, ) t.start() def _post(self, channel: NtfyChannel, message: str, category: str) -> None: """Send a notification to an ntfy topic. Fire-and-forget.""" url = f"{channel.server.rstrip('/')}/{channel.topic}" headers: dict[str, str] = { "Title": f"CheddahBot [{category}]", "Priority": channel.priority, } if channel.tags: headers["Tags"] = channel.tags try: resp = httpx.post( url, content=message.encode("utf-8"), headers=headers, timeout=10.0, ) if resp.status_code >= 400: log.warning( "ntfy '%s' returned %d: %s", channel.name, resp.status_code, resp.text[:200], ) else: log.debug("ntfy notification sent to '%s'", channel.name) except httpx.HTTPError as e: log.warning("ntfy '%s' failed: %s", channel.name, e)