-
Notifications
You must be signed in to change notification settings - Fork 210
feat(db): expose tracked source record subscriptions #1500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import { | |
| CollectionRequiresSyncConfigError, | ||
| } from '../errors' | ||
| import { currentStateAsChanges } from './change-events' | ||
| import { TrackedSourceRecordsManager } from './tracked-source-records.js' | ||
|
|
||
| import { CollectionStateManager } from './state' | ||
| import { CollectionChangesManager } from './changes' | ||
|
|
@@ -34,6 +35,9 @@ import type { | |
| SingleResult, | ||
| StringCollationConfig, | ||
| SubscribeChangesOptions, | ||
| SubscribeTrackedSourceRecordsOptions, | ||
| TrackedSourceRecord, | ||
| TrackedSourceRecordsChange, | ||
| Transaction as TransactionType, | ||
| UtilsRecord, | ||
| WritableDeep, | ||
|
|
@@ -266,7 +270,13 @@ export function createCollection( | |
| collection.utils = {} | ||
| } | ||
|
|
||
| return collection | ||
| return collection as unknown as Collection< | ||
| any, | ||
| string | number, | ||
| UtilsRecord, | ||
| any, | ||
| any | ||
| > | ||
| } | ||
|
|
||
| export class CollectionImpl< | ||
|
|
@@ -299,6 +309,22 @@ export class CollectionImpl< | |
| // The core state of the collection is "public" so that is accessible in tests | ||
| // and for debugging | ||
| public _state: CollectionStateManager<TOutput, TKey, TSchema, TInput> | ||
| // Aggregated view of source-records currently being used by active live | ||
| // queries that depend on this collection. Public so live-query aggregators | ||
| // can push deltas in. | ||
| public _trackedSourceRecords: TrackedSourceRecordsManager<TKey> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit weird to have underscored variables that are exposed publicly. We could replace the direct cross-class access with a method on |
||
| // For live-query collections only: a live-query-local view of "source | ||
| // records this query is currently using." Set by the live-query path | ||
| // during construction; undefined on base collections. When present, the | ||
| // public `getTrackedSourceRecords` / `subscribeTrackedSourceRecords` | ||
| // methods route to this view instead of `_trackedSourceRecords`. | ||
| public _liveQueryTrackedSourceView?: { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, it's a bit weird to have this public underscored variable. We could avoid this by passing the view through |
||
| snapshot: () => Array<TrackedSourceRecord> | ||
| subscribe: ( | ||
| callback: (change: TrackedSourceRecordsChange) => void, | ||
| options?: SubscribeTrackedSourceRecordsOptions, | ||
| ) => () => void | ||
| } | ||
|
|
||
| /** | ||
| * When set, collection consumers should defer processing incoming data | ||
|
|
@@ -354,6 +380,7 @@ export class CollectionImpl< | |
| this._mutations = new CollectionMutationsManager(config, this.id) | ||
| this._state = new CollectionStateManager(config) | ||
| this._sync = new CollectionSyncManager(config, this.id) | ||
| this._trackedSourceRecords = new TrackedSourceRecordsManager<TKey>(this.id) | ||
|
|
||
| this.comparisonOpts = buildCompareOptionsFromConfig(config) | ||
|
|
||
|
|
@@ -941,6 +968,42 @@ export class CollectionImpl< | |
| return this._changes.subscribeChanges(callback, options) | ||
| } | ||
|
|
||
| /** | ||
| * Snapshot of source records currently being tracked through this | ||
| * collection's data flow. | ||
| * | ||
| * On a base collection: the union of records OF this collection being | ||
| * used by any active live query. Each record appears once regardless | ||
| * of how many queries reference it. | ||
| * | ||
| * On a live query collection: the records FROM this query's source | ||
| * collections that the query is currently using. | ||
| * | ||
| * Both views answer "what source records are currently flowing through | ||
| * me," from opposite ends of the data-flow graph. | ||
| */ | ||
| public getTrackedSourceRecords(): Array<TrackedSourceRecord> { | ||
| return ( | ||
| this._liveQueryTrackedSourceView?.snapshot() ?? | ||
| this._trackedSourceRecords.get() | ||
| ) | ||
| } | ||
|
|
||
| /** | ||
| * Subscribe to changes in the set of source records tracked through this | ||
| * collection's data flow. See `getTrackedSourceRecords` for the per- | ||
| * collection-type semantics. | ||
| */ | ||
| public subscribeTrackedSourceRecords( | ||
| callback: (change: TrackedSourceRecordsChange) => void, | ||
| options?: SubscribeTrackedSourceRecordsOptions, | ||
| ): () => void { | ||
| if (this._liveQueryTrackedSourceView) { | ||
| return this._liveQueryTrackedSourceView.subscribe(callback, options) | ||
| } | ||
| return this._trackedSourceRecords.subscribe(callback, options) | ||
| } | ||
|
|
||
| /** | ||
| * Subscribe to a collection event | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| import type { | ||
| SubscribeTrackedSourceRecordsOptions, | ||
| TrackedSourceRecord, | ||
| TrackedSourceRecordsChange, | ||
| } from '../types.js' | ||
|
|
||
| type Entry<TKey> = { key: TKey; refCount: number } | ||
|
|
||
| /** | ||
| * Per-base-collection tracked source records manager. | ||
| * | ||
| * Refcounts over active live queries that depend on this collection. Each | ||
| * live query's aggregator pushes its net alias-level transitions here; this | ||
| * manager dedupes across queries and emits to subscribers only on true 0↔1 | ||
| * transitions. | ||
| */ | ||
| export class TrackedSourceRecordsManager< | ||
| TKey extends string | number = string | number, | ||
| > { | ||
| // Keys are primitives; use them directly as the Map key. No serialization. | ||
| private readonly entries = new Map<TKey, Entry<TKey>>() | ||
| private readonly listeners = new Set< | ||
| (change: TrackedSourceRecordsChange) => void | ||
| >() | ||
|
|
||
| constructor(private readonly collectionId: string) {} | ||
|
|
||
| apply(added: Iterable<TKey>, removed: Iterable<TKey>): void { | ||
| const netAdded: Array<TKey> = [] | ||
| const netRemoved: Array<TKey> = [] | ||
|
|
||
| for (const key of added) { | ||
| const existing = this.entries.get(key) | ||
| if (existing) { | ||
| existing.refCount++ | ||
| } else { | ||
| this.entries.set(key, { key, refCount: 1 }) | ||
| netAdded.push(key) | ||
| } | ||
| } | ||
|
|
||
| for (const key of removed) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can it happen that a key wasn't known until now and occurs in both
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I asked Claude about this, i agree with its answer and would go for adding built-in defense mechanism. Not from the current callers, but the manager doesn't enforce it. Tracing upward:
So end-to-end the precondition holds today. But if it were violated —
Net result: listeners get Two other overlap shapes are fine: if The aggregator's Two options:
I'd lean toward netting in the manager (it's the public-API boundary — callers reach through |
||
| const existing = this.entries.get(key) | ||
| if (!existing) continue | ||
| if (existing.refCount === 1) { | ||
| this.entries.delete(key) | ||
| netRemoved.push(existing.key) | ||
| } else { | ||
| existing.refCount-- | ||
| } | ||
| } | ||
|
|
||
| if (netAdded.length === 0 && netRemoved.length === 0) return | ||
| if (this.listeners.size === 0) return | ||
| const change: TrackedSourceRecordsChange = { | ||
| added: netAdded.map((key) => this.toRecord(key)), | ||
| removed: netRemoved.map((key) => this.toRecord(key)), | ||
| } | ||
| for (const listener of this.listeners) listener(change) | ||
| } | ||
|
|
||
| get(): Array<TrackedSourceRecord> { | ||
| return Array.from(this.entries.values(), ({ key }) => this.toRecord(key)) | ||
| } | ||
|
|
||
| subscribe( | ||
| callback: (change: TrackedSourceRecordsChange) => void, | ||
| options?: SubscribeTrackedSourceRecordsOptions, | ||
| ): () => void { | ||
| this.listeners.add(callback) | ||
| if (options?.includeInitialState && this.entries.size > 0) { | ||
| callback({ added: this.get(), removed: [] }) | ||
| } | ||
| return () => { | ||
| this.listeners.delete(callback) | ||
| } | ||
| } | ||
|
|
||
| private toRecord(key: TKey): TrackedSourceRecord { | ||
| return { collectionId: this.collectionId, key } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we pass these type args to
CollectionImpl(in particularUtilsRecord) such that we don't have to do an unsafe typecast here?