109 lines
3.4 KiB
Python
109 lines
3.4 KiB
Python
"""Tests for the NotificationBus pub/sub layer."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from cheddahbot.notifications import NotificationBus
|
|
|
|
|
|
class TestNotificationBus:
    """Exercise the publish/subscribe contract of ``NotificationBus``.

    The bus is the single point of contact between the scheduler and all UIs.
    If push/subscribe/get_pending break, no UI learns about ClickUp events.
    """

    def test_push_dispatches_to_subscriber(self, tmp_db):
        """A subscribed callback is invoked with the two values given to push()."""
        delivered = []

        def record(msg, cat):
            delivered.append((msg, cat))

        bus = NotificationBus(tmp_db)
        bus.subscribe("test", record)

        bus.push("Task completed", "clickup")

        # Exactly one delivery, carrying both arguments unchanged.
        assert delivered == [("Task completed", "clickup")]

    def test_push_persists_to_db(self, tmp_db):
        """A pushed message can be read back from the database afterwards."""
        bus = NotificationBus(tmp_db)

        bus.push("Persisted message", "clickup")

        stored = tmp_db.get_notifications_after(0)
        assert len(stored) == 1
        assert stored[0]["message"] == "Persisted message"

    def test_get_pending_drains_messages(self, tmp_db):
        """get_pending() returns messages in push order and then comes back empty."""
        bus = NotificationBus(tmp_db)
        bus.subscribe("ui", lambda msg, cat: None)

        for text in ("Msg 1", "Msg 2"):
            bus.push(text)

        drained = bus.get_pending("ui")
        assert len(drained) == 2
        assert (drained[0], drained[1]) == ("Msg 1", "Msg 2")

        # Second call should return empty (already consumed)
        assert bus.get_pending("ui") == []

    def test_multiple_listeners_get_independent_delivery(self, tmp_db):
        """Each listener sees the shared message once and drains on its own."""
        listeners = ("gradio", "discord")
        bus = NotificationBus(tmp_db)
        for listener in listeners:
            bus.subscribe(listener, lambda msg, cat: None)

        bus.push("Shared notification")

        # Read every listener first, then check, mirroring a real fan-out.
        first_read = {listener: bus.get_pending(listener) for listener in listeners}
        for listener in listeners:
            assert len(first_read[listener]) == 1

        # Each drains independently
        for listener in listeners:
            assert bus.get_pending(listener) == []

    def test_unsubscribe_removes_listener(self, tmp_db):
        """After unsubscribe() the callback is no longer invoked by push()."""
        heard = []

        def remember(msg, cat):
            heard.append(msg)

        bus = NotificationBus(tmp_db)
        bus.subscribe("test", remember)

        bus.push("Before unsub")
        bus.unsubscribe("test")
        bus.push("After unsub")

        # Only the message pushed while subscribed was delivered.
        assert heard == ["Before unsub"]

    def test_get_pending_for_unknown_listener(self, tmp_db):
        """A name that never subscribed still receives what was already pushed."""
        bus = NotificationBus(tmp_db)
        bus.push("Exists in DB")

        # Unknown listener starts at cursor 0, so gets everything
        backlog = bus.get_pending("unknown")
        assert len(backlog) == 1

    def test_subscriber_only_gets_new_notifications(self, tmp_db):
        """Subscribing after push should not deliver old notifications."""
        bus = NotificationBus(tmp_db)
        bus.push("Old message")

        bus.subscribe("late_joiner", lambda msg, cat: None)
        bus.push("New message")

        fresh = bus.get_pending("late_joiner")
        assert len(fresh) == 1
        assert fresh[0] == "New message"

    def test_callback_error_doesnt_break_other_listeners(self, tmp_db):
        """A callback that raises does not stop delivery to a later subscriber."""
        survived = []

        def bad_callback(msg, cat):
            raise RuntimeError("broken listener")

        def good_callback(msg, cat):
            survived.append(msg)

        bus = NotificationBus(tmp_db)
        # The failing listener is registered first, ahead of the healthy one.
        bus.subscribe("broken", bad_callback)
        bus.subscribe("healthy", good_callback)

        bus.push("Test message")

        assert survived == ["Test message"]
|