Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
CollectionRequiresSyncConfigError,
} from '../errors'
import { currentStateAsChanges } from './change-events'
import { TrackedSourceRecordsManager } from './tracked-source-records.js'

import { CollectionStateManager } from './state'
import { CollectionChangesManager } from './changes'
Expand Down Expand Up @@ -34,6 +35,9 @@ import type {
SingleResult,
StringCollationConfig,
SubscribeChangesOptions,
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
Transaction as TransactionType,
UtilsRecord,
WritableDeep,
Expand Down Expand Up @@ -266,7 +270,13 @@ export function createCollection(
collection.utils = {}
}

return collection
return collection as unknown as Collection<
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we pass these type args to CollectionImpl (in particular UtilsRecord) such that we don't have to do an unsafe typecast here?

any,
string | number,
UtilsRecord,
any,
any
>
}

export class CollectionImpl<
Expand Down Expand Up @@ -299,6 +309,22 @@ export class CollectionImpl<
// The core state of the collection is "public" so that is accessible in tests
// and for debugging
public _state: CollectionStateManager<TOutput, TKey, TSchema, TInput>
// Aggregated view of source-records currently being used by active live
// queries that depend on this collection. Public so live-query aggregators
// can push deltas in.
public _trackedSourceRecords: TrackedSourceRecordsManager<TKey>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird to have underscored variables that are exposed publicly. We could replace the direct cross-class access with a method on CollectionImpl (e.g. applyTrackedSourceDelta(added, removed) that delegates internally). Then the field can be private.

// For live-query collections only: a live-query-local view of "source
// records this query is currently using." Set by the live-query path
// during construction; undefined on base collections. When present, the
// public `getTrackedSourceRecords` / `subscribeTrackedSourceRecords`
// methods route to this view instead of `_trackedSourceRecords`.
public _liveQueryTrackedSourceView?: {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, it's a bit weird to have this public underscored variable. We could avoid this by passing the view through CollectionConfig (or a CollectionImpl constructor argument) instead of assigning it post-construction. Then the field can be readonly and the assignment site moves into the constructor. The builder already exists at the moment bridgeToCreateCollection calls createCollection, so this is mechanically straightforward — just thread it through CollectionConfig.

snapshot: () => Array<TrackedSourceRecord>
subscribe: (
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
) => () => void
}

/**
* When set, collection consumers should defer processing incoming data
Expand Down Expand Up @@ -354,6 +380,7 @@ export class CollectionImpl<
this._mutations = new CollectionMutationsManager(config, this.id)
this._state = new CollectionStateManager(config)
this._sync = new CollectionSyncManager(config, this.id)
this._trackedSourceRecords = new TrackedSourceRecordsManager<TKey>(this.id)

this.comparisonOpts = buildCompareOptionsFromConfig(config)

Expand Down Expand Up @@ -941,6 +968,42 @@ export class CollectionImpl<
return this._changes.subscribeChanges(callback, options)
}

/**
* Snapshot of source records currently being tracked through this
* collection's data flow.
*
* On a base collection: the union of records OF this collection being
* used by any active live query. Each record appears once regardless
* of how many queries reference it.
*
* On a live query collection: the records FROM this query's source
* collections that the query is currently using.
*
* Both views answer "what source records are currently flowing through
* me," from opposite ends of the data-flow graph.
*/
public getTrackedSourceRecords(): Array<TrackedSourceRecord> {
return (
this._liveQueryTrackedSourceView?.snapshot() ??
this._trackedSourceRecords.get()
)
}

/**
* Subscribe to changes in the set of source records tracked through this
* collection's data flow. See `getTrackedSourceRecords` for the per-
* collection-type semantics.
*/
public subscribeTrackedSourceRecords(
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
): () => void {
if (this._liveQueryTrackedSourceView) {
return this._liveQueryTrackedSourceView.subscribe(callback, options)
}
return this._trackedSourceRecords.subscribe(callback, options)
}

/**
* Subscribe to a collection event
*/
Expand Down
82 changes: 82 additions & 0 deletions packages/db/src/collection/tracked-source-records.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type {
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
} from '../types.js'

type Entry<TKey> = { key: TKey; refCount: number }

/**
* Per-base-collection tracked source records manager.
*
* Refcounts over active live queries that depend on this collection. Each
* live query's aggregator pushes its net alias-level transitions here; this
* manager dedupes across queries and emits to subscribers only on true 0↔1
* transitions.
*/
export class TrackedSourceRecordsManager<
TKey extends string | number = string | number,
> {
// Keys are primitives; use them directly as the Map key. No serialization.
private readonly entries = new Map<TKey, Entry<TKey>>()
private readonly listeners = new Set<
(change: TrackedSourceRecordsChange) => void
>()

constructor(private readonly collectionId: string) {}

apply(added: Iterable<TKey>, removed: Iterable<TKey>): void {
const netAdded: Array<TKey> = []
const netRemoved: Array<TKey> = []

for (const key of added) {
const existing = this.entries.get(key)
if (existing) {
existing.refCount++
} else {
this.entries.set(key, { key, refCount: 1 })
netAdded.push(key)
}
}

for (const key of removed) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it happen that a key wasn't known until now and occurs in both added and removed?
In that case it would be part of both netAdded and netRemoved, is that problematic?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked Claude about this, i agree with its answer and would go for adding built-in defense mechanism.


Not from the current callers, but the manager doesn't enforce it. Tracing upward:

  • CollectionSubscriber.emitTrackedKeyDelta nets per-key first (Map<key, number>), then partitions by sign — so added and removed it emits are disjoint by construction.
  • LiveQueryTrackedSourceRecordsAggregator.apply is called with that disjoint pair (single subscriber → single call), and its own netAdded / netRemoved likewise can't share a key within one call.
  • TrackedSourceRecordsManager.apply is only called by the aggregator, with the disjoint netAdded / netRemoved.

So end-to-end the precondition holds today. But if it were violated — added and removed both containing a key K that's not in entries — the manager would:

  1. Process added: entries.set(K, { refCount: 1 }), push K to netAdded.
  2. Process removed: existing.refCount === 1entries.delete(K), push K to netRemoved.

Net result: listeners get { added: [K], removed: [K] } for a key that was never actually tracked. A sync layer wired to this would subscribe(K) then immediately unsubscribe(K) for nothing. That is problematic.

Two other overlap shapes are fine: if K already exists with refCount ≥ 1, the add/remove pair leaves refCount unchanged and emits nothing (the if (existing.refCount === 1) guard correctly skips netRemoved when refCount is now > 1). Only the brand-new-key case is buggy.

The aggregator's apply has the identical shape and the same latent fragility.

Two options:

  • Document the precondition in a JSDoc on both apply methods: "added and removed must be disjoint; duplicates within either must be deduplicated by the caller." Cheap, makes the assumption explicit, leaves it to future contributors to honor.
  • Net defensively — same trick as emitTrackedKeyDelta:
    const net = new Map<TKey, number>()
    for (const k of added) net.set(k, (net.get(k) ?? 0) + 1)
    for (const k of removed) net.set(k, (net.get(k) ?? 0) - 1)
    // then iterate net entries by sign
    Costs one Map allocation per call but eliminates the entire class of caller mistakes (overlap and duplicates within either array).

I'd lean toward netting in the manager (it's the public-API boundary — callers reach through _trackedSourceRecords from outside the class, so the contract is harder to enforce by code review), and just documenting the precondition on the aggregator (private-ish, single internal caller).

const existing = this.entries.get(key)
if (!existing) continue
if (existing.refCount === 1) {
this.entries.delete(key)
netRemoved.push(existing.key)
} else {
existing.refCount--
}
}

if (netAdded.length === 0 && netRemoved.length === 0) return
if (this.listeners.size === 0) return
const change: TrackedSourceRecordsChange = {
added: netAdded.map((key) => this.toRecord(key)),
removed: netRemoved.map((key) => this.toRecord(key)),
}
for (const listener of this.listeners) listener(change)
}

get(): Array<TrackedSourceRecord> {
return Array.from(this.entries.values(), ({ key }) => this.toRecord(key))
}

subscribe(
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
): () => void {
this.listeners.add(callback)
if (options?.includeInitialState && this.entries.size > 0) {
callback({ added: this.get(), removed: [] })
}
return () => {
this.listeners.delete(callback)
}
}

private toRecord(key: TKey): TrackedSourceRecord {
return { collectionId: this.collectionId, key }
}
}
9 changes: 7 additions & 2 deletions packages/db/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ export {
// One-shot query execution
export { queryOnce, type QueryOnceConfig } from './query-once.js'

export { type LiveQueryCollectionConfig } from './live/types.js'
export { type LiveQueryCollectionUtils } from './live/collection-config-builder.js'
export type {
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
} from '../types.js'
export type { LiveQueryCollectionConfig } from './live/types.js'
export { type LiveQueryCollectionUtils } from './live-query-collection.js'

// Predicate utilities for predicate push-down
export {
Expand Down
70 changes: 39 additions & 31 deletions packages/db/src/query/live-query-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import {
getBuilderFromConfig,
registerCollectionBuilder,
} from './live/collection-registry.js'
import type { LiveQueryCollectionUtils } from './live/collection-config-builder.js'
import type {
LiveQueryBuiltInUtils,
LiveQueryCollectionUtils,
} from './live/collection-config-builder.js'
import type { LiveQueryCollectionConfig } from './live/types.js'
import type {
ExtractContext,
Expand All @@ -27,6 +30,8 @@ import type {
RootQueryResult,
} from './builder/types.js'

export type { LiveQueryCollectionUtils } from './live/collection-config-builder.js'

type CollectionConfigForContext<
TContext extends Context,
TResult extends object,
Expand Down Expand Up @@ -78,7 +83,7 @@ export function liveQueryCollectionOptions<
query: RootQueryFn<TQuery> | RootQueryBuilder<TQuery>
},
): CollectionConfigForContext<TContext, TResult> & {
utils: LiveQueryCollectionUtils
utils: LiveQueryBuiltInUtils
} {
const collectionConfigBuilder = new CollectionConfigBuilder<
TContext,
Expand All @@ -87,7 +92,7 @@ export function liveQueryCollectionOptions<
return collectionConfigBuilder.getConfig() as CollectionConfigForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils }
> & { utils: LiveQueryBuiltInUtils }
}

/**
Expand Down Expand Up @@ -148,7 +153,7 @@ export function createLiveQueryCollection<
utils?: TUtils
},
): CollectionForContext<TContext, RootQueryResult<TContext>> & {
utils: LiveQueryCollectionUtils & TUtils
utils: LiveQueryCollectionUtils<TUtils>
}

// Implementation
Expand All @@ -163,7 +168,7 @@ export function createLiveQueryCollection<
q: InitialQueryBuilder,
) => QueryBuilder<TContext> & RootObjectResultConstraint<TContext>),
): CollectionForContext<TContext, TResult> & {
utils: LiveQueryCollectionUtils & TUtils
utils: LiveQueryCollectionUtils<TUtils>
} {
// Determine if the argument is a function (query) or a config object
if (typeof configOrQuery === `function`) {
Expand All @@ -179,49 +184,52 @@ export function createLiveQueryCollection<
return bridgeToCreateCollection(options) as CollectionForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils & TUtils }
} else {
// Config object case
const config = configOrQuery as LiveQueryCollectionConfig<
TContext,
TResult
> & { utils?: TUtils }
// Same overload implementation limitation as above: the config has already
// been validated by the public signatures, but the branch loses that precision.
const options = liveQueryCollectionOptions(config as any)
> & { utils: LiveQueryCollectionUtils<TUtils> }
}

// Merge custom utils if provided, preserving the getBuilder() method for dependency tracking
if (config.utils) {
options.utils = { ...options.utils, ...config.utils }
}
// Config object case. Same overload implementation limitation as above:
// the config has already been validated by the public signatures, but the
// branch loses that precision.
const config = configOrQuery as LiveQueryCollectionConfig<
TContext,
TResult
> & { utils?: TUtils }
const options = liveQueryCollectionOptions(config as any)

return bridgeToCreateCollection(options) as CollectionForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils & TUtils }
// Merge custom utils if provided, preserving the getBuilder() method for dependency tracking
if (config.utils) {
options.utils = { ...options.utils, ...config.utils }
}

return bridgeToCreateCollection(options) as CollectionForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils<TUtils> }
}

/**
* Bridge function that handles the type compatibility between query2's TResult
* and core collection's output type without exposing ugly type assertions to users
*/
function bridgeToCreateCollection<
TResult extends object,
TUtils extends UtilsRecord = {},
>(
options: CollectionConfig<TResult> & { utils: TUtils },
): Collection<TResult, string | number, TUtils> {
function bridgeToCreateCollection<TResult extends object>(
options: CollectionConfig<TResult> & { utils: LiveQueryBuiltInUtils },
): Collection<TResult, string | number, LiveQueryBuiltInUtils> {
const collection = createCollection(options as any) as unknown as Collection<
TResult,
string | number,
LiveQueryCollectionUtils
LiveQueryBuiltInUtils
>

const builder = getBuilderFromConfig(options)
if (builder) {
registerCollectionBuilder(collection, builder)
// Route the Collection's tracked-source-records public methods through
// the live-query-local view (the source-records this query is using),
// not the base-collection refcount manager (which is "consumers of mine").
// The adapter is stable across sync sessions; the underlying aggregator
// is replaced each session.
collection._liveQueryTrackedSourceView = builder.liveQueryTrackedSourceView
}

return collection as unknown as Collection<TResult, string | number, TUtils>
return collection
}
Loading
Loading