411 lines
14 KiB
Python
411 lines
14 KiB
Python
"""Tests for the ntfy.sh push notification sender."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import httpx
|
|
|
|
from cheddahbot.ntfy import NtfyChannel, NtfyNotifier
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NtfyChannel routing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNtfyChannel:
|
|
def test_accepts_matching_category_and_pattern(self):
|
|
ch = NtfyChannel(
|
|
name="human_action",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["clickup", "autocora"],
|
|
include_patterns=["completed", "SUCCESS"],
|
|
)
|
|
assert ch.accepts("ClickUp task completed: **Acme PR**", "clickup") is True
|
|
assert ch.accepts("AutoCora SUCCESS: **keyword**", "autocora") is True
|
|
|
|
def test_rejects_wrong_category(self):
|
|
ch = NtfyChannel(
|
|
name="human_action",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["clickup"],
|
|
include_patterns=["completed"],
|
|
)
|
|
assert ch.accepts("Some autocora message completed", "autocora") is False
|
|
|
|
def test_rejects_non_matching_pattern(self):
|
|
ch = NtfyChannel(
|
|
name="human_action",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["clickup"],
|
|
include_patterns=["completed"],
|
|
)
|
|
assert ch.accepts("Executing ClickUp task: **Acme PR**", "clickup") is False
|
|
|
|
def test_no_include_patterns_accepts_all_in_category(self):
|
|
ch = NtfyChannel(
|
|
name="all_clickup",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["clickup"],
|
|
)
|
|
assert ch.accepts("Any message at all", "clickup") is True
|
|
|
|
def test_exclude_patterns_take_priority(self):
|
|
ch = NtfyChannel(
|
|
name="test",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["clickup"],
|
|
include_patterns=["task"],
|
|
exclude_patterns=["Executing"],
|
|
)
|
|
assert ch.accepts("Executing ClickUp task", "clickup") is False
|
|
assert ch.accepts("ClickUp task completed", "clickup") is True
|
|
|
|
def test_case_insensitive_patterns(self):
|
|
ch = NtfyChannel(
|
|
name="test",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["autocora"],
|
|
include_patterns=["success"],
|
|
)
|
|
assert ch.accepts("AutoCora SUCCESS: **kw**", "autocora") is True
|
|
|
|
def test_empty_topic_filtered_by_notifier(self):
|
|
ch = NtfyChannel(
|
|
name="empty", server="https://ntfy.sh", topic="",
|
|
categories=["clickup"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
assert notifier.enabled is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NtfyNotifier
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNtfyNotifier:
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_notify_posts_to_matching_channel(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="human_action",
|
|
server="https://ntfy.sh",
|
|
topic="my-topic",
|
|
categories=["clickup"],
|
|
include_patterns=["completed"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("ClickUp task completed: **Acme PR**", "clickup")
|
|
|
|
# Wait for daemon thread
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
mock_post.assert_called_once()
|
|
call_args = mock_post.call_args
|
|
assert call_args[0][0] == "https://ntfy.sh/my-topic"
|
|
assert call_args[1]["headers"]["Title"] == "CheddahBot [clickup]"
|
|
assert call_args[1]["headers"]["Priority"] == "high"
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_notify_skips_non_matching_channel(self, mock_post):
|
|
ch = NtfyChannel(
|
|
name="errors",
|
|
server="https://ntfy.sh",
|
|
topic="err-topic",
|
|
categories=["clickup"],
|
|
include_patterns=["failed"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("ClickUp task completed: **Acme PR**", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
mock_post.assert_not_called()
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_notify_routes_to_multiple_channels(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch1 = NtfyChannel(
|
|
name="all", server="https://ntfy.sh", topic="all-topic",
|
|
categories=["clickup"],
|
|
)
|
|
ch2 = NtfyChannel(
|
|
name="errors", server="https://ntfy.sh", topic="err-topic",
|
|
categories=["clickup"], include_patterns=["failed"],
|
|
)
|
|
notifier = NtfyNotifier([ch1, ch2])
|
|
notifier.notify("ClickUp task failed: **Acme**", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
assert mock_post.call_count == 2
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_webhook_error_is_swallowed(self, mock_post):
|
|
mock_post.side_effect = httpx.ConnectError("connection refused")
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
# Should not raise
|
|
notifier.notify("ClickUp task completed: **test**", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_4xx_is_logged_not_raised(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=400, text="Bad Request")
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("ClickUp task completed: **test**", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
def test_enabled_property(self):
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"],
|
|
)
|
|
assert NtfyNotifier([ch]).enabled is True
|
|
assert NtfyNotifier([]).enabled is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Post format
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPostFormat:
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_includes_tags_header(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"], tags="white_check_mark",
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("task completed", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
headers = mock_post.call_args[1]["headers"]
|
|
assert headers["Tags"] == "white_check_mark"
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_omits_tags_header_when_empty(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"], tags="",
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("task completed", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
headers = mock_post.call_args[1]["headers"]
|
|
assert "Tags" not in headers
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_custom_server_url(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="test", server="https://my-ntfy.example.com",
|
|
topic="topic", categories=["clickup"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("task completed", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
assert mock_post.call_args[0][0] == "https://my-ntfy.example.com/topic"
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_message_sent_as_body(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"],
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("Hello **world**", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
assert mock_post.call_args[1]["content"] == b"Hello **world**"
|
|
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_priority_header(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
ch = NtfyChannel(
|
|
name="test", server="https://ntfy.sh", topic="topic",
|
|
categories=["clickup"], priority="urgent",
|
|
)
|
|
notifier = NtfyNotifier([ch])
|
|
notifier.notify("task completed", "clickup")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
assert mock_post.call_args[1]["headers"]["Priority"] == "urgent"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dedup window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_channel(**overrides) -> NtfyChannel:
|
|
defaults = dict(
|
|
name="errors",
|
|
server="https://ntfy.sh",
|
|
topic="test-topic",
|
|
categories=["alert"],
|
|
)
|
|
defaults.update(overrides)
|
|
return NtfyChannel(**defaults)
|
|
|
|
|
|
class TestDedup:
|
|
def test_first_message_goes_through(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
assert notifier._check_and_track("errors", "task X skipped") is True
|
|
|
|
def test_duplicate_permanently_suppressed(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
assert notifier._check_and_track("errors", "task X skipped") is True
|
|
assert notifier._check_and_track("errors", "task X skipped") is False
|
|
|
|
def test_duplicate_still_suppressed_after_day_rollover(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
assert notifier._check_and_track("errors", "task X skipped") is True
|
|
# Dedup memory persists even across date rollover
|
|
with patch.object(notifier, "_today", return_value="2099-01-01"):
|
|
assert notifier._check_and_track("errors", "task X skipped") is False
|
|
|
|
def test_different_messages_not_deduped(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
assert notifier._check_and_track("errors", "task A skipped") is True
|
|
assert notifier._check_and_track("errors", "task B skipped") is True
|
|
|
|
def test_same_message_different_channel_not_deduped(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
assert notifier._check_and_track("errors", "task X skipped") is True
|
|
assert notifier._check_and_track("alerts", "task X skipped") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Daily cap
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDailyCap:
|
|
def test_sends_up_to_cap(self):
|
|
notifier = NtfyNotifier([_make_channel()], daily_cap=3)
|
|
for i in range(3):
|
|
assert notifier._check_and_track("errors", f"msg {i}") is True
|
|
assert notifier._check_and_track("errors", "msg 3") is False
|
|
|
|
def test_cap_resets_on_new_day(self):
|
|
notifier = NtfyNotifier([_make_channel()], daily_cap=2)
|
|
assert notifier._check_and_track("errors", "msg 0") is True
|
|
assert notifier._check_and_track("errors", "msg 1") is True
|
|
assert notifier._check_and_track("errors", "msg 2") is False
|
|
|
|
with patch.object(notifier, "_today", return_value="2099-01-01"):
|
|
assert notifier._check_and_track("errors", "msg 2") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 429 backoff
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRateLimitBackoff:
|
|
def test_429_suppresses_rest_of_day(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
notifier._mark_rate_limited()
|
|
assert notifier._check_and_track("errors", "new message") is False
|
|
|
|
def test_429_resets_next_day(self):
|
|
notifier = NtfyNotifier([_make_channel()])
|
|
notifier._mark_rate_limited()
|
|
assert notifier._check_and_track("errors", "blocked") is False
|
|
|
|
with patch.object(notifier, "_today", return_value="2099-01-01"):
|
|
assert notifier._check_and_track("errors", "unblocked") is True
|
|
|
|
def test_post_sets_rate_limit_on_429(self):
|
|
channel = _make_channel()
|
|
notifier = NtfyNotifier([channel])
|
|
|
|
mock_resp = MagicMock(status_code=429, text="Rate limited")
|
|
with patch("cheddahbot.ntfy.httpx.post", return_value=mock_resp):
|
|
notifier._post(channel, "test msg", "alert")
|
|
|
|
assert notifier._rate_limited_until == notifier._today()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notify integration with dedup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNotifyDedup:
|
|
@patch("cheddahbot.ntfy.httpx.post")
|
|
def test_notify_skips_deduped_messages(self, mock_post):
|
|
mock_post.return_value = MagicMock(status_code=200)
|
|
channel = _make_channel()
|
|
notifier = NtfyNotifier([channel])
|
|
|
|
notifier.notify("same msg", "alert")
|
|
notifier.notify("same msg", "alert")
|
|
|
|
import threading
|
|
for t in threading.enumerate():
|
|
if t.daemon and t.is_alive():
|
|
t.join(timeout=2)
|
|
|
|
# Only one post — second was deduped
|
|
mock_post.assert_called_once()
|