Skip to content

Commit e76469a

Browse files
refactor(table): dispatcher uses batchTriggerAndWait + tag-based cancel
Switch the per-window cell fan-out from fire-and-forget tasks.trigger to tasks.batchTriggerAndWait. The dispatcher is now a single long-lived trigger.dev task that loops dispatcherStep until the table is exhausted; trigger.dev CRIU-checkpoints the parent during each wait so we don't pay compute while cells execute. Queue depth is bounded at WINDOW_SIZE per dispatch — no more flooding trigger.dev with a million queued runs. - dispatcher.ts builds payloads via the new shared buildPendingRuns helper and calls tasks.batchTriggerAndWait directly. Pre-stamps each cell to `queued` (jobId=null) so the UI flips instantly. - table-run-dispatcher.ts is now a plain while-true loop. No RUN_BUDGET_MS, no self-re-enqueue, no cold-start tax per window. Cancel: - New cancelCellRunsByTags(tags) paginates runs.list + runs.cancel(id). - cancelWorkflowGroupRuns fires the tag-sweep alongside the per-jobId queue.cancelJob path (preserved for auto-fire cells that have real jobIds from single tasks.trigger calls). - Trigger.dev acks the cancel → batchTriggerAndWait resumes → dispatcher observes the dispatch-row cancel flag → exits. Side fixes: - getAsyncBackendType returns 'trigger-dev' whenever taskContext.isInsideTask is true, regardless of TRIGGER_DEV_ENABLED env. The preview/dev-sim worker silently routing cell jobs to DatabaseJobQueue (no poller) is fixed without any env config change. - runWorkflowColumn skips the dispatcher entirely when trigger.dev is disabled, running cells inline via DatabaseJobQueue.runInline. HTTP response returns dispatchId: null in that mode. - runColumnContract response schema updated to dispatchId.nullable().
1 parent b315324 commit e76469a

5 files changed

Lines changed: 258 additions & 110 deletions

File tree

apps/sim/background/table-run-dispatcher.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
3-
import { task, tasks } from '@trigger.dev/sdk'
3+
import { task } from '@trigger.dev/sdk'
44
import { dispatcherStep } from '@/lib/table/dispatcher'
55

66
const logger = createLogger('TableRunDispatcherTask')
@@ -10,10 +10,12 @@ export interface TableRunDispatcherPayload {
1010
}
1111

1212
/**
13-
* Trigger.dev wrapper around `dispatcherStep`. Each task run processes one
14-
* window of rows and re-enqueues itself with `concurrencyKey: dispatchId` so
15-
* a single dispatch can't fork. Self-re-enqueue caps each task run's
16-
* duration; the persisted cursor handles crash recovery.
13+
* Trigger.dev wrapper around `dispatcherStep`. One task run holds the
14+
* dispatcher loop for the dispatch's entire lifetime — each iteration
15+
* processes a window of cells via `batchTriggerAndWait`, which checkpoints
16+
* the parent via CRIU during the wait so we don't pay compute while cells
17+
* execute. The cursor is persisted in DB; if this run crashes, trigger.dev
18+
* retries and the next attempt resumes from the persisted cursor.
1719
*/
1820
export const tableRunDispatcherTask = task({
1921
id: 'table-run-dispatcher',
@@ -26,16 +28,12 @@ export const tableRunDispatcherTask = task({
2628
run: async (payload: TableRunDispatcherPayload) => {
2729
const { dispatchId } = payload
2830
try {
29-
const result = await dispatcherStep(dispatchId)
30-
if (result === 'continue') {
31-
await tasks.trigger<typeof tableRunDispatcherTask>(
32-
'table-run-dispatcher',
33-
{ dispatchId },
34-
{ concurrencyKey: dispatchId }
35-
)
31+
while (true) {
32+
const result = await dispatcherStep(dispatchId)
33+
if (result === 'done') return
3634
}
3735
} catch (err) {
38-
logger.error(`[${dispatchId}] dispatcher step failed`, { error: toError(err).message })
36+
logger.error(`[${dispatchId}] dispatcher loop failed`, { error: toError(err).message })
3937
throw err
4038
}
4139
},

apps/sim/lib/api/contracts/tables.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -900,9 +900,11 @@ export const runColumnContract = defineRouteContract({
900900
/**
901901
* `dispatchId` is the id of the `table_run_dispatches` row created for
902902
* this run. The dispatcher task picks it up and crawls the table row by
903-
* row; clients receive cell + dispatch events via SSE.
903+
* row; clients receive cell + dispatch events via SSE. Null when
904+
* trigger.dev is disabled — in that mode cells run inline in-process and
905+
* no dispatch row is created.
904906
*/
905-
schema: successResponseSchema(z.object({ dispatchId: z.string().min(1) })),
907+
schema: successResponseSchema(z.object({ dispatchId: z.string().min(1).nullable() })),
906908
},
907909
})
908910

apps/sim/lib/core/async-jobs/config.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { createLogger } from '@sim/logger'
2+
import { taskContext } from '@trigger.dev/core/v3'
23
import type { AsyncBackendType, JobQueueBackend } from '@/lib/core/async-jobs/types'
34
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
45

@@ -10,10 +11,15 @@ let cachedInlineBackend: JobQueueBackend | null = null
1011

1112
/**
1213
* Determines which async backend to use based on environment configuration.
13-
* Follows the fallback chain: trigger.dev → database
14+
* Falls back to the database backend when trigger.dev isn't enabled — except
15+
* when this process IS a trigger.dev worker (`taskContext.isInsideTask`), in
16+
* which case the SDK runtime is available regardless of env vars and we
17+
* always want to enqueue back through trigger.dev. Without this carve-out, a
18+
* worker pod missing `TRIGGER_DEV_ENABLED=true` silently routes cell jobs to
19+
* the database backend that nothing's draining.
1420
*/
1521
export function getAsyncBackendType(): AsyncBackendType {
16-
if (isTriggerDevEnabled) {
22+
if (isTriggerDevEnabled || taskContext.isInsideTask) {
1723
return 'trigger-dev'
1824
}
1925

apps/sim/lib/table/dispatcher.ts

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import { db } from '@sim/db'
22
import { tableRunDispatches, userTableRows } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
4+
import { toError } from '@sim/utils/errors'
45
import { generateId } from '@sim/utils/id'
6+
import { tasks } from '@trigger.dev/sdk'
57
import { and, asc, eq, gt, inArray, type SQL, sql } from 'drizzle-orm'
8+
import { writeWorkflowGroupState } from '@/lib/table/cell-write'
69
import { appendTableEvent } from '@/lib/table/events'
710
import type { TableRow } from '@/lib/table/types'
811
import {
9-
isGroupEligible,
10-
scheduleRunsForRows,
11-
type ScheduleOpts,
12+
buildPendingRuns,
13+
cellTagsFor,
14+
type RunGroupCellOptions,
1215
TABLE_CONCURRENCY_LIMIT,
1316
toTableRow,
1417
} from './workflow-columns'
@@ -207,36 +210,54 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
207210
return 'done'
208211
}
209212

210-
// Rows were bulk-cleared at click time, so the chunk is ready to enqueue
211-
// as-is. We only filter out cells the user cancelled mid-cascade (the
212-
// tombstone) and cells whose deps still aren't satisfied.
213-
const eligibleRows: TableRow[] = []
213+
// Strip rows the user cancelled mid-cascade (post-dispatch tombstones)
214+
// before running the shared eligibility filter — `buildPendingRuns`
215+
// doesn't know about the per-dispatch cancel tombstone.
216+
const tombstoneFiltered: TableRow[] = []
214217
for (const r of chunk) {
215218
const tableRow = toTableRow(r)
216-
const anyEligible = targetGroups.some((g) => {
217-
const exec = tableRow.executions?.[g.id]
218-
if (exec?.cancelledAt) {
219-
const cancelledAtMs = Date.parse(exec.cancelledAt)
220-
if (Number.isFinite(cancelledAtMs) && cancelledAtMs > dispatch.requestedAt.getTime()) {
221-
return false
222-
}
223-
}
224-
return isGroupEligible(g, tableRow, { isManualRun: true, mode: dispatch.mode })
219+
const tombstoned = dispatch.scope.groupIds.some((gid) => {
220+
const exec = tableRow.executions?.[gid]
221+
if (!exec?.cancelledAt) return false
222+
const cancelledAtMs = Date.parse(exec.cancelledAt)
223+
return Number.isFinite(cancelledAtMs) && cancelledAtMs > dispatch.requestedAt.getTime()
225224
})
226-
if (anyEligible) eligibleRows.push(tableRow)
225+
if (!tombstoned) tombstoneFiltered.push(tableRow)
227226
}
228227

228+
const pendingRuns = buildPendingRuns(table, tombstoneFiltered, {
229+
isManualRun: true,
230+
groupIds: dispatch.scope.groupIds,
231+
mode: dispatch.mode,
232+
})
233+
229234
// Cursor advances to the last position in this chunk regardless of
230235
// eligibility — otherwise a window full of skipped cells loops forever.
231236
const lastPosition = chunk[chunk.length - 1].position
232237

233-
if (eligibleRows.length > 0) {
234-
const scheduleOpts: ScheduleOpts = {
235-
isManualRun: true,
236-
groupIds: dispatch.scope.groupIds,
237-
mode: dispatch.mode,
238+
if (pendingRuns.length > 0) {
239+
await stampQueuedForBatch(pendingRuns)
240+
241+
// `batchTriggerAndWait` blocks the parent dispatcher until every cell
242+
// terminates (success / fail / cancel). Trigger.dev checkpoints the
243+
// parent during the wait via CRIU, so we don't pay compute. Bounds the
244+
// queue depth at WINDOW_SIZE per dispatch — no flooding trigger.dev.
245+
const items = pendingRuns.map((runOpts) => ({
246+
payload: runOpts,
247+
options: {
248+
concurrencyKey: runOpts.tableId,
249+
tags: cellTagsFor(runOpts),
250+
},
251+
}))
252+
try {
253+
await tasks.batchTriggerAndWait('workflow-group-cell', items)
254+
} catch (err) {
255+
logger.error(`[${dispatchId}] batchTriggerAndWait failed`, {
256+
error: toError(err).message,
257+
})
258+
// Don't bail the dispatch — terminal states are already in the DB
259+
// (workers wrote them) or will be reconciled on the next user click.
238260
}
239-
await scheduleRunsForRows(table, eligibleRows, scheduleOpts)
240261
}
241262

242263
await Promise.all([
@@ -252,6 +273,26 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
252273
return 'continue'
253274
}
254275

276+
/** Pre-batch stamp: write each cell's exec to `queued` before firing the
277+
* batch. We don't know per-cell jobIds (batchTriggerAndWait returns only a
278+
* batch handle), so `jobId` stays null. Workers update to `running` and
279+
* terminal states from inside their own run. */
280+
async function stampQueuedForBatch(pendingRuns: RunGroupCellOptions[]): Promise<void> {
281+
await Promise.allSettled(
282+
pendingRuns.map((runOpts) =>
283+
writeWorkflowGroupState(runOpts, {
284+
executionState: {
285+
status: 'queued',
286+
executionId: runOpts.executionId,
287+
jobId: null,
288+
workflowId: runOpts.workflowId,
289+
error: null,
290+
},
291+
})
292+
)
293+
)
294+
}
295+
255296
async function advanceCursor(dispatchId: string, newCursor: number): Promise<void> {
256297
await db
257298
.update(tableRunDispatches)

0 commit comments

Comments
 (0)