diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py new file mode 100644 index 0000000..5005806 --- /dev/null +++ b/tests/test_thread_safety.py @@ -0,0 +1,221 @@ +"""Thread-safety contention tests for lock-protected paths. + +Intentional OS-thread stress of lock-protected paths; production uses +event-loop cooperative scheduling + snapshot reads. + +NOTE: FakePool is not thread-safe. These tests do not call pool methods +concurrently; they only stress ISOProber._stats_lock and Scheduler._health_lock. +""" + +from __future__ import annotations + +import threading +import time +from unittest.mock import MagicMock + +from paperscout.models import CycleStatus +from paperscout.monitor import Scheduler, _compute_probe_success_rate +from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX +from paperscout.sources import ISOProber, WG21Index +from paperscout.storage import ProbeState, UserWatchlist +from tests.conftest import make_test_settings + +_STATS_KEYS = frozenset(ISOProber._STATS_TEMPLATE.keys()) +_HEALTH_SNAPSHOT_KEYS = frozenset( + { + "last_updated", + "poll_count", + "last_successful_poll", + "last_cycle_status", + "last_cycle_error", + "probe_stats", + "probe_success_rate", + } +) + +THREAD_JOIN_TIMEOUT = 5.0 +RESET_ITERATIONS = 1000 +SNAPSHOT_ITERATIONS = 1000 +BUMPER_ITERATIONS = 1000 +PUBLISH_ITERATIONS = 500 +READ_ITERATIONS = 500 + + +def _make_prober(fake_pool) -> ISOProber: + index = WG21Index(fake_pool) + state = ProbeState(fake_pool) + wl = MagicMock(spec=UserWatchlist) + wl.get_all_watched_paper_nums.return_value = set() + return ISOProber(index, state, user_watchlist=wl, cfg=make_test_settings()) + + +def _make_scheduler(fake_pool) -> Scheduler: + wg21 = MagicMock() + wg21.source_id = SOURCE_WG21_INDEX + iso = MagicMock() + iso.source_id = SOURCE_ISO_PROBE + user_watchlist = MagicMock(spec=UserWatchlist) + user_watchlist.matches_for_users.return_value = {} + state = ProbeState(fake_pool) + return Scheduler( + sources=[wg21, iso], + user_watchlist=user_watchlist, + state=state, + cfg=make_test_settings(), + ) + + +def _join_threads(threads: list[threading.Thread]) -> None: + for thread in threads: + thread.join(timeout=THREAD_JOIN_TIMEOUT) + assert not thread.is_alive(), ( + f"thread {thread.name} did not finish within {THREAD_JOIN_TIMEOUT}s" + ) + + +def _assert_valid_probe_stats(stats: dict[str, int]) -> None: + assert set(stats.keys()) == _STATS_KEYS + assert all(isinstance(v, int) and v >= 0 for v in stats.values()) + + +def _assert_health_snapshot_consistent(snap: dict) -> None: + assert set(snap.keys()) == _HEALTH_SNAPSHOT_KEYS + assert isinstance(snap["poll_count"], int) and snap["poll_count"] >= 0 + assert isinstance(snap["probe_stats"], dict) + assert all(isinstance(v, int) for v in snap["probe_stats"].values()) + assert snap["probe_success_rate"] == _compute_probe_success_rate(snap["probe_stats"]) + assert snap["probe_success_rate"] is None or isinstance(snap["probe_success_rate"], float) + assert snap["last_cycle_status"] is None or isinstance(snap["last_cycle_status"], str) + assert snap["last_cycle_error"] is None or isinstance(snap["last_cycle_error"], str) + assert snap["last_successful_poll"] is None or isinstance(snap["last_successful_poll"], str) + last_updated = snap["last_updated"] + assert last_updated is None or (isinstance(last_updated, str) and len(last_updated) > 0) + + +class TestISOProberStatsContention: + """Intentional OS-thread stress of ISOProber._stats_lock.""" + + def test_concurrent_bump_stat_totals(self, fake_pool): + """Concurrent _bump_stat() calls produce correct totals.""" + prober = _make_prober(fake_pool) + n_threads = 32 + bumps_per_thread = 100 + barrier = threading.Barrier(n_threads) + + def worker() -> None: + barrier.wait() + for _ in range(bumps_per_thread): + prober._bump_stat("miss") + + threads = [threading.Thread(target=worker, name=f"bump-{i}") for i in range(n_threads)] + for thread in threads: + thread.start() + _join_threads(threads) + assert prober.snapshot_stats()["miss"] == n_threads * bumps_per_thread + + def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool): + """snapshot_stats() concurrent with _reset_stats() returns consistent state.""" + prober = _make_prober(fake_pool) + errors: list[Exception] = [] + errors_lock = threading.Lock() + n_threads = 5 + barrier = threading.Barrier(n_threads) + + def record_error(exc: Exception) -> None: + with errors_lock: + errors.append(exc) + + def resetter() -> None: + try: + barrier.wait() + for _ in range(RESET_ITERATIONS): + prober._reset_stats() + except Exception as exc: + record_error(exc) + + def snapshotter() -> None: + try: + barrier.wait() + for _ in range(SNAPSHOT_ITERATIONS): + _assert_valid_probe_stats(prober.snapshot_stats()) + except Exception as exc: + record_error(exc) + + def bumper() -> None: + try: + barrier.wait() + for _ in range(BUMPER_ITERATIONS): + prober._bump_stat("miss") + except Exception as exc: + record_error(exc) + + threads = [ + threading.Thread(target=resetter, name="resetter"), + threading.Thread(target=snapshotter, name="snapshot-0"), + threading.Thread(target=snapshotter, name="snapshot-1"), + threading.Thread(target=bumper, name="bumper-0"), + threading.Thread(target=bumper, name="bumper-1"), + ] + for thread in threads: + thread.start() + _join_threads(threads) + assert not errors, f"thread errors: {errors!r}" + + +class TestSchedulerHealthContention: + """Intentional OS-thread stress of Scheduler._health_lock.""" + + def test_health_snapshot_consistent_under_concurrent_publish(self, fake_pool): + """health_snapshot() from non-event-loop threads returns consistent data.""" + scheduler = _make_scheduler(fake_pool) + scheduler._poll_count = 0 + scheduler._last_probe_stats = {} + scheduler._last_cycle_status = None + scheduler._last_successful_poll = None + scheduler._publish_health_snapshot() + + errors: list[Exception] = [] + errors_lock = threading.Lock() + + def record_error(exc: Exception) -> None: + with errors_lock: + errors.append(exc) + + def writer() -> None: + try: + # Writer thread is the sole mutator; attribute writes happen-before + # _publish_health_snapshot() reads on the same thread. _health_lock + # only guards _health_snapshot assignment vs reader threads. + for i in range(PUBLISH_ITERATIONS): + scheduler._poll_count = i + 1 + scheduler._last_probe_stats = { + "miss": i % 10, + "error": i % 3, + "hit_recent": i % 5, + "hit_old": 0, + "hit_no_lm": 0, + "skipped_discovered": 0, + "skipped_in_index": 0, + } + scheduler._last_cycle_status = ( + CycleStatus.SUCCESS if i % 2 == 0 else CycleStatus.EMPTY + ) + scheduler._last_successful_poll = time.time() + scheduler._publish_health_snapshot() + except Exception as exc: + record_error(exc) + + def reader() -> None: + try: + for _ in range(READ_ITERATIONS): + _assert_health_snapshot_consistent(scheduler.health_snapshot()) + except Exception as exc: + record_error(exc) + + writer_thread = threading.Thread(target=writer, name="writer") + reader_threads = [threading.Thread(target=reader, name=f"reader-{i}") for i in range(6)] + writer_thread.start() + for thread in reader_threads: + thread.start() + _join_threads([writer_thread, *reader_threads]) + assert not errors, f"thread errors: {errors!r}"