Skip to content

Commit 3712d1e

Browse files
authored
feat(mship): make mship block stream output (#4626)
* Mship block stream * Improvements * Fixes * Fix stream * Fix * Fix
1 parent f8ae249 commit 3712d1e

5 files changed

Lines changed: 747 additions & 56 deletions

File tree

apps/sim/app/api/mothership/execute/route.ts

Lines changed: 201 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@ import { parseRequest } from '@/lib/api/server'
77
import { checkInternalAuth } from '@/lib/auth/hybrid'
88
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
99
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
10+
import {
11+
MothershipStreamV1EventType,
12+
MothershipStreamV1TextChannel,
13+
} from '@/lib/copilot/generated/mothership-stream-v1'
1014
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
1115
import { requestExplicitStreamAbort } from '@/lib/copilot/request/session/explicit-abort'
16+
import type { StreamEvent } from '@/lib/copilot/request/types'
1217
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1318
import { buildMothershipToolsForRequest } from '@/lib/mothership/settings/runtime'
1419
import {
@@ -19,17 +24,60 @@ import {
1924
export const maxDuration = 3600
2025

2126
const logger = createLogger('MothershipExecuteAPI')
27+
const MOTHERSHIP_EXECUTE_STREAM_HEADER = 'x-mothership-execute-stream'
28+
const MOTHERSHIP_EXECUTE_STREAM_VALUE = 'ndjson'
29+
const MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE = 'application/x-ndjson'
30+
const MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS = 15_000
31+
const ndjsonEncoder = new TextEncoder()
2232

2333
function isAbortError(error: unknown): boolean {
2434
return error instanceof Error && error.name === 'AbortError'
2535
}
2636

37+
function wantsStreamedExecuteResponse(req: NextRequest): boolean {
38+
return (
39+
req.headers.get(MOTHERSHIP_EXECUTE_STREAM_HEADER) === MOTHERSHIP_EXECUTE_STREAM_VALUE ||
40+
req.headers.get('accept')?.includes(MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE) === true
41+
)
42+
}
43+
44+
function encodeNdjson(value: unknown): Uint8Array {
45+
return ndjsonEncoder.encode(`${JSON.stringify(value)}\n`)
46+
}
47+
48+
function buildExecuteResponsePayload(
49+
result: Awaited<ReturnType<typeof runHeadlessCopilotLifecycle>>,
50+
effectiveChatId: string,
51+
integrationTools: Array<{ name: string }>
52+
) {
53+
const clientToolNames = new Set(integrationTools.map((t) => t.name))
54+
const clientToolCalls = (result.toolCalls || []).filter(
55+
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
56+
)
57+
58+
return {
59+
content: result.content,
60+
model: 'mothership',
61+
conversationId: effectiveChatId,
62+
tokens: result.usage
63+
? {
64+
prompt: result.usage.prompt,
65+
completion: result.usage.completion,
66+
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
67+
}
68+
: {},
69+
cost: result.cost || undefined,
70+
toolCalls: clientToolCalls,
71+
}
72+
}
73+
2774
/**
2875
* POST /api/mothership/execute
2976
*
30-
* Non-streaming endpoint for Mothership block execution within workflows.
31-
* Called by the executor via internal JWT auth, not by the browser directly.
32-
* Consumes the Go SSE stream internally and returns a single JSON response.
77+
* Endpoint for Mothership block execution within workflows. Called by the
78+
* executor via internal JWT auth, not by the browser directly. JSON callers get
79+
* a single final response; NDJSON callers get heartbeats followed by a final
80+
* event so long-running headless requests do not look idle to HTTP stacks.
3381
*/
3482
export const POST = withRouteHandler(async (req: NextRequest) => {
3583
let messageId: string | undefined
@@ -100,7 +148,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
100148

101149
let allowExplicitAbort = true
102150
let explicitAbortRequest: Promise<void> | undefined
103-
const onAbort = () => {
151+
const lifecycleAbortController = new AbortController()
152+
const requestExplicitAbortOnce = () => {
104153
if (!allowExplicitAbort || explicitAbortRequest || !messageId) {
105154
return
106155
}
@@ -115,15 +164,24 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
115164
})
116165
})
117166
}
167+
const abortLifecycle = (reason?: unknown) => {
168+
if (!lifecycleAbortController.signal.aborted) {
169+
lifecycleAbortController.abort(reason ?? 'mothership_execute_aborted')
170+
}
171+
requestExplicitAbortOnce()
172+
}
173+
const onAbort = () => {
174+
abortLifecycle(req.signal.reason ?? 'request_aborted')
175+
}
118176

119177
if (req.signal.aborted) {
120178
onAbort()
121179
} else {
122180
req.signal.addEventListener('abort', onAbort, { once: true })
123181
}
124182

125-
try {
126-
const result = await runHeadlessCopilotLifecycle(requestPayload, {
183+
const runLifecycle = (onEvent?: (event: StreamEvent) => Promise<void>) =>
184+
runHeadlessCopilotLifecycle(requestPayload, {
127185
userId,
128186
workspaceId,
129187
chatId: effectiveChatId,
@@ -133,12 +191,145 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
133191
goRoute: '/api/mothership/execute',
134192
autoExecuteTools: true,
135193
interactive: false,
136-
abortSignal: req.signal,
194+
abortSignal: lifecycleAbortController.signal,
195+
onEvent,
137196
})
138197

198+
if (wantsStreamedExecuteResponse(req)) {
199+
let cancelled = false
200+
let heartbeatId: ReturnType<typeof setInterval> | undefined
201+
202+
const stream = new ReadableStream<Uint8Array>({
203+
start(controller) {
204+
let forwardedAssistantContent = ''
205+
const send = (event: unknown) => {
206+
if (!cancelled) {
207+
controller.enqueue(encodeNdjson(event))
208+
}
209+
}
210+
211+
// Flush response headers promptly and keep long headless runs from
212+
// looking idle to worker/proxy HTTP stacks.
213+
send({ type: 'heartbeat', timestamp: new Date().toISOString() })
214+
heartbeatId = setInterval(() => {
215+
send({ type: 'heartbeat', timestamp: new Date().toISOString() })
216+
}, MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS)
217+
218+
void (async () => {
219+
try {
220+
const result = await runLifecycle(async (event) => {
221+
if (
222+
event.type === MothershipStreamV1EventType.text &&
223+
event.payload.channel === MothershipStreamV1TextChannel.assistant &&
224+
event.payload.text
225+
) {
226+
const text = event.payload.text
227+
const content = text.startsWith(forwardedAssistantContent)
228+
? text.slice(forwardedAssistantContent.length)
229+
: text
230+
if (content) {
231+
forwardedAssistantContent += content
232+
send({ type: 'chunk', content })
233+
}
234+
}
235+
})
236+
allowExplicitAbort = false
237+
238+
if (lifecycleAbortController.signal.aborted) {
239+
send({ type: 'error', error: 'Mothership execution aborted' })
240+
return
241+
}
242+
243+
if (!result.success) {
244+
logger.error(
245+
messageId
246+
? `Mothership execute failed [messageId:${messageId}]`
247+
: 'Mothership execute failed',
248+
{
249+
requestId,
250+
workflowId,
251+
executionId,
252+
error: result.error,
253+
errors: result.errors,
254+
}
255+
)
256+
send({
257+
type: 'error',
258+
error: result.error || 'Mothership execution failed',
259+
content: result.content || '',
260+
})
261+
return
262+
}
263+
264+
send({
265+
type: 'final',
266+
data: buildExecuteResponsePayload(result, effectiveChatId, integrationTools),
267+
})
268+
} catch (error) {
269+
if (
270+
lifecycleAbortController.signal.aborted ||
271+
req.signal.aborted ||
272+
isAbortError(error)
273+
) {
274+
logger.info(
275+
messageId
276+
? `Mothership execute aborted [messageId:${messageId}]`
277+
: 'Mothership execute aborted',
278+
{ requestId }
279+
)
280+
send({ type: 'error', error: 'Mothership execution aborted' })
281+
return
282+
}
283+
284+
logger.error(
285+
messageId
286+
? `Mothership execute error [messageId:${messageId}]`
287+
: 'Mothership execute error',
288+
{
289+
requestId,
290+
error: error instanceof Error ? error.message : 'Unknown error',
291+
}
292+
)
293+
send({
294+
type: 'error',
295+
error: error instanceof Error ? error.message : 'Internal server error',
296+
})
297+
} finally {
298+
allowExplicitAbort = false
299+
if (heartbeatId) {
300+
clearInterval(heartbeatId)
301+
}
302+
req.signal.removeEventListener('abort', onAbort)
303+
await explicitAbortRequest
304+
if (!cancelled) {
305+
controller.close()
306+
}
307+
}
308+
})()
309+
},
310+
cancel(reason) {
311+
cancelled = true
312+
if (heartbeatId) {
313+
clearInterval(heartbeatId)
314+
}
315+
abortLifecycle(reason ?? 'mothership_execute_stream_cancelled')
316+
},
317+
})
318+
319+
return new Response(stream, {
320+
headers: {
321+
'Content-Type': `${MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE}; charset=utf-8`,
322+
'Cache-Control': 'no-cache, no-transform',
323+
},
324+
})
325+
}
326+
327+
try {
328+
const result = await runLifecycle()
329+
139330
allowExplicitAbort = false
140331

141-
if (req.signal.aborted) {
332+
if (lifecycleAbortController.signal.aborted || req.signal.aborted) {
142333
reqLogger.info('Mothership execute aborted after lifecycle completion')
143334
return NextResponse.json({ error: 'Mothership execution aborted' }, { status: 499 })
144335
}
@@ -165,25 +356,9 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
165356
)
166357
}
167358

168-
const clientToolNames = new Set(integrationTools.map((t) => t.name))
169-
const clientToolCalls = (result.toolCalls || []).filter(
170-
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
359+
return NextResponse.json(
360+
buildExecuteResponsePayload(result, effectiveChatId, integrationTools)
171361
)
172-
173-
return NextResponse.json({
174-
content: result.content,
175-
model: 'mothership',
176-
conversationId: effectiveChatId,
177-
tokens: result.usage
178-
? {
179-
prompt: result.usage.prompt,
180-
completion: result.usage.completion,
181-
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
182-
}
183-
: {},
184-
cost: result.cost || undefined,
185-
toolCalls: clientToolCalls,
186-
})
187362
} finally {
188363
allowExplicitAbort = false
189364
req.signal.removeEventListener('abort', onAbort)

0 commit comments

Comments
 (0)