Add UI-agnostic notification bus
Pub/sub NotificationBus backed by DB persistence. Any interface (Gradio, Discord, Slack) subscribes as a listener with independent read cursors. Supports both push callbacks and poll-based consumption. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>cora-start
parent
0b7950cefa
commit
ba89f61bc4
|
|
@ -0,0 +1,79 @@
|
|||
"""UI-agnostic notification bus.
|
||||
|
||||
Any interface (Gradio, Discord, Slack, etc.) subscribes as a listener.
|
||||
Notifications are persisted to the DB so they survive restarts and can
|
||||
be consumed by interfaces that connect later.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from typing import Callable, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .db import Database
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationBus:
|
||||
"""Pub/sub notification bus backed by the DB notifications table."""
|
||||
|
||||
def __init__(self, db: Database):
|
||||
self._db = db
|
||||
self._lock = threading.Lock()
|
||||
# listener_id → callback
|
||||
self._listeners: dict[str, Callable[[str, str], None]] = {}
|
||||
# listener_id → last notification id consumed
|
||||
self._cursors: dict[str, int] = {}
|
||||
|
||||
def push(self, message: str, category: str = "clickup"):
|
||||
"""Push a notification. Persisted to DB and dispatched to all listeners."""
|
||||
notif_id = self._db.add_notification(message, category)
|
||||
log.info("Notification [%s] #%d: %s", category, notif_id, message[:120])
|
||||
|
||||
# Dispatch to live listeners
|
||||
with self._lock:
|
||||
for listener_id, callback in self._listeners.items():
|
||||
try:
|
||||
callback(message, category)
|
||||
except Exception as e:
|
||||
log.warning("Listener %s callback failed: %s", listener_id, e)
|
||||
|
||||
def subscribe(self, listener_id: str, callback: Callable[[str, str], None]):
|
||||
"""Register a listener. Callback receives (message, category)."""
|
||||
with self._lock:
|
||||
self._listeners[listener_id] = callback
|
||||
# Start cursor at latest notification so listener only gets new ones
|
||||
recent = self._db.get_notifications_after(0, limit=1)
|
||||
if recent:
|
||||
# Get the max id
|
||||
all_notifs = self._db.get_notifications_after(0, limit=10000)
|
||||
self._cursors[listener_id] = all_notifs[-1]["id"] if all_notifs else 0
|
||||
else:
|
||||
self._cursors[listener_id] = 0
|
||||
log.info("Listener '%s' subscribed to notification bus", listener_id)
|
||||
|
||||
def unsubscribe(self, listener_id: str):
|
||||
"""Remove a listener."""
|
||||
with self._lock:
|
||||
self._listeners.pop(listener_id, None)
|
||||
self._cursors.pop(listener_id, None)
|
||||
|
||||
def get_pending(self, listener_id: str) -> list[str]:
|
||||
"""Poll-based alternative: drain pending notifications for a listener."""
|
||||
with self._lock:
|
||||
cursor = self._cursors.get(listener_id, 0)
|
||||
|
||||
notifications = self._db.get_notifications_after(cursor)
|
||||
if not notifications:
|
||||
return []
|
||||
|
||||
messages = [n["message"] for n in notifications]
|
||||
new_cursor = notifications[-1]["id"]
|
||||
|
||||
with self._lock:
|
||||
self._cursors[listener_id] = new_cursor
|
||||
|
||||
return messages
|
||||
Loading…
Reference in New Issue