Skip to content

Commit 0cdaa49

Browse files
committed
make resume sync
1 parent c47b4a6 commit 0cdaa49

File tree

7 files changed

+123
-23
lines changed

7 files changed

+123
-23
lines changed

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,36 @@ export async function POST(
132132
workflowId,
133133
})
134134

135-
PauseResumeManager.startResumeExecution({
135+
const resumeArgs = {
136136
resumeEntryId: enqueueResult.resumeEntryId,
137137
resumeExecutionId: enqueueResult.resumeExecutionId,
138138
pausedExecution: enqueueResult.pausedExecution,
139139
contextId: enqueueResult.contextId,
140140
resumeInput: enqueueResult.resumeInput,
141141
userId: enqueueResult.userId,
142-
}).catch((error) => {
142+
}
143+
144+
const isApiCaller = access.auth?.authType === AuthType.API_KEY
145+
146+
if (isApiCaller) {
147+
const result = await PauseResumeManager.startResumeExecution(resumeArgs)
148+
149+
return NextResponse.json({
150+
success: result.success,
151+
executionId: enqueueResult.resumeExecutionId,
152+
output: result.output,
153+
error: result.error,
154+
metadata: result.metadata
155+
? {
156+
duration: result.metadata.duration,
157+
startTime: result.metadata.startTime,
158+
endTime: result.metadata.endTime,
159+
}
160+
: undefined,
161+
})
162+
}
163+
164+
PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => {
143165
logger.error('Failed to start resume execution', {
144166
workflowId,
145167
parentExecutionId: executionId,

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,25 +1400,42 @@ async function handleExecutePost(
14001400
return
14011401
}
14021402

1403-
sendEvent({
1404-
type: 'execution:completed',
1405-
timestamp: new Date().toISOString(),
1406-
executionId,
1407-
workflowId,
1408-
data: {
1409-
success: result.success,
1410-
output: includeFileBase64
1411-
? await hydrateUserFilesWithBase64(result.output, {
1412-
requestId,
1413-
executionId,
1414-
maxBytes: base64MaxBytes,
1415-
})
1416-
: result.output,
1417-
duration: result.metadata?.duration || 0,
1418-
startTime: result.metadata?.startTime || startTime.toISOString(),
1419-
endTime: result.metadata?.endTime || new Date().toISOString(),
1420-
},
1421-
})
1403+
const sseOutput = includeFileBase64
1404+
? await hydrateUserFilesWithBase64(result.output, {
1405+
requestId,
1406+
executionId,
1407+
maxBytes: base64MaxBytes,
1408+
})
1409+
: result.output
1410+
1411+
if (result.status === 'paused') {
1412+
sendEvent({
1413+
type: 'execution:paused',
1414+
timestamp: new Date().toISOString(),
1415+
executionId,
1416+
workflowId,
1417+
data: {
1418+
output: sseOutput,
1419+
duration: result.metadata?.duration || 0,
1420+
startTime: result.metadata?.startTime || startTime.toISOString(),
1421+
endTime: result.metadata?.endTime || new Date().toISOString(),
1422+
},
1423+
})
1424+
} else {
1425+
sendEvent({
1426+
type: 'execution:completed',
1427+
timestamp: new Date().toISOString(),
1428+
executionId,
1429+
workflowId,
1430+
data: {
1431+
success: result.success,
1432+
output: sseOutput,
1433+
duration: result.metadata?.duration || 0,
1434+
startTime: result.metadata?.startTime || startTime.toISOString(),
1435+
endTime: result.metadata?.endTime || new Date().toISOString(),
1436+
},
1437+
})
1438+
}
14221439
finalMetaStatus = 'complete'
14231440
} catch (error: unknown) {
14241441
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()

apps/sim/hooks/use-execution-stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type {
99
ExecutionCompletedData,
1010
ExecutionErrorData,
1111
ExecutionEvent,
12+
ExecutionPausedData,
1213
ExecutionStartedData,
1314
StreamChunkData,
1415
StreamDoneData,
@@ -74,6 +75,9 @@ export async function processSSEStream(
7475
case 'execution:completed':
7576
callbacks.onExecutionCompleted?.(event.data)
7677
break
78+
case 'execution:paused':
79+
callbacks.onExecutionPaused?.(event.data)
80+
break
7781
case 'execution:error':
7882
callbacks.onExecutionError?.(event.data)
7983
break
@@ -114,6 +118,7 @@ export async function processSSEStream(
114118
export interface ExecutionStreamCallbacks {
115119
onExecutionStarted?: (data: ExecutionStartedData) => void
116120
onExecutionCompleted?: (data: ExecutionCompletedData) => void
121+
onExecutionPaused?: (data: ExecutionPausedData) => void
117122
onExecutionError?: (data: ExecutionErrorData) => void
118123
onExecutionCancelled?: (data: ExecutionCancelledData) => void
119124
onBlockStarted?: (data: BlockStartedData) => void

apps/sim/lib/a2a/utils.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ export interface ParsedSSEChunk {
264264
/** Final success flag if this chunk contains the final event */
265265
finalSuccess?: boolean
266266
/** Terminal task state if known */
267-
terminalState?: 'completed' | 'failed' | 'canceled'
267+
terminalState?: 'completed' | 'failed' | 'canceled' | 'input-required'
268268
/** Final artifacts if present on terminal event */
269269
finalArtifacts?: Artifact[]
270270
/** Whether this chunk indicates the stream is done */
@@ -326,6 +326,15 @@ export function parseWorkflowSSEChunk(chunk: string): ParsedSSEChunk {
326326
result.finalSuccess = parsed.data?.success !== false
327327
result.terminalState = result.finalSuccess ? 'completed' : 'failed'
328328
result.isDone = true
329+
} else if (parsed.type === 'execution:paused') {
330+
if (parsed.data?.output?.content) {
331+
result.finalContent = parsed.data.output.content
332+
} else if (parsed.data?.output) {
333+
result.finalContent = JSON.stringify(parsed.data.output)
334+
}
335+
result.finalSuccess = true
336+
result.terminalState = 'input-required'
337+
result.isDone = true
329338
} else if (parsed.type === 'execution:cancelled') {
330339
result.finalSuccess = false
331340
result.terminalState = 'canceled'

apps/sim/lib/workflows/executor/execution-events.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type { SubflowType } from '@/stores/workflows/workflow/types'
88
export type ExecutionEventType =
99
| 'execution:started'
1010
| 'execution:completed'
11+
| 'execution:paused'
1112
| 'execution:error'
1213
| 'execution:cancelled'
1314
| 'block:started'
@@ -53,6 +54,20 @@ export interface ExecutionCompletedEvent extends BaseExecutionEvent {
5354
}
5455
}
5556

57+
/**
58+
* Execution paused event (HITL block waiting for human input)
59+
*/
60+
export interface ExecutionPausedEvent extends BaseExecutionEvent {
61+
type: 'execution:paused'
62+
workflowId: string
63+
data: {
64+
output: any
65+
duration: number
66+
startTime: string
67+
endTime: string
68+
}
69+
}
70+
5671
/**
5772
* Execution error event
5873
*/
@@ -196,6 +211,7 @@ export interface StreamDoneEvent extends BaseExecutionEvent {
196211
export type ExecutionEvent =
197212
| ExecutionStartedEvent
198213
| ExecutionCompletedEvent
214+
| ExecutionPausedEvent
199215
| ExecutionErrorEvent
200216
| ExecutionCancelledEvent
201217
| BlockStartedEvent
@@ -207,6 +223,7 @@ export type ExecutionEvent =
207223

208224
export type ExecutionStartedData = ExecutionStartedEvent['data']
209225
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
226+
export type ExecutionPausedData = ExecutionPausedEvent['data']
210227
export type ExecutionErrorData = ExecutionErrorEvent['data']
211228
export type ExecutionCancelledData = ExecutionCancelledEvent['data']
212229
export type BlockStartedData = BlockStartedEvent['data']

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ export class PauseResumeManager {
292292
})
293293
}
294294

295-
static async startResumeExecution(args: StartResumeExecutionArgs): Promise<void> {
295+
static async startResumeExecution(args: StartResumeExecutionArgs): Promise<ExecutionResult> {
296296
const { resumeEntryId, resumeExecutionId, pausedExecution, contextId, resumeInput, userId } =
297297
args
298298

@@ -357,6 +357,8 @@ export class PauseResumeManager {
357357
})
358358

359359
await PauseResumeManager.processQueuedResumes(pausedExecution.executionId)
360+
361+
return result
360362
} catch (error) {
361363
await PauseResumeManager.markResumeFailed({
362364
resumeEntryId,
@@ -995,6 +997,20 @@ export class PauseResumeManager {
995997
data: { duration: result.metadata?.duration || 0 },
996998
} as ExecutionEvent)
997999
finalMetaStatus = 'cancelled'
1000+
} else if (result.status === 'paused') {
1001+
writeBufferedEvent({
1002+
type: 'execution:paused',
1003+
timestamp: new Date().toISOString(),
1004+
executionId: resumeExecutionId,
1005+
workflowId,
1006+
data: {
1007+
output: result.output,
1008+
duration: result.metadata?.duration || 0,
1009+
startTime: result.metadata?.startTime || new Date().toISOString(),
1010+
endTime: result.metadata?.endTime || new Date().toISOString(),
1011+
},
1012+
} as ExecutionEvent)
1013+
finalMetaStatus = 'complete'
9981014
} else {
9991015
writeBufferedEvent({
10001016
type: 'execution:completed',

apps/sim/lib/workflows/executor/queued-workflow-execution.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,20 @@ export async function executeQueuedWorkflowJob(
216216
},
217217
})
218218
await setExecutionMeta(executionId, { status: 'cancelled' })
219+
} else if (result.status === 'paused') {
220+
await eventWriter.write({
221+
type: 'execution:paused',
222+
timestamp: new Date().toISOString(),
223+
executionId,
224+
workflowId,
225+
data: {
226+
output: outputWithBase64,
227+
duration: result.metadata?.duration || 0,
228+
startTime: result.metadata?.startTime || metadata.startTime,
229+
endTime: result.metadata?.endTime || new Date().toISOString(),
230+
},
231+
})
232+
await setExecutionMeta(executionId, { status: 'complete' })
219233
} else {
220234
await eventWriter.write({
221235
type: 'execution:completed',

0 commit comments

Comments
 (0)