Skip to content

Commit e87c280

Browse files
committed
progress
1 parent 955a43c commit e87c280

15 files changed

Lines changed: 280 additions & 208 deletions

File tree

apps/sim/app/api/function/execute/route.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ import {
1010
} from '@/lib/copilot/request/tools/files'
1111
import { isE2bEnabled } from '@/lib/core/config/feature-flags'
1212
import { generateRequestId } from '@/lib/core/utils/request'
13-
import { collectUserFileKeys } from '@/lib/core/utils/user-file'
1413
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1514
import { executeInE2B, executeShellInE2B } from '@/lib/execution/e2b'
1615
import { executeInIsolatedVM, type IsolatedVMBrokerHandler } from '@/lib/execution/isolated-vm'
1716
import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages'
17+
import { recordMaterializedAccessKeys } from '@/lib/execution/payloads/access-keys'
1818
import {
1919
isLargeArrayManifest,
2020
materializeLargeArrayManifest,
2121
} from '@/lib/execution/payloads/large-array-manifest'
22-
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
2322
import { containsLargeValueRef, isLargeValueRef } from '@/lib/execution/payloads/large-value-ref'
2423
import {
2524
MAX_FUNCTION_INLINE_BYTES,
@@ -766,24 +765,8 @@ function createFunctionRuntimeBrokers(
766765
logger,
767766
}
768767

769-
const recordMaterializedKeys = (value: unknown) => {
770-
const keys = collectLargeValueKeys(value)
771-
const existingKeys = new Set(largeValueKeys)
772-
for (const key of keys) {
773-
if (!existingKeys.has(key)) {
774-
existingKeys.add(key)
775-
largeValueKeys.push(key)
776-
}
777-
}
778-
const keysForFiles = collectUserFileKeys(value)
779-
const existingFileKeys = new Set(fileKeys)
780-
for (const key of keysForFiles) {
781-
if (!existingFileKeys.has(key)) {
782-
existingFileKeys.add(key)
783-
fileKeys.push(key)
784-
}
785-
}
786-
}
768+
const recordMaterializedKeys = (value: unknown) =>
769+
recordMaterializedAccessKeys({ largeValueKeys, fileKeys }, value)
787770

788771
const readFile = async (args: unknown, encoding: 'base64' | 'text', chunked = false) => {
789772
const fileArgs = getBrokerFileArgs(args)

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ async function handleExecutePost(
806806
allowLargeValueWorkflowScope,
807807
userId: actorUserId,
808808
maxBytes: base64MaxBytes,
809+
preserveLargeValueMetadata: true,
809810
})) as NormalizedBlockOutput)
810811
: result.output
811812

@@ -1377,6 +1378,7 @@ async function handleExecutePost(
13771378
allowLargeValueWorkflowScope,
13781379
userId: actorUserId,
13791380
maxBytes: base64MaxBytes,
1381+
preserveLargeValueMetadata: true,
13801382
})
13811383
: result.output
13821384
const compactSseOutput = await compactRoutePayload(sseOutput, {

apps/sim/executor/execution/block-executor.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ export class BlockExecutor {
207207
allowLargeValueWorkflowScope: ctx.allowLargeValueWorkflowScope,
208208
userId: ctx.userId,
209209
maxBytes: ctx.base64MaxBytes,
210+
preserveLargeValueMetadata: true,
210211
})) as NormalizedBlockOutput
211212
}
212213

apps/sim/executor/execution/executor.ts

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createLogger, type Logger } from '@sim/logger'
22
import { normalizeStringArray } from '@/lib/core/utils/arrays'
33
import { normalizeStringRecord, normalizeWorkflowVariables } from '@/lib/core/utils/records'
44
import { collectUserFileKeys } from '@/lib/core/utils/user-file'
5+
import { mergeFileKeys, mergeLargeValueKeys } from '@/lib/execution/payloads/access-keys'
56
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
67
import { StartBlockPath } from '@/lib/workflows/triggers/triggers'
78
import type { DAG } from '@/executor/dag/builder'
@@ -217,33 +218,13 @@ export class DAGExecutor {
217218
loopExecutions: filteredLoopExecutions,
218219
parallelExecutions: filteredParallelExecutions,
219220
})
220-
if (context.largeValueKeys) {
221-
const existingKeys = new Set(context.largeValueKeys)
222-
for (const key of filteredLargeValueKeys) {
223-
if (!existingKeys.has(key)) {
224-
existingKeys.add(key)
225-
context.largeValueKeys.push(key)
226-
}
227-
}
228-
} else {
229-
context.largeValueKeys = filteredLargeValueKeys
230-
}
221+
mergeLargeValueKeys(context, filteredLargeValueKeys)
231222
const filteredFileKeys = collectUserFileKeys({
232223
blockStates: filteredBlockStates,
233224
loopExecutions: filteredLoopExecutions,
234225
parallelExecutions: filteredParallelExecutions,
235226
})
236-
if (context.fileKeys) {
237-
const existingKeys = new Set(context.fileKeys)
238-
for (const key of filteredFileKeys) {
239-
if (!existingKeys.has(key)) {
240-
existingKeys.add(key)
241-
context.fileKeys.push(key)
242-
}
243-
}
244-
} else {
245-
context.fileKeys = filteredFileKeys
246-
}
227+
mergeFileKeys(context, filteredFileKeys)
247228
context.subflowParentMap = this.buildSubflowParentMap(dag)
248229

249230
const engine = this.buildExecutionPipeline(context, dag, state)

apps/sim/executor/handlers/function/function-handler.ts

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
} from '@/lib/core/utils/records'
66
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/execution/constants'
77
import { DEFAULT_CODE_LANGUAGE } from '@/lib/execution/languages'
8+
import { mergeFileKeys, mergeLargeValueKeys } from '@/lib/execution/payloads/access-keys'
89
import { BlockType } from '@/executor/constants'
910
import type { BlockHandler, ExecutionContext } from '@/executor/types'
1011
import { collectBlockData } from '@/executor/utils/block-data'
@@ -84,27 +85,8 @@ export class FunctionBlockHandler implements BlockHandler {
8485
throw new Error(result.error || 'Function execution failed')
8586
}
8687

87-
if (result.largeValueKeys?.length) {
88-
ctx.largeValueKeys ??= []
89-
const existingKeys = new Set(ctx.largeValueKeys)
90-
for (const key of result.largeValueKeys) {
91-
if (!existingKeys.has(key)) {
92-
existingKeys.add(key)
93-
ctx.largeValueKeys.push(key)
94-
}
95-
}
96-
}
97-
98-
if (result.fileKeys?.length) {
99-
ctx.fileKeys ??= []
100-
const existingKeys = new Set(ctx.fileKeys)
101-
for (const key of result.fileKeys) {
102-
if (!existingKeys.has(key)) {
103-
existingKeys.add(key)
104-
ctx.fileKeys.push(key)
105-
}
106-
}
107-
}
88+
mergeLargeValueKeys(ctx, result.largeValueKeys ?? [])
89+
mergeFileKeys(ctx, result.fileKeys ?? [])
10890

10991
return result.output
11092
}

apps/sim/executor/utils/subflow-utils.server.ts

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,17 @@
11
import { toError } from '@sim/utils/errors'
2-
import { collectUserFileKeys } from '@/lib/core/utils/user-file'
2+
import { recordMaterializedAccessKeys } from '@/lib/execution/payloads/access-keys'
33
import {
44
isLargeArrayManifest,
55
LARGE_ARRAY_MANIFEST_MARKER,
66
materializeLargeArrayManifest,
77
} from '@/lib/execution/payloads/large-array-manifest'
8-
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
98
import { isLargeValueRef, LARGE_VALUE_REF_MARKER } from '@/lib/execution/payloads/large-value-ref'
109
import { MAX_DURABLE_LARGE_VALUE_BYTES } from '@/lib/execution/payloads/materialization.server'
1110
import { materializeLargeValueRef } from '@/lib/execution/payloads/store'
1211
import { REFERENCE } from '@/executor/constants'
1312
import type { ExecutionContext } from '@/executor/types'
1413
import type { VariableResolver } from '@/executor/variables/resolver'
1514

16-
function recordMaterializedKeys(ctx: ExecutionContext, value: unknown): void {
17-
const largeValueKeys = collectLargeValueKeys(value)
18-
if (largeValueKeys.length > 0) {
19-
ctx.largeValueKeys ??= []
20-
const existingKeys = new Set(ctx.largeValueKeys)
21-
for (const key of largeValueKeys) {
22-
if (!existingKeys.has(key)) {
23-
existingKeys.add(key)
24-
ctx.largeValueKeys.push(key)
25-
}
26-
}
27-
}
28-
29-
const fileKeys = collectUserFileKeys(value)
30-
if (fileKeys.length > 0) {
31-
ctx.fileKeys ??= []
32-
const existingKeys = new Set(ctx.fileKeys)
33-
for (const key of fileKeys) {
34-
if (!existingKeys.has(key)) {
35-
existingKeys.add(key)
36-
ctx.fileKeys.push(key)
37-
}
38-
}
39-
}
40-
}
41-
4215
async function normalizeCollectionValue(ctx: ExecutionContext, value: unknown): Promise<any[]> {
4316
if (Array.isArray(value)) {
4417
return value
@@ -56,7 +29,7 @@ async function normalizeCollectionValue(ctx: ExecutionContext, value: unknown):
5629
userId: ctx.userId,
5730
maxBytes: MAX_DURABLE_LARGE_VALUE_BYTES,
5831
})
59-
recordMaterializedKeys(ctx, materialized)
32+
recordMaterializedAccessKeys(ctx, materialized)
6033
return materialized
6134
}
6235

@@ -75,7 +48,7 @@ async function normalizeCollectionValue(ctx: ExecutionContext, value: unknown):
7548
if (materialized === undefined) {
7649
throw new Error('Large execution value is unavailable.')
7750
}
78-
recordMaterializedKeys(ctx, materialized)
51+
recordMaterializedAccessKeys(ctx, materialized)
7952
return normalizeCollectionValue(ctx, materialized)
8053
}
8154

apps/sim/executor/variables/resolvers/reference-async.server.ts

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { collectUserFileKeys, isUserFileWithMetadata } from '@/lib/core/utils/user-file'
1+
import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
2+
import { recordMaterializedAccessKeys } from '@/lib/execution/payloads/access-keys'
23
import {
34
isLargeArrayManifest,
45
type LargeArrayManifest,
56
readLargeArrayManifestSlice,
67
} from '@/lib/execution/payloads/large-array-manifest'
7-
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
88
import {
99
assertNoLargeValueRefs,
1010
getLargeValueMaterializationError,
@@ -23,31 +23,10 @@ function withLocalLargeValueExecutionIds(
2323
context: PathNavigationContext,
2424
materializedValue: unknown
2525
): PathNavigationContext {
26-
const sourceKeys = collectLargeValueKeys(materializedValue)
27-
const fileKeys = collectUserFileKeys(materializedValue)
28-
if (sourceKeys.length === 0 && fileKeys.length === 0) {
26+
if (!context.executionContext) {
2927
return context
3028
}
31-
if (!context.executionContext.largeValueKeys) {
32-
context.executionContext.largeValueKeys = []
33-
}
34-
const existingKeys = new Set(context.executionContext.largeValueKeys)
35-
for (const key of sourceKeys) {
36-
if (!existingKeys.has(key)) {
37-
existingKeys.add(key)
38-
context.executionContext.largeValueKeys.push(key)
39-
}
40-
}
41-
if (!context.executionContext.fileKeys) {
42-
context.executionContext.fileKeys = []
43-
}
44-
const existingFileKeys = new Set(context.executionContext.fileKeys)
45-
for (const key of fileKeys) {
46-
if (!existingFileKeys.has(key)) {
47-
existingFileKeys.add(key)
48-
context.executionContext.fileKeys.push(key)
49-
}
50-
}
29+
recordMaterializedAccessKeys(context.executionContext, materializedValue)
5130
return {
5231
...context,
5332
executionContext: {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { collectUserFileKeys } from '@/lib/core/utils/user-file'
2+
import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
3+
4+
export interface ExactAccessKeyContext {
5+
largeValueKeys?: string[]
6+
fileKeys?: string[]
7+
}
8+
9+
export function mergeUniqueKeys(target: string[], source: readonly string[]): void {
10+
if (source.length === 0) {
11+
return
12+
}
13+
const existingKeys = new Set(target)
14+
for (const key of source) {
15+
if (!existingKeys.has(key)) {
16+
existingKeys.add(key)
17+
target.push(key)
18+
}
19+
}
20+
}
21+
22+
export function mergeLargeValueKeys(context: ExactAccessKeyContext, keys: readonly string[]): void {
23+
if (keys.length === 0) {
24+
return
25+
}
26+
context.largeValueKeys ??= []
27+
mergeUniqueKeys(context.largeValueKeys, keys)
28+
}
29+
30+
export function mergeFileKeys(context: ExactAccessKeyContext, keys: readonly string[]): void {
31+
if (keys.length === 0) {
32+
return
33+
}
34+
context.fileKeys ??= []
35+
mergeUniqueKeys(context.fileKeys, keys)
36+
}
37+
38+
export function recordMaterializedAccessKeys(context: ExactAccessKeyContext, value: unknown): void {
39+
mergeLargeValueKeys(context, collectLargeValueKeys(value))
40+
mergeFileKeys(context, collectUserFileKeys(value))
41+
}

0 commit comments

Comments
 (0)