Skip to content

Commit 79bd92e

Browse files
committed
fix(run-engine,webapp): harden the batch seal-timeout reaper
Don't abort a batch whose items were all processed (processingCompletedAt set) before the seal landed; those runs resume the parent on their own. Keep expireBatch idempotent: an already-aborted batch still resolves its parent waitpoint, so a retry after a mid-run crash can't leave the parent blocked. Treat ABORTED as terminal in tryCompleteBatch so a straggler run finalizing can't flip the batch back to COMPLETED. Abort the batch if scheduling the reaper fails, so a blocked parent is never stranded with nothing to free it.
1 parent a03dab7 commit 79bd92e

3 files changed

Lines changed: 275 additions & 31 deletions

File tree

apps/webapp/app/runEngine/services/createBatch.server.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,20 @@ export class CreateBatchService extends WithRunEngine {
150150

151151
await this._engine.initializeBatch(initOptions);
152152

153-
// Guard the 2-phase gap: if Phase 2 never seals this batch, the reaper
154-
// aborts it after the timeout and resumes any blocked parent with an
155-
// error instead of leaving it suspended forever.
156-
await this._engine.scheduleExpireBatch({
157-
batchId: batch.id,
158-
availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS),
159-
});
153+
// Guard the gap between creating the batch and sealing it: if the item
154+
// stream never seals this batch, the reaper aborts it after the timeout
155+
// and resumes any blocked parent with an error instead of leaving it
156+
// suspended forever. If scheduling the reaper itself fails, abort the
157+
// batch now so a blocked parent can't be stranded with nothing to free it.
158+
try {
159+
await this._engine.scheduleExpireBatch({
160+
batchId: batch.id,
161+
availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS),
162+
});
163+
} catch (scheduleError) {
164+
await this._engine.expireBatch({ batchId: batch.id });
165+
throw scheduleError;
166+
}
160167

161168
logger.info("Batch created", {
162169
batchId: friendlyId,

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export class BatchSystem {
6262
span.setAttribute("batchId", batchId);
6363

6464
const batch = await this.$.prisma.batchTaskRun.findFirst({
65-
select: { status: true, sealed: true },
65+
select: { status: true, sealed: true, processingCompletedAt: true },
6666
where: { id: batchId },
6767
});
6868

@@ -71,40 +71,58 @@ export class BatchSystem {
7171
return;
7272
}
7373

74-
// The stream sealed the batch, or it already progressed — nothing to fail.
75-
if (batch.sealed || batch.status !== "PENDING") {
76-
this.$.logger.debug("expireBatch: batch already sealed or no longer PENDING", {
74+
// The stream sealed the batch, so the normal completion path owns it.
75+
if (batch.sealed) {
76+
this.$.logger.debug("expireBatch: batch already sealed", { batchId });
77+
return;
78+
}
79+
80+
// Terminal states other than ABORTED are done. ABORTED falls through on
81+
// purpose: a previous attempt may have aborted the batch but crashed before
82+
// resolving the waitpoint, and completeWaitpoint is idempotent, so retrying
83+
// can't leave the parent blocked.
84+
if (batch.status !== "PENDING" && batch.status !== "ABORTED") {
85+
this.$.logger.debug("expireBatch: batch in terminal non-aborted state", {
7786
batchId,
7887
status: batch.status,
79-
sealed: batch.sealed,
8088
});
8189
return;
8290
}
8391

84-
// Conditional update guards against racing a late seal — whichever loses no-ops.
85-
const aborted = await this.$.prisma.batchTaskRun.updateMany({
86-
where: { id: batchId, sealed: false, status: "PENDING" },
87-
data: {
88-
status: "ABORTED",
89-
completedAt: new Date(),
90-
processingCompletedAt: new Date(),
91-
},
92-
});
93-
94-
if (aborted.count === 0) {
95-
this.$.logger.debug("expireBatch: lost race to seal, no-op", { batchId });
92+
// The BatchQueue already processed every item (the completion callback set
93+
// processingCompletedAt) even though the seal never landed — the runs exist
94+
// and will resume the parent on their own. Aborting would fail a healthy batch.
95+
if (batch.status === "PENDING" && batch.processingCompletedAt !== null) {
96+
this.$.logger.debug("expireBatch: items already processed, not aborting", { batchId });
9697
return;
9798
}
9899

99-
// Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to resolve.
100+
if (batch.status === "PENDING") {
101+
// Conditional update guards against racing a late seal or completion —
102+
// whichever loses no-ops.
103+
const aborted = await this.$.prisma.batchTaskRun.updateMany({
104+
where: { id: batchId, sealed: false, status: "PENDING", processingCompletedAt: null },
105+
data: {
106+
status: "ABORTED",
107+
completedAt: new Date(),
108+
},
109+
});
110+
111+
if (aborted.count === 0) {
112+
this.$.logger.debug("expireBatch: lost race to seal/complete, no-op", { batchId });
113+
return;
114+
}
115+
}
116+
117+
// Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to
118+
// resolve. The status filter keeps this idempotent if a previous attempt
119+
// already resolved it.
100120
const waitpoint = await this.$.prisma.waitpoint.findFirst({
101-
where: { completedByBatchId: batchId },
121+
where: { completedByBatchId: batchId, status: "PENDING" },
102122
});
103123

104124
if (!waitpoint) {
105-
this.$.logger.debug("expireBatch: no waitpoint to resolve (fire-and-forget batch)", {
106-
batchId,
107-
});
125+
this.$.logger.debug("expireBatch: no pending waitpoint to resolve", { batchId });
108126
return;
109127
}
110128

@@ -151,8 +169,11 @@ export class BatchSystem {
151169
return;
152170
}
153171

154-
if (batch.status === "COMPLETED") {
155-
this.$.logger.debug("#tryCompleteBatch: Batch already completed", { batchId });
172+
if (batch.status === "COMPLETED" || batch.status === "ABORTED") {
173+
this.$.logger.debug("#tryCompleteBatch: Batch already in a terminal status", {
174+
batchId,
175+
status: batch.status,
176+
});
156177
return;
157178
}
158179

internal-packages/run-engine/src/engine/tests/batchTwoPhase.test.ts

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,53 @@ import type {
1313
BatchItem,
1414
InitializeBatchOptions,
1515
} from "../../batch-queue/types.js";
16+
import { type PrismaClient } from "@trigger.dev/database";
17+
import { type RedisOptions } from "ioredis";
1618

1719
vi.setConfig({ testTimeout: 60_000 });
1820

21+
/**
 * Builds the RunEngine instance used by the batch reaper tests in this file.
 *
 * The same `redisOptions` are handed to the worker, run queue, run lock and
 * batch queue, and a single "small-1x" machine preset is registered as the
 * default. Master queue consumers are disabled via
 * `masterQueueConsumersDisabled: true`.
 *
 * The tests that call this helper shut the engine down themselves with
 * `engine.quit()` in a `finally` block; this helper does not register any
 * cleanup.
 *
 * NOTE(review): the polling/debounce intervals (20ms / 50ms) look like
 * test-speed tuning rather than production values; confirm before reusing.
 *
 * @param prisma - Prisma client passed straight through to the engine.
 * @param redisOptions - Redis connection options shared by every
 *   Redis-backed component configured here.
 * @returns A newly constructed RunEngine.
 */
function createBatchTestEngine(prisma: PrismaClient, redisOptions: RedisOptions) {
22+
return new RunEngine({
23+
prisma,
24+
worker: {
25+
redis: redisOptions,
26+
workers: 1,
27+
tasksPerWorker: 10,
28+
pollIntervalMs: 20,
29+
},
30+
queue: {
31+
redis: redisOptions,
32+
masterQueueConsumersDisabled: true,
33+
processWorkerQueueDebounceMs: 50,
34+
},
35+
runLock: {
36+
redis: redisOptions,
37+
},
38+
machines: {
39+
defaultMachine: "small-1x",
40+
machines: {
41+
"small-1x": {
42+
name: "small-1x" as const,
43+
cpu: 0.5,
44+
memory: 0.5,
45+
centsPerMs: 0.0001,
46+
},
47+
},
48+
baseCostInCents: 0.0001,
49+
},
50+
batchQueue: {
51+
redis: redisOptions,
52+
consumerCount: 2,
53+
consumerIntervalMs: 50,
54+
drr: {
55+
quantum: 10,
56+
maxDeficit: 100,
57+
},
58+
},
59+
tracer: trace.getTracer("test", "0.0.0"),
60+
});
61+
}
62+
1963
describe("RunEngine 2-Phase Batch API", () => {
2064
containerTest(
2165
"2-phase batch: initialize batch, stream items one by one, items get processed",
@@ -823,6 +867,178 @@ describe("RunEngine 2-Phase Batch API", () => {
823867
}
824868
);
825869

870+
containerTest(
871+
"2-phase batch: expireBatch does not abort a batch whose items were all processed",
872+
async ({ prisma, redisOptions }) => {
873+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
874+
const engine = createBatchTestEngine(prisma, redisOptions);
875+
engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" }));
876+
engine.setBatchCompletionCallback(async () => {});
877+
878+
try {
879+
// The BatchQueue streamed and processed every item (the completion callback
880+
// set processingCompletedAt) but the seal never landed. The runs exist and
881+
// will resume the parent on their own, so the reaper must NOT abort this.
882+
const { id: batchId, friendlyId } = BatchId.generate();
883+
await prisma.batchTaskRun.create({
884+
data: {
885+
id: batchId,
886+
friendlyId,
887+
runtimeEnvironmentId: authenticatedEnvironment.id,
888+
status: "PENDING",
889+
runCount: 2,
890+
expectedCount: 2,
891+
sealed: false,
892+
batchVersion: "runengine:v2",
893+
processingCompletedAt: new Date(),
894+
},
895+
});
896+
897+
await engine.expireBatch({ batchId });
898+
899+
const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } });
900+
expect(batch?.status).toBe("PENDING");
901+
} finally {
902+
await engine.quit();
903+
}
904+
}
905+
);
906+
907+
containerTest(
908+
"2-phase batch: expireBatch resumes the parent even if a prior attempt already aborted the batch",
909+
async ({ prisma, redisOptions }) => {
910+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
911+
const engine = createBatchTestEngine(prisma, redisOptions);
912+
engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" }));
913+
engine.setBatchCompletionCallback(async () => {});
914+
915+
try {
916+
const parentTask = "parent-task";
917+
await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]);
918+
919+
const { id: batchId, friendlyId } = BatchId.generate();
920+
await prisma.batchTaskRun.create({
921+
data: {
922+
id: batchId,
923+
friendlyId,
924+
runtimeEnvironmentId: authenticatedEnvironment.id,
925+
status: "PENDING",
926+
runCount: 2,
927+
expectedCount: 2,
928+
sealed: false,
929+
batchVersion: "runengine:v2",
930+
},
931+
});
932+
933+
const parentRun = await engine.trigger(
934+
{
935+
friendlyId: generateFriendlyId("run"),
936+
environment: authenticatedEnvironment,
937+
taskIdentifier: parentTask,
938+
payload: "{}",
939+
payloadType: "application/json",
940+
context: {},
941+
traceContext: {},
942+
traceId: "t_parent",
943+
spanId: "s_parent",
944+
workerQueue: "main",
945+
queue: `task/${parentTask}`,
946+
isTest: false,
947+
tags: [],
948+
},
949+
prisma
950+
);
951+
952+
await setTimeout(500);
953+
const dequeued = await engine.dequeueFromWorkerQueue({
954+
consumerId: "test_12345",
955+
workerQueue: "main",
956+
});
957+
expect(dequeued.length).toBe(1);
958+
959+
const initial = await engine.getRunExecutionData({ runId: parentRun.id });
960+
assertNonNullable(initial);
961+
await engine.startRunAttempt({ runId: parentRun.id, snapshotId: initial.snapshot.id });
962+
963+
await engine.blockRunWithCreatedBatch({
964+
runId: parentRun.id,
965+
batchId,
966+
environmentId: authenticatedEnvironment.id,
967+
projectId: authenticatedEnvironment.projectId,
968+
organizationId: authenticatedEnvironment.organizationId,
969+
});
970+
971+
// Simulate a prior expireBatch attempt that aborted the batch but crashed
972+
// before resolving the parent's waitpoint.
973+
await prisma.batchTaskRun.update({
974+
where: { id: batchId },
975+
data: { status: "ABORTED", completedAt: new Date() },
976+
});
977+
978+
// The retry must still resolve the parent's waitpoint so it can't stay stuck.
979+
await engine.expireBatch({ batchId });
980+
981+
const waitpoint = await prisma.waitpoint.findFirst({
982+
where: { completedByBatchId: batchId },
983+
});
984+
assertNonNullable(waitpoint);
985+
expect(waitpoint.status).toBe("COMPLETED");
986+
expect(waitpoint.outputIsError).toBe(true);
987+
988+
await vi.waitFor(
989+
async () => {
990+
const wps = await prisma.taskRunWaitpoint.findMany({
991+
where: { taskRunId: parentRun.id },
992+
});
993+
expect(wps.length).toBe(0);
994+
},
995+
{ timeout: 15000 }
996+
);
997+
} finally {
998+
await engine.quit();
999+
}
1000+
}
1001+
);
1002+
1003+
containerTest(
1004+
"2-phase batch: tryCompleteBatch leaves an ABORTED batch terminal",
1005+
async ({ prisma, redisOptions }) => {
1006+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
1007+
const engine = createBatchTestEngine(prisma, redisOptions);
1008+
engine.setBatchProcessItemCallback(async () => ({ success: true, runId: "x" }));
1009+
engine.setBatchCompletionCallback(async () => {});
1010+
1011+
try {
1012+
// An aborted batch is terminal: a later tryCompleteBatch (e.g. from a
1013+
// straggler child run finalizing) must not flip it back to COMPLETED.
1014+
const { id: batchId, friendlyId } = BatchId.generate();
1015+
await prisma.batchTaskRun.create({
1016+
data: {
1017+
id: batchId,
1018+
friendlyId,
1019+
runtimeEnvironmentId: authenticatedEnvironment.id,
1020+
status: "ABORTED",
1021+
runCount: 0,
1022+
expectedCount: 0,
1023+
sealed: false,
1024+
batchVersion: "runengine:v2",
1025+
completedAt: new Date(),
1026+
},
1027+
});
1028+
1029+
await engine.tryCompleteBatch({ batchId });
1030+
1031+
// Allow the debounced completion job to run (or correctly no-op).
1032+
await setTimeout(1500);
1033+
1034+
const batch = await prisma.batchTaskRun.findUnique({ where: { id: batchId } });
1035+
expect(batch?.status).toBe("ABORTED");
1036+
} finally {
1037+
await engine.quit();
1038+
}
1039+
}
1040+
);
1041+
8261042
containerTest(
8271043
"2-phase batch: error if batch not initialized",
8281044
async ({ prisma, redisOptions }) => {

0 commit comments

Comments
 (0)