Skip to content

Commit 56c48bc

Browse files
committed
preserve exact large-value access through workflow materialization
1 parent 47e693c commit 56c48bc

38 files changed

Lines changed: 1346 additions & 114 deletions

apps/sim/app/api/function/execute/route.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from '@/lib/copilot/request/tools/files'
1111
import { isE2bEnabled } from '@/lib/core/config/feature-flags'
1212
import { generateRequestId } from '@/lib/core/utils/request'
13+
import { collectUserFileKeys } from '@/lib/core/utils/user-file'
1314
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1415
import { executeInE2B, executeShellInE2B } from '@/lib/execution/e2b'
1516
import { executeInIsolatedVM, type IsolatedVMBrokerHandler } from '@/lib/execution/isolated-vm'
@@ -18,6 +19,7 @@ import {
1819
isLargeArrayManifest,
1920
materializeLargeArrayManifest,
2021
} from '@/lib/execution/payloads/large-array-manifest'
22+
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
2123
import { containsLargeValueRef, isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
2224
import {
2325
MAX_FUNCTION_INLINE_BYTES,
@@ -703,6 +705,8 @@ interface FunctionRouteExecutionContext {
703705
workspaceId?: string
704706
executionId?: string
705707
largeValueExecutionIds?: string[]
708+
largeValueKeys?: string[]
709+
fileKeys?: string[]
706710
allowLargeValueWorkflowScope?: boolean
707711
userId?: string
708712
requestId: string
@@ -745,17 +749,42 @@ function getBrokerFileArgs(args: unknown): {
745749
function createFunctionRuntimeBrokers(
746750
context: FunctionRouteExecutionContext
747751
): Record<string, IsolatedVMBrokerHandler> {
752+
context.largeValueKeys ??= []
753+
context.fileKeys ??= []
754+
const largeValueKeys = context.largeValueKeys
755+
const fileKeys = context.fileKeys
748756
const base = {
749757
requestId: context.requestId,
750758
workflowId: context.workflowId,
751759
workspaceId: context.workspaceId,
752760
executionId: context.executionId,
753761
largeValueExecutionIds: context.largeValueExecutionIds,
762+
largeValueKeys,
763+
fileKeys,
754764
allowLargeValueWorkflowScope: context.allowLargeValueWorkflowScope,
755765
userId: context.userId,
756766
logger,
757767
}
758768

769+
const recordMaterializedKeys = (value: unknown) => {
770+
const keys = collectLargeValueKeys(value)
771+
const existingKeys = new Set(largeValueKeys)
772+
for (const key of keys) {
773+
if (!existingKeys.has(key)) {
774+
existingKeys.add(key)
775+
largeValueKeys.push(key)
776+
}
777+
}
778+
const keysForFiles = collectUserFileKeys(value)
779+
const existingFileKeys = new Set(fileKeys)
780+
for (const key of keysForFiles) {
781+
if (!existingFileKeys.has(key)) {
782+
existingFileKeys.add(key)
783+
fileKeys.push(key)
784+
}
785+
}
786+
}
787+
759788
const readFile = async (args: unknown, encoding: 'base64' | 'text', chunked = false) => {
760789
const fileArgs = getBrokerFileArgs(args)
761790
return readUserFileContent(fileArgs.file, {
@@ -790,6 +819,7 @@ function createFunctionRuntimeBrokers(
790819
if (value === undefined) {
791820
throw unavailableLargeValueError(ref)
792821
}
822+
recordMaterializedKeys(value)
793823
return value
794824
},
795825
'sim.values.readArray': async (args) => {
@@ -802,10 +832,12 @@ function createFunctionRuntimeBrokers(
802832
if (!context.executionId) {
803833
throw new Error('Large array manifests require an execution context.')
804834
}
805-
return materializeLargeArrayManifest(manifest, {
835+
const value = await materializeLargeArrayManifest(manifest, {
806836
...base,
807837
maxBytes: clampInlineBytes(options.maxBytes, MAX_INLINE_MATERIALIZATION_BYTES),
808838
})
839+
recordMaterializedKeys(value)
840+
return value
809841
},
810842
}
811843
}
@@ -829,7 +861,17 @@ async function functionJsonResponse<T>(
829861
context: FunctionRouteExecutionContext,
830862
init?: ResponseInit
831863
) {
832-
return NextResponse.json(await compactFunctionRouteBody(body, context), init)
864+
return NextResponse.json(
865+
await compactFunctionRouteBody(
866+
{
867+
...body,
868+
largeValueKeys: context.largeValueKeys,
869+
fileKeys: context.fileKeys,
870+
},
871+
context
872+
),
873+
init
874+
)
833875
}
834876

835877
async function maybeExportSandboxFileToWorkspace(args: {
@@ -974,6 +1016,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
9741016
workflowId,
9751017
executionId,
9761018
largeValueExecutionIds,
1019+
largeValueKeys,
1020+
fileKeys,
9771021
allowLargeValueWorkflowScope = false,
9781022
workspaceId,
9791023
isCustomTool = false,
@@ -998,6 +1042,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
9981042
workspaceId,
9991043
executionId,
10001044
largeValueExecutionIds,
1045+
largeValueKeys,
1046+
fileKeys,
10011047
allowLargeValueWorkflowScope,
10021048
userId: auth.userId,
10031049
requestId,

apps/sim/app/api/workflows/[id]/execute/response-block.test.ts

Lines changed: 219 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
import { beforeEach, describe, expect, it, vi } from 'vitest'
99
import { AuthType } from '@/lib/auth/hybrid'
1010
import { clearLargeValueCacheForTests } from '@/lib/execution/payloads/cache'
11+
import { createLargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest'
1112
import { compactExecutionPayload } from '@/lib/execution/payloads/serializer'
13+
import { storeLargeValue } from '@/lib/execution/payloads/store'
1214
import { EXECUTION_RESOURCE_LIMIT_CODE } from '@/lib/execution/resource-errors'
1315
import type { ExecutionResult } from '@/lib/workflows/types'
1416
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
1517

16-
const { mockUploadFile } = vi.hoisted(() => ({
18+
const { mockDownloadFile, mockUploadFile, uploadedFiles } = vi.hoisted(() => ({
19+
mockDownloadFile: vi.fn(),
1720
mockUploadFile: vi.fn(),
21+
uploadedFiles: new Map<string, Buffer>(),
1822
}))
1923

2024
const MATERIALIZATION_CONTEXT = {
@@ -26,6 +30,7 @@ const MATERIALIZATION_CONTEXT = {
2630

2731
vi.mock('@/lib/uploads', () => ({
2832
StorageService: {
33+
downloadFile: mockDownloadFile,
2934
uploadFile: mockUploadFile,
3035
},
3136
}))
@@ -60,7 +65,14 @@ describe('Response block gating by auth type', () => {
6065
beforeEach(() => {
6166
vi.clearAllMocks()
6267
clearLargeValueCacheForTests()
63-
mockUploadFile.mockImplementation(async ({ customKey }) => ({ key: customKey }))
68+
uploadedFiles.clear()
69+
mockUploadFile.mockImplementation(async ({ customKey, file }) => {
70+
uploadedFiles.set(customKey, file)
71+
return { key: customKey }
72+
})
73+
mockDownloadFile.mockImplementation(
74+
async ({ key }) => uploadedFiles.get(key) ?? Buffer.from('{}')
75+
)
6476
resultWithResponseBlock = buildExecutionResult()
6577
})
6678

@@ -165,6 +177,211 @@ describe('Response block gating by auth type', () => {
165177
expect(body.success).toBeUndefined()
166178
})
167179

180+
it('should materialize Response block manifests from an allowed source execution', async () => {
181+
const rows = [{ key: 'SIM-1' }, { key: 'SIM-2' }]
182+
const manifest = await createLargeArrayManifest(rows, {
183+
...MATERIALIZATION_CONTEXT,
184+
executionId: 'source-execution-1',
185+
})
186+
187+
const response = await createHttpResponseFromBlock(
188+
buildExecutionResult({
189+
output: {
190+
data: { rows: manifest },
191+
status: 200,
192+
headers: {},
193+
},
194+
}),
195+
{
196+
...MATERIALIZATION_CONTEXT,
197+
largeValueExecutionIds: ['source-execution-1'],
198+
}
199+
)
200+
const body = await response.json()
201+
202+
expect(body.rows).toEqual(rows)
203+
})
204+
205+
it('should reject Response block manifests from non-source same-workflow executions', async () => {
206+
const manifest = await createLargeArrayManifest([{ key: 'SIM-stale' }], {
207+
...MATERIALIZATION_CONTEXT,
208+
executionId: 'stale-execution-1',
209+
})
210+
211+
await expect(
212+
createHttpResponseFromBlock(
213+
buildExecutionResult({
214+
output: {
215+
data: { rows: manifest },
216+
status: 200,
217+
headers: {},
218+
},
219+
}),
220+
{
221+
...MATERIALIZATION_CONTEXT,
222+
largeValueExecutionIds: ['source-execution-1'],
223+
}
224+
)
225+
).rejects.toThrow('Large execution value is not available in this execution')
226+
})
227+
228+
it('should materialize Response block manifests inherited by the source snapshot', async () => {
229+
const rows = [{ key: 'SIM-inherited' }]
230+
const manifest = await createLargeArrayManifest(rows, {
231+
...MATERIALIZATION_CONTEXT,
232+
executionId: 'original-execution-1',
233+
})
234+
235+
const response = await createHttpResponseFromBlock(
236+
buildExecutionResult({
237+
output: {
238+
data: { rows: manifest },
239+
status: 200,
240+
headers: {},
241+
},
242+
}),
243+
{
244+
...MATERIALIZATION_CONTEXT,
245+
largeValueExecutionIds: ['source-execution-1', 'original-execution-1'],
246+
}
247+
)
248+
249+
const body = await response.json()
250+
251+
expect(body.rows).toEqual(rows)
252+
})
253+
254+
it('should recursively materialize refs inside Response block manifest rows', async () => {
255+
const text = 'nested'.repeat(2 * 1024 * 1024)
256+
const nestedOutput = await compactExecutionPayload(
257+
{ text },
258+
{
259+
...MATERIALIZATION_CONTEXT,
260+
executionId: 'original-execution-1',
261+
requireDurable: true,
262+
preserveRoot: true,
263+
}
264+
)
265+
const nestedRef = (nestedOutput as unknown as { text: unknown }).text
266+
const manifest = await createLargeArrayManifest([{ nested: nestedRef }], {
267+
...MATERIALIZATION_CONTEXT,
268+
executionId: 'source-execution-1',
269+
})
270+
const response = await createHttpResponseFromBlock(
271+
buildExecutionResult({
272+
output: {
273+
data: { rows: manifest },
274+
status: 200,
275+
headers: {},
276+
},
277+
}),
278+
{
279+
...MATERIALIZATION_CONTEXT,
280+
largeValueExecutionIds: ['source-execution-1'],
281+
}
282+
)
283+
284+
const body = await response.json()
285+
286+
expect(body.rows).toEqual([{ nested: text }])
287+
})
288+
289+
it('should recursively materialize refs inside stored Response block objects', async () => {
290+
const text = 'nested'.repeat(2 * 1024 * 1024)
291+
const nestedOutput = await compactExecutionPayload(
292+
{ text },
293+
{
294+
...MATERIALIZATION_CONTEXT,
295+
executionId: 'original-execution-1',
296+
requireDurable: true,
297+
preserveRoot: true,
298+
}
299+
)
300+
const nestedRef = (nestedOutput as unknown as { text: unknown }).text
301+
const storedValue = {
302+
wrapper: {
303+
nested: nestedRef,
304+
padding: 'x'.repeat(2048),
305+
},
306+
}
307+
const storedJson = JSON.stringify(storedValue)
308+
const storedOutput = await storeLargeValue(
309+
storedValue,
310+
storedJson,
311+
Buffer.byteLength(storedJson),
312+
{
313+
...MATERIALIZATION_CONTEXT,
314+
executionId: 'source-execution-1',
315+
requireDurable: true,
316+
}
317+
)
318+
319+
const response = await createHttpResponseFromBlock(
320+
buildExecutionResult({
321+
output: {
322+
data: storedOutput,
323+
status: 200,
324+
headers: {},
325+
},
326+
}),
327+
{
328+
...MATERIALIZATION_CONTEXT,
329+
largeValueExecutionIds: ['source-execution-1'],
330+
}
331+
)
332+
333+
const body = await response.json()
334+
335+
expect(body.wrapper.nested).toEqual(text)
336+
})
337+
338+
it('should memoize repeated materialized objects while resolving nested refs', async () => {
339+
const text = 'nested'.repeat(2 * 1024 * 1024)
340+
const nestedOutput = await compactExecutionPayload(
341+
{ text },
342+
{
343+
...MATERIALIZATION_CONTEXT,
344+
executionId: 'original-execution-1',
345+
requireDurable: true,
346+
preserveRoot: true,
347+
}
348+
)
349+
const nestedRef = (nestedOutput as unknown as { text: unknown }).text
350+
const sourceValue = { nested: nestedRef }
351+
const sourceJson = JSON.stringify(sourceValue)
352+
const sourceRef = await storeLargeValue(
353+
sourceValue,
354+
sourceJson,
355+
Buffer.byteLength(sourceJson),
356+
{
357+
...MATERIALIZATION_CONTEXT,
358+
executionId: 'source-execution-1',
359+
requireDurable: true,
360+
}
361+
)
362+
363+
const response = await createHttpResponseFromBlock(
364+
buildExecutionResult({
365+
output: {
366+
data: { first: sourceRef, second: sourceRef },
367+
status: 200,
368+
headers: {},
369+
},
370+
}),
371+
{
372+
...MATERIALIZATION_CONTEXT,
373+
largeValueKeys: sourceRef.key ? [sourceRef.key] : [],
374+
}
375+
)
376+
377+
const body = await response.json()
378+
379+
expect(body).toEqual({
380+
first: { nested: text },
381+
second: { nested: text },
382+
})
383+
})
384+
168385
it('should materialize large string refs for Response block HTTP output', async () => {
169386
const text = 'x'.repeat(9 * 1024 * 1024)
170387
const output = await compactExecutionPayload(

0 commit comments

Comments
 (0)