Skip to content

Commit f0311a6

Browse files
feat(table): chunked dispatcher + workflow cascade (#4672)
* feat(table): chunked dispatcher for workflow-column runs Replaces the all-rows-at-once runWorkflowColumn with a row-window dispatcher backed by a new table_run_dispatches row. Each user click inserts a dispatch row and triggers a trigger.dev task that crawls the table 20 rows at a time, re-enqueueing itself between windows. The HTTP/Mothership entrypoints return { dispatchId } immediately instead of holding the request open for minutes on multi-thousand-row dispatches. - Per-row cancel stamps cancelledAt; the dispatcher skips cells whose cancelledAt > dispatch.requestedAt so a mid-cascade cancel sticks even under isManualRun. - Table-wide cancel marks active dispatches cancelled atomically so the dispatcher bails on its next iteration. - New 'dispatch' SSE event variant plumbed; client ignores for v1. * fix(table): eager bulk clear on column run so cells flip immediately Run-column with run-mode 'all' wasn't visually flipping rows that already had data — the cell renderer's "value wins" branch kept showing the prior output behind the queued/running state. The dispatcher only cleared one window of rows at a time, so most of the column stayed stale until the cursor walked to it. Now: - Dispatcher's `pending → dispatching` transition runs a single SQL UPDATE that wipes targeted `data` output columns and `executions[gid]` across every targeted row (mode-aware: 'incomplete' skips fully-filled rows). - Per-window clear in `dispatcherStep` is gone — rows are pre-cleared, the loop only filters cancel tombstones / unmet deps and enqueues. - Optimistic patch in `useRunColumn` mirrors the bulk clear by nulling output values in the cached row, so the UI flips queued/running instantly without waiting for the SSE catch-up. * fix(table): bulk clear honors in-flight execs under mode: 'incomplete' The eager bulk clear for mode: 'incomplete' only skipped rows that were already fully filled, so two overlapping dispatches could race — dispatch B would nuke executions[gid] on a row dispatch A had just stamped 'queued', flickering the cell and potentially confusing the worker. Skip any row whose targeted group is currently queued/running/pending — an 'incomplete' run shouldn't touch what another dispatch is actively working on. The per-walk 'in-flight' eligibility skip already handles rows that flip in-flight between the clear and the cursor reaching them. * refactor(table): dispatcher uses batchTriggerAndWait + tag-based cancel Switch the per-window cell fan-out from fire-and-forget tasks.trigger to tasks.batchTriggerAndWait. The dispatcher is now a single long-lived trigger.dev task that loops dispatcherStep until the table is exhausted; trigger.dev CRIU-checkpoints the parent during each wait so we don't pay compute while cells execute. Queue depth is bounded at WINDOW_SIZE per dispatch — no more flooding trigger.dev with a million queued runs. - dispatcher.ts builds payloads via the new shared buildPendingRuns helper and calls tasks.batchTriggerAndWait directly. Pre-stamps each cell to `queued` (jobId=null) so the UI flips instantly. - table-run-dispatcher.ts is now a plain while-true loop. No RUN_BUDGET_MS, no self-re-enqueue, no cold-start tax per window. Cancel: - New cancelCellRunsByTags(tags) paginates runs.list + runs.cancel(id). - cancelWorkflowGroupRuns fires the tag-sweep alongside the per-jobId queue.cancelJob path (preserved for auto-fire cells that have real jobIds from single tasks.trigger calls). - Trigger.dev acks the cancel → batchTriggerAndWait resumes → dispatcher observes the dispatch-row cancel flag → exits. Side fixes: - getAsyncBackendType returns 'trigger-dev' whenever taskContext.isInsideTask is true, regardless of TRIGGER_DEV_ENABLED env. The preview/dev-sim worker silently routing cell jobs to DatabaseJobQueue (no poller) is fixed without any env config change. - runWorkflowColumn skips the dispatcher entirely when trigger.dev is disabled, running cells inline via DatabaseJobQueue.runInline. HTTP response returns dispatchId: null in that mode. - runColumnContract response schema updated to dispatchId.nullable(). * fix(table): show Stop button on optimistic-pending row cells isExecInFlight required a jobId for `pending` status, gating it as "real backend pending" vs "optimistic flag only." The row-gutter Stop button keyed on this — so a freshly clicked Play sat as `pending` (no jobId) and the user couldn't cancel it until the server-side `queued` stamp arrived via SSE. With the dispatcher pre-batch stamping cells as `queued` (not `pending`) and no per-cell jobIds under batchTriggerAndWait, the gap was worse. Drop the jobId requirement. `pending` now counts as in-flight everywhere. Cancel writes `cancelled` to the cell exec authoritatively whether or not a real trigger.dev run exists yet — cancelling an optimistic cell means "don't run this," which is correct. Also collapse isOptimisticInFlight into isExecInFlight since the two helpers are now identical. * refactor(table): loop-in-cell cascade + dispatcher-everywhere routing Two coupled changes: 1. Cell-task runs the row's full cascade in-process. executeWorkflowGroupCellJob acquires a Redis lock per (tableId, rowId) with heartbeat (10s/30s TTL), then loops through eligible workflow groups for the row. One cell-task = one row's full cascade, not N. Resume worker holds the same lock and continues the cascade after a HITL resume. Shared withCascadeLock helper in lib/table/cascade-lock.ts. 2. Every cell-enqueue goes through the dispatcher. The implicit scheduleRunsForRows reactor in service.ts is removed — 8 callsites (insertRow, batchInsertRows, upsertRow, updateRowsByFilter, batchUpdateRows, addWorkflowGroup, updateWorkflowGroup) now fire runWorkflowColumn with mode: 'incomplete', isManualRun: false. HTTP routes that call updateRow directly also fire runWorkflowColumn afterwards. scheduleRunsForTable / scheduleRunsForRowIds deleted; scheduleRunsForRows demoted to private (only the TRIGGER_DEV_ENABLED=false fallback uses it). skipScheduler flag dropped from UpdateRowData / BatchUpdateByIdData — no longer meaningful since there's nothing implicit to suppress. Plumbed isManualRun through the dispatch row (new is_manual_run column, default true) so auto-fire callers honor autoRun: false and don't re-run completed cells. Stamp 'pending' (not 'queued', executionId: null) before batchTriggerAndWait — cell-task writes its own 'queued' on lock acquire. Small UI polish: row gutter Play button spacing, "Delete workflow" → "Delete column" label, optimistic-pending cells now show Stop button (isExecInFlight no longer requires jobId). * fix(table): SQL cancellation guard allows worker to claim a null-execId cell The dispatcher's pre-batch `pending` stamp leaves executionId unset so any cell-task that wins the cascade lock can claim the cell. The cancellation- guard SQL clause was rejecting these claims because it tested `executions->gid IS NULL` (whole exec missing) but the pre-stamp leaves the exec present with executionId=null. Add a third carve-out: `executions->gid->>'executionId' IS NULL`. Now the guard reads "write allowed if no exec exists, OR no executionId is set yet, OR the executionId matches ours." Symptom: every cell-task's first markWorkflowGroupPickedUp call would log "SQL guard saw cancelled" and skip, leaving cells stuck at the dispatcher's pending stamp. * fix(table): dispatcher cursor starts at -1 so position 0 is included The dispatcher's row-window SELECT is `position > cursor` for exclusive lower-bound semantics. With cursor initialized to 0, position-0 rows were never picked up — every dispatch silently skipped the table's first row. Start cursor at -1 instead. First window's filter `position > -1` matches position 0; subsequent iterations advance to `lastPosition` which then correctly excludes already-processed rows. * refactor(table): align optimistic UI with new dispatcher; sticky cancel via 'new' mode Fix 0: new `DispatchMode = 'new'` for auto-fire callsites. Eligibility skips rows with any prior `executions[gid]` entry — cancelled / errored / completed cells stay sticky until a manual run. Dispatcher's windowed SELECT pushes `NOT jsonb_exists_any(...)` to SQL so CSV imports into mostly-attempted tables don't pay a per-window load+JS-filter. `batchInsertRows` drops its `rowIds` payload (keeps dispatch scope tiny on big imports). Fix A/B/D: client optimistic patches now mirror the backend's actual invariants. `useCreateTableRow.onSuccess` stamps eligible groups via `optimisticallyScheduleNewlyEligibleGroups` so newly-inserted rows show `Queued` instantly. `useCancelTableRuns.onMutate` distinguishes optimistic- only pending (`executionId == null` — strip silently) from real worker claims (stamp cancelled; SSE will reconcile). Drop `onSettled` invalidation on `useUpdateTableRow` / `useBatchUpdateTableRows` to kill the delete-cell flicker. Fix C: active-dispatches overlay. New `listActiveDispatches` helper, contract, and `GET /api/table/[tableId]/dispatches` route. `kind:'dispatch'` SSE events carry scope+cursor+mode on every transition. New `useActiveDispatches` hook + `resolveCellExec` synthesize a virtual `pending` exec for cells in an active dispatch's scope ahead of cursor — queued indicators now survive page refresh during long Run-all dispatches. `cancelWorkflowGroupRuns` emits `kind:'dispatch',status:'cancelled'` events so the overlay clears without a refetch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(table): unify trigger.dev and inline dispatcher paths `runWorkflowColumn` now always inserts a `table_run_dispatches` row and drives the dispatcher state machine. The trigger.dev / in-process branch narrows to a single line: trigger.dev fires `tableRunDispatcherTask` (which calls the new `runDispatcherToCompletion`), the inline path calls the same helper fire-and-forget. Deletes `scheduleRunsForRows` and `stampQueuedOrCancel` — the inline-fallback no longer duplicates window walking, SSE emission, or cancel. The dispatcher's window-execute call goes through `JobQueueBackend`: - New `batchEnqueueAndWait` interface method. - Trigger.dev impl wraps `tasks.batchTriggerAndWait` behind a `taskContext.isInsideTask` guard (clear error if called from outside a task). - Database impl skips `async_jobs` entirely — `Promise.all` over `options.runner(payload, signal)` per item, with per-cell AbortControllers tracked by `cancelKey` for cancel. `cancelInlineRun` moves to the interface as `cancelByKey` so `cancelWorkflowGroupRuns` no longer reaches into the database backend. Fix `mode: 'new'` SQL filter: - `${array}::text[]` interpolated as a tuple-cast which Postgres rejected ("cannot cast type record to text[]") and every inline dispatch silently failed. Switched to `ARRAY[${sql.join(...)}]::text[]`. - Predicate was `jsonb_exists_any` ("any one targeted group present"), which excluded rows that needed at least one group re-run after a downstream output was deleted. Switched to `jsonb_exists_all` — per-group JS eligibility handles the rest. Cascade-loop workflowId bug: `runRowCascadeLoop` was not threading the new group's `workflowId` when advancing across groups. The cell-task ran the previous group's workflow against the next group's cell, terminating `completed` with empty `accumulatedData`. Fixed by tracking `currentWorkflowId` alongside `currentGroupId` / `currentExecutionId`. Client optimistic-patch tightening: - `useRunColumn.onMutate` mirrors server eligibility — skip cells with unmet deps so unmet rows don't flash Queued and get stuck (no SSE will arrive for cells the server skipped). - `resolveCellExec` overlay synthesizes a virtual `pending` only when `areGroupDepsSatisfied` is true. Rows with unmet deps render Waiting, matching the dispatcher's actual behavior. Cleanup from /simplify pass: - Use `generateShortId(20)` instead of `generateId().replace(/-/g, '').slice(0, 20)`. - Inline `batchEnqueueAndWait` no longer allocates synthetic ids (returned `string[]` is unused). - Flattened the per-cell `tracked` array — only push entries that registered controllers, drop the null placeholders. - Extracted `runDispatcherToCompletion` to share the loop between the trigger.dev wrapper and the in-process path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(table): backend running counter, dep-aware retrigger, sidebar polish Counter (Fix 1): top-right "X running" + per-row badge are now backend-bootstrapped via a count on `user_table_rows.executions ->> 'status' = 'running'` returned alongside active dispatches. SSE `kind: 'cell'` events compute a delta from `prev → next` status to keep the cache live; cell events for rows outside the loaded page slice trigger a run-state refetch. On `pruned` we invalidate the cache. Counts only worker-claimed `running` cells — optimistic queued/pending no longer inflate the badge, and rows outside the loaded page slice are counted too. Sidebar (Fix 2 + 3a): `Run after` no longer ticks every column by default for new groups (empty list). Save is disabled with an inline error when auto-run is on with zero deps. `edit-group` mode anchors the left-of-current filter to the group's leftmost column, so a workflow can only depend on columns to its left. Reorder scrub (Fix 3b): `updateTableMetadata` walks the schema's workflow groups when `columnOrder` is in the patch and drops any dep whose new position lands at or after the group's leftmost column (uses the existing `stripGroupDeps` helper). Metadata + schema updates land atomically. Server returns ordered columns (Fix 3b cont'd): `getTableById` / `listTables` now sort `schema.columns` by `metadata.columnOrder` before returning, via a new `applyColumnOrderToSchema` helper. Every consumer (grid, sidebar, copilot, mothership) gets one ordered list — the sidebar's leftmost-group-column anchor now points at the right index. Dep-aware retrigger (Fix 4): editing a value that a downstream workflow depends on now re-runs that workflow. - `deriveExecClearsForDataPatch` returns `{ executionsPatch, inFlightDownstreamGroups }`. Walks `schema.workflowGroups[].dependencies.columns` for every column in the patch, clears terminal-state downstream entries, and reports in-flight entries. - `updateRow` calls `cancelWorkflowGroupRuns` + `runWorkflowColumn` (`mode: 'incomplete' + isManualRun: true`) for in-flight downstream groups, then always fires `runWorkflowColumn({ mode: 'new' })` for the cleared groups. Skips both when `executionsPatch` is provided by the caller — those are cell-task / cancel writes that would otherwise spawn a recursive flood of dispatches per partial-write. - `cancelWorkflowGroupRuns(tableId, rowId, { groupIds? })` accepts a per-group filter so the cancel only touches the affected groups, not every in-flight cell on the row. - `pickNextEligibleGroupForRow` now treats a dispatcher pre-stamp (`pending` + `executionId: null`) as claimable — the cascade-loop is the real owner. Without this, the dispatcher's pre-stamp of downstream groups made the cascade-loop see them as "in-flight" and skip them, stranding `pending` cells forever. - `optimisticallyScheduleNewlyEligibleGroups` extends the cache patch to flip dep-touched groups to `pending` regardless of their current status, matching the server's cancel-then-rerun behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(table): paused workflow cells route through executeResumeJob; render Pending + viewable Three connected issues with workflows that pause mid-cell (e.g. wait blocks): 1. `/api/resume/poll` (the time-pause auto-resumer) called `PauseResumeManager.startResumeExecution` directly, bypassing `executeResumeJob` from `background/resume-execution.ts`. The wrapper is where the cell-context restoration + cascade-loop continuation lives — without it, the resumed workflow ran to completion but never wrote the terminal state back to the table cell. Cell stays `pending` forever even though the underlying execution finished. Fix: dynamically import `executeResumeJob` and use it for the `'starting'` branch. Same primitive the trigger.dev `resumeExecutionTask` wraps — calling it directly handles both trigger.dev-disabled local dev and trigger.dev-enabled prod identically. 2. The cell renderer mapped `status: 'pending'` to `kind: 'queued'` (gray "Queued" badge) regardless of whether the run had started. A HITL-paused run has `status: 'pending'` + `jobId` prefixed `paused-` + a real `executionId` — semantically very different from "queued, hasn't run." Now renders as `pending-upstream` (the existing Pending pill) for paused-jobId rows. 3. Right-click "View execution" was disabled for `pending` cells (gated to `completed | error | running`), so users couldn't open the trace for a paused execution. Paused runs have a viewable trace (the executionId is real and the log row exists). Both the per-row context menu and the action-bar derivation now recognize `pending` + `paused-` jobId as a started run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(table): typewriter reveal for SSE-driven workflow cell values Workflow-output cells now reveal their text character-by-character when an SSE update lands, while page reloads and virtualization remounts still paint the value instantly. A first-render guard inside the new useTypewriter hook distinguishes hydration from live updates with no plumbing through the cell tree. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(table): address bugbot/greptile review feedback Two P1 issues + one cleanup from the bot reviewers: 1. **Double-dispatch + completed-output wipe.** Both PATCH row routes (`app/api/table/[tableId]/rows/[rowId]` and `app/api/v1/tables/[tableId]/rows/[rowId]`) were firing a second `runWorkflowColumn({ mode: 'incomplete' })` after `updateRow` returns. `updateRow` already fires `mode: 'new'` internally for user edits, so the second call created a concurrent dispatch. Worse, the `mode: 'incomplete'` path's `bulkClearWorkflowGroupCells` wipes ALL targeted output columns on any row where any one column is empty — meaning sibling-group completed outputs could be erased. Removed both route-level calls; auto-dispatch lives entirely in `updateRow`. 2. **`runWorkflowColumn` log-spamming on plain tables.** `if (targetGroups.length === 0) throw new Error(...)` fired on every row insert/update for tables without any workflow groups (the majority). Every caller wraps with `.catch(logger.error)`, so each PATCH produced an error-level log. Return `{ dispatchId: null }` silently — manual `runWorkflowColumn` callers pass `groupIds` explicitly so they can't reach this branch. 3. **`isManualRun` plumbed through dispatch SSE events.** Late-arriving `kind: 'dispatch'` events for dispatches not in the initial fetch were hardcoding `isManualRun: false`. Added the field to the event shape, emit it from `dispatcherStep` (pending → complete, dispatching transitions) and `markActiveDispatchesCancelled`, and consume it in the SSE handler with a sensible fallback for legacy emits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(table): row executions sidecar + left-to-right dep retrigger + cancel counter refresh Split per-row workflow-group execution state out of the user_table_rows.executions JSONB column into a new table_row_executions sidecar keyed by (row_id, group_id). Dispatcher filters, "X running" counter, bulk clears, and the cancellation guard all hit indexed columns instead of walking JSONB. Wire shape unchanged — server merges sidecar rows back into row.executions on the way out. Also: - deriveExecClearsForDataPatch now walks workflowGroups left-to-right with a propagating dirtied-column set so transitive dep chains (edit col A → group 1 re-runs → group 2 depends on group 1's output → group 2 re-runs) collapse to a single forward pass. - useCancelTableRuns.onSettled invalidates the activeDispatches query so the top-right counter and row gutter Stop button refetch from the server after any Stop (per-cell, row, or table-wide). countRunningCells is the source of truth; client no longer needs duplicate state. Three migrations on this branch (0209 + 0210 + new sidecar) collapsed into one since the feature is unreleased. * fix(table): address remaining cursor/greptile review feedback - Mothership update_row no longer double-dispatches. updateRow already fires the auto-cascade internally; the second `mode: 'incomplete'` call here raced with it and could bulk-clear sibling-group outputs. - SSE dispatch events no longer dropped when the activeDispatches cache is cold. Seed an empty TableRunState if the initial fetch hasn't landed yet so the queued overlay doesn't lose the first dispatch event. - batchUpdateRows now runs cancel+rerun for per-row in-flight downstream groups, mirroring updateRow. Without this, dep edits in a batch left running workflows reading stale upstream values. * fix(table): cancel prior runs, scope batch insert dispatch, recover orphan pre-stamps Addresses cursor + greptile review feedback on table dispatcher edge cases: - Manual table-wide Run-all / Run-column now cancels prior active dispatches AND in-flight cell workers before bulk-clearing. Without this, mode:'all' deleted running sidecar rows out from under their workers (which kept writing into the wiped state) and a second Run-all could enqueue overlapping cells racing on the same rows. Row-scoped manual calls (dep-edit cascade) are excluded — those already cancel their own scope. - batchInsertRowsWithTx now scopes its auto-dispatch to the newly-inserted row ids. Without this, after the sidecar migration the NOT EXISTS filter matches every existing row (zero sidecar entries), so a CSV import would walk the entire table dispatching workflow runs on every pre-existing row. - classifyEligibility carve-out: pending + executionId=null is an orphan pre-stamp (cascade-lock contention, batchEnqueueAndWait failure, etc.), treated as claimable so future dispatchers can re-stamp instead of skipping it as 'in-flight' forever. Matches pickNextEligibleGroupForRow's logic. - On batchEnqueueAndWait failure, dispatcherStep now sweeps the orphan pre-stamps it wrote for the failed batch so the cells don't render Queued forever; the next user action picks them up cleanly. * fix(table): row-scoped Refresh cancels in-flight; counter includes queued/pending - runWorkflowColumn now cancels prior in-flight cells for row-scoped manual runs too (context-menu Refresh on a row subset, action-bar Refresh on selected rows). Previously only the table-wide path cancelled, so a row-scoped Refresh would bulk-clear running sidecar rows without aborting workers. Per-row cancel skips markActiveDispatchesCancelled so unrelated dispatches keep running. - countRunningCells now counts all in-flight statuses (queued / running / pending) instead of just running. The row gutter Run/Stop button reads this map — with the old behavior, clicking Play during the queued window would re-enqueue an already-queued cell. SSE applyCell handler updated to use isExecInFlight so client deltas track the same semantics. * fix(table): per-row Stop tombstones ahead-of-cursor rows during Run-all Per-row Stop only cancelled sidecar rows already in flight. A row the dispatcher hadn't reached yet had no exec record, so Stop was a no-op there — the dispatcher would later walk to it, classify the group eligible, and re-fire workflows the user thought they stopped. cancelWorkflowGroupRuns now, for a per-row cancel, checks active dispatches whose scope covers the row and writes `cancelled` tombstones (cancelledAt = now) for the at-risk groups that don't already have a sidecar entry. The dispatcher's existing `cancelledAt > dispatch.requestedAt` filter then skips them when the cursor arrives. onConflictDoNothing guards against clobbering a concurrently-written entry; the active-dispatch check avoids stamping spurious cancels on idle rows. * fix(table): seed dispatch overlay on Run; surface batch-enqueue failures as error - useRunColumn.onSuccess invalidates the activeDispatches query so the resolveCellExec queued overlay populates immediately for ahead-of-cursor rows (scrolled-in / refetched), instead of waiting for the first dispatch SSE. Targeted at activeDispatches only — the rows cache stays owned by useTableEventStream. - On batchEnqueueAndWait failure, dispatcherStep now flips the orphan pre-stamps to a terminal `error` state and emits a cell SSE event, rather than deleting them. The cursor still advances past the window, but the dropped cells are now visible (Error pill) instead of silently empty, stay out of the in-flight set, and re-run on the next manual run. * fix(table): seed dispatch overlay on Run; surface batch-enqueue failures as error - useRunColumn.onSuccess invalidates activeDispatches so the resolveCellExec queued overlay populates immediately for ahead-of-cursor rows instead of waiting for the first dispatch SSE. Rows cache stays owned by SSE. - On batchEnqueueAndWait failure, dispatcherStep flips orphan pre-stamps to a terminal error state (+ cell SSE) instead of deleting them, so the dropped window is visible (Error pill) rather than silently empty and re-runs on the next manual run. --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d55f40c commit f0311a6

40 files changed

Lines changed: 19416 additions & 910 deletions

File tree

apps/sim/app/api/resume/poll/route.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,21 @@ async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
139139
})
140140

141141
if (enqueueResult.status === 'starting') {
142-
PauseResumeManager.startResumeExecution({
142+
// Route through `executeResumeJob` (not `PauseResumeManager.startResumeExecution`
143+
// directly) so cell-context restoration + cascade-loop continuation
144+
// fires. This is the same primitive the trigger.dev `resumeExecutionTask`
145+
// wraps — calling it directly handles both trigger.dev-disabled local
146+
// dev and trigger.dev-enabled prod identically.
147+
const { executeResumeJob } = await import('@/background/resume-execution')
148+
void executeResumeJob({
143149
resumeEntryId: enqueueResult.resumeEntryId,
144150
resumeExecutionId: enqueueResult.resumeExecutionId,
145-
pausedExecution: enqueueResult.pausedExecution,
151+
pausedExecutionId: enqueueResult.pausedExecution.id,
146152
contextId: enqueueResult.contextId,
147153
resumeInput: enqueueResult.resumeInput,
148154
userId: enqueueResult.userId,
155+
workflowId: row.workflowId,
156+
parentExecutionId: row.executionId,
149157
}).catch((error) => {
150158
logger.error('Background time-pause resume failed', {
151159
executionId: row.executionId,

apps/sim/app/api/table/[tableId]/columns/run/route.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { createLogger } from '@sim/logger'
2-
import { toError } from '@sim/utils/errors'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { runColumnContract } from '@/lib/api/contracts/tables'
54
import { parseRequest } from '@/lib/api/server'
@@ -30,21 +29,16 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
3029
const access = await checkAccess(tableId, auth.userId, 'write')
3130
if (!access.ok) return accessError(access, requestId, tableId)
3231

33-
// Dispatch in the background — large fan-outs (thousands of rows) issue
34-
// sequential trigger.dev calls and would otherwise hold the HTTP response
35-
// open for minutes, blocking the AI/copilot tool span and the UI mutation.
36-
void runWorkflowColumn({
32+
const { dispatchId } = await runWorkflowColumn({
3733
tableId,
3834
workspaceId,
3935
groupIds,
4036
mode: runMode,
4137
rowIds,
4238
requestId,
43-
}).catch((err) => {
44-
logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message)
4539
})
4640

47-
return NextResponse.json({ success: true, data: { triggered: null } })
41+
return NextResponse.json({ success: true, data: { dispatchId } })
4842
} catch (error) {
4943
if (error instanceof Error && error.message === 'Invalid workspace ID') {
5044
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { type ActiveDispatch, listActiveDispatchesContract } from '@/lib/api/contracts/tables'
4+
import { parseRequest } from '@/lib/api/server'
5+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
6+
import { generateRequestId } from '@/lib/core/utils/request'
7+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
8+
import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher'
9+
import { accessError, checkAccess } from '@/app/api/table/utils'
10+
11+
const logger = createLogger('TableDispatchesAPI')
12+
13+
interface RouteParams {
14+
params: Promise<{ tableId: string }>
15+
}
16+
17+
/**
18+
* GET /api/table/[tableId]/dispatches
19+
*
20+
* Returns active (`pending` / `dispatching`) dispatches for the table. Drives
21+
* the client's "about to run" overlay so refresh during a long Run-all keeps
22+
* the queued indicators on rows the dispatcher hasn't reached yet.
23+
*/
24+
export const GET = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
25+
const requestId = generateRequestId()
26+
27+
try {
28+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
29+
if (!authResult.success || !authResult.userId) {
30+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
31+
}
32+
33+
const parsed = await parseRequest(listActiveDispatchesContract, request, { params })
34+
if (!parsed.success) return parsed.response
35+
const { tableId } = parsed.data.params
36+
37+
const result = await checkAccess(tableId, authResult.userId, 'read')
38+
if (!result.ok) return accessError(result, requestId, tableId)
39+
40+
const [rows, running] = await Promise.all([
41+
listActiveDispatches(tableId),
42+
countRunningCells(tableId),
43+
])
44+
const dispatches: ActiveDispatch[] = rows.map((r) => ({
45+
id: r.id,
46+
status: r.status as 'pending' | 'dispatching',
47+
mode: r.mode,
48+
isManualRun: r.isManualRun,
49+
cursor: r.cursor,
50+
scope: r.scope,
51+
}))
52+
53+
return NextResponse.json({
54+
success: true,
55+
data: {
56+
dispatches,
57+
runningCellCount: running.total,
58+
runningByRowId: running.byRowId,
59+
},
60+
})
61+
} catch (error) {
62+
logger.error(`[${requestId}] list-dispatches failed:`, error)
63+
return NextResponse.json({ error: 'Failed to list active dispatches' }, { status: 500 })
64+
}
65+
})

apps/sim/app/api/table/[tableId]/rows/[rowId]/route.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: RowR
136136
// Only `null` when a `cancellationGuard` is supplied and the SQL guard
137137
// rejects the write — this route doesn't pass one, so reaching null is a bug.
138138
if (!updatedRow) throw new Error('updateRow returned null without a cancellationGuard')
139+
// Auto-dispatch for user edits is handled inside `updateRow` (mode: 'new').
140+
// Firing a second mode: 'incomplete' dispatch here would race with the
141+
// `mode: 'new'` one AND bulk-clear sibling-group outputs (the incomplete
142+
// bulk-clear wipes ALL targeted columns when any one column on the row
143+
// is empty).
139144

140145
return NextResponse.json({
141146
success: true,

apps/sim/app/api/table/[tableId]/rows/route.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { db } from '@sim/db'
2-
import { userTableRows } from '@sim/db/schema'
2+
import { tableRowExecutions, userTableRows } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { toError } from '@sim/utils/errors'
5-
import { and, eq, sql } from 'drizzle-orm'
5+
import { and, eq, inArray, sql } from 'drizzle-orm'
66
import { type NextRequest, NextResponse } from 'next/server'
77
import {
88
type BatchInsertTableRowsBodyInput,
@@ -17,7 +17,14 @@ import { isZodError, validationErrorResponse } from '@/lib/api/server/validation
1717
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
1818
import { generateRequestId } from '@/lib/core/utils/request'
1919
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
20-
import type { Filter, RowData, Sort, TableSchema } from '@/lib/table'
20+
import type {
21+
Filter,
22+
RowData,
23+
RowExecutionMetadata,
24+
RowExecutions,
25+
Sort,
26+
TableSchema,
27+
} from '@/lib/table'
2128
import {
2229
batchInsertRows,
2330
batchUpdateRows,
@@ -283,7 +290,6 @@ export const GET = withRouteHandler(
283290
.select({
284291
id: userTableRows.id,
285292
data: userTableRows.data,
286-
executions: userTableRows.executions,
287293
position: userTableRows.position,
288294
createdAt: userTableRows.createdAt,
289295
updatedAt: userTableRows.updatedAt,
@@ -313,6 +319,41 @@ export const GET = withRouteHandler(
313319

314320
const rows = await query.limit(validated.limit).offset(validated.offset)
315321

322+
// Sidecar: fetch per-(row, group) execution state and group into a map
323+
// so the response preserves the legacy `row.executions[groupId]` wire
324+
// shape. One indexed-IN scan against table_row_executions.
325+
const executionsByRow = new Map<string, RowExecutions>()
326+
if (rows.length > 0) {
327+
const execRows = await db
328+
.select()
329+
.from(tableRowExecutions)
330+
.where(
331+
inArray(
332+
tableRowExecutions.rowId,
333+
rows.map((r) => r.id)
334+
)
335+
)
336+
for (const e of execRows) {
337+
const existing = executionsByRow.get(e.rowId) ?? {}
338+
const meta: RowExecutionMetadata = {
339+
status: e.status as RowExecutionMetadata['status'],
340+
executionId: e.executionId ?? null,
341+
jobId: e.jobId ?? null,
342+
workflowId: e.workflowId,
343+
error: e.error ?? null,
344+
...(e.runningBlockIds && e.runningBlockIds.length > 0
345+
? { runningBlockIds: e.runningBlockIds }
346+
: {}),
347+
...(e.blockErrors && Object.keys(e.blockErrors as Record<string, string>).length > 0
348+
? { blockErrors: e.blockErrors as Record<string, string> }
349+
: {}),
350+
...(e.cancelledAt ? { cancelledAt: e.cancelledAt.toISOString() } : {}),
351+
}
352+
existing[e.groupId] = meta
353+
executionsByRow.set(e.rowId, existing)
354+
}
355+
}
356+
316357
logger.info(
317358
`[${requestId}] Queried ${rows.length} rows from table ${tableId} (total: ${totalCount ?? 'n/a'})`
318359
)
@@ -323,7 +364,7 @@ export const GET = withRouteHandler(
323364
rows: rows.map((r) => ({
324365
id: r.id,
325366
data: r.data,
326-
executions: r.executions ?? {},
367+
executions: executionsByRow.get(r.id) ?? {},
327368
position: r.position,
328369
createdAt:
329370
r.createdAt instanceof Date ? r.createdAt.toISOString() : String(r.createdAt),

apps/sim/app/api/v1/tables/[tableId]/rows/[rowId]/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: RowR
144144
if (!updatedRow) {
145145
return NextResponse.json({ error: 'Row not found' }, { status: 404 })
146146
}
147+
// Auto-dispatch for user edits is handled inside `updateRow` (mode: 'new').
148+
// Firing a second mode: 'incomplete' dispatch here would race with it AND
149+
// bulk-clear sibling-group outputs.
147150

148151
return NextResponse.json({
149152
success: true,

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use client'
22

33
import type React from 'react'
4+
import { useEffect, useRef, useState } from 'react'
45
import { parse } from 'tldts'
56
import { Badge, Checkbox, Tooltip } from '@/components/emcn'
67
import { cn } from '@/lib/core/utils/cn'
@@ -60,6 +61,14 @@ export function resolveCellRender({
6061
if (!isNull) return { kind: 'value', text: stringifyValue(value) }
6162

6263
if (inFlight && !(groupHasBlockErrors && !blockRunning)) {
64+
// A `pending` cell whose jobId starts with `paused-` is mid-pause
65+
// (workflow yielded for human-in-the-loop). Render as Pending rather
66+
// than Queued so the user can tell it's not just waiting to start.
67+
const isPaused =
68+
exec?.status === 'pending' &&
69+
typeof exec.jobId === 'string' &&
70+
exec.jobId.startsWith('paused-')
71+
if (isPaused) return { kind: 'pending-upstream' }
6372
if (exec?.status === 'queued' || exec?.status === 'pending') return { kind: 'queued' }
6473
return { kind: 'pending-upstream' }
6574
}
@@ -119,6 +128,9 @@ interface CellRenderProps {
119128
}
120129

121130
export function CellRender({ kind, isEditing }: CellRenderProps): React.ReactElement | null {
131+
const valueText = kind.kind === 'value' ? kind.text : null
132+
const revealedValueText = useTypewriter(valueText)
133+
122134
switch (kind.kind) {
123135
case 'value':
124136
return (
@@ -128,7 +140,7 @@ export function CellRender({ kind, isEditing }: CellRenderProps): React.ReactEle
128140
isEditing && 'invisible'
129141
)}
130142
>
131-
{kind.text}
143+
{revealedValueText ?? kind.text}
132144
</span>
133145
)
134146

@@ -275,3 +287,45 @@ function Wrap({ isEditing, children }: { isEditing: boolean; children: React.Rea
275287
if (!isEditing) return <>{children}</>
276288
return <div className='invisible'>{children}</div>
277289
}
290+
291+
const TYPEWRITER_MS_PER_CHAR = 15
292+
293+
/**
294+
* Reveals `text` character-by-character whenever it changes after the first
295+
* render. Initial render (page hydration or virtualization remount) shows the
296+
* value statically — animation fires only for subsequent updates, which in
297+
* practice means SSE-driven workflow completions arriving via
298+
* `useTableEventStream → applyCell()`.
299+
*/
300+
function useTypewriter(text: string | null): string | null {
301+
const [revealed, setRevealed] = useState<string | null>(text)
302+
const isFirstRunRef = useRef(true)
303+
const prevTextRef = useRef<string | null>(text)
304+
305+
useEffect(() => {
306+
if (isFirstRunRef.current) {
307+
isFirstRunRef.current = false
308+
prevTextRef.current = text
309+
setRevealed(text)
310+
return
311+
}
312+
if (prevTextRef.current === text) return
313+
prevTextRef.current = text
314+
315+
if (text === null || text.length === 0) {
316+
setRevealed(text)
317+
return
318+
}
319+
320+
setRevealed('')
321+
let i = 0
322+
const id = window.setInterval(() => {
323+
i++
324+
setRevealed(text.slice(0, i))
325+
if (i >= text.length) window.clearInterval(id)
326+
}, TYPEWRITER_MS_PER_CHAR)
327+
return () => window.clearInterval(id)
328+
}, [text])
329+
330+
return revealed
331+
}

0 commit comments

Comments
 (0)