-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: route new task runs to a parallel task_run_v2 table #4000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: runstore-read-path
Are you sure you want to change the base?
Changes from all commits
650a081
40aea1b
72af7aa
1e60662
0a591fb
f8c1a04
e174341
37b7f97
658b385
47610ee
912a504
3549341
435e895
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags"; | ||
|
|
||
| /** | ||
| * Per-org cutover switch for the parallel `task_run_v2` run table. | ||
| * | ||
| * Read in memory from `Organization.featureFlags` (already loaded on the | ||
| * AuthenticatedEnvironment at API-key auth, so this adds no DB query) at the | ||
| * single run-id mint site in the trigger path. On → mint a KSUID id, which | ||
| * routes the run to `task_run_v2`; off (the default) → mint a legacy id, which | ||
| * routes to `TaskRun`. | ||
| * | ||
| * RunStore never reads this flag: it routes purely by id format. The flag only | ||
| * decides which id scheme is minted upstream. Disabling it sends only NEW runs | ||
| * back to legacy; runs already created on v2 stay readable there (routed by id). | ||
| */ | ||
| export function shouldUseV2RunTable(orgFeatureFlags: unknown): boolean { | ||
| if (orgFeatureFlags === null || typeof orgFeatureFlags !== "object") { | ||
| return false; | ||
| } | ||
|
|
||
| const override = (orgFeatureFlags as Record<string, unknown>)[FEATURE_FLAG.runTableV2]; | ||
| if (override === undefined) { | ||
| return false; | ||
| } | ||
|
|
||
| const parsed = FeatureFlagCatalog[FEATURE_FLAG.runTableV2].safeParse(override); | ||
| return parsed.success ? parsed.data : false; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import { describe, expect, it } from "vitest"; | ||
| import { shouldUseV2RunTable } from "~/v3/runTableV2.server"; | ||
|
|
||
| describe("shouldUseV2RunTable", () => { | ||
| it("defaults to false when the org has no flags", () => { | ||
| expect(shouldUseV2RunTable(null)).toBe(false); | ||
| expect(shouldUseV2RunTable(undefined)).toBe(false); | ||
| expect(shouldUseV2RunTable({})).toBe(false); | ||
| }); | ||
|
|
||
| it("returns true only when the flag is the boolean true", () => { | ||
| expect(shouldUseV2RunTable({ runTableV2: true })).toBe(true); | ||
| expect(shouldUseV2RunTable({ runTableV2: false })).toBe(false); | ||
| }); | ||
|
|
||
| it("rejects a stringified flag value (strict boolean, no coercion)", () => { | ||
| // A stringified "false" must not coerce to true and cut the org over. | ||
| expect(shouldUseV2RunTable({ runTableV2: "true" })).toBe(false); | ||
| expect(shouldUseV2RunTable({ runTableV2: "false" })).toBe(false); | ||
| expect(shouldUseV2RunTable({ runTableV2: 1 })).toBe(false); | ||
| }); | ||
|
|
||
| it("ignores unrelated flags and non-object inputs", () => { | ||
| expect(shouldUseV2RunTable({ mollifierEnabled: true })).toBe(false); | ||
| expect(shouldUseV2RunTable("runTableV2")).toBe(false); | ||
| expect(shouldUseV2RunTable(42)).toBe(false); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,125 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { ClickHouse } from "@internal/clickhouse"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { replicationContainerTest } from "@internal/testcontainers"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { RunId } from "@trigger.dev/core/v3/isomorphic"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { setTimeout } from "node:timers/promises"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { z } from "zod"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { RunsReplicationService } from "~/services/runsReplicationService.server"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { createInMemoryTracing } from "./utils/tracing"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| vi.setConfig({ testTimeout: 60_000 }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| describe("RunsReplicationService (task_run_v2)", () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| replicationContainerTest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "co-publishes task_run_v2 and streams its rows to the same ClickHouse table", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Both tables are in the publication; both need FULL identity so the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // delete transform can read the old row. INSERTs (this test) carry the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // full new tuple regardless, but we mirror the production setup. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const clickhouse = new ClickHouse({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| url: clickhouseContainer.getConnectionUrl(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name: "runs-replication", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| compression: { request: true }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logLevel: "warn", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { tracer } = createInMemoryTracing(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const runsReplicationService = new RunsReplicationService({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pgConnectionUrl: postgresContainer.getConnectionUri(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| serviceName: "runs-replication", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| slotName: "task_runs_to_clickhouse_v1", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| publicationName: "task_runs_to_clickhouse_v1_publication", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redisOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxFlushConcurrency: 1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| flushIntervalMs: 100, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| flushBatchSize: 1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| leaderLockTimeoutMs: 5000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| leaderLockExtendIntervalMs: 1000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ackIntervalSeconds: 5, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tracer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logLevel: "warn", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await runsReplicationService.start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const organization = await prisma.organization.create({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: { title: "test", slug: "test" }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const project = await prisma.project.create({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| slug: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId: organization.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| externalRef: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const runtimeEnvironment = await prisma.runtimeEnvironment.create({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| slug: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type: "DEVELOPMENT", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| projectId: project.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId: organization.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| apiKey: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pkApiKey: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| shortcode: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // A v2 run lives in task_run_v2, keyed by a KSUID id. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const ksuid = RunId.generateKsuid(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const run = await prisma.taskRunV2.create({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| id: ksuid.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| friendlyId: ksuid.friendlyId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskIdentifier: "my-task", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| payload: JSON.stringify({ foo: "bar" }), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| payloadType: "application/json", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| traceId: "v2trace", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| spanId: "v2span", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| queue: "test", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| workerQueue: "us-east-1-next", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| region: "us-east-1", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planType: "free", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| runtimeEnvironmentId: runtimeEnvironment.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| projectId: project.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId: organization.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| environmentType: "DEVELOPMENT", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| engine: "V2", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await setTimeout(1000); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const queryRuns = clickhouse.reader.query({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name: "runs-replication", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| schema: z.any(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params: z.object({ runId: z.string() }), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const [queryError, result] = await queryRuns({ runId: run.id }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+96
to
+106
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace fixed sleep with bounded polling to avoid flaky replication timing. Line 96 uses a fixed Proposed fix- await setTimeout(1000);
-
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
schema: z.any(),
params: z.object({ runId: z.string() }),
});
- const [queryError, result] = await queryRuns({ runId: run.id });
+ let queryError: unknown = null;
+ let result: unknown[] | undefined;
+ const deadline = Date.now() + 10_000;
+
+ do {
+ [queryError, result] = await queryRuns({ runId: run.id });
+ if (!queryError && result?.length === 1) break;
+ await setTimeout(200);
+ } while (Date.now() < deadline);📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expect(queryError).toBeNull(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expect(result?.length).toBe(1); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expect(result?.[0]).toEqual( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expect.objectContaining({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| run_id: run.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| friendly_id: run.friendlyId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| task_identifier: "my-task", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| environment_id: runtimeEnvironment.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| project_id: project.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organization_id: organization.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| environment_type: "DEVELOPMENT", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| engine: "V2", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await runsReplicationService.stop(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always stop replication service in
finally.If any assertion throws before Line 122, the service keeps running and can interfere with later tests in the same worker.
Proposed fix
Also applies to: 122-123