@@ -2,9 +2,9 @@ import { db } from '@sim/db'
22import { tableRunDispatches , userTableRows } from '@sim/db/schema'
33import { createLogger } from '@sim/logger'
44import { generateId } from '@sim/utils/id'
5- import { and , asc , eq , gt , inArray , sql } from 'drizzle-orm'
5+ import { and , asc , eq , gt , inArray , type SQL , sql } from 'drizzle-orm'
66import { appendTableEvent } from '@/lib/table/events'
7- import type { RowData , TableRow } from '@/lib/table/types'
7+ import type { TableRow } from '@/lib/table/types'
88import {
99 isGroupEligible ,
1010 scheduleRunsForRows ,
@@ -44,6 +44,50 @@ export interface DispatchRow {
4444
4545export type DispatcherStepResult = 'continue' | 'done'
4646
47+ /** Eager bulk clear at click time so the user sees every targeted cell go
48+ * blank/Pending instantly — without it, only the rows the dispatcher has
49+ * reached visibly change, and the rest sit on stale data until the cursor
50+ * walks to them. For `mode: 'incomplete'` we skip rows whose outputs are
51+ * already filled, mirroring the eligibility predicate. */
52+ export async function bulkClearWorkflowGroupCells ( input : {
53+ tableId : string
54+ groups : Array < { id : string ; outputs : Array < { columnName : string } > } >
55+ rowIds ?: string [ ]
56+ mode : DispatchMode
57+ } ) : Promise < void > {
58+ const { tableId, groups, rowIds, mode } = input
59+ if ( groups . length === 0 ) return
60+
61+ const outputCols = Array . from ( new Set ( groups . flatMap ( ( g ) => g . outputs . map ( ( o ) => o . columnName ) ) ) )
62+ const groupIds = groups . map ( ( g ) => g . id )
63+
64+ // Build `data - 'col1' - 'col2' - ...` and `executions - 'gid1' - 'gid2' - ...`.
65+ let dataExpr : SQL = sql `coalesce(${ userTableRows . data } , '{}'::jsonb)`
66+ for ( const col of outputCols ) dataExpr = sql `(${ dataExpr } ) - ${ col } ::text`
67+ let execExpr : SQL = sql `coalesce(${ userTableRows . executions } , '{}'::jsonb)`
68+ for ( const gid of groupIds ) execExpr = sql `(${ execExpr } ) - ${ gid } ::text`
69+
70+ const filters : SQL [ ] = [ eq ( userTableRows . tableId , tableId ) ]
71+ if ( rowIds && rowIds . length > 0 ) {
72+ filters . push ( inArray ( userTableRows . id , rowIds ) )
73+ }
74+ if ( mode === 'incomplete' ) {
75+ // Skip rows where all output columns across all targeted groups already
76+ // have a non-empty value — those are "completed-and-filled" and the
77+ // eligibility predicate would skip them anyway.
78+ const filledChecks = outputCols . map (
79+ ( col ) => sql `coalesce(${ userTableRows . data } ->> ${ col } , '') != ''`
80+ )
81+ const allFilled = filledChecks . reduce ( ( acc , expr ) => sql `${ acc } AND ${ expr } ` )
82+ filters . push ( sql `NOT (${ allFilled } )` )
83+ }
84+
85+ await db
86+ . update ( userTableRows )
87+ . set ( { data : dataExpr , executions : execExpr , updatedAt : new Date ( ) } )
88+ . where ( and ( ...filters ) )
89+ }
90+
4791export async function insertDispatch ( input : {
4892 tableId : string
4993 workspaceId : string
@@ -95,14 +139,7 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
95139 }
96140 if ( dispatch . status === 'cancelled' || dispatch . status === 'complete' ) return 'done'
97141
98- if ( dispatch . status === 'pending' ) {
99- await db
100- . update ( tableRunDispatches )
101- . set ( { status : 'dispatching' } )
102- . where ( eq ( tableRunDispatches . id , dispatchId ) )
103- }
104-
105- const { getTableById, batchUpdateRows } = await import ( './service' )
142+ const { getTableById } = await import ( './service' )
106143 const table = await getTableById ( dispatch . tableId )
107144 if ( ! table ) {
108145 logger . warn ( `[${ dispatchId } ] table ${ dispatch . tableId } missing — completing dispatch` )
@@ -117,6 +154,23 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
117154 return 'done'
118155 }
119156
157+ // First iteration: wipe every targeted cell across the whole table so the
158+ // user sees the column flip to empty/Pending immediately. The cancel
159+ // tombstone is preserved because the clear runs before any per-row cancels
160+ // could have landed (cancel routes write cells after dispatch insertion).
161+ if ( dispatch . status === 'pending' ) {
162+ await bulkClearWorkflowGroupCells ( {
163+ tableId : dispatch . tableId ,
164+ groups : targetGroups . map ( ( g ) => ( { id : g . id , outputs : g . outputs } ) ) ,
165+ rowIds : dispatch . scope . rowIds ,
166+ mode : dispatch . mode ,
167+ } )
168+ await db
169+ . update ( tableRunDispatches )
170+ . set ( { status : 'dispatching' } )
171+ . where ( eq ( tableRunDispatches . id , dispatchId ) )
172+ }
173+
120174 const filters = [
121175 eq ( userTableRows . tableId , dispatch . tableId ) ,
122176 gt ( userTableRows . position , dispatch . cursor ) ,
@@ -143,19 +197,13 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
143197 return 'done'
144198 }
145199
146- type Update = {
147- rowId : string
148- data : RowData
149- executionsPatch : Record < string , null >
150- }
151- const updates : Update [ ] = [ ]
152- const clearedRows : TableRow [ ] = [ ]
200+ // Rows were bulk-cleared at click time, so the chunk is ready to enqueue
201+ // as-is. We only filter out cells the user cancelled mid-cascade (the
202+ // tombstone) and cells whose deps still aren't satisfied.
203+ const eligibleRows : TableRow [ ] = [ ]
153204 for ( const r of chunk ) {
154205 const tableRow = toTableRow ( r )
155- const eligibleGroups = targetGroups . filter ( ( g ) => {
156- // Skip cells the user explicitly cancelled after this dispatch
157- // started — a per-row cancel mid-cascade must stick even under
158- // isManualRun, otherwise the dispatcher resurrects the row.
206+ const anyEligible = targetGroups . some ( ( g ) => {
159207 const exec = tableRow . executions ?. [ g . id ]
160208 if ( exec ?. cancelledAt ) {
161209 const cancelledAtMs = Date . parse ( exec . cancelledAt )
@@ -165,47 +213,20 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
165213 }
166214 return isGroupEligible ( g , tableRow , { isManualRun : true , mode : dispatch . mode } )
167215 } )
168- if ( eligibleGroups . length === 0 ) continue
169-
170- const clearedData : RowData = { }
171- const executionsPatch : Record < string , null > = { }
172- for ( const g of eligibleGroups ) {
173- for ( const o of g . outputs ) clearedData [ o . columnName ] = null
174- executionsPatch [ g . id ] = null
175- }
176- updates . push ( { rowId : r . id , data : clearedData , executionsPatch } )
177-
178- const remainingExec = { ...tableRow . executions }
179- for ( const g of eligibleGroups ) delete remainingExec [ g . id ]
180- clearedRows . push ( {
181- ...tableRow ,
182- data : { ...tableRow . data , ...clearedData } ,
183- executions : remainingExec ,
184- } )
216+ if ( anyEligible ) eligibleRows . push ( tableRow )
185217 }
186218
187219 // Cursor advances to the last position in this chunk regardless of
188- // eligibility — otherwise a window full of completed cells loops forever.
220+ // eligibility — otherwise a window full of skipped cells loops forever.
189221 const lastPosition = chunk [ chunk . length - 1 ] . position
190222
191- if ( updates . length > 0 ) {
192- await batchUpdateRows (
193- {
194- tableId : dispatch . tableId ,
195- updates,
196- workspaceId : dispatch . workspaceId ,
197- skipScheduler : true ,
198- } ,
199- table ,
200- dispatch . requestId
201- )
202-
223+ if ( eligibleRows . length > 0 ) {
203224 const scheduleOpts : ScheduleOpts = {
204225 isManualRun : true ,
205226 groupIds : dispatch . scope . groupIds ,
206227 mode : dispatch . mode ,
207228 }
208- await scheduleRunsForRows ( table , clearedRows , scheduleOpts )
229+ await scheduleRunsForRows ( table , eligibleRows , scheduleOpts )
209230 }
210231
211232 await Promise . all ( [
0 commit comments