diff --git a/packages/common_client/lib/launchdarkly_common_client.dart b/packages/common_client/lib/launchdarkly_common_client.dart index 475948d7..50a01e3e 100644 --- a/packages/common_client/lib/launchdarkly_common_client.dart +++ b/packages/common_client/lib/launchdarkly_common_client.dart @@ -6,6 +6,21 @@ export 'src/ld_common_config.dart' AutoEnvAttributes, PollingConfig; +export 'src/config/data_system_config.dart' + show DataSystemConfig, ConnectionModeId; +export 'src/data_sources/fdv2/mode_definition.dart' + show + ModeDefinition, + EndpointConfig, + InitializerEntry, + SynchronizerEntry, + CacheInitializer, + PollingInitializer, + StreamingInitializer, + PollingSynchronizer, + StreamingSynchronizer, + Fdv1FallbackConfig; + export 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' show LDContext, diff --git a/packages/common_client/lib/src/config/data_system_config.dart b/packages/common_client/lib/src/config/data_system_config.dart new file mode 100644 index 00000000..4a7a9e53 --- /dev/null +++ b/packages/common_client/lib/src/config/data_system_config.dart @@ -0,0 +1,79 @@ +import '../data_sources/fdv2/mode_definition.dart'; + +// Maintainer note (not public API): ConnectionModeId is a sealed +// hierarchy rather than an enum so a custom-mode variant can be added +// later without changing this surface. The planned extension is a custom +// variant constructed as `ConnectionModeId.custom('my-mode')`: +// +// factory ConnectionModeId.custom(String name) = _CustomConnectionMode; +// final class _CustomConnectionMode extends ConnectionModeId { +// final String name; +// const _CustomConnectionMode(this.name); +// // value equality on name so it works as an override-map key +// } +// +// A custom mode is a distinct type from a built-in, so the two share no +// namespace: a custom id never equals a built-in id (even with the same +// name), and so cannot collide with a current or future built-in. The +// type is the namespace -- no name prefix is needed. This holds only +// while custom modes stay typed; if one is ever reduced to a bare string +// (logs, persistence) that reintroduces a shared string space where a +// prefix would matter again. +// +// Equality split: the built-in values are const singletons relying on +// canonical-instance identity, which lets a connectionModes map of only +// built-in keys be a const map. A runtime-constructed custom variant must +// carry value equality, so an override map holding a custom key would be +// non-const. The built-in variant therefore must not override +// `==`/`hashCode`. + +/// Identifies a built-in connection mode whose data-source pipeline can be +/// overridden through [DataSystemConfig.connectionModes]: [streaming], +/// [polling], [background], or [offline]. +sealed class ConnectionModeId { + const ConnectionModeId(); + + /// The built-in streaming mode. + static const ConnectionModeId streaming = _BuiltInConnectionMode('streaming'); + + /// The built-in polling mode. + static const ConnectionModeId polling = _BuiltInConnectionMode('polling'); + + /// The built-in background mode. + static const ConnectionModeId background = + _BuiltInConnectionMode('background'); + + /// The built-in offline mode. Its pipeline loads cached flags and runs + /// no synchronizer, so overriding it customizes how the SDK behaves + /// while offline (for example, the cache initializer it uses). + static const ConnectionModeId offline = _BuiltInConnectionMode('offline'); +} + +final class _BuiltInConnectionMode extends ConnectionModeId { + final String name; + + const _BuiltInConnectionMode(this.name); + + @override + String toString() => 'ConnectionModeId.$name'; +} + +/// Configuration for the FDv2 data system. +/// +/// Providing a [DataSystemConfig] (even an empty one) opts the SDK into +/// the FDv2 data acquisition protocol. When absent the SDK uses the +/// FDv1 data sources. +/// +/// This feature is not stable, and not subject to any backwards +/// compatibility guarantees or semantic versioning. It is in early +/// access. If you want access to this feature please join the EAP. +final class DataSystemConfig { + /// Overrides for built-in connection modes. A definition given here + /// replaces the built-in pipeline for that mode; modes not present keep + /// their built-in definition. + final Map connectionModes; + + const DataSystemConfig({ + this.connectionModes = const {}, + }); +} diff --git a/packages/common_client/lib/src/data_sources/data_manager.dart b/packages/common_client/lib/src/data_sources/data_manager.dart new file mode 100644 index 00000000..a8825d65 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/data_manager.dart @@ -0,0 +1,78 @@ +import 'dart:async'; + +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + show LDContext; + +import '../flag_manager/flag_manager.dart'; +import 'data_source_manager.dart'; + +/// Owns the data-acquisition strategy for an identify: how the cache is +/// loaded and when the identify resolves. The FDv1 and FDv2 protocols +/// diverge here, so each has its own implementation; everything else +/// (connection lifecycle, mode switching, event routing) is shared in the +/// [DataSourceManager] that both delegate to. +abstract interface class DataManager { + /// Brings the SDK to a usable state for [context], resolving when the + /// manager's data-availability strategy is satisfied. + /// + /// When [waitForNetworkResults] is true the returned future resolves + /// only once network (or otherwise fresh) data has arrived; otherwise it + /// may resolve as soon as cached data is available. + Future identify(LDContext context, + {required bool waitForNetworkResults}); +} + +/// FDv1 data manager. +/// +/// The cache is loaded imperatively at identify time via +/// [FlagManager.loadCached]. A cache hit resolves identify immediately +/// unless the caller is waiting for network results; either way the +/// network connection is started so live data follows. +final class FDv1DataManager implements DataManager { + final DataSourceManager _dataSourceManager; + final FlagManager _flagManager; + + FDv1DataManager(this._dataSourceManager, this._flagManager); + + @override + Future identify(LDContext context, + {required bool waitForNetworkResults}) async { + final completer = Completer(); + final loadedFromCache = await _flagManager.loadCached(context); + _dataSourceManager.identify(context, completer); + if (loadedFromCache && !waitForNetworkResults) { + return; + } + return completer.future; + } +} + +/// FDv2 data manager. +/// +/// The cache is not loaded at identify time; the data source pipeline's +/// cache initializer loads it as the first tier. Identify resolves on the +/// first delivered payload, or -- when waiting for network results -- only +/// on fresh data, so a cache load alone does not satisfy a wait-for-network +/// identify. +/// +/// Each identify starts data acquisition fresh: any held selector is +/// discarded via [clearSelector] before connecting, so the new connection +/// re-fetches a full payload rather than resuming a previous context's +/// basis. Mode switches keep the selector and reach the data source manager +/// directly rather than through here, so they are unaffected. +final class FDv2DataManager implements DataManager { + final DataSourceManager _dataSourceManager; + final void Function() _clearSelector; + + FDv2DataManager(this._dataSourceManager, this._clearSelector); + + @override + Future identify(LDContext context, + {required bool waitForNetworkResults}) { + _clearSelector(); + final completer = Completer(); + _dataSourceManager.identify(context, completer, + requireFreshData: waitForNetworkResults); + return completer.future; + } +} diff --git a/packages/common_client/lib/src/data_sources/data_source.dart b/packages/common_client/lib/src/data_sources/data_source.dart index af0f4f0b..6d9e8d0c 100644 --- a/packages/common_client/lib/src/data_sources/data_source.dart +++ b/packages/common_client/lib/src/data_sources/data_source.dart @@ -31,6 +31,13 @@ final class StatusEvent implements DataSourceEvent { {this.shutdown = false}); } +/// Emitted once by the FDv2 orchestrator when initialization is complete: +/// a selector-bearing payload arrived, the initializer chain was exhausted +/// (with cached data or in a cache-only system), or the first synchronizer +/// delivered a change set. The manager resolves a wait-for-network identify +/// on this; a cached identify resolves earlier, on the first applied payload. +final class InitializedEvent implements DataSourceEvent {} + abstract interface class DataSource { Stream get events; diff --git a/packages/common_client/lib/src/data_sources/data_source_event_handler.dart b/packages/common_client/lib/src/data_sources/data_source_event_handler.dart index c28daa77..01b15256 100644 --- a/packages/common_client/lib/src/data_sources/data_source_event_handler.dart +++ b/packages/common_client/lib/src/data_sources/data_source_event_handler.dart @@ -101,14 +101,13 @@ final class DataSourceEventHandler { /// /// Full change sets replace the stored flags, partial change sets apply /// each update, and a change set of type none confirms the SDK is up to - /// date without changing data. All three mark the data source valid. + /// date without changing data. Future handlePayload(LDContext context, ChangeSet changeSet, {String? environmentId}) async { try { await _flagManager.applyChanges( context, changeSet.updates, changeSet.type, environmentId: environmentId); - _statusManager.setValid(); return MessageStatus.messageHandled; } catch (err) { _logger.error('Failed to apply an FDv2 change set: ${err.runtimeType}'); diff --git a/packages/common_client/lib/src/data_sources/data_source_manager.dart b/packages/common_client/lib/src/data_sources/data_source_manager.dart index ceab262e..ec8cd0bc 100644 --- a/packages/common_client/lib/src/data_sources/data_source_manager.dart +++ b/packages/common_client/lib/src/data_sources/data_source_manager.dart @@ -38,6 +38,11 @@ final class DataSourceManager { Completer? _identifyCompleter; + /// When true, the active identify resolves only on fresh data, not on a + /// cache load. Set per identify from the caller's wait-for-network-results + /// preference. + bool _requireFreshData = false; + DataSourceManager({ ConnectionMode startingMode = ConnectionMode.streaming, required DataSourceStatusManager statusManager, @@ -61,8 +66,10 @@ final class DataSourceManager { _dataSourceFactories.addAll(factories); } - void identify(LDContext context, Completer completer) { + void identify(LDContext context, Completer completer, + {bool requireFreshData = false}) { _identifyCompleter = completer; + _requireFreshData = requireFreshData; _activeContext = context; _setupConnection(); @@ -92,6 +99,21 @@ final class DataSourceManager { _activeDataSource = null; } + /// Resolves the pending identify, if any. Idempotent: only the first call + /// completes it. Callers decide *when* to call it -- a cached identify on + /// the first applied payload, a wait-for-network identify on the + /// orchestrator's [InitializedEvent]. + void _maybeCompleteIdentify() { + final completer = _identifyCompleter; + if (completer == null) { + return; + } + if (!completer.isCompleted) { + completer.complete(); + } + _identifyCompleter = null; + } + DataSource? _createDataSource(FDv2ConnectionMode mode) { if (_activeContext != null) { if (_dataSourceFactories[mode] == null) { @@ -118,6 +140,11 @@ final class DataSourceManager { switch (_activeConnectionMode) { case FDv2Offline(): + // Report why the SDK is offline. When an offline data source is + // configured (the FDv2 data system supplies one) it then loads + // cached flags through the pipeline below; its payload does not + // drive the status to valid while offline, so this status stands. + // FDv1 has no offline factory, so offline stays status-only. switch (_offlineDetail) { case OfflineSetOffline(): _statusManager.setOffline(); @@ -126,7 +153,6 @@ final class DataSourceManager { case OfflineBackgroundDisabled(): _statusManager.setBackgroundDisabled(); } - return; case FDv2Streaming(): case FDv2Polling(): case FDv2Background(): @@ -146,22 +172,36 @@ final class DataSourceManager { var handled = await _dataSourceEventHandler.handleMessage( _activeContext!, event.type, event.data, environmentId: event.environmentId); - if (handled == MessageStatus.messageHandled && - _identifyCompleter != null) { - if (_identifyCompleter!.isCompleted) { - _logger.error('Identify was already complete before receiving ' - 'data. This could represent an issue with SDK logic. Please' - 'make a bug report if you encounter this situation.'); - } else { - _identifyCompleter!.complete(); - } + if (handled == MessageStatus.messageHandled) { + _maybeCompleteIdentify(); } - // Only need to complete this the first time. - _identifyCompleter = null; return handled; case PayloadEvent(): - // The FDv1 data sources this manager runs never produce FDv2 - // payload events. + var handled = await _dataSourceEventHandler.handlePayload( + _activeContext!, event.changeSet, + environmentId: event.environmentId); + if (handled == MessageStatus.messageHandled) { + // Applying any change set from a live source marks it valid -- + // including a no-change response, which restores valid after an + // interruption. While offline the status set in _setupConnection + // stands, so cached data does not report a live connection. + if (_activeConnectionMode is! FDv2Offline) { + _statusManager.setValid(); + } + // A cached identify resolves on any applied data; a + // wait-for-network identify waits for the orchestrator's + // InitializedEvent instead. + if (!_requireFreshData) { + _maybeCompleteIdentify(); + } + } + return handled; + case InitializedEvent(): + // Initialization is complete (network basis, initializer + // exhaustion, or the first synchronizer change set). Resolves a + // wait-for-network identify; a cached identify has usually resolved + // already on earlier data. + _maybeCompleteIdentify(); return MessageStatus.messageHandled; case StatusEvent(): if (_identifyCompleter != null && !_identifyCompleter!.isCompleted) { diff --git a/packages/common_client/lib/src/data_sources/fdv2/data_system.dart b/packages/common_client/lib/src/data_sources/fdv2/data_system.dart new file mode 100644 index 00000000..5f0d30ef --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/data_system.dart @@ -0,0 +1,169 @@ +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + hide ServiceEndpoints; +import 'package:meta/meta.dart'; + +import '../../config/data_system_config.dart'; +import '../../config/service_endpoints.dart'; +import '../../fdv2_connection_mode.dart'; +import '../data_source_manager.dart'; +import '../data_source_status_manager.dart'; +import 'built_in_modes.dart'; +import 'cache_initializer.dart'; +import 'entry_factories.dart'; +import 'fdv1_fallback_synchronizer.dart'; +import 'mode_definition.dart'; +import 'orchestrator.dart'; +import 'requestor.dart'; +import 'selector.dart'; +import 'source_factory_context.dart'; +import 'source_manager.dart'; + +/// Composes the FDv2 data source factories consumed by the +/// DataSourceManager and owns the selector, which must outlive any single +/// orchestrator instance. +/// +/// A fresh orchestrator is created per connection-mode switch and per +/// identify. The selector survives mode switches (initializers are +/// skipped when a selector is held). The data manager clears it via +/// [clearSelector] on each identify, so a new identify starts from a full +/// payload rather than resuming a prior state. +final class FDv2DataSystem { + final String _credential; + final LDLogger _logger; + final HttpProperties _httpProperties; + final ServiceEndpoints _serviceEndpoints; + final bool _withReasons; + final Duration _defaultPollingInterval; + final DataSourceStatusManager _statusManager; + final Map _connectionModeOverrides; + final CachedFlagsReader _cachedFlagsReader; + final FDv2SseClientFactory _sseClientFactory; + final HttpClientFactory? _httpClientFactory; + + Selector _selector = Selector.empty; + + FDv2DataSystem({ + required DataSystemConfig config, + required String credential, + required LDLogger logger, + required HttpProperties httpProperties, + required ServiceEndpoints serviceEndpoints, + required bool withReasons, + required Duration defaultPollingInterval, + required DataSourceStatusManager statusManager, + required CachedFlagsReader cachedFlagsReader, + FDv2SseClientFactory sseClientFactory = defaultSseClientFactory, + HttpClientFactory? httpClientFactory, + }) : _credential = credential, + _logger = logger, + _httpProperties = httpProperties, + _serviceEndpoints = serviceEndpoints, + _withReasons = withReasons, + _defaultPollingInterval = defaultPollingInterval, + _statusManager = statusManager, + _cachedFlagsReader = cachedFlagsReader, + _sseClientFactory = sseClientFactory, + _httpClientFactory = httpClientFactory, + _connectionModeOverrides = config.connectionModes; + + /// The built-in definition for each connection mode, before any override. + static const Map _builtInDefinitions = { + ConnectionModeId.streaming: BuiltInModes.streaming, + ConnectionModeId.polling: BuiltInModes.polling, + ConnectionModeId.background: BuiltInModes.background, + ConnectionModeId.offline: BuiltInModes.offline, + }; + + /// The definition for [mode]: the user's override if one was given for + /// it, otherwise the built-in default. + ModeDefinition _resolve(ConnectionModeId mode) { + if (_builtInDefinitions[mode] case final builtIn?) { + return _connectionModeOverrides[mode] ?? builtIn; + } + // Unreachable: ConnectionModeId is sealed over the built-in modes, each + // of which has an entry above. + throw StateError('No built-in definition for connection mode: $mode'); + } + + /// The resolved definition for [mode], exposed so tests can confirm that + /// an override is selected over the built-in. How a definition's entries + /// become concrete data sources is covered by the entry-factory tests. + @visibleForTesting + ModeDefinition resolvedDefinition(ConnectionModeId mode) => _resolve(mode); + + /// Discards the held selector so the next source re-fetches a full + /// payload from its initializers. Called when identifying a new context, + /// since a selector points at one context's data and cannot seed a delta + /// for another. Mode switches keep the selector and so do not call this. + void clearSelector() { + _selector = Selector.empty; + } + + /// Produces the factory map for the DataSourceManager. Offline is a + /// real pipeline mode: its data source runs the cache initializer with + /// no synchronizer, so the SDK serves cached flags while offline. The + /// manager reports the offline status itself; the offline source's + /// payload does not drive the status to valid. + Map buildFactories() { + return { + const FDv2Streaming(): + _factoryForMode(_resolve(ConnectionModeId.streaming)), + const FDv2Polling(): _factoryForMode(_resolve(ConnectionModeId.polling)), + const FDv2Background(): + _factoryForMode(_resolve(ConnectionModeId.background)), + const FDv2Offline(): _factoryForMode(_resolve(ConnectionModeId.offline)), + }; + } + + DataSourceFactory _factoryForMode(ModeDefinition modeDefinition) { + return (LDContext context) { + final factoryContext = SourceFactoryContext.fromClientConfig( + context: context, + credential: _credential, + logger: _logger, + httpProperties: _httpProperties, + serviceEndpoints: _serviceEndpoints, + withReasons: _withReasons, + defaultPollingInterval: _defaultPollingInterval, + // The cache initializer reads persistence through this reader and + // feeds the result into the pipeline. + cachedFlagsReader: _cachedFlagsReader, + httpClientFactory: _httpClientFactory, + ); + + // When a selector is held the SDK already has current data for this + // context; mode switches go straight to synchronizers. + final includeInitializers = _selector.isEmpty; + final initializerFactories = includeInitializers + ? buildInitializerFactories( + modeDefinition.initializers, factoryContext) + : []; + + final synchronizerSlots = buildSynchronizerFactories( + modeDefinition.synchronizers, factoryContext, + sseClientFactory: _sseClientFactory) + .map((factory) => SynchronizerSlot(factory: factory)) + .toList(); + + // The FDv1 fallback is the terminal tier: appended last, blocked until + // the server directs fallback. Its source never re-asserts the + // directive, so engaging it cannot loop. + if (modeDefinition.fdv1Fallback case final fallbackConfig?) { + synchronizerSlots.add(SynchronizerSlot( + factory: createFdv1FallbackSynchronizerFactory( + fallbackConfig, factoryContext), + isFdv1Fallback: true, + )); + } + + return FDv2DataSourceOrchestrator( + initializerFactories: initializerFactories, + synchronizerSlots: synchronizerSlots, + selectorGetter: () => _selector, + selectorUpdater: (selector) => _selector = selector, + statusManager: _statusManager, + logger: _logger, + ); + }; + } +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/fdv1_fallback_synchronizer.dart b/packages/common_client/lib/src/data_sources/fdv2/fdv1_fallback_synchronizer.dart new file mode 100644 index 00000000..f1d1fee8 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/fdv1_fallback_synchronizer.dart @@ -0,0 +1,121 @@ +import 'dart:convert'; + +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + hide ServiceEndpoints; + +import '../../config/data_source_config.dart'; +import '../../item_descriptor.dart'; +import '../data_source.dart' + show + DataEvent, + DataSourceEvent, + InitializedEvent, + PayloadEvent, + StatusEvent; +import '../requestor.dart' as fdv1; +import 'entry_factories.dart' show SynchronizerFactory, mergeServiceEndpoints; +import 'mode_definition.dart'; +import 'payload.dart'; +import 'polling_synchronizer.dart'; +import 'selector.dart'; +import 'source.dart'; +import 'source_factory_context.dart'; +import 'source_result.dart'; + +/// Builds the FDv1 fallback synchronizer: an FDv1 poller whose responses are +/// translated into FDv2 change sets. +/// +/// Engaged only when the server directs fallback (the `x-ld-fd-fallback` +/// response header, detected by the primary FDv2 sources). It is the terminal +/// tier and never re-asserts the fallback directive -- every result it emits +/// carries `fdv1Fallback: false`. +/// +/// FDv1 has no delta protocol, so it polls with the context in the path and +/// each response is a complete flag set, translated to a `full` change set +/// with no selector. +SynchronizerFactory createFdv1FallbackSynchronizerFactory( + Fdv1FallbackConfig config, + SourceFactoryContext ctx, +) { + final endpoints = + mergeServiceEndpoints(ctx.serviceEndpoints, config.endpoints); + final interval = config.pollInterval ?? ctx.defaultPollingInterval; + final pollingConfig = PollingDataSourceConfig( + useReport: false, + withReasons: ctx.withReasons, + pollingInterval: interval, + ); + + return SynchronizerFactory( + create: (SelectorGetter selectorGetter) { + final requestor = fdv1.Requestor( + logger: ctx.logger, + contextString: base64UrlEncode(utf8.encode(ctx.contextJson)), + method: RequestMethod.get, + httpProperties: ctx.httpProperties, + credential: ctx.credential, + endpoints: endpoints, + dataSourceConfig: pollingConfig, + httpClientFactory: ctx.httpClientFactory ?? _defaultHttpClientFactory, + ); + return FDv2PollingSynchronizer( + // FDv1 has no basis/selector, so the selector argument is ignored. + poll: ({Selector basis = Selector.empty}) async => + _translate(await requestor.requestAllFlags()), + selectorGetter: selectorGetter, + interval: interval, + logger: ctx.logger, + ); + }, + ); +} + +HttpClient _defaultHttpClientFactory(HttpProperties httpProperties) => + HttpClient(httpProperties: httpProperties); + +/// Translates an FDv1 polling result into an FDv2 source result. Results +/// always carry `fdv1Fallback: false` (the default), so the fallback tier can +/// never re-trigger fallback. +FDv2SourceResult _translate(DataSourceEvent? event) { + switch (event) { + case null: + // 304 Not Modified: the SDK's data is confirmed current. + return const ChangeSetResult( + changeSet: ChangeSet(type: PayloadType.none, updates: {}), + persist: false, + ); + case DataEvent(): + try { + final results = + LDEvaluationResultsSerialization.fromJson(jsonDecode(event.data)); + final updates = results.map((key, value) => + MapEntry(key, ItemDescriptor(version: value.version, flag: value))); + // FDv1 carries no selector; every poll is a complete snapshot. + return ChangeSetResult( + changeSet: ChangeSet(type: PayloadType.full, updates: updates), + environmentId: event.environmentId, + persist: true, + ); + } catch (_) { + return const StatusResult( + state: SourceState.interrupted, + message: 'Could not parse FDv1 fallback payload', + ); + } + case StatusEvent(): + return StatusResult( + state: event.shutdown + ? SourceState.terminalError + : SourceState.interrupted, + message: event.message, + statusCode: event.statusCode?.toInt(), + ); + case PayloadEvent(): + case InitializedEvent(): + // The FDv1 requestor only produces DataEvent / StatusEvent / null. + return const StatusResult( + state: SourceState.interrupted, + message: 'Unexpected event from the FDv1 fallback poller', + ); + } +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/orchestrator.dart b/packages/common_client/lib/src/data_sources/fdv2/orchestrator.dart index 2c3caf4b..82e427f1 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/orchestrator.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/orchestrator.dart @@ -62,6 +62,12 @@ final class FDv2DataSourceOrchestrator implements DataSource { bool _started = false; bool _closed = false; bool _emittedPayload = false; + bool _initialized = false; + + /// True when the only sources are cache initializers (no synchronizers). + /// Such a system must still reach a usable state on a cache miss, so an + /// empty payload is emitted when no data was produced. + final bool _cacheOnlyDataSystem; /// Resolves the outcome of the active synchronizer run. Set while a /// synchronizer is running; [restart] and [stop] use it to interrupt @@ -86,6 +92,9 @@ final class FDv2DataSourceOrchestrator implements DataSource { _recoveryTimeout = recoveryTimeout, _recycleDelay = recycleDelay, _logger = logger.subLogger('FDv2Orchestrator'), + _cacheOnlyDataSystem = initializerFactories.isNotEmpty && + initializerFactories.every((f) => f.isCache) && + synchronizerSlots.isEmpty, _sourceManager = SourceManager( initializerFactories: initializerFactories, synchronizerSlots: synchronizerSlots, @@ -142,10 +151,10 @@ final class FDv2DataSourceOrchestrator implements DataSource { // An intent of "none" means the SDK is already up to date; it carries // no selector and must not regress the one we hold. For any other // type the payload's selector is adopted verbatim, including an empty - // one -- a selector-less full transfer (an FDv1 fallback payload, - // whose state cannot serve as an FDv2 basis) must clear the held - // selector so the next request sends no stale basis. Do not gate this - // on a non-empty selector. + // one -- a selector-less full transfer (e.g. an FDv1 fallback payload, + // whose state cannot drive FDv2 deltas) clears the held selector so the + // next request asks for a full payload rather than a stale delta. Do + // not gate this on a non-empty selector. if (result.changeSet.type != PayloadType.none) { _selectorUpdater(result.changeSet.selector); } @@ -173,10 +182,22 @@ final class FDv2DataSourceOrchestrator implements DataSource { .add(StatusEvent(ErrorKind.unknown, null, message, shutdown: true)); } + /// Signals that initialization is complete. Emitted at most once -- the + /// manager resolves a wait-for-network identify on it. + void _emitInitialized() { + if (_initialized || _closed || _controller.isClosed) return; + _initialized = true; + _controller.add(InitializedEvent()); + } + /// True when the source indicated an FDv1 fallback directive and a - /// fallback tier exists to engage. + /// fallback tier exists to engage. The directive is ignored when the + /// FDv1 fallback synchronizer is already the active source, so it cannot + /// loop (the fallback source also never re-asserts the directive). bool _handleFdv1Fallback(FDv2SourceResult result) { - if (result.fdv1Fallback && _sourceManager.hasFdv1FallbackConfigured) { + if (result.fdv1Fallback && + _sourceManager.hasFdv1FallbackConfigured && + !_sourceManager.isCurrentSynchronizerFdv1Fallback) { _logger.warn('Server directed fallback to FDv1; engaging the FDv1 ' 'fallback synchronizer.'); _sourceManager.engageFdv1Fallback(); @@ -187,6 +208,7 @@ final class FDv2DataSourceOrchestrator implements DataSource { Future _runInitializers() async { var errorDuringInit = false; + var dataReceived = false; while (!_closed) { final initializer = _sourceManager.nextInitializer(); @@ -201,18 +223,22 @@ final class FDv2DataSourceOrchestrator implements DataSource { case ChangeSetResult(): if (result.changeSet.type != PayloadType.none) { _emitPayload(result); + dataReceived = true; if (_handleFdv1Fallback(result)) { // Data was received but the server directed FDv1 fallback; - // move on to synchronizers where the fallback tier runs. + // move on to synchronizers where the fallback tier runs and + // its first change set completes initialization. return; } if (result.changeSet.selector.isNotEmpty) { - // Basis data with a selector: initialization is complete. + // A selector means a complete, server-versioned payload: + // initialization is done. A selector-less payload (e.g. cache) + // is applied, but we keep initializing toward network data. + _emitInitialized(); return; } - // Data without a selector (e.g. cache); keep initializing. } case StatusResult(): switch (result.state) { @@ -235,30 +261,34 @@ final class FDv2DataSourceOrchestrator implements DataSource { if (_closed) return; - // All initializers exhausted. A data system whose only sources are - // cache initializers must still complete initialization on a cache - // miss -- there is nowhere else for data to come from. Emit an empty - // payload so the pipeline reaches a valid state, unless an error has - // already been reported. - final cacheOnlyDataSystem = _initializerFactories.isNotEmpty && - _initializerFactories.every((f) => f.isCache) && - _synchronizerSlots.isEmpty; - if (cacheOnlyDataSystem && !_emittedPayload && !errorDuringInit) { + // All initializers exhausted. A cache-only system (no synchronizer to + // produce data on its own) must still surface something on a cache + // miss, so a cached identify has a payload to resolve on; emit an empty + // payload unless an error was already reported. + if (_cacheOnlyDataSystem && !_emittedPayload && !errorDuringInit) { _emitPayload(const ChangeSetResult( changeSet: ChangeSet(type: PayloadType.none, updates: {}), persist: false, )); } + + // Initialization completes at exhaustion when there is no synchronizer + // to wait on (cache-only), or when an initializer already delivered data + // without a selector. Otherwise the first synchronizer completes it. + if (_cacheOnlyDataSystem || dataReceived) { + _emitInitialized(); + } } Future _runSynchronizers() async { - // A data system with no sources at all has nothing to do; an empty - // payload marks it valid so a pending identify completes. + // A data system with no sources at all has nothing to do; complete + // initialization so a pending identify resolves. if (_initializerFactories.isEmpty && _synchronizerSlots.isEmpty) { _emitPayload(const ChangeSetResult( changeSet: ChangeSet(type: PayloadType.none, updates: {}), persist: false, )); + _emitInitialized(); return; } @@ -350,6 +380,9 @@ final class FDv2DataSourceOrchestrator implements DataSource { switch (result) { case ChangeSetResult(): _emitPayload(result); + // The first synchronizer change set -- of any type, with or + // without a selector -- completes initialization. + _emitInitialized(); case StatusResult(): switch (result.state) { case SourceState.interrupted: diff --git a/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart b/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart index 0d0f631f..41dbfa03 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart @@ -203,6 +203,15 @@ final class SourceManager { bool get hasFdv1FallbackConfigured => _synchronizerSlots.any((slot) => slot.isFdv1Fallback); + /// True when the active synchronizer is the FDv1 fallback. Reads the scan + /// cursor, so it is only meaningful while the cursor identifies the active + /// slot (see the class contract). Used to ignore a repeat fallback + /// directive once the SDK is already on FDv1. + bool get isCurrentSynchronizerFdv1Fallback => + _synchronizerIndex >= 0 && + _synchronizerIndex < _synchronizerSlots.length && + _synchronizerSlots[_synchronizerIndex].isFdv1Fallback; + /// Close the active source and mark the manager as shut down. void close() { _shutdown = true; diff --git a/packages/common_client/lib/src/data_sources/polling_data_source.dart b/packages/common_client/lib/src/data_sources/polling_data_source.dart index 555da33c..f900a325 100644 --- a/packages/common_client/lib/src/data_sources/polling_data_source.dart +++ b/packages/common_client/lib/src/data_sources/polling_data_source.dart @@ -117,7 +117,8 @@ final class PollingDataSource implements DataSource { stop(); } case PayloadEvent(): - // The FDv1 requestor never produces FDv2 payload events. + case InitializedEvent(): + // The FDv1 requestor never produces FDv2 payload or lifecycle events. break; } diff --git a/packages/common_client/lib/src/data_sources/streaming_data_source.dart b/packages/common_client/lib/src/data_sources/streaming_data_source.dart index 6796b152..fec2d4d3 100644 --- a/packages/common_client/lib/src/data_sources/streaming_data_source.dart +++ b/packages/common_client/lib/src/data_sources/streaming_data_source.dart @@ -174,7 +174,9 @@ final class StreamingDataSource implements DataSource { _logger.error('$message: $argument'); _dataController.sink.add(res); case PayloadEvent(): - // The FDv1 requestor never produces FDv2 payload events. + case InitializedEvent(): + // The FDv1 requestor never produces FDv2 payload or lifecycle + // events. break; } } else { diff --git a/packages/common_client/lib/src/flag_manager/flag_manager.dart b/packages/common_client/lib/src/flag_manager/flag_manager.dart index 1e9c7973..381eb479 100644 --- a/packages/common_client/lib/src/flag_manager/flag_manager.dart +++ b/packages/common_client/lib/src/flag_manager/flag_manager.dart @@ -72,6 +72,14 @@ final class FlagManager { return _flagPersistence.loadCached(context); } + /// Reads cached values from persistence without applying them to the + /// store. Used by the FDv2 cache initializer, which loads the cache + /// through the data source pipeline rather than at identify time. + Future<({Map flags, String? environmentId})?> + readCached(LDContext context) async { + return _flagPersistence.readCached(context); + } + /// A broadcast stream which emits events as flag changes occur based either /// on loading cached values or updates from the data source. Stream get changes => _flagUpdater.changes; diff --git a/packages/common_client/lib/src/flag_manager/flag_persistence.dart b/packages/common_client/lib/src/flag_manager/flag_persistence.dart index 7ce0477c..e0ed7f68 100644 --- a/packages/common_client/lib/src/flag_manager/flag_persistence.dart +++ b/packages/common_client/lib/src/flag_manager/flag_persistence.dart @@ -81,12 +81,20 @@ final class FlagPersistence { return false; } - Future loadCached(LDContext context) async { + /// Reads the cached flag state for [context] from persistence without + /// applying it to the store. Returns null on a cache miss, an + /// unreadable entry, or a parse failure. + /// + /// The FDv2 data system loads the cache through its cache initializer + /// rather than the [loadCached] apply-at-identify path, so it needs the + /// parsed flags back rather than a side effect on the store. + Future<({Map flags, String? environmentId})?> + readCached(LDContext context) async { final json = await _persistence?.read( _environmentKey, encodePersistenceKey(context.canonicalKey)); if (json == null) { - return false; + return null; } final environmentId = await _persistence?.read(_environmentKey, _envIdKey); @@ -94,18 +102,26 @@ final class FlagPersistence { try { final flagConfig = LDEvaluationResultsSerialization.fromJson(jsonDecode(json)); - - _updater.initCached( - context, - flagConfig.map((key, value) => MapEntry( - key, ItemDescriptor(version: value.version, flag: value))), - environmentId: environmentId); - _logger.debug('Loaded a cached flag config from persistence.'); - return true; + return (flags: flagConfig, environmentId: environmentId); } catch (e) { _logger.warn('Could not load cached flag values for context: $e'); + return null; + } + } + + Future loadCached(LDContext context) async { + final cached = await readCached(context); + if (cached == null) { return false; } + + _updater.initCached( + context, + cached.flags.map((key, value) => + MapEntry(key, ItemDescriptor(version: value.version, flag: value))), + environmentId: cached.environmentId); + _logger.debug('Loaded a cached flag config from persistence.'); + return true; } Future _loadIndex() async { diff --git a/packages/common_client/lib/src/ld_common_client.dart b/packages/common_client/lib/src/ld_common_client.dart index f13bf3c2..ff28ab99 100644 --- a/packages/common_client/lib/src/ld_common_client.dart +++ b/packages/common_client/lib/src/ld_common_client.dart @@ -16,9 +16,11 @@ import 'context_modifiers/context_modifier.dart'; import 'context_modifiers/env_context_modifier.dart'; import 'hooks/hook.dart'; import 'hooks/hook_runner.dart'; +import 'data_sources/data_manager.dart'; import 'data_sources/data_source.dart'; import 'data_sources/data_source_event_handler.dart'; import 'data_sources/fdv2/built_in_modes.dart'; +import 'data_sources/fdv2/data_system.dart'; import 'data_sources/data_source_manager.dart'; import 'data_sources/data_source_status.dart'; import 'data_sources/data_source_status_manager.dart'; @@ -204,6 +206,11 @@ final class LDCommonClient { final CommonPlatform _platform; late final DataSourceManager _dataSourceManager; + + /// Owns the per-protocol identify strategy (cache load + resolution). + /// Selected from [_config.dataSystem]: FDv2 when a data system is + /// configured, otherwise FDv1. + late final DataManager _dataManager; late final EnvironmentReport _envReport; late final AsyncSingleQueue _identifyQueue = AsyncSingleQueue(); late final DataSourceFactoriesFn _dataSourceFactories; @@ -421,10 +428,33 @@ final class LDCommonClient { _updateEventSendingState(); if (!_config.offline) { - _dataSourceManager.setFactories(_composeFactoriesForManager( - fdv1Factories: _dataSourceFactories(_config, _logger, httpProperties), - backgroundFactory: _backgroundFactory(_config, _logger, httpProperties), - )); + if (_config.dataSystem case final dataSystemConfig?) { + final dataSystem = FDv2DataSystem( + config: dataSystemConfig, + credential: _config.sdkCredential, + logger: _logger, + httpProperties: httpProperties, + serviceEndpoints: _config.serviceEndpoints, + withReasons: _config.dataSourceConfig.evaluationReasons, + defaultPollingInterval: + _config.dataSourceConfig.polling.pollingInterval, + statusManager: _dataSourceStatusManager, + cachedFlagsReader: _flagManager.readCached, + ); + _dataSourceManager.setFactories(dataSystem.buildFactories()); + // FDv2 loads the cache through its pipeline and clears the held + // selector on a context change. + _dataManager = + FDv2DataManager(_dataSourceManager, dataSystem.clearSelector); + } else { + _dataSourceManager.setFactories(_composeFactoriesForManager( + fdv1Factories: _dataSourceFactories(_config, _logger, httpProperties), + backgroundFactory: + _backgroundFactory(_config, _logger, httpProperties), + )); + // FDv1 loads the cache imperatively at identify. + _dataManager = FDv1DataManager(_dataSourceManager, _flagManager); + } } else { DataSource nullSource(LDContext _) => NullDataSource(); _dataSourceManager.setFactories({ @@ -432,6 +462,9 @@ final class LDCommonClient { const FDv2Polling(): nullSource, const FDv2Background(): nullSource, }); + // Fully offline serves cached flags directly at identify; the data + // manager is not exercised, but assign one so the field is set. + _dataManager = FDv1DataManager(_dataSourceManager, _flagManager); } } @@ -533,19 +566,18 @@ final class LDCommonClient { final afterIdentify = _hookRunner.identify(_context); hookCallback(afterIdentify); - final completer = Completer(); _eventProcessor?.processIdentifyEvent(IdentifyEvent(context: _context)); - final loadedFromCache = await _flagManager.loadCached(_context); if (_config.offline) { + // Fully offline: there is no data source to run, so load the cache + // directly to serve flags. (Distinct from the offline connection + // mode, whose pipeline loads the cache for the FDv2 data system.) + await _flagManager.loadCached(_context); return; } - _dataSourceManager.identify(_context, completer); - if (loadedFromCache && !waitForNetworkResults) { - return; - } - return completer.future; + return _dataManager.identify(_context, + waitForNetworkResults: waitForNetworkResults); } /// Returns the value of flag [flagKey] for the current context as a bool. diff --git a/packages/common_client/lib/src/ld_common_config.dart b/packages/common_client/lib/src/ld_common_config.dart index e62aa295..e175c0c0 100644 --- a/packages/common_client/lib/src/ld_common_config.dart +++ b/packages/common_client/lib/src/ld_common_config.dart @@ -1,13 +1,15 @@ import 'dart:collection'; import 'dart:math'; -import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + hide ServiceEndpoints; import 'hooks/hook.dart'; +import 'config/data_system_config.dart'; import 'config/defaults/default_config.dart'; import 'config/events_config.dart'; import 'connection_mode.dart'; -import 'config/service_endpoints.dart' as client_endpoints; +import 'config/service_endpoints.dart'; /// Configuration which affects how the SDK uses persistence. final class PersistenceConfig { @@ -132,6 +134,16 @@ abstract class LDCommonConfig { /// An initial list of hooks. final UnmodifiableListView? hooks; + /// Configuration for the FDv2 data system. Providing this (even an + /// empty configuration) opts the SDK into the FDv2 data acquisition + /// protocol. + /// + /// This feature is not stable, and not subject to any backwards + /// compatibility guarantees or semantic versioning. It is in early + /// access. If you want access to this feature please join the EAP. + /// https://launchdarkly.com/docs/sdk/features/data-saving-mode + final DataSystemConfig? dataSystem; + LDCommonConfig(this.sdkCredential, this.autoEnvAttributes, {this.applicationInfo, HttpProperties? httpProperties, @@ -143,10 +155,10 @@ abstract class LDCommonConfig { DataSourceConfig? dataSourceConfig, bool? allAttributesPrivate, List? globalPrivateAttributes, - List? hooks}) + List? hooks, + this.dataSystem}) : httpProperties = httpProperties ?? HttpProperties(), - serviceEndpoints = - serviceEndpoints ?? client_endpoints.ServiceEndpoints(), + serviceEndpoints = serviceEndpoints ?? ServiceEndpoints(), events = events ?? EventsConfig(), persistence = persistence ?? PersistenceConfig(), offline = offline ?? DefaultConfig.defaultOffline, diff --git a/packages/common_client/pubspec.yaml b/packages/common_client/pubspec.yaml index 1242bf51..7efc6d8b 100644 --- a/packages/common_client/pubspec.yaml +++ b/packages/common_client/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: launchdarkly_dart_common: 1.8.1 launchdarkly_event_source_client: 2.2.0 crypto: ^3.0.3 + meta: ^1.12.0 uuid: ">= 3.0.7 <5.0.0" dev_dependencies: diff --git a/packages/common_client/test/data_sources/data_manager_test.dart b/packages/common_client/test/data_sources/data_manager_test.dart new file mode 100644 index 00000000..918e967d --- /dev/null +++ b/packages/common_client/test/data_sources/data_manager_test.dart @@ -0,0 +1,66 @@ +import 'dart:async'; + +import 'package:launchdarkly_common_client/src/data_sources/data_manager.dart'; +import 'package:launchdarkly_common_client/src/data_sources/data_source_event_handler.dart'; +import 'package:launchdarkly_common_client/src/data_sources/data_source_manager.dart'; +import 'package:launchdarkly_common_client/src/data_sources/data_source_status_manager.dart'; +import 'package:launchdarkly_common_client/src/flag_manager/flag_manager.dart'; +import 'package:launchdarkly_common_client/src/offline_detail.dart'; +import 'package:launchdarkly_common_client/src/resolved_connection_mode.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:test/test.dart'; + +/// A data source manager with no factories. Its identify is a no-op +/// connection-wise (no factory builds a source), which is all these tests +/// need: they exercise the data manager's own logic, not the connection. +DataSourceManager _managerWithoutFactories() { + final logger = LDLogger(level: LDLogLevel.none); + final statusManager = DataSourceStatusManager(); + return DataSourceManager( + statusManager: statusManager, + dataSourceEventHandler: DataSourceEventHandler( + flagManager: FlagManager( + sdkKey: 'sdk-key', maxCachedContexts: 5, logger: logger), + statusManager: statusManager, + logger: logger), + logger: logger, + ); +} + +LDContext _ctx(String key) => LDContextBuilder().kind('user', key).build(); + +void main() { + group('FDv2DataManager', () { + test('clears the selector on every identify', () { + var clears = 0; + final manager = + FDv2DataManager(_managerWithoutFactories(), () => clears++); + + // The returned futures never complete (no factory delivers data); we + // only care that each identify starts fresh. + unawaited(manager.identify(_ctx('a'), waitForNetworkResults: false)); + unawaited(manager.identify(_ctx('a'), waitForNetworkResults: false)); + unawaited(manager.identify(_ctx('b'), waitForNetworkResults: false)); + + // Every identify clears, including re-identifying the same context. + expect(clears, 3); + }); + + test('mode switches do not clear the selector, only identifies do', () { + // The clear is driven at identify time. Mode switches reach the data + // source manager directly (not this manager), so they keep the held + // selector and resume rather than re-initializing. + var clears = 0; + final dataSourceManager = _managerWithoutFactories(); + final manager = FDv2DataManager(dataSourceManager, () => clears++); + + unawaited(manager.identify(_ctx('a'), waitForNetworkResults: false)); + dataSourceManager.setMode(const ResolvedOffline(OfflineSetOffline())); + unawaited(manager.identify(_ctx('b'), waitForNetworkResults: false)); + dataSourceManager.setMode(const ResolvedStreaming()); + + // Two identifies cleared; the offline/streaming switches did not. + expect(clears, 2); + }); + }); +} diff --git a/packages/common_client/test/data_sources/data_source_event_handler_test.dart b/packages/common_client/test/data_sources/data_source_event_handler_test.dart index 488a4f0c..e20a8ced 100644 --- a/packages/common_client/test/data_sources/data_source_event_handler_test.dart +++ b/packages/common_client/test/data_sources/data_source_event_handler_test.dart @@ -303,13 +303,7 @@ void main() { ), ); - test('a full change set replaces the stored flags and sets valid', - () async { - expectLater( - statusManager!.changes, - emits(DataSourceStatus( - state: DataSourceState.valid, stateSince: DateTime(2)))); - + test('a full change set replaces the stored flags', () async { await eventHandler!.handlePayload(context, ChangeSet(type: PayloadType.full, updates: {'flagA': flagEval(1)})); @@ -329,7 +323,7 @@ void main() { test( 'a partial change set applies updates without per-item version ' - 'comparison and sets valid', () async { + 'comparison', () async { await eventHandler!.handlePayload(context, ChangeSet(type: PayloadType.full, updates: {'flagA': flagEval(7)})); @@ -348,7 +342,7 @@ void main() { expect(updated.detail.value, LDValue.ofBool(false)); }); - test('a change set of none changes no data and sets valid', () async { + test('a change set of none changes no data', () async { await eventHandler!.handlePayload(context, ChangeSet(type: PayloadType.full, updates: {'flagA': flagEval(1)})); diff --git a/packages/common_client/test/data_sources/data_source_manager_test.dart b/packages/common_client/test/data_sources/data_source_manager_test.dart index e2d518da..4b418035 100644 --- a/packages/common_client/test/data_sources/data_source_manager_test.dart +++ b/packages/common_client/test/data_sources/data_source_manager_test.dart @@ -6,23 +6,33 @@ import 'package:launchdarkly_common_client/src/data_sources/data_source_event_ha import 'package:launchdarkly_common_client/src/data_sources/data_source_manager.dart'; import 'package:launchdarkly_common_client/src/data_sources/data_source_status.dart'; import 'package:launchdarkly_common_client/src/data_sources/data_source_status_manager.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/selector.dart'; import 'package:launchdarkly_common_client/src/flag_manager/flag_manager.dart'; +import 'package:launchdarkly_common_client/src/item_descriptor.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:test/test.dart'; final class MockDataSource implements DataSource { final StreamController controller = StreamController(); + final List _startEvents; bool startCalled = false; bool stopCalled = false; bool restartCalled = false; + MockDataSource({List? startEvents}) + : _startEvents = startEvents ?? [DataEvent('put', '{}')]; + @override Stream get events => controller.stream; @override void start() { startCalled = true; - controller.sink.add(DataEvent('put', '{}')); + for (final event in _startEvents) { + controller.sink.add(event); + } } @override @@ -112,6 +122,78 @@ void main() { )); }); + test('it applies an FDv2 payload event and completes identify', () async { + final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); + final context = LDContextBuilder().kind('user', 'bob').build(); + final changeSet = ChangeSet( + selector: const Selector(state: 'state-1', version: 1), + type: PayloadType.full, + updates: { + 'flag-a': ItemDescriptor( + version: 3, + flag: LDEvaluationResult( + version: 3, + detail: LDEvaluationDetail( + LDValue.ofBool(true), 0, LDEvaluationReason.off()), + ), + ), + }); + final factories = { + const FDv2Streaming(): (_) => + MockDataSource(startEvents: [PayloadEvent(changeSet)]), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + }; + final manager = + makeManager(context, factories, inStatusManager: statusManager); + + expectLater( + statusManager.changes, + emits(DataSourceStatus( + state: DataSourceState.valid, stateSince: DateTime(1)))); + + final completer = Completer(); + manager.identify(context, completer); + + // The network payload (carrying a selector) reaches handlePayload, which + // applies the change set; the manager marks the source valid and + // completes the pending identify. (A dropped/no-op payload would leave + // the identify hanging.) + await completer.future; + }); + + test('a no-change payload after an interruption restores valid', () async { + final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); + final context = LDContextBuilder().kind('user', 'bob').build(); + const networkBasis = ChangeSet( + selector: Selector(state: 'state-1', version: 1), + type: PayloadType.full, + updates: {}); + const noChange = ChangeSet(type: PayloadType.none, updates: {}); + final factories = { + const FDv2Streaming(): (_) => MockDataSource(startEvents: [ + // Healthy connection delivers basis data, then drops, then + // reconnects and reports no changes. + PayloadEvent(networkBasis), + StatusEvent(ErrorKind.networkError, null, 'connection dropped'), + PayloadEvent(noChange), + ]), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + }; + final manager = + makeManager(context, factories, inStatusManager: statusManager); + + final completer = Completer(); + manager.identify(context, completer); + await completer.future; + await pumpEventQueue(); + + expect(statusManager.status.state, DataSourceState.valid, + reason: 'a healthy reconnect reporting no changes carries no selector, ' + 'but it is still a server response and must restore valid'); + }); + test('it can transition to offline and tear-down the previous connection', () { final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); @@ -250,4 +332,116 @@ void main() { expect(createdDataSource.controller.hasListener, isTrue); expect(createdDataSource.restartCalled, isTrue); }); + + ChangeSet aChangeSet({Selector selector = Selector.empty}) => + ChangeSet(selector: selector, type: PayloadType.full, updates: { + 'flag-a': ItemDescriptor( + version: 3, + flag: LDEvaluationResult( + version: 3, + detail: LDEvaluationDetail( + LDValue.ofBool(true), 0, LDEvaluationReason.off()), + ), + ), + }); + + test( + 'a cached identify resolves on the first applied payload, which marks ' + 'the source valid', () async { + final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); + final context = LDContextBuilder().kind('user', 'bob').build(); + final factories = { + // A cache load (selector-less full) is enough for a cached identify. + const FDv2Streaming(): (_) => + MockDataSource(startEvents: [PayloadEvent(aChangeSet())]), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + }; + final manager = + makeManager(context, factories, inStatusManager: statusManager); + + final completer = Completer(); + // requireFreshData defaults false (cached): resolves on any applied data. + manager.identify(context, completer); + await completer.future; + + expect(statusManager.status.state, DataSourceState.valid, + reason: 'applying any data while online marks the source valid'); + }); + + test( + 'a wait-for-network identify resolves on the initialized event, not ' + 'earlier data', () async { + final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); + final context = LDContextBuilder().kind('user', 'bob').build(); + final factories = { + const FDv2Streaming(): (_) => MockDataSource(startEvents: [ + // Cache data, then the orchestrator's initialized signal. + PayloadEvent(aChangeSet()), + InitializedEvent(), + ]), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + }; + final manager = + makeManager(context, factories, inStatusManager: statusManager); + + final completer = Completer(); + manager.identify(context, completer, requireFreshData: true); + await completer.future; + + expect(statusManager.status.state, DataSourceState.valid); + }); + + test('an identify requiring fresh data does not resolve on cache alone', + () async { + final context = LDContextBuilder().kind('user', 'bob').build(); + final factories = { + const FDv2Streaming(): (_) => + MockDataSource(startEvents: [PayloadEvent(aChangeSet())]), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + }; + final manager = makeManager(context, factories); + + final completer = Completer(); + manager.identify(context, completer, requireFreshData: true); + await pumpEventQueue(); + + expect(completer.isCompleted, isFalse, + reason: + 'cache data alone must not satisfy a wait-for-network identify'); + }); + + test( + 'offline runs its data source to load cache but keeps the offline status', + () async { + final statusManager = DataSourceStatusManager(stamper: () => DateTime(1)); + final context = LDContextBuilder().kind('user', 'bob').build(); + var offlineStarted = false; + final factories = { + const FDv2Streaming(): (_) => MockDataSource(), + const FDv2Polling(): (_) => MockDataSource(), + const FDv2Background(): (_) => MockDataSource(), + const FDv2Offline(): (_) { + offlineStarted = true; + // Offline cannot reach the network, so the identify resolves on the + // selector-less cache payload -- but the manager must keep the + // offline status rather than report valid. + return MockDataSource(startEvents: [PayloadEvent(aChangeSet())]); + }, + }; + final manager = + makeManager(context, factories, inStatusManager: statusManager); + + manager.setMode(const ResolvedOffline(OfflineSetOffline())); + final completer = Completer(); + manager.identify(context, completer); + await completer.future; + + expect(offlineStarted, isTrue, + reason: 'offline is a pipeline mode that runs its data source'); + expect(statusManager.status.state, DataSourceState.setOffline, + reason: 'a cache load while offline must not report valid'); + }); } diff --git a/packages/common_client/test/data_sources/fdv2/data_system_test.dart b/packages/common_client/test/data_sources/fdv2/data_system_test.dart new file mode 100644 index 00000000..3f3981c4 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/data_system_test.dart @@ -0,0 +1,93 @@ +import 'package:launchdarkly_common_client/src/config/data_system_config.dart'; +import 'package:launchdarkly_common_client/src/config/service_endpoints.dart'; +import 'package:launchdarkly_common_client/src/data_sources/data_source.dart'; +import 'package:launchdarkly_common_client/src/data_sources/data_source_status_manager.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/built_in_modes.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/data_system.dart'; +import 'package:launchdarkly_common_client/src/fdv2_connection_mode.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + hide ServiceEndpoints; +import 'package:test/test.dart'; + +FDv2DataSystem makeDataSystem( + {DataSystemConfig config = const DataSystemConfig()}) => + FDv2DataSystem( + config: config, + credential: 'the-credential', + logger: LDLogger(level: LDLogLevel.none), + httpProperties: HttpProperties(), + serviceEndpoints: ServiceEndpoints(), + withReasons: false, + defaultPollingInterval: const Duration(seconds: 300), + statusManager: DataSourceStatusManager(), + cachedFlagsReader: (_) async => null, + ); + +LDContext _context() => LDContextBuilder().kind('user', 'bob').build(); + +void main() { + test('an empty data system config overrides no modes', () { + expect(const DataSystemConfig().connectionModes, isEmpty); + }); + + test('buildFactories exposes streaming, polling, background, and offline', + () { + final factories = makeDataSystem().buildFactories(); + + expect( + factories.keys, + containsAll([ + const FDv2Streaming(), + const FDv2Polling(), + const FDv2Background(), + const FDv2Offline(), + ])); + }); + + test('a factory builds a data source, fresh on each call', () { + final factory = makeDataSystem().buildFactories()[const FDv2Streaming()]!; + final context = _context(); + + final first = factory(context); + final second = factory(context); + + expect(first, isA()); + expect(identical(first, second), isFalse, + reason: 'a fresh orchestrator is created per connection'); + + first.stop(); + second.stop(); + }); + + test('an override is selected over the built-in for that mode', () { + // The data system's job here is resolution: the overridden mode uses + // the override definition, others keep their built-in. Translating a + // definition's entries into concrete sources (e.g. that the polling + // definition yields a polling source) is covered by entry_factories. + final dataSystem = makeDataSystem( + config: const DataSystemConfig(connectionModes: { + ConnectionModeId.streaming: BuiltInModes.polling, + })); + + expect(dataSystem.resolvedDefinition(ConnectionModeId.streaming), + same(BuiltInModes.polling), + reason: 'the override replaces the built-in streaming definition'); + expect(dataSystem.resolvedDefinition(ConnectionModeId.polling), + same(BuiltInModes.polling), + reason: 'an un-overridden mode keeps its built-in'); + expect(dataSystem.resolvedDefinition(ConnectionModeId.offline), + same(BuiltInModes.offline)); + }); + + test('the override map is keyed only by built-in modes', () { + // ConnectionModeId is a sealed type whose only nameable values are the + // built-in modes, so a custom/arbitrary mode name cannot be expressed + // as a key. Providing an override for a built-in resolves; the others + // keep their built-in definitions. + const config = DataSystemConfig(connectionModes: { + ConnectionModeId.polling: BuiltInModes.streaming, + }); + final factories = makeDataSystem(config: config).buildFactories(); + expect(factories.keys, hasLength(4)); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/fdv1_fallback_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/fdv1_fallback_synchronizer_test.dart new file mode 100644 index 00000000..8f6b9221 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/fdv1_fallback_synchronizer_test.dart @@ -0,0 +1,108 @@ +import 'package:http/http.dart' as http; +import 'package:http/testing.dart'; +import 'package:launchdarkly_common_client/src/config/service_endpoints.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/fdv1_fallback_synchronizer.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/mode_definition.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/selector.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_factory_context.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' + hide ServiceEndpoints; +import 'package:test/test.dart'; + +SourceFactoryContext _ctx(MockClient client) => + SourceFactoryContext.fromClientConfig( + context: LDContextBuilder().kind('user', 'bob').build(), + logger: LDLogger(level: LDLogLevel.none), + httpProperties: HttpProperties(), + serviceEndpoints: ServiceEndpoints(), + withReasons: false, + defaultPollingInterval: const Duration(seconds: 300), + cachedFlagsReader: (_) async => null, + credential: 'the-credential', + httpClientFactory: (props) => + HttpClient(client: client, httpProperties: props), + ); + +Future _firstResult(MockClient client) { + final synchronizer = createFdv1FallbackSynchronizerFactory( + const Fdv1FallbackConfig(), _ctx(client)) + .create(() => Selector.empty); + final result = synchronizer.results.first; + return result.whenComplete(synchronizer.close); +} + +void main() { + test('translates an FDv1 flag map into a full change set with no selector', + () async { + final mock = MockClient((_) async => http.Response( + '{"flagA":{"version":3,"value":true,"variation":0,' + '"reason":{"kind":"OFF"}}}', + 200)); + + final result = await _firstResult(mock); + + expect(result, isA()); + final changeSetResult = result as ChangeSetResult; + expect(changeSetResult.changeSet.type, PayloadType.full); + expect(changeSetResult.changeSet.selector.isEmpty, isTrue, + reason: 'FDv1 carries no selector'); + expect(changeSetResult.changeSet.updates.keys, contains('flagA')); + expect(changeSetResult.fdv1Fallback, isFalse, + reason: 'the fallback tier must never re-assert the directive'); + }); + + test('a 304 response becomes a no-op none change set', () async { + final mock = MockClient((_) async => http.Response('', 304)); + + final result = await _firstResult(mock); + + expect(result, isA()); + expect((result as ChangeSetResult).changeSet.type, PayloadType.none); + expect(result.fdv1Fallback, isFalse); + }); + + test('a server error surfaces as interrupted without re-triggering fallback', + () async { + final mock = MockClient((_) async => http.Response('', 503)); + + final result = await _firstResult(mock); + + expect(result, isA()); + expect((result as StatusResult).state, SourceState.interrupted); + expect(result.fdv1Fallback, isFalse); + }); + + test('a fresh synchronizer instance does not inherit a prior ETag', () async { + final requests = []; + final mock = MockClient((req) async { + requests.add(req); + return http.Response( + '{"flagA":{"version":1,"value":true,"variation":0,' + '"reason":{"kind":"OFF"}}}', + 200, + headers: {'etag': 'etag-from-first-connection'}, + ); + }); + final factory = createFdv1FallbackSynchronizerFactory( + const Fdv1FallbackConfig(), _ctx(mock)); + + // First instance polls and receives an ETag. + final first = factory.create(() => Selector.empty); + await first.results.first; + first.close(); + + // A second instance (a new connection) must poll without if-none-match; + // were the ETag shared, it could receive a 304 for a request it never + // made and silently keep stale data. + final second = factory.create(() => Selector.empty); + await second.results.first; + second.close(); + + expect(requests, hasLength(2)); + expect(requests[0].headers.containsKey('if-none-match'), isFalse); + expect(requests[1].headers.containsKey('if-none-match'), isFalse, + reason: 'the ETag is scoped to a single requestor instance'); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/orchestrator_test.dart b/packages/common_client/test/data_sources/fdv2/orchestrator_test.dart index 59a5589d..327b2c53 100644 --- a/packages/common_client/test/data_sources/fdv2/orchestrator_test.dart +++ b/packages/common_client/test/data_sources/fdv2/orchestrator_test.dart @@ -124,7 +124,8 @@ final class Harness { } void main() { - test('runs initializers in order until one returns basis data', () async { + test('runs initializers in order until one returns data with a selector', + () async { final firstCreated = []; final secondCreated = []; final thirdCreated = []; @@ -194,6 +195,118 @@ void main() { harness.orchestrator.stop(); }); + test('a cache hit is applied but initialization continues to network data', + () async { + final synchronizers = []; + final harness = Harness(initializerFactories: [ + // A cache hit: full data with no selector. + initializerFactory(changeSet(type: PayloadType.full), isCache: true), + ], synchronizerSlots: [ + synchronizerSlot(synchronizers), + ]); + + harness.orchestrator.start(); + await harness.pump(); + + final afterCache = harness.events.whereType().toList(); + expect(afterCache, hasLength(1)); + expect(afterCache.single.changeSet.selector.isEmpty, isTrue, + reason: 'cache data carries no selector'); + expect(synchronizers, hasLength(1), + reason: 'a selector-less payload does not complete initialization, ' + 'so the synchronizer tier still starts'); + + synchronizers.single.controller + .add(changeSet(selector: const Selector(state: 'state-1', version: 1))); + await harness.pump(); + + expect(harness.selector.state, 'state-1', + reason: 'network data carries the selector forward'); + + harness.orchestrator.stop(); + }); + + test('a selector-less full payload clears the held selector', () async { + final synchronizers = []; + final harness = Harness( + initializerFactories: [], + synchronizerSlots: [synchronizerSlot(synchronizers)]); + + harness.orchestrator.start(); + await harness.pump(); + + synchronizers.single.controller + .add(changeSet(selector: const Selector(state: 'state-1', version: 1))); + await harness.pump(); + expect(harness.selector.state, 'state-1'); + + // A full transfer with no selector (e.g. an FDv1 fallback) clears it, so + // the next reconnect asks for a full payload rather than a stale delta. + synchronizers.single.controller.add(changeSet(type: PayloadType.full)); + await harness.pump(); + expect(harness.selector.isEmpty, isTrue); + + harness.orchestrator.stop(); + }); + + test('emits InitializedEvent when an initializer returns a selector', + () async { + final synchronizers = []; + final harness = Harness(initializerFactories: [ + initializerFactory( + changeSet(selector: const Selector(state: 'state-1', version: 1))), + ], synchronizerSlots: [ + synchronizerSlot(synchronizers), + ]); + + harness.orchestrator.start(); + await harness.pump(); + + expect(harness.events.whereType(), hasLength(1)); + + harness.orchestrator.stop(); + }); + + test( + 'emits InitializedEvent on the first synchronizer change set, even ' + 'a no-change one with no selector', () async { + final synchronizers = []; + final harness = Harness( + initializerFactories: [], + synchronizerSlots: [synchronizerSlot(synchronizers)]); + + harness.orchestrator.start(); + await harness.pump(); + expect(harness.events.whereType(), isEmpty, + reason: 'no data has arrived yet'); + + synchronizers.single.controller.add(changeSet(type: PayloadType.none)); + await harness.pump(); + expect(harness.events.whereType(), hasLength(1), + reason: 'the first synchronizer result completes initialization'); + + synchronizers.single.controller + .add(changeSet(selector: const Selector(state: 's', version: 1))); + await harness.pump(); + expect(harness.events.whereType(), hasLength(1), + reason: 'it is emitted at most once'); + + harness.orchestrator.stop(); + }); + + test('a cache-only system emits InitializedEvent at exhaustion', () async { + final harness = Harness(initializerFactories: [ + initializerFactory(changeSet(type: PayloadType.none), isCache: true), + ], synchronizerSlots: []); + + harness.orchestrator.start(); + await harness.pump(); + + expect(harness.events.whereType(), hasLength(1)); + + harness.orchestrator.stop(); + }); + test('synchronizer change sets are emitted and update the selector', () async { final synchronizers = []; diff --git a/packages/common_client/test/data_sources/polling_data_source_test.dart b/packages/common_client/test/data_sources/polling_data_source_test.dart index 754c9844..a2ec0d56 100644 --- a/packages/common_client/test/data_sources/polling_data_source_test.dart +++ b/packages/common_client/test/data_sources/polling_data_source_test.dart @@ -66,6 +66,7 @@ class MockLogAdapter extends Mock implements LDLogAdapter {} shutdown: event.shutdown); } case PayloadEvent(): + case InitializedEvent(): break; } }).listen((_) {}); @@ -443,6 +444,7 @@ void main() { shutdown: event.shutdown); } case PayloadEvent(): + case InitializedEvent(): break; } }).listen((_) {}); @@ -506,6 +508,7 @@ void main() { shutdown: event.shutdown); } case PayloadEvent(): + case InitializedEvent(): break; } }).listen((_) {}); diff --git a/packages/common_client/test/data_sources/streaming_data_source_test.dart b/packages/common_client/test/data_sources/streaming_data_source_test.dart index 6296ddd2..8289457b 100644 --- a/packages/common_client/test/data_sources/streaming_data_source_test.dart +++ b/packages/common_client/test/data_sources/streaming_data_source_test.dart @@ -97,6 +97,7 @@ class MockSseClient implements SSEClient { shutdown: event.shutdown); } case PayloadEvent(): + case InitializedEvent(): break; } }).listen((_) {}); diff --git a/packages/common_client/test/flag_persistence_test.dart b/packages/common_client/test/flag_persistence_test.dart index 12c8e528..470e7e8c 100644 --- a/packages/common_client/test/flag_persistence_test.dart +++ b/packages/common_client/test/flag_persistence_test.dart @@ -372,6 +372,63 @@ void main() { expect(flagStore.get('flagB'), basicData['flagB']); }); + test('readCached returns parsed flags without applying them', () async { + final context = LDContextBuilder().kind('user', 'user-key').build(); + final contextPersistenceKey = + sha256.convert(utf8.encode(context.canonicalKey)).toString(); + + final flagStore = FlagStore(); + final mockPersistence = MockPersistence(); + + mockPersistence.storage[sdkKeyPersistence] = { + contextPersistenceKey: '{"flagA":{' + '"version":1,' + '"value":"test",' + '"variation":0,' + '"reason":{"kind":"OFF"}' + '},' + '"flagB":{' + '"version":2,' + '"value":"test2",' + '"variation":1,' + '"reason":{"kind":"TARGET_MATCH"}' + '}}', + }; + + final flagPersistence = FlagPersistence( + persistence: mockPersistence, + updater: FlagUpdater(flagStore: flagStore, logger: logger), + store: flagStore, + sdkKey: sdkKey, + maxCachedContexts: 5, + logger: logger, + stamper: () => DateTime.fromMillisecondsSinceEpoch(0)); + + final cached = await flagPersistence.readCached(context); + + expect(cached, isNotNull); + expect(cached!.flags.keys, containsAll(['flagA', 'flagB'])); + expect(flagStore.getAll(), isEmpty, + reason: 'readCached must not apply to the store'); + }); + + test('readCached returns null on a cache miss', () async { + final context = LDContextBuilder().kind('user', 'user-key').build(); + final flagStore = FlagStore(); + final mockPersistence = MockPersistence(); + + final flagPersistence = FlagPersistence( + persistence: mockPersistence, + updater: FlagUpdater(flagStore: flagStore, logger: logger), + store: flagStore, + sdkKey: sdkKey, + maxCachedContexts: 5, + logger: logger, + stamper: () => DateTime.fromMillisecondsSinceEpoch(0)); + + expect(await flagPersistence.readCached(context), isNull); + }); + test('it can handle a corrupt cached flag payload', () async { final context = LDContextBuilder().kind('user', 'user-key').build(); final contextPersistenceKey =