-
-
Notifications
You must be signed in to change notification settings - Fork 985
fix: batchTriggerAndWait with duplicate idempotencyKeys (#2965) #2977
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| "@trigger.dev/webapp": patch | ||
| --- | ||
|
|
||
| Fix batchTriggerAndWait running forever when duplicate idempotencyKey is provided in the same batch | ||
|
|
||
| When using batchTriggerAndWait with duplicate idempotencyKeys in the same batch, the batch would never complete because the completedCount and expectedCount would be mismatched. This fix ensures that cached runs (duplicate idempotencyKeys) are properly tracked in the batch, with their completedCount incremented immediately if the cached run is already in a final status. |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -124,11 +124,11 @@ export class BatchTriggerV3Service extends BaseService { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| const existingBatch = options.idempotencyKey | ||||||||||||||||||||||
| ? await this._prisma.batchTaskRun.findFirst({ | ||||||||||||||||||||||
| where: { | ||||||||||||||||||||||
| runtimeEnvironmentId: environment.id, | ||||||||||||||||||||||
| idempotencyKey: options.idempotencyKey, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| where: { | ||||||||||||||||||||||
| runtimeEnvironmentId: environment.id, | ||||||||||||||||||||||
| idempotencyKey: options.idempotencyKey, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| : undefined; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (existingBatch) { | ||||||||||||||||||||||
|
|
@@ -167,16 +167,16 @@ export class BatchTriggerV3Service extends BaseService { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| const dependentAttempt = body?.dependentAttempt | ||||||||||||||||||||||
| ? await this._prisma.taskRunAttempt.findFirst({ | ||||||||||||||||||||||
| where: { friendlyId: body.dependentAttempt }, | ||||||||||||||||||||||
| include: { | ||||||||||||||||||||||
| taskRun: { | ||||||||||||||||||||||
| select: { | ||||||||||||||||||||||
| id: true, | ||||||||||||||||||||||
| status: true, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| where: { friendlyId: body.dependentAttempt }, | ||||||||||||||||||||||
| include: { | ||||||||||||||||||||||
| taskRun: { | ||||||||||||||||||||||
| select: { | ||||||||||||||||||||||
| id: true, | ||||||||||||||||||||||
| status: true, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| : undefined; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if ( | ||||||||||||||||||||||
|
|
@@ -890,7 +890,71 @@ export class BatchTriggerV3Service extends BaseService { | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| return false; | ||||||||||||||||||||||
| // FIX for Issue #2965: When a run is cached (duplicate idempotencyKey), | ||||||||||||||||||||||
| // we need to ALWAYS create a BatchTaskRunItem to properly track it. | ||||||||||||||||||||||
| // This handles cases where cached run may originate from another batch. | ||||||||||||||||||||||
| // Use unique constraint (batchTaskRunId, taskRunId) to prevent duplicates. | ||||||||||||||||||||||
| const isAlreadyComplete = isFinalRunStatus(result.run.status); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| logger.debug( | ||||||||||||||||||||||
| "[BatchTriggerV2][processBatchTaskRunItem] Cached run detected, creating batch item", | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| batchId: batch.friendlyId, | ||||||||||||||||||||||
| runId: task.runId, | ||||||||||||||||||||||
| cachedRunId: result.run.id, | ||||||||||||||||||||||
| cachedRunStatus: result.run.status, | ||||||||||||||||||||||
| isAlreadyComplete, | ||||||||||||||||||||||
| currentIndex, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Always create BatchTaskRunItem for cached runs | ||||||||||||||||||||||
| // This ensures proper tracking even for cross-batch scenarios | ||||||||||||||||||||||
| try { | ||||||||||||||||||||||
| await this._prisma.batchTaskRunItem.create({ | ||||||||||||||||||||||
| data: { | ||||||||||||||||||||||
| batchTaskRunId: batch.id, | ||||||||||||||||||||||
| taskRunId: result.run.id, | ||||||||||||||||||||||
| // Use appropriate status based on the cached run's current status | ||||||||||||||||||||||
| status: isAlreadyComplete ? "COMPLETED" : batchTaskRunItemStatusForRunStatus(result.run.status), | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
|
Comment on lines
+918
to
+920
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Failed cached runs are incorrectly marked as "COMPLETED". When The 🐛 Proposed fix await this._prisma.batchTaskRunItem.create({
data: {
batchTaskRunId: batch.id,
taskRunId: result.run.id,
- // Use appropriate status based on the cached run's current status
- status: isAlreadyComplete ? "COMPLETED" : batchTaskRunItemStatusForRunStatus(result.run.status),
+ status: batchTaskRunItemStatusForRunStatus(result.run.status),
},
});📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
| }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Only increment completedCount if the cached run is already finished | ||||||||||||||||||||||
| // For in-progress runs, completedCount will be incremented when the run completes | ||||||||||||||||||||||
| if (isAlreadyComplete) { | ||||||||||||||||||||||
| await this._prisma.batchTaskRun.update({ | ||||||||||||||||||||||
| where: { id: batch.id }, | ||||||||||||||||||||||
| data: { | ||||||||||||||||||||||
| completedCount: { | ||||||||||||||||||||||
| increment: 1, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Return true so expectedCount is incremented | ||||||||||||||||||||||
| return true; | ||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||
| if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) { | ||||||||||||||||||||||
| // BatchTaskRunItem already exists for this batch and cached run | ||||||||||||||||||||||
| // This can happen if the same idempotencyKey is used multiple times in the same batch | ||||||||||||||||||||||
| logger.debug( | ||||||||||||||||||||||
| "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists for cached run", | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| batchId: batch.friendlyId, | ||||||||||||||||||||||
| runId: task.runId, | ||||||||||||||||||||||
| cachedRunId: result.run.id, | ||||||||||||||||||||||
| currentIndex, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Don't increment expectedCount since this item is already tracked | ||||||||||||||||||||||
| return false; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| throw error; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async #enqueueBatchTaskRun(options: BatchProcessingOptions) { | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Cached failed runs incorrectly marked as COMPLETED instead of FAILED
When a cached run has a final failed status (e.g.,
CRASHED,SYSTEM_FAILURE,CANCELED, etc.), theBatchTaskRunItemis incorrectly set to"COMPLETED"instead of the appropriate"FAILED"status.Click to expand
Root Cause
The code uses
isAlreadyCompletewhich isisFinalRunStatus(result.run.status). However,isFinalRunStatusreturnstruefor ALL final statuses including failed ones (taskStatus.ts:3-12):The ternary at line 919 sets status to
"COMPLETED"whenisAlreadyCompleteis true, regardless of whether the run actually succeeded or failed:Expected Behavior
The status should use
batchTaskRunItemStatusForRunStatus(result.run.status)for all cases, which correctly maps failed statuses to"FAILED"(taskRun.server.ts:113-139).Impact
Batch items for cached failed runs will be marked as
COMPLETEDinstead ofFAILED, misrepresenting the actual outcome of the run in the batch tracking system.Recommendation: Replace line 919 with:
status: batchTaskRunItemStatusForRunStatus(result.run.status),to properly map both successful and failed statuses.Was this helpful? React with 👍 or 👎 to provide feedback.