-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_storage.py
More file actions
417 lines (340 loc) · 16 KB
/
test_storage.py
File metadata and controls
417 lines (340 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
"""Tests for paperscout.storage (PostgreSQL-backed via FakePool)."""
from __future__ import annotations
import logging
import time
from unittest.mock import patch
import pytest
from paperscout.models import Paper
from paperscout.storage import (
PaperCache,
ProbeState,
UserWatchlist,
iso_paper_number_from_discovered_url,
)
# ── PaperCache ────────────────────────────────────────────────────────────────
class TestPaperCache:
    """Freshness, read and write behaviour of ``PaperCache`` on a fake pool."""

    def test_is_fresh_when_empty(self, fake_pool):
        """A cache that has never been written to is not fresh."""
        empty_cache = PaperCache(fake_pool, ttl_hours=1.0)
        assert not empty_cache.is_fresh()

    def test_is_fresh_after_write(self, fake_pool):
        """Writing a payload makes a one-hour-TTL cache fresh."""
        paper_cache = PaperCache(fake_pool, ttl_hours=1.0)
        paper_cache.write({"x": 1})
        assert paper_cache.is_fresh()

    def test_is_stale_with_zero_ttl(self, fake_pool):
        """With a TTL of zero hours, a just-written cache is not fresh."""
        paper_cache = PaperCache(fake_pool, ttl_hours=0.0)
        paper_cache.write({"x": 1})
        assert not paper_cache.is_fresh()

    def test_is_stale_when_old(self, fake_pool):
        """The cache is not fresh once the storage clock reports a far-future time."""
        paper_cache = PaperCache(fake_pool, ttl_hours=1.0)
        paper_cache.write({"x": 1})
        # Replace the ``time`` module as seen by paperscout.storage so that
        # "now" is far later than the moment of the write above.
        with patch("paperscout.storage.time") as fake_clock:
            fake_clock.time.return_value = 1e12
            fresh = paper_cache.is_fresh()
        assert not fresh

    def test_read_when_empty(self, fake_pool):
        """Reading before any write yields ``None``."""
        assert PaperCache(fake_pool).read() is None

    def test_read_after_write(self, fake_pool):
        """A written payload is read back unchanged."""
        paper_cache = PaperCache(fake_pool)
        payload = {"key": "value", "num": 42}
        paper_cache.write(payload)
        assert paper_cache.read() == payload

    def test_read_if_fresh_returns_data_when_fresh(self, fake_pool):
        """``read_if_fresh`` returns the payload while the cache is fresh."""
        paper_cache = PaperCache(fake_pool, ttl_hours=1.0)
        paper_cache.write({"a": 1})
        assert paper_cache.read_if_fresh() == {"a": 1}

    def test_read_if_fresh_returns_none_when_stale(self, fake_pool):
        """``read_if_fresh`` returns ``None`` when the storage clock is far ahead."""
        paper_cache = PaperCache(fake_pool, ttl_hours=1.0)
        paper_cache.write({"a": 1})
        with patch("paperscout.storage.time") as fake_clock:
            fake_clock.time.return_value = 1e12
            stale_read = paper_cache.read_if_fresh()
        assert stale_read is None

    def test_write_upserts_on_second_write(self, fake_pool):
        """A second write replaces the first payload."""
        paper_cache = PaperCache(fake_pool)
        for version in (1, 2):
            paper_cache.write({"version": version})
        assert paper_cache.read() == {"version": 2}

    def test_write_non_ascii(self, fake_pool):
        """Non-ASCII text survives a write/read round trip."""
        paper_cache = PaperCache(fake_pool)
        payload = {"author": "Bjørn Stroustrup"}
        paper_cache.write(payload)
        assert paper_cache.read() == payload
# ── ProbeState ────────────────────────────────────────────────────────────────
class TestProbeState:
    """Discovery tracking, miss counters and skip logic of ``ProbeState``."""

    # URL reused by the discovery tests below.
    _D2300_URL = "https://isocpp.org/files/papers/D2300R11.pdf"

    # Keyword arguments shared by the ``should_skip`` tests that use max_skip=48.
    _BACKOFF = {"threshold": 3, "multiplier": 2, "max_skip": 48}

    def test_initial_state(self, fake_pool):
        """A new state has no discoveries, no misses and a zero last-poll time."""
        probe_state = ProbeState(fake_pool)
        assert probe_state.get_all_discovered() == {}
        assert probe_state.miss_counts == {}
        assert probe_state.last_poll == 0.0

    def test_mark_discovered_stores_entry(self, fake_pool):
        """Marking a URL stores a dict entry whose ``last_modified`` is ``None``."""
        probe_state = ProbeState(fake_pool)
        target = self._D2300_URL
        assert not probe_state.is_discovered(target)
        probe_state.mark_discovered(target)
        assert probe_state.is_discovered(target)
        stored = probe_state.get_all_discovered()[target]
        assert isinstance(stored, dict)
        assert "discovered_at" in stored
        assert stored["last_modified"] is None

    def test_mark_discovered_stores_last_modified(self, fake_pool):
        """The ``last_modified_ts`` argument is stored on the entry."""
        probe_state = ProbeState(fake_pool)
        target = self._D2300_URL
        modified_at = 1_700_000_000.0
        probe_state.mark_discovered(target, last_modified_ts=modified_at)
        stored = probe_state.get_all_discovered()[target]
        assert stored["last_modified"] == modified_at
        assert stored["discovered_at"] > 0

    def test_iso_paper_number_from_discovered_url(self):
        """Paper numbers are parsed from D/P paper URLs; other URLs give ``None``."""
        draft_number = iso_paper_number_from_discovered_url(
            "https://isocpp.org/files/papers/D4165R0.pdf"
        )
        assert draft_number == 4165
        published_number = iso_paper_number_from_discovered_url(
            "https://isocpp.org/files/papers/P1234R0.html"
        )
        assert published_number == 1234
        assert iso_paper_number_from_discovered_url("https://example.com/") is None

    def test_paper_nums_from_discovered_iso_urls(self, fake_pool):
        """The numbers of all discovered ISO paper URLs are returned as a set."""
        probe_state = ProbeState(fake_pool)
        for paper_url in (
            "https://isocpp.org/files/papers/D4165R0.pdf",
            "https://isocpp.org/files/papers/D2300R11.pdf",
        ):
            probe_state.mark_discovered(paper_url)
        assert probe_state.paper_nums_from_discovered_iso_urls() == {4165, 2300}

    def test_mark_discovered_is_idempotent(self, fake_pool):
        """Marking an already-discovered URL again leaves its entry unchanged."""
        probe_state = ProbeState(fake_pool)
        target = self._D2300_URL
        probe_state.mark_discovered(target, last_modified_ts=111.0)
        # Copy the entry so the comparison is against the first-write values.
        snapshot = dict(probe_state.get_all_discovered()[target])
        # Let the clock advance so a rewritten timestamp would be detectable.
        time.sleep(0.01)
        probe_state.mark_discovered(target, last_modified_ts=999.0)
        assert probe_state.get_all_discovered()[target] == snapshot

    def test_discovered_info_returns_entry(self, fake_pool):
        """``discovered_info`` returns the stored entry for a known URL."""
        probe_state = ProbeState(fake_pool)
        probe_state.mark_discovered(self._D2300_URL, last_modified_ts=42.0)
        details = probe_state.discovered_info(self._D2300_URL)
        assert details is not None
        assert details["last_modified"] == 42.0

    def test_discovered_info_returns_none_for_unknown(self, fake_pool):
        """``discovered_info`` returns ``None`` for a URL never marked."""
        probe_state = ProbeState(fake_pool)
        assert probe_state.discovered_info("https://example.com/nope.pdf") is None

    def test_get_all_discovered_returns_all(self, fake_pool):
        """Every marked URL appears in ``get_all_discovered``."""
        probe_state = ProbeState(fake_pool)
        probe_state.mark_discovered("https://example.com/A.pdf")
        probe_state.mark_discovered("https://example.com/B.pdf")
        discovered = probe_state.get_all_discovered()
        assert len(discovered) == 2
        assert "https://example.com/A.pdf" in discovered
        assert "https://example.com/B.pdf" in discovered

    def test_miss_counter_increments(self, fake_pool):
        """Each ``record_miss`` call raises the count by one, starting from zero."""
        probe_state = ProbeState(fake_pool)
        assert probe_state.get_miss_count("1234") == 0
        for expected_count in (1, 2):
            probe_state.record_miss("1234")
            assert probe_state.get_miss_count("1234") == expected_count

    def test_reset_misses(self, fake_pool):
        """``reset_misses`` brings a non-zero count back to zero."""
        probe_state = ProbeState(fake_pool)
        for _ in range(2):
            probe_state.record_miss("1234")
        probe_state.reset_misses("1234")
        assert probe_state.get_miss_count("1234") == 0

    def test_reset_misses_nonexistent_is_safe(self, fake_pool):
        """Resetting a key with no recorded misses does not raise."""
        ProbeState(fake_pool).reset_misses("9999")  # must not raise

    def test_should_skip_below_threshold(self, fake_pool):
        """Two misses with threshold 3: not skipped."""
        probe_state = ProbeState(fake_pool)
        for _ in range(2):
            probe_state.record_miss("1")
        assert not probe_state.should_skip("1", cycle=1, **self._BACKOFF)

    def test_should_skip_at_threshold(self, fake_pool):
        """Three misses with threshold 3: still not skipped."""
        probe_state = ProbeState(fake_pool)
        for _ in range(3):
            probe_state.record_miss("1")
        assert not probe_state.should_skip("1", cycle=1, **self._BACKOFF)

    def test_should_skip_above_threshold(self, fake_pool):
        """Four misses with threshold 3: skipped on cycle 1 but not on cycle 2."""
        probe_state = ProbeState(fake_pool)
        for _ in range(4):
            probe_state.record_miss("1")
        assert probe_state.should_skip("1", cycle=1, **self._BACKOFF)
        assert not probe_state.should_skip("1", cycle=2, **self._BACKOFF)

    def test_should_skip_respects_max_skip(self, fake_pool):
        """Twenty misses with max_skip=4: cycle 4 is probed, cycle 1 is skipped."""
        probe_state = ProbeState(fake_pool)
        for _ in range(20):
            probe_state.record_miss("1")
        capped = {"threshold": 3, "multiplier": 2, "max_skip": 4}
        assert not probe_state.should_skip("1", cycle=4, **capped)
        assert probe_state.should_skip("1", cycle=1, **capped)

    def test_touch_poll(self, fake_pool):
        """``touch_poll`` sets ``last_poll`` to no earlier than the pre-call time."""
        probe_state = ProbeState(fake_pool)
        started = time.time()
        probe_state.touch_poll()
        assert probe_state.last_poll >= started

    def test_save_is_noop(self, fake_pool):
        """save() is a no-op; data persists immediately via the pool."""
        url = "https://example.com/D1234R0.pdf"
        writer = ProbeState(fake_pool)
        writer.mark_discovered(url)
        writer.save()  # must not raise
        reader = ProbeState(fake_pool)  # same pool → same store
        assert reader.is_discovered(url)

    def test_miss_counts_property_returns_all(self, fake_pool):
        """``miss_counts`` exposes the per-key totals."""
        probe_state = ProbeState(fake_pool)
        for key in ("100", "200", "200"):
            probe_state.record_miss(key)
        totals = probe_state.miss_counts
        assert totals["100"] == 1
        assert totals["200"] == 2
# ── UserWatchlist ─────────────────────────────────────────────────────────────
class TestUserWatchlist:
    """Adding, removing, listing and matching entries of ``UserWatchlist``."""

    def test_add_author_returns_true(self, fake_pool):
        """Adding a new author entry returns ``True``."""
        watchlist = UserWatchlist(fake_pool)
        assert watchlist.add("U1", "Niebler") is True

    def test_add_author_stored_lowercase(self, fake_pool):
        """An upper-case author name is listed in lower case."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "NIEBLER")
        assert ("niebler", "author") in watchlist.list_entries("U1")

    def test_add_paper_number_detected_as_paper(self, fake_pool):
        """A numeric entry is listed with type ``"paper"``."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "2300")
        assert ("2300", "paper") in watchlist.list_entries("U1")

    def test_add_duplicate_returns_false(self, fake_pool):
        """Adding the same entry twice returns ``False`` the second time."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "Niebler")
        assert watchlist.add("U1", "Niebler") is False

    def test_add_case_insensitive_dedup(self, fake_pool):
        """Entries differing only in case count as duplicates."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "NIEBLER")
        assert watchlist.add("U1", "niebler") is False

    def test_add_empty_string_returns_false(self, fake_pool):
        """Empty and single-space entries are rejected with ``False``."""
        watchlist = UserWatchlist(fake_pool)
        for blank in ("", " "):
            assert watchlist.add("U1", blank) is False

    def test_remove_existing_returns_true(self, fake_pool):
        """Removing an existing entry returns ``True`` and empties the list."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "Niebler")
        assert watchlist.remove("U1", "Niebler") is True
        assert watchlist.list_entries("U1") == []

    def test_remove_nonexistent_returns_false(self, fake_pool):
        """Removing an entry that was never added returns ``False``."""
        watchlist = UserWatchlist(fake_pool)
        assert watchlist.remove("U1", "Nobody") is False

    def test_list_entries_empty(self, fake_pool):
        """A user without entries gets an empty list."""
        assert UserWatchlist(fake_pool).list_entries("U1") == []

    def test_list_entries_only_for_requested_user(self, fake_pool):
        """Each user sees only their own entries."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "niebler")
        watchlist.add("U2", "baker")
        assert watchlist.list_entries("U1") == [("niebler", "author")]
        assert watchlist.list_entries("U2") == [("baker", "author")]

    def test_list_entries_sorted_by_type_then_entry(self, fake_pool):
        """Listed entries come back ordered by their type."""
        watchlist = UserWatchlist(fake_pool)
        for item in ("niebler", "2300", "baker"):
            watchlist.add("U1", item)
        kinds = [kind for _entry, kind in watchlist.list_entries("U1")]
        # authors come before papers: alphabetically "author" < "paper"
        assert kinds == sorted(kinds)

    def test_get_all_watched_paper_nums_empty(self, fake_pool):
        """With no entries at all, the watched-paper set is empty."""
        watchlist = UserWatchlist(fake_pool)
        assert watchlist.get_all_watched_paper_nums() == set()

    def test_get_all_watched_paper_nums_union(self, fake_pool):
        """Paper numbers from all users are combined; author entries are excluded."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "2300")
        watchlist.add("U2", "2301")
        watchlist.add("U2", "niebler")  # author — should not appear
        assert watchlist.get_all_watched_paper_nums() == {2300, 2301}

    def test_matches_for_users_author_match(self, fake_pool):
        """A watched author substring in the paper's author field matches."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "niebler")
        candidate = Paper(id="P2300R11", title="X", author="Eric Niebler")
        matches = watchlist.matches_for_users([candidate], [])
        assert "U1" in matches
        hit_papers = [matched for matched, _reason in matches["U1"].papers]
        assert candidate in hit_papers

    def test_matches_for_users_paper_match(self, fake_pool):
        """A watched paper number matches a paper with that number in its id."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "2300")
        candidate = Paper(id="P2300R11", title="X", author="Unknown")
        matches = watchlist.matches_for_users([candidate], [])
        assert "U1" in matches

    def test_matches_for_users_no_match(self, fake_pool):
        """A user whose entries match nothing is absent from the result."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "baker")
        candidate = Paper(id="P2300R11", title="X", author="Unknown Author")
        matches = watchlist.matches_for_users([candidate], [])
        assert "U1" not in matches

    def test_matches_for_users_empty_watchlist(self, fake_pool):
        """An empty watchlist produces an empty result mapping."""
        watchlist = UserWatchlist(fake_pool)
        candidate = Paper(id="P2300R11", title="X", author="Niebler")
        assert watchlist.matches_for_users([candidate], []) == {}

    def test_matches_for_users_probe_hit_author(self, fake_pool):
        """A probe hit whose front text names a watched author matches."""
        from paperscout.sources import ProbeHit

        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "niebler")
        probe_hit = ProbeHit(
            url="https://isocpp.org/files/papers/D9999R0.pdf",
            prefix="D",
            number=9999,
            revision=0,
            extension=".pdf",
            tier="frontier",
            front_text="written by niebler",
            is_recent=True,
        )
        matches = watchlist.matches_for_users([], [probe_hit])
        assert "U1" in matches
        assert len(matches["U1"].probe_hits) == 1

    def test_matches_for_users_probe_hit_paper_number(self, fake_pool):
        """A probe hit whose number is on the watchlist matches."""
        from paperscout.sources import ProbeHit

        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "9999")
        probe_hit = ProbeHit(
            url="https://isocpp.org/files/papers/D9999R0.pdf",
            prefix="D",
            number=9999,
            revision=0,
            extension=".pdf",
            tier="watchlist",
            is_recent=True,
        )
        matches = watchlist.matches_for_users([], [probe_hit])
        assert "U1" in matches
# ── PaperCache edge cases ──────────────────────────────────────────────────────
class TestPaperCacheInvalidJson:
    """``PaperCache.read`` when the stored payload is not valid JSON."""

    def test_read_returns_none_when_json_decode_fails(self, fake_pool, caplog):
        """An unparsable cached payload reads as ``None`` and logs a warning."""
        fake_pool.seed_paper_cache_invalid_json()
        paper_cache = PaperCache(fake_pool)
        with caplog.at_level(logging.WARNING):
            loaded = paper_cache.read()
        assert loaded is None
        assert "Failed to parse cached index JSON" in caplog.text
# ── Transaction rollback ───────────────────────────────────────────────────────
class TestConnRollback:
    """Rollback behaviour when committing through the pool fails."""

    def test_user_watchlist_add_rolls_back_on_commit_failure(self, fake_pool):
        """A failing commit propagates the error and triggers exactly one rollback."""
        # Make the fake pool raise on commit.
        fake_pool.fail_on_commit = True
        watchlist = UserWatchlist(fake_pool)
        with pytest.raises(RuntimeError, match="simulated"):
            watchlist.add("U1", "alice")
        # Checked after the ``raises`` block; inside it this line would be unreachable.
        assert fake_pool.rollback_count == 1
# ── Direct watchlist seeding (invalid DB rows) ─────────────────────────────────
class TestUserWatchlistRawSeed:
    """Watchlist behaviour with rows seeded directly into the fake pool."""

    def test_get_all_watched_skips_non_numeric_paper_entries(self, fake_pool):
        """A ``paper`` row with a non-numeric entry yields no paper numbers."""
        bad_rows = [("U1", "oops", "paper")]
        fake_pool.seed_watchlist_raw(bad_rows)
        watchlist = UserWatchlist(fake_pool)
        assert watchlist.get_all_watched_paper_nums() == set()

    def test_matches_skips_bad_paper_row_author_match_still_works(self, fake_pool):
        """A bad ``paper`` row does not stop the same user's author row matching."""
        fake_pool.seed_watchlist_raw([("U1", "oops", "paper"), ("U1", "alice", "author")])
        watchlist = UserWatchlist(fake_pool)
        candidate = Paper(id="P2300R11", title="X", author="Alice Wonder")
        matches = watchlist.matches_for_users([candidate], [])
        assert "U1" in matches
        match_reasons = [reason for _paper, reason in matches["U1"].papers]
        assert "author" in match_reasons

    def test_matches_paper_with_none_number_never_paper_matched(self, fake_pool):
        """A paper whose ``number`` is ``None`` does not match a paper-number entry."""
        watchlist = UserWatchlist(fake_pool)
        watchlist.add("U1", "2300")
        candidate = Paper(id="UNKNOWN", title="X", author="Someone")
        assert candidate.number is None
        matches = watchlist.matches_for_users([candidate], [])
        assert "U1" not in matches