Skip to content

Commit 2fbdc5d

Browse files
committed
refactor(webapp): route run metadata, idempotency-key, and reschedule writes through RunStore
1 parent 109c6a7 commit 2fbdc5d

5 files changed

Lines changed: 62 additions & 61 deletions

File tree

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
1010
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1111
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
1212
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
13+
import { runStore } from "~/v3/runStore.server";
1314
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
1415

1516
// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
@@ -190,10 +191,10 @@ export class IdempotencyKeyConcern {
190191
});
191192

192193
// Update the existing run to remove the idempotency key
193-
await this.prisma.taskRun.updateMany({
194-
where: { id: existingRun.id, idempotencyKey },
195-
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
196-
});
194+
await runStore.clearIdempotencyKey(
195+
{ byId: { runId: existingRun.id, idempotencyKey } },
196+
this.prisma
197+
);
197198

198199
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
199200
}
@@ -207,10 +208,10 @@ export class IdempotencyKeyConcern {
207208
});
208209

209210
// Update the existing run to remove the idempotency key
210-
await this.prisma.taskRun.updateMany({
211-
where: { id: existingRun.id, idempotencyKey },
212-
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
213-
});
211+
await runStore.clearIdempotencyKey(
212+
{ byId: { runId: existingRun.id, idempotencyKey } },
213+
this.prisma
214+
);
214215

215216
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
216217
}

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { Effect, Schedule, Duration, Fiber } from "effect";
1313
import { type RuntimeFiber } from "effect/Fiber";
1414
import { setTimeout } from "timers/promises";
1515
import { Logger, LogLevel } from "@trigger.dev/core/logger";
16+
import type { RunStore } from "@internal/run-store";
17+
import { runStore as defaultRunStore } from "~/v3/runStore.server";
1618

1719
const RUN_UPDATABLE_WINDOW_MS = 60 * 60 * 1000; // 1 hour
1820

@@ -24,6 +26,7 @@ type BufferedRunMetadataChangeOperation = {
2426

2527
export type UpdateMetadataServiceOptions = {
2628
prisma: PrismaClientOrTransaction;
29+
runStore?: RunStore;
2730
flushIntervalMs?: number;
2831
flushEnabled?: boolean;
2932
flushLoggingEnabled?: boolean;
@@ -49,6 +52,7 @@ export class UpdateMetadataService {
4952
private _bufferedOperations: Map<string, BufferedRunMetadataChangeOperation[]> = new Map();
5053
private _flushFiber: RuntimeFiber<void> | null = null;
5154
private readonly _prisma: PrismaClientOrTransaction;
55+
private readonly _runStore: RunStore;
5256
private readonly flushIntervalMs: number;
5357
private readonly flushEnabled: boolean;
5458
private readonly flushLoggingEnabled: boolean;
@@ -57,6 +61,7 @@ export class UpdateMetadataService {
5761

5862
constructor(private readonly options: UpdateMetadataServiceOptions) {
5963
this._prisma = options.prisma;
64+
this._runStore = options.runStore ?? defaultRunStore;
6065
this.flushIntervalMs = options.flushIntervalMs ?? 5000;
6166
this.flushEnabled = options.flushEnabled ?? true;
6267
this.flushLoggingEnabled = options.flushLoggingEnabled ?? true;
@@ -260,17 +265,16 @@ export class UpdateMetadataService {
260265
const writeTime = new Date();
261266
const result = yield* _(
262267
Effect.tryPromise(() =>
263-
this._prisma.taskRun.updateMany({
264-
where: {
265-
id: runId,
266-
metadataVersion: run.metadataVersion,
267-
},
268-
data: {
269-
metadata: newMetadataPacket.data,
268+
this._runStore.updateMetadata(
269+
runId,
270+
{
271+
metadata: newMetadataPacket.data!,
270272
metadataVersion: { increment: 1 },
271273
updatedAt: writeTime,
272274
},
273-
})
275+
{ expectedMetadataVersion: run.metadataVersion },
276+
this._prisma
277+
)
274278
)
275279
);
276280

@@ -469,20 +473,19 @@ export class UpdateMetadataService {
469473
// Update with optimistic locking; updatedAt stamped explicitly so the caller can
470474
// publish the exact committed watermark without a follow-up read.
471475
const writeTime = new Date();
472-
const result = await this._prisma.taskRun.updateMany({
473-
where: {
474-
id: runId,
475-
metadataVersion: run.metadataVersion,
476-
},
477-
data: {
478-
metadata: newMetadataPacket.data,
476+
const result = await this._runStore.updateMetadata(
477+
runId,
478+
{
479+
metadata: newMetadataPacket.data!,
479480
metadataType: newMetadataPacket.dataType,
480481
metadataVersion: {
481482
increment: 1,
482483
},
483484
updatedAt: writeTime,
484485
},
485-
});
486+
{ expectedMetadataVersion: run.metadataVersion },
487+
this._prisma
488+
);
486489

487490
if (result.count === 0) {
488491
if (this.flushLoggingEnabled) {
@@ -564,19 +567,19 @@ export class UpdateMetadataService {
564567
// Update the metadata without version check; updatedAt stamped explicitly so the
565568
// caller can publish the exact committed watermark.
566569
const writeTime = new Date();
567-
await this._prisma.taskRun.update({
568-
where: {
569-
id: runId,
570-
},
571-
data: {
572-
metadata: metadataPacket?.data,
570+
await this._runStore.updateMetadata(
571+
runId,
572+
{
573+
metadata: metadataPacket?.data!,
573574
metadataType: metadataPacket?.dataType,
574575
metadataVersion: {
575576
increment: 1,
576577
},
577578
updatedAt: writeTime,
578579
},
579-
});
580+
{},
581+
this._prisma
582+
);
580583
updatedAtMs = writeTime.getTime();
581584
}
582585

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,10 @@ export class BatchTriggerV3Service extends BaseService {
408408

409409
// Expire the cached runs that are no longer valid
410410
if (expiredRunIds.size) {
411-
await this._prisma.taskRun.updateMany({
412-
where: { friendlyId: { in: Array.from(expiredRunIds) } },
413-
data: { idempotencyKey: null },
414-
});
411+
await this.runStore.clearIdempotencyKey(
412+
{ byFriendlyIds: Array.from(expiredRunIds) },
413+
this._prisma
414+
);
415415
}
416416

417417
return runs;

apps/webapp/app/v3/services/rescheduleTaskRun.server.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ export class RescheduleTaskRunService extends BaseService {
1717
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
1818
}
1919

20-
const updatedRun = await this._prisma.taskRun.update({
21-
where: {
22-
id: taskRun.id,
23-
},
24-
data: {
20+
const updatedRun = await this.runStore.rescheduleRun(
21+
taskRun.id,
22+
{
2523
delayUntil: delay,
2624
queueTimestamp: delay,
2725
},
28-
});
26+
this._prisma
27+
);
2928

3029
if (updatedRun.engine === "V1") {
3130
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);

apps/webapp/app/v3/services/resetIdempotencyKey.server.ts

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,16 @@ export class ResetIdempotencyKeyService extends BaseService {
99
taskIdentifier: string,
1010
authenticatedEnv: AuthenticatedEnvironment
1111
): Promise<{ id: string }> {
12-
const { count: pgCount } = await this._prisma.taskRun.updateMany({
13-
where: {
14-
idempotencyKey,
15-
taskIdentifier,
16-
runtimeEnvironmentId: authenticatedEnv.id,
17-
},
18-
data: {
19-
idempotencyKey: null,
20-
idempotencyKeyExpiresAt: null,
12+
const { count: pgCount } = await this.runStore.clearIdempotencyKey(
13+
{
14+
byPredicate: {
15+
idempotencyKey,
16+
taskIdentifier,
17+
runtimeEnvironmentId: authenticatedEnv.id,
18+
},
2119
},
22-
});
20+
this._prisma
21+
);
2322

2423
// Buffer-side reset: the key may belong to a buffered run that
2524
// hasn't materialised yet. The PG updateMany above can't see it.
@@ -75,17 +74,16 @@ export class ResetIdempotencyKeyService extends BaseService {
7574
// lookup against the writer when there's nothing to find;
7675
// otherwise the exact write the customer asked for (i.e., not
7776
// duplicative — without it the reset is silently lost).
78-
const { count: handoffPgCount } = await this._prisma.taskRun.updateMany({
79-
where: {
80-
idempotencyKey,
81-
taskIdentifier,
82-
runtimeEnvironmentId: authenticatedEnv.id,
83-
},
84-
data: {
85-
idempotencyKey: null,
86-
idempotencyKeyExpiresAt: null,
77+
const { count: handoffPgCount } = await this.runStore.clearIdempotencyKey(
78+
{
79+
byPredicate: {
80+
idempotencyKey,
81+
taskIdentifier,
82+
runtimeEnvironmentId: authenticatedEnv.id,
83+
},
8784
},
88-
});
85+
this._prisma
86+
);
8987
if (handoffPgCount > 0) {
9088
logger.info(
9189
`Reset idempotency key via handoff re-check: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${handoffPgCount} run(s)`

0 commit comments

Comments
 (0)