feat(db): expose tracked source record subscriptions#1500

Open
lalitkapoor wants to merge 3 commits into TanStack:main from
lalitkapoor:lalitkapoor--subscribe-tracked

Conversation

@lalitkapoor

@lalitkapoor lalitkapoor commented Apr 28, 2026

Description

Two new APIs let application code observe which source records a live query is consuming, so callers can subscribe/unsubscribe at an underlying sync layer in lockstep with query usage.

What's added

type TrackedSourceRecord = {
  collectionId: string
  key: string | number
}

type TrackedSourceRecordsChange = {
  added: Array<TrackedSourceRecord>
  removed: Array<TrackedSourceRecord>
}

type SubscribeTrackedSourceRecordsOptions = {
  includeInitialState?: boolean
}

Methods on every Collection:

collection.getTrackedSourceRecords(): Array<TrackedSourceRecord>
collection.subscribeTrackedSourceRecords(
  callback: (change: TrackedSourceRecordsChange) => void,
  options?: SubscribeTrackedSourceRecordsOptions,
): () => void

Same method, polymorphic dispatch:

  • On a live-query collection → "records this query reads from its sources." Refcounted across the query's aliases (self-joins are deduped). Visible only while the query has active subscribers.
  • On any other collection (base, electric, query-db, localStorage, sqlite-persisted, etc.) → "records of mine consumed by some live query." Refcounted across all live queries that depend on this collection. If two queries use key K, the base view sees K once; only when the last query stops using K does it emit removed.

Why

Useful for custom sync layers that subscribe and unsubscribe at the record level. They need to subscribe when a record becomes actively used by queries and unsubscribe when the last query stops using it. Base-collection residency is too broad: it covers everything that has been synced, whether or not any query wants it. These APIs provide the narrower "actively used by live queries" signal.

How an app developer uses this

On a single live query (per-query scope):

function ActiveUsersList() {
  const activeUsers = useMemo(
    () =>
      createLiveQueryCollection((q) =>
        q.from({ user: usersCollection }).where(({ user }) => eq(user.active, true)),
      ),
    [],
  )

  // The hook drives sync (it calls subscribeChanges internally), so the
  // tracked-source listener will start receiving deltas as the live query
  // mounts and reads from its sources.
  useEffect(() => {
    return activeUsers.subscribeTrackedSourceRecords(({ added, removed }) => {
      for (const r of added) mySyncLayer.subscribe(r.collectionId, r.key)
      for (const r of removed) mySyncLayer.unsubscribe(r.collectionId, r.key)
    })
  }, [activeUsers])

  const { data } = useLiveQuery(activeUsers)
  return <ul>{data?.map((u) => <li key={u.id}>{u.name}</li>)}</ul>
}

On a base collection (union scope across all live queries):

const unsubscribe = usersCollection.subscribeTrackedSourceRecords(
  ({ added, removed }) => {
    // Fires only when the first query starts tracking a key (0→1),
    // or the last one stops (1→0).
    for (const r of added) mySyncLayer.subscribe(r.collectionId, r.key)
    for (const r of removed) mySyncLayer.unsubscribe(r.collectionId, r.key)
  },
  { includeInitialState: true },
)

The same pattern works for any collection that uses CollectionImpl under the hood — electric, query-db, localStorage, sqlite-persisted, etc. The feature is sync-backend-agnostic because tracking happens in the live-query pipeline (the IVM), not in the sync config.
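The examples above call `mySyncLayer.subscribe` and `mySyncLayer.unsubscribe` without defining them. The sketch below shows one shape such a layer could take. `RecordSyncLayer` and its `isActive` helper are hypothetical names for illustration, not part of @tanstack/db. Because the base view only reports 0→1 and 1→0 transitions, the layer itself can stay a plain set with no refcounting of its own.

```typescript
type RecordKey = string | number

// Hypothetical record-level sync layer (an assumption, not a TanStack DB API).
class RecordSyncLayer {
  // collectionId -> keys that currently have an open upstream subscription
  private readonly active = new Map<string, Set<RecordKey>>()

  subscribe(collectionId: string, key: RecordKey): void {
    let keys = this.active.get(collectionId)
    if (!keys) {
      keys = new Set<RecordKey>()
      this.active.set(collectionId, keys)
    }
    if (keys.has(key)) return // idempotent: already subscribed
    keys.add(key)
    // A real layer would open a channel for (collectionId, key) here.
  }

  unsubscribe(collectionId: string, key: RecordKey): void {
    const keys = this.active.get(collectionId)
    if (!keys || !keys.delete(key)) return // nothing to close
    if (keys.size === 0) this.active.delete(collectionId)
    // A real layer would close the channel here.
  }

  isActive(collectionId: string, key: RecordKey): boolean {
    return this.active.get(collectionId)?.has(key) ?? false
  }
}

const mySyncLayer = new RecordSyncLayer()
```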

Behaviour notes:

  • includeInitialState: true replays the current snapshot as added on subscribe (if any records are currently tracked and exposed).
  • 0 → 1+ subscribers (a live query gains its first consumer): the per-query view emits added for every key the query is reading; the base view emits added for any key reaching refcount 1.
  • 1+ → 0 subscribers (last consumer leaves): the per-query view emits removed for every key it was reading; the base view emits removed for any key whose refcount drops to 0.
  • Stable-membership updates (an ordered query updates a row that stays in range) emit nothing.
  • must-refetch / truncate-refetch cycles that end with the same keys still tracked emit nothing.

How this works internally

Three concentric scopes, each with a real owner.

Scope 1 — alias-level: CollectionSubscriber.sentToD2Keys

For every alias in a live query (e.g. { user: usersCollection }), one CollectionSubscriber is created. It already maintained sentToD2Keys: Set<string | number> to deduplicate inserts into the D2 dataflow graph — the authoritative per-alias record of "keys this alias is feeding the query."

This PR turns that set's mutations into events. On each sendChangesToPipeline, after filterDuplicateInserts has mutated the set, the subscriber derives a net delta from the filtered change stream itself — every surviving insert is a 0→1 transition for its key, every surviving delete is a 1→0 transition. Insert/delete pairs for the same key within one batch (emitted by splitUpdates for stable-membership ordered updates) net to zero. Cost is O(|batch|) — no dependency on the size of sentToD2Keys. The result is emitted via a single public callback property assigned 1:1 by the builder.
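The netting step can be sketched as follows. This is an illustration under simplifying assumptions, not the PR's code: `KeyChange` and `netKeyDelta` are hypothetical names, and the batch is reduced to bare insert/delete events after filtering.

```typescript
type Key = string | number
type KeyChange = { type: 'insert' | 'delete'; key: Key }

// Hypothetical helper: nets inserts against deletes per key within one batch.
// Cost is proportional to the batch, not to the size of the tracked set.
function netKeyDelta(batch: ReadonlyArray<KeyChange>): { added: Key[]; removed: Key[] } {
  const net = new Map<Key, number>()
  for (const change of batch) {
    const delta = change.type === 'insert' ? 1 : -1
    net.set(change.key, (net.get(change.key) ?? 0) + delta)
  }

  const added: Key[] = []
  const removed: Key[] = []
  net.forEach((count, key) => {
    if (count > 0) added.push(key) // net 0→1 transition
    else if (count < 0) removed.push(key) // net 1→0 transition
    // count === 0: insert/delete pair for the same key, no membership change
  })
  return { added, removed }
}

const delta = netKeyDelta([
  { type: 'delete', key: 1 }, // stable-membership update of key 1 ...
  { type: 'insert', key: 1 }, // ... split into a delete/insert pair
  { type: 'insert', key: 2 },
  { type: 'delete', key: 3 },
])
console.log(delta) // { added: [ 2 ], removed: [ 3 ] }
```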

Scope 2 — per-live-query: LiveQueryTrackedSourceRecordsAggregator

One aggregator per sync session. Owns a nested refcount map:

private readonly entries = new Map<string, Map<string | number, { refCount: number }>>()
private exposed = false

Outer key is collectionId, inner key is the source record's primitive key — no composite serialization. The refcount is over aliases: a self-join like { employee: emp, manager: emp } has two subscribers emitting independently, and the aggregator dedupes. apply(collectionId, added, removed):

  1. Refcount the adds/removes against the per-collection inner map. 0→1 transitions go into netAdded, 1→0 into netRemoved. Empty inner buckets are pruned.
  2. If exposed: propagate net transitions to sourceCollections[collectionId]._trackedSourceRecords.apply(...) (scope 3), and fan out directly to the live-query-local listener set (held by reference from the builder).

setExposed(boolean) is the lifecycle gate. false → true replays the snapshot as added. true → false replays as removed. The builder wires this to subscribers:change:

config.collection.on('subscribers:change', (event) => {
  trackedSourceRecordsAggregator.setExposed(event.subscriberCount > 0)
})
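A simplified sketch of the scope 2 behaviour described above: nested refcounts plus the `exposed` gate. `AggregatorSketch` and its single `emit` callback are assumptions made for illustration. The real aggregator also forwards net transitions to the source collections' managers, which is omitted here.

```typescript
type Key = string | number
type TrackedSourceRecord = { collectionId: string; key: Key }
type TrackedChange = { added: TrackedSourceRecord[]; removed: TrackedSourceRecord[] }

class AggregatorSketch {
  private readonly entries = new Map<string, Map<Key, { refCount: number }>>()
  private exposed = false
  private readonly emit: (change: TrackedChange) => void

  constructor(emit: (change: TrackedChange) => void) {
    this.emit = emit
  }

  apply(collectionId: string, added: Key[], removed: Key[]): void {
    let bucket = this.entries.get(collectionId)
    if (!bucket) {
      bucket = new Map<Key, { refCount: number }>()
      this.entries.set(collectionId, bucket)
    }
    const netAdded: TrackedSourceRecord[] = []
    const netRemoved: TrackedSourceRecord[] = []
    for (const key of added) {
      const entry = bucket.get(key)
      if (entry) entry.refCount++
      else {
        bucket.set(key, { refCount: 1 })
        netAdded.push({ collectionId, key }) // 0→1
      }
    }
    for (const key of removed) {
      const entry = bucket.get(key)
      if (!entry) continue
      if (entry.refCount === 1) {
        bucket.delete(key)
        netRemoved.push({ collectionId, key }) // 1→0
      } else entry.refCount--
    }
    if (bucket.size === 0) this.entries.delete(collectionId) // prune empty bucket
    if (this.exposed && (netAdded.length > 0 || netRemoved.length > 0)) {
      this.emit({ added: netAdded, removed: netRemoved })
    }
  }

  snapshot(): TrackedSourceRecord[] {
    const records: TrackedSourceRecord[] = []
    this.entries.forEach((bucket, collectionId) => {
      bucket.forEach((_entry, key) => records.push({ collectionId, key }))
    })
    return records
  }

  // Lifecycle gate: replay the snapshot as added or removed on each flip.
  setExposed(next: boolean): void {
    if (next === this.exposed) return
    this.exposed = next
    const records = this.snapshot()
    if (records.length === 0) return
    this.emit(next ? { added: records, removed: [] } : { added: [], removed: records })
  }
}

const emitted: TrackedChange[] = []
const aggregator = new AggregatorSketch((change) => emitted.push(change))
aggregator.apply('emp', [1, 2], []) // alias "employee"
aggregator.apply('emp', [2], []) // alias "manager" in a self-join
aggregator.setExposed(true) // first subscriber: snapshot replayed as added
aggregator.apply('emp', [], [2]) // one alias drops key 2: refCount 2→1, no event
aggregator.setExposed(false) // last subscriber leaves: snapshot replayed as removed
```

In this run `emitted` holds two events: one `added` batch with keys 1 and 2 (key 2 appears once despite two aliases), and one `removed` batch with the same two keys.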

Scope 3 — per-base-collection: TrackedSourceRecordsManager

Each CollectionImpl owns one _trackedSourceRecords: TrackedSourceRecordsManager<TKey> field. Refcount map is Map<TKey, { key, refCount }> over live queries. On 0↔1 transitions, listeners registered via subscribe see added / removed. Record-object allocation is skipped when the listener set is empty.
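To make the base-view semantics concrete, here is a minimal sketch of a per-collection refcount manager with `includeInitialState` replay. `TrackedKeysManagerSketch` is a hypothetical stand-in: it emits bare keys rather than `TrackedSourceRecord` objects, and it is not the PR's `TrackedSourceRecordsManager`.

```typescript
type KeysChange<TKey> = { added: TKey[]; removed: TKey[] }

class TrackedKeysManagerSketch<TKey extends string | number> {
  private readonly entries = new Map<TKey, { key: TKey; refCount: number }>()
  private readonly listeners = new Set<(change: KeysChange<TKey>) => void>()

  get(): TKey[] {
    return Array.from(this.entries.keys())
  }

  subscribe(
    callback: (change: KeysChange<TKey>) => void,
    options?: { includeInitialState?: boolean },
  ): () => void {
    this.listeners.add(callback)
    if (options?.includeInitialState && this.entries.size > 0) {
      callback({ added: this.get(), removed: [] }) // replay current snapshot
    }
    return () => {
      this.listeners.delete(callback)
    }
  }

  // Refcount is over live queries; only 0↔1 transitions reach listeners.
  apply(added: ReadonlyArray<TKey>, removed: ReadonlyArray<TKey>): void {
    const netAdded: TKey[] = []
    const netRemoved: TKey[] = []
    for (const key of added) {
      const entry = this.entries.get(key)
      if (entry) entry.refCount++
      else {
        this.entries.set(key, { key, refCount: 1 })
        netAdded.push(key)
      }
    }
    for (const key of removed) {
      const entry = this.entries.get(key)
      if (!entry) continue
      if (entry.refCount === 1) {
        this.entries.delete(key)
        netRemoved.push(key)
      } else entry.refCount--
    }
    if (netAdded.length === 0 && netRemoved.length === 0) return
    this.listeners.forEach((listener) => listener({ added: netAdded, removed: netRemoved }))
  }
}

const manager = new TrackedKeysManagerSketch<string>()
const seen: Array<KeysChange<string>> = []
manager.apply(['K'], []) // query A starts using K before anyone listens
const unsubscribe = manager.subscribe((change) => seen.push(change), {
  includeInitialState: true, // replays K as added
})
manager.apply(['K'], []) // query B also uses K: 1→2, no event
manager.apply([], ['K']) // query A stops: 2→1, no event
manager.apply([], ['K']) // query B stops: 1→0, emits removed
unsubscribe()
```

Here `seen` ends with exactly two events: the initial-state replay of `K` as added, and a single `removed` once the last query lets go of it.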

Polymorphic dispatch on Collection.subscribeTrackedSourceRecords

CollectionImpl has an optional _liveQueryTrackedSourceView field. It's undefined on every collection except live queries; bridgeToCreateCollection sets it at construction time to a stable adapter on CollectionConfigBuilder (which routes through whatever aggregator the current sync session has). The public methods on CollectionImpl:

public getTrackedSourceRecords(): Array<TrackedSourceRecord> {
  return this._liveQueryTrackedSourceView?.snapshot()
    ?? this._trackedSourceRecords.get()
}

public subscribeTrackedSourceRecords(callback, options?): () => void {
  if (this._liveQueryTrackedSourceView) {
    return this._liveQueryTrackedSourceView.subscribe(callback, options)
  }
  return this._trackedSourceRecords.subscribe(callback, options)
}

So callers don't need to branch on collection type. Same method name, scope determined by what kind of collection you're holding.

The live-query listener set lives on CollectionConfigBuilder (long-lived). The aggregator holds it by reference. External subscribers attached before the first sync session, or across session start/stop cycles, do not need to re-attach.

End-to-end flow:

CollectionSubscriber
  sentToD2Keys mutates (filterDuplicateInserts)
    count inserts/deletes in filtered batch → net delta
    → onTrackedKeysChange({ added, removed })            [direct callback, 1:1]
      ↓
LiveQueryTrackedSourceRecordsAggregator
  apply(collectionId, added, removed)
    refcount per (collectionId → key) in nested map
    if exposed:
      → sourceCollections[id]._trackedSourceRecords.apply(netAdded, netRemoved)
      if liveQuery listeners.size > 0:
        for listener of listeners: listener({ added: records, removed: records })
                              ↑
                              liveQuery.subscribeTrackedSourceRecords
                              (held by reference; survives sync sessions)
        ↓
TrackedSourceRecordsManager (on each source collection)
  apply(netAdded, netRemoved)
    refcount per key
    if 0↔1 transitions AND listeners.size > 0:
      for listener of listeners: listener({ added: records, removed: records })
                              ↑
                              baseCollection.subscribeTrackedSourceRecords

Performance characteristics

All hot paths are bounded by batch size, with no dependency on the total tracked-set size:

  • Scope 1 delta: O(|batch|) via insert/delete count netting on the filtered stream. No copy of sentToD2Keys.
  • Scope 2 refcount: O(|batch|) using a nested Map<collectionId, Map<key, Entry>> — primitive keys, no serialization.
  • Scope 3 refcount: O(|batch|) using Map<TKey, Entry> — primitive keys, no serialization.
  • Record-object allocation at scopes 2 and 3 is gated on a non-empty listener set. Pure-internal batches (refcount updates with no app-code subscribers) skip the object materialization entirely.

Why not just expose CollectionSubscription.subscribedKeys?

Two reasons:

  1. sentToD2Keys is per-alias. A live query with multiple aliases on the same collection (self-joins) would require consumers to dedupe. Scope 2 exists precisely to handle that.
  2. subscribeChanges buffers truncate/refetch to hide transient empty states. subscribeTrackedSourceRecords needs to emit net membership. Deriving one from the other would mix concerns.

Three scopes with clear ownership match the three semantic views application code may want.

What's not changed

The feature is purely additive on CollectionImpl. Collection.utils and createCollection are byte-identical to upstream — no in-place mutation of user-supplied utils, no merge-precedence changes. The live-query path's utils merge stays at upstream's { ...options.utils, ...config.utils } (config wins). Tracked-source helpers are not on utils at all; they're direct methods on CollectionImpl.

How was this change tested?

  • Automated test (unit, integration, etc.)
  • Manual test (provide reproducible testing steps below)

Regression coverage in packages/db/tests/query/live-query-collection.test.ts:

  • Base-view delta flow with includeInitialState, plus snapshot-as-removed on last-subscriber unsubscribe.
  • Base-view ref-counting across overlapping live queries (key not removed until the last query stops using it).
  • Stable-membership updates emit nothing (ordered query, in-range update).
  • Truncate-refetch cycles that keep the same keys tracked emit nothing.
  • Live-query-local view: delta flow + snapshot-as-removed on unsubscribe.
  • Live-query-local view: includeInitialState replay.
  • Polymorphic-scope test: two live queries with disjoint where-clauses on the same base, asserting the base view sees the union and each per-query view sees only its own scope.

Plus a regression test in packages/db/tests/utility-exposure.test.ts pinning that user-supplied utils keys override built-ins on collision (the upstream merge-precedence contract).

The feature lives entirely on CollectionImpl and the live-query pipeline (CollectionSubscriber → aggregator → manager) — none of the sync backends are touched, so the in-memory tests exercise the same code paths that electric, query-db, localStorage, etc. would hit.

Screenshots

lalitkapoor and others added 2 commits April 28, 2026 13:33
Adds a `getTrackedSourceRecords` / `subscribeTrackedSourceRecords` API on
both base collections and live-query collections so consumers can observe
which source records the IVM pipeline is currently using.

Architecture (three scopes):
- Per-alias dedup (`sentToD2Keys` in CollectionSubscriber) — net add/remove
  transitions only, no churn from updates that keep the same key in range.
- Per-live-query aggregator — refcounts across aliases (handles self-joins),
  exposed only while the live query has subscribers.
- Per-base-collection manager — refcounts across all live queries that
  touch the collection.

The aggregator propagates net 0↔1 transitions to each source collection's
`_trackedSourceRecords` manager, gated on the live query being exposed
(via `subscribers:change`).

Performance:
- O(|batch|) on the hot path — net counting, no O(N) Set copies.
- Skips record-object allocation when no listeners are attached.
- Truncate path no longer eagerly clears `sentToD2Keys`; the truncate's
  delete events drain it through `filterDuplicateInserts`, and the merged
  delete + re-insert batch nets to zero (no spurious churn).

Includes regression tests for stable-membership ordered queries,
truncate-refetch parity, and lazy-join key dedup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comments 1, 2, and 3 from kevin-dp:

1. Stale comment in `collection-subscriber.ts` truncate handler — updated
   to explain why `sentToD2Keys.clear()` is no longer needed (truncate's
   delete events drain it through `filterDuplicateInserts`, and the
   merged delete+re-insert batch nets to zero).

2. Shared `utils` mutation footgun — eliminated. `createCollection` is
   now byte-identical to upstream (`collection.utils = options.utils ?? {}`,
   no in-place mutation). Tracked-source helpers moved off `utils` to
   direct Collection methods (`subscribeTrackedSourceRecords`,
   `getTrackedSourceRecords`), so the entire shared-utils surface area
   for this feature is gone.

3. Reversed merge precedence — restored upstream
   `{ ...options.utils, ...config.utils }` (config wins).

Polymorphic view restoration:

The earlier cleanup removed the live-query-local view (the per-query
"records this query reads from its sources"). Restored under the same
method name on `Collection` via polymorphic dispatch:

  - Base collection -> `_trackedSourceRecords` manager
    ("records of mine consumed by live queries")
  - Live-query collection -> `_liveQueryTrackedSourceView`
    ("records this query reads from its sources")

`bridgeToCreateCollection` wires the live-query view at construction
time. `LiveQueryTrackedSourceRecordsAggregator` regains its listener
fan-out; `CollectionConfigBuilder` owns a long-lived listener Set that
survives sync sessions, so external subscribers don't need to re-attach
across start/stop cycles.

Three new tests cover the live-query-local view: snapshot deltas,
includeInitialState replay, and parity with the base-collection view.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor

@kevin-dp kevin-dp left a comment

Hi @lalitkapoor

Thanks for your contribution!
I reviewed your PR and left some comments. I also asked Claude to review it, and it reported some test gaps; it may be worth adding tests for them:

Test gap: TrackedSourceRecordsManager.subscribe includeInitialState replay

In every base-collection test, subscribeTrackedSourceRecords is called before subscribeChanges, so the manager is empty when the listener attaches and the initial-state replay path:

// tracked-source-records.ts
if (options?.includeInitialState && this.entries.size > 0) {
  callback({ added: this.get(), removed: [] })
}

is never executed. The mirror path in liveQueryTrackedSourceView.subscribe is covered (test "should replay the live-query-local snapshot as initial state…"). Worth adding a base-collection counterpart: start a live query, preload, then subscribe to the base collection with includeInitialState: true and assert one immediate added event.

Test gap: nested live queries

The PR description says a live query whose source is another live query "works because _trackedSourceRecords lives on CollectionImpl." That's true mechanically — nothing crashes — but there's no end-to-end test, and the design has an interesting implication: liveQueryB will populate liveQueryA._trackedSourceRecords (the base manager), but liveQueryA.subscribeTrackedSourceRecords will always route through _liveQueryTrackedSourceView and never expose that data. So the base-view of any live-query collection is silently unreachable. Either document that, or skip allocating _trackedSourceRecords when _liveQueryTrackedSourceView is set.

Test gap: truncate-refetch with different keys

The truncate test (should not emit tracked source churn across truncate refetch…) only covers the case where the refetched keys equal the previous keys. The interesting opposite case — refetch returns a different set, which should net to non-zero added/removed — isn't covered. Worth adding to pin the contract that real membership changes do still emit across a truncate cycle.

// Aggregated view of source-records currently being used by active live
// queries that depend on this collection. Public so live-query aggregators
// can push deltas in.
public _trackedSourceRecords: TrackedSourceRecordsManager<TKey>
Contributor

It's a bit weird to have underscored variables that are exposed publicly. We could replace the direct cross-class access with a method on CollectionImpl (e.g. applyTrackedSourceDelta(added, removed) that delegates internally). Then the field can be private.

// during construction; undefined on base collections. When present, the
// public `getTrackedSourceRecords` / `subscribeTrackedSourceRecords`
// methods route to this view instead of `_trackedSourceRecords`.
public _liveQueryTrackedSourceView?: {
Contributor

Similarly, it's a bit weird to have this public underscored variable. We could avoid this by passing the view through CollectionConfig (or a CollectionImpl constructor argument) instead of assigning it post-construction. Then the field can be readonly and the assignment site moves into the constructor. The builder already exists at the moment bridgeToCreateCollection calls createCollection, so this is mechanically straightforward — just thread it through CollectionConfig.
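The two encapsulation suggestions (a delegating method instead of a public manager field, and a view passed in at construction) could look roughly like this. Everything here is a hypothetical sketch: `CollectionSketch`, the `liveQueryView` config key, and the `TrackedSourceView` shape are assumptions, and a plain `Set` stands in for the refcounting manager.

```typescript
type Key = string | number

// Hypothetical view shape; the PR's actual interface is not shown in full.
interface TrackedSourceView {
  snapshot(): Key[]
}

class CollectionSketch {
  // Stand-in for the manager; private, so no cross-class field access.
  private readonly tracked = new Set<Key>()
  // Assigned once in the constructor, so it can be readonly.
  private readonly liveQueryView: TrackedSourceView | undefined

  constructor(config: { liveQueryView?: TrackedSourceView } = {}) {
    this.liveQueryView = config.liveQueryView
  }

  // Aggregators call this instead of reaching into a public field.
  applyTrackedSourceDelta(added: Key[], removed: Key[]): void {
    for (const key of added) this.tracked.add(key)
    for (const key of removed) this.tracked.delete(key)
  }

  getTrackedSourceRecords(): Key[] {
    return this.liveQueryView?.snapshot() ?? Array.from(this.tracked)
  }
}

const baseSketch = new CollectionSketch()
baseSketch.applyTrackedSourceDelta([1, 2], [])
baseSketch.applyTrackedSourceDelta([], [1])

const liveSketch = new CollectionSketch({ liveQueryView: { snapshot: () => [42] } })
```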

[LIVE_QUERY_INTERNAL]: LiveQueryInternalUtils
}

export type LiveQueryCollectionUtils<TUtils extends UtilsRecord = {}> = TUtils &
Contributor

Old: LiveQueryCollectionUtils = UtilsRecord & { built-ins } (string-indexable).
New: LiveQueryCollectionUtils<TUtils = {}> = TUtils & LiveQueryBuiltInUtils.

LiveQueryCollectionUtils (no type arg) is now {} & LiveQueryBuiltInUtils, which lacks the Record<string, any> index signature the old type had. Anyone doing const u: LiveQueryCollectionUtils = ...; u.someCustomKey without specifying TUtils will see a new type error.
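A reduced reproduction of the typing change, as a sketch. `LiveQueryBuiltInUtils` is replaced by a hypothetical one-method stand-in, and `OldUtils` / `NewUtils` are illustrative names for the before and after shapes.

```typescript
type UtilsRecord = Record<string, any>

// Hypothetical stand-in for the real built-in utils.
type LiveQueryBuiltInUtils = { getRunCount: () => number }

// Old shape: the index signature from UtilsRecord makes any key readable.
type OldUtils = UtilsRecord & LiveQueryBuiltInUtils
// New shape: only the keys of TUtils plus the built-ins are known.
type NewUtils<TUtils extends UtilsRecord = {}> = TUtils & LiveQueryBuiltInUtils

const oldUtils: OldUtils = { getRunCount: () => 0 }
const viaIndexSignature = oldUtils.someCustomKey // OK, typed as `any`

const newUtils: NewUtils = { getRunCount: () => 0 }
// @ts-expect-error Property 'someCustomKey' does not exist on the default `{}` shape.
const viaDefault = newUtils.someCustomKey

// Callers that rely on custom keys now have to supply TUtils explicitly.
const typedUtils: NewUtils<{ someCustomKey: string }> = {
  getRunCount: () => 0,
  someCustomKey: 'x',
}
const viaTypeArg: string = typedUtils.someCustomKey
```

If keeping the old behaviour matters, one option is to default `TUtils` to `UtilsRecord` rather than `{}`, which would restore the index signature for callers that omit the type argument.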

}

return collection
return collection as unknown as Collection<
Contributor

Can't we pass these type args to CollectionImpl (in particular UtilsRecord) such that we don't have to do an unsafe typecast here?

}
}

for (const key of removed) {
Contributor

Can it happen that a key wasn't known until now and occurs in both added and removed?
In that case it would be part of both netAdded and netRemoved, is that problematic?

Contributor

I asked Claude about this. I agree with its answer and would go for adding a built-in defense mechanism.


Not from the current callers, but the manager doesn't enforce it. Tracing upward:

  • CollectionSubscriber.emitTrackedKeyDelta nets per-key first (Map<key, number>), then partitions by sign — so added and removed it emits are disjoint by construction.
  • LiveQueryTrackedSourceRecordsAggregator.apply is called with that disjoint pair (single subscriber → single call), and its own netAdded / netRemoved likewise can't share a key within one call.
  • TrackedSourceRecordsManager.apply is only called by the aggregator, with the disjoint netAdded / netRemoved.

So end-to-end the precondition holds today. But if it were violated — added and removed both containing a key K that's not in entries — the manager would:

  1. Process added: entries.set(K, { refCount: 1 }), push K to netAdded.
  2. Process removed: existing.refCount === 1 → entries.delete(K), push K to netRemoved.

Net result: listeners get { added: [K], removed: [K] } for a key that was never actually tracked. A sync layer wired to this would subscribe(K) then immediately unsubscribe(K) for nothing. That is problematic.

Two other overlap shapes are fine: if K already exists with refCount ≥ 1, the add/remove pair leaves refCount unchanged and emits nothing (the if (existing.refCount === 1) guard correctly skips netRemoved when refCount is now > 1). Only the brand-new-key case is buggy.

The aggregator's apply has the identical shape and the same latent fragility.

Two options:

  • Document the precondition in a JSDoc on both apply methods: "added and removed must be disjoint; duplicates within either must be deduplicated by the caller." Cheap, makes the assumption explicit, leaves it to future contributors to honor.
  • Net defensively — same trick as emitTrackedKeyDelta:
    const net = new Map<TKey, number>()
    for (const k of added) net.set(k, (net.get(k) ?? 0) + 1)
    for (const k of removed) net.set(k, (net.get(k) ?? 0) - 1)
    // then iterate net entries by sign
    Costs one Map allocation per call but eliminates the entire class of caller mistakes (overlap and duplicates within either array).

I'd lean toward netting in the manager (it's the public-API boundary — callers reach through _trackedSourceRecords from outside the class, so the contract is harder to enforce by code review), and just documenting the precondition on the aggregator (private-ish, single internal caller).
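A fuller sketch of the defensive-netting option, written as a standalone function over a refcount map. `applyNetted` and `RefEntry` are hypothetical names. One assumption to flag: a key repeated within `added` moves the refcount by its net count but still produces at most one transition event.

```typescript
type RefEntry = { refCount: number }

// Nets added against removed per key before touching refcounts, so a key
// present in both arrays cannot produce a spurious added+removed pair.
function applyNetted<TKey>(
  entries: Map<TKey, RefEntry>,
  added: ReadonlyArray<TKey>,
  removed: ReadonlyArray<TKey>,
): { netAdded: TKey[]; netRemoved: TKey[] } {
  const net = new Map<TKey, number>()
  for (const k of added) net.set(k, (net.get(k) ?? 0) + 1)
  for (const k of removed) net.set(k, (net.get(k) ?? 0) - 1)

  const netAdded: TKey[] = []
  const netRemoved: TKey[] = []
  net.forEach((delta, key) => {
    if (delta === 0) return // add/remove pair cancels out entirely
    const existing = entries.get(key)
    const before = existing?.refCount ?? 0
    const after = Math.max(0, before + delta) // ignore removes of unknown keys
    if (after === 0) entries.delete(key)
    else if (existing) existing.refCount = after
    else entries.set(key, { refCount: after })
    if (before === 0 && after > 0) netAdded.push(key) // 0→1+
    if (before > 0 && after === 0) netRemoved.push(key) // 1+→0
  })
  return { netAdded, netRemoved }
}

const refEntries = new Map<string, RefEntry>()
// Brand-new key in both arrays: the buggy case described above.
const overlap = applyNetted(refEntries, ['K'], ['K'])
// A genuine membership change still goes through.
const genuine = applyNetted(refEntries, ['A', 'B'], [])
const dropped = applyNetted(refEntries, [], ['A'])
```

With this shape, `overlap` carries no transitions and leaves the map empty, while `genuine` and `dropped` report `A`, `B` as added and `A` as removed respectively.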

@pkg-pr-new

pkg-pr-new Bot commented Apr 30, 2026

More templates

@tanstack/angular-db

npm i https://pkg.pr.new/@tanstack/angular-db@1500

@tanstack/browser-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/browser-db-sqlite-persistence@1500

@tanstack/capacitor-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/capacitor-db-sqlite-persistence@1500

@tanstack/cloudflare-durable-objects-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/cloudflare-durable-objects-db-sqlite-persistence@1500

@tanstack/db

npm i https://pkg.pr.new/@tanstack/db@1500

@tanstack/db-ivm

npm i https://pkg.pr.new/@tanstack/db-ivm@1500

@tanstack/db-sqlite-persistence-core

npm i https://pkg.pr.new/@tanstack/db-sqlite-persistence-core@1500

@tanstack/electric-db-collection

npm i https://pkg.pr.new/@tanstack/electric-db-collection@1500

@tanstack/electron-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/electron-db-sqlite-persistence@1500

@tanstack/expo-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/expo-db-sqlite-persistence@1500

@tanstack/node-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/node-db-sqlite-persistence@1500

@tanstack/offline-transactions

npm i https://pkg.pr.new/@tanstack/offline-transactions@1500

@tanstack/powersync-db-collection

npm i https://pkg.pr.new/@tanstack/powersync-db-collection@1500

@tanstack/query-db-collection

npm i https://pkg.pr.new/@tanstack/query-db-collection@1500

@tanstack/react-db

npm i https://pkg.pr.new/@tanstack/react-db@1500

@tanstack/react-native-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/react-native-db-sqlite-persistence@1500

@tanstack/rxdb-db-collection

npm i https://pkg.pr.new/@tanstack/rxdb-db-collection@1500

@tanstack/solid-db

npm i https://pkg.pr.new/@tanstack/solid-db@1500

@tanstack/svelte-db

npm i https://pkg.pr.new/@tanstack/svelte-db@1500

@tanstack/tauri-db-sqlite-persistence

npm i https://pkg.pr.new/@tanstack/tauri-db-sqlite-persistence@1500

@tanstack/trailbase-db-collection

npm i https://pkg.pr.new/@tanstack/trailbase-db-collection@1500

@tanstack/vue-db

npm i https://pkg.pr.new/@tanstack/vue-db@1500

commit: 50332f1


2 participants