From fc92c2b6d4d01e24313c7c24319f117b454e608c Mon Sep 17 00:00:00 2001 From: mac Date: Fri, 15 May 2026 00:11:10 +0800 Subject: [PATCH 1/2] resolved production monitor and stabilization problem --- src/paperscout/__main__.py | 34 ++++++++++++++++++++++ src/paperscout/config.py | 4 +++ src/paperscout/errors.py | 14 +++++++++ src/paperscout/health.py | 41 +++++++++++++++++---------- src/paperscout/monitor.py | 58 +++++++++++++++++++++++++++++++++++++- src/paperscout/scout.py | 13 +++++++++ src/paperscout/sources.py | 36 +++++++++++++++++++++-- tests/test_health.py | 30 ++++++++++++++++++++ tests/test_monitor.py | 41 +++++++++++++++++++++++++++ tests/test_sources.py | 30 ++++++++++++++++++-- 10 files changed, 280 insertions(+), 21 deletions(-) create mode 100644 src/paperscout/errors.py diff --git a/src/paperscout/__main__.py b/src/paperscout/__main__.py index 722a5d0..2303661 100644 --- a/src/paperscout/__main__.py +++ b/src/paperscout/__main__.py @@ -113,14 +113,47 @@ def _on_poll_result(result): notify_channel(app, result, mq) notify_users(app, result, mq) + def _ops_alert(msg: str) -> None: + if settings.ops_alert_channel: + mq.enqueue( + settings.ops_alert_channel, + f":rotating_light: PaperScout alert: {msg}", + ) + + def _pool_status(p) -> dict: + """Best-effort pool stats (psycopg2 ThreadedConnectionPool uses private attrs).""" + status: dict = {"max": getattr(p, "maxconn", None)} + try: + status["in_use"] = len(p._used) + status["available"] = len(p._pool) + except AttributeError: + status["in_use"] = None + status["available"] = None + return status + scheduler = Scheduler( index=index, prober=prober, user_watchlist=user_watchlist, state=state, notify_callback=_on_poll_result, + ops_alert_fn=_ops_alert, ) + def _extra_health_fields() -> dict: + lsp = scheduler._last_successful_poll + s = scheduler._last_probe_stats + total = sum(s.get(k, 0) for k in ("hit_recent", "hit_old", "hit_no_lm", "miss", "error")) + hit_rate = (s.get("hit_recent", 0) + s.get("hit_old", 0)) / total if total > 0 else None + return { + "last_successful_poll": ( + datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None + ), + "probe_hit_rate": hit_rate, + "mq_depth": mq.depth(), + "db_pool": _pool_status(pool), + } + register_handlers(app, user_watchlist, state, paper_count_fn, launch_time) start_health_server( @@ -129,6 +162,7 @@ def _on_poll_result(result): state, paper_count_fn, bind_host=settings.health_bind_host, + extra_fields_fn=_extra_health_fields, ) log.info("Starting Slack Bolt app on port %d", settings.port) bolt_thread = threading.Thread( diff --git a/src/paperscout/config.py b/src/paperscout/config.py index b96b194..b470d11 100644 --- a/src/paperscout/config.py +++ b/src/paperscout/config.py @@ -80,6 +80,10 @@ class Settings(BaseSettings): # -- Notifications -- notification_channel: str = "" + # Slack channel ID for ops alerts (stale poll). Empty = disabled. + ops_alert_channel: str = "" + # Log a warning when MessageQueue depth reaches or exceeds this (unbounded queue). + mq_backpressure_threshold: int = Field(default=100, ge=1) notify_on_frontier_hit: bool = True notify_on_any_draft: bool = True # Alert when a D-paper we previously probed appears in the wg21.link index diff --git a/src/paperscout/errors.py b/src/paperscout/errors.py new file mode 100644 index 0000000..f16e9e3 --- /dev/null +++ b/src/paperscout/errors.py @@ -0,0 +1,14 @@ +"""Shared failure taxonomy for logging and monitoring.""" + +from __future__ import annotations + +import enum + + +class FailureCategory(str, enum.Enum): + """Structured failure categories for poll / HTTP / probe errors.""" + + RATE_LIMIT = "RATE_LIMIT" + NETWORK = "NETWORK" + TIMEOUT = "TIMEOUT" + UNKNOWN = "UNKNOWN" diff --git a/src/paperscout/health.py b/src/paperscout/health.py index 375e39f..bc6d077 100644 --- a/src/paperscout/health.py +++ b/src/paperscout/health.py @@ -20,6 +20,7 @@ class _HealthHandler(BaseHTTPRequestHandler): launch_time: datetime paper_count_fn: Callable[[], int] state: object # ProbeState — kept generic to avoid circular import + extra_fields_fn: Callable[[], dict] def do_GET(self) -> None: if self.path.rstrip("/") != "/health": @@ -35,21 +36,27 @@ def do_GET(self) -> None: get_disc = getattr(self.state, "get_all_discovered", lambda: {}) discovered = get_disc() - body = json.dumps( - { - "version": __version__, - "uptime_seconds": int(uptime), - "launched_at": self.launch_time.isoformat(), - "papers_loaded": self.paper_count_fn(), - "last_poll": ( - datetime.fromtimestamp(last_poll, tz=timezone.utc).isoformat() - if last_poll - else None - ), - "discovered_via_probe": len(discovered), - "iso_probe_enabled": settings.enable_iso_probe, - } - ).encode() + base = { + "version": __version__, + "uptime_seconds": int(uptime), + "launched_at": self.launch_time.isoformat(), + "papers_loaded": self.paper_count_fn(), + "last_poll": ( + datetime.fromtimestamp(last_poll, tz=timezone.utc).isoformat() + if last_poll + else None + ), + "discovered_via_probe": len(discovered), + "iso_probe_enabled": settings.enable_iso_probe, + } + try: + extra = self.extra_fields_fn() + if not isinstance(extra, dict): + extra = {} + except Exception: + log.exception("health: extra_fields_fn failed") + extra = {} + body = json.dumps({**base, **extra}).encode() self.send_response(200) self.send_header("Content-Type", "application/json") @@ -67,9 +74,12 @@ def start_health_server( state, paper_count_fn: Callable[[], int], bind_host: str = "127.0.0.1", + extra_fields_fn: Callable[[], dict] | None = None, ) -> HTTPServer: """Start the ``/health`` HTTP server on *bind_host*:*port* in a daemon thread.""" + _extra = extra_fields_fn or (lambda: {}) + handler = type( "_BoundHealthHandler", (_HealthHandler,), @@ -77,6 +87,7 @@ def start_health_server( "launch_time": launch_time, "paper_count_fn": staticmethod(paper_count_fn), "state": state, + "extra_fields_fn": staticmethod(_extra), }, ) diff --git a/src/paperscout/monitor.py b/src/paperscout/monitor.py index af6d6b3..0ec1d5e 100644 --- a/src/paperscout/monitor.py +++ b/src/paperscout/monitor.py @@ -5,10 +5,14 @@ import asyncio import logging import time +from collections.abc import Callable from dataclasses import dataclass from datetime import datetime, timezone +import httpx + from .config import Settings, settings +from .errors import FailureCategory from .models import Paper, PerUserMatches, ProbeHit from .sources import ISOProber, WG21Index from .storage import ProbeState, UserWatchlist @@ -96,6 +100,7 @@ def __init__( state: ProbeState, cfg: Settings | None = None, notify_callback=None, + ops_alert_fn: Callable[[str], None] | None = None, ): self.index = index self.prober = prober @@ -103,9 +108,13 @@ def __init__( self.state = state self.cfg = cfg or settings self.notify_callback = notify_callback + self.ops_alert_fn = ops_alert_fn self._previous_papers: dict[str, Paper] = {} self._seeded = False self._poll_count = 0 + self._last_successful_poll: float | None = None + self._last_probe_stats: dict[str, int] = {} + self._last_ops_alert: float | None = None async def seed(self) -> None: """First-run: gather all current papers from all sources without notifying.""" @@ -140,6 +149,8 @@ async def poll_once(self) -> PollResult: if not self._seeded: await self.seed() + self._last_successful_poll = time.time() + self._last_probe_stats = dict(self.prober._stats) return PollResult( diff=DiffResult(new_papers=[], updated_papers=[]), probe_hits=[], @@ -256,6 +267,8 @@ async def poll_once(self) -> PollResult: len(dp_transitions), len(per_user_matches), ) + self._last_successful_poll = time.time() + self._last_probe_stats = dict(self.prober._stats) return result async def run_forever(self) -> None: @@ -273,10 +286,53 @@ async def run_forever(self) -> None: t0 = time.monotonic() try: await self.poll_once() + except httpx.TimeoutException as exc: + log.error( + "POLL-ERROR failure_category=%s poll=%d %s", + FailureCategory.TIMEOUT.value, + self._poll_count, + exc, + ) + except httpx.HTTPStatusError as exc: + cat = ( + FailureCategory.RATE_LIMIT + if exc.response.status_code == 429 + else FailureCategory.NETWORK + ) + log.error( + "POLL-ERROR failure_category=%s poll=%d status=%d", + cat.value, + self._poll_count, + exc.response.status_code, + ) + except httpx.HTTPError as exc: + log.error( + "POLL-ERROR failure_category=%s poll=%d %s", + FailureCategory.NETWORK.value, + self._poll_count, + exc, + ) except Exception: - log.exception("POLL-ERROR poll=%d", self._poll_count) + log.exception( + "POLL-ERROR failure_category=%s poll=%d", + FailureCategory.UNKNOWN.value, + self._poll_count, + ) elapsed = time.monotonic() - t0 + if self.ops_alert_fn and self._last_successful_poll is not None: + stale = time.time() - self._last_successful_poll + alert_threshold = 2 * interval + now_m = time.monotonic() + if stale > alert_threshold and ( + self._last_ops_alert is None or (now_m - self._last_ops_alert) > interval + ): + self.ops_alert_fn( + f"No successful poll in {stale / 60:.0f}min " + f"(threshold={2 * self.cfg.poll_interval_minutes}min)" + ) + self._last_ops_alert = now_m + sleep_for = max(interval - elapsed, cooldown) log.info( "SCHEDULER-SLEEP sleep=%.0fs (poll=%.0fs interval=%ds)", diff --git a/src/paperscout/scout.py b/src/paperscout/scout.py index aaf239f..31e8e6f 100644 --- a/src/paperscout/scout.py +++ b/src/paperscout/scout.py @@ -50,9 +50,22 @@ def start(self) -> None: self._thread.start() log.info("MessageQueue started") + def depth(self) -> int: + """Approximate number of messages waiting to be sent (see ``queue.Queue.qsize``).""" + return self._q.qsize() + def enqueue(self, channel: str, text: str, **kwargs) -> None: """Queue a ``chat.postMessage`` for *channel* (or user id for DMs).""" + from .config import settings + self._q.put((channel, text, kwargs)) + depth = self._q.qsize() + if depth >= settings.mq_backpressure_threshold: + log.warning( + "MQ-BACKPRESSURE depth=%d threshold=%d", + depth, + settings.mq_backpressure_threshold, + ) def _run(self) -> None: while True: diff --git a/src/paperscout/sources.py b/src/paperscout/sources.py index e9b09a0..34fda23 100644 --- a/src/paperscout/sources.py +++ b/src/paperscout/sources.py @@ -15,6 +15,7 @@ import httpx from .config import Settings, settings +from .errors import FailureCategory from .models import Paper, ProbeHit, Tier from .storage import PaperCache, ProbeState, UserWatchlist @@ -91,13 +92,32 @@ async def _download(self) -> dict | None: return None except httpx.TimeoutException as exc: log.warning( - "INDEX-FETCH failure_category=TIMEOUT url=%s %s", + "INDEX-FETCH failure_category=%s url=%s %s", + FailureCategory.TIMEOUT.value, WG21_INDEX_URL, exc, ) return None + except httpx.HTTPStatusError as exc: + cat = ( + FailureCategory.RATE_LIMIT + if exc.response.status_code == 429 + else FailureCategory.NETWORK + ) + log.error( + "INDEX-FETCH failure_category=%s url=%s status=%d", + cat.value, + WG21_INDEX_URL, + exc.response.status_code, + ) + return None except (httpx.HTTPError, ValueError) as exc: - log.error("Failed to download index: %s", exc) + log.error( + "INDEX-FETCH failure_category=%s url=%s %s", + FailureCategory.NETWORK.value, + WG21_INDEX_URL, + exc, + ) return None def _parse_and_index(self, raw: dict) -> dict[str, Paper]: @@ -490,7 +510,17 @@ async def _probe_one( if _attempt < _max_retries - 1: await asyncio.sleep(0.5 * (2**_attempt)) continue - log.debug("ERR %s %s (after %d attempts)", url, exc, _max_retries) + cat = ( + FailureCategory.TIMEOUT + if isinstance(exc, httpx.TimeoutException) + else FailureCategory.NETWORK + ) + log.debug( + "PROBE-ERR failure_category=%s url=%s %s", + cat.value, + url, + exc, + ) self._stats["error"] += 1 return None else: diff --git a/tests/test_health.py b/tests/test_health.py index 97f1f5c..d24c276 100644 --- a/tests/test_health.py +++ b/tests/test_health.py @@ -28,6 +28,27 @@ def get_all_discovered(self): return self._discovered +@pytest.fixture() +def health_url_with_extras(): + port = _find_free_port() + launch = datetime(2026, 3, 16, 10, 0, 0, tzinfo=timezone.utc) + state = _FakeState(last_poll=1742119200.0, discovered={"u1": 1}) + server = start_health_server( + port, + launch, + state, + lambda: 42, + extra_fields_fn=lambda: { + "last_successful_poll": "2026-03-16T12:00:00+00:00", + "probe_hit_rate": 0.5, + "mq_depth": 3, + "db_pool": {"max": 10, "in_use": 1, "available": 9}, + }, + ) + yield f"http://127.0.0.1:{port}" + server.shutdown() + + @pytest.fixture() def health_url(): port = _find_free_port() @@ -81,3 +102,12 @@ def test_iso_probe_flag_follows_config_settings(self, health_url): assert data["iso_probe_enabled"] is True finally: cfg.settings.enable_iso_probe = original + + def test_health_extra_fields_merged(self, health_url_with_extras): + data = json.loads(urllib.request.urlopen(f"{health_url_with_extras}/health").read()) + assert "version" in data + assert "last_successful_poll" in data + assert data["last_successful_poll"] == "2026-03-16T12:00:00+00:00" + assert data["probe_hit_rate"] == 0.5 + assert data["mq_depth"] == 3 + assert data["db_pool"] == {"max": 10, "in_use": 1, "available": 9} diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 8c44720..f162da7 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -3,9 +3,11 @@ from __future__ import annotations import asyncio +import logging from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock, patch +import httpx import pytest from paperscout.models import Paper, PerUserMatches, ProbeHit @@ -165,6 +167,7 @@ def _make_scheduler(fake_pool, **cfg_overrides): index.papers = {} prober = MagicMock(spec=ISOProber) prober.run_cycle = AsyncMock(return_value=[]) + prober._stats = {} user_watchlist = MagicMock(spec=UserWatchlist) user_watchlist.matches_for_users.return_value = {} state = ProbeState(fake_pool) @@ -400,6 +403,44 @@ async def mock_poll_once(): await scheduler.run_forever() assert call_count == 2 + async def test_run_forever_emits_timeout_failure_category(self, fake_pool, caplog): + scheduler, _, _, _, _ = _make_scheduler(fake_pool, poll_interval_minutes=0) + call_count = 0 + + async def mock_poll_once(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.TimeoutException("boom", request=MagicMock()) + raise asyncio.CancelledError() + + scheduler.poll_once = mock_poll_once + with caplog.at_level(logging.ERROR, logger="paperscout.monitor"): + with patch("asyncio.sleep", AsyncMock()): + with pytest.raises(asyncio.CancelledError): + await scheduler.run_forever() + assert "failure_category=TIMEOUT" in caplog.text + assert call_count == 2 + + async def test_run_forever_emits_network_failure_category(self, fake_pool, caplog): + scheduler, _, _, _, _ = _make_scheduler(fake_pool, poll_interval_minutes=0) + call_count = 0 + + async def mock_poll_once(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ConnectError("no route", request=MagicMock()) + raise asyncio.CancelledError() + + scheduler.poll_once = mock_poll_once + with caplog.at_level(logging.ERROR, logger="paperscout.monitor"): + with patch("asyncio.sleep", AsyncMock()): + with pytest.raises(asyncio.CancelledError): + await scheduler.run_forever() + assert "failure_category=NETWORK" in caplog.text + assert call_count == 2 + async def test_run_forever_adaptive_sleep_normal_cycle(self, fake_pool): scheduler, _, _, _, _ = _make_scheduler( fake_pool, poll_interval_minutes=30, poll_overrun_cooldown_seconds=300 diff --git a/tests/test_sources.py b/tests/test_sources.py index 30bdae9..97bbfa8 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -139,15 +139,41 @@ async def test_download_non_dict_response(self, fake_pool): result = await index._download() assert result is None - async def test_download_http_error(self, fake_pool): + async def test_download_http_error(self, fake_pool, caplog): index = WG21Index(fake_pool) mock_client = AsyncMock() mock_client.get = AsyncMock(side_effect=httpx.HTTPError("connect failed")) with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) - result = await index._download() + with caplog.at_level(logging.ERROR, logger="paperscout.sources"): + result = await index._download() + assert result is None + assert "failure_category=NETWORK" in caplog.text + + async def test_download_http_status_429_emits_rate_limit(self, fake_pool, caplog): + index = WG21Index(fake_pool) + mock_resp = _make_response(429, json_data={}) + mock_client = _make_async_client(get_resp=mock_resp) + with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: + mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) + with caplog.at_level(logging.ERROR, logger="paperscout.sources"): + result = await index._download() + assert result is None + assert "failure_category=RATE_LIMIT" in caplog.text + + async def test_download_http_status_500_emits_network(self, fake_pool, caplog): + index = WG21Index(fake_pool) + mock_resp = _make_response(500, json_data={}) + mock_client = _make_async_client(get_resp=mock_resp) + with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: + mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) + with caplog.at_level(logging.ERROR, logger="paperscout.sources"): + result = await index._download() assert result is None + assert "failure_category=NETWORK" in caplog.text async def test_download_uses_wg21_index_timeout_from_settings(self, fake_pool): cfg = make_test_settings(wg21_index_timeout_s=42.0) From 2163722835ed67e10e664d5d51f35110f2e8a879 Mon Sep 17 00:00:00 2001 From: mac Date: Fri, 15 May 2026 22:22:07 +0800 Subject: [PATCH 2/2] addressed ai review --- src/paperscout/monitor.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/paperscout/monitor.py b/src/paperscout/monitor.py index 0ec1d5e..1a3d35c 100644 --- a/src/paperscout/monitor.py +++ b/src/paperscout/monitor.py @@ -282,6 +282,7 @@ async def run_forever(self) -> None: self.cfg.enable_iso_probe, self.cfg.enable_bulk_wg21, ) + run_started_wall = time.time() while True: t0 = time.monotonic() try: @@ -320,17 +321,25 @@ async def run_forever(self) -> None: ) elapsed = time.monotonic() - t0 - if self.ops_alert_fn and self._last_successful_poll is not None: - stale = time.time() - self._last_successful_poll + if self.ops_alert_fn: alert_threshold = 2 * interval + now_wall = time.time() now_m = time.monotonic() + if self._last_successful_poll is not None: + stale = now_wall - self._last_successful_poll + else: + # Never completed a poll: treat as stale from loop start. + stale = now_wall - run_started_wall if stale > alert_threshold and ( self._last_ops_alert is None or (now_m - self._last_ops_alert) > interval ): - self.ops_alert_fn( - f"No successful poll in {stale / 60:.0f}min " - f"(threshold={2 * self.cfg.poll_interval_minutes}min)" - ) + try: + self.ops_alert_fn( + f"No successful poll in {stale / 60:.0f}min " + f"(threshold={2 * self.cfg.poll_interval_minutes}min)" + ) + except Exception: + log.exception("OPS-ALERT stale-poll notification failed") self._last_ops_alert = now_m sleep_for = max(interval - elapsed, cooldown)