105 lines
3.3 KiB
Python
105 lines
3.3 KiB
Python
"""ntfy.sh push notification sender.

Subscribes to the NotificationBus and routes notifications to ntfy.sh
topics based on category and message-pattern matching.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
import threading
|
|
from dataclasses import dataclass, field
|
|
|
|
import httpx
|
|
|
|
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class NtfyChannel:
    """Routing rules and delivery settings for a single ntfy topic.

    Attributes:
        name: Label for the channel.
        server: Base URL of the ntfy server.
        topic: Topic name on that server.
        categories: Notification categories this channel listens to.
        include_patterns: Regexes; when non-empty, a message must match at
            least one of them to be accepted.
        exclude_patterns: Regexes; a message matching any of them is
            rejected.
        priority: Priority value for notifications sent on this channel.
        tags: Tags string for notifications sent on this channel.
    """

    name: str
    server: str
    topic: str
    categories: list[str]
    include_patterns: list[str] = field(default_factory=list)
    exclude_patterns: list[str] = field(default_factory=list)
    priority: str = "high"
    tags: str = ""

    def accepts(self, message: str, category: str) -> bool:
        """Decide whether *message* in *category* belongs on this channel.

        The checks run in order: the category must be listed, no exclude
        pattern may match, and, if include patterns are configured, at
        least one of them must match. All matching is case-insensitive
        ``re.search``.
        """

        def matches_any(patterns: list[str]) -> bool:
            # Stops at the first pattern that matches the message.
            return any(
                re.search(pattern, message, re.IGNORECASE)
                for pattern in patterns
            )

        if category not in self.categories:
            return False
        if self.exclude_patterns and matches_any(self.exclude_patterns):
            return False
        if not self.include_patterns:
            # No include filter configured: every message in an accepted
            # category passes.
            return True
        return matches_any(self.include_patterns)
|
|
|
|
|
|
class NtfyNotifier:
    """Posts notifications to ntfy.sh topics.

    Keeps the configured channels that have a non-empty topic and, for each
    incoming notification, POSTs the message to every channel whose routing
    rules accept it.
    """

    def __init__(self, channels: list[NtfyChannel]):
        """Store the usable channels and log which ones are active.

        Args:
            channels: Candidate channels; entries with an empty ``topic``
                are dropped.
        """
        self._channels = [ch for ch in channels if ch.topic]
        if self._channels:
            log.info(
                "ntfy notifier initialized with %d channel(s): %s",
                len(self._channels),
                ", ".join(ch.name for ch in self._channels),
            )

    @property
    def enabled(self) -> bool:
        """Whether at least one channel with a topic is configured."""
        return bool(self._channels)

    def notify(self, message: str, category: str) -> None:
        """Route a notification to matching ntfy channels.

        This is the callback signature expected by NotificationBus.subscribe().
        Each matching channel posts in a daemon thread so the notification
        pipeline is never blocked.

        Args:
            message: Notification text, sent as the request body.
            category: Notification category, used for routing and shown in
                the notification title.
        """
        for channel in self._channels:
            if channel.accepts(message, category):
                t = threading.Thread(
                    target=self._post,
                    args=(channel, message, category),
                    daemon=True,
                )
                t.start()

    def _post(self, channel: NtfyChannel, message: str, category: str) -> None:
        """Send a notification to an ntfy topic. Fire-and-forget.

        Runs as a thread target, so no exception is allowed to propagate:
        every failure is logged against the channel name instead.

        Args:
            channel: Destination channel (server, topic, priority, tags).
            message: Notification text, sent UTF-8 encoded as the body.
            category: Category shown in the ``Title`` header.
        """
        url = f"{channel.server.rstrip('/')}/{channel.topic}"
        headers: dict[str, str] = {
            "Title": f"CheddahBot [{category}]",
            "Priority": channel.priority,
        }
        if channel.tags:
            headers["Tags"] = channel.tags
        try:
            resp = httpx.post(
                url,
                content=message.encode("utf-8"),
                headers=headers,
                timeout=10.0,
            )
            if resp.status_code >= 400:
                log.warning(
                    "ntfy '%s' returned %d: %s",
                    channel.name, resp.status_code, resp.text[:200],
                )
            else:
                log.debug("ntfy notification sent to '%s'", channel.name)
        except httpx.HTTPError as e:
            log.warning("ntfy '%s' failed: %s", channel.name, e)
        except Exception:
            # Thread boundary: some failures are not httpx.HTTPError
            # subclasses (e.g. httpx.InvalidURL for a malformed server/topic,
            # or an encoding error while building the request). Log them with
            # the channel name rather than letting the thread die with a bare
            # traceback.
            log.exception("ntfy '%s' failed unexpectedly", channel.name)
|