Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fix-cache-poisoning-on-manual-writes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/query-db-collection': patch
---

fix: invalidate stale predicate-scoped cache entries on manual-sync writes

In `syncMode: 'on-demand'`, manual-sync writes (`writeInsert`/`writeUpdate`/`writeDelete`/`writeUpsert`/`writeBatch`) used to overwrite every cache entry under the collection's `queryKey` with the full post-write `syncedData` snapshot — regardless of the predicate that originally produced each entry. For stale entries (no live observer) within `gcTime`, this stamped them with rows that didn't satisfy their original `where`. A subsequent cache-hit re-subscribe re-applied those wrong rows via `applySuccessfulResult`, the per-subscription `where` filter discarded them, and the subscriber received `[]` for predicates whose matching rows still existed in the source.

Stale cache entries are now invalidated (`removeQueries`) instead of overwritten, forcing the next subscribe to re-run `queryFn` against the source of truth. Active entries continue to receive the full snapshot (predicate scoping for the consumer is enforced downstream by `subscribeChanges`'s `where`). The original ghost-row protection is preserved because deleted rows no longer reappear from a stale snapshot.
37 changes: 23 additions & 14 deletions packages/query-db-collection/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1813,29 +1813,38 @@ export function queryCollectionOptions(
}

/**
* Updates the query cache with new items for ALL query keys matching this collection,
* including stale/inactive cache entries from destroyed observers.
* Updates the query cache after a manual-sync write.
*
* This prevents ghost items: when an observer is destroyed but gcTime > 0, TanStack Query
* keeps the cached data. If syncedData changes (via writeDelete/writeInsert/writeUpdate)
* after the observer is destroyed, the stale cache becomes inconsistent. When a new observer
* later picks up this stale cache, makeQueryResultHandler would create spurious sync
* operations (re-inserting deleted items, reverting updated values, etc).
* Active entries (observer still subscribed): write the full syncedData
* snapshot back. Predicate scoping for the consumer is enforced downstream
* by the per-subscription `where` filter on `subscribeChanges`.
*
* By updating all cache entries (active and stale), we ensure the cache always reflects
* the current syncedData state.
* Stale entries (no live observer): invalidate by removing the entry. Each
* entry was originally produced by `queryFn` for a specific predicate, so
* stamping it with the full snapshot would poison it — rows that don't
* satisfy the predicate end up in the cache, then a cache-hit re-subscribe
* within `gcTime` re-applies those wrong rows via `applySuccessfulResult`,
* the subscription's `where` filter discards them, and the subscriber sees
* `[]`. The next subscribe will re-run `queryFn` against the source of
* truth, which also preserves ghost-row protection (deleted rows won't
* reappear from a stale snapshot).
*/
const updateCacheData = (items: Array<any>): void => {
const allCached = queryClient.getQueryCache().findAll({ queryKey: baseKey })

if (allCached.length > 0) {
for (const query of allCached) {
updateCacheDataForKey(query.queryKey, items)
}
} else {
if (allCached.length === 0) {
// Fallback: no queries in cache yet, seed the base query key.
// This handles the case where updateCacheData is called before any queries are created.
updateCacheDataForKey(baseKey, items)
return
}

for (const query of allCached) {
if (!state.observers.has(hashKey(query.queryKey))) {
queryClient.removeQueries({ queryKey: query.queryKey, exact: true })
continue
}
updateCacheDataForKey(query.queryKey, items)
}
}

Expand Down
169 changes: 169 additions & 0 deletions packages/query-db-collection/tests/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { QueryClient, hashKey } from '@tanstack/query-core'
import {
BTreeIndex,
compileSingleRowExpression,
createCollection,
createLiveQueryCollection,
eq,
Expand Down Expand Up @@ -6342,4 +6343,172 @@ describe(`QueryCollection`, () => {
customQueryClient.clear()
})
})

describe(`Predicate-scoped cache invalidation on manual writes`, () => {
  // Each cache entry under an on-demand collection's queryKey was originally
  // produced by queryFn with a specific where predicate. A manual-sync write
  // (writeInsert/writeUpdate/writeDelete/writeUpsert) used to overwrite every
  // such entry with the full post-write syncedData snapshot, including rows
  // that didn't satisfy the entry's predicate. After unsubscribe + re-subscribe
  // within gcTime, the cache-hit re-applied those wrong rows, the per-
  // subscription where filter discarded them, and the subscriber saw [].
  //
  // The fix invalidates stale (no-live-observer) cache entries on each write,
  // forcing the next subscribe to re-run queryFn against the source of truth.

  // Minimal row shape shared by every test in this suite: a key plus one
  // filterable attribute to build predicates against.
  interface ScopedRow {
    id: string
    category: string
  }

  /**
   * Builds a queryFn that emulates a predicate-aware server. When the
   * subscription's `where` expression is forwarded through `ctx.meta`
   * (`loadSubsetOptions.where`), only rows satisfying it are returned;
   * with no predicate, the full current row set is returned.
   *
   * @param getServerRows - Accessor for the mutable "server-side" row array,
   *   read lazily so tests can mutate it between waves.
   */
  function makeQueryFn(getServerRows: () => Array<ScopedRow>) {
    return async (ctx: QueryFunctionContext) => {
      const where = (ctx.meta as { loadSubsetOptions?: { where?: any } })
        .loadSubsetOptions?.where
      if (!where) return getServerRows()
      // Evaluate the forwarded predicate row-by-row, mirroring what a real
      // backend would do with the pushed-down filter.
      const evaluator = compileSingleRowExpression(where)
      return getServerRows().filter((r) =>
        evaluator(r as unknown as Record<string, unknown>),
      )
    }
  }

  /**
   * Creates an on-demand query collection backed by `makeQueryFn` and a
   * dedicated QueryClient. The long gcTime keeps destroyed observers' cache
   * entries alive (the precondition for the poisoning bug); staleTime of
   * Infinity forces re-subscribes to take the cache-hit path rather than
   * refetching, so a poisoned entry would actually be observed.
   */
  function makeCollection(serverRows: Array<ScopedRow>) {
    const customQueryClient = new QueryClient({
      defaultOptions: {
        queries: {
          gcTime: 5 * 60 * 1000,
          staleTime: Infinity,
          retry: false,
        },
      },
    })

    const collection = createCollection(
      queryCollectionOptions<ScopedRow>({
        // Random suffix isolates collections across tests sharing a process.
        id: `predicate-cache-${Math.random()}`,
        queryClient: customQueryClient,
        queryKey: [`predicate-cache-test`],
        syncMode: `on-demand`,
        getKey: (r) => r.id,
        queryFn: makeQueryFn(() => serverRows),
        // refetch: false keeps writes manual-sync only, so cache contents are
        // driven purely by the write* utils under test.
        onInsert: async () => ({ refetch: false }),
        onUpdate: async () => ({ refetch: false }),
        onDelete: async () => ({ refetch: false }),
      }),
    )

    return { collection, customQueryClient }
  }

  it(`should return correct rows on re-subscribe after an unrelated writeUpsert`, async () => {
    const serverRows: Array<ScopedRow> = [
      { id: `1`, category: `A` },
      { id: `2`, category: `B` },
    ]
    const { collection, customQueryClient } = makeCollection(serverRows)

    // Subscribe to category=A, then unsubscribe. Leaves a stale cache entry
    // whose contents should be [{id:1}].
    const wave1 = createLiveQueryCollection({
      query: (q) =>
        q
          .from({ item: collection })
          .where(({ item }) => eq(item.category, `A`)),
    })
    await wave1.preload()
    expect(wave1.toArray.map((r) => r.id)).toEqual([`1`])
    await wave1.cleanup()

    // Manual write of a row outside the predicate. Before the fix, this
    // overwrote every cache entry with the full snapshot, poisoning
    // category=A's stale entry with [{id:1},{id:2},{id:99}].
    collection.utils.writeUpsert({ id: `99`, category: `unrelated` })

    // Re-subscribe to the same predicate. With the fix, the stale entry was
    // invalidated and queryFn re-runs, returning only the matching row.
    const wave2 = createLiveQueryCollection({
      query: (q) =>
        q
          .from({ item: collection })
          .where(({ item }) => eq(item.category, `A`)),
    })
    await wave2.preload()
    expect(wave2.toArray.map((r) => r.id)).toEqual([`1`])

    await wave2.cleanup()
    customQueryClient.clear()
  })

  it(`should pick up newly inserted rows that match the predicate on re-subscribe`, async () => {
    const serverRows: Array<ScopedRow> = [{ id: `1`, category: `A` }]
    const { collection, customQueryClient } = makeCollection(serverRows)

    // First wave caches [{id:1}] for category=A, then unsubscribes.
    const wave1 = createLiveQueryCollection({
      query: (q) =>
        q
          .from({ item: collection })
          .where(({ item }) => eq(item.category, `A`)),
    })
    await wave1.preload()
    expect(wave1.toArray.map((r) => r.id)).toEqual([`1`])
    await wave1.cleanup()

    // Insert a new matching row both server-side and via writeInsert.
    serverRows.push({ id: `2`, category: `A` })
    collection.utils.writeInsert({ id: `2`, category: `A` })

    // Invalidation forces queryFn to re-run, so the new matching row is
    // picked up rather than being masked by the stale cached subset.
    const wave2 = createLiveQueryCollection({
      query: (q) =>
        q
          .from({ item: collection })
          .where(({ item }) => eq(item.category, `A`)),
    })
    await wave2.preload()
    expect(wave2.toArray.map((r) => r.id).sort()).toEqual([`1`, `2`])

    await wave2.cleanup()
    customQueryClient.clear()
  })

  it(`should not poison a stale narrow predicate when a write lands in a broader active predicate`, async () => {
    const serverRows: Array<ScopedRow> = [{ id: `1`, category: `A` }]
    const { collection, customQueryClient } = makeCollection(serverRows)

    // Narrow predicate (id=1) subscribes then unsubscribes — leaves a stale
    // cache entry containing [{id:1}].
    const narrow = createLiveQueryCollection({
      query: (q) =>
        q.from({ item: collection }).where(({ item }) => eq(item.id, `1`)),
    })
    await narrow.preload()
    await narrow.cleanup()

    // Broader predicate (category=A) becomes active.
    const broad = createLiveQueryCollection({
      query: (q) =>
        q
          .from({ item: collection })
          .where(({ item }) => eq(item.category, `A`)),
    })
    await broad.preload()

    // Insert a row in the broader scope but outside the narrow predicate.
    serverRows.push({ id: `2`, category: `A` })
    collection.utils.writeInsert({ id: `2`, category: `A` })

    // Re-subscribing to the narrow predicate must still return only id=1, not
    // be poisoned by id=2 from the active broader entry.
    const narrow2 = createLiveQueryCollection({
      query: (q) =>
        q.from({ item: collection }).where(({ item }) => eq(item.id, `1`)),
    })
    await narrow2.preload()
    expect(narrow2.toArray.map((r) => r.id)).toEqual([`1`])

    await narrow2.cleanup()
    await broad.cleanup()
    customQueryClient.clear()
  })
})
})
Loading