diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index e51eb998d..46b7f2303 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -3,7 +3,10 @@ import { CollectionRequiresConfigError, CollectionRequiresSyncConfigError, } from '../errors' +import { getBuilderFromConfig } from '../query/live/collection-registry.js' import { currentStateAsChanges } from './change-events' +import { TrackedSourceRecordsManager } from './tracked-source-records.js' +import { registerTrackedSourceRecordsManager } from './tracked-source-records-store.js' import { CollectionStateManager } from './state' import { CollectionChangesManager } from './changes' @@ -34,6 +37,9 @@ import type { SingleResult, StringCollationConfig, SubscribeChangesOptions, + SubscribeTrackedSourceRecordsOptions, + TrackedSourceRecord, + TrackedSourceRecordsChange, Transaction as TransactionType, UtilsRecord, WritableDeep, @@ -44,6 +50,14 @@ import type { WithVirtualProps } from '../virtual-props.js' export type { CollectionIndexMetadata } from './events.js' +type LiveQueryTrackedSourceView = { + snapshot: () => Array + subscribe: ( + callback: (change: TrackedSourceRecordsChange) => void, + options?: SubscribeTrackedSourceRecordsOptions, + ) => () => void +} + /** * Enhanced Collection interface that includes both data type T and utilities TUtils * @template T - The type of items in the collection @@ -255,9 +269,7 @@ export function createCollection( schema?: StandardSchemaV1 }, ): Collection { - const collection = new CollectionImpl( - options, - ) + const collection = new CollectionImpl(options) // Attach utils to collection if (options.utils) { @@ -299,6 +311,12 @@ export class CollectionImpl< // The core state of the collection is "public" so that is accessible in tests // and for debugging public _state: CollectionStateManager + // Aggregated view of source-records currently being used by active live + // queries that depend on this collection. + private readonly _trackedSourceRecords: TrackedSourceRecordsManager + // For live-query collections only: a live-query-local view of "source + // records this query is currently using." Undefined on base collections. + private readonly _liveQueryTrackedSourceView?: LiveQueryTrackedSourceView /** * When set, collection consumers should defer processing incoming data @@ -354,6 +372,10 @@ export class CollectionImpl< this._mutations = new CollectionMutationsManager(config, this.id) this._state = new CollectionStateManager(config) this._sync = new CollectionSyncManager(config, this.id) + this._trackedSourceRecords = new TrackedSourceRecordsManager(this.id) + registerTrackedSourceRecordsManager(this, this._trackedSourceRecords) + this._liveQueryTrackedSourceView = + getBuilderFromConfig(config)?.liveQueryTrackedSourceView this.comparisonOpts = buildCompareOptionsFromConfig(config) @@ -941,6 +963,42 @@ export class CollectionImpl< return this._changes.subscribeChanges(callback, options) } + /** + * Snapshot of source records currently being tracked through this + * collection's data flow. + * + * On a base collection: the union of records OF this collection being + * used by any active live query. Each record appears once regardless + * of how many queries reference it. + * + * On a live query collection: the records FROM this query's source + * collections that the query is currently using. + * + * Both views answer "what source records are currently flowing through + * me," from opposite ends of the data-flow graph. + */ + public getTrackedSourceRecords(): Array { + return ( + this._liveQueryTrackedSourceView?.snapshot() ?? + this._trackedSourceRecords.get() + ) + } + + /** + * Subscribe to changes in the set of source records tracked through this + * collection's data flow. See `getTrackedSourceRecords` for the per- + * collection-type semantics. + */ + public subscribeTrackedSourceRecords( + callback: (change: TrackedSourceRecordsChange) => void, + options?: SubscribeTrackedSourceRecordsOptions, + ): () => void { + if (this._liveQueryTrackedSourceView) { + return this._liveQueryTrackedSourceView.subscribe(callback, options) + } + return this._trackedSourceRecords.subscribe(callback, options) + } + /** * Subscribe to a collection event */ diff --git a/packages/db/src/collection/tracked-source-records-store.ts b/packages/db/src/collection/tracked-source-records-store.ts new file mode 100644 index 000000000..66b51e607 --- /dev/null +++ b/packages/db/src/collection/tracked-source-records-store.ts @@ -0,0 +1,22 @@ +import type { TrackedSourceRecordsManager } from './tracked-source-records.js' + +const trackedSourceRecordsManagers = new WeakMap< + object, + TrackedSourceRecordsManager +>() + +export function registerTrackedSourceRecordsManager( + collection: object, + manager: TrackedSourceRecordsManager, +): void { + trackedSourceRecordsManagers.set(collection, manager) +} + +export function applyTrackedSourceRecordDelta( + collection: object | undefined, + added: Iterable, + removed: Iterable, +): void { + if (!collection) return + trackedSourceRecordsManagers.get(collection)?.apply(added, removed) +} diff --git a/packages/db/src/collection/tracked-source-records.ts b/packages/db/src/collection/tracked-source-records.ts new file mode 100644 index 000000000..b3783a315 --- /dev/null +++ b/packages/db/src/collection/tracked-source-records.ts @@ -0,0 +1,98 @@ +import type { + SubscribeTrackedSourceRecordsOptions, + TrackedSourceRecord, + TrackedSourceRecordsChange, +} from '../types.js' + +type Entry = { key: TKey; refCount: number } + +/** + * Per-base-collection tracked source records manager. + * + * Refcounts over active live queries that depend on this collection. Each + * live query's aggregator pushes its net alias-level transitions here; this + * manager dedupes across queries and emits to subscribers only on true 0↔1 + * transitions. + */ +export class TrackedSourceRecordsManager< + TKey extends string | number = string | number, +> { + // Keys are primitives; use them directly as the Map key. No serialization. + private readonly entries = new Map>() + private readonly listeners = new Set< + (change: TrackedSourceRecordsChange) => void + >() + + constructor(private readonly collectionId: string) {} + + apply(added: Iterable, removed: Iterable): void { + const keyDeltas = new Map() + for (const key of added) { + const currentDelta = keyDeltas.get(key) ?? 0 + keyDeltas.set(key, currentDelta + 1) + } + for (const key of removed) { + const currentDelta = keyDeltas.get(key) ?? 0 + keyDeltas.set(key, currentDelta - 1) + } + + const netAdded: Array = [] + const netRemoved: Array = [] + + for (const [key, delta] of keyDeltas) { + if (delta === 0) continue + const existing = this.entries.get(key) + + if (delta > 0) { + if (existing) { + existing.refCount += delta + } else { + this.entries.set(key, { key, refCount: delta }) + netAdded.push(key) + } + continue + } + + if (!existing) { + continue + } + + const nextRefCount = existing.refCount + delta + if (nextRefCount <= 0) { + this.entries.delete(key) + netRemoved.push(existing.key) + } else { + existing.refCount = nextRefCount + } + } + + if (netAdded.length === 0 && netRemoved.length === 0) return + if (this.listeners.size === 0) return + const change: TrackedSourceRecordsChange = { + added: netAdded.map((key) => this.toRecord(key)), + removed: netRemoved.map((key) => this.toRecord(key)), + } + for (const listener of this.listeners) listener(change) + } + + get(): Array { + return Array.from(this.entries.values(), ({ key }) => this.toRecord(key)) + } + + subscribe( + callback: (change: TrackedSourceRecordsChange) => void, + options?: SubscribeTrackedSourceRecordsOptions, + ): () => void { + this.listeners.add(callback) + if (options?.includeInitialState && this.entries.size > 0) { + callback({ added: this.get(), removed: [] }) + } + return () => { + this.listeners.delete(callback) + } + } + + private toRecord(key: TKey): TrackedSourceRecord { + return { collectionId: this.collectionId, key } + } +} diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 889202e5a..5a8e47b7e 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -84,8 +84,13 @@ export { // One-shot query execution export { queryOnce, type QueryOnceConfig } from './query-once.js' -export { type LiveQueryCollectionConfig } from './live/types.js' -export { type LiveQueryCollectionUtils } from './live/collection-config-builder.js' +export type { + SubscribeTrackedSourceRecordsOptions, + TrackedSourceRecord, + TrackedSourceRecordsChange, +} from '../types.js' +export type { LiveQueryCollectionConfig } from './live/types.js' +export { type LiveQueryCollectionUtils } from './live-query-collection.js' // Predicate utilities for predicate push-down export { diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 8649bc0bc..c3ab4779e 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -27,6 +27,8 @@ import type { RootQueryResult, } from './builder/types.js' +export type { LiveQueryCollectionUtils } from './live/collection-config-builder.js' + type CollectionConfigForContext< TContext extends Context, TResult extends object, @@ -180,26 +182,26 @@ export function createLiveQueryCollection< TContext, TResult > & { utils: LiveQueryCollectionUtils & TUtils } - } else { - // Config object case - const config = configOrQuery as LiveQueryCollectionConfig< - TContext, - TResult - > & { utils?: TUtils } - // Same overload implementation limitation as above: the config has already - // been validated by the public signatures, but the branch loses that precision. - const options = liveQueryCollectionOptions(config as any) + } - // Merge custom utils if provided, preserving the getBuilder() method for dependency tracking - if (config.utils) { - options.utils = { ...options.utils, ...config.utils } - } + // Config object case. Same overload implementation limitation as above: + // the config has already been validated by the public signatures, but the + // branch loses that precision. + const config = configOrQuery as LiveQueryCollectionConfig< + TContext, + TResult + > & { utils?: TUtils } + const options = liveQueryCollectionOptions(config as any) - return bridgeToCreateCollection(options) as CollectionForContext< - TContext, - TResult - > & { utils: LiveQueryCollectionUtils & TUtils } + // Merge custom utils if provided, preserving the getBuilder() method for dependency tracking + if (config.utils) { + options.utils = { ...options.utils, ...config.utils } } + + return bridgeToCreateCollection(options) as CollectionForContext< + TContext, + TResult + > & { utils: LiveQueryCollectionUtils & TUtils } } /** @@ -212,13 +214,13 @@ function bridgeToCreateCollection< >( options: CollectionConfig & { utils: TUtils }, ): Collection { + const builder = getBuilderFromConfig(options) const collection = createCollection(options as any) as unknown as Collection< TResult, string | number, LiveQueryCollectionUtils > - const builder = getBuilderFromConfig(options) if (builder) { registerCollectionBuilder(collection, builder) } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 7353b2116..b2d8aa098 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -10,6 +10,7 @@ import { getActiveTransaction } from '../../transactions.js' import { CollectionSubscriber } from './collection-subscriber.js' import { getCollectionBuilder } from './collection-registry.js' import { LIVE_QUERY_INTERNAL } from './internal.js' +import { LiveQueryTrackedSourceRecordsAggregator } from './tracked-source-records-aggregator.js' import { buildQueryFromConfig, extractCollectionAliases, @@ -32,7 +33,10 @@ import type { KeyedStream, ResultStream, StringCollationConfig, + SubscribeTrackedSourceRecordsOptions, SyncConfig, + TrackedSourceRecord, + TrackedSourceRecordsChange, UtilsRecord, } from '../../types.js' import type { Context, GetResult } from '../builder/types.js' @@ -139,6 +143,36 @@ export class CollectionConfigBuilder< PendingGraphRun >() + // Long-lived listeners for the live-query-local tracked-source view. + // Survives sync sessions: each new session's aggregator reads this set + // by reference, so external subscribers don't need to re-attach across + // start/stop cycles. + private readonly trackedSourceRecordListeners = new Set< + (change: TrackedSourceRecordsChange) => void + >() + + // Adapter the live-query Collection routes through for its tracked-source + // record view. Routes through the current sync session's aggregator (which + // can come and go) but the adapter itself is stable across sessions. + public readonly liveQueryTrackedSourceView = { + snapshot: (): Array => + this.currentSyncState?.trackedSourceRecordsAggregator.snapshot() ?? [], + subscribe: ( + callback: (change: TrackedSourceRecordsChange) => void, + options?: SubscribeTrackedSourceRecordsOptions, + ): (() => void) => { + this.trackedSourceRecordListeners.add(callback) + if (options?.includeInitialState) { + const added = + this.currentSyncState?.trackedSourceRecordsAggregator.snapshot() ?? [] + if (added.length > 0) callback({ added, removed: [] }) + } + return () => { + this.trackedSourceRecordListeners.delete(callback) + } + }, + } + // Unsubscribe function for scheduler's onClear listener // Registered when sync starts, unregistered when sync stops // Prevents memory leaks by releasing the scheduler's reference to this builder @@ -583,10 +617,22 @@ export class CollectionConfigBuilder< // Store config and syncState as instance properties for the duration of this sync session this.currentSyncConfig = config + // Session-scoped aggregator that dedupes tracked source records across + // aliases (handles self-joins), propagates net transitions to each + // source collection's tracked-source-records manager, and fans out to + // the builder's long-lived listener Set (so external subscribers reach + // the per-query view via the live-query Collection). Lives only for this + // sync session. + const trackedSourceRecordsAggregator = + new LiveQueryTrackedSourceRecordsAggregator( + this.collections, + this.trackedSourceRecordListeners, + ) const syncState: SyncState = { messagesCount: 0, subscribedToAllCollections: false, unsubscribeCallbacks: new Set<() => void>(), + trackedSourceRecordsAggregator, } // Extend the pipeline such that it applies the incoming changes to the collection @@ -619,6 +665,17 @@ export class CollectionConfigBuilder< ) syncState.unsubscribeCallbacks.add(loadingSubsetUnsubscribe) + // Expose tracked source records only while the live query has active + // subscribers. The aggregator replays snapshot-as-added on 0→1 and + // snapshot-as-removed on 1→0. + const trackedSubscribersUnsubscribe = config.collection.on( + `subscribers:change`, + (event) => { + trackedSourceRecordsAggregator.setExposed(event.subscriberCount > 0) + }, + ) + syncState.unsubscribeCallbacks.add(trackedSubscribersUnsubscribe) + const loadSubsetDataCallbacks = this.subscribeToAllCollections( config, fullSyncState, @@ -1068,6 +1125,18 @@ export class CollectionConfigBuilder< this, ) + // Forward net membership changes from this subscriber into the + // per-live-query aggregator. One subscriber per alias — self-joins + // emit independently and the aggregator dedupes. Direct assignment + // (not a listener list) since this is 1:1. + collectionSubscriber.onTrackedKeysChange = ({ added, removed }) => { + syncState.trackedSourceRecordsAggregator.apply( + collectionId, + added, + removed, + ) + } + // Subscribe to status changes for status flow const statusUnsubscribe = collection.on(`status:change`, (event) => { this.handleSourceStatusChange(config, collectionId, event) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index e83cb8885..c8493bec8 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -48,11 +48,26 @@ export class CollectionSubscriber< // can potentially send the same item to D2 multiple times. private sentToD2Keys = new Set() + // Track keys currently reported as tracked source records for this subscriber. + private trackedSourceKeys = new Set() + // Direct load tracking callback for ordered path (set during subscribeToOrderedChanges, // used by loadNextItems for subsequent requestLimitedSnapshot calls) private orderedLoadSubsetResult?: (result: Promise | true) => void private pendingOrderedLoadPromise: Promise | undefined + /** + * Callback invoked with net membership changes in this subscriber's + * tracked-source-key set. The builder wires + * this to the per-live-query aggregator. A stable-membership ordered + * update where split delete+insert leaves `trackedSourceKeys` unchanged + * emits nothing. + */ + public onTrackedKeysChange?: (change: { + added: Array + removed: Array + }) => void + constructor( private alias: string, private collectionId: string, @@ -131,6 +146,10 @@ export class CollectionSubscriber< this.ensureLoadingPromise(subscription) } + subscription.on(`unsubscribed`, () => { + this.clearTrackedSourceKeys() + }) + const unsubscribe = () => { // If subscription has a pending promise, resolve it before unsubscribing const deferred = this.subscriptionLoadingPromises.get(subscription) @@ -168,6 +187,7 @@ export class CollectionSubscriber< filteredChanges, this.collection.config.getKey, ) + this.emitTrackedSourceKeyDelta(filteredChanges) // Do not provide the callback that loads more data // if there's no more data to load @@ -275,9 +295,10 @@ export class CollectionSubscriber< }) subscriptionHolder.current = subscription - // Listen for truncate events to reset cursor tracking state and sentToD2Keys - // This ensures that after a must-refetch/truncate, we don't use stale cursor data - // and allow re-inserts of previously sent keys + // Reset cursor/D2 duplicate state on truncate. Tracked-source membership + // is tracked separately in trackedSourceKeys and updated from the buffered + // delete/refetch change batch, so unsubscribe cleanup still has the right + // membership snapshot while truncate is in flight. const truncateUnsubscribe = this.collection.on(`truncate`, () => { this.biggest = undefined this.lastLoadRequestKey = undefined @@ -467,4 +488,49 @@ export class CollectionSubscriber< promise, ) } + + private emitTrackedSourceKeyDelta( + changes: ReadonlyArray>, + ): void { + const wasTracked = new Map() + for (const change of changes) { + if (change.type !== `insert` && change.type !== `delete`) { + continue + } + + if (!wasTracked.has(change.key)) { + wasTracked.set(change.key, this.trackedSourceKeys.has(change.key)) + } + + if (change.type === `insert`) { + this.trackedSourceKeys.add(change.key) + } else { + this.trackedSourceKeys.delete(change.key) + } + } + + if (!this.onTrackedKeysChange) return + + const added: Array = [] + const removed: Array = [] + for (const [key, before] of wasTracked) { + const after = this.trackedSourceKeys.has(key) + if (!before && after) { + added.push(key) + } else if (before && !after) { + removed.push(key) + } + } + + if (added.length === 0 && removed.length === 0) return + this.onTrackedKeysChange({ added, removed }) + } + + private clearTrackedSourceKeys() { + const removed = Array.from(this.trackedSourceKeys) + this.trackedSourceKeys.clear() + this.sentToD2Keys.clear() + if (removed.length === 0) return + this.onTrackedKeysChange?.({ added: [], removed }) + } } diff --git a/packages/db/src/query/live/tracked-source-records-aggregator.ts b/packages/db/src/query/live/tracked-source-records-aggregator.ts new file mode 100644 index 000000000..ffcaa5573 --- /dev/null +++ b/packages/db/src/query/live/tracked-source-records-aggregator.ts @@ -0,0 +1,162 @@ +import { applyTrackedSourceRecordDelta } from '../../collection/tracked-source-records-store.js' +import type { Collection } from '../../collection/index.js' +import type { + TrackedSourceRecord, + TrackedSourceRecordsChange, +} from '../../types.js' + +type Entry = { refCount: number } + +/** + * Per-live-query aggregator for tracked source records. + * + * Lives on a single sync session — dies with it. Refcounts over aliases + * within one query (a self-join references the same base collection under + * multiple aliases, so the same (collectionId, key) pair can be added + * multiple times). Net 0↔1 transitions are: + * 1. propagated to each source collection's tracked-source-records + * manager (so consumers reading the base-collection view see them) + * 2. fanned out to `listeners` (so consumers reading the per-query view + * via `liveQuery.subscribeTrackedSourceRecords` see them) + * + * `listeners` is held by reference. The builder owns a long-lived listener + * Set that survives sync sessions; aggregator instances come and go but the + * Set persists, so external subscribers don't need to re-attach across + * session boundaries. + * + * `exposed` gates both propagation and listener fan-out: only emit while + * the live query has active subscribers. Flipping `exposed` replays the + * current snapshot as added/removed so downstream views stay consistent. + */ +export class LiveQueryTrackedSourceRecordsAggregator { + // Nested map avoids serializing (collectionId, key) composites. Outer key + // is collectionId; inner key is the source record's key (primitive). + private readonly entries = new Map>() + private exposed = false + + constructor( + private readonly sourceCollections: Record< + string, + Collection + >, + private readonly listeners: ReadonlySet< + (change: TrackedSourceRecordsChange) => void + >, + ) {} + + /** + * Record a membership change from one `CollectionSubscriber`. All keys in + * a single call share the same collectionId, so propagation to the source + * collection's manager is a direct call — no grouping needed. + */ + apply( + collectionId: string, + added: Iterable, + removed: Iterable, + ): void { + const keyDeltas = new Map() + for (const key of added) { + const currentDelta = keyDeltas.get(key) ?? 0 + keyDeltas.set(key, currentDelta + 1) + } + for (const key of removed) { + const currentDelta = keyDeltas.get(key) ?? 0 + keyDeltas.set(key, currentDelta - 1) + } + + let byKey = this.entries.get(collectionId) + if (!byKey) { + byKey = new Map() + this.entries.set(collectionId, byKey) + } + + const netAdded: Array = [] + const netRemoved: Array = [] + + for (const [key, delta] of keyDeltas) { + if (delta === 0) continue + const existing = byKey.get(key) + + if (delta > 0) { + if (existing) { + existing.refCount += delta + } else { + byKey.set(key, { refCount: delta }) + netAdded.push(key) + } + continue + } + + if (!existing) { + continue + } + + const nextRefCount = existing.refCount + delta + if (nextRefCount <= 0) { + byKey.delete(key) + netRemoved.push(key) + } else { + existing.refCount = nextRefCount + } + } + + // Drop an emptied bucket so `entries.size === 0` correctly reflects + // "nothing tracked" for setExposed. + if (byKey.size === 0) { + this.entries.delete(collectionId) + } + + if (!this.exposed) return + if (netAdded.length === 0 && netRemoved.length === 0) return + + applyTrackedSourceRecordDelta( + this.sourceCollections[collectionId], + netAdded, + netRemoved, + ) + if (this.listeners.size === 0) return + const change: TrackedSourceRecordsChange = { + added: netAdded.map((key) => ({ collectionId, key })), + removed: netRemoved.map((key) => ({ collectionId, key })), + } + for (const listener of this.listeners) listener(change) + } + + setExposed(exposed: boolean): void { + if (this.exposed === exposed) return + this.exposed = exposed + if (this.entries.size === 0) return + + const hasListeners = this.listeners.size > 0 + const added: Array = [] + const removed: Array = [] + for (const [collectionId, byKey] of this.entries) { + const keys = Array.from(byKey.keys()) + const collection = this.sourceCollections[collectionId] + if (exposed) { + applyTrackedSourceRecordDelta(collection, keys, []) + if (hasListeners) { + for (const key of keys) added.push({ collectionId, key }) + } + } else { + applyTrackedSourceRecordDelta(collection, [], keys) + if (hasListeners) { + for (const key of keys) removed.push({ collectionId, key }) + } + } + } + + if (!hasListeners) return + const change: TrackedSourceRecordsChange = { added, removed } + for (const listener of this.listeners) listener(change) + } + + snapshot(): Array { + if (!this.exposed) return [] + const records: Array = [] + for (const [collectionId, byKey] of this.entries) { + for (const key of byKey.keys()) records.push({ collectionId, key }) + } + return records + } +} diff --git a/packages/db/src/query/live/types.ts b/packages/db/src/query/live/types.ts index 118015bd6..d2d9aa140 100644 --- a/packages/db/src/query/live/types.ts +++ b/packages/db/src/query/live/types.ts @@ -10,6 +10,7 @@ import type { RootObjectResultConstraint, RootQueryResult, } from '../builder/types.js' +import type { LiveQueryTrackedSourceRecordsAggregator } from './tracked-source-records-aggregator.js' export type Changes = { deletes: number @@ -22,6 +23,12 @@ export type SyncState = { messagesCount: number subscribedToAllCollections: boolean unsubscribeCallbacks: Set<() => void> + /** + * Session-scoped aggregator that dedupes tracked source records across + * aliases and forwards transitions to source collections. Dies with the + * sync session. + */ + trackedSourceRecordsAggregator: LiveQueryTrackedSourceRecordsAggregator graph?: D2 inputs?: Record> diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 6087e234e..4017d6ee6 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -73,6 +73,20 @@ export type Fn = (...args: Array) => any */ export type UtilsRecord = Record +export type TrackedSourceRecord = { + collectionId: string + key: string | number +} + +export type TrackedSourceRecordsChange = { + added: Array + removed: Array +} + +export type SubscribeTrackedSourceRecordsOptions = { + includeInitialState?: boolean +} + /** * * @remarks `update` and `insert` are both represented as `Partial`, but changes for `insert` could me made more precise by inferring the schema input type. In practice, this has almost 0 real world impact so it's not worth the added type complexity. diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 8b2b60901..34ddca36a 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -19,7 +19,12 @@ import { import { createDeferred } from '../../src/deferred' import { BTreeIndex } from '../../src/indexes/btree-index' import { Func, Value } from '../../src/query/ir.js' -import type { ChangeMessage, LoadSubsetOptions } from '../../src/types.js' +import type { + ChangeMessage, + LoadSubsetOptions, + SyncConfig, + TrackedSourceRecordsChange, +} from '../../src/types.js' // Sample user type for tests type User = { @@ -45,6 +50,18 @@ function createUsersCollection() { ) } +function sortTrackedSourceRecords( + records: ReadonlyArray<{ collectionId: string; key: string | number }>, +) { + return [...records].sort((left, right) => { + if (left.collectionId !== right.collectionId) { + return left.collectionId.localeCompare(right.collectionId) + } + + return String(left.key).localeCompare(String(right.key)) + }) +} + describe(`createLiveQueryCollection`, () => { let usersCollection: ReturnType @@ -106,6 +123,828 @@ describe(`createLiveQueryCollection`, () => { expect(activeUsers2.size).toBe(2) }) + it(`should emit tracked source record deltas for membership changes and clear when the last subscriber leaves`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const trackingEvents: Array<{ + added: Array<{ collectionId: string; key: string | number }> + removed: Array<{ collectionId: string; key: string | number }> + }> = [] + + const unsubscribeTracked = usersCollection.subscribeTrackedSourceRecords( + (changes) => { + trackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + { includeInitialState: true }, + ) + + const subscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + expect(trackingEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + trackingEvents.length = 0 + + usersCollection.insert({ id: 4, name: `David`, active: true }) + await flushPromises() + + expect(trackingEvents).toEqual([ + { + added: [{ collectionId: usersCollection.id, key: 4 }], + removed: [], + }, + ]) + + trackingEvents.length = 0 + + usersCollection.update(2, (draft) => { + draft.active = false + }) + await flushPromises() + + expect(trackingEvents).toEqual([ + { + added: [], + removed: [{ collectionId: usersCollection.id, key: 2 }], + }, + ]) + + trackingEvents.length = 0 + + subscription.unsubscribe() + + expect(trackingEvents).toEqual([ + { + added: [], + removed: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 4 }, + ], + }, + ]) + expect(usersCollection.getTrackedSourceRecords()).toEqual([]) + + unsubscribeTracked() + }) + + it(`should expose tracked source records on base collections while dependent live queries have active subscribers`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const trackingEvents: Array<{ + added: Array<{ collectionId: string; key: string | number }> + removed: Array<{ collectionId: string; key: string | number }> + }> = [] + + expect(usersCollection.getTrackedSourceRecords()).toEqual([]) + + const unsubscribeTracked = usersCollection.subscribeTrackedSourceRecords( + (changes) => { + trackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + { includeInitialState: true }, + ) + + const subscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + expect( + sortTrackedSourceRecords(usersCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ]) + expect(trackingEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + trackingEvents.length = 0 + subscription.unsubscribe() + + expect(usersCollection.getTrackedSourceRecords()).toEqual([]) + expect(trackingEvents).toEqual([ + { + added: [], + removed: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + }, + ]) + + unsubscribeTracked() + }) + + it(`should replay tracked source records as initial state for base collections after tracking starts`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const trackingEvents: Array<{ + added: Array<{ collectionId: string; key: string | number }> + removed: Array<{ collectionId: string; key: string | number }> + }> = [] + + const subscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + const unsubscribeTracked = usersCollection.subscribeTrackedSourceRecords( + (changes) => { + trackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + { includeInitialState: true }, + ) + + expect(trackingEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + unsubscribeTracked() + subscription.unsubscribe() + }) + + it(`should ref-count tracked source records on base collections across overlapping live queries`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const bobOnlyUsers = createLiveQueryCollection((q) => + q.from({ user: usersCollection }).where(({ user }) => eq(user.id, 2)), + ) + const trackingEvents: Array<{ + added: Array<{ collectionId: string; key: string | number }> + removed: Array<{ collectionId: string; key: string | number }> + }> = [] + + const unsubscribeTracked = usersCollection.subscribeTrackedSourceRecords( + (changes) => { + trackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + { includeInitialState: true }, + ) + + const activeUsersSubscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + expect(trackingEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + trackingEvents.length = 0 + + const bobOnlySubscription = bobOnlyUsers.subscribeChanges(() => {}) + await bobOnlyUsers.preload() + + expect( + sortTrackedSourceRecords(usersCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ]) + expect(trackingEvents).toEqual([]) + + activeUsersSubscription.unsubscribe() + + expect( + sortTrackedSourceRecords(usersCollection.getTrackedSourceRecords()), + ).toEqual([{ collectionId: usersCollection.id, key: 2 }]) + expect(trackingEvents).toEqual([ + { + added: [], + removed: [{ collectionId: usersCollection.id, key: 1 }], + }, + ]) + + trackingEvents.length = 0 + + bobOnlySubscription.unsubscribe() + + expect(usersCollection.getTrackedSourceRecords()).toEqual([]) + expect(trackingEvents).toEqual([ + { + added: [], + removed: [{ collectionId: usersCollection.id, key: 2 }], + }, + ]) + + unsubscribeTracked() + }) + + it(`should not emit tracked source churn for ordered updates that keep the same keys in range`, async () => { + type Item = { id: number; value: number } + const sourceCollection = createCollection({ + id: `tracked-ordered-update-source`, + getKey: (item) => item.id, + startSync: true, + autoIndex: `eager`, + defaultIndexType: BTreeIndex, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + ;[ + { id: 1, value: 1 }, + { id: 2, value: 2 }, + { id: 3, value: 3 }, + ].forEach((item) => { + write({ type: `insert`, value: item }) + }) + commit() + markReady() + }, + }, + onUpdate: async () => {}, + }) + + const topItems = createLiveQueryCollection((q) => + q + .from({ item: sourceCollection }) + .orderBy(({ item }) => item.value, `asc`) + .limit(2), + ) + const baseTrackingEvents: Array = [] + + const unsubscribeBaseTracked = + sourceCollection.subscribeTrackedSourceRecords((changes) => { + baseTrackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + + const subscription = topItems.subscribeChanges(() => {}) + await topItems.preload() + + baseTrackingEvents.length = 0 + + sourceCollection.update(1, (draft) => { + draft.value = 1.5 + }) + await flushPromises() + + expect(baseTrackingEvents).toEqual([]) + expect( + sortTrackedSourceRecords(sourceCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: sourceCollection.id, key: 1 }, + { collectionId: sourceCollection.id, key: 2 }, + ]) + + baseTrackingEvents.length = 0 + + subscription.unsubscribe() + + expect(baseTrackingEvents).toEqual([ + { + added: [], + removed: [ + { collectionId: sourceCollection.id, key: 1 }, + { collectionId: sourceCollection.id, key: 2 }, + ], + }, + ]) + expect(sourceCollection.getTrackedSourceRecords()).toEqual([]) + + unsubscribeBaseTracked() + }) + + it(`should not emit tracked source churn across truncate refetch when ordered queries keep tracking the same keys`, async () => { + type Item = { id: number; value: string; rank: number } + + let syncOps: Parameters[`sync`]>[0] | undefined + let loadSubsetCallCount = 0 + let loadSubsetResolver: (() => void) | undefined + + const sourceCollection = createCollection({ + id: `tracked-truncate-source`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + autoIndex: `eager`, + defaultIndexType: BTreeIndex, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + loadSubset: (_options: LoadSubsetOptions) => { + loadSubsetCallCount++ + + return new Promise((resolve) => { + loadSubsetResolver = () => { + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `refetched-1`, rank: 1 }, + }) + cfg.write({ + type: `insert`, + value: { id: 2, value: `refetched-2`, rank: 2 }, + }) + cfg.commit() + resolve() + } + }) + }, + } + }, + }, + }) + + const topItems = createLiveQueryCollection((q) => + q + .from({ item: sourceCollection }) + .orderBy(({ item }) => item.rank, `asc`) + .limit(2), + ) + const baseTrackingEvents: Array = [] + + const unsubscribeBaseTracked = + sourceCollection.subscribeTrackedSourceRecords((changes) => { + baseTrackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + + const subscription = topItems.subscribeChanges(() => {}) + const preloadPromise = topItems.preload() + + await vi.waitFor(() => expect(loadSubsetCallCount).toBe(1)) + loadSubsetResolver?.() + await preloadPromise + + baseTrackingEvents.length = 0 + loadSubsetCallCount = 0 + + syncOps?.begin() + syncOps?.truncate() + syncOps?.commit() + + await vi.waitFor(() => expect(loadSubsetCallCount).toBe(1)) + expect(baseTrackingEvents).toEqual([]) + + loadSubsetResolver?.() + await flushPromises() + + expect(baseTrackingEvents).toEqual([]) + expect( + sortTrackedSourceRecords(sourceCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: sourceCollection.id, key: 1 }, + { collectionId: sourceCollection.id, key: 2 }, + ]) + + baseTrackingEvents.length = 0 + + subscription.unsubscribe() + + expect(baseTrackingEvents).toEqual([ + { + added: [], + removed: [ + { collectionId: sourceCollection.id, key: 1 }, + { collectionId: sourceCollection.id, key: 2 }, + ], + }, + ]) + expect(sourceCollection.getTrackedSourceRecords()).toEqual([]) + + unsubscribeBaseTracked() + }) + + it(`should emit tracked source changes across truncate refetch when ordered queries track different keys`, async () => { + type Item = { id: number; value: string; rank: number } + + let syncOps: Parameters[`sync`]>[0] | undefined + let loadSubsetCallCount = 0 + let loadSubsetResolver: (() => void) | undefined + let refetchVersion = 0 + + const sourceCollection = createCollection({ + id: `tracked-truncate-different-keys-source`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + autoIndex: `eager`, + defaultIndexType: BTreeIndex, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + loadSubset: (_options: LoadSubsetOptions) => { + loadSubsetCallCount++ + + return new Promise((resolve) => { + loadSubsetResolver = () => { + refetchVersion++ + const items = + refetchVersion === 1 + ? [ + { id: 1, value: `initial-1`, rank: 1 }, + { id: 2, value: `initial-2`, rank: 2 }, + ] + : [ + { id: 3, value: `refetched-3`, rank: 1 }, + { id: 4, value: `refetched-4`, rank: 2 }, + ] + + cfg.begin() + items.forEach((item) => { + cfg.write({ type: `insert`, value: item }) + }) + cfg.commit() + resolve() + } + }) + }, + } + }, + }, + }) + + const topItems = createLiveQueryCollection((q) => + q + .from({ item: sourceCollection }) + .orderBy(({ item }) => item.rank, `asc`) + .limit(2), + ) + const baseTrackingEvents: Array = [] + + const unsubscribeBaseTracked = + sourceCollection.subscribeTrackedSourceRecords((changes) => { + baseTrackingEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + + const subscription = topItems.subscribeChanges(() => {}) + const preloadPromise = topItems.preload() + + await vi.waitFor(() => expect(loadSubsetCallCount).toBe(1)) + loadSubsetResolver?.() + await preloadPromise + + baseTrackingEvents.length = 0 + loadSubsetCallCount = 0 + + syncOps?.begin() + syncOps?.truncate() + syncOps?.commit() + + await vi.waitFor(() => expect(loadSubsetCallCount).toBe(1)) + expect(baseTrackingEvents).toEqual([]) + + loadSubsetResolver?.() + await flushPromises() + + expect(baseTrackingEvents).toEqual([ + { + added: [ + { collectionId: sourceCollection.id, key: 3 }, + { collectionId: sourceCollection.id, key: 4 }, + ], + removed: [ + { collectionId: sourceCollection.id, key: 1 }, + { collectionId: sourceCollection.id, key: 2 }, + ], + }, + ]) + expect( + sortTrackedSourceRecords(sourceCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: sourceCollection.id, key: 3 }, + { collectionId: sourceCollection.id, key: 4 }, + ]) + + subscription.unsubscribe() + unsubscribeBaseTracked() + }) + + it(`should expose live-query-local tracked source records via subscribeTrackedSourceRecords on the live query collection`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const liveQueryEvents: Array = [] + + const unsubscribeLiveQueryTracked = + activeUsers.subscribeTrackedSourceRecords((changes) => { + liveQueryEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + + const subscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + expect(liveQueryEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + liveQueryEvents.length = 0 + + usersCollection.insert({ id: 4, name: `David`, active: true }) + await flushPromises() + + expect(liveQueryEvents).toEqual([ + { + added: [{ collectionId: usersCollection.id, key: 4 }], + removed: [], + }, + ]) + + liveQueryEvents.length = 0 + + subscription.unsubscribe() + + // 1→0 subscriber transition replays the snapshot as removed. + expect(liveQueryEvents).toEqual([ + { + added: [], + removed: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + { collectionId: usersCollection.id, key: 4 }, + ], + }, + ]) + expect(activeUsers.getTrackedSourceRecords()).toEqual([]) + + unsubscribeLiveQueryTracked() + }) + + it(`should replay the live-query-local snapshot as initial state when includeInitialState is set`, async () => { + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + + const subscription = activeUsers.subscribeChanges(() => {}) + await activeUsers.preload() + + const liveQueryEvents: Array = [] + const unsubscribeLiveQueryTracked = + activeUsers.subscribeTrackedSourceRecords( + (changes) => { + liveQueryEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + { includeInitialState: true }, + ) + + expect(liveQueryEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + + expect( + sortTrackedSourceRecords(activeUsers.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ]) + + unsubscribeLiveQueryTracked() + subscription.unsubscribe() + }) + + it(`should expose direct source records for nested live queries`, async () => { + const activeUsers = createLiveQueryCollection({ + id: `tracked-active-users`, + query: (q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + }) + const activeUserNames = createLiveQueryCollection({ + id: `tracked-active-user-names`, + query: (q) => + q.from({ activeUser: activeUsers }).select(({ activeUser }) => ({ + id: activeUser.id, + name: activeUser.name, + })), + }) + const activeUsersEvents: Array = [] + const activeUserNamesEvents: Array = [] + + const unsubscribeActiveUsersTracked = + activeUsers.subscribeTrackedSourceRecords((changes) => { + activeUsersEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + const unsubscribeActiveUserNamesTracked = + activeUserNames.subscribeTrackedSourceRecords((changes) => { + activeUserNamesEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }) + + const activeUserNamesSubscription = activeUserNames.subscribeChanges( + () => {}, + ) + await activeUserNames.preload() + + expect(activeUsersEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + expect(activeUserNamesEvents).toEqual([ + { + added: [ + { collectionId: activeUsers.id, key: 1 }, + { collectionId: activeUsers.id, key: 2 }, + ], + removed: [], + }, + ]) + + expect( + sortTrackedSourceRecords(activeUsers.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ]) + expect( + sortTrackedSourceRecords(activeUserNames.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: activeUsers.id, key: 1 }, + { collectionId: activeUsers.id, key: 2 }, + ]) + + activeUserNamesSubscription.unsubscribe() + unsubscribeActiveUserNamesTracked() + unsubscribeActiveUsersTracked() + }) + + it(`should scope subscribeTrackedSourceRecords differently on base collection vs live query collection`, async () => { + // Two live queries with disjoint where-clauses against the same base. + // The base view sees the union of what each query is using; each + // live-query view sees only its own scope. Same method, different scope. + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)), + ) + const inactiveUsers = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, false)), + ) + + const baseEvents: Array = [] + const activeEvents: Array = [] + const inactiveEvents: Array = [] + + const unsubscribeBase = usersCollection.subscribeTrackedSourceRecords( + (changes) => { + baseEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + ) + const unsubscribeActive = activeUsers.subscribeTrackedSourceRecords( + (changes) => { + activeEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + ) + const unsubscribeInactive = inactiveUsers.subscribeTrackedSourceRecords( + (changes) => { + inactiveEvents.push({ + added: sortTrackedSourceRecords(changes.added), + removed: sortTrackedSourceRecords(changes.removed), + }) + }, + ) + + const activeSub = activeUsers.subscribeChanges(() => {}) + const inactiveSub = inactiveUsers.subscribeChanges(() => {}) + await Promise.all([activeUsers.preload(), inactiveUsers.preload()]) + + // Live-query-local views: each query sees only its own keys. + expect(activeEvents).toEqual([ + { + added: [ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ], + removed: [], + }, + ]) + expect(inactiveEvents).toEqual([ + { + added: [{ collectionId: usersCollection.id, key: 3 }], + removed: [], + }, + ]) + + // Base view: union of all live queries' tracked keys. + expect( + sortTrackedSourceRecords(usersCollection.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + { collectionId: usersCollection.id, key: 3 }, + ]) + + // Each live query reports only its own snapshot. + expect( + sortTrackedSourceRecords(activeUsers.getTrackedSourceRecords()), + ).toEqual([ + { collectionId: usersCollection.id, key: 1 }, + { collectionId: usersCollection.id, key: 2 }, + ]) + expect(inactiveUsers.getTrackedSourceRecords()).toEqual([ + { collectionId: usersCollection.id, key: 3 }, + ]) + + // Sanity: every base event should have arrived as one of the + // live-query events (the base view is the union of per-query deltas). + expect(baseEvents.length).toBeGreaterThan(0) + + activeSub.unsubscribe() + inactiveSub.unsubscribe() + unsubscribeBase() + unsubscribeActive() + unsubscribeInactive() + }) + describe(`compareOptions inheritance`, () => { it(`should inherit compareOptions from FROM collection`, () => { // Create a collection with non-default compareOptions diff --git a/packages/db/tests/tracked-source-records.test.ts b/packages/db/tests/tracked-source-records.test.ts new file mode 100644 index 000000000..b2caa58e3 --- /dev/null +++ b/packages/db/tests/tracked-source-records.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { TrackedSourceRecordsManager } from '../src/collection/tracked-source-records.js' +import { LiveQueryTrackedSourceRecordsAggregator } from '../src/query/live/tracked-source-records-aggregator.js' +import { mockSyncCollectionOptions } from './utils.js' +import type { TrackedSourceRecordsChange } from '../src/types.js' + +type User = { + id: number + name: string +} + +function createUsersCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `users`, + getKey: (user) => user.id, + initialData: [], + }), + ) +} + +describe(`TrackedSourceRecordsManager`, () => { + it(`nets overlapping additions and removals before applying them`, () => { + const manager = new TrackedSourceRecordsManager(`users`) + const changes: Array = [] + manager.subscribe((change) => changes.push(change)) + + manager.apply([1], [1]) + + expect(manager.get()).toEqual([]) + expect(changes).toEqual([]) + + manager.apply([1], []) + changes.length = 0 + + manager.apply([1], [1]) + + expect(manager.get()).toEqual([{ collectionId: `users`, key: 1 }]) + expect(changes).toEqual([]) + }) +}) + +describe(`LiveQueryTrackedSourceRecordsAggregator`, () => { + it(`nets overlapping additions and removals before notifying listeners`, () => { + const usersCollection = createUsersCollection() + const sourceChanges: Array = [] + const liveQueryChanges: Array = [] + const listeners = new Set<(change: TrackedSourceRecordsChange) => void>([ + (change) => liveQueryChanges.push(change), + ]) + const aggregator = new LiveQueryTrackedSourceRecordsAggregator( + { [usersCollection.id]: usersCollection }, + listeners, + ) + + usersCollection.subscribeTrackedSourceRecords((change) => + sourceChanges.push(change), + ) + aggregator.setExposed(true) + + aggregator.apply(usersCollection.id, [1], [1]) + + expect(aggregator.snapshot()).toEqual([]) + expect(usersCollection.getTrackedSourceRecords()).toEqual([]) + expect(liveQueryChanges).toEqual([]) + expect(sourceChanges).toEqual([]) + }) +}) diff --git a/packages/db/tests/utility-exposure.test.ts b/packages/db/tests/utility-exposure.test.ts index 9a4ab7ed7..dcae7ab0b 100644 --- a/packages/db/tests/utility-exposure.test.ts +++ b/packages/db/tests/utility-exposure.test.ts @@ -1,5 +1,6 @@ import { describe, expect, test } from 'vitest' import { createCollection } from '../src/collection/index.js' +import { createLiveQueryCollection } from '../src/query/index.js' import type { CollectionConfig, SyncConfig, UtilsRecord } from '../src/types' // Mock utility functions for testing @@ -99,4 +100,24 @@ describe(`Utility exposure pattern`, () => { // This is a compile-time check that we can't verify at runtime directly // But we've verified the utilities work }) + + test(`user-supplied utils override built-in helpers on name collision`, () => { + const source = createCollection({ + getKey: (item: { id: string }) => item.id, + sync: { + sync: () => {}, + }, + }) + + // User deliberately overrides getRunCount; their version should win. + const userGetRunCount = () => 42 + const utils = { getRunCount: userGetRunCount } + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ source }), + utils, + }) + + expect(liveQuery.utils.getRunCount).toBe(userGetRunCount) + expect(liveQuery.utils.getRunCount()).toBe(42) + }) })