diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 89a938da8bf..8c61b7d7fcd 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -25,6 +25,7 @@ import { logger } from "~/services/logger.server"; import { parseDelay } from "~/utils/delays"; import { handleMetadataPacket } from "~/utils/packets"; import { startSpan } from "~/v3/tracing.server"; +import { shouldUseV2RunTable } from "~/v3/runTableV2.server"; import type { TriggerTaskServiceOptions, TriggerTaskServiceResult, @@ -151,7 +152,17 @@ export class RunEngineTriggerTaskService { span.setAttribute("taskId", taskId); span.setAttribute("attempt", attempt); - const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId; + // The single per-org cutover point: an opted-in org mints a KSUID id + // (routing the run to task_run_v2), everyone else keeps a legacy id + // (TaskRun). The flag is a pure in-memory read of the org's + // featureFlags already loaded on `environment` — no DB query on the + // trigger hot path. Downstream routing is by id format only. + const runFriendlyId = + options?.runFriendlyId ?? + (shouldUseV2RunTable(environment.organization.featureFlags) + ? RunId.generateKsuid() + : RunId.generate() + ).friendlyId; const triggerRequest = { taskId, friendlyId: runFriendlyId, diff --git a/apps/webapp/app/services/runsBackfiller.server.ts b/apps/webapp/app/services/runsBackfiller.server.ts index 50e041ee64b..09386c9495c 100644 --- a/apps/webapp/app/services/runsBackfiller.server.ts +++ b/apps/webapp/app/services/runsBackfiller.server.ts @@ -41,6 +41,13 @@ export class RunsBackfillerService { span.setAttribute("cursor", cursor ?? ""); span.setAttribute("batchSize", batchSize ?? 0); + // Keyset on (createdAt, id). Runs now live across two physical tables + // (legacy TaskRun with cuid ids, task_run_v2 with ksuid ids), and `id` + // alone is not a valid order across them: cuid and ksuid sort in + // different ranges. RunStore merges the two tables only on a time-based + // key, so order by createdAt and tiebreak on id within a timestamp. + const keyset = cursor ? decodeBackfillCursor(cursor) : undefined; + const runs = await runStore.findRuns( { where: { @@ -51,11 +58,16 @@ export class RunsBackfillerService { status: { in: FINAL_RUN_STATUSES, }, - ...(cursor ? { id: { gt: cursor } } : {}), - }, - orderBy: { - id: "asc", + ...(keyset + ? { + OR: [ + { createdAt: { gt: keyset.createdAt } }, + { createdAt: keyset.createdAt, id: { gt: keyset.id } }, + ], + } + : {}), }, + orderBy: [{ createdAt: "asc" }, { id: "asc" }], take: batchSize, }, this.prisma @@ -94,8 +106,32 @@ export class RunsBackfillerService { lastRunId: lastRun.id, }); - // Return the last run ID to continue from - return lastRun.id; + // Return a (createdAt, id) cursor to continue from on the next batch. + return encodeBackfillCursor(lastRun.createdAt, lastRun.id); }); } } + +// The backfill cursor is an opaque "_" string. The admin +// worker passes it back verbatim across batches; only this service interprets +// it. An ISO timestamp contains no "_" and run ids are base62/base36, so the +// first "_" cleanly splits the two halves. +const BACKFILL_CURSOR_SEPARATOR = "_"; + +export function encodeBackfillCursor(createdAt: Date, id: string): string { + return `${createdAt.toISOString()}${BACKFILL_CURSOR_SEPARATOR}${id}`; +} + +export function decodeBackfillCursor(cursor: string): { createdAt: Date; id: string } { + const separatorIndex = cursor.indexOf(BACKFILL_CURSOR_SEPARATOR); + const createdAt = separatorIndex === -1 ? new Date(NaN) : new Date(cursor.slice(0, separatorIndex)); + const id = separatorIndex === -1 ? "" : cursor.slice(separatorIndex + 1); + + if (Number.isNaN(createdAt.getTime()) || id.length === 0) { + throw new Error( + `RunsBackfillerService: malformed cursor "${cursor}" (expected "_")` + ); + } + + return { createdAt, id }; +} diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 31d8a3844cf..604056de8e7 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -227,6 +227,11 @@ export class RunsReplicationService { slotName: options.slotName, publicationName: options.publicationName, table: "TaskRun", + // task_run_v2 is a column-identical clone of TaskRun, so its WAL rows + // flow through the same handler/transform into the same ClickHouse table. + // Co-publishing it keeps the ClickHouse mirror complete once orgs cut over + // to v2 run ids; until then the table is empty and this is a no-op. + additionalTables: ["task_run_v2"], redisOptions: options.redisOptions, autoAcknowledge: false, publicationActions: ["insert", "update", "delete"], diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index d32652a0b3b..9602a1267df 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -169,16 +169,13 @@ export class ClickHouseRunsRepository implements IRunsRepository { async listRuns(options: ListRunsOptions) { const { runIds, pagination } = await this.listRunIds(options); - let runs = await runStore.findRuns( + const hydrated = await runStore.findRuns( { where: { id: { in: runIds, }, }, - orderBy: { - id: "desc", - }, select: { id: true, friendlyId: true, @@ -216,6 +213,15 @@ export class ClickHouseRunsRepository implements IRunsRepository { this.options.prisma ); + // ClickHouse already ranked `runIds`. An `IN (...)` hydration comes back + // unordered, and a single SQL `orderBy` can't span the two physical run + // tables (legacy TaskRun + task_run_v2), so restore ClickHouse's ranking + // in memory. + const runById = new Map(hydrated.map((run) => [run.id, run])); + let runs = runIds + .map((id) => runById.get(id)) + .filter((run): run is NonNullable => run !== undefined); + // ClickHouse is slightly delayed, so we're going to do in-memory status filtering too if (options.statuses && options.statuses.length > 0) { runs = runs.filter((run) => options.statuses!.includes(run.status)); diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index 6b75b9ef903..000013f6d23 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -16,6 +16,7 @@ export const FEATURE_FLAG = { computeMigrationFreePercentage: "computeMigrationFreePercentage", computeMigrationPaidPercentage: "computeMigrationPaidPercentage", computeMigrationRequireTemplate: "computeMigrationRequireTemplate", + runTableV2: "runTableV2", } as const; export const FeatureFlagCatalog = { @@ -43,6 +44,12 @@ export const FeatureFlagCatalog = { // When on, migrated orgs build their compute template in required mode at deploy // (fails the deploy on error) instead of shadow. Strict boolean (see above). [FEATURE_FLAG.computeMigrationRequireTemplate]: z.boolean(), + // Per-org cutover to the parallel task_run_v2 table. When on, new runs for the + // org mint a KSUID id (routing them to task_run_v2); off (the default) keeps + // minting legacy ids. Strict boolean (see above): coercing a stringified + // "false" to true would cut an org over by mistake, and runs created on v2 + // stay on v2. + [FEATURE_FLAG.runTableV2]: z.boolean(), }; export type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/runTableV2.server.ts b/apps/webapp/app/v3/runTableV2.server.ts new file mode 100644 index 00000000000..51b55aefbe3 --- /dev/null +++ b/apps/webapp/app/v3/runTableV2.server.ts @@ -0,0 +1,28 @@ +import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags"; + +/** + * Per-org cutover switch for the parallel `task_run_v2` run table. + * + * Read in memory from `Organization.featureFlags` (already loaded on the + * AuthenticatedEnvironment at API-key auth, so this adds no DB query) at the + * single run-id mint site in the trigger path. On → mint a KSUID id, which + * routes the run to `task_run_v2`; off (the default) → mint a legacy id, which + * routes to `TaskRun`. + * + * RunStore never reads this flag: it routes purely by id format. The flag only + * decides which id scheme is minted upstream. Disabling it sends only NEW runs + * back to legacy; runs already created on v2 stay readable there (routed by id). + */ +export function shouldUseV2RunTable(orgFeatureFlags: unknown): boolean { + if (orgFeatureFlags === null || typeof orgFeatureFlags !== "object") { + return false; + } + + const override = (orgFeatureFlags as Record)[FEATURE_FLAG.runTableV2]; + if (override === undefined) { + return false; + } + + const parsed = FeatureFlagCatalog[FEATURE_FLAG.runTableV2].safeParse(override); + return parsed.success ? parsed.data : false; +} diff --git a/apps/webapp/test/runTableV2.test.ts b/apps/webapp/test/runTableV2.test.ts new file mode 100644 index 00000000000..9abae4cb7bb --- /dev/null +++ b/apps/webapp/test/runTableV2.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "vitest"; +import { shouldUseV2RunTable } from "~/v3/runTableV2.server"; + +describe("shouldUseV2RunTable", () => { + it("defaults to false when the org has no flags", () => { + expect(shouldUseV2RunTable(null)).toBe(false); + expect(shouldUseV2RunTable(undefined)).toBe(false); + expect(shouldUseV2RunTable({})).toBe(false); + }); + + it("returns true only when the flag is the boolean true", () => { + expect(shouldUseV2RunTable({ runTableV2: true })).toBe(true); + expect(shouldUseV2RunTable({ runTableV2: false })).toBe(false); + }); + + it("rejects a stringified flag value (strict boolean, no coercion)", () => { + // A stringified "false" must not coerce to true and cut the org over. + expect(shouldUseV2RunTable({ runTableV2: "true" })).toBe(false); + expect(shouldUseV2RunTable({ runTableV2: "false" })).toBe(false); + expect(shouldUseV2RunTable({ runTableV2: 1 })).toBe(false); + }); + + it("ignores unrelated flags and non-object inputs", () => { + expect(shouldUseV2RunTable({ mollifierEnabled: true })).toBe(false); + expect(shouldUseV2RunTable("runTableV2")).toBe(false); + expect(shouldUseV2RunTable(42)).toBe(false); + }); +}); diff --git a/apps/webapp/test/runsReplicationService.taskRunV2.test.ts b/apps/webapp/test/runsReplicationService.taskRunV2.test.ts new file mode 100644 index 00000000000..bf31d97ffb3 --- /dev/null +++ b/apps/webapp/test/runsReplicationService.taskRunV2.test.ts @@ -0,0 +1,125 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { replicationContainerTest } from "@internal/testcontainers"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { createInMemoryTracing } from "./utils/tracing"; +import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunsReplicationService (task_run_v2)", () => { + replicationContainerTest( + "co-publishes task_run_v2 and streams its rows to the same ClickHouse table", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + // Both tables are in the publication; both need FULL identity so the + // delete transform can read the old row. INSERTs (this test) carry the + // full new tuple regardless, but we mirror the production setup. + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const { tracer } = createInMemoryTracing(); + + const runsReplicationService = new RunsReplicationService({ + clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + tracer, + logLevel: "warn", + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // A v2 run lives in task_run_v2, keyed by a KSUID id. + const ksuid = RunId.generateKsuid(); + const run = await prisma.taskRunV2.create({ + data: { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + payloadType: "application/json", + traceId: "v2trace", + spanId: "v2span", + queue: "test", + workerQueue: "us-east-1-next", + region: "us-east-1", + planType: "free", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + + await setTimeout(1000); + + const queryRuns = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}", + schema: z.any(), + params: z.object({ runId: z.string() }), + }); + + const [queryError, result] = await queryRuns({ runId: run.id }); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + run_id: run.id, + friendly_id: run.friendlyId, + task_identifier: "my-task", + environment_id: runtimeEnvironment.id, + project_id: project.id, + organization_id: organization.id, + environment_type: "DEVELOPMENT", + engine: "V2", + }) + ); + + await runsReplicationService.stop(); + } + ); +}); diff --git a/apps/webapp/test/utils/replicationUtils.ts b/apps/webapp/test/utils/replicationUtils.ts index 358da0c2cf6..713bd242892 100644 --- a/apps/webapp/test/utils/replicationUtils.ts +++ b/apps/webapp/test/utils/replicationUtils.ts @@ -17,6 +17,10 @@ export async function setupClickhouseReplication({ redisOptions: RedisOptions; }) { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + // task_run_v2 is co-published with TaskRun; it needs FULL identity too so + // UPDATE/DELETE WAL events carry the old row (the delete transform reads + // organizationId/environmentType off it). Mirrors the TaskRun line above. + await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`); const clickhouse = new ClickHouse({ url: clickhouseUrl, diff --git a/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql b/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql new file mode 100644 index 00000000000..22a8bcf2293 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260616151544_create_task_run_v2/migration.sql @@ -0,0 +1,121 @@ +-- CreateTable +CREATE TABLE "public"."task_run_v2" ( + "id" TEXT NOT NULL, + "number" INTEGER NOT NULL DEFAULT 0, + "friendlyId" TEXT NOT NULL, + "engine" "public"."RunEngineVersion" NOT NULL DEFAULT 'V1', + "status" "public"."TaskRunStatus" NOT NULL DEFAULT 'PENDING', + "statusReason" TEXT, + "idempotencyKey" TEXT, + "idempotencyKeyExpiresAt" TIMESTAMP(3), + "idempotencyKeyOptions" JSONB, + "debounce" JSONB, + "taskIdentifier" TEXT NOT NULL, + "isTest" BOOLEAN NOT NULL DEFAULT false, + "payload" TEXT NOT NULL, + "payloadType" TEXT NOT NULL DEFAULT 'application/json', + "context" JSONB, + "traceContext" JSONB, + "traceId" TEXT NOT NULL, + "spanId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "environmentType" "public"."RuntimeEnvironmentType", + "projectId" TEXT NOT NULL, + "organizationId" TEXT, + "queue" TEXT NOT NULL, + "lockedQueueId" TEXT, + "masterQueue" TEXT NOT NULL DEFAULT 'main', + "region" TEXT, + "secondaryMasterQueue" TEXT, + "attemptNumber" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "runTags" TEXT[], + "taskVersion" TEXT, + "sdkVersion" TEXT, + "cliVersion" TEXT, + "startedAt" TIMESTAMP(3), + "executedAt" TIMESTAMP(3), + "completedAt" TIMESTAMP(3), + "machinePreset" TEXT, + "usageDurationMs" INTEGER NOT NULL DEFAULT 0, + "costInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "baseCostInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "lockedAt" TIMESTAMP(3), + "lockedById" TEXT, + "lockedToVersionId" TEXT, + "priorityMs" INTEGER NOT NULL DEFAULT 0, + "concurrencyKey" TEXT, + "delayUntil" TIMESTAMP(3), + "queuedAt" TIMESTAMP(3), + "ttl" TEXT, + "expiredAt" TIMESTAMP(3), + "maxAttempts" INTEGER, + "lockedRetryConfig" JSONB, + "oneTimeUseToken" TEXT, + "taskEventStore" TEXT NOT NULL DEFAULT 'taskEvent', + "queueTimestamp" TIMESTAMP(3), + "scheduleInstanceId" TEXT, + "scheduleId" TEXT, + "bulkActionGroupIds" TEXT[] DEFAULT ARRAY[]::TEXT[], + "logsDeletedAt" TIMESTAMP(3), + "replayedFromTaskRunFriendlyId" TEXT, + "rootTaskRunId" TEXT, + "parentTaskRunId" TEXT, + "parentTaskRunAttemptId" TEXT, + "batchId" TEXT, + "resumeParentOnCompletion" BOOLEAN NOT NULL DEFAULT false, + "depth" INTEGER NOT NULL DEFAULT 0, + "parentSpanId" TEXT, + "runChainState" JSONB, + "seedMetadata" TEXT, + "seedMetadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadata" TEXT, + "metadataType" TEXT NOT NULL DEFAULT 'application/json', + "metadataVersion" INTEGER NOT NULL DEFAULT 1, + "annotations" JSONB, + "isWarmStart" BOOLEAN, + "output" TEXT, + "outputType" TEXT NOT NULL DEFAULT 'application/json', + "error" JSONB, + "planType" TEXT, + "maxDurationInSeconds" INTEGER, + "realtimeStreamsVersion" TEXT NOT NULL DEFAULT 'v1', + "realtimeStreams" TEXT[] DEFAULT ARRAY[]::TEXT[], + "streamBasinName" TEXT, + + CONSTRAINT "task_run_v2_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_friendlyId_key" ON "public"."task_run_v2"("friendlyId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_parentTaskRunId_idx" ON "public"."task_run_v2"("parentTaskRunId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_spanId_idx" ON "public"."task_run_v2"("spanId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_parentSpanId_idx" ON "public"."task_run_v2"("parentSpanId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_runTags_idx" ON "public"."task_run_v2" USING GIN ("runTags" array_ops); + +-- CreateIndex +CREATE INDEX "task_run_v2_runtimeEnvironmentId_batchId_idx" ON "public"."task_run_v2"("runtimeEnvironmentId", "batchId"); + +-- CreateIndex +CREATE INDEX "task_run_v2_runtimeEnvironmentId_createdAt_idx" ON "public"."task_run_v2"("runtimeEnvironmentId", "createdAt" DESC); + +-- CreateIndex +CREATE INDEX "task_run_v2_createdAt_idx" ON "public"."task_run_v2" USING BRIN ("createdAt"); + +-- CreateIndex +CREATE INDEX "task_run_v2_createdAt_id_idx" ON "public"."task_run_v2"("createdAt", "id"); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_oneTimeUseToken_key" ON "public"."task_run_v2"("oneTimeUseToken"); + +-- CreateIndex +CREATE UNIQUE INDEX "task_run_v2_runtimeEnvironmentId_taskIdentifier_idempotency_key" ON "public"."task_run_v2"("runtimeEnvironmentId", "taskIdentifier", "idempotencyKey"); diff --git a/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql b/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql new file mode 100644 index 00000000000..9e7313aade9 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260619120042_drop_taskrun_incoming_fks/migration.sql @@ -0,0 +1,17 @@ +-- Drop all foreign key constraints that reference TaskRun.id from child tables +-- (no schema change, data intact). Integrity moves to app code so a child row +-- can reference a run in either TaskRun (legacy) or task_run_v2 (new) by scalar. +ALTER TABLE "public"."TaskRunAttempt" DROP CONSTRAINT IF EXISTS "TaskRunAttempt_taskRunId_fkey"; +ALTER TABLE "public"."TaskRunDependency" DROP CONSTRAINT IF EXISTS "TaskRunDependency_taskRunId_fkey"; +ALTER TABLE "public"."BatchTaskRunItem" DROP CONSTRAINT IF EXISTS "BatchTaskRunItem_taskRunId_fkey"; +ALTER TABLE "public"."Checkpoint" DROP CONSTRAINT IF EXISTS "Checkpoint_runId_fkey"; +ALTER TABLE "public"."CheckpointRestoreEvent" DROP CONSTRAINT IF EXISTS "CheckpointRestoreEvent_runId_fkey"; +ALTER TABLE "public"."ProjectAlert" DROP CONSTRAINT IF EXISTS "ProjectAlert_taskRunId_fkey"; +ALTER TABLE "public"."BulkActionItem" DROP CONSTRAINT IF EXISTS "BulkActionItem_sourceRunId_fkey"; +ALTER TABLE "public"."BulkActionItem" DROP CONSTRAINT IF EXISTS "BulkActionItem_destinationRunId_fkey"; +ALTER TABLE "public"."_TaskRunToTaskRunTag" DROP CONSTRAINT IF EXISTS "_TaskRunToTaskRunTag_A_fkey"; +ALTER TABLE "public"."TaskRunExecutionSnapshot" DROP CONSTRAINT IF EXISTS "TaskRunExecutionSnapshot_runId_fkey"; +ALTER TABLE "public"."Waitpoint" DROP CONSTRAINT IF EXISTS "Waitpoint_completedByTaskRunId_fkey"; +ALTER TABLE "public"."TaskRunWaitpoint" DROP CONSTRAINT IF EXISTS "TaskRunWaitpoint_taskRunId_fkey"; +ALTER TABLE "public"."_WaitpointRunConnections" DROP CONSTRAINT IF EXISTS "_WaitpointRunConnections_A_fkey"; +ALTER TABLE "public"."PlaygroundConversation" DROP CONSTRAINT IF EXISTS "PlaygroundConversation_runId_fkey"; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index bb80da3a7ec..5668a5ac93c 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -366,6 +366,7 @@ model RuntimeEnvironment { backgroundWorkers BackgroundWorker[] backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] + taskRunsV2 TaskRunV2[] @relation("taskRunsV2") taskQueues TaskQueue[] batchTaskRuns BatchTaskRun[] environmentVariableValues EnvironmentVariableValue[] @@ -453,6 +454,7 @@ model Project { backgroundWorkers BackgroundWorker[] backgroundWorkerTasks BackgroundWorkerTask[] taskRuns TaskRun[] + taskRunsV2 TaskRunV2[] @relation("taskRunsV2") runTags TaskRunTag[] taskQueues TaskQueue[] environmentVariables EnvironmentVariable[] @@ -560,6 +562,7 @@ model BackgroundWorker { tasks BackgroundWorkerTask[] attempts TaskRunAttempt[] lockedRuns TaskRun[] + lockedRunsV2 TaskRunV2[] @relation("lockedRunsV2") files BackgroundWorkerFile[] queues TaskQueue[] promptVersions PromptVersion[] @@ -695,6 +698,7 @@ model BackgroundWorkerTask { attempts TaskRunAttempt[] runs TaskRun[] + runsV2 TaskRunV2[] @relation("lockedRunsV2") queueConfig Json? retryConfig Json? @@ -742,7 +746,9 @@ model PlaygroundConversation { /// The current active run backing this conversation (null if no run yet) runId String? - run TaskRun? @relation(fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade) + run TaskRun? @relation(fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade, map: "PlaygroundConversation_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2? @relation("playgroundConversationsV2", fields: [runId], references: [id], onDelete: SetNull, onUpdate: Cascade, map: "PlaygroundConversation_runId_v2_fkey") /// The client data JSON used for this conversation clientData Json? @@ -1095,6 +1101,242 @@ model TaskRun { @@index([createdAt], type: Brin) } +/// Parallel mirror of TaskRun. +/// Structural copy of TaskRun's scalar columns with NO relation fields, so it +/// carries zero foreign-key constraints and requires no edits to other models. +/// FK id columns are kept as plain scalars; integrity is enforced in app code, +/// matching TaskRun's current FK-free state. Not yet written to or read from. +model TaskRunV2 { + id String @id @default(cuid()) + + number Int @default(0) + friendlyId String @unique + + engine RunEngineVersion @default(V1) + + status TaskRunStatus @default(PENDING) + statusReason String? + + idempotencyKey String? + idempotencyKeyExpiresAt DateTime? + /// Stores the user-provided key and scope: { key: string, scope: "run" | "attempt" | "global" } + idempotencyKeyOptions Json? + + /// Debounce options: { key: string, delay: string, createdAt: Date } + debounce Json? + + taskIdentifier String + + isTest Boolean @default(false) + + payload String + payloadType String @default("application/json") + context Json? + traceContext Json? + + traceId String + spanId String + + runtimeEnvironment RuntimeEnvironment @relation("taskRunsV2", fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "task_run_v2_runtimeEnvironmentId_fkey") + runtimeEnvironmentId String + + environmentType RuntimeEnvironmentType? + + project Project @relation("taskRunsV2", fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "task_run_v2_projectId_fkey") + projectId String + + organizationId String? + + // The specific queue this run is in + queue String + // The queueId is set when the run is locked to a specific queue + lockedQueueId String? + + /// The main queue that this run is part of + workerQueue String @default("main") @map("masterQueue") + + /// User-facing geo region, stamped at trigger; workerQueue is where it actually ran. + region String? + + /// @deprecated + secondaryMasterQueue String? + + /// From engine v2+ this will be defined after a run has been dequeued (starting at 1) + attemptNumber Int? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + attempts TaskRunAttempt[] @relation("attemptsV2") + tags TaskRunTag[] @relation("taskRunTagsV2") + + /// Denormized column that holds the raw tags + runTags String[] + + /// Denormalized version of the background worker task + taskVersion String? + sdkVersion String? + cliVersion String? + + checkpoints Checkpoint[] @relation("checkpointsV2") + + /// startedAt marks the point at which a run is dequeued from MarQS + startedAt DateTime? + /// executedAt is set when the first attempt is about to execute + executedAt DateTime? + completedAt DateTime? + machinePreset String? + + usageDurationMs Int @default(0) + costInCents Float @default(0) + baseCostInCents Float @default(0) + + lockedAt DateTime? + lockedBy BackgroundWorkerTask? @relation("lockedRunsV2", fields: [lockedById], references: [id], map: "task_run_v2_lockedById_fkey") + lockedById String? + + lockedToVersion BackgroundWorker? @relation("lockedRunsV2", fields: [lockedToVersionId], references: [id], map: "task_run_v2_lockedToVersionId_fkey") + lockedToVersionId String? + + /// The "priority" of the run. This is just a negative offset in ms for the queue timestamp + /// E.g. a value of 60_000 would put the run into the queue 60s ago. + priorityMs Int @default(0) + + concurrencyKey String? + + delayUntil DateTime? + queuedAt DateTime? + ttl String? + expiredAt DateTime? + maxAttempts Int? + lockedRetryConfig Json? + + /// optional token that can be used to authenticate the task run + oneTimeUseToken String? + + ///When this run is finished, the waitpoint will be marked as completed + associatedWaitpoint Waitpoint? @relation("CompletingRunV2") + + ///If there are any blocked waitpoints, the run won't be executed + blockedByWaitpoints TaskRunWaitpoint[] @relation("taskRunWaitpointsV2") + + /// All waitpoints that blocked this run at some point, used for display purposes + connectedWaitpoints Waitpoint[] @relation("WaitpointRunConnectionsV2") + + /// Where the logs are stored + taskEventStore String @default("taskEvent") + + queueTimestamp DateTime? + + batchItems BatchTaskRunItem[] @relation("batchItemsV2") + dependency TaskRunDependency? @relation("dependencyV2") + CheckpointRestoreEvent CheckpointRestoreEvent[] @relation("checkpointRestoreEventsV2") + executionSnapshots TaskRunExecutionSnapshot[] @relation("executionSnapshotsV2") + + alerts ProjectAlert[] @relation("alertsV2") + + scheduleInstanceId String? + scheduleId String? + + bulkActionGroupIds String[] @default([]) + + logsDeletedAt DateTime? + + replayedFromTaskRunFriendlyId String? + + /// This represents the original task that that was triggered outside of a Trigger.dev task + rootTaskRun TaskRunV2? @relation("TaskRootRunV2", fields: [rootTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_rootTaskRunId_fkey") + rootTaskRunId String? + + /// The root run will have a list of all the descendant runs, children, grand children, etc. + descendantRuns TaskRunV2[] @relation("TaskRootRunV2") + + /// The immediate parent run of this task run + parentTaskRun TaskRunV2? @relation("TaskParentRunV2", fields: [parentTaskRunId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_parentTaskRunId_fkey") + parentTaskRunId String? + + /// The immediate child runs of this task run + childRuns TaskRunV2[] @relation("TaskParentRunV2") + + /// The immediate parent attempt of this task run + parentTaskRunAttempt TaskRunAttempt? @relation("TaskParentRunAttemptV2", fields: [parentTaskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_parentTaskRunAttemptId_fkey") + parentTaskRunAttemptId String? + + /// The batch run that this task run is a part of + batch BatchTaskRun? @relation("batchRunsV2", fields: [batchId], references: [id], onDelete: SetNull, onUpdate: NoAction, map: "task_run_v2_batchId_fkey") + batchId String? + + /// whether or not the task run was created because of a triggerAndWait for batchTriggerAndWait + resumeParentOnCompletion Boolean @default(false) + + /// The depth of this task run in the task run hierarchy + depth Int @default(0) + + /// The span ID of the "trigger" span in the parent task run + parentSpanId String? + + /// Holds the state of the run chain for deadlock detection + runChainState Json? + + /// seed run metadata + seedMetadata String? + seedMetadataType String @default("application/json") + + /// Run metadata + metadata String? + metadataType String @default("application/json") + metadataVersion Int @default(1) + + /// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId + annotations Json? + + /// Whether the latest attempt was a warm start. Null until first attempt starts. + isWarmStart Boolean? + + /// Run output + output String? + outputType String @default("application/json") + + /// Run error + error Json? + + /// Organization's billing plan type (cached for fallback when billing API fails) + planType String? + + maxDurationInSeconds Int? + + /// The version of the realtime streams implementation used by the run + realtimeStreamsVersion String @default("v1") + /// Store the stream keys that are being used by the run + realtimeStreams String[] @default([]) + /// S2 basin where this run's realtime streams live. Stamped at create + /// time from `Organization.streamBasinName` so reads can resolve the + /// basin without joining org. Null when the org has no per-org basin + /// (OSS, or pre-backfill); reads fall back to the global basin. + streamBasinName String? + + sourceBulkActionItems BulkActionItem[] @relation("SourceActionItemRunV2") + destinationBulkActionItems BulkActionItem[] @relation("DestinationActionItemRunV2") + + playgroundConversations PlaygroundConversation[] @relation("playgroundConversationsV2") + + @@unique([oneTimeUseToken]) + @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) + // Finding child runs + @@index([parentTaskRunId]) + // Run page inspector + @@index([spanId]) + @@index([parentSpanId]) + // Finding runs in a batch + @@index([runTags(ops: ArrayOps)], type: Gin) + @@index([runtimeEnvironmentId, batchId]) + @@index([runtimeEnvironmentId, createdAt(sort: Desc)]) + @@index([createdAt], type: Brin) + // Keyset cursor for merged pagination across run tables + @@index([createdAt, id]) + @@map("task_run_v2") +} + model TaskRunTemplate { id String @id @default(cuid()) @@ -1215,7 +1457,9 @@ model TaskRunExecutionSnapshot { /// Run runId String - run TaskRun @relation(fields: [runId], references: [id]) + run TaskRun @relation(fields: [runId], references: [id], map: "TaskRunExecutionSnapshot_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("executionSnapshotsV2", fields: [runId], references: [id], map: "TaskRunExecutionSnapshot_runId_v2_fkey") runStatus TaskRunStatus // Batch @@ -1335,7 +1579,9 @@ model Waitpoint { /// If it's a RUN type waitpoint, this is the associated run completedByTaskRunId String? @unique - completedByTaskRun TaskRun? @relation("CompletingRun", fields: [completedByTaskRunId], references: [id], onDelete: SetNull) + completedByTaskRun TaskRun? @relation("CompletingRun", fields: [completedByTaskRunId], references: [id], onDelete: SetNull, map: "Waitpoint_completedByTaskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same completedByTaskRunId scalar (FK stripped in prod) + completedByTaskRunV2 TaskRunV2? @relation("CompletingRunV2", fields: [completedByTaskRunId], references: [id], onDelete: SetNull, map: "Waitpoint_completedByTaskRunId_v2_fkey") /// If it's a DATETIME type waitpoint, this is the date. /// If it's a MANUAL waitpoint, this can be set as the `timeout`. @@ -1349,7 +1595,8 @@ model Waitpoint { blockingTaskRuns TaskRunWaitpoint[] /// All runs that have ever been blocked by this waitpoint, used for display purposes - connectedRuns TaskRun[] @relation("WaitpointRunConnections") + connectedRuns TaskRun[] @relation("WaitpointRunConnections") + connectedRunsV2 TaskRunV2[] @relation("WaitpointRunConnectionsV2") /// When a waitpoint is complete completedExecutionSnapshots TaskRunExecutionSnapshot[] @relation("completedWaitpoints") @@ -1400,7 +1647,9 @@ enum WaitpointStatus { model TaskRunWaitpoint { id String @id @default(cuid()) - taskRun TaskRun @relation(fields: [taskRunId], references: [id]) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], map: "TaskRunWaitpoint_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("taskRunWaitpointsV2", fields: [taskRunId], references: [id], map: "TaskRunWaitpoint_taskRunId_v2_fkey") taskRunId String waitpoint Waitpoint @relation(fields: [waitpointId], references: [id]) @@ -1564,7 +1813,8 @@ model TaskRunTag { friendlyId String @unique - runs TaskRun[] + runs TaskRun[] + runsV2 TaskRunV2[] @relation("taskRunTagsV2") project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) projectId String @@ -1581,7 +1831,9 @@ model TaskRunDependency { id String @id @default(cuid()) /// The child run - taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunDependency_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("dependencyV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunDependency_taskRunId_v2_fkey") taskRunId String @unique checkpointEvent CheckpointRestoreEvent? @relation(fields: [checkpointEventId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -1629,7 +1881,9 @@ model TaskRunAttempt { friendlyId String @unique - taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation("attempts", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunAttempt_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("attemptsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "TaskRunAttempt_taskRunId_v2_fkey") taskRunId String backgroundWorker BackgroundWorker @relation(fields: [backgroundWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -1666,6 +1920,7 @@ model TaskRunAttempt { CheckpointRestoreEvent CheckpointRestoreEvent[] alerts ProjectAlert[] childRuns TaskRun[] @relation("TaskParentRunAttempt") + childRunsV2 TaskRunV2[] @relation("TaskParentRunAttemptV2") @@unique([taskRunId, number]) @@index([taskRunId]) @@ -1867,6 +2122,7 @@ model BatchTaskRun { runtimeEnvironmentId String /// This only includes new runs, not idempotent runs. runs TaskRun[] + runsV2 TaskRunV2[] @relation("batchRunsV2") createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -1950,7 +2206,9 @@ model BatchTaskRunItem { batchTaskRun BatchTaskRun @relation(fields: [batchTaskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) batchTaskRunId String - taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BatchTaskRunItem_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2 @relation("batchItemsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BatchTaskRunItem_taskRunId_v2_fkey") taskRunId String taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: Cascade) @@ -2045,7 +2303,9 @@ model Checkpoint { events CheckpointRestoreEvent[] - run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "Checkpoint_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("checkpointsV2", fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "Checkpoint_runId_v2_fkey") runId String attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2080,7 +2340,9 @@ model CheckpointRestoreEvent { checkpoint Checkpoint @relation(fields: [checkpointId], references: [id], onDelete: Cascade, onUpdate: Cascade) checkpointId String - run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade) + run TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "CheckpointRestoreEvent_runId_fkey") + /// Mirror relation to TaskRunV2 reusing the same runId scalar (FK stripped in prod) + runV2 TaskRunV2 @relation("checkpointRestoreEventsV2", fields: [runId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "CheckpointRestoreEvent_runId_v2_fkey") runId String attempt TaskRunAttempt @relation(fields: [attemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2366,7 +2628,9 @@ model ProjectAlert { taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade) taskRunAttemptId String? - taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + taskRun TaskRun? @relation(fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "ProjectAlert_taskRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same taskRunId scalar (FK stripped in prod) + taskRunV2 TaskRunV2? @relation("alertsV2", fields: [taskRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "ProjectAlert_taskRunId_v2_fkey") taskRunId String? workerDeployment WorkerDeployment? @relation(fields: [workerDeploymentId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2547,11 +2811,15 @@ model BulkActionItem { status BulkActionItemStatus @default(PENDING) /// The run that is the source of the action, e.g. when replaying this is the original run - sourceRun TaskRun @relation("SourceActionItemRun", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + sourceRun TaskRun @relation("SourceActionItemRun", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_sourceRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same sourceRunId scalar (FK stripped in prod) + sourceRunV2 TaskRunV2 @relation("SourceActionItemRunV2", fields: [sourceRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_sourceRunId_v2_fkey") sourceRunId String /// The run that's a result of the action, this will be set when the run has been created - destinationRun TaskRun? @relation("DestinationActionItemRun", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade) + destinationRun TaskRun? @relation("DestinationActionItemRun", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_destinationRunId_fkey") + /// Mirror relation to TaskRunV2 reusing the same destinationRunId scalar (FK stripped in prod) + destinationRunV2 TaskRunV2? @relation("DestinationActionItemRunV2", fields: [destinationRunId], references: [id], onDelete: Cascade, onUpdate: Cascade, map: "BulkActionItem_destinationRunId_v2_fkey") destinationRunId String? error String? diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 1a7ddb27236..044c5e19320 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -23,6 +23,14 @@ export interface LogicalReplicationClientOptions { * The table to replicate (for publication creation). */ table: string; + /** + * Additional tables to co-publish into the same publication. Their WAL + * events stream through the same `data` handler as `table`, so use this only + * when the extra tables share `table`'s row shape and downstream transform + * (e.g. a parallel clone table). On startup they are added to an existing + * publication via ALTER PUBLICATION ... ADD TABLE. + */ + additionalTables?: string[]; /** * The name of the replication slot to use. */ @@ -407,6 +415,15 @@ export class LogicalReplicationClient { return this; } + // The full set of tables this client publishes: the primary `table` plus any + // `additionalTables`. Order is stable so the publication's FOR TABLE clause is + // deterministic. + #allTables(): string[] { + return this.options.additionalTables + ? [this.options.table, ...this.options.additionalTables] + : [this.options.table]; + } + async #createPublication(): Promise { if (!this.client) { this.events.emit("error", new LogicalReplicationClientError("Client not connected")); @@ -416,8 +433,10 @@ export class LogicalReplicationClient { const publicationExists = await this.#doesPublicationExist(); if (publicationExists) { - // Validate the existing publication is correctly configured - const validationError = await this.#validatePublicationConfiguration(); + // Reconcile the existing publication: add any configured table it is + // missing (e.g. a clone table added after the publication was first + // created). Returns an error string only for unrecoverable mismatches. + const validationError = await this.#ensurePublicationConfiguration(); if (validationError) { this.logger.error("Publication exists but is misconfigured", { @@ -441,9 +460,13 @@ export class LogicalReplicationClient { return true; } + const tableList = this.#allTables() + .map((table) => `"${table}"`) + .join(", "); + const [createError] = await tryCatch( this.client.query( - `CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE "${this.options.table}" ${ + `CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE ${tableList} ${ this.options.publicationActions ? `WITH (publish = '${this.options.publicationActions.join(", ")}')` : "" @@ -483,32 +506,47 @@ export class LogicalReplicationClient { return res.rows[0].exists; } - async #validatePublicationConfiguration(): Promise { + async #ensurePublicationConfiguration(): Promise { if (!this.client) { - return "Cannot validate publication configuration: client not connected"; + return "Cannot ensure publication configuration: client not connected"; } - // Check if the publication has the correct table + // Which public tables the publication already carries. const tablesRes = await this.client.query( - `SELECT schemaname, tablename - FROM pg_publication_tables + `SELECT schemaname, tablename + FROM pg_publication_tables WHERE pubname = '${this.options.publicationName}';` ); - const tables = tablesRes.rows; - const expectedTable = this.options.table; - - // Check if the table is in the publication - const hasTable = tables.some( - (row) => row.tablename === expectedTable && row.schemaname === "public" + const currentTables = new Set( + tablesRes.rows + .filter((row) => row.schemaname === "public") + .map((row) => row.tablename as string) ); - if (!hasTable) { - if (tables.length === 0) { - return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; - } else { - const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", "); - return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; + // Reconcile rather than reject: add any configured table the publication is + // missing. ALTER PUBLICATION ... ADD TABLE is online and leaves the slot + // position intact, so an existing publication can gain a table (e.g. + // task_run_v2 alongside TaskRun) without a drop/recreate. ADD TABLE on a + // table already published raises duplicate_object (42710); treat that as a + // benign race (another instance won) rather than a failure. + const missingTables = this.#allTables().filter((table) => !currentTables.has(table)); + + for (const table of missingTables) { + this.logger.info("Adding table to existing publication", { + name: this.options.name, + publicationName: this.options.publicationName, + table, + }); + + const [addError] = await tryCatch( + this.client.query( + `ALTER PUBLICATION "${this.options.publicationName}" ADD TABLE "${table}";` + ) + ); + + if (addError && (addError as { code?: string }).code !== "42710") { + return `Failed to add table "public.${table}" to publication '${this.options.publicationName}': ${addError.message}`; } } diff --git a/internal-packages/run-store/src/PostgresRunStore.test.ts b/internal-packages/run-store/src/PostgresRunStore.test.ts index 47876b70c8d..1a982d87dc6 100644 --- a/internal-packages/run-store/src/PostgresRunStore.test.ts +++ b/internal-packages/run-store/src/PostgresRunStore.test.ts @@ -1,4 +1,5 @@ import { postgresTest } from "@internal/testcontainers"; +import { isKsuidId, RunId } from "@trigger.dev/core/v3/isomorphic"; import type { PrismaClient } from "@trigger.dev/database"; import { describe, expect } from "vitest"; import { PostgresRunStore } from "./PostgresRunStore.js"; @@ -1772,3 +1773,1037 @@ describe("PostgresRunStore — read", () => { expect(found[0]?.payloadType).toBe("application/json"); }); }); + +describe("PostgresRunStore — table routing by id format", () => { + // Seed a run directly into one physical table, choosing the delegate by id + // format the same way the store does. Returns the ids used. + async function seedRoutedRun( + prisma: PrismaClient, + params: { + id: string; + friendlyId: string; + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; + status?: string; + idempotencyKey?: string; + taskIdentifier?: string; + createdAt?: Date; + } + ) { + const delegate = isKsuidId(params.id) + ? (prisma.taskRunV2 as unknown as typeof prisma.taskRun) + : prisma.taskRun; + + await delegate.create({ + data: { + id: params.id, + engine: "V2", + status: (params.status as any) ?? "PENDING", + friendlyId: params.friendlyId, + runtimeEnvironmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + organizationId: params.organizationId, + projectId: params.projectId, + taskIdentifier: params.taskIdentifier ?? "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: `trace_${params.id}`, + spanId: `span_${params.id}`, + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + ...(params.idempotencyKey !== undefined && { idempotencyKey: params.idempotencyKey }), + ...(params.createdAt !== undefined && { createdAt: params.createdAt }), + }, + }); + } + + postgresTest( + "createRun with a cuid id lands a row in TaskRun and NOT in task_run_v2", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + expect(isKsuidId(cuid.id)).toBe(false); + + await store.createRun({ + data: { + id: cuid.id, + engine: "V2", + status: "PENDING", + friendlyId: cuid.friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_cuid", + spanId: "span_cuid", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + snapshot: { + engine: "V2", + executionStatus: "RUN_CREATED", + description: "Run was created", + runStatus: "PENDING", + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }); + + // cuid run is in TaskRun, not in task_run_v2. + const legacyRow = await prisma.taskRun.findUnique({ where: { id: cuid.id } }); + expect(legacyRow).not.toBeNull(); + const cuidInV2 = await prisma.taskRunV2.findUnique({ where: { id: cuid.id } }); + expect(cuidInV2).toBeNull(); + } + ); + + postgresTest( + "createRun routes a KSUID id to task_run_v2: the scalar row lands there and not in TaskRun", + async ({ prisma }) => { + // This test exercises the routing decision in isolation by writing the + // scalar row directly to the table `createRun` would pick for a KSUID + // `data.id`, then asserts the row landed in task_run_v2 and not in TaskRun. + // The full v2 create path (run + nested snapshot + waitpoint) is covered + // by the "v2 nested writes" suite below. + const { organization, project, environment } = await seedEnvironment(prisma); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const ksuidInLegacy = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(ksuidInLegacy).toBeNull(); + } + ); + + postgresTest( + "findRun and updateMetadata route to task_run_v2 for a KSUID run and to TaskRun for a cuid run", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const cuid = RunId.generate(); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + // By-id read finds each run in its own table. + const foundKsuid = await store.findRun({ id: ksuid.id }, { select: { id: true } }); + expect(foundKsuid?.id).toBe(ksuid.id); + const foundCuid = await store.findRun({ id: cuid.id }, { select: { id: true } }); + expect(foundCuid?.id).toBe(cuid.id); + + // By-id write (updateMetadata) lands in the correct table. + const ksuidResult = await store.updateMetadata( + ksuid.id, + { + metadata: '{"routed":"v2"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + {} + ); + expect(ksuidResult.count).toBe(1); + + const cuidResult = await store.updateMetadata( + cuid.id, + { + metadata: '{"routed":"legacy"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + {} + ); + expect(cuidResult.count).toBe(1); + + // The write hit task_run_v2 for the KSUID run … + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { metadata: true }, + }); + expect(v2Row.metadata).toBe('{"routed":"v2"}'); + + // … and TaskRun for the cuid run. + const legacyRow = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + select: { metadata: true }, + }); + expect(legacyRow.metadata).toBe('{"routed":"legacy"}'); + } + ); + + postgresTest( + "findRun resolves a non-id predicate (idempotency key) against a run in either table", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // A KSUID run carrying an idempotency key lands in task_run_v2 … + const ksuid = RunId.generateKsuid(); + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + + // … and a cuid run carrying a different key lands in legacy TaskRun. + const cuid = RunId.generate(); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-legacy", + taskIdentifier: "my-task", + }); + + // The lookup carries no id/friendlyId, so it must read BOTH tables — + // this is the mixed-window idempotency dedup. Miss either table and a + // reused key produces a duplicate run. + const v2Hit = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + expect(v2Hit?.id).toBe(ksuid.id); + + const legacyHit = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-legacy", + taskIdentifier: "my-task", + }); + expect(legacyHit?.id).toBe(cuid.id); + + // A key in neither table returns null — no false dedup. + const miss = await store.findRun({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-missing", + taskIdentifier: "my-task", + }); + expect(miss).toBeNull(); + + // findRunOrThrow takes the same both-table path: it finds the v2 row … + const thrown = await store.findRunOrThrow({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-v2", + taskIdentifier: "my-task", + }); + expect(thrown.id).toBe(ksuid.id); + + // … and throws when neither table matches. + await expect( + store.findRunOrThrow({ + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-missing", + taskIdentifier: "my-task", + }) + ).rejects.toThrow(); + } + ); + + postgresTest( + "expireRunsBatch with a mixed array updates both tables and returns the combined count", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const cuid = RunId.generate(); + + await seedRoutedRun(prisma, { + id: ksuid.id, + friendlyId: ksuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: cuid.id, + friendlyId: cuid.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const now = new Date("2026-06-19T12:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" }; + + const count = await store.expireRunsBatch([ksuid.id, cuid.id], { error, now }); + + expect(count).toBe(2); + + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true, completedAt: true, expiredAt: true }, + }); + expect(v2Row.status).toBe("EXPIRED"); + expect(v2Row.completedAt).toEqual(now); + expect(v2Row.expiredAt).toEqual(now); + + const legacyRow = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + select: { status: true, completedAt: true, expiredAt: true }, + }); + expect(legacyRow.status).toBe("EXPIRED"); + expect(legacyRow.completedAt).toEqual(now); + expect(legacyRow.expiredAt).toEqual(now); + } + ); + + postgresTest( + "findRuns (unordered) returns runs from BOTH TaskRun and task_run_v2 in one env", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // Two legacy (cuid) runs + two new (ksuid) runs in the SAME env. + const legacyA = RunId.generate(); + const legacyB = RunId.generate(); + const v2A = RunId.generateKsuid(); + const v2B = RunId.generateKsuid(); + expect(isKsuidId(legacyA.id)).toBe(false); + expect(isKsuidId(v2A.id)).toBe(true); + + for (const run of [legacyA, legacyB, v2A, v2B]) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + }); + + // ALL four runs come back, regardless of which physical table they live in. + expect(found.map((r) => r.id).sort()).toEqual( + [legacyA.id, legacyB.id, v2A.id, v2B.id].sort() + ); + } + ); + + postgresTest( + "findRuns (ordered+limited) 2-way merges both tables to the globally-correct first N", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // Interleave createdAt across the two tables so a per-table take+slice + // would be WRONG: the newest run is in v2, the 2nd-newest in legacy, etc. + // t5 (v2) > t4 (legacy) > t3 (v2) > t2 (legacy) > t1 (v2) > t0 (legacy) + const base = new Date("2026-06-01T00:00:00.000Z").getTime(); + const at = (i: number) => new Date(base + i * 60_000); + + const legacy0 = RunId.generate(); // t0 (oldest) + const v2_1 = RunId.generateKsuid(); // t1 + const legacy2 = RunId.generate(); // t2 + const v2_3 = RunId.generateKsuid(); // t3 + const legacy4 = RunId.generate(); // t4 + const v2_5 = RunId.generateKsuid(); // t5 (newest) + + const seeded: Array<{ id: string; friendlyId: string; t: number }> = [ + { id: legacy0.id, friendlyId: legacy0.friendlyId, t: 0 }, + { id: v2_1.id, friendlyId: v2_1.friendlyId, t: 1 }, + { id: legacy2.id, friendlyId: legacy2.friendlyId, t: 2 }, + { id: v2_3.id, friendlyId: v2_3.friendlyId, t: 3 }, + { id: legacy4.id, friendlyId: legacy4.friendlyId, t: 4 }, + { id: v2_5.id, friendlyId: v2_5.friendlyId, t: 5 }, + ]; + + for (const run of seeded) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + createdAt: at(run.t), + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { createdAt: "desc" }, + take: 3, + }); + + // The globally-newest 3 — drawn from BOTH tables in true createdAt order, + // NOT three rows from one table. + expect(found.map((r) => r.id)).toEqual([v2_5.id, legacy4.id, v2_3.id]); + } + ); + + postgresTest( + "findRuns scoping: a run in another env is NOT returned from either table", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // A second env in the same project. + const otherEnv = await prisma.runtimeEnvironment.create({ + data: { + type: "PREVIEW", + slug: "other", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_other_apikey", + pkApiKey: "pk_other_apikey", + shortcode: "other_short_code", + }, + }); + + // One legacy + one v2 run in the TARGET env. + const legacyTarget = RunId.generate(); + const v2Target = RunId.generateKsuid(); + // One legacy + one v2 run in the OTHER env (must never surface). + const legacyOther = RunId.generate(); + const v2Other = RunId.generateKsuid(); + + await seedRoutedRun(prisma, { + id: legacyTarget.id, + friendlyId: legacyTarget.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: v2Target.id, + friendlyId: v2Target.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: legacyOther.id, + friendlyId: legacyOther.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: otherEnv.id, + }); + await seedRoutedRun(prisma, { + id: v2Other.id, + friendlyId: v2Other.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: otherEnv.id, + }); + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + }); + + // The same `where` fences BOTH tables: only the target env's runs come back. + expect(found.map((r) => r.id).sort()).toEqual([legacyTarget.id, v2Target.id].sort()); + const foundIds = new Set(found.map((r) => r.id)); + expect(foundIds.has(legacyOther.id)).toBe(false); + expect(foundIds.has(v2Other.id)).toBe(false); + } + ); + + postgresTest( + "findRuns (include) returns hydrated relations from both tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + const v2 = RunId.generateKsuid(); + + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + await seedRoutedRun(prisma, { + id: v2.id, + friendlyId: v2.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + include: { runtimeEnvironment: true }, + }); + + expect(found).toHaveLength(2); + // Both rows — legacy and v2 — carry the hydrated relation. + for (const run of found) { + expect(run.runtimeEnvironment).not.toBeNull(); + expect(run.runtimeEnvironment.id).toBe(environment.id); + expect(run.runtimeEnvironment.slug).toBe("dev"); + } + } + ); + + postgresTest( + "findRuns (take, no orderBy) caps the combined result across both tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // 2 legacy + 2 v2; an unordered `take: 3` must return exactly 3, all + // belonging to the scoped env. + const runs = [ + RunId.generate(), + RunId.generate(), + RunId.generateKsuid(), + RunId.generateKsuid(), + ]; + for (const run of runs) { + await seedRoutedRun(prisma, { + id: run.id, + friendlyId: run.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + } + + const found = await store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + take: 3, + }); + + expect(found).toHaveLength(3); + const allIds = new Set(runs.map((r) => r.id)); + for (const run of found) { + expect(allIds.has(run.id)).toBe(true); + } + } + ); + + postgresTest( + "findRuns (ordered+limited) by id alone is rejected: id is not a total cross-table order", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + await expect( + store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { id: "asc" }, + take: 10, + }) + ).rejects.toThrow(/total order/i); + } + ); + + postgresTest( + "findRuns (ordered+limited) rejects a Prisma cursor it cannot span across two tables", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const legacy = RunId.generate(); + await seedRoutedRun(prisma, { + id: legacy.id, + friendlyId: legacy.friendlyId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + + await expect( + store.findRuns({ + where: { runtimeEnvironmentId: environment.id }, + select: { id: true }, + orderBy: { createdAt: "desc" }, + take: 5, + cursor: { id: legacy.id }, + }) + ).rejects.toThrow(/cursor/i); + } + ); +}); + +describe("PostgresRunStore — v2 nested writes (run + related rows via nested Prisma create)", () => { + // `task_run_v2` is a full clone of `TaskRun` down to its relations, so the nested Prisma + // create/include used by createRun/lifecycle methods targets it unchanged via the runModel cast. + // The child->run foreign keys (TaskRunExecutionSnapshot.runId, Waitpoint.completedByTaskRunId, …) + // are dropped in production and by the testcontainer harness, so a child row can reference a run + // in EITHER physical table (TaskRun or task_run_v2) by plain scalar id without a FK violation. + + function runAssociatedWaitpoint(params: { + id: string; + friendlyId: string; + projectId: string; + environmentId: string; + }) { + return { + id: params.id, + friendlyId: params.friendlyId, + type: "RUN" as const, + status: "PENDING" as const, + idempotencyKey: `idem_${params.id}`, + userProvidedIdempotencyKey: false, + projectId: params.projectId, + environmentId: params.environmentId, + }; + } + + postgresTest( + "createRun for a KSUID run lands the run in task_run_v2, creates its snapshot keyed to the v2 run id, and creates the associated waitpoint", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + const input: CreateRunInput = { + ...buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }), + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_v2_create_1", + friendlyId: "wp_v2_create_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + input.data.friendlyId = ksuid.friendlyId; + + const run = await store.createRun(input); + + // Returns the TaskRunWithWaitpoint shape with the associated waitpoint included. + expect(run.id).toBe(ksuid.id); + expect(run.status).toBe("PENDING"); + expect(run.associatedWaitpoint).not.toBeNull(); + expect(run.associatedWaitpoint?.id).toBe("wp_v2_create_1"); + + // The run row landed in task_run_v2, not TaskRun. + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const legacyRow = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(legacyRow).toBeNull(); + + // The execution snapshot is keyed to the v2 run id (in the shared snapshot table). + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: ksuid.id }, + }); + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("RUN_CREATED"); + expect(snapshots[0]?.runStatus).toBe("PENDING"); + + // The waitpoint points back at the v2 run via the scalar FK column. + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_v2_create_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(ksuid.id); + } + ); + + postgresTest( + "v2 lifecycle: startAttempt then completeAttemptSuccess creates the completion snapshot keyed to the v2 run id", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const started = await store.startAttempt( + ksuid.id, + { attemptNumber: 1, isWarmStart: false }, + { select: { id: true, status: true, attemptNumber: true } } + ); + expect(started.status).toBe("EXECUTING"); + expect(started.attemptNumber).toBe(1); + + const completedAt = new Date("2026-06-19T11:00:00.000Z"); + const completed = await store.completeAttemptSuccess( + ksuid.id, + { + completedAt, + output: '{"ok":true}', + outputType: "application/json", + usageDurationMs: 250, + costInCents: 4, + snapshot: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: 1, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }, + { select: { id: true, status: true, completedAt: true, usageDurationMs: true, costInCents: true } } + ); + + expect(completed.id).toBe(ksuid.id); + expect(completed.status).toBe("COMPLETED_SUCCESSFULLY"); + expect(completed.completedAt).toEqual(completedAt); + expect(completed.usageDurationMs).toBe(250); + expect(completed.costInCents).toBe(4); + + // The run row updated in task_run_v2. + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true }, + }); + expect(v2Row.status).toBe("COMPLETED_SUCCESSFULLY"); + + // The completion snapshot is keyed to the v2 run id. + const finished = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: ksuid.id, executionStatus: "FINISHED" }, + }); + expect(finished).toHaveLength(1); + expect(finished[0]?.runStatus).toBe("COMPLETED_SUCCESSFULLY"); + } + ); + + postgresTest( + "createFailedRun for a KSUID run lands the run in task_run_v2 and creates the associated waitpoint keyed to the v2 run id", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const completedAt = new Date("2026-06-19T00:00:00.000Z"); + const error = { type: "STRING_ERROR", raw: "system failure" }; + + const input: CreateFailedRunInput = { + data: { + id: ksuid.id, + engine: "V2", + status: "SYSTEM_FAILURE", + friendlyId: ksuid.friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "trace_v2_failed", + spanId: "span_v2_failed", + queue: "task/my-task", + isTest: false, + completedAt, + error: error as unknown as import("@trigger.dev/database").Prisma.InputJsonObject, + depth: 0, + taskEventStore: "taskEvent", + }, + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_v2_failed_1", + friendlyId: "wp_v2_failed_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + + const run = await store.createFailedRun(input); + + expect(run.id).toBe(ksuid.id); + expect(run.status).toBe("SYSTEM_FAILURE"); + expect(run.associatedWaitpoint).not.toBeNull(); + expect(run.associatedWaitpoint?.id).toBe("wp_v2_failed_1"); + + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: ksuid.id } }); + expect(v2Row).not.toBeNull(); + const legacyRow = await prisma.taskRun.findUnique({ where: { id: ksuid.id } }); + expect(legacyRow).toBeNull(); + + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_v2_failed_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(ksuid.id); + } + ); + + postgresTest( + "createRun for a legacy cuid run with an associated waitpoint creates the run, its snapshot, and the waitpoint (regression: identical rows/shape)", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + expect(isKsuidId(cuid.id)).toBe(false); + + const input: CreateRunInput = { + ...buildCreateRunInput({ + runId: cuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }), + associatedWaitpoint: runAssociatedWaitpoint({ + id: "wp_legacy_create_1", + friendlyId: "wp_legacy_create_friendly_1", + projectId: project.id, + environmentId: environment.id, + }), + }; + input.data.friendlyId = cuid.friendlyId; + + const run = await store.createRun(input); + + // Same TaskRunWithWaitpoint shape as before. + expect(run.id).toBe(cuid.id); + expect(run.status).toBe("PENDING"); + expect(run.associatedWaitpoint?.id).toBe("wp_legacy_create_1"); + + // Legacy run is in TaskRun, not task_run_v2. + const legacyRow = await prisma.taskRun.findUnique({ where: { id: cuid.id } }); + expect(legacyRow).not.toBeNull(); + const v2Row = await prisma.taskRunV2.findUnique({ where: { id: cuid.id } }); + expect(v2Row).toBeNull(); + + // Snapshot keyed to the run, waitpoint linked back via the FK column. + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: cuid.id }, + }); + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("RUN_CREATED"); + + const waitpoint = await prisma.waitpoint.findUnique({ where: { id: "wp_legacy_create_1" } }); + expect(waitpoint?.completedByTaskRunId).toBe(cuid.id); + + // The FK still being live for the legacy table proves the waitpoint really + // resolves to a TaskRun row (the regression path is unchanged). + const reloaded = await prisma.taskRun.findUniqueOrThrow({ + where: { id: cuid.id }, + include: { associatedWaitpoint: true }, + }); + expect(reloaded.associatedWaitpoint?.id).toBe("wp_legacy_create_1"); + } + ); + + postgresTest( + "createRun is atomic: a second create with the same id throws and leaves no dangling snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const cuid = RunId.generate(); + const input = buildCreateRunInput({ + runId: cuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = cuid.friendlyId; + + await store.createRun(input); + + const before = await prisma.taskRunExecutionSnapshot.count({ where: { runId: cuid.id } }); + expect(before).toBe(1); + + // A second createRun with the same id fails the unique-id insert and + // propagates the error. Because the run row and its snapshot are written by + // one nested Prisma create, the rollback leaves no extra snapshot behind. + await expect(store.createRun(input)).rejects.toThrow(); + + const after = await prisma.taskRunExecutionSnapshot.count({ where: { runId: cuid.id } }); + expect(after).toBe(1); + } + ); + + postgresTest( + "lockRunToWorker for a KSUID run returns the run with runtimeEnvironment hydrated via include (no manual stitch)", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + expect(isKsuidId(ksuid.id)).toBe(true); + + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const backgroundWorker = await prisma.backgroundWorker.create({ + data: { + friendlyId: "worker_friendly_v2", + version: "20260601.1", + runtimeEnvironmentId: environment.id, + projectId: project.id, + contentHash: "abc123v2", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + metadata: {}, + }, + }); + + const workerTask = await prisma.backgroundWorkerTask.create({ + data: { + friendlyId: "task_friendly_v2", + slug: "my-task", + filePath: "src/my-task.ts", + exportName: "myTask", + workerId: backgroundWorker.id, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + const queue = await prisma.taskQueue.create({ + data: { + friendlyId: "queue_friendly_v2", + name: "task/my-task", + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + const lockedAt = new Date("2026-06-19T13:00:00.000Z"); + const startedAt = new Date("2026-06-19T13:00:01.000Z"); + const snapshotId = "snap_lock_v2_1"; + + const locked = await store.lockRunToWorker(ksuid.id, { + lockedAt, + lockedById: workerTask.id, + lockedToVersionId: backgroundWorker.id, + lockedQueueId: queue.id, + startedAt, + baseCostInCents: 5, + machinePreset: "small-1x", + taskVersion: "20260601.1", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + maxDurationInSeconds: null, + snapshot: { + id: snapshotId, + previousSnapshotId: undefined, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + completedWaitpointIds: [], + completedWaitpointOrder: [], + }, + }); + + expect(locked.status).toBe("DEQUEUED"); + // The relation is hydrated by the nested `include`, not stitched manually. + expect(locked.runtimeEnvironment).toBeDefined(); + expect(locked.runtimeEnvironment.id).toBe(environment.id); + + // The run row landed (and was updated) in task_run_v2. + const v2Row = await prisma.taskRunV2.findUniqueOrThrow({ + where: { id: ksuid.id }, + select: { status: true }, + }); + expect(v2Row.status).toBe("DEQUEUED"); + + // The dequeue snapshot is keyed to the v2 run id. + const snap = await prisma.taskRunExecutionSnapshot.findUnique({ where: { id: snapshotId } }); + expect(snap?.executionStatus).toBe("PENDING_EXECUTING"); + } + ); + + postgresTest( + "findRun with a runtimeEnvironment include resolves the relation for a KSUID run", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const ksuid = RunId.generateKsuid(); + const input = buildCreateRunInput({ + runId: ksuid.id, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }); + input.data.friendlyId = ksuid.friendlyId; + + await store.createRun(input); + + const run = await store.findRun({ id: ksuid.id }, { include: { runtimeEnvironment: true } }); + + expect(run?.id).toBe(ksuid.id); + expect(run?.runtimeEnvironment).toBeDefined(); + expect(run?.runtimeEnvironment.id).toBe(environment.id); + } + ); +}); diff --git a/internal-packages/run-store/src/PostgresRunStore.ts b/internal-packages/run-store/src/PostgresRunStore.ts index fcc53c00266..d07c9630db4 100644 --- a/internal-packages/run-store/src/PostgresRunStore.ts +++ b/internal-packages/run-store/src/PostgresRunStore.ts @@ -20,6 +20,7 @@ import type { TaskRunWithWaitpoint, } from "./types.js"; import type { TaskRunError } from "@trigger.dev/core/v3/schemas"; +import { isKsuidId } from "@trigger.dev/core/v3/isomorphic"; export type PostgresRunStoreOptions = { prisma: PrismaClient; @@ -27,12 +28,17 @@ export type PostgresRunStoreOptions = { }; /** - * Typed write layer for the task-run row, backed by the `taskRun` Prisma model. + * Typed write layer for the task-run row. A run lives in one of two physical + * tables chosen by its id format (`runModel`): the legacy `taskRun`, or the + * `task_run_v2` clone. `task_run_v2` carries the same relation surface as + * `TaskRun`, so a method's nested Prisma create/include (execution snapshot, + * associated waitpoint, `runtimeEnvironment`) targets either table unchanged + * once the delegate comes from `runModel`. * - * Each method is a verbatim relocation of the Prisma statement that lives at a - * specific call site today. Methods write through `(tx ?? this.prisma).taskRun` + * Each method is its original single-table Prisma statement with the run + * delegate routed through `runModel`. Methods write through `tx` when supplied * so callers can opt into an existing transaction. Errors (including unique - * constraint violations) propagate to the caller unchanged. + * constraint violations) propagate unchanged. */ export class PostgresRunStore implements RunStore { private readonly prisma: PrismaClient; @@ -43,13 +49,66 @@ export class PostgresRunStore implements RunStore { this.readOnlyPrisma = options.readOnlyPrisma; } + /** + * A run lives in exactly one physical table, chosen by the FORMAT of its id: + * a KSUID id (new) lives in `task_run_v2`, the legacy cuid id in `TaskRun`. + * `task_run_v2` is an identical clone of `TaskRun` down to its relations, so + * its delegate is cast to the `taskRun` delegate type to reuse the existing + * generic `select`/`include`/nested-write passthrough unchanged. + */ + private runModel(client: PrismaClientOrTransaction, idOrFriendlyId: string) { + return isKsuidId(idOrFriendlyId) + ? (client.taskRunV2 as unknown as typeof client.taskRun) + : client.taskRun; + } + + /** + * The routing key for a single-row read: the `{ id }` or `{ friendlyId }` + * value in the `where` clause. Both carry the same KSUID/cuid body and route + * to the same physical table. Returns `undefined` for a predicate that + * addresses no specific run (e.g. an idempotency-key lookup), which must read + * both tables rather than assume one. + */ + #routingKeyOf(where: Prisma.TaskRunWhereInput): string | undefined { + return typeof where.id === "string" + ? where.id + : typeof where.friendlyId === "string" + ? where.friendlyId + : undefined; + } + + /** + * Read a single row matching a non-id predicate from BOTH physical tables. + * A run lives in exactly one table (chosen by its id format), so a key-based + * predicate (idempotency key, "has this env any runs") can match a row in + * either. Query both in parallel and return the first match — at most one + * side is non-null, and legacy is preferred for a stable result if a + * predicate ever matches both. `task_run_v2` is an identical clone of + * `TaskRun`, so the SAME args (select/include and the security-scoping + * `where`) run unchanged against either delegate. + */ + async #findFirstAcrossTables( + prisma: PrismaClientOrTransaction | PrismaReplicaClient, + where: Prisma.TaskRunWhereInput, + args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude } + ): Promise { + const v2Model = prisma.taskRunV2 as unknown as typeof prisma.taskRun; + + const [legacyRun, v2Run] = await Promise.all([ + prisma.taskRun.findFirst({ where, ...args }), + v2Model.findFirst({ where, ...args }), + ]); + + return legacyRun ?? v2Run; + } + async createRun( params: CreateRunInput, tx?: PrismaClientOrTransaction ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ include: { associatedWaitpoint: true, }, @@ -84,7 +143,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ data: { ...params.data, executionSnapshots: { @@ -111,7 +170,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const client = tx ?? this.prisma; - return client.taskRun.create({ + return this.runModel(client, params.data.id).create({ include: { associatedWaitpoint: true, }, @@ -134,7 +193,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXECUTING", @@ -161,7 +220,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "COMPLETED_SUCCESSFULLY", @@ -197,7 +256,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { machinePreset: data.machinePreset, @@ -215,7 +274,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING" }, select: args.select, @@ -229,7 +288,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - await prisma.taskRun.update({ + await this.runModel(prisma, runId).update({ where: { id: runId }, data: { bulkActionGroupIds: { @@ -253,7 +312,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "CANCELED", @@ -283,7 +342,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: data.status, @@ -304,7 +363,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXPIRED", @@ -341,15 +400,41 @@ export class PostgresRunStore implements RunStore { return 0; } - return prisma.$executeRaw` - UPDATE "TaskRun" - SET "status" = 'EXPIRED'::"TaskRunStatus", - "completedAt" = ${data.now}, - "expiredAt" = ${data.now}, - "updatedAt" = ${data.now}, - "error" = ${JSON.stringify(data.error)}::jsonb - WHERE "id" IN (${Prisma.join(runIds)}) - `; + // A run lives in exactly one table, chosen by its id format. The array may + // be mixed, so partition it and run the UPDATE once per non-empty partition + // on its own table, then sum the counts. + const v2Ids = runIds.filter((id) => isKsuidId(id)); + const legacyIds = runIds.filter((id) => !isKsuidId(id)); + + const error = JSON.stringify(data.error); + + let count = 0; + + if (legacyIds.length > 0) { + count += await prisma.$executeRaw` + UPDATE "TaskRun" + SET "status" = 'EXPIRED'::"TaskRunStatus", + "completedAt" = ${data.now}, + "expiredAt" = ${data.now}, + "updatedAt" = ${data.now}, + "error" = ${error}::jsonb + WHERE "id" IN (${Prisma.join(legacyIds)}) + `; + } + + if (v2Ids.length > 0) { + count += await prisma.$executeRaw` + UPDATE "task_run_v2" + SET "status" = 'EXPIRED'::"TaskRunStatus", + "completedAt" = ${data.now}, + "expiredAt" = ${data.now}, + "updatedAt" = ${data.now}, + "error" = ${error}::jsonb + WHERE "id" IN (${Prisma.join(v2Ids)}) + `; + } + + return count; } async lockRunToWorker( @@ -359,7 +444,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "DEQUEUED", @@ -403,7 +488,7 @@ export class PostgresRunStore implements RunStore { include: { runtimeEnvironment: true, }, - }); + }) as Promise>; } async parkPendingVersion( @@ -414,7 +499,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING_VERSION", @@ -430,7 +515,7 @@ export class PostgresRunStore implements RunStore { ): Promise<{ count: number }> { const prisma = tx ?? this.prisma; - const result = await prisma.taskRun.updateMany({ + const result = await this.runModel(prisma, runId).updateMany({ where: { id: runId, status: "PENDING_VERSION" }, data: { status: "PENDING" }, }); @@ -445,7 +530,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "WAITING_TO_RESUME" }, include: args.include, @@ -459,7 +544,7 @@ export class PostgresRunStore implements RunStore { ): Promise> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "EXECUTING" }, select: args.select, @@ -473,7 +558,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { delayUntil: data.delayUntil, @@ -503,7 +588,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data: { status: "PENDING", @@ -519,7 +604,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId }, data, include: { @@ -540,16 +625,17 @@ export class PostgresRunStore implements RunStore { tx?: PrismaClientOrTransaction ): Promise<{ count: number }> { const prisma = tx ?? this.prisma; + const model = this.runModel(prisma, runId); if (options.expectedMetadataVersion !== undefined) { - const result = await prisma.taskRun.updateMany({ + const result = await model.updateMany({ where: { id: runId, metadataVersion: options.expectedMetadataVersion }, data, }); return { count: result.count }; } - await prisma.taskRun.update({ + await model.update({ where: { id: runId }, data, }); @@ -563,7 +649,7 @@ export class PostgresRunStore implements RunStore { const prisma = tx ?? this.prisma; if (params.byId) { - const result = await prisma.taskRun.updateMany({ + const result = await this.runModel(prisma, params.byId.runId).updateMany({ where: { id: params.byId.runId, idempotencyKey: params.byId.idempotencyKey }, data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, }); @@ -571,23 +657,48 @@ export class PostgresRunStore implements RunStore { } if (params.byPredicate) { + // No run id to route by: a matching run could be in either table during + // the mixed window, so run the predicate against both and sum the counts. + const where = { + idempotencyKey: params.byPredicate.idempotencyKey, + taskIdentifier: params.byPredicate.taskIdentifier, + runtimeEnvironmentId: params.byPredicate.runtimeEnvironmentId, + }; + const data = { idempotencyKey: null, idempotencyKeyExpiresAt: null }; + + const [legacy, v2] = await Promise.all([ + prisma.taskRun.updateMany({ where, data }), + (prisma.taskRunV2 as unknown as typeof prisma.taskRun).updateMany({ where, data }), + ]); + + return { count: legacy.count + v2.count }; + } + + // byFriendlyIds — only clears idempotencyKey, not idempotencyKeyExpiresAt. + // The friendlyId carries the same KSUID/cuid body as the id, so it routes + // the same way; partition the (possibly mixed) array and sum the counts. + const v2FriendlyIds = params.byFriendlyIds.filter((friendlyId) => isKsuidId(friendlyId)); + const legacyFriendlyIds = params.byFriendlyIds.filter((friendlyId) => !isKsuidId(friendlyId)); + + let count = 0; + + if (legacyFriendlyIds.length > 0) { const result = await prisma.taskRun.updateMany({ - where: { - idempotencyKey: params.byPredicate.idempotencyKey, - taskIdentifier: params.byPredicate.taskIdentifier, - runtimeEnvironmentId: params.byPredicate.runtimeEnvironmentId, - }, - data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + where: { friendlyId: { in: legacyFriendlyIds } }, + data: { idempotencyKey: null }, }); - return { count: result.count }; + count += result.count; } - // byFriendlyIds — only clears idempotencyKey, not idempotencyKeyExpiresAt - const result = await prisma.taskRun.updateMany({ - where: { friendlyId: { in: params.byFriendlyIds } }, - data: { idempotencyKey: null }, - }); - return { count: result.count }; + if (v2FriendlyIds.length > 0) { + const result = await (prisma.taskRunV2 as unknown as typeof prisma.taskRun).updateMany({ + where: { friendlyId: { in: v2FriendlyIds } }, + data: { idempotencyKey: null }, + }); + count += result.count; + } + + return { count }; } async pushTags( @@ -598,7 +709,7 @@ export class PostgresRunStore implements RunStore { ): Promise<{ updatedAt: Date }> { const prisma = tx ?? this.prisma; - return prisma.taskRun.update({ + return this.runModel(prisma, runId).update({ where: { id: runId, runtimeEnvironmentId: where.runtimeEnvironmentId }, data: { runTags: { push: tags } }, select: { updatedAt: true }, @@ -612,7 +723,7 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = tx ?? this.prisma; - await prisma.taskRun.update({ + await this.runModel(prisma, runId).update({ where: { id: runId }, data: { realtimeStreams: { push: streamId } }, }); @@ -639,10 +750,15 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return prisma.taskRun.findFirst({ - where, - ...args, - }); + const routingKey = this.#routingKeyOf(where); + if (routingKey !== undefined) { + // by id / friendlyId: the id format picks exactly one table, O(1). + return this.runModel(prisma, routingKey).findFirst({ where, ...args }); + } + + // Non-id predicate (e.g. idempotency-key dedup): the match can be in + // either table, so read both. + return this.#findFirstAcrossTables(prisma, where, args); } findRunOrThrow( @@ -666,10 +782,19 @@ export class PostgresRunStore implements RunStore { ): Promise { const { args, prisma } = this.#resolveReadArgs(argsOrClient, client); - return prisma.taskRun.findFirstOrThrow({ - where, - ...args, - }); + const routingKey = this.#routingKeyOf(where); + if (routingKey !== undefined) { + return this.runModel(prisma, routingKey).findFirstOrThrow({ where, ...args }); + } + + // Non-id predicate: read both tables, then enforce the throw-on-miss + // contract ourselves (neither table's findFirstOrThrow could see the + // other's row). + const run = await this.#findFirstAcrossTables(prisma, where, args); + if (run === null || run === undefined) { + throw new Error("PostgresRunStore.findRunOrThrow: no run matched the predicate"); + } + return run; } findRuns( @@ -718,7 +843,298 @@ export class PostgresRunStore implements RunStore { ): Promise { const prisma = client ?? this.readOnlyPrisma; - return prisma.taskRun.findMany(args); + // A run lives in exactly one physical table, chosen by its id format, so a + // multi-row read must hit BOTH `TaskRun` (legacy cuid) and `task_run_v2` + // (new ksuid) and combine. `task_run_v2` is an identical clone of `TaskRun` + // (same relation surface), so the SAME `args` — crucially the SAME `where`, + // which is the security scope — run unchanged against either delegate. + const legacyModel = prisma.taskRun; + const v2Model = prisma.taskRunV2 as unknown as typeof prisma.taskRun; + + const ordered = this.#normalizeOrderBy(args.orderBy); + + // ORDERED + LIMITED → bounded 2-way merge. + // + // A single Prisma `cursor` addresses one table's row and cannot span two + // tables, so reject it on this path rather than silently paginating one + // table. (No current caller pairs `cursor` with `orderBy`+`take`; keyset + // callers carry the cursor in `where`, which both queries honor.) + if (ordered.length > 0 && args.take !== undefined) { + if (args.cursor !== undefined) { + throw new Error( + "RunStore.findRuns: a Prisma `cursor` cannot address two tables on an ordered+limited read. " + + "Use a where-based keyset (e.g. `where: { createdAt: { lt: X } }`) instead." + ); + } + + const comparator = this.#buildCrossTableComparator(ordered); + + // The in-memory comparator reads the order keys off each row, so they + // MUST be in the projection. If the caller's `select` omits one, add it + // for the query and strip it from the output. (`include`/full-row already + // carry every scalar.) + const { args: queryArgs, addedKeys } = this.#withOrderKeysSelected(args, ordered); + + // Take at most `take` from each table: the merged head of two ordered + // streams of length `take` is fully determined by their first `take` rows. + const perTableArgs = { ...queryArgs, take: args.take }; + + const [legacyRows, v2Rows] = (await Promise.all([ + legacyModel.findMany(perTableArgs), + v2Model.findMany(perTableArgs), + ])) as [Array>, Array>]; + + const merged = this.#mergeOrdered(legacyRows, v2Rows, comparator, args.take); + return this.#stripAddedKeys(merged, addedKeys); + } + + // UNORDERED / NO-LIMIT (or `take` without `orderBy`) → run the SAME args + // against both tables and concatenate. A run is in exactly one table, so + // concatenation is complete and has no duplicates. + // + // `orderBy` without `take` still needs the order keys projected so the + // whole-set re-sort below can read them. + const { args: queryArgs, addedKeys } = + ordered.length > 0 + ? this.#withOrderKeysSelected(args, ordered) + : { args, addedKeys: [] as string[] }; + + const [legacyRows, v2Rows] = (await Promise.all([ + legacyModel.findMany(queryArgs), + v2Model.findMany(queryArgs), + ])) as [Array>, Array>]; + + let combined = legacyRows.concat(v2Rows); + + // `orderBy` without `take`: each table came back ordered, but the + // concatenation is not — re-sort the whole bounded set to honor the order. + if (ordered.length > 0) { + const comparator = this.#buildCrossTableComparator(ordered); + combined = combined.sort(comparator); + } + + // `take` without `orderBy`: an unordered cap. Each table was capped at + // `take`, so the concatenation is at most `2*take`; trim to `take`. Order + // among unordered rows is unspecified either way. + if (args.take !== undefined) { + combined = combined.slice(0, args.take); + } + + return this.#stripAddedKeys(combined, addedKeys); + } + + /** + * The cross-table merge/sort compares order-key VALUES read off each returned + * row, so every scalar order key must be present in the projection. When the + * caller passes a `select` that omits an order key, add it (so the row carries + * the value) and record which keys were added so they can be stripped from the + * final output — the caller asked not to see them. A query with `include`, or + * with neither `select` nor `include` (full row), already returns every scalar + * column, so nothing is added. + */ + #withOrderKeysSelected( + args: { + where: Prisma.TaskRunWhereInput; + select?: Prisma.TaskRunSelect; + include?: Prisma.TaskRunInclude; + orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[]; + take?: number; + skip?: number; + cursor?: Prisma.TaskRunWhereUniqueInput; + }, + ordered: Array<{ key: string; direction: "asc" | "desc" }> + ): { + args: typeof args; + addedKeys: string[]; + } { + // The merge always tiebreaks on `id`, so it must be readable too. + const requiredKeys = new Set([...ordered.map((entry) => entry.key), "id"]); + + if (!args.select) { + // include / full-row: all scalars are present already. + return { args, addedKeys: [] }; + } + + const select = args.select as Record; + const addedKeys: string[] = []; + const augmentedSelect: Record = { ...select }; + + for (const key of requiredKeys) { + if (!(key in augmentedSelect)) { + augmentedSelect[key] = true; + addedKeys.push(key); + } + } + + if (addedKeys.length === 0) { + return { args, addedKeys: [] }; + } + + return { args: { ...args, select: augmentedSelect as Prisma.TaskRunSelect }, addedKeys }; + } + + /** Remove the order-key columns that were added purely to drive the merge. */ + #stripAddedKeys( + rows: Array>, + addedKeys: string[] + ): Array> { + if (addedKeys.length === 0) { + return rows; + } + + for (const row of rows) { + for (const key of addedKeys) { + delete row[key]; + } + } + + return rows; + } + + /** + * Normalize the optional `orderBy` (single object or array) into an array of + * single-key order entries, preserving precedence. An empty array means "no + * ordering requested". + */ + #normalizeOrderBy( + orderBy: + | Prisma.TaskRunOrderByWithRelationInput + | Prisma.TaskRunOrderByWithRelationInput[] + | undefined + ): Array<{ key: string; direction: "asc" | "desc" }> { + if (orderBy === undefined) { + return []; + } + + const list = Array.isArray(orderBy) ? orderBy : [orderBy]; + const entries: Array<{ key: string; direction: "asc" | "desc" }> = []; + + for (const clause of list) { + for (const [key, value] of Object.entries(clause)) { + // Only scalar `{ field: "asc" | "desc" }` entries are mergeable in + // memory. A relation/nested sort (value is an object) can't be compared + // here — flag it rather than mis-order across the two tables. + if (value === "asc" || value === "desc") { + entries.push({ key, direction: value }); + } else { + throw new Error( + `RunStore.findRuns: cannot merge across tables on a non-scalar orderBy key "${key}". ` + + "Ordered+limited cross-table reads must order by a scalar column (a time/createdAt field, with id as a tiebreak)." + ); + } + } + } + + return entries; + } + + /** + * Build a total-order comparator from the requested scalar order keys. + * + * The cross-table merge is only correct when the order is a TOTAL order over + * the union of both tables. A time-based column (`createdAt`, or any other + * Date column) provides that; `id` alone does NOT — a cuid and a ksuid live + * in different, non-interleaving id spaces, so ordering the union by `id` + * lexicographically is meaningless. Require a time/createdAt key to lead (or + * appear in) the order, and use `id` only as a within-timestamp tiebreak. + */ + #buildCrossTableComparator( + ordered: Array<{ key: string; direction: "asc" | "desc" }> + ): (a: Record, b: Record) => number { + const hasTimeKey = ordered.some((entry) => this.#isTimeOrderKey(entry.key)); + + if (!hasTimeKey) { + const keys = ordered.map((entry) => entry.key).join(", "); + throw new Error( + `RunStore.findRuns: ordered+limited read orders by [${keys}], which is not a valid total order across the ` + + "legacy TaskRun (cuid) and task_run_v2 (ksuid) tables. Order by a time/createdAt column (id may follow as a tiebreak)." + ); + } + + // Ensure `id` is present as a final tiebreak so the merge is deterministic + // when two rows share the leading timestamp. Use the direction of the + // leading order key for the tiebreak. + const comparators = [...ordered]; + if (!comparators.some((entry) => entry.key === "id")) { + comparators.push({ key: "id", direction: ordered[0].direction }); + } + + return (a, b) => { + for (const { key, direction } of comparators) { + const cmp = this.#compareValues(a[key], b[key]); + if (cmp !== 0) { + return direction === "asc" ? cmp : -cmp; + } + } + return 0; + }; + } + + /** + * A column is a valid cross-table total-order lead when it is time-based. + * `createdAt` is the canonical one; the other Date columns the callers use + * (`updatedAt`, `completedAt`, etc.) qualify too. The selected/included row + * must carry the column for the comparator to read it. + */ + #isTimeOrderKey(key: string): boolean { + return ( + key === "createdAt" || + key === "updatedAt" || + key === "completedAt" || + key === "startedAt" || + key === "queuedAt" || + key === "lockedAt" || + key === "delayUntil" || + key === "expiredAt" + ); + } + + /** Ascending comparison of two scalar order values (Date, number, string). */ + #compareValues(a: unknown, b: unknown): number { + if (a === b) return 0; + // Nulls sort last (Prisma's default for `nulls: "last"` is the common case; + // a stable, deterministic placement is what matters for the merge). + if (a === null || a === undefined) return 1; + if (b === null || b === undefined) return -1; + + if (a instanceof Date && b instanceof Date) { + return a.getTime() - b.getTime(); + } + if (typeof a === "number" && typeof b === "number") { + return a - b; + } + return String(a) < String(b) ? -1 : String(a) > String(b) ? 1 : 0; + } + + /** + * 2-way merge of two already-ordered streams into the first `take` rows of + * their combined order. Bounded: walks at most `take` steps. The two inputs + * are each `findMany`-ordered by the SAME order keys, so a single linear pass + * picking the smaller head under `comparator` yields the globally-correct head. + */ + #mergeOrdered( + left: Array>, + right: Array>, + comparator: (a: Record, b: Record) => number, + take: number + ): Array> { + const out: Array> = []; + let i = 0; + let j = 0; + + while (out.length < take && (i < left.length || j < right.length)) { + if (i >= left.length) { + out.push(right[j++]); + } else if (j >= right.length) { + out.push(left[i++]); + } else if (comparator(left[i], right[j]) <= 0) { + out.push(left[i++]); + } else { + out.push(right[j++]); + } + } + + return out; } /** diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 4183e85b40b..9cbaeb04ce6 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -2,6 +2,7 @@ import { createClient } from "@clickhouse/client"; import { PostgreSqlContainer, StartedPostgreSqlContainer } from "@testcontainers/postgresql"; import { RedisContainer, StartedRedisContainer } from "@testcontainers/redis"; import { tryCatch } from "@trigger.dev/core"; +import { PrismaClient } from "@trigger.dev/database"; import Redis from "ioredis"; import path from "path"; import { isDebug } from "std-env"; @@ -48,9 +49,50 @@ export async function pushDatabaseSchema(databaseUrl: string) { } ); + await dropRunForeignKeys(databaseUrl); + return result; } +/** + * Production drops every foreign key that sits on, or points at, the run tables (`TaskRun` and + * `task_run_v2`) — a run's id is just a scalar that may live in either physical table, so the FKs + * can't be enforced. `prisma db push` doesn't know that: it recreates a constraint for every + * relation still declared in schema.prisma, so the template DB ends up with run FKs production + * doesn't have. That makes tests diverge — e.g. inserting a child row (a `TaskRunExecutionSnapshot` + * whose `runId` is a `task_run_v2` id) trips a `..._runId_fkey -> TaskRun` constraint that doesn't + * exist in prod. So after the push we strip those FKs to match production exactly. + * + * This is done dynamically (rather than naming each constraint) so any relation added to the schema + * later has its test-only run FK stripped automatically. It only removes FK constraints, so it + * cannot corrupt valid data — it makes the template DB strictly more faithful to production. + */ +async function dropRunForeignKeys(databaseUrl: string) { + const prisma = new PrismaClient({ + datasources: { db: { url: databaseUrl } }, + }); + + try { + await prisma.$executeRawUnsafe(` +DO $$ +DECLARE r record; +BEGIN + FOR r IN + SELECT conrelid::regclass::text AS tbl, conname + FROM pg_constraint + WHERE contype = 'f' + AND (confrelid IN ('"TaskRun"'::regclass, 'task_run_v2'::regclass) + OR conrelid IN ('"TaskRun"'::regclass, 'task_run_v2'::regclass)) + LOOP + EXECUTE format('ALTER TABLE %s DROP CONSTRAINT %I', r.tbl, r.conname); + END LOOP; +END $$; +`); + } finally { + await prisma.$disconnect(); + } +} + /** * Caps each container's CPU/memory to approximate the 2-core CI runner locally (for timing + flake * reproduction). Set TESTCONTAINERS_CPU (cores per container, e.g. "2") and/or diff --git a/packages/core/src/v3/isomorphic/friendlyId.test.ts b/packages/core/src/v3/isomorphic/friendlyId.test.ts new file mode 100644 index 00000000000..e3221fce7f4 --- /dev/null +++ b/packages/core/src/v3/isomorphic/friendlyId.test.ts @@ -0,0 +1,99 @@ +import { describe, it, expect } from "vitest"; +import { + fromFriendlyId, + generateKsuid, + isKsuidId, + RunId, + toFriendlyId, +} from "./friendlyId.js"; + +const BASE62 = /^[0-9A-Za-z]+$/; + +describe("isKsuidId", () => { + it("is true for a freshly minted ksuid and its friendlyId", () => { + const { id, friendlyId } = RunId.generateKsuid(); + + expect(isKsuidId(id)).toBe(true); + expect(isKsuidId(friendlyId)).toBe(true); + }); + + it("is false for a legacy cuid id and its friendlyId", () => { + const { id, friendlyId } = RunId.generate(); + + // sanity: legacy cuid is 25 chars + expect(id.length).toBe(25); + expect(isKsuidId(id)).toBe(false); + expect(isKsuidId(friendlyId)).toBe(false); + }); + + it("is false for empty, prefix-only, and malformed input", () => { + expect(isKsuidId("")).toBe(false); + expect(isKsuidId("run_")).toBe(false); + + // 27 chars but contains a non-base62 char (`-`) + const twentySevenWithDash = `${"a".repeat(26)}-`; + expect(twentySevenWithDash).toHaveLength(27); + expect(isKsuidId(twentySevenWithDash)).toBe(false); + expect(isKsuidId(`run_${twentySevenWithDash}`)).toBe(false); + }); + + it("is false for a 26-char and a 28-char body", () => { + expect("a".repeat(26)).toHaveLength(26); + expect(isKsuidId("a".repeat(26))).toBe(false); + expect(isKsuidId("a".repeat(28))).toBe(false); + expect(isKsuidId(`run_${"a".repeat(26)}`)).toBe(false); + expect(isKsuidId(`run_${"a".repeat(28)}`)).toBe(false); + }); +}); + +describe("generateKsuid", () => { + it("produces a 27-char base62 body", () => { + const id = generateKsuid(); + + expect(id).toHaveLength(27); + expect(id).toMatch(BASE62); + }); + + it("produces unique ids across calls", () => { + const ids = new Set(Array.from({ length: 100 }, () => generateKsuid())); + + expect(ids.size).toBe(100); + }); + + it("round-trips through toFriendlyId / fromFriendlyId", () => { + const id = generateKsuid(); + const friendlyId = toFriendlyId("run", id); + + expect(friendlyId).toBe(`run_${id}`); + expect(fromFriendlyId(friendlyId)).toBe(id); + + const generated = RunId.generateKsuid(); + expect(generated.friendlyId).toBe(`run_${generated.id}`); + expect(RunId.fromFriendlyId(generated.friendlyId)).toBe(generated.id); + }); + + it("is time-ordered: a later timestamp sorts after an earlier one", () => { + // The timestamp lives in the high bytes, so a larger timestamp encodes to a + // lexicographically-greater (left-padded, fixed-width) base62 string. + const realNow = Date.now; + try { + Date.now = () => 1_500_000_000_000; + const earlier = generateKsuid(); + Date.now = () => 1_500_000_100_000; + const later = generateKsuid(); + + expect(later > earlier).toBe(true); + expect(isKsuidId(earlier)).toBe(true); + expect(isKsuidId(later)).toBe(true); + } finally { + Date.now = realNow; + } + }); +}); + +describe("isKsuidId and the minter agree", () => { + it("isKsuidId(generateKsuid().id) === true and isKsuidId(generate().id) === false", () => { + expect(isKsuidId(RunId.generateKsuid().id)).toBe(true); + expect(isKsuidId(RunId.generate().id)).toBe(false); + }); +}); diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 66575c7c178..ebcc8dfa284 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -11,6 +11,84 @@ export function generateInternalId() { return cuid(); } +// KSUID epoch (2014-05-13T16:53:20Z) — seconds offset applied to the unix timestamp. +const KSUID_EPOCH = 1_400_000_000; +const KSUID_TIMESTAMP_BYTES = 4; +const KSUID_PAYLOAD_BYTES = 16; +const KSUID_TOTAL_BYTES = KSUID_TIMESTAMP_BYTES + KSUID_PAYLOAD_BYTES; +const KSUID_STRING_LENGTH = 27; +const BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + +/** Encode raw bytes as base62, left-padded to the given length. */ +function base62Encode(bytes: Uint8Array, length: number): string { + // Big-endian base-256 -> base-62 conversion (repeated division). + const digits = Array.from(bytes); + let result = ""; + + while (digits.length > 0) { + let remainder = 0; + const quotient: number[] = []; + + for (let i = 0; i < digits.length; i++) { + const acc = (digits[i] ?? 0) + remainder * 256; + const q = Math.floor(acc / 62); + remainder = acc % 62; + + if (quotient.length > 0 || q > 0) { + quotient.push(q); + } + } + + // `remainder` is always in [0, 61], so this index is always valid. + result = BASE62_ALPHABET.charAt(remainder) + result; + digits.length = 0; + digits.push(...quotient); + } + + return result.padStart(length, BASE62_ALPHABET.charAt(0)); +} + +/** + * Mint a KSUID body: a 27-char, base62, time-ordered identifier. + * + * Layout: 4-byte big-endian uint32 timestamp (seconds since the KSUID epoch) + * + 16 random bytes = 20 bytes, base62-encoded and left-padded to 27 chars. + * + * Isomorphic: relies only on `globalThis.crypto.getRandomValues` for randomness. + */ +export function generateKsuid(): string { + const bytes = new Uint8Array(KSUID_TOTAL_BYTES); + + const timestamp = Math.floor(Date.now() / 1000) - KSUID_EPOCH; + bytes[0] = (timestamp >>> 24) & 0xff; + bytes[1] = (timestamp >>> 16) & 0xff; + bytes[2] = (timestamp >>> 8) & 0xff; + bytes[3] = timestamp & 0xff; + + globalThis.crypto.getRandomValues(bytes.subarray(KSUID_TIMESTAMP_BYTES)); + + return base62Encode(bytes, KSUID_STRING_LENGTH); +} + +/** + * Pure string discriminator: is this id (or friendlyId) a KSUID-format body? + * + * Strips a leading `"_"` if present, then tests the body for the KSUID + * shape (27 chars, base62). The 25-char legacy cuid and any malformed input + * return false. Never throws. + */ +export function isKsuidId(idOrFriendlyId: string): boolean { + if (!idOrFriendlyId) { + return false; + } + + const underscoreIndex = idOrFriendlyId.indexOf("_"); + const body = + underscoreIndex === -1 ? idOrFriendlyId : idOrFriendlyId.slice(underscoreIndex + 1); + + return body.length === KSUID_STRING_LENGTH && /^[0-9A-Za-z]{27}$/.test(body); +} + /** Convert an internal ID to a friendly ID */ export function toFriendlyId(entityName: string, internalId: string): string { if (!entityName) { @@ -69,6 +147,16 @@ export class IdUtil { }; } + /** Mint an id whose body is a KSUID (27-char, base62, time-ordered). */ + generateKsuid() { + const internalId = generateKsuid(); + + return { + id: internalId, + friendlyId: this.toFriendlyId(internalId), + }; + } + toFriendlyId(internalId: string) { return toFriendlyId(this.entityName, internalId); }