Skip to content

Commit 20cc018

Browse files
authored
fix(execution): fix isolated-vm memory leak and add worker recycling (#4108)
* fix(execution): fix isolated-vm memory leak and add worker recycling * fix(execution): mirror retirement check in send-failure path and fix pool sizing * chore(execution): remove verbose comments from isolated-vm changes * fix(execution): apply retiring-worker exclusion to drainQueue pool size check * fix(execution): increment lifetimeExecutions on parent-side timeout
1 parent 1acafe8 commit 20cc018

File tree

3 files changed

+87
-16
lines changed

3 files changed

+87
-16
lines changed

apps/sim/lib/core/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ export const env = createEnv({
235235
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER:z.string().optional().default('2200'), // Max owner in-flight leases across replicas
236236
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms)
237237
IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms)
238+
IVM_MAX_EXECUTIONS_PER_WORKER: z.string().optional().default('500'), // Max lifetime executions before worker is recycled
238239

239240
// Knowledge Base Processing Configuration - Shared across all processing methods
240241
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)

apps/sim/lib/execution/isolated-vm-worker.cjs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,39 +142,54 @@ async function executeCode(request) {
142142
stdoutTruncated = true
143143
}
144144

145+
let context = null
146+
let bootstrapScript = null
147+
let userScript = null
148+
let logCallback = null
149+
let errorCallback = null
150+
let fetchCallback = null
151+
const externalCopies = []
152+
145153
try {
146154
isolate = new ivm.Isolate({ memoryLimit: 128 })
147-
const context = await isolate.createContext()
155+
context = await isolate.createContext()
148156
const jail = context.global
149157

150158
await jail.set('global', jail.derefInto())
151159

152-
const logCallback = new ivm.Callback((...args) => {
160+
logCallback = new ivm.Callback((...args) => {
153161
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
154162
appendStdout(`${message}\n`)
155163
})
156164
await jail.set('__log', logCallback)
157165

158-
const errorCallback = new ivm.Callback((...args) => {
166+
errorCallback = new ivm.Callback((...args) => {
159167
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
160168
appendStdout(`ERROR: ${message}\n`)
161169
})
162170
await jail.set('__error', errorCallback)
163171

164-
await jail.set('params', new ivm.ExternalCopy(params).copyInto())
165-
await jail.set('environmentVariables', new ivm.ExternalCopy(envVars).copyInto())
172+
const paramsCopy = new ivm.ExternalCopy(params)
173+
externalCopies.push(paramsCopy)
174+
await jail.set('params', paramsCopy.copyInto())
175+
176+
const envVarsCopy = new ivm.ExternalCopy(envVars)
177+
externalCopies.push(envVarsCopy)
178+
await jail.set('environmentVariables', envVarsCopy.copyInto())
166179

167180
for (const [key, value] of Object.entries(contextVariables)) {
168181
if (value === undefined) {
169182
await jail.set(key, undefined)
170183
} else if (value === null) {
171184
await jail.set(key, null)
172185
} else {
173-
await jail.set(key, new ivm.ExternalCopy(value).copyInto())
186+
const ctxCopy = new ivm.ExternalCopy(value)
187+
externalCopies.push(ctxCopy)
188+
await jail.set(key, ctxCopy.copyInto())
174189
}
175190
}
176191

177-
const fetchCallback = new ivm.Reference(async (url, optionsJson) => {
192+
fetchCallback = new ivm.Reference(async (url, optionsJson) => {
178193
return new Promise((resolve) => {
179194
const fetchId = ++fetchIdCounter
180195
const timeout = setTimeout(() => {
@@ -267,7 +282,7 @@ async function executeCode(request) {
267282
}
268283
`
269284

270-
const bootstrapScript = await isolate.compileScript(bootstrap)
285+
bootstrapScript = await isolate.compileScript(bootstrap)
271286
await bootstrapScript.run(context)
272287

273288
const wrappedCode = `
@@ -290,7 +305,7 @@ async function executeCode(request) {
290305
})()
291306
`
292307

293-
const userScript = await isolate.compileScript(wrappedCode, { filename: 'user-function.js' })
308+
userScript = await isolate.compileScript(wrappedCode, { filename: 'user-function.js' })
294309
const resultJson = await userScript.run(context, { timeout: timeoutMs, promise: true })
295310

296311
let result = null
@@ -357,8 +372,26 @@ async function executeCode(request) {
357372
},
358373
}
359374
} finally {
375+
const releaseables = [
376+
userScript,
377+
bootstrapScript,
378+
...externalCopies,
379+
fetchCallback,
380+
errorCallback,
381+
logCallback,
382+
context,
383+
]
384+
for (const obj of releaseables) {
385+
if (obj) {
386+
try {
387+
obj.release()
388+
} catch {}
389+
}
390+
}
360391
if (isolate) {
361-
isolate.dispose()
392+
try {
393+
isolate.dispose()
394+
} catch {}
362395
}
363396
}
364397
}

apps/sim/lib/execution/isolated-vm.ts

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
7070
Number.parseInt(env.IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER) ||
7171
MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
7272
const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
73+
const MAX_EXECUTIONS_PER_WORKER = Number.parseInt(env.IVM_MAX_EXECUTIONS_PER_WORKER) || 500
7374
const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
7475
const LEASE_REDIS_DEADLINE_MS = 200
7576
const QUEUE_RETRY_DELAY_MS = 1000
@@ -89,6 +90,8 @@ interface WorkerInfo {
8990
pendingExecutions: Map<number, PendingExecution>
9091
idleTimeout: ReturnType<typeof setTimeout> | null
9192
id: number
93+
lifetimeExecutions: number
94+
retiring: boolean
9295
}
9396

9497
interface QueuedExecution {
@@ -538,8 +541,20 @@ function handleWorkerMessage(workerId: number, message: unknown) {
538541
owner.activeExecutions = Math.max(0, owner.activeExecutions - 1)
539542
maybeCleanupOwner(owner.ownerKey)
540543
}
544+
workerInfo!.lifetimeExecutions++
545+
if (workerInfo!.lifetimeExecutions >= MAX_EXECUTIONS_PER_WORKER && !workerInfo!.retiring) {
546+
workerInfo!.retiring = true
547+
logger.info('Worker marked for retirement', {
548+
workerId,
549+
lifetimeExecutions: workerInfo!.lifetimeExecutions,
550+
})
551+
}
552+
if (workerInfo!.retiring && workerInfo!.activeExecutions === 0) {
553+
cleanupWorker(workerId)
554+
} else {
555+
resetWorkerIdleTimeout(workerId)
556+
}
541557
pending.resolve(msg.result as IsolatedVMExecutionResult)
542-
resetWorkerIdleTimeout(workerId)
543558
drainQueue()
544559
}
545560
return
@@ -679,6 +694,8 @@ function spawnWorker(): Promise<WorkerInfo> {
679694
pendingExecutions: new Map(),
680695
idleTimeout: null,
681696
id: workerId,
697+
lifetimeExecutions: 0,
698+
retiring: false,
682699
}
683700

684701
workerInfo.readyPromise = new Promise<void>((resolve, reject) => {
@@ -710,7 +727,8 @@ function spawnWorker(): Promise<WorkerInfo> {
710727

711728
import('node:child_process')
712729
.then(({ spawn }) => {
713-
const proc = spawn('node', [workerPath], {
730+
// Required for isolated-vm on Node.js 20+ (issue #377)
731+
const proc = spawn('node', ['--no-node-snapshot', workerPath], {
714732
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
715733
serialization: 'json',
716734
})
@@ -801,6 +819,7 @@ function selectWorker(): WorkerInfo | null {
801819
let best: WorkerInfo | null = null
802820
for (const w of workers.values()) {
803821
if (!w.ready) continue
822+
if (w.retiring) continue
804823
if (w.activeExecutions >= MAX_PER_WORKER) continue
805824
if (!best || w.activeExecutions < best.activeExecutions) {
806825
best = w
@@ -818,7 +837,8 @@ async function acquireWorker(): Promise<WorkerInfo | null> {
818837
const existing = selectWorker()
819838
if (existing) return existing
820839

821-
const currentPoolSize = workers.size + spawnInProgress
840+
const activeWorkerCount = [...workers.values()].filter((w) => !w.retiring).length
841+
const currentPoolSize = activeWorkerCount + spawnInProgress
822842
if (currentPoolSize < POOL_SIZE) {
823843
try {
824844
return await spawnWorker()
@@ -850,12 +870,24 @@ function dispatchToWorker(
850870
totalActiveExecutions--
851871
ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1)
852872
maybeCleanupOwner(ownerState.ownerKey)
873+
workerInfo.lifetimeExecutions++
874+
if (workerInfo.lifetimeExecutions >= MAX_EXECUTIONS_PER_WORKER && !workerInfo.retiring) {
875+
workerInfo.retiring = true
876+
logger.info('Worker marked for retirement', {
877+
workerId: workerInfo.id,
878+
lifetimeExecutions: workerInfo.lifetimeExecutions,
879+
})
880+
}
853881
resolve({
854882
result: null,
855883
stdout: '',
856884
error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' },
857885
})
858-
resetWorkerIdleTimeout(workerInfo.id)
886+
if (workerInfo.retiring && workerInfo.activeExecutions === 0) {
887+
cleanupWorker(workerInfo.id)
888+
} else {
889+
resetWorkerIdleTimeout(workerInfo.id)
890+
}
859891
drainQueue()
860892
}, req.timeoutMs + 1000)
861893

@@ -878,7 +910,11 @@ function dispatchToWorker(
878910
stdout: '',
879911
error: { message: 'Code execution failed to start. Please try again.', name: 'Error' },
880912
})
881-
resetWorkerIdleTimeout(workerInfo.id)
913+
if (workerInfo.retiring && workerInfo.activeExecutions === 0) {
914+
cleanupWorker(workerInfo.id)
915+
} else {
916+
resetWorkerIdleTimeout(workerInfo.id)
917+
}
882918
// Defer to break synchronous recursion: drainQueue → dispatchToWorker → catch → drainQueue
883919
queueMicrotask(() => drainQueue())
884920
}
@@ -952,7 +988,8 @@ function drainQueue() {
952988
while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) {
953989
const worker = selectWorker()
954990
if (!worker) {
955-
const currentPoolSize = workers.size + spawnInProgress
991+
const activeWorkerCount = [...workers.values()].filter((w) => !w.retiring).length
992+
const currentPoolSize = activeWorkerCount + spawnInProgress
956993
if (currentPoolSize < POOL_SIZE) {
957994
spawnWorker()
958995
.then(() => drainQueue())

0 commit comments

Comments
 (0)