From c039d65e95dfd342e518a171ad39e9f0967895ec Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Fri, 20 Mar 2026 16:30:00 +0100 Subject: [PATCH] Add annotations to task runs --- .../v3/ApiRetrieveRunPresenter.server.ts | 3 ++ .../routes/api.v1.runs.$runParam.replay.ts | 2 +- .../routes/api.v1.tasks.$taskId.trigger.ts | 6 +++ apps/webapp/app/routes/api.v1.tasks.batch.ts | 3 ++ apps/webapp/app/routes/api.v2.tasks.batch.ts | 3 ++ .../resources.taskruns.$runParam.replay.ts | 1 + .../runEngine/services/batchTrigger.server.ts | 4 ++ .../runEngine/services/triggerTask.server.ts | 18 +++++++ .../webapp/app/v3/runEngineHandlers.server.ts | 2 + apps/webapp/app/v3/scheduleEngine.server.ts | 2 + .../app/v3/services/batchTriggerV3.server.ts | 6 +++ .../v3/services/bulk/BulkActionV2.server.ts | 1 + .../services/bulk/performBulkAction.server.ts | 2 +- .../app/v3/services/replayTaskRun.server.ts | 3 ++ .../webapp/app/v3/services/testTask.server.ts | 51 +++++++++++-------- .../app/v3/services/triggerTask.server.ts | 2 + .../database/prisma/schema.prisma | 3 ++ .../run-engine/src/engine/index.ts | 2 + .../run-engine/src/engine/types.ts | 1 + packages/cli-v3/src/apiClient.ts | 6 ++- packages/cli-v3/src/mcp/context.ts | 2 +- packages/cli-v3/src/mcp/tools/tasks.ts | 19 ++++--- packages/core/src/v3/schemas/api.ts | 20 ++++++++ 23 files changed, 126 insertions(+), 36 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 8d1a312c5d7..f203f76af07 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -1,5 +1,6 @@ import { AttemptStatus, + RunAnnotations, RunStatus, SerializedError, TaskRunError, @@ -56,6 +57,7 @@ const commonRunSelect = { }, }, runTags: true, + annotations: true, } satisfies Prisma.TaskRunSelect; type CommonRelatedRun = Prisma.Result< @@ -466,6 +468,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, + annotations: run.annotations ? RunAnnotations.safeParse(run.annotations).data : undefined, }; } diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts index 66f30ca1dff..3c65cc963da 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts @@ -42,7 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) { } const service = new ReplayTaskRunService(); - const newRun = await service.call(taskRun); + const newRun = await service.call(taskRun, { triggerSource: "api" }); if (!newRun) { return json({ error: "Failed to create new run" }, { status: 400 }); diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index d55c3659eea..3d1cfd969d0 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -36,6 +36,7 @@ export const HeadersSchema = z.object({ "x-trigger-engine-version": RunEngineVersionSchema.nullish(), "x-trigger-request-idempotency-key": z.string().nullish(), "x-trigger-realtime-streams-version": z.string().nullish(), + "x-trigger-source": z.string().nullish(), traceparent: z.string().optional(), tracestate: z.string().optional(), }); @@ -67,6 +68,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-engine-version": engineVersion, "x-trigger-request-idempotency-key": requestIdempotencyKey, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, } = headers; const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, { @@ -119,6 +121,10 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: body.options?.parentRunId + ? "sdk" + : triggerSourceHeader ?? "api", + triggerAction: "trigger", }, engineVersion ?? undefined ); diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 4a10ade0e60..99ab11cab22 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -72,6 +72,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-engine-version": engineVersion, "batch-processing-strategy": batchProcessingStrategy, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, traceparent, tracestate, } = headers; @@ -113,6 +114,8 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: triggerSourceHeader ?? undefined, + triggerAction: "trigger", }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index cd351505b50..ed3fd46ed09 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -62,6 +62,7 @@ const { action, loader } = createActionApiRoute( "batch-processing-strategy": batchProcessingStrategy, "x-trigger-request-idempotency-key": requestIdempotencyKey, "x-trigger-realtime-streams-version": realtimeStreamsVersion, + "x-trigger-source": triggerSourceHeader, traceparent, tracestate, } = headers; @@ -127,6 +128,8 @@ const { action, loader } = createActionApiRoute( realtimeStreamsVersion: determineRealtimeStreamsVersion( realtimeStreamsVersion ?? undefined ), + triggerSource: triggerSourceHeader ?? undefined, + triggerAction: "trigger", }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index eba924f409b..8a22822d06b 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => { ttlSeconds: submission.value.ttlSeconds, version: submission.value.version, prioritySeconds: submission.value.prioritySeconds, + triggerSource: "dashboard", }); if (!newRun) { diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 78427a001e1..c46880497b0 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = { spanParentAsLink?: boolean; oneTimeUseToken?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; /** @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine { batchId: batch.id, batchIndex: currentIndex, realtimeStreamsVersion: options?.realtimeStreamsVersion, + triggerSource: parentRunId ? "sdk" : options?.triggerSource ?? "api", + triggerAction: options?.triggerAction ?? "trigger", }, "V2" ); diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 2cc849e78de..f3de00476e0 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -289,6 +289,23 @@ export class RunEngineTriggerTaskService { const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); + // Build annotations for this run + const triggerSource = options.triggerSource ?? "api"; + const triggerAction = options.triggerAction ?? "trigger"; + const parentAnnotations = parentRun?.annotations as + | Record + | null + | undefined; + const annotations = { + triggerSource, + triggerAction, + rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource, + rootScheduleId: + (parentAnnotations?.rootScheduleId as string | undefined) || + options.scheduleId || + undefined, + }; + try { return await this.traceEventConcern.traceRun( triggerRequest, @@ -369,6 +386,7 @@ export class RunEngineTriggerTaskService { planType, realtimeStreamsVersion: options.realtimeStreamsVersion, debounce: body.options?.debounce, + annotations, // When debouncing with triggerAndWait, create a span for the debounced trigger onDebounced: body.options?.debounce && body.options?.resumeParentOnCompletion diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index df0df255de2..19b135614a6 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() { batchIndex: itemIndex, realtimeStreamsVersion: meta.realtimeStreamsVersion, planType: meta.planType, + triggerSource: meta.parentRunId ? "sdk" : "api", + triggerAction: "trigger", }, "V2" ); diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts index ef3cabe64df..cbec21a3bdb 100644 --- a/apps/webapp/app/v3/scheduleEngine.server.ts +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -106,6 +106,8 @@ function createScheduleEngine() { scheduleInstanceId, queueTimestamp: exactScheduleTime, overrideCreatedAt: exactScheduleTime, + triggerSource: "schedule", + triggerAction: "trigger", } ); diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index e4bc583b7cc..686635406a1 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = { spanParentAsLink?: boolean; oneTimeUseToken?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; type RunItemData = { @@ -853,6 +855,10 @@ export class BatchTriggerV3Service extends BaseService { skipChecks: true, runFriendlyId: task.runId, realtimeStreamsVersion: options?.realtimeStreamsVersion, + triggerSource: task.item.options?.parentRunId + ? "sdk" + : options?.triggerSource ?? "api", + triggerAction: options?.triggerAction ?? "trigger", } ); diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 4ca558bc2b7..156b68bff59 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService { const [error, result] = await tryCatch( replayService.call(run, { bulkActionId: bulkActionId, + triggerSource: "dashboard", }) ); if (error) { diff --git a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts index dc2a8d11ded..e535fd7db32 100644 --- a/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts +++ b/apps/webapp/app/v3/services/bulk/performBulkAction.server.ts @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService { switch (item.group.type) { case "REPLAY": { const service = new ReplayTaskRunService(this._prisma); - const result = await service.call(item.sourceRun); + const result = await service.call(item.sourceRun, { triggerSource: "dashboard" }); await this._prisma.bulkActionItem.update({ where: { id: item.id }, diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index a5018f51c57..836611b3610 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -18,6 +18,7 @@ type OverrideOptions = { payload?: string; metadata?: unknown; bulkActionId?: string; + triggerSource?: string; } & RunOptionsData; export class ReplayTaskRunService extends BaseService { @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService { realtimeStreamsVersion: determineRealtimeStreamsVersion( existingTaskRun.realtimeStreamsVersion ), + triggerSource: overrideOptions.triggerSource ?? "api", + triggerAction: "replay", } ); diff --git a/apps/webapp/app/v3/services/testTask.server.ts b/apps/webapp/app/v3/services/testTask.server.ts index 0b64367f600..3ad2af22e51 100644 --- a/apps/webapp/app/v3/services/testTask.server.ts +++ b/apps/webapp/app/v3/services/testTask.server.ts @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService { switch (triggerSource) { case "STANDARD": { - const result = await triggerTaskService.call(data.taskIdentifier, environment, { - payload: data.payload, - options: { - test: true, - metadata: data.metadata, - delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined, - ttl: data.ttlSeconds, - idempotencyKey: data.idempotencyKey, - idempotencyKeyTTL: data.idempotencyKeyTTLSeconds - ? `${data.idempotencyKeyTTLSeconds}s` - : undefined, - queue: data.queue ? { name: data.queue } : undefined, - concurrencyKey: data.concurrencyKey, - maxAttempts: data.maxAttempts, - maxDuration: data.maxDurationSeconds, - tags: data.tags, - machine: data.machine, - region: data.region, - lockToVersion: data.version === "latest" ? undefined : data.version, - priority: data.prioritySeconds, + const result = await triggerTaskService.call( + data.taskIdentifier, + environment, + { + payload: data.payload, + options: { + test: true, + metadata: data.metadata, + delay: data.delaySeconds + ? new Date(Date.now() + data.delaySeconds * 1000) + : undefined, + ttl: data.ttlSeconds, + idempotencyKey: data.idempotencyKey, + idempotencyKeyTTL: data.idempotencyKeyTTLSeconds + ? `${data.idempotencyKeyTTLSeconds}s` + : undefined, + queue: data.queue ? { name: data.queue } : undefined, + concurrencyKey: data.concurrencyKey, + maxAttempts: data.maxAttempts, + maxDuration: data.maxDurationSeconds, + tags: data.tags, + machine: data.machine, + region: data.region, + lockToVersion: data.version === "latest" ? undefined : data.version, + priority: data.prioritySeconds, + }, }, - }); + { triggerSource: "dashboard", triggerAction: "test" } + ); return result?.run; } @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService { priority: data.prioritySeconds, }, }, - { customIcon: "scheduled" } + { customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" } ); return result?.run; diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 2ed34f0342c..000633fb73f 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = { replayedFromTaskRunFriendlyId?: string; planType?: string; realtimeStreamsVersion?: "v1" | "v2"; + triggerSource?: string; + triggerAction?: string; }; export class OutOfEntitlementError extends Error { diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 9e91fc70f14..d4bb2c35688 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -766,6 +766,9 @@ model TaskRun { metadataType String @default("application/json") metadataVersion Int @default(1) + /// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId + annotations Json? + /// Run output output String? outputType String @default("application/json") diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 6cf80bd46ca..e459dd85697 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -495,6 +495,7 @@ export class RunEngine { planType, realtimeStreamsVersion, debounce, + annotations, onDebounced, }: TriggerParams, tx?: PrismaClientOrTransaction @@ -668,6 +669,7 @@ export class RunEngine { createdAt: new Date(), } : undefined, + annotations, executionSnapshots: { create: { engine: "V2", diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 9af808754ce..dbe9036bdcf 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -222,6 +222,7 @@ export type TriggerParams = { mode?: "leading" | "trailing"; maxDelay?: string; }; + annotations?: Record; /** * Called when a run is debounced (existing delayed run found with triggerAndWait). * Return spanIdToComplete to enable span closing when the run completes. diff --git a/packages/cli-v3/src/apiClient.ts b/packages/cli-v3/src/apiClient.ts index 693b48d992e..53c15f11aa6 100644 --- a/packages/cli-v3/src/apiClient.ts +++ b/packages/cli-v3/src/apiClient.ts @@ -60,16 +60,19 @@ import { VERSION } from "./version.js"; export class CliApiClient { private engineURL: string; + private source: string; constructor( public readonly apiURL: string, // TODO: consider making this required public readonly accessToken?: string, - public readonly branch?: string + public readonly branch?: string, + options?: { source?: string } ) { this.apiURL = apiURL.replace(/\/$/, ""); this.engineURL = this.apiURL; this.branch = branch; + this.source = options?.source ?? "cli"; } async createAuthorizationCode() { @@ -819,6 +822,7 @@ export class CliApiClient { const headers: Record = { Authorization: `Bearer ${this.accessToken}`, "Content-Type": "application/json", + "x-trigger-source": this.source, }; if (this.branch) { diff --git a/packages/cli-v3/src/mcp/context.ts b/packages/cli-v3/src/mcp/context.ts index 886935db181..a372b545ab9 100644 --- a/packages/cli-v3/src/mcp/context.ts +++ b/packages/cli-v3/src/mcp/context.ts @@ -86,7 +86,7 @@ export class McpContext { public async getCliApiClient(branch?: string) { const auth = await this.getAuth(); - return new CliApiClient(auth.auth.apiUrl, auth.auth.accessToken, branch); + return new CliApiClient(auth.auth.apiUrl, auth.auth.accessToken, branch, { source: "mcp" }); } public async getApiClient(options: { diff --git a/packages/cli-v3/src/mcp/tools/tasks.ts b/packages/cli-v3/src/mcp/tools/tasks.ts index fda3cc8943e..7ea66a8b3b3 100644 --- a/packages/cli-v3/src/mcp/tools/tasks.ts +++ b/packages/cli-v3/src/mcp/tools/tasks.ts @@ -98,12 +98,7 @@ export const triggerTaskTool = { cwd: input.configPath, }); - const apiClient = await ctx.getApiClient({ - projectRef, - environment: input.environment, - scopes: ["write:tasks"], - branch: input.branch, - }); + const cliApiClient = await ctx.getCliApiClient(input.branch); ctx.logger?.log("triggering task", { input }); @@ -117,21 +112,25 @@ export const triggerTaskTool = { } } - const result = await apiClient.triggerTask(input.taskId, { + const result = await cliApiClient.triggerTaskRun(input.taskId, { payload, options: input.options, }); - const taskRunUrl = await ctx.getDashboardUrl(`/projects/v3/${projectRef}/runs/${result.id}`); + if (!result.success) { + return respondWithError(`Failed to trigger task ${input.taskId}: ${result.error}`); + } + + const runId = result.data.id; + const taskRunUrl = await ctx.getDashboardUrl(`/projects/v3/${projectRef}/runs/${runId}`); const contents = [ - `Task ${input.taskId} triggered and run with ID created: ${result.id}.`, + `Task ${input.taskId} triggered and run with ID created: ${runId}.`, `View the run in the dashboard: ${taskRunUrl}`, `Use the ${toolsMetadata.wait_for_run_to_complete.name} tool to wait for the run to complete and the ${toolsMetadata.get_run_details.name} tool to get the details of the run.`, ]; if (input.environment === "dev") { - const cliApiClient = await ctx.getCliApiClient(input.branch); const devStatus = await cliApiClient.getDevStatus(projectRef); const isConnected = devStatus.success ? devStatus.data.isConnected : false; const connectionMessage = isConnected diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 376392ddd14..6b1f35d926c 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -540,6 +540,25 @@ export const DeploymentTriggeredVia = z export type DeploymentTriggeredVia = z.infer; +export const TriggerSource = z + .enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"]) + .or(anyString); + +export type TriggerSource = z.infer; + +export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString); + +export type TriggerAction = z.infer; + +export const RunAnnotations = z.object({ + triggerSource: TriggerSource, + triggerAction: TriggerAction, + rootTriggerSource: TriggerSource, + rootScheduleId: z.string().optional(), +}); + +export type RunAnnotations = z.infer; + export const UpsertBranchRequestBody = z.object({ git: GitMeta.optional(), env: z.enum(["preview"]), @@ -1110,6 +1129,7 @@ const CommonRunFields = { baseCostInCents: z.number(), durationMs: z.number(), metadata: z.record(z.any()).optional(), + annotations: RunAnnotations.optional(), }; const RetrieveRunCommandFields = {