From b50d3c3a1f6d300a9461816409e66790255c47d4 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 Apr 2026 17:46:18 -0400 Subject: [PATCH 1/6] fix: yield priority to other tasks in the queue on cobuild misses Signed-off-by: Aramis Sennyey --- .../logic/operations/AsyncOperationQueue.ts | 13 ++++ .../operations/CacheableOperationPlugin.ts | 1 + .../operations/OperationExecutionManager.ts | 3 + .../operations/OperationExecutionRecord.ts | 9 +++ .../test/AsyncOperationQueue.test.ts | 73 +++++++++++++++++++ 5 files changed, 99 insertions(+) diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index 2616d46f3b7..392d267624c 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -86,6 +86,19 @@ export class AsyncOperationQueue } } + /** + * Moves an operation to the back of the queue (lowest priority). + * Used when a cobuild lock acquisition fails, so that locally-executable operations are + * assigned before this operation is retried. + */ + public yieldPriority(record: OperationExecutionRecord): void { + const index: number = this._queue.indexOf(record); + if (index > 0) { + this._queue.splice(index, 1); + this._queue.unshift(record); + } + } + /** * Routes ready operations with 0 dependencies to waiting iterators. Normally invoked as part of `next()`, but * if the caller does not update operation dependencies prior to calling `next()`, may need to be invoked manually. diff --git a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts index 90af287b9dd..8c8724230e3 100644 --- a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts @@ -411,6 +411,7 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { periodicCallback.start(); } else { setTimeout(() => { + record.yieldPriority(); record.status = OperationStatus.Ready; }, 500); return OperationStatus.Executing; diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index 976a214f026..b701878b8ae 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -148,6 +148,9 @@ export class OperationExecutionManager { const executionRecordContext: IOperationExecutionRecordContext = { streamCollator: this._streamCollator, onOperationStatusChanged: this._onOperationStatusChanged, + yieldPriority: (record: OperationExecutionRecord) => { + this._executionQueue.yieldPriority(record); + }, createEnvironment: this._createEnvironmentForOperation, inputsSnapshot, debugMode, diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts index 57a6d1a17ec..63a18810780 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts @@ -42,6 +42,7 @@ import { export interface IOperationExecutionRecordContext { streamCollator: StreamCollator; onOperationStatusChanged?: (record: OperationExecutionRecord) => void; + yieldPriority?: (record: OperationExecutionRecord) => void; createEnvironment?: (record: OperationExecutionRecord) => IEnvironment; inputsSnapshot: IInputsSnapshot | undefined; @@ -234,6 +235,14 @@ export class OperationExecutionRecord implements IOperationRunnerContext, IOpera return !this.operation.enabled || this.runner.silent; } + /** + * Moves this operation to the back of the execution queue so that other operations + * are assigned first. + */ + public yieldPriority(): void { + this._context.yieldPriority?.(this); + } + public getStateHash(): string { if (this._stateHash === undefined) { const components: readonly string[] = this.getStateHashComponents(); diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index fcd1a87a8d4..d2d3b237b6f 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -168,4 +168,77 @@ describe(AsyncOperationQueue.name, () => { const result: IteratorResult = await iterator.next(); expect(result.done).toEqual(true); }); + + it('yieldPriority moves an operation to the back of the queue', async () => { + // Three independent operations: A, B, C (all Ready). + // A is assigned first. Calling yieldPriority + setting Ready should move it to the back, + // so B and C are assigned before A on the next pass. + const opA = createRecord('a'); + const opB = createRecord('b'); + const opC = createRecord('c'); + + const queue: AsyncOperationQueue = new AsyncOperationQueue([opA, opB, opC], nullSort); + + // Assign one operation + const r1: IteratorResult = await queue.next(); + const firstAssigned: OperationExecutionRecord = r1.value; + + // Simulate cobuild retry: yield priority then re-ready the operation + queue.yieldPriority(firstAssigned); + firstAssigned.status = OperationStatus.Ready; + + // Now assign the remaining — untried operations should come before the retry + const results: OperationExecutionRecord[] = []; + const r2: IteratorResult = await queue.next(); + results.push(r2.value); + const r3: IteratorResult = await queue.next(); + results.push(r3.value); + const r4: IteratorResult = await queue.next(); + results.push(r4.value); + + // The yielded operation should be last + expect(results[2]).toBe(firstAssigned); + }); + + it('yieldPriority assigns freshly unblocked operations first', async () => { + // A (no deps), B (depends on C), C (no deps) + // A is assigned and yields priority (cobuild retry). + // C completes, unblocking B. B should be assigned before A. + const opA = createRecord('a'); + const opB = createRecord('b'); + const opC = createRecord('c'); + + addDependency(opB, opC); + + const queue: AsyncOperationQueue = new AsyncOperationQueue([opA, opB, opC], nullSort); + + // Pull both initially ready operations (A and C) + const r1: IteratorResult = await queue.next(); + const r2: IteratorResult = await queue.next(); + expect(new Set([r1.value, r2.value])).toEqual(new Set([opA, opC])); + + // Simulate: A fails cobuild lock + queue.yieldPriority(opA); + opA.status = OperationStatus.Ready; + + // C succeeds, which unblocks B + opC.status = OperationStatus.Success; + queue.complete(opC); + + // B is freshly unblocked, A yielded priority — B should be assigned first + const r3: IteratorResult = await queue.next(); + expect(r3.value).toBe(opB); + + const r4: IteratorResult = await queue.next(); + expect(r4.value).toBe(opA); + + // Complete remaining + opA.status = OperationStatus.Success; + queue.complete(opA); + opB.status = OperationStatus.Success; + queue.complete(opB); + + const rEnd: IteratorResult = await queue.next(); + expect(rEnd.done).toBe(true); + }); }); From e5c659c078e4b1eb965d8989653cb987c411f817 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Thu, 2 Apr 2026 08:35:15 -0400 Subject: [PATCH 2/6] Rush change --- ...ennyeya-fix-cobuild-ordering_2026-04-02-12-35.json | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 common/changes/@microsoft/rush/sennyeya-fix-cobuild-ordering_2026-04-02-12-35.json diff --git a/common/changes/@microsoft/rush/sennyeya-fix-cobuild-ordering_2026-04-02-12-35.json b/common/changes/@microsoft/rush/sennyeya-fix-cobuild-ordering_2026-04-02-12-35.json new file mode 100644 index 00000000000..d4ea0bff0c1 --- /dev/null +++ b/common/changes/@microsoft/rush/sennyeya-fix-cobuild-ordering_2026-04-02-12-35.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "comment": "When cobuilds are remote executing, check them after locally executable tasks.", + "type": "none", + "packageName": "@microsoft/rush" + } + ], + "packageName": "@microsoft/rush", + "email": "aramissennyeydd@users.noreply.github.com" +} \ No newline at end of file From 9b8109cc905a84ba7a6e7b5025bb070d6eb85dc6 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 Apr 2026 08:00:15 -0400 Subject: [PATCH 3/6] cobuilds: yield priority to operations that haven't been tried yet Signed-off-by: Aramis Sennyey --- .../logic/operations/AsyncOperationQueue.ts | 54 +++++++++++-------- .../operations/CacheableOperationPlugin.ts | 1 - .../operations/OperationExecutionManager.ts | 3 -- .../operations/OperationExecutionRecord.ts | 9 ---- .../test/AsyncOperationQueue.test.ts | 23 ++++---- 5 files changed, 44 insertions(+), 46 deletions(-) diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index 392d267624c..0d0d4211b1d 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -22,6 +22,11 @@ export class AsyncOperationQueue private readonly _totalOperations: number; private readonly _completedOperations: Set; + // Tracks how many times each operation has been assigned to an execution slot. + // Operations that have been assigned more times (e.g. cobuild retries) are sorted + // after operations with fewer attempts, so untried work is preferred. + private readonly _timesQueued: Map; + private _isDone: boolean; /** @@ -37,6 +42,7 @@ export class AsyncOperationQueue this._totalOperations = this._queue.length; this._isDone = false; this._completedOperations = new Set(); + this._timesQueued = new Map(); } /** @@ -63,6 +69,7 @@ export class AsyncOperationQueue */ public complete(record: OperationExecutionRecord): void { this._completedOperations.add(record); + this._timesQueued.delete(record); // Apply status changes to direct dependents if (record.status !== OperationStatus.Failure && record.status !== OperationStatus.Blocked) { @@ -86,25 +93,14 @@ export class AsyncOperationQueue } } - /** - * Moves an operation to the back of the queue (lowest priority). - * Used when a cobuild lock acquisition fails, so that locally-executable operations are - * assigned before this operation is retried. - */ - public yieldPriority(record: OperationExecutionRecord): void { - const index: number = this._queue.indexOf(record); - if (index > 0) { - this._queue.splice(index, 1); - this._queue.unshift(record); - } - } - /** * Routes ready operations with 0 dependencies to waiting iterators. Normally invoked as part of `next()`, but * if the caller does not update operation dependencies prior to calling `next()`, may need to be invoked manually. */ public assignOperations(): void { - const { _queue: queue, _pendingIterators: waitingIterators } = this; + const { _queue: queue, _pendingIterators: waitingIterators, _timesQueued: timesQueued } = this; + + const readyOperations: OperationExecutionRecord[] = []; // By iterating in reverse order we do less array shuffling when removing operations for (let i: number = queue.length - 1; waitingIterators.length > 0 && i >= 0; i--) { @@ -122,6 +118,7 @@ export class AsyncOperationQueue ) { // It shouldn't be on the queue, remove it queue.splice(i, 1); + timesQueued.delete(record); } else if (record.status === OperationStatus.Queued || record.status === OperationStatus.Executing) { // This operation is currently executing // next one plz :) @@ -133,17 +130,32 @@ export class AsyncOperationQueue // Sanity check throw new Error(`Unexpected status "${record.status}" for queued operation: ${record.name}`); } else { - // This task is ready to process, hand it to the iterator. - // Needs to have queue semantics, otherwise tools that iterate it get confused - record.status = OperationStatus.Queued; - waitingIterators.shift()!({ - value: record, - done: false - }); + readyOperations.push(record); } // Otherwise operation is still waiting } + if (readyOperations.length > 1) { + // Sort by times queued ascending. Operations that have never been queued (0) + // come first, then operations with fewer attempts. This ensures cobuild retries + // (queued 1+ times, returned to Ready) are tried after untried operations. + readyOperations.sort((a, b) => (timesQueued.get(a) ?? 0) - (timesQueued.get(b) ?? 0)); + } + + for (const record of readyOperations) { + if (waitingIterators.length === 0) { + break; + } + // This task is ready to process, hand it to the iterator. + // Needs to have queue semantics, otherwise tools that iterate it get confused + timesQueued.set(record, (timesQueued.get(record) ?? 0) + 1); + record.status = OperationStatus.Queued; + waitingIterators.shift()!({ + value: record, + done: false + }); + } + // Since items only get removed from the queue when they have a final status, this should be safe. if (queue.length === 0) { this._isDone = true; diff --git a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts index 8c8724230e3..90af287b9dd 100644 --- a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts @@ -411,7 +411,6 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { periodicCallback.start(); } else { setTimeout(() => { - record.yieldPriority(); record.status = OperationStatus.Ready; }, 500); return OperationStatus.Executing; diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index b701878b8ae..976a214f026 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -148,9 +148,6 @@ export class OperationExecutionManager { const executionRecordContext: IOperationExecutionRecordContext = { streamCollator: this._streamCollator, onOperationStatusChanged: this._onOperationStatusChanged, - yieldPriority: (record: OperationExecutionRecord) => { - this._executionQueue.yieldPriority(record); - }, createEnvironment: this._createEnvironmentForOperation, inputsSnapshot, debugMode, diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts index 63a18810780..57a6d1a17ec 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts @@ -42,7 +42,6 @@ import { export interface IOperationExecutionRecordContext { streamCollator: StreamCollator; onOperationStatusChanged?: (record: OperationExecutionRecord) => void; - yieldPriority?: (record: OperationExecutionRecord) => void; createEnvironment?: (record: OperationExecutionRecord) => IEnvironment; inputsSnapshot: IInputsSnapshot | undefined; @@ -235,14 +234,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext, IOpera return !this.operation.enabled || this.runner.silent; } - /** - * Moves this operation to the back of the execution queue so that other operations - * are assigned first. - */ - public yieldPriority(): void { - this._context.yieldPriority?.(this); - } - public getStateHash(): string { if (this._stateHash === undefined) { const components: readonly string[] = this.getStateHashComponents(); diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index d2d3b237b6f..c0292181a4a 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -169,10 +169,11 @@ describe(AsyncOperationQueue.name, () => { expect(result.done).toEqual(true); }); - it('yieldPriority moves an operation to the back of the queue', async () => { + it('sorts cobuild retries after untried operations', async () => { // Three independent operations: A, B, C (all Ready). - // A is assigned first. Calling yieldPriority + setting Ready should move it to the back, - // so B and C are assigned before A on the next pass. + // A is assigned first, then returns to Ready (cobuild lock failed). + // On the next pass, B and C should be assigned before A because A + // has a recent lastAssignedAt timestamp. const opA = createRecord('a'); const opB = createRecord('b'); const opC = createRecord('c'); @@ -183,11 +184,10 @@ describe(AsyncOperationQueue.name, () => { const r1: IteratorResult = await queue.next(); const firstAssigned: OperationExecutionRecord = r1.value; - // Simulate cobuild retry: yield priority then re-ready the operation - queue.yieldPriority(firstAssigned); + // Simulate cobuild retry: operation returns to Ready firstAssigned.status = OperationStatus.Ready; - // Now assign the remaining — untried operations should come before the retry + // Assign all three — untried operations should come before the retry const results: OperationExecutionRecord[] = []; const r2: IteratorResult = await queue.next(); results.push(r2.value); @@ -196,13 +196,13 @@ describe(AsyncOperationQueue.name, () => { const r4: IteratorResult = await queue.next(); results.push(r4.value); - // The yielded operation should be last + // The cobuild retry should be last expect(results[2]).toBe(firstAssigned); }); - it('yieldPriority assigns freshly unblocked operations first', async () => { + it('assigns freshly unblocked operations before cobuild retries', async () => { // A (no deps), B (depends on C), C (no deps) - // A is assigned and yields priority (cobuild retry). + // A is assigned and returns to Ready (cobuild retry). // C completes, unblocking B. B should be assigned before A. const opA = createRecord('a'); const opB = createRecord('b'); @@ -217,15 +217,14 @@ describe(AsyncOperationQueue.name, () => { const r2: IteratorResult = await queue.next(); expect(new Set([r1.value, r2.value])).toEqual(new Set([opA, opC])); - // Simulate: A fails cobuild lock - queue.yieldPriority(opA); + // Simulate: A fails cobuild lock and returns to Ready opA.status = OperationStatus.Ready; // C succeeds, which unblocks B opC.status = OperationStatus.Success; queue.complete(opC); - // B is freshly unblocked, A yielded priority — B should be assigned first + // B is freshly unblocked (never assigned), A is a cobuild retry — B should be first const r3: IteratorResult = await queue.next(); expect(r3.value).toBe(opB); From 57306c6d7e5210de85332d115ca9587f94417181 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey <159921952+aramissennyeydd@users.noreply.github.com> Date: Mon, 6 Apr 2026 10:40:05 -0400 Subject: [PATCH 4/6] Apply suggestions from code review Co-authored-by: Ian Clanton-Thuon --- .../rush-lib/src/logic/operations/AsyncOperationQueue.ts | 8 +++++--- .../logic/operations/test/AsyncOperationQueue.test.ts | 9 +++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index 0d0d4211b1d..b66f1da8f1a 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -22,9 +22,11 @@ export class AsyncOperationQueue private readonly _totalOperations: number; private readonly _completedOperations: Set; - // Tracks how many times each operation has been assigned to an execution slot. - // Operations that have been assigned more times (e.g. cobuild retries) are sorted - // after operations with fewer attempts, so untried work is preferred. + /** + * Tracks how many times each operation has been assigned to an execution slot. + * Operations that have been assigned more times (e.g. cobuild retries) are sorted + * after operations with fewer attempts, so untried work is preferred. + */ private readonly _timesQueued: Map; private _isDone: boolean; diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index c0292181a4a..f7cd38d5198 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -189,12 +189,9 @@ describe(AsyncOperationQueue.name, () => { // Assign all three — untried operations should come before the retry const results: OperationExecutionRecord[] = []; - const r2: IteratorResult = await queue.next(); - results.push(r2.value); - const r3: IteratorResult = await queue.next(); - results.push(r3.value); - const r4: IteratorResult = await queue.next(); - results.push(r4.value); + for await (const item of queue) { + result.push(item); + } // The cobuild retry should be last expect(results[2]).toBe(firstAssigned); From 5dbda98ae181a4b54b242af5db7e3bc52467ff68 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Mon, 6 Apr 2026 10:42:10 -0400 Subject: [PATCH 5/6] rename to number of times queued Signed-off-by: Aramis Sennyey --- .../src/logic/operations/AsyncOperationQueue.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index b66f1da8f1a..6602ca74485 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -27,7 +27,7 @@ export class AsyncOperationQueue * Operations that have been assigned more times (e.g. cobuild retries) are sorted * after operations with fewer attempts, so untried work is preferred. */ - private readonly _timesQueued: Map; + private readonly _numberOfTimesQueuedByOperation: Map; private _isDone: boolean; @@ -44,7 +44,7 @@ export class AsyncOperationQueue this._totalOperations = this._queue.length; this._isDone = false; this._completedOperations = new Set(); - this._timesQueued = new Map(); + this._numberOfTimesQueuedByOperation = new Map(); } /** @@ -71,7 +71,7 @@ export class AsyncOperationQueue */ public complete(record: OperationExecutionRecord): void { this._completedOperations.add(record); - this._timesQueued.delete(record); + this._numberOfTimesQueuedByOperation.delete(record); // Apply status changes to direct dependents if (record.status !== OperationStatus.Failure && record.status !== OperationStatus.Blocked) { @@ -100,7 +100,11 @@ export class AsyncOperationQueue * if the caller does not update operation dependencies prior to calling `next()`, may need to be invoked manually. */ public assignOperations(): void { - const { _queue: queue, _pendingIterators: waitingIterators, _timesQueued: timesQueued } = this; + const { + _queue: queue, + _pendingIterators: waitingIterators, + _numberOfTimesQueuedByOperation: timesQueued + } = this; const readyOperations: OperationExecutionRecord[] = []; From e6a1b504ba9392786fe58c64cd9563752cf1db9c Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Mon, 6 Apr 2026 11:11:20 -0400 Subject: [PATCH 6/6] fix test by completing items Signed-off-by: Aramis Sennyey --- .../src/logic/operations/test/AsyncOperationQueue.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index f7cd38d5198..1fcd1bb4cc7 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -190,7 +190,8 @@ describe(AsyncOperationQueue.name, () => { // Assign all three — untried operations should come before the retry const results: OperationExecutionRecord[] = []; for await (const item of queue) { - result.push(item); + results.push(item); + queue.complete(item); } // The cobuild retry should be last