diff --git a/.changeset/fix-batch-duplicate-idempotency.md b/.changeset/fix-batch-duplicate-idempotency.md
new file mode 100644
index 0000000000..b68f2d1e98
--- /dev/null
+++ b/.changeset/fix-batch-duplicate-idempotency.md
@@ -0,0 +1,7 @@
+---
+"@trigger.dev/webapp": patch
+---
+
+Fix batchTriggerAndWait running forever when a duplicate idempotencyKey is provided in the same batch
+
+When using batchTriggerAndWait with duplicate idempotencyKeys in the same batch, the batch would never complete because completedCount and expectedCount would never match. This fix ensures that cached runs (duplicate idempotencyKeys) are properly tracked in the batch, with the batch's completedCount incremented immediately if the cached run is already in a final status.
diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts
index e4bc583b7c..f6dcfd6278 100644
--- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts
+++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts
@@ -124,11 +124,11 @@ export class BatchTriggerV3Service extends BaseService {
 
     const existingBatch = options.idempotencyKey
       ? await this._prisma.batchTaskRun.findFirst({
-          where: {
-            runtimeEnvironmentId: environment.id,
-            idempotencyKey: options.idempotencyKey,
-          },
-        })
+        where: {
+          runtimeEnvironmentId: environment.id,
+          idempotencyKey: options.idempotencyKey,
+        },
+      })
       : undefined;
 
     if (existingBatch) {
@@ -167,16 +167,16 @@ export class BatchTriggerV3Service extends BaseService {
 
     const dependentAttempt = body?.dependentAttempt
       ? await this._prisma.taskRunAttempt.findFirst({
-          where: { friendlyId: body.dependentAttempt },
-          include: {
-            taskRun: {
-              select: {
-                id: true,
-                status: true,
-              },
-            },
-          },
-        })
+        where: { friendlyId: body.dependentAttempt },
+        include: {
+          taskRun: {
+            select: {
+              id: true,
+              status: true,
+            },
+          },
+        },
+      })
       : undefined;
 
     if (
@@ -890,7 +890,72 @@ export class BatchTriggerV3Service extends BaseService {
       }
     }
 
-    return false;
+    // FIX for Issue #2965: When a run is cached (duplicate idempotencyKey),
+    // we need to ALWAYS create a BatchTaskRunItem to properly track it.
+    // This handles cases where the cached run may originate from another batch.
+    // Use the unique constraint (batchTaskRunId, taskRunId) to prevent duplicates.
+    const isAlreadyComplete = isFinalRunStatus(result.run.status);
+
+    logger.debug(
+      "[BatchTriggerV2][processBatchTaskRunItem] Cached run detected, creating batch item",
+      {
+        batchId: batch.friendlyId,
+        runId: task.runId,
+        cachedRunId: result.run.id,
+        cachedRunStatus: result.run.status,
+        isAlreadyComplete,
+        currentIndex,
+      }
+    );
+
+    // Always create BatchTaskRunItem for cached runs
+    // This ensures proper tracking even for cross-batch scenarios
+    try {
+      await this._prisma.batchTaskRunItem.create({
+        data: {
+          batchTaskRunId: batch.id,
+          taskRunId: result.run.id,
+          // Use batchTaskRunItemStatusForRunStatus() for all cases
+          // This correctly maps both successful (COMPLETED) and failed (FAILED) statuses
+          status: batchTaskRunItemStatusForRunStatus(result.run.status),
+        },
+      });
+
+      // Only increment completedCount if the cached run is already finished
+      // For in-progress runs, completedCount will be incremented when the run completes
+      if (isAlreadyComplete) {
+        await this._prisma.batchTaskRun.update({
+          where: { id: batch.id },
+          data: {
+            completedCount: {
+              increment: 1,
+            },
+          },
+        });
+      }
+
+      // Return true so expectedCount is incremented
+      return true;
+    } catch (error) {
+      if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) {
+        // BatchTaskRunItem already exists for this batch and cached run
+        // This can happen if the same idempotencyKey is used multiple times in the same batch
+        logger.debug(
+          "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists for cached run",
+          {
+            batchId: batch.friendlyId,
+            runId: task.runId,
+            cachedRunId: result.run.id,
+            currentIndex,
+          }
+        );
+
+        // Don't increment expectedCount since this item is already tracked
+        return false;
+      }
+
+      throw error;
+    }
   }
 
   async #enqueueBatchTaskRun(options: BatchProcessingOptions) {
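
For context, here is a minimal sketch of the scenario the changeset describes, assuming the trigger.dev v3 SDK (`@trigger.dev/sdk/v3`) and that `batchTriggerAndWait` accepts a per-item `options.idempotencyKey`; the task ids, payloads, and the repeated key value below are hypothetical and only illustrate reusing one idempotencyKey within a single batch.

```ts
// Hypothetical tasks reproducing the bug: two batch items share an idempotencyKey,
// so the second item resolves to the cached run of the first.
import { task } from "@trigger.dev/sdk/v3";

export const childTask = task({
  id: "child-task",
  run: async (payload: { value: number }) => {
    return { doubled: payload.value * 2 };
  },
});

export const parentTask = task({
  id: "parent-task",
  run: async () => {
    // Both items intentionally reuse the same idempotencyKey.
    return await childTask.batchTriggerAndWait([
      { payload: { value: 1 }, options: { idempotencyKey: "same-key" } },
      { payload: { value: 2 }, options: { idempotencyKey: "same-key" } },
    ]);
  },
});
```

Before this patch, the cached second item was never attached to the batch as a BatchTaskRunItem, so completedCount could never reach expectedCount and the parent run waited forever; with the patch, the cached run is recorded as a batch item and, if it is already in a final status, completedCount is incremented immediately.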