Fix ntfy dedup: make duplicate suppression permanent for process lifetime
The time-based 1-hour dedup window wasn't preventing repeated skip notifications for tasks with missing fields every poll cycle. Replaced with a permanent sent-set so each unique message only fires once per process run. Dedup log bumped to INFO for visibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>fix/customer-field-migration
parent
9102657c15
commit
af67ae166d
|
|
@ -10,7 +10,6 @@ import hashlib
|
|||
import logging
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import date
|
||||
|
||||
|
|
@ -56,14 +55,12 @@ class NtfyNotifier:
|
|||
channels: list[NtfyChannel],
|
||||
*,
|
||||
daily_cap: int = 200,
|
||||
dedup_window_secs: int = 3600,
|
||||
):
|
||||
self._channels = [ch for ch in channels if ch.topic]
|
||||
self._daily_cap = daily_cap
|
||||
self._dedup_window_secs = dedup_window_secs
|
||||
self._lock = threading.Lock()
|
||||
# dedup: hash(channel.name + message) -> last-sent epoch
|
||||
self._recent: dict[str, float] = {}
|
||||
# dedup: set of hash(channel.name + message) — persists for process lifetime
|
||||
self._sent: set[str] = set()
|
||||
# daily cap tracking
|
||||
self._daily_count = 0
|
||||
self._daily_date = ""
|
||||
|
|
@ -85,7 +82,6 @@ class NtfyNotifier:
|
|||
|
||||
def _check_and_track(self, channel_name: str, message: str) -> bool:
|
||||
"""Return True if this message should be sent. Updates internal state."""
|
||||
now = time.monotonic()
|
||||
today = self._today()
|
||||
|
||||
with self._lock:
|
||||
|
|
@ -93,30 +89,29 @@ class NtfyNotifier:
|
|||
if self._rate_limited_until == today:
|
||||
return False
|
||||
|
||||
# Reset daily counter on date rollover
|
||||
# Reset daily counter on date rollover (but keep dedup memory)
|
||||
if self._daily_date != today:
|
||||
self._daily_date = today
|
||||
self._daily_count = 0
|
||||
self._rate_limited_until = ""
|
||||
self._recent.clear()
|
||||
|
||||
# Daily cap check
|
||||
if self._daily_count >= self._daily_cap:
|
||||
return False
|
||||
|
||||
# Dedup check
|
||||
# Dedup check — once sent, never send the same message again
|
||||
# (until process restart)
|
||||
key = hashlib.md5(
|
||||
(channel_name + "\0" + message).encode()
|
||||
).hexdigest()
|
||||
last_sent = self._recent.get(key)
|
||||
if last_sent is not None and (now - last_sent) < self._dedup_window_secs:
|
||||
log.debug(
|
||||
if key in self._sent:
|
||||
log.info(
|
||||
"ntfy dedup: suppressed duplicate to '%s'", channel_name,
|
||||
)
|
||||
return False
|
||||
|
||||
# All checks passed — record send
|
||||
self._recent[key] = now
|
||||
self._sent.add(key)
|
||||
self._daily_count += 1
|
||||
|
||||
if self._daily_count == self._daily_cap:
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import httpx
|
||||
|
|
@ -309,29 +308,28 @@ def _make_channel(**overrides) -> NtfyChannel:
|
|||
|
||||
class TestDedup:
|
||||
def test_first_message_goes_through(self):
|
||||
notifier = NtfyNotifier([_make_channel()], dedup_window_secs=3600)
|
||||
notifier = NtfyNotifier([_make_channel()])
|
||||
assert notifier._check_and_track("errors", "task X skipped") is True
|
||||
|
||||
def test_duplicate_within_window_suppressed(self):
|
||||
notifier = NtfyNotifier([_make_channel()], dedup_window_secs=3600)
|
||||
def test_duplicate_permanently_suppressed(self):
|
||||
notifier = NtfyNotifier([_make_channel()])
|
||||
assert notifier._check_and_track("errors", "task X skipped") is True
|
||||
assert notifier._check_and_track("errors", "task X skipped") is False
|
||||
|
||||
def test_duplicate_after_window_passes(self):
|
||||
notifier = NtfyNotifier([_make_channel()], dedup_window_secs=60)
|
||||
assert notifier._check_and_track("errors", "task X skipped") is True
|
||||
# Simulate time passing beyond the window
|
||||
key = list(notifier._recent.keys())[0]
|
||||
notifier._recent[key] = time.monotonic() - 120
|
||||
def test_duplicate_still_suppressed_after_day_rollover(self):
|
||||
notifier = NtfyNotifier([_make_channel()])
|
||||
assert notifier._check_and_track("errors", "task X skipped") is True
|
||||
# Dedup memory persists even across date rollover
|
||||
with patch.object(notifier, "_today", return_value="2099-01-01"):
|
||||
assert notifier._check_and_track("errors", "task X skipped") is False
|
||||
|
||||
def test_different_messages_not_deduped(self):
|
||||
notifier = NtfyNotifier([_make_channel()], dedup_window_secs=3600)
|
||||
notifier = NtfyNotifier([_make_channel()])
|
||||
assert notifier._check_and_track("errors", "task A skipped") is True
|
||||
assert notifier._check_and_track("errors", "task B skipped") is True
|
||||
|
||||
def test_same_message_different_channel_not_deduped(self):
|
||||
notifier = NtfyNotifier([_make_channel()], dedup_window_secs=3600)
|
||||
notifier = NtfyNotifier([_make_channel()])
|
||||
assert notifier._check_and_track("errors", "task X skipped") is True
|
||||
assert notifier._check_and_track("alerts", "task X skipped") is True
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue