75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""UI-agnostic notification bus.
|
|
|
|
Any interface (Gradio, Discord, Slack, etc.) subscribes as a listener.
|
|
Notifications are persisted to the DB so they survive restarts and can
|
|
be consumed by interfaces that connect later.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from collections.abc import Callable
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from .db import Database
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class NotificationBus:
|
|
"""Pub/sub notification bus backed by the DB notifications table."""
|
|
|
|
def __init__(self, db: Database):
|
|
self._db = db
|
|
self._lock = threading.Lock()
|
|
# listener_id → callback
|
|
self._listeners: dict[str, Callable[[str, str], None]] = {}
|
|
# listener_id → last notification id consumed
|
|
self._cursors: dict[str, int] = {}
|
|
|
|
def push(self, message: str, category: str = "clickup"):
|
|
"""Push a notification. Persisted to DB and dispatched to all listeners."""
|
|
notif_id = self._db.add_notification(message, category)
|
|
log.info("Notification [%s] #%d: %s", category, notif_id, message[:120])
|
|
|
|
# Dispatch to live listeners
|
|
with self._lock:
|
|
for listener_id, callback in self._listeners.items():
|
|
try:
|
|
callback(message, category)
|
|
except Exception as e:
|
|
log.warning("Listener %s callback failed: %s", listener_id, e)
|
|
|
|
def subscribe(self, listener_id: str, callback: Callable[[str, str], None]):
|
|
"""Register a listener. Callback receives (message, category)."""
|
|
with self._lock:
|
|
self._listeners[listener_id] = callback
|
|
# Start cursor at latest notification so listener only gets new ones
|
|
self._cursors[listener_id] = self._db.get_max_notification_id()
|
|
log.info("Listener '%s' subscribed to notification bus", listener_id)
|
|
|
|
def unsubscribe(self, listener_id: str):
|
|
"""Remove a listener."""
|
|
with self._lock:
|
|
self._listeners.pop(listener_id, None)
|
|
self._cursors.pop(listener_id, None)
|
|
|
|
def get_pending(self, listener_id: str) -> list[str]:
|
|
"""Poll-based alternative: drain pending notifications for a listener."""
|
|
with self._lock:
|
|
cursor = self._cursors.get(listener_id, 0)
|
|
|
|
notifications = self._db.get_notifications_after(cursor)
|
|
if not notifications:
|
|
return []
|
|
|
|
messages = [n["message"] for n in notifications]
|
|
new_cursor = notifications[-1]["id"]
|
|
|
|
with self._lock:
|
|
self._cursors[listener_id] = new_cursor
|
|
|
|
return messages
|