From ba89f61bc4f835ea86aa94ddd351fb12905d7672 Mon Sep 17 00:00:00 2001 From: PeninsulaInd Date: Sun, 15 Feb 2026 22:27:02 -0600 Subject: [PATCH] Add UI-agnostic notification bus Pub/sub NotificationBus backed by DB persistence. Any interface (Gradio, Discord, Slack) subscribes as a listener with independent read cursors. Supports both push callbacks and poll-based consumption. Co-Authored-By: Claude Opus 4.6 --- cheddahbot/notifications.py | 79 +++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 cheddahbot/notifications.py diff --git a/cheddahbot/notifications.py b/cheddahbot/notifications.py new file mode 100644 index 0000000..4e2df5b --- /dev/null +++ b/cheddahbot/notifications.py @@ -0,0 +1,79 @@ +"""UI-agnostic notification bus. + +Any interface (Gradio, Discord, Slack, etc.) subscribes as a listener. +Notifications are persisted to the DB so they survive restarts and can +be consumed by interfaces that connect later. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Callable, TYPE_CHECKING + +if TYPE_CHECKING: + from .db import Database + +log = logging.getLogger(__name__) + + +class NotificationBus: + """Pub/sub notification bus backed by the DB notifications table.""" + + def __init__(self, db: Database): + self._db = db + self._lock = threading.Lock() + # listener_id → callback + self._listeners: dict[str, Callable[[str, str], None]] = {} + # listener_id → last notification id consumed + self._cursors: dict[str, int] = {} + + def push(self, message: str, category: str = "clickup"): + """Push a notification. Persisted to DB and dispatched to all listeners.""" + notif_id = self._db.add_notification(message, category) + log.info("Notification [%s] #%d: %s", category, notif_id, message[:120]) + + # Dispatch to live listeners + with self._lock: + for listener_id, callback in self._listeners.items(): + try: + callback(message, category) + except Exception as e: + log.warning("Listener %s callback failed: %s", listener_id, e) + + def subscribe(self, listener_id: str, callback: Callable[[str, str], None]): + """Register a listener. Callback receives (message, category).""" + with self._lock: + self._listeners[listener_id] = callback + # Start cursor at latest notification so listener only gets new ones + recent = self._db.get_notifications_after(0, limit=1) + if recent: + # Get the max id + all_notifs = self._db.get_notifications_after(0, limit=10000) + self._cursors[listener_id] = all_notifs[-1]["id"] if all_notifs else 0 + else: + self._cursors[listener_id] = 0 + log.info("Listener '%s' subscribed to notification bus", listener_id) + + def unsubscribe(self, listener_id: str): + """Remove a listener.""" + with self._lock: + self._listeners.pop(listener_id, None) + self._cursors.pop(listener_id, None) + + def get_pending(self, listener_id: str) -> list[str]: + """Poll-based alternative: drain pending notifications for a listener.""" + with self._lock: + cursor = self._cursors.get(listener_id, 0) + + notifications = self._db.get_notifications_after(cursor) + if not notifications: + return [] + + messages = [n["message"] for n in notifications] + new_cursor = notifications[-1]["id"] + + with self._lock: + self._cursors[listener_id] = new_cursor + + return messages