From 650a081c2819bb819da863503632a9270eb7df02 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 11:07:54 +0100 Subject: [PATCH 01/12] feat(core): add KSUID run-id minting and an isKsuidId discriminator Add an isomorphic generateKsuid() and an isKsuidId() format check to the run-id scheme, so a run's id can encode which table it belongs to. Additive groundwork: nothing mints KSUIDs yet, and generate() (cuid) is unchanged. --- .../core/src/v3/isomorphic/friendlyId.test.ts | 99 +++++++++++++++++++ packages/core/src/v3/isomorphic/friendlyId.ts | 88 +++++++++++++++++ 2 files changed, 187 insertions(+) create mode 100644 packages/core/src/v3/isomorphic/friendlyId.test.ts diff --git a/packages/core/src/v3/isomorphic/friendlyId.test.ts b/packages/core/src/v3/isomorphic/friendlyId.test.ts new file mode 100644 index 00000000000..e3221fce7f4 --- /dev/null +++ b/packages/core/src/v3/isomorphic/friendlyId.test.ts @@ -0,0 +1,99 @@ +import { describe, it, expect } from "vitest"; +import { + fromFriendlyId, + generateKsuid, + isKsuidId, + RunId, + toFriendlyId, +} from "./friendlyId.js"; + +const BASE62 = /^[0-9A-Za-z]+$/; + +describe("isKsuidId", () => { + it("is true for a freshly minted ksuid and its friendlyId", () => { + const { id, friendlyId } = RunId.generateKsuid(); + + expect(isKsuidId(id)).toBe(true); + expect(isKsuidId(friendlyId)).toBe(true); + }); + + it("is false for a legacy cuid id and its friendlyId", () => { + const { id, friendlyId } = RunId.generate(); + + // sanity: legacy cuid is 25 chars + expect(id.length).toBe(25); + expect(isKsuidId(id)).toBe(false); + expect(isKsuidId(friendlyId)).toBe(false); + }); + + it("is false for empty, prefix-only, and malformed input", () => { + expect(isKsuidId("")).toBe(false); + expect(isKsuidId("run_")).toBe(false); + + // 27 chars but contains a non-base62 char (`-`) + const twentySevenWithDash = `${"a".repeat(26)}-`; + expect(twentySevenWithDash).toHaveLength(27); + expect(isKsuidId(twentySevenWithDash)).toBe(false); + expect(isKsuidId(`run_${twentySevenWithDash}`)).toBe(false); + }); + + it("is false for a 26-char and a 28-char body", () => { + expect("a".repeat(26)).toHaveLength(26); + expect(isKsuidId("a".repeat(26))).toBe(false); + expect(isKsuidId("a".repeat(28))).toBe(false); + expect(isKsuidId(`run_${"a".repeat(26)}`)).toBe(false); + expect(isKsuidId(`run_${"a".repeat(28)}`)).toBe(false); + }); +}); + +describe("generateKsuid", () => { + it("produces a 27-char base62 body", () => { + const id = generateKsuid(); + + expect(id).toHaveLength(27); + expect(id).toMatch(BASE62); + }); + + it("produces unique ids across calls", () => { + const ids = new Set(Array.from({ length: 100 }, () => generateKsuid())); + + expect(ids.size).toBe(100); + }); + + it("round-trips through toFriendlyId / fromFriendlyId", () => { + const id = generateKsuid(); + const friendlyId = toFriendlyId("run", id); + + expect(friendlyId).toBe(`run_${id}`); + expect(fromFriendlyId(friendlyId)).toBe(id); + + const generated = RunId.generateKsuid(); + expect(generated.friendlyId).toBe(`run_${generated.id}`); + expect(RunId.fromFriendlyId(generated.friendlyId)).toBe(generated.id); + }); + + it("is time-ordered: a later timestamp sorts after an earlier one", () => { + // The timestamp lives in the high bytes, so a larger timestamp encodes to a + // lexicographically-greater (left-padded, fixed-width) base62 string. + const realNow = Date.now; + try { + Date.now = () => 1_500_000_000_000; + const earlier = generateKsuid(); + Date.now = () => 1_500_000_100_000; + const later = generateKsuid(); + + expect(later > earlier).toBe(true); + expect(isKsuidId(earlier)).toBe(true); + expect(isKsuidId(later)).toBe(true); + } finally { + Date.now = realNow; + } + }); +}); + +describe("isKsuidId and the minter agree", () => { + it("isKsuidId(generateKsuid().id) === true and isKsuidId(generate().id) === false", () => { + expect(isKsuidId(RunId.generateKsuid().id)).toBe(true); + expect(isKsuidId(RunId.generate().id)).toBe(false); + }); +}); diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 66575c7c178..ebcc8dfa284 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -11,6 +11,84 @@ export function generateInternalId() { return cuid(); } +// KSUID epoch (2014-05-13T16:53:20Z) — seconds offset applied to the unix timestamp. +const KSUID_EPOCH = 1_400_000_000; +const KSUID_TIMESTAMP_BYTES = 4; +const KSUID_PAYLOAD_BYTES = 16; +const KSUID_TOTAL_BYTES = KSUID_TIMESTAMP_BYTES + KSUID_PAYLOAD_BYTES; +const KSUID_STRING_LENGTH = 27; +const BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + +/** Encode raw bytes as base62, left-padded to the given length. */ +function base62Encode(bytes: Uint8Array, length: number): string { + // Big-endian base-256 -> base-62 conversion (repeated division). + const digits = Array.from(bytes); + let result = ""; + + while (digits.length > 0) { + let remainder = 0; + const quotient: number[] = []; + + for (let i = 0; i < digits.length; i++) { + const acc = (digits[i] ?? 0) + remainder * 256; + const q = Math.floor(acc / 62); + remainder = acc % 62; + + if (quotient.length > 0 || q > 0) { + quotient.push(q); + } + } + + // `remainder` is always in [0, 61], so this index is always valid. + result = BASE62_ALPHABET.charAt(remainder) + result; + digits.length = 0; + digits.push(...quotient); + } + + return result.padStart(length, BASE62_ALPHABET.charAt(0)); +} + +/** + * Mint a KSUID body: a 27-char, base62, time-ordered identifier. + * + * Layout: 4-byte big-endian uint32 timestamp (seconds since the KSUID epoch) + * + 16 random bytes = 20 bytes, base62-encoded and left-padded to 27 chars. + * + * Isomorphic: relies only on `globalThis.crypto.getRandomValues` for randomness. + */ +export function generateKsuid(): string { + const bytes = new Uint8Array(KSUID_TOTAL_BYTES); + + const timestamp = Math.floor(Date.now() / 1000) - KSUID_EPOCH; + bytes[0] = (timestamp >>> 24) & 0xff; + bytes[1] = (timestamp >>> 16) & 0xff; + bytes[2] = (timestamp >>> 8) & 0xff; + bytes[3] = timestamp & 0xff; + + globalThis.crypto.getRandomValues(bytes.subarray(KSUID_TIMESTAMP_BYTES)); + + return base62Encode(bytes, KSUID_STRING_LENGTH); +} + +/** + * Pure string discriminator: is this id (or friendlyId) a KSUID-format body? + * + * Strips a leading `"_"` if present, then tests the body for the KSUID + * shape (27 chars, base62). The 25-char legacy cuid and any malformed input + * return false. Never throws. + */ +export function isKsuidId(idOrFriendlyId: string): boolean { + if (!idOrFriendlyId) { + return false; + } + + const underscoreIndex = idOrFriendlyId.indexOf("_"); + const body = + underscoreIndex === -1 ? idOrFriendlyId : idOrFriendlyId.slice(underscoreIndex + 1); + + return body.length === KSUID_STRING_LENGTH && /^[0-9A-Za-z]{27}$/.test(body); +} + /** Convert an internal ID to a friendly ID */ export function toFriendlyId(entityName: string, internalId: string): string { if (!entityName) { @@ -69,6 +147,16 @@ export class IdUtil { }; } + /** Mint an id whose body is a KSUID (27-char, base62, time-ordered). */ + generateKsuid() { + const internalId = generateKsuid(); + + return { + id: internalId, + friendlyId: this.toFriendlyId(internalId), + }; + } + toFriendlyId(internalId: string) { return toFriendlyId(this.entityName, internalId); } From 40aea1b9af82131e38de0221e5108ad053ef5b01 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 11:16:15 +0100 Subject: [PATCH 02/12] feat(database): add the task_run_v2 table Add task_run_v2 as a scalar clone of TaskRun with no foreign-key constraints, plus a (createdAt, id) index for keyset pagination. Unused for now; new runs are routed to it by id format in a later change. --- .../migration.sql | 121 +++++++++++ .../database/prisma/schema.prisma | 192 ++++++++++++++++++ 2 files changed, 313 insertions(+) create mode 100644 internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql diff --git a/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql b/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql new file mode 100644 index 00000000000..22a8bcf2293 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql @@ -0,0 +1,121 @@ +-- CreateTable +CREATE TABLE "public"."task_run_v2" ( + "id" TEXT NOT NULL, + "number" INTEGER NOT NULL DEFAULT 0, + "friendlyId" TEXT NOT NULL, + "engine" "public"."RunEngineVersion" NOT NULL DEFAULT 'V1', + "status" "public"."TaskRunStatus" NOT NULL DEFAULT 'PENDING', + "statusReason" TEXT, + "idempotencyKey" TEXT, + "idempotencyKeyExpiresAt" TIMESTAMP(3), + "idempotencyKeyOptions" JSONB, + "debounce" JSONB, + "taskIdentifier" TEXT NOT NULL, + "isTest" BOOLEAN NOT NULL DEFAULT false, + "payload" TEXT NOT NULL, + "payloadType" TEXT NOT NULL DEFAULT 'application/json', + "context" JSONB, + "traceContext" JSONB, + "traceId" TEXT NOT NULL, + "spanId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "environmentType" "public"."RuntimeEnvironmentType", + "projectId" TEXT NOT NULL, + "organizationId" TEXT, + "queue" TEXT NOT NULL, + "lockedQueueId" TEXT, + "masterQueue" TEXT NOT NULL DEFAULT 'main', + "region" TEXT, + "secondaryMasterQueue" TEXT, + "attemptNumber" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "runTags" TEXT[], + "taskVersion" TEXT, + "sdkVersion" TEXT, + "cliVersion" TEXT, + "startedAt" TIMESTAMP(3), + "executedAt" TIMESTAMP(3), + "completedAt" TIMESTAMP(3), + "machinePreset" TEXT, + "usageDurationMs" INTEGER NOT NULL DEFAULT 0, + "costInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "baseCostInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "lockedAt" TIMESTAMP(3), + "lockedById" TEXT, + "lockedToVersionId" TEXT, + "priorityMs" INTEGER NOT NULL DEFAULT 0, + "concurrencyKey" TEXT, + "delayUntil" TIMESTAMP(3), + "queuedAt" TIMESTAMP(3), + "ttl" TEXT, + "expiredAt" TIMESTAMP(3), + "maxAttempts" INTEGER, + "lockedRetryConfig" JSONB, + "oneTimeUseToken" TEXT, + "taskEventStore" TEXT NOT NULL DEFAULT 'taskEvent', + "queueTimestamp" TIMESTAMP(3), + "scheduleInstanceId" TEXT, + "scheduleId" TEXT, + "bulkActionGroupIds" TEXT[] DEFAULT ARRAY[]::TEXT[], + "logsDeletedAt" TIMESTAMP(3), + "replayedFromTaskRunFriendlyId" TEXT, + "rootTaskRunId" TEXT, + "parentTaskRunId" TEXT, + "parentTaskRunAttemptId" TEXT, + "batchId" TEXT, + "resumeParentOnCompletion" BOOLEAN NOT NULL DEFAULT false, + "depth" INTEGER NOT NULL DEFAULT 0, + "parentSpanId" TEXT, + "runChainState" JSONB, + "seedMetadata" TEXT, + "seedMetadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadata" TEXT, + "metadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadataVersion" INTEGER NOT NULL DEFAULT 1, + "annotations" JSONB, + "isWarmStart" BOOLEAN, + "output" TEXT, + "outputType" TEXT NOT NULL DEFAULT 'application/json', + "error" JSONB, + "planType" TEXT, + "maxDurationInSeconds" INTEGER, + "realtimeStreamsVersion" TEXT NOT NULL DEFAULT 'v1', + "realtimeStreams" TEXT[] DEFAULT ARRAY[]::TEXT[], + "streamBasinName" TEXT, + + CONSTRAINT "task_run_v2_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_friendlyId_key" ON "public"."task_run_v2"("friendlyId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_parentTaskRunId_idx" ON "public"."task_run_v2"("parentTaskRunId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_spanId_idx" ON "public"."task_run_v2"("spanId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_parentSpanId_idx" ON "public"."task_run_v2"("parentSpanId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_runTags_idx" ON "public"."task_run_v2" USING GIN ("runTags" array_ops); + +-- CreateIndex +CREATE INDEX "task_run_v2_runtimeEnvironmentId_batchId_idx" ON "public"."task_run_v2"("runtimeEnvironmentId", "batchId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_runtimeEnvironmentId_createdAt_idx" ON "public"."task_run_v2"("runtimeEnvironmentId", "createdAt" DESC); + +-- CreateIndex +CREATE INDEX "task_run_v2_createdAt_idx" ON "public"."task_run_v2" USING BRIN ("createdAt"); + +-- CreateIndex +CREATE INDEX "task_run_v2_createdAt_id_idx" ON "public"."task_run_v2"("createdAt", "id"); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_oneTimeUseToken_key" ON "public"."task_run_v2"("oneTimeUseToken"); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_runtimeEnvironmentId_taskIdentifier_idempotency_key" ON "public"."task_run_v2"("runtimeEnvironmentId", "taskIdentifier", "idempotencyKey"); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index bb80da3a7ec..844d0da5aed 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1095,6 +1095,198 @@ model TaskRun { @@index([createdAt], type: Brin) } +/// Parallel mirror of TaskRun. +/// Structural copy of TaskRun's scalar columns with NO relation fields, so it +/// carries zero foreign-key constraints and requires no edits to other models. +/// FK id columns are kept as plain scalars; integrity is enforced in app code, +/// matching TaskRun's current FK-free state. Not yet written to or read from. +model TaskRunV2 { + id String @id @default(cuid()) + + number Int @default(0) + friendlyId String @unique + + engine RunEngineVersion @default(V1) + + status TaskRunStatus @default(PENDING) + statusReason String? + + idempotencyKey String? + idempotencyKeyExpiresAt DateTime? + /// Stores the user-provided key and scope: { key: string, scope: "run" | "attempt" | "global" } + idempotencyKeyOptions Json? + + /// Debounce options: { key: string, delay: string, createdAt: Date } + debounce Json? + + taskIdentifier String + + isTest Boolean @default(false) + + payload String + payloadType String @default("application/json") + context Json? + traceContext Json? + + traceId String + spanId String + + runtimeEnvironmentId String + + environmentType RuntimeEnvironmentType? + + projectId String + + organizationId String? + + // The specific queue this run is in + queue String + // The queueId is set when the run is locked to a specific queue + lockedQueueId String? + + /// The main queue that this run is part of + workerQueue String @default("main") @map("masterQueue") + + /// User-facing geo region, stamped at trigger; workerQueue is where it actually ran. + region String? + + /// @deprecated + secondaryMasterQueue String? + + /// From engine v2+ this will be defined after a run has been dequeued (starting at 1) + attemptNumber Int? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + /// Denormized column that holds the raw tags + runTags String[] + + /// Denormalized version of the background worker task + taskVersion String? + sdkVersion String? + cliVersion String? + + /// startedAt marks the point at which a run is dequeued from MarQS + startedAt DateTime? + /// executedAt is set when the first attempt is about to execute + executedAt DateTime? + completedAt DateTime? + machinePreset String? + + usageDurationMs Int @default(0) + costInCents Float @default(0) + baseCostInCents Float @default(0) + + lockedAt DateTime? + lockedById String? + + lockedToVersionId String? + + /// The "priority" of the run. This is just a negative offset in ms for the queue timestamp + /// E.g. a value of 60_000 would put the run into the queue 60s ago. + priorityMs Int @default(0) + + concurrencyKey String? + + delayUntil DateTime? + queuedAt DateTime? + ttl String? + expiredAt DateTime? + maxAttempts Int? + lockedRetryConfig Json? + + /// optional token that can be used to authenticate the task run + oneTimeUseToken String? + + /// Where the logs are stored + taskEventStore String @default("taskEvent") + + queueTimestamp DateTime? + + scheduleInstanceId String? + scheduleId String? + + bulkActionGroupIds String[] @default([]) + + logsDeletedAt DateTime? + + replayedFromTaskRunFriendlyId String? + + rootTaskRunId String? + + parentTaskRunId String? + + parentTaskRunAttemptId String? + + batchId String? + + /// whether or not the task run was created because of a triggerAndWait for batchTriggerAndWait + resumeParentOnCompletion Boolean @default(false) + + /// The depth of this task run in the task run hierarchy + depth Int @default(0) + + /// The span ID of the "trigger" span in the parent task run + parentSpanId String? + + /// Holds the state of the run chain for deadlock detection + runChainState Json? + + /// seed run metadata + seedMetadata String? + seedMetadataType String @default("application/json") + + /// Run metadata + metadata String? + metadataType String @default("application/json") + metadataVersion Int @default(1) + + /// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId + annotations Json? + + /// Whether the latest attempt was a warm start. Null until first attempt starts. + isWarmStart Boolean? + + /// Run output + output String? + outputType String @default("application/json") + + /// Run error + error Json? + + /// Organization's billing plan type (cached for fallback when billing API fails) + planType String? + + maxDurationInSeconds Int? + + /// The version of the realtime streams implementation used by the run + realtimeStreamsVersion String @default("v1") + /// Store the stream keys that are being used by the run + realtimeStreams String[] @default([]) + /// S2 basin where this run's realtime streams live. Stamped at create + /// time from `Organization.streamBasinName` so reads can resolve the + /// basin without joining org. Null when the org has no per-org basin + /// (OSS, or pre-backfill); reads fall back to the global basin. + streamBasinName String? + + @@unique([oneTimeUseToken]) + @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) + // Finding child runs + @@index([parentTaskRunId]) + // Run page inspector + @@index([spanId]) + @@index([parentSpanId]) + // Finding runs in a batch + @@index([runTags(ops: ArrayOps)], type: Gin) + @@index([runtimeEnvironmentId, batchId]) + @@index([runtimeEnvironmentId, createdAt(sort: Desc)]) + @@index([createdAt], type: Brin) + // Keyset cursor for merged pagination across run tables + @@index([createdAt, id]) + @@map("task_run_v2") +} + model TaskRunTemplate { id String @id @default(cuid()) From 72af7aae407c4419d055a8eeb901cfd943cec358 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 12:02:27 +0100 Subject: [PATCH 03/12] feat(database): drop incoming foreign keys referencing TaskRun Drop the 14 child-table foreign keys that referenced TaskRun.id so a child row can reference a run in either the legacy or the new run table by plain scalar. Run integrity moves to app code, symmetric with TaskRun's already dropped outgoing foreign keys. Relations stay in the Prisma schema. --- .../migration.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql diff --git a/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql b/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql new file mode 100644 index 00000000000..9e7313aade9 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql @@ -0,0 +1,17 @@ +-- Drop all foreign key constraints that reference TaskRun.id from child tables +-- (no schema change, data intact). Integrity moves to app code so a child row +-- can reference a run in either TaskRun (legacy) or task_run_v2 (new) by scalar. +ALTER TABLE "public"."TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_taskRunId_fkey"; +ALTER TABLE "public"."TaskRunDependency" DROP CONSTRAINT IF EXISTS "TaskRunDependency_taskRunId_fkey"; +ALTER TABLE "public"."BatchTaskRunItem" DROP CONSTRAINT IF EXISTS "BatchTaskRunItem_taskRunId_fkey"; +ALTER TABLE "public"."Checkpoint" DROP CONSTRAINT IF EXISTS "Checkpoint_runId_fkey"; +ALTER TABLE "public"."CheckpointRestoreEvent" DROP CONSTRAINT IF EXISTS "CheckpointRestoreEvent_runId_fkey"; +ALTER TABLE "public"."ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunId_fkey"; +ALTER TABLE "public"."BulkActionItem" DROP CONSTRAINT IF EXISTS "BulkActionItem_sourceRunId_fkey"; +ALTER TABLE "public"."BulkActionItem" DROP CONSTRAINT IF EXISTS "BulkActionItem_destinationRunId_fkey"; +ALTER TABLE "public"."_TaskRunToTaskRunTag" DROP CONSTRAINT IF EXISTS "_TaskRunToTaskRunTag_A_fkey"; +ALTER TABLE "public"."TaskRunExecutionSnapshot" DROP CONSTRAINT IF EXISTS "TaskRunExecutionSnapshot_runId_fkey"; +ALTER TABLE "public"."Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_completedByTaskRunId_fkey"; +ALTER TABLE "public"."TaskRunWaitpoint" DROP CONSTRAINT IF EXISTS "TaskRunWaitpoint_taskRunId_fkey"; +ALTER TABLE "public"."_WaitpointRunConnections" DROP CONSTRAINT IF EXISTS "_WaitpointRunConnections_A_fkey"; +ALTER TABLE "public"."PlaygroundConversation" DROP CONSTRAINT IF EXISTS "PlaygroundConversation_runId_fkey"; From 1e606626901deb4b907a98cffea9f834848dd5db Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 12:36:00 +0100 Subject: [PATCH 04/12] feat(database): mirror TaskRun relations on TaskRunV2 Give TaskRunV2 the same relation surface as TaskRun (belongs-to plus child collections, with child relations sharing the existing scalar fields) so run reads through the store can include relations regardless of table. No DB foreign keys: stripped in production migrations and in the test harness. --- .../database/prisma/schema.prisma | 104 +++++++++++++++--- 1 file changed, 90 insertions(+), 14 deletions(-) diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 844d0da5aed..5668a5ac93c 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -366,6 +366,7 @@ model RuntimeEnvironment { backgroundWorkers BackgroundWorker[] backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] + taskRunsV2 TaskRunV2[] @relation("taskRunsV2") taskQueues TaskQueue[] batchTaskRuns BatchTaskRun[] environmentVariableValues EnvironmentVariableValue[] @@ -453,6 +454,7 @@ model Project { backgroundWorkers BackgroundWorker[] backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] + taskRunsV2 TaskRunV2[] @relation("taskRunsV2") runTags TaskRunTag[] taskQueues TaskQueue[] environmentVariables EnvironmentVariable[] @@ -560,6 +562,7 @@ model BackgroundWorker { tasks BackgroundWorkerTask[] attempts TaskRunAttempt[] lockedRuns TaskRun[] + lockedRunsV2 TaskRunV2[] @relation("lockedRunsV2") files BackgroundWorkerFile[] queues TaskQueue[] promptVersions PromptVersion[] @@ -695,6 +698,7 @@ model BackgroundWorkerTask { attempts TaskRunAttempt[] runs TaskRun[] + runsV2 TaskRunV2[] @relation("lockedRunsV2") queueConfig Json? retryConfig Json? @@ -742,7 +746,9 @@ model PlaygroundConversation { /// The current active run backing this conversation (null if no run yet) runId String? - run TaskRun? @relation(fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade) + run TaskRun? @relation(fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade, map: "PlaygroundConversation_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2? @relation("playgroundConversationsV2", fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade, map: "PlaygroundConversation_runId_v2_fkey") /// The client data JSON used for this conversation clientData Json? @@ -1131,10 +1137,12 @@ model TaskRunV2 { traceId String spanId String + runtimeEnvironment RuntimeEnvironment @relation("taskRunsV2", fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "task_run_v2_runtimeEnvironmentId_fkey") runtimeEnvironmentId String environmentType RuntimeEnvironmentType? + project Project @relation("taskRunsV2", fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "task_run_v2_projectId_fkey") projectId String organizationId String? @@ -1159,6 +1167,9 @@ model TaskRunV2 { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + attempts TaskRunAttempt[] @relation("attemptsV2") + tags TaskRunTag[] @relation("taskRunTagsV2") + /// Denormized column that holds the raw tags runTags String[] @@ -1167,6 +1178,8 @@ model TaskRunV2 { sdkVersion String? cliVersion String? + checkpoints Checkpoint[] @relation("checkpointsV2") + /// startedAt marks the point at which a run is dequeued from MarQS startedAt DateTime? /// executedAt is set when the first attempt is about to execute @@ -1179,8 +1192,10 @@ model TaskRunV2 { baseCostInCents Float @default(0) lockedAt DateTime? + lockedBy BackgroundWorkerTask? @relation("lockedRunsV2", fields: [lockedById], references: [id], map: "task_run_v2_lockedById_fkey") lockedById String? + lockedToVersion BackgroundWorker? @relation("lockedRunsV2", fields: [lockedToVersionId], references: [id], map: "task_run_v2_lockedToVersionId_fkey") lockedToVersionId String? /// The "priority" of the run. This is just a negative offset in ms for the queue timestamp @@ -1199,11 +1214,27 @@ model TaskRunV2 { /// optional token that can be used to authenticate the task run oneTimeUseToken String? + ///When this run is finished, the waitpoint will be marked as completed + associatedWaitpoint Waitpoint? @relation("CompletingRunV2") + + ///If there are any blocked waitpoints, the run won't be executed + blockedByWaitpoints TaskRunWaitpoint[] @relation("taskRunWaitpointsV2") + + /// All waitpoints that blocked this run at some point, used for display purposes + connectedWaitpoints Waitpoint[] @relation("WaitpointRunConnectionsV2") + /// Where the logs are stored taskEventStore String @default("taskEvent") queueTimestamp DateTime? + batchItems BatchTaskRunItem[] @relation("batchItemsV2") + dependency TaskRunDependency? @relation("dependencyV2") + CheckpointRestoreEvent CheckpointRestoreEvent[] @relation("checkpointRestoreEventsV2") + executionSnapshots TaskRunExecutionSnapshot[] @relation("executionSnapshotsV2") + + alerts ProjectAlert[] @relation("alertsV2") + scheduleInstanceId String? scheduleId String? @@ -1213,12 +1244,26 @@ model TaskRunV2 { replayedFromTaskRunFriendlyId String? + /// This represents the original task that that was triggered outside of a Trigger.dev task + rootTaskRun TaskRunV2? @relation("TaskRootRunV2", fields: [rootTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_rootTaskRunId_fkey") rootTaskRunId String? + /// The root run will have a list of all the descendant runs, children, grand children, etc. + descendantRuns TaskRunV2[] @relation("TaskRootRunV2") + + /// The immediate parent run of this task run + parentTaskRun TaskRunV2? @relation("TaskParentRunV2", fields: [parentTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_parentTaskRunId_fkey") parentTaskRunId String? + /// The immediate child runs of this task run + childRuns TaskRunV2[] @relation("TaskParentRunV2") + + /// The immediate parent attempt of this task run + parentTaskRunAttempt TaskRunAttempt? @relation("TaskParentRunAttemptV2", fields: [parentTaskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_parentTaskRunAttemptId_fkey") parentTaskRunAttemptId String? + /// The batch run that this task run is a part of + batch BatchTaskRun? @relation("batchRunsV2", fields: [batchId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_batchId_fkey") batchId String? /// whether or not the task run was created because of a triggerAndWait for batchTriggerAndWait @@ -1270,6 +1315,11 @@ model TaskRunV2 { /// (OSS, or pre-backfill); reads fall back to the global basin. streamBasinName String? + sourceBulkActionItems BulkActionItem[] @relation("SourceActionItemRunV2") + destinationBulkActionItems BulkActionItem[] @relation("DestinationActionItemRunV2") + + playgroundConversations PlaygroundConversation[] @relation("playgroundConversationsV2") + @@unique([oneTimeUseToken]) @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs @@ -1407,7 +1457,9 @@ model TaskRunExecutionSnapshot { /// Run runId String - run TaskRun @relation(fields: [runId], references: [id]) + run TaskRun @relation(fields: [runId], references: [id], map: "TaskRunExecutionSnapshot_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("executionSnapshotsV2", fields: [runId], references: [id], map: "TaskRunExecutionSnapshot_runId_v2_fkey") runStatus TaskRunStatus // Batch @@ -1527,7 +1579,9 @@ model Waitpoint { /// If it's a RUN type waitpoint, this is the associated run completedByTaskRunId String? @unique - completedByTaskRun TaskRun? @relation("CompletingRun", fields: [completedByTaskRunId], references: [id], onDelete: SetNull) + completedByTaskRun TaskRun? @relation("CompletingRun", fields: [completedByTaskRunId], references: [id], onDelete: SetNull, map: "Waitpoint_completedByTaskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same completedByTaskRunId scalar (FK stripped in prod) + completedByTaskRunV2 TaskRunV2? @relation("CompletingRunV2", fields: [completedByTaskRunId], references: [id], onDelete: SetNull, map: "Waitpoint_completedByTaskRunId_v2_fkey") /// If it's a DATETIME type waitpoint, this is the date. /// If it's a MANUAL waitpoint, this can be set as the `timeout`. @@ -1541,7 +1595,8 @@ model Waitpoint { blockingTaskRuns TaskRunWaitpoint[] /// All runs that have ever been blocked by this waitpoint, used for display purposes - connectedRuns TaskRun[] @relation("WaitpointRunConnections") + connectedRuns TaskRun[] @relation("WaitpointRunConnections") + connectedRunsV2 TaskRunV2[] @relation("WaitpointRunConnectionsV2") /// When a waitpoint is complete completedExecutionSnapshots TaskRunExecutionSnapshot[] @relation("completedWaitpoints") @@ -1592,7 +1647,9 @@ enum WaitpointStatus { model TaskRunWaitpoint { id String @id @default(cuid()) - taskRun TaskRun @relation(fields: [taskRunId], references: [id]) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], map: "TaskRunWaitpoint_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("taskRunWaitpointsV2", fields: [taskRunId], references: [id], map: "TaskRunWaitpoint_taskRunId_v2_fkey") taskRunId String waitpoint Waitpoint @relation(fields: [waitpointId], references: [id]) @@ -1756,7 +1813,8 @@ model TaskRunTag { friendlyId String @unique - runs TaskRun[] + runs TaskRun[] + runsV2 TaskRunV2[] @relation("taskRunTagsV2") project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) projectId String @@ -1773,7 +1831,9 @@ model TaskRunDependency { id String @id @default(cuid()) /// The child run - taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunDependency_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("dependencyV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunDependency_taskRunId_v2_fkey") taskRunId String @unique checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -1821,7 +1881,9 @@ model TaskRunAttempt { friendlyId String @unique - taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunAttempt_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("attemptsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunAttempt_taskRunId_v2_fkey") taskRunId String backgroundWorker BackgroundWorker @relation(fields: [backgroundWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -1858,6 +1920,7 @@ model TaskRunAttempt { CheckpointRestoreEvent CheckpointRestoreEvent[] alerts ProjectAlert[] childRuns TaskRun[] @relation("TaskParentRunAttempt") + childRunsV2 TaskRunV2[] @relation("TaskParentRunAttemptV2") @@unique([taskRunId, number]) @@index([taskRunId]) @@ -2059,6 +2122,7 @@ model BatchTaskRun { runtimeEnvironmentId String /// This only includes new runs, not idempotent runs. runs TaskRun[] + runsV2 TaskRunV2[] @relation("batchRunsV2") createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -2142,7 +2206,9 @@ model BatchTaskRunItem { batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) batchTaskRunId String - taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BatchTaskRunItem_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("batchItemsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BatchTaskRunItem_taskRunId_v2_fkey") taskRunId String taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: Cascade) @@ -2237,7 +2303,9 @@ model Checkpoint { events CheckpointRestoreEvent[] - run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "Checkpoint_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("checkpointsV2", fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "Checkpoint_runId_v2_fkey") runId String attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2272,7 +2340,9 @@ model CheckpointRestoreEvent { checkpoint Checkpoint @relation(fields: [checkpointId], references: [id], onDelete: Cascade, onUpdate: Cascade) checkpointId String - run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "CheckpointRestoreEvent_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("checkpointRestoreEventsV2", fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "CheckpointRestoreEvent_runId_v2_fkey") runId String attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2558,7 +2628,9 @@ model ProjectAlert { taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunAttemptId String? - taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "ProjectAlert_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2? @relation("alertsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "ProjectAlert_taskRunId_v2_fkey") taskRunId String? workerDeployment WorkerDeployment? @relation(fields: [workerDeploymentId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2739,11 +2811,15 @@ model BulkActionItem { status BulkActionItemStatus @default(PENDING) /// The run that is the source of the action, e.g. when replaying this is the original run - sourceRun TaskRun @relation("SourceActionItemRun", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + sourceRun TaskRun @relation("SourceActionItemRun", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_sourceRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same sourceRunId scalar (FK stripped in prod) + sourceRunV2 TaskRunV2 @relation("SourceActionItemRunV2", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_sourceRunId_v2_fkey") sourceRunId String /// The run that's a result of the action, this will be set when the run has been created - destinationRun TaskRun? @relation("DestinationActionItemRun", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + destinationRun TaskRun? @relation("DestinationActionItemRun", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_destinationRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same destinationRunId scalar (FK stripped in prod) + destinationRunV2 TaskRunV2? @relation("DestinationActionItemRunV2", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_destinationRunId_v2_fkey") destinationRunId String? error String? From 0a591fb5ce347b8bb97a596ff8e5ffeb81da3f71 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 12:36:00 +0100 Subject: [PATCH 05/12] test(testcontainers): strip run foreign keys after schema push Production drops the foreign keys on and referencing the run tables, but the test harness builds via prisma db push, which recreates them from the schema relations. Drop them after the push so test databases match production and a run can live in either run table. --- internal-packages/testcontainers/src/utils.ts | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 4183e85b40b..9cbaeb04ce6 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -2,6 +2,7 @@ import { createClient } from "@clickhouse/client"; import { PostgreSqlContainer, StartedPostgreSqlContainer } from "@testcontainers/postgresql"; import { RedisContainer, StartedRedisContainer } from "@testcontainers/redis"; import { tryCatch } from "@trigger.dev/core"; +import { PrismaClient } from "@trigger.dev/database"; import Redis from "ioredis"; import path from "path"; import { isDebug } from "std-env"; @@ -48,9 +49,50 @@ export async function pushDatabaseSchema(databaseUrl: string) { } ); + await dropRunForeignKeys(databaseUrl); + return result; } +/** + * Production drops every foreign key that sits on, or points at, the run tables (`TaskRun` and + * `task_run_v2`) — a run's id is just a scalar that may live in either physical table, so the FKs + * can't be enforced. `prisma db push` doesn't know that: it recreates a constraint for every + * relation still declared in schema.prisma, so the template DB ends up with run FKs production + * doesn't have. That makes tests diverge — e.g. inserting a child row (a `TaskRunExecutionSnapshot` + * whose `runId` is a `task_run_v2` id) trips a `..._runId_fkey -> TaskRun` constraint that doesn't + * exist in prod. So after the push we strip those FKs to match production exactly. + * + * This is done dynamically (rather than naming each constraint) so any relation added to the schema + * later has its test-only run FK stripped automatically. It only removes FK constraints, so it + * cannot corrupt valid data — it makes the template DB strictly more faithful to production. + */ +async function dropRunForeignKeys(databaseUrl: string) { + const prisma = new PrismaClient({ + datasources: { db: { url: databaseUrl } }, + }); + + try { + await prisma.$executeRawUnsafe(` +DO $$ +DECLARE r record; +BEGIN + FOR r IN + SELECT conrelid::regclass::text AS tbl, conname + FROM pg_constraint + WHERE contype = 'f' + AND (confrelid IN ('"TaskRun"'::regclass, 'task_run_v2'::regclass) + OR conrelid IN ('"TaskRun"'::regclass, 'task_run_v2'::regclass)) + LOOP + EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I', r.tbl, r.conname); + END LOOP; +END $$; +`); + } finally { + await prisma.$disconnect(); + } +} + /** * Caps each container's CPU/memory to approximate the 2-core CI runner locally (for timing + flake * reproduction). Set TESTCONTAINERS_CPU (cores per container, e.g. "2") and/or From f8c1a04401f849f7da036a068188d72c1a839184 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 12:43:17 +0100 Subject: [PATCH 06/12] feat(run-store): route run reads and writes by id format Select the TaskRun or task_run_v2 table per operation from the run id's format (KSUID routes to v2, anything else to legacy) via a runModel helper, so a run is read and written in its own table. Batch and predicate-keyed operations span both tables. Behavior-preserving for legacy runs. --- .../run-store/src/PostgresRunStore.test.ts | 673 ++++++++++++++++++ .../run-store/src/PostgresRunStore.ts | 201 ++++-- 2 files changed, 821 insertions(+), 53 deletions(-) diff --git a/internal-packages/run-store/src/PostgresRunStore.test.ts b/internal-packages/run-store/src/PostgresRunStore.test.ts index 47876b70c8d..d4f5b851f22 100644 --- a/internal-packages/run-store/src/PostgresRunStore.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.test.ts @@ -1,4 +1,5 @@ import { postgresTest } from "@internal/testcontainers"; +import { isKsuidId, RunId } from "@trigger.dev/core/v3/isomorphic"; import type { PrismaClient } from "@trigger.dev/database"; import { describe, expect } from "vitest"; import { PostgresRunStore } from "./PostgresRunStore.js"; @@ -1772,3 +1773,675 @@ describe("PostgresRunStore — read", () => { expect(found[0]?.payloadType).toBe("application/json"); }); }); + +describe("PostgresRunStore — table routing by id format", () => { + // Seed a run directly into one physical table, choosing the delegate by id + // format the same way the store does. Returns the ids used. + async function seedRoutedRun( + prisma: PrismaClient, + params: { + id: string; + friendlyId: string; + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; + status?: string; + idempotencyKey?: string; + taskIdentifier?: string; + } + ) { + const delegate = isKsuidId(params.id) + ? (prisma.taskRunV2 as unknown as typeof prisma.taskRun) + : prisma.taskRun; + + await delegate.create({ + data: { + id: params.id, + engine: "V2", + status: (params.status as any) ?? "PENDING", + friendlyId: params.friendlyId, + runtimeEnvironmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + organizationId: params.organizationId, + projectId: params.projectId, + taskIdentifier: params.taskIdentifier ?? "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: `trace_${params.id}`, + spanId: `span_${params.id}`, + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + ...(params.idempotencyKey !== undefined && { idempotencyKey: params.idempotencyKey }), + }, + }); + } + + postgresTest( + "createRun with a cuid id lands a row in TaskRun and NOT in task_run_v2", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + expect(isKsuidId(cuid.id)).toBe(false); + + await store.createRun({ + data: { + id: cuid.id, + engine: "V2", + status: "PENDING", + friendlyId: cuid.friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_cuid", + spanId: "span_cuid", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + snapshot: { + engine: "V2", + executionStatus: "RUN_CREATED", + description: "Run was created", + runStatus: "PENDING", + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }); + + // cuid run is in TaskRun, not in task_run_v2. + const legacyRow = await prisma.taskRun.findUnique({ where: { id: cuid.id } }); + expect(legacyRow).not.toBeNull(); + const cuidInV2 = await prisma.taskRunV2.findUnique({ where: { id: cuid.id } }); + expect(cuidInV2).toBeNull(); + } + ); + + postgresTest( + "createRun routes a KSUID id to task_run_v2: the scalar row lands there and not in TaskRun", + async ({ prisma }) => { + // This test exercises the routing decision in isolation by writing the + // scalar row directly to the table `createRun` would pick for a KSUID + // `data.id`, then asserts the row landed in task_run_v2 and not in TaskRun. + // The full v2 create path (run + nested snapshot + waitpoint) is covered + // by the "v2 nested writes" suite below. + const { organization, project, environment } = await seedEnvironment(prisma); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const ksuidInLegacy = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(ksuidInLegacy).toBeNull(); + } + ); + + postgresTest( + "findRun and updateMetadata route to task_run_v2 for a KSUID run and to TaskRun for a cuid run", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const cuid = RunId.generate(); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + // By-id read finds each run in its own table. + const foundKsuid = await store.findRun({ id: ksuid.id }, { select: { id: true } }); + expect(foundKsuid?.id).toBe(ksuid.id); + const foundCuid = await store.findRun({ id: cuid.id }, { select: { id: true } }); + expect(foundCuid?.id).toBe(cuid.id); + + // By-id write (updateMetadata) lands in the correct table. + const ksuidResult = await store.updateMetadata( + ksuid.id, + { + metadata: '{"routed":"v2"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + {} + ); + expect(ksuidResult.count).toBe(1); + + const cuidResult = await store.updateMetadata( + cuid.id, + { + metadata: '{"routed":"legacy"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + {} + ); + expect(cuidResult.count).toBe(1); + + // The write hit task_run_v2 for the KSUID run … + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { metadata: true }, + }); + expect(v2Row.metadata).toBe('{"routed":"v2"}'); + + // … and TaskRun for the cuid run. + const legacyRow = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + select: { metadata: true }, + }); + expect(legacyRow.metadata).toBe('{"routed":"legacy"}'); + } + ); + + postgresTest( + "expireRunsBatch with a mixed array updates both tables and returns the combined count", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const cuid = RunId.generate(); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const now = new Date("2026-06-19T12:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" }; + + const count = await store.expireRunsBatch([ksuid.id, cuid.id], { error, now }); + + expect(count).toBe(2); + + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true, completedAt: true, expiredAt: true }, + }); + expect(v2Row.status).toBe("EXPIRED"); + expect(v2Row.completedAt).toEqual(now); + expect(v2Row.expiredAt).toEqual(now); + + const legacyRow = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + select: { status: true, completedAt: true, expiredAt: true }, + }); + expect(legacyRow.status).toBe("EXPIRED"); + expect(legacyRow.completedAt).toEqual(now); + expect(legacyRow.expiredAt).toEqual(now); + } + ); +}); + +describe("PostgresRunStore — v2 nested writes (run + related rows via nested Prisma create)", () => { + // `task_run_v2` is a full clone of `TaskRun` down to its relations, so the nested Prisma + // create/include used by createRun/lifecycle methods targets it unchanged via the runModel cast. + // The child->run foreign keys (TaskRunExecutionSnapshot.runId, Waitpoint.completedByTaskRunId, …) + // are dropped in production and by the testcontainer harness, so a child row can reference a run + // in EITHER physical table (TaskRun or task_run_v2) by plain scalar id without a FK violation. + + function runAssociatedWaitpoint(params: { + id: string; + friendlyId: string; + projectId: string; + environmentId: string; + }) { + return { + id: params.id, + friendlyId: params.friendlyId, + type: "RUN" as const, + status: "PENDING" as const, + idempotencyKey: `idem_${params.id}`, + userProvidedIdempotencyKey: false, + projectId: params.projectId, + environmentId: params.environmentId, + }; + } + + postgresTest( + "createRun for a KSUID run lands the run in task_run_v2, creates its snapshot keyed to the v2 run id, and creates the associated waitpoint", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + const input: CreateRunInput = { + ...buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }), + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_v2_create_1", + friendlyId: "wp_v2_create_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + input.data.friendlyId = ksuid.friendlyId; + + const run = await store.createRun(input); + + // Returns the TaskRunWithWaitpoint shape with the associated waitpoint included. + expect(run.id).toBe(ksuid.id); + expect(run.status).toBe("PENDING"); + expect(run.associatedWaitpoint).not.toBeNull(); + expect(run.associatedWaitpoint?.id).toBe("wp_v2_create_1"); + + // The run row landed in task_run_v2, not TaskRun. + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const legacyRow = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(legacyRow).toBeNull(); + + // The execution snapshot is keyed to the v2 run id (in the shared snapshot table). + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: ksuid.id }, + }); + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("RUN_CREATED"); + expect(snapshots[0]?.runStatus).toBe("PENDING"); + + // The waitpoint points back at the v2 run via the scalar FK column. + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_v2_create_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(ksuid.id); + } + ); + + postgresTest( + "v2 lifecycle: startAttempt then completeAttemptSuccess creates the completion snapshot keyed to the v2 run id", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const started = await store.startAttempt( + ksuid.id, + { attemptNumber: 1, isWarmStart: false }, + { select: { id: true, status: true, attemptNumber: true } } + ); + expect(started.status).toBe("EXECUTING"); + expect(started.attemptNumber).toBe(1); + + const completedAt = new Date("2026-06-19T11:00:00.000Z"); + const completed = await store.completeAttemptSuccess( + ksuid.id, + { + completedAt, + output: '{"ok":true}', + outputType: "application/json", + usageDurationMs: 250, + costInCents: 4, + snapshot: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: 1, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }, + { select: { id: true, status: true, completedAt: true, usageDurationMs: true, costInCents: true } } + ); + + expect(completed.id).toBe(ksuid.id); + expect(completed.status).toBe("COMPLETED_SUCCESSFULLY"); + expect(completed.completedAt).toEqual(completedAt); + expect(completed.usageDurationMs).toBe(250); + expect(completed.costInCents).toBe(4); + + // The run row updated in task_run_v2. + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true }, + }); + expect(v2Row.status).toBe("COMPLETED_SUCCESSFULLY"); + + // The completion snapshot is keyed to the v2 run id. + const finished = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: ksuid.id, executionStatus: "FINISHED" }, + }); + expect(finished).toHaveLength(1); + expect(finished[0]?.runStatus).toBe("COMPLETED_SUCCESSFULLY"); + } + ); + + postgresTest( + "createFailedRun for a KSUID run lands the run in task_run_v2 and creates the associated waitpoint keyed to the v2 run id", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const completedAt = new Date("2026-06-19T00:00:00.000Z"); + const error = { type: "STRING_ERROR", raw: "system failure" }; + + const input: CreateFailedRunInput = { + data: { + id: ksuid.id, + engine: "V2", + status: "SYSTEM_FAILURE", + friendlyId: ksuid.friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "trace_v2_failed", + spanId: "span_v2_failed", + queue: "task/my-task", + isTest: false, + completedAt, + error: error as unknown as import("@trigger.dev/database").Prisma.InputJsonObject, + depth: 0, + taskEventStore: "taskEvent", + }, + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_v2_failed_1", + friendlyId: "wp_v2_failed_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + + const run = await store.createFailedRun(input); + + expect(run.id).toBe(ksuid.id); + expect(run.status).toBe("SYSTEM_FAILURE"); + expect(run.associatedWaitpoint).not.toBeNull(); + expect(run.associatedWaitpoint?.id).toBe("wp_v2_failed_1"); + + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const legacyRow = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(legacyRow).toBeNull(); + + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_v2_failed_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(ksuid.id); + } + ); + + postgresTest( + "createRun for a legacy cuid run with an associated waitpoint creates the run, its snapshot, and the waitpoint (regression: identical rows/shape)", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + expect(isKsuidId(cuid.id)).toBe(false); + + const input: CreateRunInput = { + ...buildCreateRunInput({ + runId: cuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }), + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_legacy_create_1", + friendlyId: "wp_legacy_create_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + input.data.friendlyId = cuid.friendlyId; + + const run = await store.createRun(input); + + // Same TaskRunWithWaitpoint shape as before. + expect(run.id).toBe(cuid.id); + expect(run.status).toBe("PENDING"); + expect(run.associatedWaitpoint?.id).toBe("wp_legacy_create_1"); + + // Legacy run is in TaskRun, not task_run_v2. + const legacyRow = await prisma.taskRun.findUnique({ where: { id: cuid.id } }); + expect(legacyRow).not.toBeNull(); + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: cuid.id } }); + expect(v2Row).toBeNull(); + + // Snapshot keyed to the run, waitpoint linked back via the FK column. + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: cuid.id }, + }); + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("RUN_CREATED"); + + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_legacy_create_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(cuid.id); + + // The FK still being live for the legacy table proves the waitpoint really + // resolves to a TaskRun row (the regression path is unchanged). + const reloaded = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + include: { associatedWaitpoint: true }, + }); + expect(reloaded.associatedWaitpoint?.id).toBe("wp_legacy_create_1"); + } + ); + + postgresTest( + "createRun is atomic: a second create with the same id throws and leaves no dangling snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + const input = buildCreateRunInput({ + runId: cuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = cuid.friendlyId; + + await store.createRun(input); + + const before = await prisma.taskRunExecutionSnapshot.count({ where: { runId: cuid.id } }); + expect(before).toBe(1); + + // A second createRun with the same id fails the unique-id insert and + // propagates the error. Because the run row and its snapshot are written by + // one nested Prisma create, the rollback leaves no extra snapshot behind. + await expect(store.createRun(input)).rejects.toThrow(); + + const after = await prisma.taskRunExecutionSnapshot.count({ where: { runId: cuid.id } }); + expect(after).toBe(1); + } + ); + + postgresTest( + "lockRunToWorker for a KSUID run returns the run with runtimeEnvironment hydrated via include (no manual stitch)", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const backgroundWorker = await prisma.backgroundWorker.create({ + data: { + friendlyId: "worker_friendly_v2", + version: "20260601.1", + runtimeEnvironmentId: environment.id, + projectId: project.id, + contentHash: "abc123v2", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + metadata: {}, + }, + }); + + const workerTask = await prisma.backgroundWorkerTask.create({ + data: { + friendlyId: "task_friendly_v2", + slug: "my-task", + filePath: "src/my-task.ts", + exportName: "myTask", + workerId: backgroundWorker.id, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + const queue = await prisma.taskQueue.create({ + data: { + friendlyId: "queue_friendly_v2", + name: "task/my-task", + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + const lockedAt = new Date("2026-06-19T13:00:00.000Z"); + const startedAt = new Date("2026-06-19T13:00:01.000Z"); + const snapshotId = "snap_lock_v2_1"; + + const locked = await store.lockRunToWorker(ksuid.id, { + lockedAt, + lockedById: workerTask.id, + lockedToVersionId: backgroundWorker.id, + lockedQueueId: queue.id, + startedAt, + baseCostInCents: 5, + machinePreset: "small-1x", + taskVersion: "20260601.1", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + maxDurationInSeconds: null, + snapshot: { + id: snapshotId, + previousSnapshotId: undefined, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + completedWaitpointIds: [], + completedWaitpointOrder: [], + }, + }); + + expect(locked.status).toBe("DEQUEUED"); + // The relation is hydrated by the nested `include`, not stitched manually. + expect(locked.runtimeEnvironment).toBeDefined(); + expect(locked.runtimeEnvironment.id).toBe(environment.id); + + // The run row landed (and was updated) in task_run_v2. + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true }, + }); + expect(v2Row.status).toBe("DEQUEUED"); + + // The dequeue snapshot is keyed to the v2 run id. + const snap = await prisma.taskRunExecutionSnapshot.findUnique({ where: { id: snapshotId } }); + expect(snap?.executionStatus).toBe("PENDING_EXECUTING"); + } + ); + + postgresTest( + "findRun with a runtimeEnvironment include resolves the relation for a KSUID run", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const run = await store.findRun({ id: ksuid.id }, { include: { runtimeEnvironment: true } }); + + expect(run?.id).toBe(ksuid.id); + expect(run?.runtimeEnvironment).toBeDefined(); + expect(run?.runtimeEnvironment.id).toBe(environment.id); + } + ); +}); diff --git a/internal-packages/run-store/src/PostgresRunStore.ts b/internal-packages/run-store/src/PostgresRunStore.ts index fcc53c00266..c800386b720 100644 --- a/internal-packages/run-store/src/PostgresRunStore.ts +++ b/internal-packages/run-store/src/PostgresRunStore.ts @@ -20,6 +20,7 @@ import type { TaskRunWithWaitpoint, } from "./types.js"; import type { TaskRunError } from "@trigger.dev/core/v3/schemas"; +import { isKsuidId } from "@trigger.dev/core/v3/isomorphic"; export type PostgresRunStoreOptions = { prisma: PrismaClient; @@ -27,12 +28,17 @@ export type PostgresRunStoreOptions = { }; /** - * Typed write layer for the task-run row, backed by the `taskRun` Prisma model. + * Typed write layer for the task-run row. A run lives in one of two physical + * tables chosen by its id format (`runModel`): the legacy `taskRun`, or the + * `task_run_v2` clone. `task_run_v2` carries the same relation surface as + * `TaskRun`, so a method's nested Prisma create/include (execution snapshot, + * associated waitpoint, `runtimeEnvironment`) targets either table unchanged + * once the delegate comes from `runModel`. * - * Each method is a verbatim relocation of the Prisma statement that lives at a - * specific call site today. Methods write through `(tx ?? this.prisma).taskRun` + * Each method is its original single-table Prisma statement with the run + * delegate routed through `runModel`. Methods write through `tx` when supplied * so callers can opt into an existing transaction. Errors (including unique - * constraint violations) propagate to the caller unchanged. + * constraint violations) propagate unchanged. */ export class PostgresRunStore implements RunStore { private readonly prisma: PrismaClient; @@ -43,13 +49,50 @@ export class PostgresRunStore implements RunStore { this.readOnlyPrisma = options.readOnlyPrisma; } + /** + * A run lives in exactly one physical table, chosen by the FORMAT of its id: + * a KSUID id (new) lives in `task_run_v2`, the legacy cuid id in `TaskRun`. + * `task_run_v2` is an identical clone of `TaskRun` down to its relations, so + * its delegate is cast to the `taskRun` delegate type to reuse the existing + * generic `select`/`include`/nested-write passthrough unchanged. + */ + private runModel(client: PrismaClientOrTransaction, idOrFriendlyId: string) { + return isKsuidId(idOrFriendlyId) + ? (client.taskRunV2 as unknown as typeof client.taskRun) + : client.taskRun; + } + + /** + * Route a single-row read to its physical table from the routing key in the + * `where` clause. `findRun`/`findRunOrThrow` are always called with a + * `{ id }` or `{ friendlyId }` predicate; both carry the same KSUID/cuid body + * and route identically. When neither is a plain string (e.g. an unexpected + * predicate-only read), default to the legacy `taskRun` table — matching the + * pre-split single-table behavior. + */ + #runReadModel( + prisma: PrismaClientOrTransaction | PrismaReplicaClient, + where: Prisma.TaskRunWhereInput + ) { + const routingKey = + typeof where.id === "string" + ? where.id + : typeof where.friendlyId === "string" + ? where.friendlyId + : undefined; + + return routingKey !== undefined && isKsuidId(routingKey) + ? (prisma.taskRunV2 as unknown as typeof prisma.taskRun) + : prisma.taskRun; + } + async createRun( params: CreateRunInput, tx?: PrismaClientOrTransaction ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ include: { associatedWaitpoint: true, }, @@ -84,7 +127,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ data: { ...params.data, executionSnapshots: { @@ -111,7 +154,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ include: { associatedWaitpoint: true, }, @@ -134,7 +177,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXECUTING", @@ -161,7 +204,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "COMPLETED_SUCCESSFULLY", @@ -197,7 +240,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { machinePreset: data.machinePreset, @@ -215,7 +258,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING" }, select: args.select, @@ -229,7 +272,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - await prisma.taskRun.update({ + await this.runModel(prisma, runId).update({ where: { id: runId }, data: { bulkActionGroupIds: { @@ -253,7 +296,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "CANCELED", @@ -283,7 +326,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: data.status, @@ -304,7 +347,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXPIRED", @@ -341,15 +384,41 @@ export class PostgresRunStore implements RunStore { return 0; } - return prisma.$executeRaw` - UPDATE "TaskRun" - SET "status" = 'EXPIRED'::"TaskRunStatus", - "completedAt" = ${data.now}, - "expiredAt" = ${data.now}, - "updatedAt" = ${data.now}, - "error" = ${JSON.stringify(data.error)}::jsonb - WHERE "id" IN (${Prisma.join(runIds)}) - `; + // A run lives in exactly one table, chosen by its id format. The array may + // be mixed, so partition it and run the UPDATE once per non-empty partition + // on its own table, then sum the counts. + const v2Ids = runIds.filter((id) => isKsuidId(id)); + const legacyIds = runIds.filter((id) => !isKsuidId(id)); + + const error = JSON.stringify(data.error); + + let count = 0; + + if (legacyIds.length > 0) { + count += await prisma.$executeRaw` + UPDATE "TaskRun" + SET "status" = 'EXPIRED'::"TaskRunStatus", + "completedAt" = ${data.now}, + "expiredAt" = ${data.now}, + "updatedAt" = ${data.now}, + "error" = ${error}::jsonb + WHERE "id" IN (${Prisma.join(legacyIds)}) + `; + } + + if (v2Ids.length > 0) { + count += await prisma.$executeRaw` + UPDATE "task_run_v2" + SET "status" = 'EXPIRED'::"TaskRunStatus", + "completedAt" = ${data.now}, + "expiredAt" = ${data.now}, + "updatedAt" = ${data.now}, + "error" = ${error}::jsonb + WHERE "id" IN (${Prisma.join(v2Ids)}) + `; + } + + return count; } async lockRunToWorker( @@ -359,7 +428,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "DEQUEUED", @@ -403,7 +472,7 @@ export class PostgresRunStore implements RunStore { include: { runtimeEnvironment: true, }, - }); + }) as Promise>; } async parkPendingVersion( @@ -414,7 +483,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING_VERSION", @@ -430,7 +499,7 @@ export class PostgresRunStore implements RunStore { ): Promise<{ count: number }> { const prisma = tx ?? this.prisma; - const result = await prisma.taskRun.updateMany({ + const result = await this.runModel(prisma, runId).updateMany({ where: { id: runId, status: "PENDING_VERSION" }, data: { status: "PENDING" }, }); @@ -445,7 +514,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "WAITING_TO_RESUME" }, include: args.include, @@ -459,7 +528,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXECUTING" }, select: args.select, @@ -473,7 +542,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { delayUntil: data.delayUntil, @@ -503,7 +572,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING", @@ -519,7 +588,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data, include: { @@ -540,16 +609,17 @@ export class PostgresRunStore implements RunStore { tx?: PrismaClientOrTransaction ): Promise<{ count: number }> { const prisma = tx ?? this.prisma; + const model = this.runModel(prisma, runId); if (options.expectedMetadataVersion !== undefined) { - const result = await prisma.taskRun.updateMany({ + const result = await model.updateMany({ where: { id: runId, metadataVersion: options.expectedMetadataVersion }, data, }); return { count: result.count }; } - await prisma.taskRun.update({ + await model.update({ where: { id: runId }, data, }); @@ -563,7 +633,7 @@ export class PostgresRunStore implements RunStore { const prisma = tx ?? this.prisma; if (params.byId) { - const result = await prisma.taskRun.updateMany({ + const result = await this.runModel(prisma, params.byId.runId).updateMany({ where: { id: params.byId.runId, idempotencyKey: params.byId.idempotencyKey }, data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, }); @@ -571,23 +641,48 @@ export class PostgresRunStore implements RunStore { } if (params.byPredicate) { + // No run id to route by: a matching run could be in either table during + // the mixed window, so run the predicate against both and sum the counts. + const where = { + idempotencyKey: params.byPredicate.idempotencyKey, + taskIdentifier: params.byPredicate.taskIdentifier, + runtimeEnvironmentId: params.byPredicate.runtimeEnvironmentId, + }; + const data = { idempotencyKey: null, idempotencyKeyExpiresAt: null }; + + const [legacy, v2] = await Promise.all([ + prisma.taskRun.updateMany({ where, data }), + (prisma.taskRunV2 as unknown as typeof prisma.taskRun).updateMany({ where, data }), + ]); + + return { count: legacy.count + v2.count }; + } + + // byFriendlyIds — only clears idempotencyKey, not idempotencyKeyExpiresAt. + // The friendlyId carries the same KSUID/cuid body as the id, so it routes + // the same way; partition the (possibly mixed) array and sum the counts. + const v2FriendlyIds = params.byFriendlyIds.filter((friendlyId) => isKsuidId(friendlyId)); + const legacyFriendlyIds = params.byFriendlyIds.filter((friendlyId) => !isKsuidId(friendlyId)); + + let count = 0; + + if (legacyFriendlyIds.length > 0) { const result = await prisma.taskRun.updateMany({ - where: { - idempotencyKey: params.byPredicate.idempotencyKey, - taskIdentifier: params.byPredicate.taskIdentifier, - runtimeEnvironmentId: params.byPredicate.runtimeEnvironmentId, - }, - data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + where: { friendlyId: { in: legacyFriendlyIds } }, + data: { idempotencyKey: null }, }); - return { count: result.count }; + count += result.count; } - // byFriendlyIds — only clears idempotencyKey, not idempotencyKeyExpiresAt - const result = await prisma.taskRun.updateMany({ - where: { friendlyId: { in: params.byFriendlyIds } }, - data: { idempotencyKey: null }, - }); - return { count: result.count }; + if (v2FriendlyIds.length > 0) { + const result = await (prisma.taskRunV2 as unknown as typeof prisma.taskRun).updateMany({ + where: { friendlyId: { in: v2FriendlyIds } }, + data: { idempotencyKey: null }, + }); + count += result.count; + } + + return { count }; } async pushTags( @@ -598,7 +693,7 @@ export class PostgresRunStore implements RunStore { ): Promise<{ updatedAt: Date }> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId, runtimeEnvironmentId: where.runtimeEnvironmentId }, data: { runTags: { push: tags } }, select: { updatedAt: true }, @@ -612,7 +707,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - await prisma.taskRun.update({ + await this.runModel(prisma, runId).update({ where: { id: runId }, data: { realtimeStreams: { push: streamId } }, }); @@ -639,7 +734,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return prisma.taskRun.findFirst({ + return this.#runReadModel(prisma, where).findFirst({ where, ...args, }); @@ -666,7 +761,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return prisma.taskRun.findFirstOrThrow({ + return this.#runReadModel(prisma, where).findFirstOrThrow({ where, ...args, }); From e1743415f16f094aa0eda999e6204040e47b9408 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 16:46:55 +0100 Subject: [PATCH 07/12] feat(run-store): both-table merged keyset cursor for findRuns findRuns now queries both TaskRun and task_run_v2 and merges the two ordered streams into one result. Ordered, limited reads require a time-based key (createdAt) because cuid and ksuid ids do not sort into a shared range, so id alone cannot order the union. --- .../run-store/src/PostgresRunStore.test.ts | 288 +++++++++++++++++ .../run-store/src/PostgresRunStore.ts | 293 +++++++++++++++++- 2 files changed, 580 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-store/src/PostgresRunStore.test.ts b/internal-packages/run-store/src/PostgresRunStore.test.ts index d4f5b851f22..a5866fa5d81 100644 --- a/internal-packages/run-store/src/PostgresRunStore.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.test.ts @@ -1788,6 +1788,7 @@ describe("PostgresRunStore — table routing by id format", () => { status?: string; idempotencyKey?: string; taskIdentifier?: string; + createdAt?: Date; } ) { const delegate = isKsuidId(params.id) @@ -1815,6 +1816,7 @@ describe("PostgresRunStore — table routing by id format", () => { taskEventStore: "taskEvent", depth: 0, ...(params.idempotencyKey !== undefined && { idempotencyKey: params.idempotencyKey }), + ...(params.createdAt !== undefined && { createdAt: params.createdAt }), }, }); } @@ -2016,6 +2018,292 @@ describe("PostgresRunStore — table routing by id format", () => { expect(legacyRow.expiredAt).toEqual(now); } ); + + postgresTest( + "findRuns (unordered) returns runs from BOTH TaskRun and task_run_v2 in one env", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // Two legacy (cuid) runs + two new (ksuid) runs in the SAME env. + const legacyA = RunId.generate(); + const legacyB = RunId.generate(); + const v2A = RunId.generateKsuid(); + const v2B = RunId.generateKsuid(); + expect(isKsuidId(legacyA.id)).toBe(false); + expect(isKsuidId(v2A.id)).toBe(true); + + for (const run of [legacyA, legacyB, v2A, v2B]) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + }); + + // ALL four runs come back, regardless of which physical table they live in. + expect(found.map((r) => r.id).sort()).toEqual( + [legacyA.id, legacyB.id, v2A.id, v2B.id].sort() + ); + } + ); + + postgresTest( + "findRuns (ordered+limited) 2-way merges both tables to the globally-correct first N", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // Interleave createdAt across the two tables so a per-table take+slice + // would be WRONG: the newest run is in v2, the 2nd-newest in legacy, etc. + // t5 (v2) > t4 (legacy) > t3 (v2) > t2 (legacy) > t1 (v2) > t0 (legacy) + const base = new Date("2026-06-01T00:00:00.000Z").getTime(); + const at = (i: number) => new Date(base + i * 60_000); + + const legacy0 = RunId.generate(); // t0 (oldest) + const v2_1 = RunId.generateKsuid(); // t1 + const legacy2 = RunId.generate(); // t2 + const v2_3 = RunId.generateKsuid(); // t3 + const legacy4 = RunId.generate(); // t4 + const v2_5 = RunId.generateKsuid(); // t5 (newest) + + const seeded: Array<{ id: string; friendlyId: string; t: number }> = [ + { id: legacy0.id, friendlyId: legacy0.friendlyId, t: 0 }, + { id: v2_1.id, friendlyId: v2_1.friendlyId, t: 1 }, + { id: legacy2.id, friendlyId: legacy2.friendlyId, t: 2 }, + { id: v2_3.id, friendlyId: v2_3.friendlyId, t: 3 }, + { id: legacy4.id, friendlyId: legacy4.friendlyId, t: 4 }, + { id: v2_5.id, friendlyId: v2_5.friendlyId, t: 5 }, + ]; + + for (const run of seeded) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + createdAt: at(run.t), + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { createdAt: "desc" }, + take: 3, + }); + + // The globally-newest 3 — drawn from BOTH tables in true createdAt order, + // NOT three rows from one table. + expect(found.map((r) => r.id)).toEqual([v2_5.id, legacy4.id, v2_3.id]); + } + ); + + postgresTest( + "findRuns scoping: a run in another env is NOT returned from either table", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // A second env in the same project. + const otherEnv = await prisma.runtimeEnvironment.create({ + data: { + type: "PREVIEW", + slug: "other", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_other_apikey", + pkApiKey: "pk_other_apikey", + shortcode: "other_short_code", + }, + }); + + // One legacy + one v2 run in the TARGET env. + const legacyTarget = RunId.generate(); + const v2Target = RunId.generateKsuid(); + // One legacy + one v2 run in the OTHER env (must never surface). + const legacyOther = RunId.generate(); + const v2Other = RunId.generateKsuid(); + + await seedRoutedRun(prisma, { + id: legacyTarget.id, + friendlyId: legacyTarget.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: v2Target.id, + friendlyId: v2Target.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: legacyOther.id, + friendlyId: legacyOther.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: otherEnv.id, + }); + await seedRoutedRun(prisma, { + id: v2Other.id, + friendlyId: v2Other.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: otherEnv.id, + }); + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + }); + + // The same `where` fences BOTH tables: only the target env's runs come back. + expect(found.map((r) => r.id).sort()).toEqual([legacyTarget.id, v2Target.id].sort()); + const foundIds = new Set(found.map((r) => r.id)); + expect(foundIds.has(legacyOther.id)).toBe(false); + expect(foundIds.has(v2Other.id)).toBe(false); + } + ); + + postgresTest( + "findRuns (include) returns hydrated relations from both tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + const v2 = RunId.generateKsuid(); + + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: v2.id, + friendlyId: v2.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + include: { runtimeEnvironment: true }, + }); + + expect(found).toHaveLength(2); + // Both rows — legacy and v2 — carry the hydrated relation. + for (const run of found) { + expect(run.runtimeEnvironment).not.toBeNull(); + expect(run.runtimeEnvironment.id).toBe(environment.id); + expect(run.runtimeEnvironment.slug).toBe("dev"); + } + } + ); + + postgresTest( + "findRuns (take, no orderBy) caps the combined result across both tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // 2 legacy + 2 v2; an unordered `take: 3` must return exactly 3, all + // belonging to the scoped env. + const runs = [ + RunId.generate(), + RunId.generate(), + RunId.generateKsuid(), + RunId.generateKsuid(), + ]; + for (const run of runs) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + take: 3, + }); + + expect(found).toHaveLength(3); + const allIds = new Set(runs.map((r) => r.id)); + for (const run of found) { + expect(allIds.has(run.id)).toBe(true); + } + } + ); + + postgresTest( + "findRuns (ordered+limited) by id alone is rejected: id is not a total cross-table order", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + await expect( + store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { id: "asc" }, + take: 10, + }) + ).rejects.toThrow(/total order/i); + } + ); + + postgresTest( + "findRuns (ordered+limited) rejects a Prisma cursor it cannot span across two tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + await expect( + store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { createdAt: "desc" }, + take: 5, + cursor: { id: legacy.id }, + }) + ).rejects.toThrow(/cursor/i); + } + ); }); describe("PostgresRunStore — v2 nested writes (run + related rows via nested Prisma create)", () => { diff --git a/internal-packages/run-store/src/PostgresRunStore.ts b/internal-packages/run-store/src/PostgresRunStore.ts index c800386b720..6f18a3dd4fc 100644 --- a/internal-packages/run-store/src/PostgresRunStore.ts +++ b/internal-packages/run-store/src/PostgresRunStore.ts @@ -813,7 +813,298 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = client ?? this.readOnlyPrisma; - return prisma.taskRun.findMany(args); + // A run lives in exactly one physical table, chosen by its id format, so a + // multi-row read must hit BOTH `TaskRun` (legacy cuid) and `task_run_v2` + // (new ksuid) and combine. `task_run_v2` is an identical clone of `TaskRun` + // (same relation surface), so the SAME `args` — crucially the SAME `where`, + // which is the security scope — run unchanged against either delegate. + const legacyModel = prisma.taskRun; + const v2Model = prisma.taskRunV2 as unknown as typeof prisma.taskRun; + + const ordered = this.#normalizeOrderBy(args.orderBy); + + // ORDERED + LIMITED → bounded 2-way merge. + // + // A single Prisma `cursor` addresses one table's row and cannot span two + // tables, so reject it on this path rather than silently paginating one + // table. (No current caller pairs `cursor` with `orderBy`+`take`; keyset + // callers carry the cursor in `where`, which both queries honor.) + if (ordered.length > 0 && args.take !== undefined) { + if (args.cursor !== undefined) { + throw new Error( + "RunStore.findRuns: a Prisma `cursor` cannot address two tables on an ordered+limited read. " + + "Use a where-based keyset (e.g. `where: { createdAt: { lt: X } }`) instead." + ); + } + + const comparator = this.#buildCrossTableComparator(ordered); + + // The in-memory comparator reads the order keys off each row, so they + // MUST be in the projection. If the caller's `select` omits one, add it + // for the query and strip it from the output. (`include`/full-row already + // carry every scalar.) + const { args: queryArgs, addedKeys } = this.#withOrderKeysSelected(args, ordered); + + // Take at most `take` from each table: the merged head of two ordered + // streams of length `take` is fully determined by their first `take` rows. + const perTableArgs = { ...queryArgs, take: args.take }; + + const [legacyRows, v2Rows] = (await Promise.all([ + legacyModel.findMany(perTableArgs), + v2Model.findMany(perTableArgs), + ])) as [Array>, Array>]; + + const merged = this.#mergeOrdered(legacyRows, v2Rows, comparator, args.take); + return this.#stripAddedKeys(merged, addedKeys); + } + + // UNORDERED / NO-LIMIT (or `take` without `orderBy`) → run the SAME args + // against both tables and concatenate. A run is in exactly one table, so + // concatenation is complete and has no duplicates. + // + // `orderBy` without `take` still needs the order keys projected so the + // whole-set re-sort below can read them. + const { args: queryArgs, addedKeys } = + ordered.length > 0 + ? this.#withOrderKeysSelected(args, ordered) + : { args, addedKeys: [] as string[] }; + + const [legacyRows, v2Rows] = (await Promise.all([ + legacyModel.findMany(queryArgs), + v2Model.findMany(queryArgs), + ])) as [Array>, Array>]; + + let combined = legacyRows.concat(v2Rows); + + // `orderBy` without `take`: each table came back ordered, but the + // concatenation is not — re-sort the whole bounded set to honor the order. + if (ordered.length > 0) { + const comparator = this.#buildCrossTableComparator(ordered); + combined = combined.sort(comparator); + } + + // `take` without `orderBy`: an unordered cap. Each table was capped at + // `take`, so the concatenation is at most `2*take`; trim to `take`. Order + // among unordered rows is unspecified either way. + if (args.take !== undefined) { + combined = combined.slice(0, args.take); + } + + return this.#stripAddedKeys(combined, addedKeys); + } + + /** + * The cross-table merge/sort compares order-key VALUES read off each returned + * row, so every scalar order key must be present in the projection. When the + * caller passes a `select` that omits an order key, add it (so the row carries + * the value) and record which keys were added so they can be stripped from the + * final output — the caller asked not to see them. A query with `include`, or + * with neither `select` nor `include` (full row), already returns every scalar + * column, so nothing is added. + */ + #withOrderKeysSelected( + args: { + where: Prisma.TaskRunWhereInput; + select?: Prisma.TaskRunSelect; + include?: Prisma.TaskRunInclude; + orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[]; + take?: number; + skip?: number; + cursor?: Prisma.TaskRunWhereUniqueInput; + }, + ordered: Array<{ key: string; direction: "asc" | "desc" }> + ): { + args: typeof args; + addedKeys: string[]; + } { + // The merge always tiebreaks on `id`, so it must be readable too. + const requiredKeys = new Set([...ordered.map((entry) => entry.key), "id"]); + + if (!args.select) { + // include / full-row: all scalars are present already. + return { args, addedKeys: [] }; + } + + const select = args.select as Record; + const addedKeys: string[] = []; + const augmentedSelect: Record = { ...select }; + + for (const key of requiredKeys) { + if (!(key in augmentedSelect)) { + augmentedSelect[key] = true; + addedKeys.push(key); + } + } + + if (addedKeys.length === 0) { + return { args, addedKeys: [] }; + } + + return { args: { ...args, select: augmentedSelect as Prisma.TaskRunSelect }, addedKeys }; + } + + /** Remove the order-key columns that were added purely to drive the merge. */ + #stripAddedKeys( + rows: Array>, + addedKeys: string[] + ): Array> { + if (addedKeys.length === 0) { + return rows; + } + + for (const row of rows) { + for (const key of addedKeys) { + delete row[key]; + } + } + + return rows; + } + + /** + * Normalize the optional `orderBy` (single object or array) into an array of + * single-key order entries, preserving precedence. An empty array means "no + * ordering requested". + */ + #normalizeOrderBy( + orderBy: + | Prisma.TaskRunOrderByWithRelationInput + | Prisma.TaskRunOrderByWithRelationInput[] + | undefined + ): Array<{ key: string; direction: "asc" | "desc" }> { + if (orderBy === undefined) { + return []; + } + + const list = Array.isArray(orderBy) ? orderBy : [orderBy]; + const entries: Array<{ key: string; direction: "asc" | "desc" }> = []; + + for (const clause of list) { + for (const [key, value] of Object.entries(clause)) { + // Only scalar `{ field: "asc" | "desc" }` entries are mergeable in + // memory. A relation/nested sort (value is an object) can't be compared + // here — flag it rather than mis-order across the two tables. + if (value === "asc" || value === "desc") { + entries.push({ key, direction: value }); + } else { + throw new Error( + `RunStore.findRuns: cannot merge across tables on a non-scalar orderBy key "${key}". ` + + "Ordered+limited cross-table reads must order by a scalar column (a time/createdAt field, with id as a tiebreak)." + ); + } + } + } + + return entries; + } + + /** + * Build a total-order comparator from the requested scalar order keys. + * + * The cross-table merge is only correct when the order is a TOTAL order over + * the union of both tables. A time-based column (`createdAt`, or any other + * Date column) provides that; `id` alone does NOT — a cuid and a ksuid live + * in different, non-interleaving id spaces, so ordering the union by `id` + * lexicographically is meaningless. Require a time/createdAt key to lead (or + * appear in) the order, and use `id` only as a within-timestamp tiebreak. + */ + #buildCrossTableComparator( + ordered: Array<{ key: string; direction: "asc" | "desc" }> + ): (a: Record, b: Record) => number { + const hasTimeKey = ordered.some((entry) => this.#isTimeOrderKey(entry.key)); + + if (!hasTimeKey) { + const keys = ordered.map((entry) => entry.key).join(", "); + throw new Error( + `RunStore.findRuns: ordered+limited read orders by [${keys}], which is not a valid total order across the ` + + "legacy TaskRun (cuid) and task_run_v2 (ksuid) tables. Order by a time/createdAt column (id may follow as a tiebreak)." + ); + } + + // Ensure `id` is present as a final tiebreak so the merge is deterministic + // when two rows share the leading timestamp. Use the direction of the + // leading order key for the tiebreak. + const comparators = [...ordered]; + if (!comparators.some((entry) => entry.key === "id")) { + comparators.push({ key: "id", direction: ordered[0].direction }); + } + + return (a, b) => { + for (const { key, direction } of comparators) { + const cmp = this.#compareValues(a[key], b[key]); + if (cmp !== 0) { + return direction === "asc" ? cmp : -cmp; + } + } + return 0; + }; + } + + /** + * A column is a valid cross-table total-order lead when it is time-based. + * `createdAt` is the canonical one; the other Date columns the callers use + * (`updatedAt`, `completedAt`, etc.) qualify too. The selected/included row + * must carry the column for the comparator to read it. + */ + #isTimeOrderKey(key: string): boolean { + return ( + key === "createdAt" || + key === "updatedAt" || + key === "completedAt" || + key === "startedAt" || + key === "queuedAt" || + key === "lockedAt" || + key === "delayUntil" || + key === "expiredAt" + ); + } + + /** Ascending comparison of two scalar order values (Date, number, string). */ + #compareValues(a: unknown, b: unknown): number { + if (a === b) return 0; + // Nulls sort last (Prisma's default for `nulls: "last"` is the common case; + // a stable, deterministic placement is what matters for the merge). + if (a === null || a === undefined) return 1; + if (b === null || b === undefined) return -1; + + if (a instanceof Date && b instanceof Date) { + return a.getTime() - b.getTime(); + } + if (typeof a === "number" && typeof b === "number") { + return a - b; + } + return String(a) < String(b) ? -1 : String(a) > String(b) ? 1 : 0; + } + + /** + * 2-way merge of two already-ordered streams into the first `take` rows of + * their combined order. Bounded: walks at most `take` steps. The two inputs + * are each `findMany`-ordered by the SAME order keys, so a single linear pass + * picking the smaller head under `comparator` yields the globally-correct head. + */ + #mergeOrdered( + left: Array>, + right: Array>, + comparator: (a: Record, b: Record) => number, + take: number + ): Array> { + const out: Array> = []; + let i = 0; + let j = 0; + + while (out.length < take && (i < left.length || j < right.length)) { + if (i >= left.length) { + out.push(right[j++]); + } else if (j >= right.length) { + out.push(left[i++]); + } else if (comparator(left[i], right[j]) <= 0) { + out.push(left[i++]); + } else { + out.push(right[j++]); + } + } + + return out; } /** From 37b7f973d75ec76955eb87876e6378ed7ac703bc Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 16:47:00 +0100 Subject: [PATCH 08/12] fix(webapp): read runs across both run tables with a time keyset runsBackfiller paginates on a (createdAt, id) keyset instead of id alone. The ClickHouse runs list restores ClickHouse ranking in memory after hydrating rows by id, since a single SQL order cannot span the two tables. --- .../app/services/runsBackfiller.server.ts | 48 ++++++++++++++++--- .../clickhouseRunsRepository.server.ts | 14 ++++-- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/services/runsBackfiller.server.ts b/apps/webapp/app/services/runsBackfiller.server.ts index 50e041ee64b..09386c9495c 100644 --- a/apps/webapp/app/services/runsBackfiller.server.ts +++ b/apps/webapp/app/services/runsBackfiller.server.ts @@ -41,6 +41,13 @@ export class RunsBackfillerService { span.setAttribute("cursor", cursor ?? ""); span.setAttribute("batchSize", batchSize ?? 0); + // Keyset on (createdAt, id). Runs now live across two physical tables + // (legacy TaskRun with cuid ids, task_run_v2 with ksuid ids), and `id` + // alone is not a valid order across them: cuid and ksuid sort in + // different ranges. RunStore merges the two tables only on a time-based + // key, so order by createdAt and tiebreak on id within a timestamp. + const keyset = cursor ? decodeBackfillCursor(cursor) : undefined; + const runs = await runStore.findRuns( { where: { @@ -51,11 +58,16 @@ export class RunsBackfillerService { status: { in: FINAL_RUN_STATUSES, }, - ...(cursor ? { id: { gt: cursor } } : {}), - }, - orderBy: { - id: "asc", + ...(keyset + ? { + OR: [ + { createdAt: { gt: keyset.createdAt } }, + { createdAt: keyset.createdAt, id: { gt: keyset.id } }, + ], + } + : {}), }, + orderBy: [{ createdAt: "asc" }, { id: "asc" }], take: batchSize, }, this.prisma @@ -94,8 +106,32 @@ export class RunsBackfillerService { lastRunId: lastRun.id, }); - // Return the last run ID to continue from - return lastRun.id; + // Return a (createdAt, id) cursor to continue from on the next batch. + return encodeBackfillCursor(lastRun.createdAt, lastRun.id); }); } } + +// The backfill cursor is an opaque "_" string. The admin +// worker passes it back verbatim across batches; only this service interprets +// it. An ISO timestamp contains no "_" and run ids are base62/base36, so the +// first "_" cleanly splits the two halves. +const BACKFILL_CURSOR_SEPARATOR = "_"; + +export function encodeBackfillCursor(createdAt: Date, id: string): string { + return `${createdAt.toISOString()}${BACKFILL_CURSOR_SEPARATOR}${id}`; +} + +export function decodeBackfillCursor(cursor: string): { createdAt: Date; id: string } { + const separatorIndex = cursor.indexOf(BACKFILL_CURSOR_SEPARATOR); + const createdAt = separatorIndex === -1 ? new Date(NaN) : new Date(cursor.slice(0, separatorIndex)); + const id = separatorIndex === -1 ? "" : cursor.slice(separatorIndex + 1); + + if (Number.isNaN(createdAt.getTime()) || id.length === 0) { + throw new Error( + `RunsBackfillerService: malformed cursor "${cursor}" (expected "_")` + ); + } + + return { createdAt, id }; +} diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index d32652a0b3b..9602a1267df 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -169,16 +169,13 @@ export class ClickHouseRunsRepository implements IRunsRepository { async listRuns(options: ListRunsOptions) { const { runIds, pagination } = await this.listRunIds(options); - let runs = await runStore.findRuns( + const hydrated = await runStore.findRuns( { where: { id: { in: runIds, }, }, - orderBy: { - id: "desc", - }, select: { id: true, friendlyId: true, @@ -216,6 +213,15 @@ export class ClickHouseRunsRepository implements IRunsRepository { this.options.prisma ); + // ClickHouse already ranked `runIds`. An `IN (...)` hydration comes back + // unordered, and a single SQL `orderBy` can't span the two physical run + // tables (legacy TaskRun + task_run_v2), so restore ClickHouse's ranking + // in memory. + const runById = new Map(hydrated.map((run) => [run.id, run])); + let runs = runIds + .map((id) => runById.get(id)) + .filter((run): run is NonNullable => run !== undefined); + // ClickHouse is slightly delayed, so we're going to do in-memory status filtering too if (options.statuses && options.statuses.length > 0) { runs = runs.filter((run) => options.statuses!.includes(run.status)); From 658b3850e8979609ad7ccb609b19060d2d70d5ec Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 16:56:51 +0100 Subject: [PATCH 09/12] feat(run-store): read non-id predicates across both run tables findRun and findRunOrThrow route by the id/friendlyId in the predicate. A lookup that carries neither (the idempotency-key dedup, or an "are there any runs in this environment" check) previously defaulted to the legacy table and would miss a match that lives in task_run_v2. Such predicates now query both tables in parallel and return the first match, so a reused idempotency key is found wherever its run lives and no duplicate is created. --- .../run-store/src/PostgresRunStore.test.ts | 74 ++++++++++++++++ .../run-store/src/PostgresRunStore.ts | 84 +++++++++++++------ 2 files changed, 131 insertions(+), 27 deletions(-) diff --git a/internal-packages/run-store/src/PostgresRunStore.test.ts b/internal-packages/run-store/src/PostgresRunStore.test.ts index a5866fa5d81..1a982d87dc6 100644 --- a/internal-packages/run-store/src/PostgresRunStore.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.test.ts @@ -1970,6 +1970,80 @@ describe("PostgresRunStore — table routing by id format", () => { } ); + postgresTest( + "findRun resolves a non-id predicate (idempotency key) against a run in either table", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // A KSUID run carrying an idempotency key lands in task_run_v2 … + const ksuid = RunId.generateKsuid(); + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + + // … and a cuid run carrying a different key lands in legacy TaskRun. + const cuid = RunId.generate(); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-legacy", + taskIdentifier: "my-task", + }); + + // The lookup carries no id/friendlyId, so it must read BOTH tables — + // this is the mixed-window idempotency dedup. Miss either table and a + // reused key produces a duplicate run. + const v2Hit = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + expect(v2Hit?.id).toBe(ksuid.id); + + const legacyHit = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-legacy", + taskIdentifier: "my-task", + }); + expect(legacyHit?.id).toBe(cuid.id); + + // A key in neither table returns null — no false dedup. + const miss = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-missing", + taskIdentifier: "my-task", + }); + expect(miss).toBeNull(); + + // findRunOrThrow takes the same both-table path: it finds the v2 row … + const thrown = await store.findRunOrThrow({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + expect(thrown.id).toBe(ksuid.id); + + // … and throws when neither table matches. + await expect( + store.findRunOrThrow({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-missing", + taskIdentifier: "my-task", + }) + ).rejects.toThrow(); + } + ); + postgresTest( "expireRunsBatch with a mixed array updates both tables and returns the combined count", async ({ prisma }) => { diff --git a/internal-packages/run-store/src/PostgresRunStore.ts b/internal-packages/run-store/src/PostgresRunStore.ts index 6f18a3dd4fc..d07c9630db4 100644 --- a/internal-packages/run-store/src/PostgresRunStore.ts +++ b/internal-packages/run-store/src/PostgresRunStore.ts @@ -63,27 +63,43 @@ export class PostgresRunStore implements RunStore { } /** - * Route a single-row read to its physical table from the routing key in the - * `where` clause. `findRun`/`findRunOrThrow` are always called with a - * `{ id }` or `{ friendlyId }` predicate; both carry the same KSUID/cuid body - * and route identically. When neither is a plain string (e.g. an unexpected - * predicate-only read), default to the legacy `taskRun` table — matching the - * pre-split single-table behavior. + * The routing key for a single-row read: the `{ id }` or `{ friendlyId }` + * value in the `where` clause. Both carry the same KSUID/cuid body and route + * to the same physical table. Returns `undefined` for a predicate that + * addresses no specific run (e.g. an idempotency-key lookup), which must read + * both tables rather than assume one. */ - #runReadModel( + #routingKeyOf(where: Prisma.TaskRunWhereInput): string | undefined { + return typeof where.id === "string" + ? where.id + : typeof where.friendlyId === "string" + ? where.friendlyId + : undefined; + } + + /** + * Read a single row matching a non-id predicate from BOTH physical tables. + * A run lives in exactly one table (chosen by its id format), so a key-based + * predicate (idempotency key, "has this env any runs") can match a row in + * either. Query both in parallel and return the first match — at most one + * side is non-null, and legacy is preferred for a stable result if a + * predicate ever matches both. `task_run_v2` is an identical clone of + * `TaskRun`, so the SAME args (select/include and the security-scoping + * `where`) run unchanged against either delegate. + */ + async #findFirstAcrossTables( prisma: PrismaClientOrTransaction | PrismaReplicaClient, - where: Prisma.TaskRunWhereInput - ) { - const routingKey = - typeof where.id === "string" - ? where.id - : typeof where.friendlyId === "string" - ? where.friendlyId - : undefined; - - return routingKey !== undefined && isKsuidId(routingKey) - ? (prisma.taskRunV2 as unknown as typeof prisma.taskRun) - : prisma.taskRun; + where: Prisma.TaskRunWhereInput, + args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude } + ): Promise { + const v2Model = prisma.taskRunV2 as unknown as typeof prisma.taskRun; + + const [legacyRun, v2Run] = await Promise.all([ + prisma.taskRun.findFirst({ where, ...args }), + v2Model.findFirst({ where, ...args }), + ]); + + return legacyRun ?? v2Run; } async createRun( @@ -734,10 +750,15 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return this.#runReadModel(prisma, where).findFirst({ - where, - ...args, - }); + const routingKey = this.#routingKeyOf(where); + if (routingKey !== undefined) { + // by id / friendlyId: the id format picks exactly one table, O(1). + return this.runModel(prisma, routingKey).findFirst({ where, ...args }); + } + + // Non-id predicate (e.g. idempotency-key dedup): the match can be in + // either table, so read both. + return this.#findFirstAcrossTables(prisma, where, args); } findRunOrThrow( @@ -761,10 +782,19 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return this.#runReadModel(prisma, where).findFirstOrThrow({ - where, - ...args, - }); + const routingKey = this.#routingKeyOf(where); + if (routingKey !== undefined) { + return this.runModel(prisma, routingKey).findFirstOrThrow({ where, ...args }); + } + + // Non-id predicate: read both tables, then enforce the throw-on-miss + // contract ourselves (neither table's findFirstOrThrow could see the + // other's row). + const run = await this.#findFirstAcrossTables(prisma, where, args); + if (run === null || run === undefined) { + throw new Error("PostgresRunStore.findRunOrThrow: no run matched the predicate"); + } + return run; } findRuns( From 47610ee3b0a54babbfff10c9b62f7f362ab6e74a Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 17:02:09 +0100 Subject: [PATCH 10/12] feat(webapp): per-org cutover flag for the v2 run table Adds a per-org runTableV2 feature flag, read in memory at the single run-id mint site in the trigger path. When on, the org mints a KSUID id for new runs (routing them to task_run_v2); off, the default, keeps minting legacy ids. The read is a pure lookup on the org featureFlags already loaded at auth, so the trigger path adds no query. RunStore routes purely by id format and never sees this flag. --- .../runEngine/services/triggerTask.server.ts | 13 ++++++++- apps/webapp/app/v3/featureFlags.ts | 7 +++++ apps/webapp/app/v3/runTableV2.server.ts | 28 +++++++++++++++++++ apps/webapp/test/runTableV2.test.ts | 28 +++++++++++++++++++ 4 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 apps/webapp/app/v3/runTableV2.server.ts create mode 100644 apps/webapp/test/runTableV2.test.ts diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 89a938da8bf..8c61b7d7fcd 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -25,6 +25,7 @@ import { logger } from "~/services/logger.server"; import { parseDelay } from "~/utils/delays"; import { handleMetadataPacket } from "~/utils/packets"; import { startSpan } from "~/v3/tracing.server"; +import { shouldUseV2RunTable } from "~/v3/runTableV2.server"; import type { TriggerTaskServiceOptions, TriggerTaskServiceResult, @@ -151,7 +152,17 @@ export class RunEngineTriggerTaskService { span.setAttribute("taskId", taskId); span.setAttribute("attempt", attempt); - const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; + // The single per-org cutover point: an opted-in org mints a KSUID id + // (routing the run to task_run_v2), everyone else keeps a legacy id + // (TaskRun). The flag is a pure in-memory read of the org's + // featureFlags already loaded on `environment` — no DB query on the + // trigger hot path. Downstream routing is by id format only. + const runFriendlyId = + options?.runFriendlyId ?? + (shouldUseV2RunTable(environment.organization.featureFlags) + ? RunId.generateKsuid() + : RunId.generate() + ).friendlyId; const triggerRequest = { taskId, friendlyId: runFriendlyId, diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index 6b75b9ef903..000013f6d23 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -16,6 +16,7 @@ export const FEATURE_FLAG = { computeMigrationFreePercentage: "computeMigrationFreePercentage", computeMigrationPaidPercentage: "computeMigrationPaidPercentage", computeMigrationRequireTemplate: "computeMigrationRequireTemplate", + runTableV2: "runTableV2", } as const; export const FeatureFlagCatalog = { @@ -43,6 +44,12 @@ export const FeatureFlagCatalog = { // When on, migrated orgs build their compute template in required mode at deploy // (fails the deploy on error) instead of shadow. Strict boolean (see above). [FEATURE_FLAG.computeMigrationRequireTemplate]: z.boolean(), + // Per-org cutover to the parallel task_run_v2 table. When on, new runs for the + // org mint a KSUID id (routing them to task_run_v2); off (the default) keeps + // minting legacy ids. Strict boolean (see above): coercing a stringified + // "false" to true would cut an org over by mistake, and runs created on v2 + // stay on v2. + [FEATURE_FLAG.runTableV2]: z.boolean(), }; export type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/runTableV2.server.ts b/apps/webapp/app/v3/runTableV2.server.ts new file mode 100644 index 00000000000..51b55aefbe3 --- /dev/null +++ b/apps/webapp/app/v3/runTableV2.server.ts @@ -0,0 +1,28 @@ +import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags"; + +/** + * Per-org cutover switch for the parallel `task_run_v2` run table. + * + * Read in memory from `Organization.featureFlags` (already loaded on the + * AuthenticatedEnvironment at API-key auth, so this adds no DB query) at the + * single run-id mint site in the trigger path. On → mint a KSUID id, which + * routes the run to `task_run_v2`; off (the default) → mint a legacy id, which + * routes to `TaskRun`. + * + * RunStore never reads this flag: it routes purely by id format. The flag only + * decides which id scheme is minted upstream. Disabling it sends only NEW runs + * back to legacy; runs already created on v2 stay readable there (routed by id). + */ +export function shouldUseV2RunTable(orgFeatureFlags: unknown): boolean { + if (orgFeatureFlags === null || typeof orgFeatureFlags !== "object") { + return false; + } + + const override = (orgFeatureFlags as Record)[FEATURE_FLAG.runTableV2]; + if (override === undefined) { + return false; + } + + const parsed = FeatureFlagCatalog[FEATURE_FLAG.runTableV2].safeParse(override); + return parsed.success ? parsed.data : false; +} diff --git a/apps/webapp/test/runTableV2.test.ts b/apps/webapp/test/runTableV2.test.ts new file mode 100644 index 00000000000..9abae4cb7bb --- /dev/null +++ b/apps/webapp/test/runTableV2.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "vitest"; +import { shouldUseV2RunTable } from "~/v3/runTableV2.server"; + +describe("shouldUseV2RunTable", () => { + it("defaults to false when the org has no flags", () => { + expect(shouldUseV2RunTable(null)).toBe(false); + expect(shouldUseV2RunTable(undefined)).toBe(false); + expect(shouldUseV2RunTable({})).toBe(false); + }); + + it("returns true only when the flag is the boolean true", () => { + expect(shouldUseV2RunTable({ runTableV2: true })).toBe(true); + expect(shouldUseV2RunTable({ runTableV2: false })).toBe(false); + }); + + it("rejects a stringified flag value (strict boolean, no coercion)", () => { + // A stringified "false" must not coerce to true and cut the org over. + expect(shouldUseV2RunTable({ runTableV2: "true" })).toBe(false); + expect(shouldUseV2RunTable({ runTableV2: "false" })).toBe(false); + expect(shouldUseV2RunTable({ runTableV2: 1 })).toBe(false); + }); + + it("ignores unrelated flags and non-object inputs", () => { + expect(shouldUseV2RunTable({ mollifierEnabled: true })).toBe(false); + expect(shouldUseV2RunTable("runTableV2")).toBe(false); + expect(shouldUseV2RunTable(42)).toBe(false); + }); +}); From 912a504a534b75d266adcd7db886ba32451e4114 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 17:41:46 +0100 Subject: [PATCH 11/12] feat(replication): co-publish additional tables and reconcile existing publications LogicalReplicationClient gains an optional additionalTables option. These are published alongside the primary table in the same publication, and their WAL events stream through the same data handler. When the publication already exists, missing tables are added via ALTER PUBLICATION ADD TABLE (online, slot-preserving) instead of erroring, so a publication can gain a table without a drop and recreate. --- internal-packages/replication/src/client.ts | 78 +++++++++++++++------ 1 file changed, 58 insertions(+), 20 deletions(-) diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 1a7ddb27236..044c5e19320 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -23,6 +23,14 @@ export interface LogicalReplicationClientOptions { * The table to replicate (for publication creation). */ table: string; + /** + * Additional tables to co-publish into the same publication. Their WAL + * events stream through the same `data` handler as `table`, so use this only + * when the extra tables share `table`'s row shape and downstream transform + * (e.g. a parallel clone table). On startup they are added to an existing + * publication via ALTER PUBLICATION ... ADD TABLE. + */ + additionalTables?: string[]; /** * The name of the replication slot to use. */ @@ -407,6 +415,15 @@ export class LogicalReplicationClient { return this; } + // The full set of tables this client publishes: the primary `table` plus any + // `additionalTables`. Order is stable so the publication's FOR TABLE clause is + // deterministic. + #allTables(): string[] { + return this.options.additionalTables + ? [this.options.table, ...this.options.additionalTables] + : [this.options.table]; + } + async #createPublication(): Promise { if (!this.client) { this.events.emit("error", new LogicalReplicationClientError("Client not connected")); @@ -416,8 +433,10 @@ export class LogicalReplicationClient { const publicationExists = await this.#doesPublicationExist(); if (publicationExists) { - // Validate the existing publication is correctly configured - const validationError = await this.#validatePublicationConfiguration(); + // Reconcile the existing publication: add any configured table it is + // missing (e.g. a clone table added after the publication was first + // created). Returns an error string only for unrecoverable mismatches. + const validationError = await this.#ensurePublicationConfiguration(); if (validationError) { this.logger.error("Publication exists but is misconfigured", { @@ -441,9 +460,13 @@ export class LogicalReplicationClient { return true; } + const tableList = this.#allTables() + .map((table) => `"${table}"`) + .join(", "); + const [createError] = await tryCatch( this.client.query( - `CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE "${this.options.table}" ${ + `CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE ${tableList} ${ this.options.publicationActions ? `WITH (publish = '${this.options.publicationActions.join(", ")}')` : "" @@ -483,32 +506,47 @@ export class LogicalReplicationClient { return res.rows[0].exists; } - async #validatePublicationConfiguration(): Promise { + async #ensurePublicationConfiguration(): Promise { if (!this.client) { - return "Cannot validate publication configuration: client not connected"; + return "Cannot ensure publication configuration: client not connected"; } - // Check if the publication has the correct table + // Which public tables the publication already carries. const tablesRes = await this.client.query( - `SELECT schemaname, tablename - FROM pg_publication_tables + `SELECT schemaname, tablename + FROM pg_publication_tables WHERE pubname = '${this.options.publicationName}';` ); - const tables = tablesRes.rows; - const expectedTable = this.options.table; - - // Check if the table is in the publication - const hasTable = tables.some( - (row) => row.tablename === expectedTable && row.schemaname === "public" + const currentTables = new Set( + tablesRes.rows + .filter((row) => row.schemaname === "public") + .map((row) => row.tablename as string) ); - if (!hasTable) { - if (tables.length === 0) { - return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; - } else { - const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", "); - return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; + // Reconcile rather than reject: add any configured table the publication is + // missing. ALTER PUBLICATION ... ADD TABLE is online and leaves the slot + // position intact, so an existing publication can gain a table (e.g. + // task_run_v2 alongside TaskRun) without a drop/recreate. ADD TABLE on a + // table already published raises duplicate_object (42710); treat that as a + // benign race (another instance won) rather than a failure. + const missingTables = this.#allTables().filter((table) => !currentTables.has(table)); + + for (const table of missingTables) { + this.logger.info("Adding table to existing publication", { + name: this.options.name, + publicationName: this.options.publicationName, + table, + }); + + const [addError] = await tryCatch( + this.client.query( + `ALTER PUBLICATION "${this.options.publicationName}" ADD TABLE "${table}";` + ) + ); + + if (addError && (addError as { code?: string }).code !== "42710") { + return `Failed to add table "public.${table}" to publication '${this.options.publicationName}': ${addError.message}`; } } From 3549341eef35c2cf4809eae3bf591abf0102036f Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Fri, 19 Jun 2026 17:41:46 +0100 Subject: [PATCH 12/12] feat(webapp): stream task_run_v2 into ClickHouse runsReplicationService co-publishes task_run_v2 alongside TaskRun. It is a column-identical clone, so its WAL rows flow through the same transform into the same ClickHouse table, keeping the mirror complete once orgs cut over to v2 run ids. task_run_v2 needs REPLICA IDENTITY FULL, applied the same out-of-band way as TaskRun, so update and delete events carry the old row. --- .../services/runsReplicationService.server.ts | 5 + .../runsReplicationService.taskRunV2.test.ts | 125 ++++++++++++++++++ apps/webapp/test/utils/replicationUtils.ts | 4 + 3 files changed, 134 insertions(+) create mode 100644 apps/webapp/test/runsReplicationService.taskRunV2.test.ts diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 31d8a3844cf..604056de8e7 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -227,6 +227,11 @@ export class RunsReplicationService { slotName: options.slotName, publicationName: options.publicationName, table: "TaskRun", + // task_run_v2 is a column-identical clone of TaskRun, so its WAL rows + // flow through the same handler/transform into the same ClickHouse table. + // Co-publishing it keeps the ClickHouse mirror complete once orgs cut over + // to v2 run ids; until then the table is empty and this is a no-op. + additionalTables: ["task_run_v2"], redisOptions: options.redisOptions, autoAcknowledge: false, publicationActions: ["insert", "update", "delete"], diff --git a/apps/webapp/test/runsReplicationService.taskRunV2.test.ts b/apps/webapp/test/runsReplicationService.taskRunV2.test.ts new file mode 100644 index 00000000000..bf31d97ffb3 --- /dev/null +++ b/apps/webapp/test/runsReplicationService.taskRunV2.test.ts @@ -0,0 +1,125 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { replicationContainerTest } from "@internal/testcontainers"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { createInMemoryTracing } from "./utils/tracing"; +import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunsReplicationService (task_run_v2)", () => { + replicationContainerTest( + "co-publishes task_run_v2 and streams its rows to the same ClickHouse table", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + // Both tables are in the publication; both need FULL identity so the + // delete transform can read the old row. INSERTs (this test) carry the + // full new tuple regardless, but we mirror the production setup. + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + const runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // A v2 run lives in task_run_v2, keyed by a KSUID id. + const ksuid = RunId.generateKsuid(); + const run = await prisma.taskRunV2.create({ + data: { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + payloadType: "application/json", + traceId: "v2trace", + spanId: "v2span", + queue: "test", + workerQueue: "us-east-1-next", + region: "us-east-1", + planType: "free", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + + await setTimeout(1000); + + const queryRuns = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}", + schema: z.any(), + params: z.object({ runId: z.string() }), + }); + + const [queryError, result] = await queryRuns({ runId: run.id }); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: run.id, + friendly_id: run.friendlyId, + task_identifier: "my-task", + environment_id: runtimeEnvironment.id, + project_id: project.id, + organization_id: organization.id, + environment_type: "DEVELOPMENT", + engine: "V2", + }) + ); + + await runsReplicationService.stop(); + } + ); +}); diff --git a/apps/webapp/test/utils/replicationUtils.ts b/apps/webapp/test/utils/replicationUtils.ts index 358da0c2cf6..713bd242892 100644 --- a/apps/webapp/test/utils/replicationUtils.ts +++ b/apps/webapp/test/utils/replicationUtils.ts @@ -17,6 +17,10 @@ export async function setupClickhouseReplication({ redisOptions: RedisOptions; }) { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + // task_run_v2 is co-published with TaskRun; it needs FULL identity too so + // UPDATE/DELETE WAL events carry the old row (the delete transform reads + // organizationId/environmentType off it). Mirrors the TaskRun line above. + await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`); const clickhouse = new ClickHouse({ url: clickhouseUrl,