Skip to content

Commit 2cfd1f2

Browse files
feat(table): chunked dispatcher for workflow-column runs
Replaces the all-rows-at-once runWorkflowColumn with a row-window dispatcher backed by a new table_run_dispatches row. Each user click inserts a dispatch row and triggers a trigger.dev task that crawls the table 20 rows at a time, re-enqueueing itself between windows. The HTTP/Mothership entrypoints return { dispatchId } immediately instead of holding the request open for minutes on multi-thousand-row dispatches. - Per-row cancel stamps cancelledAt; the dispatcher skips cells whose cancelledAt > dispatch.requestedAt so a mid-cascade cancel sticks even under isManualRun. - Table-wide cancel marks active dispatches cancelled atomically so the dispatcher bails on its next iteration. - New 'dispatch' SSE event variant plumbed; client ignores for v1.
1 parent 08eeecb commit 2cfd1f2

13 files changed

Lines changed: 17523 additions & 122 deletions

File tree

apps/sim/app/api/table/[tableId]/columns/run/route.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { createLogger } from '@sim/logger'
2-
import { toError } from '@sim/utils/errors'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { runColumnContract } from '@/lib/api/contracts/tables'
54
import { parseRequest } from '@/lib/api/server'
@@ -30,21 +29,16 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
3029
const access = await checkAccess(tableId, auth.userId, 'write')
3130
if (!access.ok) return accessError(access, requestId, tableId)
3231

33-
// Dispatch in the background — large fan-outs (thousands of rows) issue
34-
// sequential trigger.dev calls and would otherwise hold the HTTP response
35-
// open for minutes, blocking the AI/copilot tool span and the UI mutation.
36-
void runWorkflowColumn({
32+
const { dispatchId } = await runWorkflowColumn({
3733
tableId,
3834
workspaceId,
3935
groupIds,
4036
mode: runMode,
4137
rowIds,
4238
requestId,
43-
}).catch((err) => {
44-
logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message)
4539
})
4640

47-
return NextResponse.json({ success: true, data: { triggered: null } })
41+
return NextResponse.json({ success: true, data: { dispatchId } })
4842
} catch (error) {
4943
if (error instanceof Error && error.message === 'Invalid workspace ID') {
5044
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { createLogger } from '@sim/logger'
2+
import { toError } from '@sim/utils/errors'
3+
import { task, tasks } from '@trigger.dev/sdk'
4+
import { dispatcherStep } from '@/lib/table/dispatcher'
5+
6+
const logger = createLogger('TableRunDispatcherTask')
7+
8+
export interface TableRunDispatcherPayload {
9+
dispatchId: string
10+
}
11+
12+
/**
13+
* Trigger.dev wrapper around `dispatcherStep`. Each task run processes one
14+
* window of rows and re-enqueues itself with `concurrencyKey: dispatchId` so
15+
* a single dispatch can't fork. Self-re-enqueue caps each task run's
16+
* duration; the persisted cursor handles crash recovery.
17+
*/
18+
export const tableRunDispatcherTask = task({
19+
id: 'table-run-dispatcher',
20+
machine: 'small-1x',
21+
retry: { maxAttempts: 3 },
22+
queue: {
23+
name: 'table-run-dispatcher',
24+
concurrencyLimit: 8,
25+
},
26+
run: async (payload: TableRunDispatcherPayload) => {
27+
const { dispatchId } = payload
28+
try {
29+
const result = await dispatcherStep(dispatchId)
30+
if (result === 'continue') {
31+
await tasks.trigger<typeof tableRunDispatcherTask>(
32+
'table-run-dispatcher',
33+
{ dispatchId },
34+
{ concurrencyKey: dispatchId }
35+
)
36+
}
37+
} catch (err) {
38+
logger.error(`[${dispatchId}] dispatcher step failed`, { error: toError(err).message })
39+
throw err
40+
}
41+
},
42+
})

apps/sim/lib/api/contracts/tables.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -898,11 +898,11 @@ export const runColumnContract = defineRouteContract({
898898
response: {
899899
mode: 'json',
900900
/**
901-
* `triggered` is `null` when the dispatcher runs in the background — the
902-
* actual count is only known after a fan-out that may be tens of thousands
903-
* of rows, and we don't hold the HTTP response open for that long.
901+
* `dispatchId` is the id of the `table_run_dispatches` row created for
902+
* this run. The dispatcher task picks it up and crawls the table row by
903+
* row; clients receive cell + dispatch events via SSE.
904904
*/
905-
schema: successResponseSchema(z.object({ triggered: z.number().nullable() })),
905+
schema: successResponseSchema(z.object({ dispatchId: z.string().min(1) })),
906906
},
907907
})
908908

apps/sim/lib/copilot/tools/server/table/user-table.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,24 +1415,19 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
14151415
}
14161416
const requestId = generateId().slice(0, 8)
14171417
assertNotAborted()
1418-
// Dispatch in the background — large fan-outs (thousands of rows)
1419-
// issue sequential trigger.dev calls and would otherwise hold the
1420-
// tool span open for minutes, blocking the chat connection.
1421-
void runWorkflowColumn({
1418+
const { dispatchId } = await runWorkflowColumn({
14221419
tableId: args.tableId,
14231420
workspaceId,
14241421
groupIds,
14251422
mode: runMode,
14261423
rowIds,
14271424
requestId,
1428-
}).catch((err) => {
1429-
logger.error(`[${requestId}] run_column dispatch failed`, err)
14301425
})
14311426
const scopeLabel = rowIds ? `${rowIds.length} row(s) by id` : runMode
14321427
return {
14331428
success: true,
14341429
message: `Started running ${groupIds.length} column(s) (${scopeLabel}). Cells will populate as workflows complete.`,
1435-
data: { triggered: null },
1430+
data: { dispatchId },
14361431
}
14371432
}
14381433

apps/sim/lib/table/cell-write.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ export async function markWorkflowGroupPickedUp(
161161
/** Builds the canonical `cancelled` execution state used by every cancel path.
162162
* Preserves `blockErrors` from the prior state so errored cells keep
163163
* rendering Error after a stop click — only cells that hadn't yet produced
164-
* a value or an error should flip to "Cancelled". */
164+
* a value or an error should flip to "Cancelled". `cancelledAt` is the
165+
* tombstone the dispatcher reads to skip re-runs of cells the user killed
166+
* mid-cascade. */
165167
export function buildCancelledExecution(
166168
prev: Pick<RowExecutionMetadata, 'executionId' | 'workflowId' | 'blockErrors'>
167169
): RowExecutionMetadata {
@@ -171,6 +173,7 @@ export function buildCancelledExecution(
171173
jobId: null,
172174
workflowId: prev.workflowId,
173175
error: 'Cancelled',
176+
cancelledAt: new Date().toISOString(),
174177
...(prev.blockErrors ? { blockErrors: prev.blockErrors } : {}),
175178
}
176179
}

apps/sim/lib/table/dispatcher.ts

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
import { db } from '@sim/db'
2+
import { tableRunDispatches, userTableRows } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { generateId } from '@sim/utils/id'
5+
import { and, asc, eq, gt, inArray, sql } from 'drizzle-orm'
6+
import { appendTableEvent } from '@/lib/table/events'
7+
import type { RowData, TableRow } from '@/lib/table/types'
8+
import {
9+
isGroupEligible,
10+
scheduleRunsForRows,
11+
type ScheduleOpts,
12+
TABLE_CONCURRENCY_LIMIT,
13+
toTableRow,
14+
} from './workflow-columns'
15+
16+
const logger = createLogger('TableRunDispatcher')
17+
18+
/** Window size matches the cell-execution concurrency cap so one window
19+
* saturates the pool before the next is loaded — yields a row-major
20+
* scan-line crawl (rows 1-20 finish before 21-40 start). */
21+
const WINDOW_SIZE = TABLE_CONCURRENCY_LIMIT
22+
23+
const ACTIVE_DISPATCH_STATUSES = ['pending', 'dispatching'] as const
24+
25+
export type DispatchStatus = 'pending' | 'dispatching' | 'complete' | 'cancelled'
26+
export type DispatchMode = 'all' | 'incomplete'
27+
28+
export interface DispatchScope {
29+
groupIds: string[]
30+
rowIds?: string[]
31+
}
32+
33+
export interface DispatchRow {
34+
id: string
35+
tableId: string
36+
workspaceId: string
37+
requestId: string
38+
mode: DispatchMode
39+
scope: DispatchScope
40+
status: DispatchStatus
41+
cursor: number
42+
requestedAt: Date
43+
}
44+
45+
export type DispatcherStepResult = 'continue' | 'done'
46+
47+
export async function insertDispatch(input: {
48+
tableId: string
49+
workspaceId: string
50+
requestId: string
51+
mode: DispatchMode
52+
scope: DispatchScope
53+
}): Promise<string> {
54+
const id = `tdsp_${generateId().replace(/-/g, '')}`
55+
await db.insert(tableRunDispatches).values({
56+
id,
57+
tableId: input.tableId,
58+
workspaceId: input.workspaceId,
59+
requestId: input.requestId,
60+
mode: input.mode,
61+
scope: input.scope,
62+
status: 'pending',
63+
cursor: 0,
64+
})
65+
return id
66+
}
67+
68+
export async function readDispatch(dispatchId: string): Promise<DispatchRow | null> {
69+
const [row] = await db
70+
.select()
71+
.from(tableRunDispatches)
72+
.where(eq(tableRunDispatches.id, dispatchId))
73+
.limit(1)
74+
if (!row) return null
75+
return {
76+
id: row.id,
77+
tableId: row.tableId,
78+
workspaceId: row.workspaceId,
79+
requestId: row.requestId,
80+
mode: row.mode as DispatchMode,
81+
scope: row.scope as DispatchScope,
82+
status: row.status as DispatchStatus,
83+
cursor: row.cursor,
84+
requestedAt: row.requestedAt,
85+
}
86+
}
87+
88+
/** Run one window of the dispatcher state machine. Caller re-invokes (via the
89+
* trigger.dev task wrapper) until the returned status is `'done'`. */
90+
export async function dispatcherStep(dispatchId: string): Promise<DispatcherStepResult> {
91+
const dispatch = await readDispatch(dispatchId)
92+
if (!dispatch) {
93+
logger.warn(`[${dispatchId}] dispatch row missing — aborting`)
94+
return 'done'
95+
}
96+
if (dispatch.status === 'cancelled' || dispatch.status === 'complete') return 'done'
97+
98+
if (dispatch.status === 'pending') {
99+
await db
100+
.update(tableRunDispatches)
101+
.set({ status: 'dispatching' })
102+
.where(eq(tableRunDispatches.id, dispatchId))
103+
}
104+
105+
const { getTableById, batchUpdateRows } = await import('./service')
106+
const table = await getTableById(dispatch.tableId)
107+
if (!table) {
108+
logger.warn(`[${dispatchId}] table ${dispatch.tableId} missing — completing dispatch`)
109+
await markDispatchComplete(dispatchId)
110+
return 'done'
111+
}
112+
113+
const allGroups = table.schema.workflowGroups ?? []
114+
const targetGroups = allGroups.filter((g) => dispatch.scope.groupIds.includes(g.id))
115+
if (targetGroups.length === 0) {
116+
await markDispatchComplete(dispatchId)
117+
return 'done'
118+
}
119+
120+
const filters = [
121+
eq(userTableRows.tableId, dispatch.tableId),
122+
gt(userTableRows.position, dispatch.cursor),
123+
]
124+
if (dispatch.scope.rowIds && dispatch.scope.rowIds.length > 0) {
125+
filters.push(inArray(userTableRows.id, dispatch.scope.rowIds))
126+
}
127+
128+
const chunk = await db
129+
.select()
130+
.from(userTableRows)
131+
.where(and(...filters))
132+
.orderBy(asc(userTableRows.position))
133+
.limit(WINDOW_SIZE)
134+
135+
if (chunk.length === 0) {
136+
await markDispatchComplete(dispatchId)
137+
await appendTableEvent({
138+
kind: 'dispatch',
139+
tableId: dispatch.tableId,
140+
dispatchId,
141+
status: 'complete',
142+
})
143+
return 'done'
144+
}
145+
146+
type Update = {
147+
rowId: string
148+
data: RowData
149+
executionsPatch: Record<string, null>
150+
}
151+
const updates: Update[] = []
152+
const clearedRows: TableRow[] = []
153+
for (const r of chunk) {
154+
const tableRow = toTableRow(r)
155+
const eligibleGroups = targetGroups.filter((g) => {
156+
// Skip cells the user explicitly cancelled after this dispatch
157+
// started — a per-row cancel mid-cascade must stick even under
158+
// isManualRun, otherwise the dispatcher resurrects the row.
159+
const exec = tableRow.executions?.[g.id]
160+
if (exec?.cancelledAt) {
161+
const cancelledAtMs = Date.parse(exec.cancelledAt)
162+
if (Number.isFinite(cancelledAtMs) && cancelledAtMs > dispatch.requestedAt.getTime()) {
163+
return false
164+
}
165+
}
166+
return isGroupEligible(g, tableRow, { isManualRun: true, mode: dispatch.mode })
167+
})
168+
if (eligibleGroups.length === 0) continue
169+
170+
const clearedData: RowData = {}
171+
const executionsPatch: Record<string, null> = {}
172+
for (const g of eligibleGroups) {
173+
for (const o of g.outputs) clearedData[o.columnName] = null
174+
executionsPatch[g.id] = null
175+
}
176+
updates.push({ rowId: r.id, data: clearedData, executionsPatch })
177+
178+
const remainingExec = { ...tableRow.executions }
179+
for (const g of eligibleGroups) delete remainingExec[g.id]
180+
clearedRows.push({
181+
...tableRow,
182+
data: { ...tableRow.data, ...clearedData },
183+
executions: remainingExec,
184+
})
185+
}
186+
187+
// Cursor advances to the last position in this chunk regardless of
188+
// eligibility — otherwise a window full of completed cells loops forever.
189+
const lastPosition = chunk[chunk.length - 1].position
190+
191+
if (updates.length > 0) {
192+
await batchUpdateRows(
193+
{
194+
tableId: dispatch.tableId,
195+
updates,
196+
workspaceId: dispatch.workspaceId,
197+
skipScheduler: true,
198+
},
199+
table,
200+
dispatch.requestId
201+
)
202+
203+
const scheduleOpts: ScheduleOpts = {
204+
isManualRun: true,
205+
groupIds: dispatch.scope.groupIds,
206+
mode: dispatch.mode,
207+
}
208+
await scheduleRunsForRows(table, clearedRows, scheduleOpts)
209+
}
210+
211+
await Promise.all([
212+
advanceCursor(dispatchId, lastPosition),
213+
appendTableEvent({
214+
kind: 'dispatch',
215+
tableId: dispatch.tableId,
216+
dispatchId,
217+
status: 'dispatching',
218+
}),
219+
])
220+
221+
return 'continue'
222+
}
223+
224+
async function advanceCursor(dispatchId: string, newCursor: number): Promise<void> {
225+
await db
226+
.update(tableRunDispatches)
227+
.set({ cursor: newCursor })
228+
.where(eq(tableRunDispatches.id, dispatchId))
229+
}
230+
231+
async function markDispatchComplete(dispatchId: string): Promise<void> {
232+
await db
233+
.update(tableRunDispatches)
234+
.set({ status: 'complete', completedAt: new Date() })
235+
.where(eq(tableRunDispatches.id, dispatchId))
236+
}
237+
238+
export async function markDispatchCancelled(dispatchId: string): Promise<void> {
239+
await db
240+
.update(tableRunDispatches)
241+
.set({ status: 'cancelled', cancelledAt: new Date() })
242+
.where(
243+
and(
244+
eq(tableRunDispatches.id, dispatchId),
245+
inArray(tableRunDispatches.status, [...ACTIVE_DISPATCH_STATUSES])
246+
)
247+
)
248+
}
249+
250+
/** Mark every active dispatch on this table as cancelled. Single atomic
251+
* UPDATE so the dispatcher's next iteration observes the cancel. */
252+
export async function markActiveDispatchesCancelled(tableId: string): Promise<void> {
253+
await db
254+
.update(tableRunDispatches)
255+
.set({ status: 'cancelled', cancelledAt: new Date() })
256+
.where(
257+
and(
258+
eq(tableRunDispatches.tableId, tableId),
259+
inArray(tableRunDispatches.status, [...ACTIVE_DISPATCH_STATUSES])
260+
)
261+
)
262+
}

0 commit comments

Comments
 (0)