Skip to content
5 changes: 5 additions & 0 deletions .changeset/fix-batch-trigger-task-identifier.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Fix run.taskIdentifier being reported as "unknown" for batch triggers
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
include: {
checkpoint: true;
completedWaitpoints: true;
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true;
};
};
};
};
};
}>;

Expand All @@ -57,7 +65,9 @@ function enhanceExecutionSnapshot(
*/
function enhanceExecutionSnapshotWithWaitpoints(
snapshot: ExecutionSnapshotWithCheckpoint,
waitpoints: Waitpoint[],
waitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
return {
Expand Down Expand Up @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints(
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
taskIdentifier: w.completedByTaskRun?.taskIdentifier ?? undefined,
}
: undefined,
completedAfter: w.completedAfter ?? undefined,
completedByBatch: w.completedByBatchId
? {
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
: undefined,
output: w.output ?? undefined,
outputType: w.outputType,
Expand Down Expand Up @@ -137,14 +148,23 @@ async function getSnapshotWaitpointIds(
async function fetchWaitpointsInChunks(
prisma: PrismaClientOrTransaction,
waitpointIds: string[]
): Promise<Waitpoint[]> {
): Promise<(Waitpoint & { completedByTaskRun: { taskIdentifier: string | null } | null })[]> {
if (waitpointIds.length === 0) return [];

const allWaitpoints: Waitpoint[] = [];
const allWaitpoints: (Waitpoint & {
completedByTaskRun: { taskIdentifier: string | null } | null;
})[] = [];
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
const waitpoints = await prisma.waitpoint.findMany({
where: { id: { in: chunk } },
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
});
allWaitpoints.push(...waitpoints);
}
Expand All @@ -159,7 +179,15 @@ export async function getLatestExecutionSnapshot(
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { runId, isValid: true },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
checkpoint: true,
},
orderBy: { createdAt: "desc" },
Expand All @@ -179,7 +207,15 @@ export async function getExecutionSnapshotCompletedWaitpoints(
const waitpoints = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: snapshotId },
include: {
completedWaitpoints: true,
completedWaitpoints: {
include: {
completedByTaskRun: {
select: {
taskIdentifier: true,
},
},
},
},
},
});

Expand Down Expand Up @@ -233,19 +269,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
Expand Down
121 changes: 121 additions & 0 deletions packages/core/src/v3/runtime/sharedRuntimeManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { describe, expect, it } from "vitest";
import { SharedRuntimeManager } from "./sharedRuntimeManager.js";
import { CompletedWaitpoint } from "../schemas/index.js";

describe("SharedRuntimeManager", () => {
const mockIpc = {
send: () => { },
} as any;

const manager = new SharedRuntimeManager(mockIpc, false);

// Access private method
const waitpointToResult = (manager as any).waitpointToTaskRunExecutionResult.bind(manager);

describe("waitpointToTaskRunExecutionResult", () => {
it("should use the taskIdentifier from the waitpoint if present (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_1",
friendlyId: "wp_1",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_1",
friendlyId: "run_1",
taskIdentifier: "my-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_1",
taskIdentifier: "my-task",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should default taskIdentifier to 'unknown' if missing (success)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_2",
friendlyId: "wp_2",
type: "RUN",
completedAt: new Date(),
outputIsError: false,
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_2",
friendlyId: "run_2",
// database/legacy object missing taskIdentifier
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: true,
id: "run_2",
taskIdentifier: "unknown",
output: JSON.stringify({ foo: "bar" }),
outputType: "application/json",
});
});

it("should use the taskIdentifier from the waitpoint if present (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_3",
friendlyId: "wp_3",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_3",
friendlyId: "run_3",
taskIdentifier: "my-failed-task",
},
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_3",
taskIdentifier: "my-failed-task",
error: { message: "Boom" },
});
});

it("should default taskIdentifier to 'unknown' if missing (failure)", () => {
const waitpoint: CompletedWaitpoint = {
id: "wp_4",
friendlyId: "wp_4",
type: "RUN",
completedAt: new Date(),
outputIsError: true,
output: JSON.stringify({ message: "Boom" }),
outputType: "application/json",
completedByTaskRun: {
id: "run_4",
friendlyId: "run_4",
} as any,
};

const result = waitpointToResult(waitpoint);

expect(result).toEqual({
ok: false,
id: "run_4",
taskIdentifier: "unknown",
error: { message: "Boom" },
});
});
});
});
10 changes: 6 additions & 4 deletions packages/core/src/v3/runtime/sharedRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class SharedRuntimeManager implements RuntimeManager {

return {
id: params.id,
items: waitpoints.map(this.waitpointToTaskRunExecutionResult),
items: waitpoints.map((wp) => this.waitpointToTaskRunExecutionResult(wp)),
};
});
}
Expand Down Expand Up @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager {
return {
ok: false,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
error: waitpoint.output
? JSON.parse(waitpoint.output)
: {
type: "STRING_ERROR",
message: "Missing error output",
},
type: "STRING_ERROR",
message: "Missing error output",
},
} satisfies TaskRunFailedExecutionResult;
} else {
return {
ok: true,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown",
output: waitpoint.output,
outputType: waitpoint.outputType ?? "application/json",
} satisfies TaskRunSuccessfulExecutionResult;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export const CompletedWaitpoint = z.object({
.object({
id: z.string(),
friendlyId: z.string(),
taskIdentifier: z.string().optional(),
/** If the run has an associated batch */
batch: z
.object({
Expand Down