Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ final class SynchronizerSlot {

/// Manages the state of initializers and synchronizers, tracks which
/// source is active, and handles source transitions for the orchestrator.
///
/// The manager assumes a single, sequential driver and relies on these
/// contracts:
///
/// - At most one source is active at a time. Activating a source closes
/// the previously active one, exhausting the initializer list closes
/// the final initializer, and [close] closes whatever remains.
/// - The synchronizer scan cursor doubles as the active-slot pointer:
/// from an activation ([nextAvailableSynchronizer] or
/// [recreateCurrentSynchronizer]) until the next cursor mutation
/// ([resetSynchronizerIndex] or [engageFdv1Fallback]), the cursor
/// identifies the running synchronizer's slot. [isPrimarySynchronizer]
/// and [blockCurrentSynchronizer] read the cursor and are only
/// meaningful inside that window. After mutating the cursor, activate
/// a new synchronizer before consulting either of them.
final class SourceManager {
final List<InitializerFactory> _initializerFactories;
final List<SynchronizerSlot> _synchronizerSlots;
Expand Down Expand Up @@ -66,12 +81,14 @@ final class SourceManager {

/// Get the next initializer and set it as the active source. Closes the
/// previous active source. Returns null when all initializers are
/// exhausted.
/// exhausted; exhaustion also closes the final initializer, so a
/// terminal null leaves no source running.
Initializer? nextInitializer() {
if (_shutdown) return null;

_initializerIndex += 1;
if (_initializerIndex >= _initializerFactories.length) {
_closeActiveSource();
return null;
}

Expand Down Expand Up @@ -130,8 +147,10 @@ final class SourceManager {
return synchronizer;
}

/// Mark the current synchronizer as blocked (e.g. after a terminal
/// error).
/// Mark the active synchronizer's slot as blocked (e.g. after a
/// terminal error). Reads the scan cursor, so it must only be called
/// while the cursor identifies the active slot (see the class
/// contract).
void blockCurrentSynchronizer() {
if (_synchronizerIndex >= 0 &&
_synchronizerIndex < _synchronizerSlots.length) {
Expand All @@ -140,14 +159,25 @@ final class SourceManager {
}
}

/// Reset the synchronizer scan position so the next call to
/// [nextAvailableSynchronizer] starts from the beginning.
/// Reset the synchronizer scan cursor so the next call to
/// [nextAvailableSynchronizer] starts from the beginning. After a
/// reset the cursor no longer identifies the active slot; activate a
/// new synchronizer before consulting [isPrimarySynchronizer] or
/// [blockCurrentSynchronizer].
void resetSynchronizerIndex() {
_synchronizerIndex = -1;
}

/// Block all non-FDv1 synchronizers and unblock FDv1 synchronizers.
/// Block all non-FDv1 synchronizers, unblock the FDv1 fallback, and
/// reset the scan cursor so the next activation selects the fallback.
/// Does nothing when no FDv1 fallback slot is configured, since
/// blocking every slot without unblocking one would leave nothing to
/// activate. The active source keeps running until the next
/// activation closes it.
void engageFdv1Fallback() {
if (!hasFdv1FallbackConfigured) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice we may not make this possible.

return;
}
for (final slot in _synchronizerSlots) {
slot.state = slot.isFdv1Fallback
? SynchronizerSlotState.available
Expand All @@ -156,7 +186,10 @@ final class SourceManager {
_synchronizerIndex = -1;
}

/// True if the current synchronizer is the first available (primary).
/// True if the active synchronizer occupies the first available slot
/// (the primary). Reads the scan cursor, so it is only meaningful
/// while the cursor identifies the active slot (see the class
/// contract).
bool get isPrimarySynchronizer =>
_synchronizerIndex == _firstAvailableIndex();

Expand All @@ -165,8 +198,9 @@ final class SourceManager {
.where((slot) => slot.state == SynchronizerSlotState.available)
.length;

/// True if any synchronizer slot is marked as an FDv1 fallback.
bool get hasFdv1Fallback =>
/// True if any synchronizer slot is the FDv1 fallback, whether or not
/// it has been engaged.
bool get hasFdv1FallbackConfigured =>
_synchronizerSlots.any((slot) => slot.isFdv1Fallback);

/// Close the active source and mark the manager as shut down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ void main() {
reason: 'starting the second source closes the first');
});

test('initializer exhaustion closes the final initializer', () {
final created = <RecordingInitializer>[];
final factory = InitializerFactory(create: (_) {
final initializer = RecordingInitializer();
created.add(initializer);
return initializer;
});

final manager = _manager(initializers: [factory]);

expect(manager.nextInitializer(), isNotNull);
expect(created.single.closed, isFalse);
expect(manager.nextInitializer(), isNull);
expect(created.single.closed, isTrue,
reason: 'a terminal null must leave no source running');
});

test('synchronizers cycle through available slots and wrap around', () {
final created = <RecordingSynchronizer>[];
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);
Expand Down Expand Up @@ -150,7 +167,7 @@ void main() {
_slot(1, fdv1Created, isFdv1Fallback: true),
]);

expect(manager.hasFdv1Fallback, isTrue);
expect(manager.hasFdv1FallbackConfigured, isTrue);
expect(manager.availableSynchronizerCount, 1);

manager.nextAvailableSynchronizer();
Expand All @@ -165,6 +182,19 @@ void main() {
reason: 'the FDv2 tier is disabled after fallback');
});

test('engaging FDv1 fallback without a configured slot does nothing', () {
final created = <RecordingSynchronizer>[];
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);

expect(manager.hasFdv1FallbackConfigured, isFalse);
manager.engageFdv1Fallback();

expect(manager.availableSynchronizerCount, 2,
reason: 'blocking every slot without unblocking a fallback would '
'leave nothing to activate');
expect(manager.nextAvailableSynchronizer(), isNotNull);
});

test('close prevents further source creation', () {
final created = <RecordingSynchronizer>[];
final manager = _manager(slots: [_slot(0, created)]);
Expand Down
Loading