Skip to content

Commit 57b9a2f

Browse files
fix(table): typewriter flash, Run-row completed-skip, dispatch-scope running count (#4687)
* fix(table): no typewriter flash; Run-row skips completed workflows - Typewriter: reset the revealed text synchronously during render when the value changes (not in an effect), so a cell going from running→value no longer flashes the full text for one frame before animating. - Run row / manual incomplete runs now treat a `completed` group as done even if an output column is blank — only "Run all" re-runs completed cells. The auto cascade keeps re-filling blank outputs (completedAndFilled). Client optimistic stamp mirrors: incomplete skips `completed` cells. * fix(table): incomplete bulk-clear is per-group, not per-row bulkClearWorkflowGroupCells in incomplete mode wiped EVERY targeted group's output data + exec on any row that wasn't fully filled across all targeted groups. So Run-row on a row with one completed group and one cancelled group wiped the completed group's outputs + exec too, and the dispatcher re-ran it. Now incomplete-mode clears per-group: only error/cancelled groups get their output columns + exec cleared; completed and in-flight groups are left intact (never-run groups have nothing to clear and run via eligibility). Combined with the classifyEligibility guard, a completed workflow is never re-run by Run-row — only Run-all re-runs it. * fix(table): X-running count from dispatch scope so reload matches live The "X running" badge read countRunningCells (sidecar in-flight), but the dispatcher only stamps one ~20-cell window at a time. During a 1000-row Run-all the client optimistically showed ~1000 while a reload showed ~20 — the sidecar never holds more than a window. Derive the count from the active dispatches instead: rows in scope ahead of the cursor × |groupIds| (exact for Run-all, upper bound for incomplete/new). Both scope and cursor are persisted, so a reload computes the same number. - countActiveRunCells (dispatcher.ts): dispatch-scope total, sidecar fallback when no dispatch is active. byRowId stays sidecar-based (the client overlay renders queued rows ahead of the cursor). - Live: applyDispatch re-syncs the badge from the server on every dispatch event (one per window, after its cells finish + cursor advances), so the badge steps down per window and matches reload. applyCell no longer touches runningCellCount (still keeps runningByRowId live for the gutter). - Optimistic on click: useRunColumn seeds the full run scope (totalCount × groups) so the badge is right before the first window lands. * fix(table): address review — parallel dispatch counts, unfiltered rowCount - countActiveRunCells: run the per-dispatch COUNT queries + the sidecar count in parallel instead of serially (one round-trip per dispatch). - Optimistic Run-all estimate now reads the table definition's maintained, unfiltered rowCount (detail cache) instead of the rows query's filter-scoped totalCount — the dispatcher runs every row regardless of the active filter.
1 parent 48cf200 commit 57b9a2f

6 files changed

Lines changed: 170 additions & 105 deletions

File tree

apps/sim/app/api/table/[tableId]/dispatches/route.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { parseRequest } from '@/lib/api/server'
55
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
8-
import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher'
8+
import { countActiveRunCells, listActiveDispatches } from '@/lib/table/dispatcher'
99
import { accessError, checkAccess } from '@/app/api/table/utils'
1010

1111
const logger = createLogger('TableDispatchesAPI')
@@ -37,10 +37,8 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou
3737
const result = await checkAccess(tableId, authResult.userId, 'read')
3838
if (!result.ok) return accessError(result, requestId, tableId)
3939

40-
const [rows, running] = await Promise.all([
41-
listActiveDispatches(tableId),
42-
countRunningCells(tableId),
43-
])
40+
const rows = await listActiveDispatches(tableId)
41+
const running = await countActiveRunCells(tableId, rows)
4442
const dispatches: ActiveDispatch[] = rows.map((r) => ({
4543
id: r.id,
4644
status: r.status as 'pending' | 'dispatching',

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -303,33 +303,35 @@ const TYPEWRITER_MS_PER_CHAR = 15
303303
*/
304304
function useTypewriter(text: string | null): string | null {
305305
const [revealed, setRevealed] = useState<string | null>(text)
306-
const isFirstRunRef = useRef(true)
307306
const prevTextRef = useRef<string | null>(text)
307+
const mountedRef = useRef(false)
308+
const animateRef = useRef(false)
308309

309-
useEffect(() => {
310-
if (isFirstRunRef.current) {
311-
isFirstRunRef.current = false
312-
prevTextRef.current = text
313-
setRevealed(text)
314-
return
315-
}
316-
if (prevTextRef.current === text) return
310+
// Reset synchronously during render when `text` changes (not on first mount)
311+
// so no frame ever shows the full new value before the animation begins —
312+
// an effect-based reset lands one frame late and flashes the whole text.
313+
if (prevTextRef.current !== text) {
317314
prevTextRef.current = text
315+
const animate = mountedRef.current && text !== null && text.length > 0
316+
animateRef.current = animate
317+
setRevealed(animate ? '' : text)
318+
}
318319

319-
if (text === null || text.length === 0) {
320-
setRevealed(text)
321-
return
322-
}
320+
useEffect(() => {
321+
mountedRef.current = true
322+
}, [])
323323

324-
const full = text
324+
useEffect(() => {
325+
if (!animateRef.current) return
326+
animateRef.current = false
327+
const full = text as string
325328
const start = performance.now()
326329
let raf = 0
327330
const tick = (now: number) => {
328331
const chars = Math.min(full.length, Math.floor((now - start) / TYPEWRITER_MS_PER_CHAR))
329332
setRevealed(full.slice(0, chars))
330333
if (chars < full.length) raf = requestAnimationFrame(tick)
331334
}
332-
setRevealed('')
333335
raf = requestAnimationFrame(tick)
334336
return () => cancelAnimationFrame(raf)
335337
}, [text])

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ export function useTableEventStream({
7373
let lastEventId = loadPointer(tableId)
7474
let reconnectAttempt = 0
7575

76-
const updateRunStateCounters = (
77-
rowId: string,
78-
wasInFlight: boolean,
79-
isInFlight: boolean
80-
): void => {
76+
// Keeps the per-row gutter (`runningByRowId`) live between dispatch events.
77+
// `runningCellCount` (the "X running" badge) is NOT touched here — it's the
78+
// server's dispatch-scope count, seeded optimistically on click and
79+
// re-synced by `applyDispatch` on every window, so live matches reload.
80+
const updateRunningByRow = (rowId: string, wasInFlight: boolean, isInFlight: boolean): void => {
8181
if (wasInFlight === isInFlight) return
8282
const delta = isInFlight ? 1 : -1
8383
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
@@ -87,11 +87,7 @@ export function useTableEventStream({
8787
const nextByRow = { ...prev.runningByRowId }
8888
if (nextForRow === 0) delete nextByRow[rowId]
8989
else nextByRow[rowId] = nextForRow
90-
return {
91-
...prev,
92-
runningCellCount: Math.max(0, prev.runningCellCount + delta),
93-
runningByRowId: nextByRow,
94-
}
90+
return { ...prev, runningByRowId: nextByRow }
9591
})
9692
}
9793

@@ -145,11 +141,7 @@ export function useTableEventStream({
145141
queryKey: tableKeys.activeDispatches(tableId),
146142
})
147143
} else {
148-
updateRunStateCounters(
149-
rowId,
150-
wasInFlight,
151-
isExecInFlight({ status } as RowExecutionMetadata)
152-
)
144+
updateRunningByRow(rowId, wasInFlight, isExecInFlight({ status } as RowExecutionMetadata))
153145
}
154146
}
155147

@@ -195,6 +187,11 @@ export function useTableEventStream({
195187
merged[idx] = next
196188
return { ...base, dispatches: merged }
197189
})
190+
// The dispatcher emits this once per window (after the window's cells
191+
// finish + the cursor advances) and on completion. Re-sync the
192+
// dispatch-scope `runningCellCount` from the server so the badge steps
193+
// down per window and matches a reload exactly.
194+
void queryClient.invalidateQueries({ queryKey: tableKeys.activeDispatches(tableId) })
198195
}
199196

200197
const handlePrune = (payload: PrunedEvent): void => {

apps/sim/hooks/queries/tables.ts

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ import type {
7373
} from '@/lib/table'
7474
import {
7575
areGroupDepsSatisfied,
76-
areOutputsFilled,
7776
isExecInFlight,
7877
optimisticallyScheduleNewlyEligibleGroups,
7978
} from '@/lib/table/deps'
@@ -235,17 +234,34 @@ function countNewlyInFlight(before: RowExecutions, after: RowExecutions): number
235234
return n
236235
}
237236

238-
/** Add optimistically-stamped cells to the run-state counter so the "X running"
239-
* badge + per-row gutter Stop reflect them instantly (the optimistic stamp
240-
* eats the dispatcher's `pending` SSE, so `applyCell` never bumps the count).
241-
* Returns the prior snapshot for rollback, or `null` when nothing was bumped. */
237+
/** The table's maintained, unfiltered `rowCount` from the detail cache (or
238+
* `null` when the detail hasn't loaded). This is the right scope for a Run-all
239+
* estimate: the dispatcher runs every row regardless of the active view
240+
* filter, whereas the rows query's `totalCount` is filter-scoped. */
241+
function readTableRowCount(
242+
queryClient: ReturnType<typeof useQueryClient>,
243+
tableId: string
244+
): number | null {
245+
const def = queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))
246+
return typeof def?.rowCount === 'number' ? def.rowCount : null
247+
}
248+
249+
/** Optimistically reflect a run on the "X running" badge + per-row gutter Stop
250+
* instantly (the optimistic stamp eats the dispatcher's `pending` SSE, so
251+
* `applyCell` never bumps the count, and the server's dispatch-scope count
252+
* isn't live until the first window). `stampedByRow` drives the per-row gutter
253+
* (loaded rows only); `cellCountDelta` is the badge delta — pass the full run
254+
* scope (rows × groups) for Run-all so it matches the server, or omit to use
255+
* the stamped total. Returns the prior snapshot for rollback. */
242256
function bumpRunState(
243257
queryClient: ReturnType<typeof useQueryClient>,
244258
tableId: string,
245-
stampedByRow: Record<string, number>
259+
stampedByRow: Record<string, number>,
260+
cellCountDelta?: number
246261
): { snapshot: TableRunState | undefined } | null {
247-
const total = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
248-
if (total === 0) return null
262+
const stampedTotal = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
263+
const countDelta = cellCountDelta ?? stampedTotal
264+
if (countDelta === 0 && stampedTotal === 0) return null
249265
const snapshot = queryClient.getQueryData<TableRunState>(tableKeys.activeDispatches(tableId))
250266
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
251267
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
@@ -255,7 +271,7 @@ function bumpRunState(
255271
}
256272
return {
257273
...base,
258-
runningCellCount: base.runningCellCount + total,
274+
runningCellCount: base.runningCellCount + countDelta,
259275
runningByRowId: nextByRow,
260276
}
261277
})
@@ -1418,12 +1434,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
14181434
// dispatcher regardless of mode. Stamping pending here would leave
14191435
// the cell flashing Queued indefinitely (no SSE event will arrive).
14201436
if (group && !areGroupDepsSatisfied(group, r)) continue
1421-
// Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
1422-
// outputs are filled, regardless of exec status. A cancelled/error
1423-
// cell with a leftover value from a prior run was rendering as filled
1424-
// but flipping to "queued" optimistically here even though the server
1425-
// would skip it.
1426-
if (runMode === 'incomplete' && group && areOutputsFilled(group, r)) continue
1437+
// Mirror server eligibility for manual `mode: 'incomplete'`: a
1438+
// `completed` group is done (even with a blank output) — only "Run
1439+
// all" re-runs it. error/cancelled/never-run cells still re-run.
1440+
if (runMode === 'incomplete' && exec?.status === 'completed') continue
14271441
next[groupId] = buildPendingExec(exec)
14281442
// Mirror the server-side bulk clear: wipe output values so the cell
14291443
// doesn't render the stale completed value behind a pending badge.
@@ -1442,7 +1456,14 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
14421456
return { ...r, data: nextData, executions: next }
14431457
})
14441458

1445-
const bumped = bumpRunState(queryClient, tableId, stampedByRow)
1459+
// Badge counts the whole run scope (rows × groups), matching the server's
1460+
// dispatch-scope count — not just the loaded rows we could stamp. For
1461+
// Run-all that's the table's totalCount; for a scoped run, the rowIds.
1462+
const scopeRowCount = targetRowIds
1463+
? targetRowIds.size
1464+
: (readTableRowCount(queryClient, tableId) ?? Object.keys(stampedByRow).length)
1465+
const cellCountDelta = scopeRowCount * targetGroupIds.size
1466+
const bumped = bumpRunState(queryClient, tableId, stampedByRow, cellCountDelta)
14461467
return { snapshots, runStateSnapshot: bumped?.snapshot, didBumpRunState: bumped !== null }
14471468
},
14481469
onError: (_err, _variables, context) => {

apps/sim/lib/table/dispatcher.ts

Lines changed: 93 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -65,67 +65,69 @@ export async function bulkClearWorkflowGroupCells(input: {
6565
// Pre-existing outputs on any other row must not be wiped by an auto-fire.
6666
if (mode === 'new') return
6767

68-
const outputCols = Array.from(new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName))))
6968
const groupIds = groups.map((g) => g.id)
70-
71-
// Step 1: clear the targeted output columns from `data` on every row in
72-
// scope. Identical chain to the previous JSONB-only path.
73-
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
74-
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
75-
76-
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
77-
if (rowIds && rowIds.length > 0) {
78-
filters.push(inArray(userTableRows.id, rowIds))
79-
}
80-
if (mode === 'incomplete') {
81-
// Skip rows where all output columns across all targeted groups already
82-
// have a non-empty value — those are "completed-and-filled" and the
83-
// eligibility predicate would skip them anyway.
84-
const filledChecks = outputCols.map(
85-
(col) => sql`coalesce(${userTableRows.data} ->> ${col}, '') != ''`
69+
const rowScope = rowIds && rowIds.length > 0 ? rowIds : null
70+
71+
if (mode === 'all') {
72+
// Run-all re-runs every targeted group: wipe all their output columns +
73+
// executions for the rows in scope. (Prior in-flight runs were already
74+
// cancelled by the caller.)
75+
const outputCols = Array.from(
76+
new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName)))
8677
)
87-
const allFilled = filledChecks.reduce((acc, expr) => sql`${acc} AND ${expr}`)
88-
filters.push(sql`NOT (${allFilled})`)
89-
// Also skip rows where ANY targeted group has an in-flight exec — those
90-
// belong to another dispatch and clobbering them would race. Encoded as
91-
// a NOT EXISTS subquery against the sidecar's `(table_id, status)`
92-
// partial index.
93-
filters.push(
94-
sql`NOT EXISTS (
78+
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
79+
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
80+
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
81+
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))
82+
83+
await db.transaction(async (trx) => {
84+
await trx
85+
.update(userTableRows)
86+
.set({ data: dataExpr, updatedAt: new Date() })
87+
.where(and(...filters))
88+
const execFilters: SQL[] = [
89+
eq(tableRowExecutions.tableId, tableId),
90+
inArray(tableRowExecutions.groupId, groupIds),
91+
]
92+
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
93+
await trx.delete(tableRowExecutions).where(and(...execFilters))
94+
})
95+
return
96+
}
97+
98+
// `incomplete`: clear per-group, not per-row. Only groups that are
99+
// re-runnable (`error` / `cancelled`) get their output columns + exec wiped;
100+
// `completed` and in-flight groups are left fully intact. A row-level "all
101+
// filled" check would otherwise wipe a completed group's data + exec just
102+
// because a *sibling* group on the same row is incomplete, re-running the
103+
// completed one. (`never-run` groups have no exec/output to clear — the
104+
// dispatcher runs them via eligibility.)
105+
await db.transaction(async (trx) => {
106+
for (const group of groups) {
107+
const reRunnable = sql`EXISTS (
95108
SELECT 1 FROM ${tableRowExecutions} re
96109
WHERE re.row_id = ${userTableRows.id}
97-
AND re.group_id = ANY(ARRAY[${sql.join(
98-
groupIds.map((gid) => sql`${gid}`),
99-
sql`, `
100-
)}]::text[])
101-
AND re.status IN ('queued', 'running', 'pending')
110+
AND re.group_id = ${group.id}
111+
AND re.status IN ('error', 'cancelled')
102112
)`
103-
)
104-
}
113+
const filters: SQL[] = [eq(userTableRows.tableId, tableId), reRunnable]
114+
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))
105115

106-
await db.transaction(async (trx) => {
107-
await trx
108-
.update(userTableRows)
109-
.set({ data: dataExpr, updatedAt: new Date() })
110-
.where(and(...filters))
116+
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
117+
for (const out of group.outputs) dataExpr = sql`(${dataExpr}) - ${out.columnName}::text`
118+
await trx
119+
.update(userTableRows)
120+
.set({ data: dataExpr, updatedAt: new Date() })
121+
.where(and(...filters))
111122

112-
// Step 2: delete the targeted groups' executions for the rows in scope.
113-
// Reuse the same row-scope filter via a subquery.
114-
const execFilters: SQL[] = [
115-
eq(tableRowExecutions.tableId, tableId),
116-
inArray(tableRowExecutions.groupId, groupIds),
117-
]
118-
if (rowIds && rowIds.length > 0) {
119-
execFilters.push(inArray(tableRowExecutions.rowId, rowIds))
120-
}
121-
if (mode === 'incomplete') {
122-
// For `incomplete`, only delete entries that aren't already in-flight
123-
// — terminal states (completed/error/cancelled) get wiped so the
124-
// dispatcher re-enqueues; in-flight entries stay so we don't race
125-
// with their worker.
126-
execFilters.push(sql`${tableRowExecutions.status} NOT IN ('queued', 'running', 'pending')`)
123+
const execFilters: SQL[] = [
124+
eq(tableRowExecutions.tableId, tableId),
125+
eq(tableRowExecutions.groupId, group.id),
126+
sql`${tableRowExecutions.status} IN ('error', 'cancelled')`,
127+
]
128+
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
129+
await trx.delete(tableRowExecutions).where(and(...execFilters))
127130
}
128-
await trx.delete(tableRowExecutions).where(and(...execFilters))
129131
})
130132
}
131133

@@ -193,6 +195,44 @@ export async function countRunningCells(
193195
return { total, byRowId }
194196
}
195197

198+
/** Authoritative "cells queued or running" count for the table, derived from
199+
* active dispatches so it survives reload and matches the live count. For each
200+
* active dispatch every row in scope ahead of the cursor still has to run each
201+
* targeted group, so remaining work = (rows ahead of cursor) × |groupIds|.
202+
* Exact for Run-all; an upper bound for incomplete/new (rows the eligibility
203+
* filter later skips are still counted). Falls back to the sidecar in-flight
204+
* count when no dispatch is active (orphan stragglers). `byRowId` stays
205+
* sidecar-based — the client overlay renders queued rows ahead of the cursor. */
206+
export async function countActiveRunCells(
207+
tableId: string,
208+
dispatches?: DispatchRow[]
209+
): Promise<{ total: number; byRowId: Record<string, number> }> {
210+
const active = dispatches ?? (await listActiveDispatches(tableId))
211+
if (active.length === 0) return countRunningCells(tableId)
212+
213+
const countRowsAhead = async (d: DispatchRow): Promise<number> => {
214+
const groupCount = d.scope.groupIds.length
215+
if (groupCount === 0) return 0
216+
const filters = [eq(userTableRows.tableId, tableId), gt(userTableRows.position, d.cursor)]
217+
if (d.scope.rowIds && d.scope.rowIds.length > 0) {
218+
filters.push(inArray(userTableRows.id, d.scope.rowIds))
219+
}
220+
const [row] = await db
221+
.select({ rowsAhead: sql<number>`count(*)::int` })
222+
.from(userTableRows)
223+
.where(and(...filters))
224+
return (row?.rowsAhead ?? 0) * groupCount
225+
}
226+
227+
// One round-trip per dispatch + the sidecar count, all in parallel.
228+
const [sidecar, perDispatch] = await Promise.all([
229+
countRunningCells(tableId),
230+
Promise.all(active.map(countRowsAhead)),
231+
])
232+
const total = perDispatch.reduce((sum, n) => sum + n, 0)
233+
return { total, byRowId: sidecar.byRowId }
234+
}
235+
196236
export async function listActiveDispatches(tableId: string): Promise<DispatchRow[]> {
197237
const rows = await db
198238
.select()

0 commit comments

Comments
 (0)