Skip to content

Commit a03dab7

Browse files
committed
fix(run-engine,webapp): abort unsealed batches so blocked parents resume
batchTriggerAndWait blocks the parent on a waitpoint as soon as the batch is created, but the batch is only sealed once all its items finish streaming. If the item stream failed completely (rate limit, request timeout, crash), the batch stayed PENDING with no runs and the parent waited on its waitpoint forever. A seal-timeout reaper, scheduled when the batch is created, now aborts a batch that is still unsealed after BATCH_SEAL_TIMEOUT_MS (default 30 minutes) and completes the parent's waitpoint with an error so it resumes. The reaper is idempotent and no-ops if the stream sealed the batch in the meantime.
1 parent 65c545d commit a03dab7

7 files changed

Lines changed: 405 additions & 0 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Recover `batchTriggerAndWait` parents that previously hung forever when a batch's item stream never completed. Batches left unsealed past a timeout are now aborted and the waiting parent resumes with an error instead of waiting indefinitely.

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,13 @@ const EnvironmentSchema = z
768768
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
769769
// so raise with care. Set to 1 for fully sequential ingestion.
770770
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
771+
// Seal-timeout reaper: if a batch's Phase 2 item stream never seals the batch
772+
// (rate-limit, request timeout, crash), abort it after this delay and resume any
773+
// blocked parent (batchTriggerAndWait) with an error instead of hanging forever.
774+
// Must exceed the worst-case legitimate time-to-seal: the SDK retries the stream
775+
// up to maxAttempts (5) times, each attempt bounded by the server request timeout
776+
// (~300s), so the floor is ~5 × requestTimeout. Default 30m leaves headroom.
777+
BATCH_SEAL_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000),
771778
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
772779
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
773780
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),

apps/webapp/app/runEngine/services/createBatch.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
44
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
55
import { Evt } from "evt";
66
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
7+
import { env } from "~/env.server";
78
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
89
import { logger } from "~/services/logger.server";
910
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
@@ -149,6 +150,14 @@ export class CreateBatchService extends WithRunEngine {
149150

150151
await this._engine.initializeBatch(initOptions);
151152

153+
// Guard the 2-phase gap: if Phase 2 never seals this batch, the reaper
154+
// aborts it after the timeout and resumes any blocked parent with an
155+
// error instead of leaving it suspended forever.
156+
await this._engine.scheduleExpireBatch({
157+
batchId: batch.id,
158+
availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS),
159+
});
160+
152161
logger.info("Batch created", {
153162
batchId: friendlyId,
154163
runCount: body.runCount,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ export class RunEngine {
263263
tryCompleteBatch: async ({ payload }) => {
264264
await this.batchSystem.performCompleteBatch({ batchId: payload.batchId });
265265
},
266+
expireBatch: async ({ payload }) => {
267+
await this.batchSystem.expireBatch({ batchId: payload.batchId });
268+
},
266269
continueRunIfUnblocked: async ({ payload }) => {
267270
await this.waitpointSystem.continueRunIfUnblocked({
268271
runId: payload.runId,
@@ -1640,6 +1643,29 @@ export class RunEngine {
16401643
return this.batchSystem.scheduleCompleteBatch({ batchId });
16411644
}
16421645

1646+
/**
 * Seal-timeout entry point on the engine: hands the batch to the batch system,
 * which terminally fails a batch whose Phase 2 item stream never sealed and
 * resolves the parent's batchTriggerAndWait waitpoint with an error, so the
 * parent resumes rather than waiting forever.
 *
 * @param batchId - Id of the batch to expire.
 */
async expireBatch({ batchId }: { batchId: string }): Promise<void> {
  await this.batchSystem.expireBatch({ batchId });
}
1654+
1655+
/**
 * Arms the seal-timeout reaper for a batch. When `availableAt` arrives and the
 * batch still hasn't sealed, {@link expireBatch} terminally fails it and
 * resumes the parent.
 *
 * @param batchId - Id of the batch to watch.
 * @param availableAt - Earliest time at which the reaper job may run.
 */
async scheduleExpireBatch({
  batchId,
  availableAt,
}: {
  batchId: string;
  availableAt: Date;
}): Promise<void> {
  const reaperSchedule = { batchId, availableAt };
  await this.batchSystem.scheduleExpireBatch(reaperSchedule);
}
1668+
16431669
// ============================================================================
16441670
// BatchQueue methods (DRR-based batch processing)
16451671
// ============================================================================

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { startSpan } from "@internal/tracing";
2+
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
23
import { isFinalRunStatus } from "../statuses.js";
34
import { SystemResources } from "./systems.js";
45
import { WaitpointSystem } from "./waitpointSystem.js";
@@ -32,6 +33,98 @@ export class BatchSystem {
3233
await this.#tryCompleteBatch({ batchId });
3334
}
3435

36+
/**
 * Enqueues the delayed `expireBatch` worker job for a batch.
 *
 * @param batchId - Id of the batch the job will inspect; also embedded in the job id.
 * @param availableAt - Passed to the worker as the job's `availableAt`.
 */
public async scheduleExpireBatch({
  batchId,
  availableAt,
}: {
  batchId: string;
  availableAt: Date;
}): Promise<void> {
  // Stable id dedupes repeated schedules for the same batch.
  const jobId = `expireBatch:${batchId}`;

  await this.$.worker.enqueue({
    id: jobId,
    job: "expireBatch",
    payload: { batchId },
    availableAt,
  });
}
51+
52+
/**
 * Terminally fail a batch whose Phase 2 item stream never sealed it, and resolve
 * the parent's batchTriggerAndWait waitpoint with an error so the parent resumes
 * with a failure instead of hanging forever.
 *
 * Idempotent and race-safe: if the stream sealed the batch (or it otherwise
 * progressed past an unsealed PENDING state) in the meantime, this is a no-op.
 *
 * Retry-safe: the batch is flipped to ABORTED before the waitpoint is completed,
 * so a failure between those two steps would otherwise leave an ABORTED batch
 * with an unresolved waitpoint that every retry skips. An unsealed ABORTED batch
 * is therefore allowed to fall through to the waitpoint-resolution step again.
 *
 * @param batchId - Id of the batch to expire.
 * @returns Resolves once the batch has been handled or the call was a no-op.
 */
public async expireBatch({ batchId }: { batchId: string }): Promise<void> {
  return startSpan(this.$.tracer, "expireBatch", async (span) => {
    span.setAttribute("batchId", batchId);

    const batch = await this.$.prisma.batchTaskRun.findFirst({
      select: { status: true, sealed: true },
      where: { id: batchId },
    });

    if (!batch) {
      this.$.logger.debug("expireBatch: batch doesn't exist", { batchId });
      return;
    }

    // An earlier attempt may have aborted the batch and then failed before the
    // waitpoint was completed; such a batch still needs its parent resumed.
    const abortedEarlier = !batch.sealed && batch.status === "ABORTED";

    // The stream sealed the batch, or it already progressed — nothing to fail.
    if (batch.sealed || (batch.status !== "PENDING" && !abortedEarlier)) {
      this.$.logger.debug("expireBatch: batch already sealed or no longer PENDING", {
        batchId,
        status: batch.status,
        sealed: batch.sealed,
      });
      return;
    }

    if (abortedEarlier) {
      this.$.logger.debug("expireBatch: batch already aborted, retrying waitpoint resolution", {
        batchId,
      });
    } else {
      // One timestamp so both completion columns agree.
      const now = new Date();

      // Conditional update guards against racing a late seal — whichever loses no-ops.
      const aborted = await this.$.prisma.batchTaskRun.updateMany({
        where: { id: batchId, sealed: false, status: "PENDING" },
        data: {
          status: "ABORTED",
          completedAt: now,
          processingCompletedAt: now,
        },
      });

      if (aborted.count === 0) {
        this.$.logger.debug("expireBatch: lost race to seal, no-op", { batchId });
        return;
      }
    }

    // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to resolve.
    // Restricting to a still-pending waitpoint keeps retries from completing it twice.
    // NOTE(review): assumes Waitpoint.status is "PENDING" until completed — confirm against the schema.
    const waitpoint = await this.$.prisma.waitpoint.findFirst({
      where: { completedByBatchId: batchId, status: "PENDING" },
    });

    if (!waitpoint) {
      this.$.logger.debug("expireBatch: no waitpoint to resolve (fire-and-forget batch)", {
        batchId,
      });
      return;
    }

    const error: TaskRunError = {
      type: "STRING_ERROR",
      raw: "Batch items could not be streamed before the batch timed out",
    };

    // If this throws, the worker retry re-enters through the abortedEarlier path above.
    await this.waitpointSystem.completeWaitpoint({
      id: waitpoint.id,
      output: { value: JSON.stringify(error), isError: true },
    });

    this.$.logger.warn("expireBatch: aborted unsealed batch and resumed parent with error", {
      batchId,
      waitpointId: waitpoint.id,
    });
  });
}
127+
35128
/**
36129
* Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status.
37130
* This isn't used operationally, but it's used for the Batches dashboard page.

0 commit comments

Comments
 (0)