Skip to content

Commit 3549341

Browse files
committed
feat(webapp): stream task_run_v2 into ClickHouse
runsReplicationService co-publishes task_run_v2 alongside TaskRun. It is a column-identical clone, so its WAL rows flow through the same transform into the same ClickHouse table, keeping the mirror complete once orgs cut over to v2 run ids. task_run_v2 needs REPLICA IDENTITY FULL, applied the same out-of-band way as TaskRun, so update and delete events carry the old row.
1 parent 912a504 commit 3549341

3 files changed

Lines changed: 134 additions & 0 deletions

File tree

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ export class RunsReplicationService {
227227
slotName: options.slotName,
228228
publicationName: options.publicationName,
229229
table: "TaskRun",
230+
// task_run_v2 is a column-identical clone of TaskRun, so its WAL rows
231+
// flow through the same handler/transform into the same ClickHouse table.
232+
// Co-publishing it keeps the ClickHouse mirror complete once orgs cut over
233+
// to v2 run ids; until then the table is empty and this is a no-op.
234+
additionalTables: ["task_run_v2"],
230235
redisOptions: options.redisOptions,
231236
autoAcknowledge: false,
232237
publicationActions: ["insert", "update", "delete"],
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { replicationContainerTest } from "@internal/testcontainers";
3+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
4+
import { setTimeout } from "node:timers/promises";
5+
import { z } from "zod";
6+
import { RunsReplicationService } from "~/services/runsReplicationService.server";
7+
import { createInMemoryTracing } from "./utils/tracing";
8+
import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory";
9+
10+
vi.setConfig({ testTimeout: 60_000 });
11+
12+
/**
 * Integration test for co-publication of `task_run_v2` by RunsReplicationService.
 *
 * Inserts one row into `task_run_v2` in Postgres and asserts that it shows up
 * in the ClickHouse `trigger_dev.task_runs_v2` table with the expected columns.
 */
describe("RunsReplicationService (task_run_v2)", () => {
  replicationContainerTest(
    "co-publishes task_run_v2 and streams its rows to the same ClickHouse table",
    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
      // Both tables are in the publication; both need FULL identity so the
      // delete transform can read the old row. INSERTs (this test) carry the
      // full new tuple regardless, but we mirror the production setup.
      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
      await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`);

      const clickhouse = new ClickHouse({
        url: clickhouseContainer.getConnectionUrl(),
        name: "runs-replication",
        compression: { request: true },
        logLevel: "warn",
      });

      const { tracer } = createInMemoryTracing();

      const runsReplicationService = new RunsReplicationService({
        clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse),
        pgConnectionUrl: postgresContainer.getConnectionUri(),
        serviceName: "runs-replication",
        slotName: "task_runs_to_clickhouse_v1",
        publicationName: "task_runs_to_clickhouse_v1_publication",
        redisOptions,
        maxFlushConcurrency: 1,
        flushIntervalMs: 100,
        flushBatchSize: 1,
        leaderLockTimeoutMs: 5000,
        leaderLockExtendIntervalMs: 1000,
        ackIntervalSeconds: 5,
        tracer,
        logLevel: "warn",
      });

      await runsReplicationService.start();

      // Everything after start() runs inside try/finally so the service is
      // always stopped, even when a setup call throws or an assertion fails.
      try {
        const organization = await prisma.organization.create({
          data: { title: "test", slug: "test" },
        });
        const project = await prisma.project.create({
          data: {
            name: "test",
            slug: "test",
            organizationId: organization.id,
            externalRef: "test",
          },
        });
        const runtimeEnvironment = await prisma.runtimeEnvironment.create({
          data: {
            slug: "test",
            type: "DEVELOPMENT",
            projectId: project.id,
            organizationId: organization.id,
            apiKey: "test",
            pkApiKey: "test",
            shortcode: "test",
          },
        });

        // A v2 run lives in task_run_v2, keyed by a KSUID id.
        const ksuid = RunId.generateKsuid();
        const run = await prisma.taskRunV2.create({
          data: {
            id: ksuid.id,
            friendlyId: ksuid.friendlyId,
            taskIdentifier: "my-task",
            payload: JSON.stringify({ foo: "bar" }),
            payloadType: "application/json",
            traceId: "v2trace",
            spanId: "v2span",
            queue: "test",
            workerQueue: "us-east-1-next",
            region: "us-east-1",
            planType: "free",
            runtimeEnvironmentId: runtimeEnvironment.id,
            projectId: project.id,
            organizationId: organization.id,
            environmentType: "DEVELOPMENT",
            engine: "V2",
          },
        });

        const queryRuns = clickhouse.reader.query({
          name: "runs-replication",
          query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
          schema: z.any(),
          params: z.object({ runId: z.string() }),
        });

        // Replication is asynchronous, so poll until the row appears instead of
        // relying on a single fixed sleep. Stop early on a query error, and cap
        // the wait well below the 60s test timeout so a genuine failure
        // surfaces through the assertions below rather than as a timeout.
        const pollIntervalMs = 100;
        const deadline = Date.now() + 10_000;
        let outcome = await queryRuns({ runId: run.id });
        while (
          outcome[0] === null &&
          (outcome[1]?.length ?? 0) === 0 &&
          Date.now() < deadline
        ) {
          await setTimeout(pollIntervalMs);
          outcome = await queryRuns({ runId: run.id });
        }

        const [queryError, result] = outcome;

        expect(queryError).toBeNull();
        expect(result?.length).toBe(1);
        expect(result?.[0]).toEqual(
          expect.objectContaining({
            run_id: run.id,
            friendly_id: run.friendlyId,
            task_identifier: "my-task",
            environment_id: runtimeEnvironment.id,
            project_id: project.id,
            organization_id: organization.id,
            environment_type: "DEVELOPMENT",
            engine: "V2",
          })
        );
      } finally {
        await runsReplicationService.stop();
      }
    }
  );
});

apps/webapp/test/utils/replicationUtils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ export async function setupClickhouseReplication({
1717
redisOptions: RedisOptions;
1818
}) {
1919
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
20+
// task_run_v2 is co-published with TaskRun; it needs FULL identity too so
21+
// UPDATE/DELETE WAL events carry the old row (the delete transform reads
22+
// organizationId/environmentType off it). Mirrors the TaskRun line above.
23+
await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`);
2024

2125
const clickhouse = new ClickHouse({
2226
url: clickhouseUrl,

0 commit comments

Comments
 (0)