diff --git a/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart b/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart index 301a1f7b..0d0f631f 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/source_manager.dart @@ -33,6 +33,21 @@ final class SynchronizerSlot { /// Manages the state of initializers and synchronizers, tracks which /// source is active, and handles source transitions for the orchestrator. +/// +/// The manager assumes a single, sequential driver and relies on these +/// contracts: +/// +/// - At most one source is active at a time. Activating a source closes +/// the previously active one, exhausting the initializer list closes +/// the final initializer, and [close] closes whatever remains. +/// - The synchronizer scan cursor doubles as the active-slot pointer: +/// from an activation ([nextAvailableSynchronizer] or +/// [recreateCurrentSynchronizer]) until the next cursor mutation +/// ([resetSynchronizerIndex] or [engageFdv1Fallback]), the cursor +/// identifies the running synchronizer's slot. [isPrimarySynchronizer] +/// and [blockCurrentSynchronizer] read the cursor and are only +/// meaningful inside that window. After mutating the cursor, activate +/// a new synchronizer before consulting either of them. final class SourceManager { final List _initializerFactories; final List _synchronizerSlots; @@ -66,12 +81,14 @@ final class SourceManager { /// Get the next initializer and set it as the active source. Closes the /// previous active source. Returns null when all initializers are - /// exhausted. + /// exhausted; exhaustion also closes the final initializer, so a + /// terminal null leaves no source running. Initializer? nextInitializer() { if (_shutdown) return null; _initializerIndex += 1; if (_initializerIndex >= _initializerFactories.length) { + _closeActiveSource(); return null; } @@ -130,8 +147,10 @@ final class SourceManager { return synchronizer; } - /// Mark the current synchronizer as blocked (e.g. after a terminal - /// error). + /// Mark the active synchronizer's slot as blocked (e.g. after a + /// terminal error). Reads the scan cursor, so it must only be called + /// while the cursor identifies the active slot (see the class + /// contract). void blockCurrentSynchronizer() { if (_synchronizerIndex >= 0 && _synchronizerIndex < _synchronizerSlots.length) { @@ -140,14 +159,25 @@ final class SourceManager { } } - /// Reset the synchronizer scan position so the next call to - /// [nextAvailableSynchronizer] starts from the beginning. + /// Reset the synchronizer scan cursor so the next call to + /// [nextAvailableSynchronizer] starts from the beginning. After a + /// reset the cursor no longer identifies the active slot; activate a + /// new synchronizer before consulting [isPrimarySynchronizer] or + /// [blockCurrentSynchronizer]. void resetSynchronizerIndex() { _synchronizerIndex = -1; } - /// Block all non-FDv1 synchronizers and unblock FDv1 synchronizers. + /// Block all non-FDv1 synchronizers, unblock the FDv1 fallback, and + /// reset the scan cursor so the next activation selects the fallback. + /// Does nothing when no FDv1 fallback slot is configured, since + /// blocking every slot without unblocking one would leave nothing to + /// activate. The active source keeps running until the next + /// activation closes it. void engageFdv1Fallback() { + if (!hasFdv1FallbackConfigured) { + return; + } for (final slot in _synchronizerSlots) { slot.state = slot.isFdv1Fallback ? SynchronizerSlotState.available @@ -156,7 +186,10 @@ final class SourceManager { _synchronizerIndex = -1; } - /// True if the current synchronizer is the first available (primary). + /// True if the active synchronizer occupies the first available slot + /// (the primary). Reads the scan cursor, so it is only meaningful + /// while the cursor identifies the active slot (see the class + /// contract). bool get isPrimarySynchronizer => _synchronizerIndex == _firstAvailableIndex(); @@ -165,8 +198,9 @@ final class SourceManager { .where((slot) => slot.state == SynchronizerSlotState.available) .length; - /// True if any synchronizer slot is marked as an FDv1 fallback. - bool get hasFdv1Fallback => + /// True if any synchronizer slot is the FDv1 fallback, whether or not + /// it has been engaged. + bool get hasFdv1FallbackConfigured => _synchronizerSlots.any((slot) => slot.isFdv1Fallback); /// Close the active source and mark the manager as shut down. diff --git a/packages/common_client/test/data_sources/fdv2/source_manager_test.dart b/packages/common_client/test/data_sources/fdv2/source_manager_test.dart index b57b280d..12df6005 100644 --- a/packages/common_client/test/data_sources/fdv2/source_manager_test.dart +++ b/packages/common_client/test/data_sources/fdv2/source_manager_test.dart @@ -76,6 +76,23 @@ void main() { reason: 'starting the second source closes the first'); }); + test('initializer exhaustion closes the final initializer', () { + final created = []; + final factory = InitializerFactory(create: (_) { + final initializer = RecordingInitializer(); + created.add(initializer); + return initializer; + }); + + final manager = _manager(initializers: [factory]); + + expect(manager.nextInitializer(), isNotNull); + expect(created.single.closed, isFalse); + expect(manager.nextInitializer(), isNull); + expect(created.single.closed, isTrue, + reason: 'a terminal null must leave no source running'); + }); + test('synchronizers cycle through available slots and wrap around', () { final created = []; final manager = _manager(slots: [_slot(0, created), _slot(1, created)]); @@ -150,7 +167,7 @@ void main() { _slot(1, fdv1Created, isFdv1Fallback: true), ]); - expect(manager.hasFdv1Fallback, isTrue); + expect(manager.hasFdv1FallbackConfigured, isTrue); expect(manager.availableSynchronizerCount, 1); manager.nextAvailableSynchronizer(); @@ -165,6 +182,19 @@ void main() { reason: 'the FDv2 tier is disabled after fallback'); }); + test('engaging FDv1 fallback without a configured slot does nothing', () { + final created = []; + final manager = _manager(slots: [_slot(0, created), _slot(1, created)]); + + expect(manager.hasFdv1FallbackConfigured, isFalse); + manager.engageFdv1Fallback(); + + expect(manager.availableSynchronizerCount, 2, + reason: 'blocking every slot without unblocking a fallback would ' + 'leave nothing to activate'); + expect(manager.nextAvailableSynchronizer(), isNotNull); + }); + test('close prevents further source creation', () { final created = []; final manager = _manager(slots: [_slot(0, created)]);