From b71e4a96c507b2c38b59ae88ff555caec89b06e3 Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Sun, 26 Apr 2026 17:52:02 -0500 Subject: [PATCH] fix: invalidate stale predicate-scoped cache entries on manual-sync writes --- .../fix-cache-poisoning-on-manual-writes.md | 9 + packages/query-db-collection/src/query.ts | 37 ++-- .../query-db-collection/tests/query.test.ts | 169 ++++++++++++++++++ 3 files changed, 201 insertions(+), 14 deletions(-) create mode 100644 .changeset/fix-cache-poisoning-on-manual-writes.md diff --git a/.changeset/fix-cache-poisoning-on-manual-writes.md b/.changeset/fix-cache-poisoning-on-manual-writes.md new file mode 100644 index 000000000..38f1addc2 --- /dev/null +++ b/.changeset/fix-cache-poisoning-on-manual-writes.md @@ -0,0 +1,9 @@ +--- +'@tanstack/query-db-collection': patch +--- + +fix: invalidate stale predicate-scoped cache entries on manual-sync writes + +In `syncMode: 'on-demand'`, manual-sync writes (`writeInsert`/`writeUpdate`/`writeDelete`/`writeUpsert`/`writeBatch`) used to overwrite every cache entry under the collection's `queryKey` with the full post-write `syncedData` snapshot — regardless of the predicate that originally produced each entry. For stale entries (no live observer) within `gcTime`, this stamped them with rows that didn't satisfy their original `where`. A subsequent cache-hit re-subscribe re-applied those wrong rows via `applySuccessfulResult`, the per-subscription `where` filter discarded them, and the subscriber received `[]` for predicates whose matching rows still existed in the source. + +Stale cache entries are now invalidated (`removeQueries`) instead of overwritten, forcing the next subscribe to re-run `queryFn` against the source of truth. Active entries continue to receive the full snapshot (predicate scoping for the consumer is enforced downstream by `subscribeChanges`'s `where`). The original ghost-row protection is preserved because deleted rows no longer reappear from a stale snapshot. 
diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index b29aac873..d84bdd74e 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -1813,29 +1813,38 @@ export function queryCollectionOptions( } /** - * Updates the query cache with new items for ALL query keys matching this collection, - * including stale/inactive cache entries from destroyed observers. + * Updates the query cache after a manual-sync write. * - * This prevents ghost items: when an observer is destroyed but gcTime > 0, TanStack Query - * keeps the cached data. If syncedData changes (via writeDelete/writeInsert/writeUpdate) - * after the observer is destroyed, the stale cache becomes inconsistent. When a new observer - * later picks up this stale cache, makeQueryResultHandler would create spurious sync - * operations (re-inserting deleted items, reverting updated values, etc). + * Active entries (observer still subscribed): write the full syncedData + * snapshot back. Predicate scoping for the consumer is enforced downstream + * by the per-subscription `where` filter on `subscribeChanges`. * - * By updating all cache entries (active and stale), we ensure the cache always reflects - * the current syncedData state. + * Stale entries (no live observer): invalidate by removing the entry. Each + * entry was originally produced by `queryFn` for a specific predicate, so + * stamping it with the full snapshot would poison it — rows that don't + * satisfy the predicate end up in the cache, then a cache-hit re-subscribe + * within `gcTime` re-applies those wrong rows via `applySuccessfulResult`, + * the subscription's `where` filter discards them, and the subscriber sees + * `[]`. The next subscribe will re-run `queryFn` against the source of + * truth, which also preserves ghost-row protection (deleted rows won't + * reappear from a stale snapshot). 
const updateCacheData = (items: Array<TItem>): void => {
+ // + // The fix invalidates stale (no-live-observer) cache entries on each write, + // forcing the next subscribe to re-run queryFn against the source of truth. + + interface ScopedRow { + id: string + category: string + } + + function makeQueryFn(getServerRows: () => Array) { + return async (ctx: QueryFunctionContext) => { + const where = (ctx.meta as { loadSubsetOptions?: { where?: any } }) + .loadSubsetOptions?.where + if (!where) return getServerRows() + const evaluator = compileSingleRowExpression(where) + return getServerRows().filter((r) => + evaluator(r as unknown as Record), + ) + } + } + + function makeCollection(serverRows: Array) { + const customQueryClient = new QueryClient({ + defaultOptions: { + queries: { + gcTime: 5 * 60 * 1000, + staleTime: Infinity, + retry: false, + }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `predicate-cache-${Math.random()}`, + queryClient: customQueryClient, + queryKey: [`predicate-cache-test`], + syncMode: `on-demand`, + getKey: (r) => r.id, + queryFn: makeQueryFn(() => serverRows), + onInsert: async () => ({ refetch: false }), + onUpdate: async () => ({ refetch: false }), + onDelete: async () => ({ refetch: false }), + }), + ) + + return { collection, customQueryClient } + } + + it(`should return correct rows on re-subscribe after an unrelated writeUpsert`, async () => { + const serverRows: Array = [ + { id: `1`, category: `A` }, + { id: `2`, category: `B` }, + ] + const { collection, customQueryClient } = makeCollection(serverRows) + + // Subscribe to category=A, then unsubscribe. Leaves a stale cache entry + // whose contents should be [{id:1}]. + const wave1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)), + }) + await wave1.preload() + expect(wave1.toArray.map((r) => r.id)).toEqual([`1`]) + await wave1.cleanup() + + // Manual write of a row outside the predicate. 
Before the fix, this + // overwrote every cache entry with the full snapshot, poisoning + // category=A's stale entry with [{id:1},{id:2},{id:99}]. + collection.utils.writeUpsert({ id: `99`, category: `unrelated` }) + + // Re-subscribe to the same predicate. With the fix, the stale entry was + // invalidated and queryFn re-runs, returning only the matching row. + const wave2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)), + }) + await wave2.preload() + expect(wave2.toArray.map((r) => r.id)).toEqual([`1`]) + + await wave2.cleanup() + customQueryClient.clear() + }) + + it(`should pick up newly inserted rows that match the predicate on re-subscribe`, async () => { + const serverRows: Array = [{ id: `1`, category: `A` }] + const { collection, customQueryClient } = makeCollection(serverRows) + + const wave1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)), + }) + await wave1.preload() + expect(wave1.toArray.map((r) => r.id)).toEqual([`1`]) + await wave1.cleanup() + + // Insert a new matching row both server-side and via writeInsert. + serverRows.push({ id: `2`, category: `A` }) + collection.utils.writeInsert({ id: `2`, category: `A` }) + + const wave2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)), + }) + await wave2.preload() + expect(wave2.toArray.map((r) => r.id).sort()).toEqual([`1`, `2`]) + + await wave2.cleanup() + customQueryClient.clear() + }) + + it(`should not poison a stale narrow predicate when a write lands in a broader active predicate`, async () => { + const serverRows: Array = [{ id: `1`, category: `A` }] + const { collection, customQueryClient } = makeCollection(serverRows) + + // Narrow predicate (id=1) subscribes then unsubscribes — leaves a stale + // cache entry containing [{id:1}]. 
+ const narrow = createLiveQueryCollection({ + query: (q) => + q.from({ item: collection }).where(({ item }) => eq(item.id, `1`)), + }) + await narrow.preload() + await narrow.cleanup() + + // Broader predicate (category=A) becomes active. + const broad = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)), + }) + await broad.preload() + + // Insert a row in the broader scope but outside the narrow predicate. + serverRows.push({ id: `2`, category: `A` }) + collection.utils.writeInsert({ id: `2`, category: `A` }) + + // Re-subscribing to the narrow predicate must still return only id=1, not + // be poisoned by id=2 from the active broader entry. + const narrow2 = createLiveQueryCollection({ + query: (q) => + q.from({ item: collection }).where(({ item }) => eq(item.id, `1`)), + }) + await narrow2.preload() + expect(narrow2.toArray.map((r) => r.id)).toEqual([`1`]) + + await narrow2.cleanup() + await broad.cleanup() + customQueryClient.clear() + }) + }) })