Skip to content

Commit 628b452

Browse files
committed
improvement(execution): memory usage for aggregated results
1 parent 08eeecb commit 628b452

17 files changed

Lines changed: 898 additions & 31 deletions

File tree

apps/sim/app/api/function/execute/route.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import { POST } from '@/app/api/function/execute/route'
5252
describe('Function Execute API Route', () => {
5353
beforeEach(() => {
5454
vi.clearAllMocks()
55+
featureFlagsMock.isE2bEnabled = false
5556

5657
hybridAuthMockFns.mockCheckInternalAuth.mockResolvedValue({
5758
success: true,
@@ -240,6 +241,33 @@ describe('Function Execute API Route', () => {
240241
expect(response.status).toBe(200)
241242
expect(data.success).toBe(true)
242243
})
244+
245+
it('rejects large refs in runtimes without ref-native helpers', async () => {
246+
featureFlagsMock.isE2bEnabled = true
247+
const req = createMockRequest('POST', {
248+
code: 'echo "${__blockRef_0}"',
249+
language: 'shell',
250+
contextVariables: {
251+
__blockRef_0: {
252+
__simLargeValueRef: true,
253+
version: 1,
254+
id: 'lv_ABCDEFGHIJKL',
255+
kind: 'array',
256+
size: 12 * 1024 * 1024,
257+
executionId: 'execution-1',
258+
},
259+
},
260+
})
261+
262+
const response = await POST(req)
263+
const data = await response.json()
264+
265+
expect(response.status).toBe(500)
266+
expect(data.success).toBe(false)
267+
expect(data.error).toContain(
268+
'Large execution values require the JavaScript isolated-vm runtime'
269+
)
270+
})
243271
})
244272

245273
describe('Template Variable Resolution', () => {

apps/sim/app/api/function/execute/route.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1414
import { executeInE2B, executeShellInE2B } from '@/lib/execution/e2b'
1515
import { executeInIsolatedVM, type IsolatedVMBrokerHandler } from '@/lib/execution/isolated-vm'
1616
import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages'
17-
import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
17+
import { containsLargeValueRef, isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
1818
import {
1919
MAX_FUNCTION_INLINE_BYTES,
2020
MAX_INLINE_MATERIALIZATION_BYTES,
@@ -1013,6 +1013,12 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
10131013
contextVariables = { ...codeResolution.contextVariables, ...preResolvedContextVariables }
10141014
}
10151015

1016+
if (lang === CodeLanguage.Shell && containsLargeValueRef(contextVariables)) {
1017+
throw new Error(
1018+
'Large execution values require the JavaScript isolated-vm runtime. Select a nested field or read the value in a JavaScript function.'
1019+
)
1020+
}
1021+
10161022
let jsImports = ''
10171023
let jsRemainingCode = resolvedCode
10181024
let hasImports = false
@@ -1124,6 +1130,12 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
11241130
!isCustomTool &&
11251131
(lang === CodeLanguage.Python || (lang === CodeLanguage.JavaScript && hasImports))
11261132

1133+
if (useE2B && containsLargeValueRef(contextVariables)) {
1134+
throw new Error(
1135+
'Large execution values require the JavaScript isolated-vm runtime. Remove imports, select a nested field, or read the value in a JavaScript function without E2B.'
1136+
)
1137+
}
1138+
11271139
if (useE2B) {
11281140
logger.info(`[${requestId}] E2B status`, {
11291141
enabled: isE2bEnabled,

apps/sim/executor/execution/snapshot-serializer.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1+
import { LARGE_VALUE_THRESHOLD_BYTES } from '@/lib/execution/payloads/large-value-ref'
12
import type { DAG } from '@/executor/dag/builder'
23
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
34
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
45
import type { ExecutionContext, SerializedSnapshot } from '@/executor/types'
56

7+
function assertSnapshotValueIsCompact(value: unknown, label: string): void {
8+
const json = JSON.stringify(value)
9+
if (json && Buffer.byteLength(json, 'utf8') > LARGE_VALUE_THRESHOLD_BYTES) {
10+
throw new Error(`Cannot serialize pause snapshot with oversized ${label}; compact it first.`)
11+
}
12+
}
13+
614
function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {
715
if (!map) return undefined
816
return Object.fromEntries(map)
@@ -94,6 +102,9 @@ export function serializePauseSnapshot(
94102
dagIncomingEdges,
95103
}
96104

105+
assertSnapshotValueIsCompact(context.workflowVariables, 'workflow variables')
106+
assertSnapshotValueIsCompact(state.loopExecutions, 'loop execution state')
107+
97108
const workspaceId = metadataFromContext?.workspaceId ?? context.workspaceId
98109
if (!workspaceId) {
99110
throw new Error(
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { beforeEach, describe, expect, it, vi } from 'vitest'
5+
import { clearLargeValueCacheForTests } from '@/lib/execution/payloads/cache'
6+
import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
7+
import { BlockType } from '@/executor/constants'
8+
import { VariablesBlockHandler } from '@/executor/handlers/variables/variables-handler'
9+
import type { ExecutionContext } from '@/executor/types'
10+
import type { SerializedBlock } from '@/serializer/types'
11+
12+
const { mockUploadFile } = vi.hoisted(() => ({
13+
mockUploadFile: vi.fn(),
14+
}))
15+
16+
vi.mock('@/lib/uploads', () => ({
17+
StorageService: {
18+
uploadFile: mockUploadFile,
19+
},
20+
}))
21+
22+
function createContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
23+
return {
24+
workflowId: 'workflow-1',
25+
workspaceId: 'workspace-1',
26+
executionId: 'execution-1',
27+
userId: 'user-1',
28+
blockStates: new Map(),
29+
blockLogs: [],
30+
metadata: { duration: 0 },
31+
environmentVariables: {},
32+
workflowVariables: {
33+
'var-1': { id: 'var-1', name: 'issues', type: 'array', value: [] },
34+
},
35+
decisions: { router: new Map(), condition: new Map() },
36+
loopExecutions: new Map(),
37+
executedBlocks: new Set(),
38+
activeExecutionPath: new Set(),
39+
completedLoops: new Set(),
40+
...overrides,
41+
}
42+
}
43+
44+
function createBlock(): SerializedBlock {
45+
return {
46+
id: 'variables-block-1',
47+
metadata: { id: BlockType.VARIABLES, name: 'Variables' },
48+
position: { x: 0, y: 0 },
49+
config: { tool: BlockType.VARIABLES, params: {} },
50+
inputs: {},
51+
outputs: {},
52+
enabled: true,
53+
}
54+
}
55+
56+
describe('VariablesBlockHandler', () => {
57+
beforeEach(() => {
58+
vi.clearAllMocks()
59+
clearLargeValueCacheForTests()
60+
mockUploadFile.mockImplementation(async ({ customKey }) => ({ key: customKey }))
61+
})
62+
63+
it('preserves small assignments inline', async () => {
64+
const handler = new VariablesBlockHandler()
65+
const ctx = createContext()
66+
const value = [{ key: 'SIM-1', summary: 'Small issue' }]
67+
68+
const output = await handler.execute(ctx, createBlock(), {
69+
variables: [
70+
{
71+
variableId: 'var-1',
72+
variableName: 'issues',
73+
type: 'array',
74+
value,
75+
},
76+
],
77+
})
78+
79+
expect(ctx.workflowVariables?.['var-1'].value).toEqual(value)
80+
expect(output).toEqual({ issues: value })
81+
expect(mockUploadFile).not.toHaveBeenCalled()
82+
})
83+
84+
it('stores oversized assignments as durable refs in variables and block output', async () => {
85+
const handler = new VariablesBlockHandler()
86+
const ctx = createContext()
87+
const value = Array.from({ length: 120_000 }, (_, index) => ({
88+
key: `SIM-${index}`,
89+
summary: 'Issue summary that keeps each item small',
90+
}))
91+
92+
const output = await handler.execute(ctx, createBlock(), {
93+
variables: [
94+
{
95+
variableId: 'var-1',
96+
variableName: 'issues',
97+
type: 'array',
98+
value,
99+
},
100+
],
101+
})
102+
103+
const storedValue = ctx.workflowVariables?.['var-1'].value
104+
expect(isLargeValueRef(storedValue)).toBe(true)
105+
expect(output.issues).toBe(storedValue)
106+
expect(storedValue).toMatchObject({
107+
__simLargeValueRef: true,
108+
kind: 'array',
109+
executionId: 'execution-1',
110+
})
111+
expect(mockUploadFile).toHaveBeenCalledWith(
112+
expect.objectContaining({
113+
context: 'execution',
114+
preserveKey: true,
115+
customKey: expect.stringContaining('/execution-1/large-value-'),
116+
})
117+
)
118+
})
119+
120+
it('fails clearly when durable context is missing for oversized assignments', async () => {
121+
const handler = new VariablesBlockHandler()
122+
const ctx = createContext({ workspaceId: undefined, executionId: undefined })
123+
const value = Array.from({ length: 120_000 }, (_, index) => ({
124+
key: `SIM-${index}`,
125+
summary: 'Issue summary that keeps each item small',
126+
}))
127+
128+
await expect(
129+
handler.execute(ctx, createBlock(), {
130+
variables: [
131+
{
132+
variableId: 'var-1',
133+
variableName: 'issues',
134+
type: 'array',
135+
value,
136+
},
137+
],
138+
})
139+
).rejects.toThrow(
140+
'Cannot persist large execution value without workspace, workflow, and execution IDs'
141+
)
142+
143+
expect(mockUploadFile).not.toHaveBeenCalled()
144+
})
145+
146+
it('preserves existing variable metadata when compacting reassignment', async () => {
147+
const handler = new VariablesBlockHandler()
148+
const ctx = createContext({
149+
workflowVariables: {
150+
'var-1': {
151+
id: 'var-1',
152+
name: 'issues',
153+
type: 'array',
154+
value: [],
155+
isExisting: true,
156+
},
157+
},
158+
})
159+
const value = [{ key: 'SIM-1', summary: 'Updated' }]
160+
161+
await handler.execute(ctx, createBlock(), {
162+
variables: [
163+
{
164+
variableId: 'var-1',
165+
variableName: 'issues',
166+
type: 'array',
167+
value,
168+
},
169+
],
170+
})
171+
172+
expect(ctx.workflowVariables?.['var-1']).toEqual({
173+
id: 'var-1',
174+
name: 'issues',
175+
type: 'array',
176+
value,
177+
isExisting: true,
178+
})
179+
})
180+
})

apps/sim/executor/handlers/variables/variables-handler.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { createLogger } from '@sim/logger'
2+
import { toError } from '@sim/utils/errors'
3+
import { compactWorkflowVariableValue } from '@/lib/execution/payloads/serializer'
24
import type { BlockOutput } from '@/blocks/types'
35
import { BlockType } from '@/executor/constants'
46
import type { BlockHandler, ExecutionContext } from '@/executor/types'
@@ -24,36 +26,47 @@ export class VariablesBlockHandler implements BlockHandler {
2426

2527
const assignments = this.parseAssignments(inputs.variables)
2628

29+
const output: Record<string, any> = {}
30+
2731
for (const assignment of assignments) {
2832
const existingEntry = assignment.variableId
2933
? [assignment.variableId, ctx.workflowVariables[assignment.variableId]]
3034
: Object.entries(ctx.workflowVariables).find(
3135
([_, v]) => v.name === assignment.variableName
3236
)
37+
const value = await this.compactAssignmentValue(ctx, assignment.value)
3338

3439
if (existingEntry?.[1]) {
3540
const [id, variable] = existingEntry
3641
ctx.workflowVariables[id] = {
3742
...variable,
38-
value: assignment.value,
43+
value,
3944
}
45+
output[assignment.variableName] = value
4046
} else {
4147
logger.warn(`Variable "${assignment.variableName}" not found in workflow variables`)
4248
}
4349
}
4450

45-
const output: Record<string, any> = {}
46-
for (const assignment of assignments) {
47-
output[assignment.variableName] = assignment.value
48-
}
49-
5051
return output
51-
} catch (error: any) {
52-
logger.error('Variables block execution failed:', error)
53-
throw new Error(`Variables block execution failed: ${error.message}`)
52+
} catch (error) {
53+
const normalizedError = toError(error)
54+
logger.error('Variables block execution failed:', normalizedError)
55+
throw new Error(`Variables block execution failed: ${normalizedError.message}`)
5456
}
5557
}
5658

59+
private async compactAssignmentValue(ctx: ExecutionContext, value: any): Promise<any> {
60+
return compactWorkflowVariableValue(value, {
61+
workspaceId: ctx.workspaceId,
62+
workflowId: ctx.workflowId,
63+
executionId: ctx.executionId,
64+
userId: ctx.userId,
65+
largeValueExecutionIds: ctx.largeValueExecutionIds,
66+
allowLargeValueWorkflowScope: ctx.allowLargeValueWorkflowScope,
67+
})
68+
}
69+
5770
private parseAssignments(
5871
assignmentsInput: any
5972
): Array<{ variableId?: string; variableName: string; type: string; value: any }> {

0 commit comments

Comments
 (0)