Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/dev-worker-disconnect-loop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"trigger.dev": patch
---

Fix dev workers spinning at 100% CPU after the parent CLI disconnects. Orphaned `trigger-dev-run-worker` (and indexer) processes were caught in an `uncaughtException` feedback loop: a periodic IPC send via `process.send` would throw `ERR_IPC_CHANNEL_CLOSED` once the parent closed the channel, which re-entered the same handler that itself called `process.send`, scheduled via `setImmediate` and amplified by source-map-support's `prepareStackTrace`. Fixed by (1) silently dropping packets in `ZodIpcConnection` when the channel is disconnected, (2) adding a `process.on("disconnect", ...)` handler in dev workers so they exit cleanly when the CLI closes the IPC channel, and (3) wrapping all `uncaughtException`-path `process.send` calls in a `safeSend` guard that checks `process.connected` and swallows synchronous throws.
57 changes: 36 additions & 21 deletions packages/cli-v3/src/entryPoints/dev-index-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,45 @@ sourceMapSupport.install({
hookRequire: false,
});

// If the parent CLI closes the IPC channel, exit cleanly instead of being
// re-parented to init and busy-looping on `process.send` against a dead channel.
process.on("disconnect", () => {
process.exit(0);
});

function safeSend(message: unknown) {
if (!process.connected || !process.send) {
return;
}
try {
process.send(message);
} catch {
// swallow: a throw here would re-enter this handler and busy-loop the worker
}
}

process.on("uncaughtException", function (error, origin) {
if (error instanceof Error) {
process.send &&
process.send({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
});
safeSend({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
});
} else {
process.send &&
process.send({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
origin,
safeSend({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
version: "v1",
});
origin,
},
version: "v1",
});
}
});

Expand Down Expand Up @@ -183,7 +198,7 @@ await sendMessageInCatalog(
importErrors,
},
async (msg) => {
process.send?.(msg);
safeSend(msg);
}
).catch((err) => {
if (err instanceof ZodSchemaParsedError) {
Expand Down
64 changes: 40 additions & 24 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,53 @@ sourceMapSupport.install({
hookRequire: false,
});

// If the parent CLI closes the IPC channel (process restart, crash, lost
// handle), exit cleanly instead of being re-parented to init and busy-looping
// on `process.send` that throws against a dead channel.
process.on("disconnect", () => {
process.exit(0);
});

function safeSend(message: unknown) {
if (!process.connected || !process.send) {
return;
}
try {
process.send(message);
} catch {
// swallow: a throw here would re-enter this handler and busy-loop the worker
}
}

process.on("uncaughtException", function (error, origin) {
logError("Uncaught exception", { error, origin });
if (error instanceof Error) {
process.send &&
process.send({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
safeSend({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
});
version: "v1",
},
});
} else {
process.send &&
process.send({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
origin,
safeSend({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
version: "v1",
origin,
},
});
version: "v1",
},
});
}
});

Expand Down
53 changes: 31 additions & 22 deletions packages/cli-v3/src/entryPoints/managed-index-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,39 @@ sourceMapSupport.install({
hookRequire: false,
});

function safeSend(message: unknown) {
if (!process.connected || !process.send) {
return;
}
try {
process.send(message);
} catch {
// swallow: a throw here would re-enter this handler and busy-loop the worker
}
}

process.on("uncaughtException", function (error, origin) {
if (error instanceof Error) {
process.send &&
process.send({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
});
safeSend({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
});
} else {
process.send &&
process.send({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
origin,
safeSend({
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
version: "v1",
});
origin,
},
version: "v1",
});
}
});

Expand Down Expand Up @@ -191,7 +200,7 @@ await sendMessageInCatalog(
importErrors,
},
async (msg) => {
process.send?.(msg);
safeSend(msg);
}
).catch((err) => {
if (err instanceof ZodSchemaParsedError) {
Expand All @@ -200,7 +209,7 @@ await sendMessageInCatalog(
"TASKS_FAILED_TO_PARSE",
{ zodIssues: err.error.issues, tasks },
async (msg) => {
process.send?.(msg);
safeSend(msg);
}
);
} else {
Expand Down
57 changes: 33 additions & 24 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,46 @@ sourceMapSupport.install({
hookRequire: false,
});

function safeSend(message: unknown) {
if (!process.connected || !process.send) {
return;
}
try {
process.send(message);
} catch {
// swallow: a throw here would re-enter this handler and busy-loop the worker
}
}

process.on("uncaughtException", function (error, origin) {
console.error("Uncaught exception", { error, origin });
if (error instanceof Error) {
process.send &&
process.send({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
version: "v1",
safeSend({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: { name: error.name, message: error.message, stack: error.stack },
origin,
},
});
version: "v1",
},
});
} else {
process.send &&
process.send({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
origin,
safeSend({
type: "EVENT",
message: {
type: "UNCAUGHT_EXCEPTION",
payload: {
error: {
name: "Error",
message: typeof error === "string" ? error : JSON.stringify(error),
},
version: "v1",
origin,
},
});
version: "v1",
},
});
}
});

Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/v3/zodIpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ interface ZodIpcConnectionOptions<
process: {
send?: (message: any) => any;
on?: (event: "message", listener: (message: any) => void) => void;
connected?: boolean;
};
handlers?: ZodIpcMessageHandlers<TListenCatalog, TEmitCatalog>;
}
Expand Down Expand Up @@ -257,7 +258,19 @@ export class ZodIpcConnection<
}

async #sendPacket(packet: Packet) {
await this.opts.process.send?.(packet);
// When the IPC channel is closed (e.g. parent process exited), there is no
// recipient — drop the packet rather than letting `process.send` throw
// ERR_IPC_CHANNEL_CLOSED, which would otherwise propagate as an
// uncaughtException and re-enter any handler that itself calls `process.send`.
if (this.opts.process.connected === false) {
return;
}

try {
await this.opts.process.send?.(packet);
} catch {
// swallow: channel raced from open to closed between the check and the send
}
}

async send<K extends GetSocketMessagesWithoutCallback<TEmitCatalog>>(
Expand Down
Loading