Skip to content

Commit 1a5ccdc

Browse files
committed
refactor(webapp): route tag and realtime-stream appends through RunStore
1 parent 2fbdc5d commit 1a5ccdc

3 files changed

Lines changed: 6 additions & 24 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
1010
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1111
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
12+
import { runStore } from "~/v3/runStore.server";
1213

1314
// Pull the existing tags out of a buffer entry's serialised payload so
1415
// the buffer-path response can dedup against them, matching the
@@ -84,14 +85,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
8485
if (newTags.length === 0) {
8586
return json({ message: "No new tags to add" }, { status: 200 });
8687
}
87-
const updated = await prisma.taskRun.update({
88-
where: {
89-
id: taskRun.id,
90-
runtimeEnvironmentId: env.id,
91-
},
92-
data: { runTags: { push: newTags } },
93-
select: { updatedAt: true },
94-
});
88+
const updated = await runStore.pushTags(taskRun.id, newTags, { runtimeEnvironmentId: env.id }, prisma);
9589
// Publish a run-changed record with the NEW tag set so tag feeds reindex
9690
// (no-op unless enabled). updatedAt is the read-your-writes watermark.
9791
publishChangeRecord({

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { $replica, prisma } from "~/db.server";
66
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
77
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
88
import { ServiceValidationError } from "~/v3/services/common.server";
9+
import { runStore } from "~/v3/runStore.server";
910

1011
const ParamsSchema = z.object({
1112
runId: z.string(),
@@ -87,16 +88,7 @@ const { action } = createActionApiRoute(
8788
}
8889

8990
if (!targetRun.realtimeStreams.includes(params.streamId)) {
90-
await prisma.taskRun.update({
91-
where: {
92-
id: targetRun.id,
93-
},
94-
data: {
95-
realtimeStreams: {
96-
push: params.streamId,
97-
},
98-
},
99-
});
91+
await runStore.pushRealtimeStream(targetRun.id, params.streamId, prisma);
10092
}
10193

10294
const part = await request.text();

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
createActionApiRoute,
77
createLoaderApiRoute,
88
} from "~/services/routeBuilders/apiBuilder.server";
9+
import { runStore } from "~/v3/runStore.server";
910

1011
const ParamsSchema = z.object({
1112
runId: z.string(),
@@ -86,12 +87,7 @@ const { action } = createActionApiRoute(
8687
}
8788

8889
if (!target.realtimeStreams.includes(params.streamId)) {
89-
await prisma.taskRun.update({
90-
where: { id: target.id },
91-
data: {
92-
realtimeStreams: { push: params.streamId },
93-
},
94-
});
90+
await runStore.pushRealtimeStream(target.id, params.streamId, prisma);
9591
}
9692

9793
const realtimeStream = getRealtimeStreamInstance(

0 commit comments

Comments
 (0)