Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/native-worker-threads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/unenv-preset": minor
---

Add support for native `node:worker_threads` module from workerd when the `enable_nodejs_worker_threads_module` compatibility flag is enabled.

This feature is currently experimental and requires `nodejs_compat`, `experimental`, and `enable_nodejs_worker_threads_module` compatibility flags to be set.
2 changes: 1 addition & 1 deletion packages/unenv-preset/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
},
"peerDependencies": {
"unenv": "2.0.0-rc.24",
"workerd": "^1.20260213.0"
"workerd": "^1.20260214.0"
},
"peerDependenciesMeta": {
"workerd": {
Expand Down
40 changes: 40 additions & 0 deletions packages/unenv-preset/src/preset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export function getCloudflarePreset({
const v8Overrides = getV8Overrides(compat);
const ttyOverrides = getTtyOverrides(compat);
const childProcessOverrides = getChildProcessOverrides(compat);
const workerThreadsOverrides = getWorkerThreadsOverrides(compat);

// "dynamic" as they depend on the compatibility date and flags
const dynamicNativeModules = [
Expand All @@ -107,6 +108,7 @@ export function getCloudflarePreset({
...v8Overrides.nativeModules,
...ttyOverrides.nativeModules,
...childProcessOverrides.nativeModules,
...workerThreadsOverrides.nativeModules,
];

// "dynamic" as they depend on the compatibility date and flags
Expand All @@ -131,6 +133,7 @@ export function getCloudflarePreset({
...v8Overrides.hybridModules,
...ttyOverrides.hybridModules,
...childProcessOverrides.hybridModules,
...workerThreadsOverrides.hybridModules,
];

return {
Expand Down Expand Up @@ -1017,3 +1020,40 @@ function getChildProcessOverrides({
hybridModules: [],
};
}

/**
* Returns the overrides for `node:worker_threads` (unenv or workerd)
*
* The native worker_threads implementation:
* - can be enabled with the "enable_nodejs_worker_threads_module" flag
* - can be disabled with the "disable_nodejs_worker_threads_module" flag
* - is experimental (no default enable date)
*/
function getWorkerThreadsOverrides({
compatibilityFlags,
}: {
compatibilityDate: string;
compatibilityFlags: string[];
}): { nativeModules: string[]; hybridModules: string[] } {
const disabledByFlag = compatibilityFlags.includes(
"disable_nodejs_worker_threads_module"
);

const enabledByFlag =
compatibilityFlags.includes("enable_nodejs_worker_threads_module") &&
compatibilityFlags.includes("experimental");

// worker_threads is experimental, no default enable date
const enabled = enabledByFlag && !disabledByFlag;

// When enabled, use the native `worker_threads` module from workerd
return enabled
? {
nativeModules: ["worker_threads"],
hybridModules: [],
}
: {
nativeModules: [],
hybridModules: [],
};
}
65 changes: 49 additions & 16 deletions packages/wrangler/e2e/unenv-preset/preset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,30 @@ const localTestConfigs: TestConfig[] = [
},
},
],
// node:worker_threads (experimental - no default enable date)
[
// TODO: add test for disabled by date (no date defined yet)
// TODO: add test for enabled by date (no date defined yet)
{
name: "worker_threads disabled by default",
compatibilityDate: "2024-09-23",
compatibilityFlags: ["experimental"],
expectRuntimeFlags: {
enable_nodejs_worker_threads_module: false,
},
},
{
name: "worker_threads enabled by flag",
compatibilityDate: "2024-09-23",
compatibilityFlags: [
"enable_nodejs_worker_threads_module",
"experimental",
],
expectRuntimeFlags: {
enable_nodejs_worker_threads_module: true,
},
},
],
// node:repl (experimental, no default enable date)
[
// TODO: add test for disabled by date (no date defined yet)
Expand Down Expand Up @@ -745,15 +769,18 @@ describe.each(localTestConfigs)(

test.for(Object.keys(WorkerdTests))(
"%s",
{ timeout: 5_000 },
{ timeout: 20_000 },
async (testName) => {
// Retries the callback until it succeeds or times out.
// Useful for the i.e. DNS tests where underlying requests might error/timeout.
await vi.waitFor(async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
});
await vi.waitFor(
async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
},
{ timeout: 19_000, interval: 200 }
);
}
);
}
Expand Down Expand Up @@ -804,11 +831,14 @@ describe.runIf(Boolean(CLOUDFLARE_ACCOUNT_ID))(
async (testName) => {
// Retries the callback until it succeeds or times out.
// Useful for the i.e. DNS tests where underlying requests might error/timeout.
await vi.waitFor(async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
});
await vi.waitFor(
async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
},
{ timeout: 19_000, interval: 200 }
);
}
);
}
Expand Down Expand Up @@ -860,11 +890,14 @@ describe.runIf(Boolean(CLOUDFLARE_ACCOUNT_ID))(
async (testName) => {
// Retries the callback until it succeeds or times out.
// Useful for the i.e. DNS tests where underlying requests might error/timeout.
await vi.waitFor(async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
});
await vi.waitFor(
async () => {
const response = await fetch(`${url}/${testName}`);
const body = await response.text();
expect(body).toMatch("passed");
},
{ timeout: 19_000, interval: 200 }
);
}
);
}
Expand Down
81 changes: 81 additions & 0 deletions packages/wrangler/e2e/unenv-preset/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,87 @@ export const WorkerdTests: Record<string, () => void> = {
/not implemented|ERR_METHOD_NOT_IMPLEMENTED/
);
},

async testWorkerThreads() {
const workerThreads = await import("node:worker_threads");

// Common exports available in both unenv stub and native workerd
for (const target of [workerThreads, workerThreads.default]) {
assertTypeOfProperties(target, {
SHARE_ENV: "symbol",
getEnvironmentData: "function",
isMainThread: "boolean",
isMarkedAsUntransferable: "function",
markAsUntransferable: "function",
markAsUncloneable: "function",
moveMessagePortToContext: "function",
receiveMessageOnPort: "function",
setEnvironmentData: "function",
postMessageToThread: "function",
isInternalThread: "boolean",
BroadcastChannel: "function",
MessageChannel: "function",
Worker: "function",
});

// These are values, not functions
assert.strictEqual(target.parentPort, null, "parentPort should be null");
assert.strictEqual(target.threadId, 0, "threadId should be 0");
assert.strictEqual(target.workerData, null, "workerData should be null");
assert.deepStrictEqual(
target.resourceLimits,
{},
"resourceLimits should be empty object"
);
assert.strictEqual(
target.isMainThread,
true,
"isMainThread should be true"
);
assert.strictEqual(
(target as Record<string, unknown>).isInternalThread,
false,
"isInternalThread should be false"
);
}

// Test SHARE_ENV symbol value
assert.strictEqual(
workerThreads.SHARE_ENV,
Symbol.for("nodejs.worker_threads.SHARE_ENV"),
"SHARE_ENV should be the correct symbol"
);

// Test getEnvironmentData/setEnvironmentData
workerThreads.setEnvironmentData("test-key", "test-value");
assert.strictEqual(
workerThreads.getEnvironmentData("test-key"),
"test-value",
"getEnvironmentData should return the set value"
);

// Test MessageChannel creates ports
const channel = new workerThreads.MessageChannel();
assert.ok(channel.port1, "MessageChannel should have port1");
assert.ok(channel.port2, "MessageChannel should have port2");

// Test behavioral differences between native workerd and unenv stub
const isNative =
getRuntimeFlagValue("enable_nodejs_worker_threads_module") === true;
if (isNative) {
// Native workerd: Worker and BroadcastChannel constructors throw
assert.throws(
() => new workerThreads.Worker("test.js"),
/ERR_METHOD_NOT_IMPLEMENTED/,
"Worker constructor should throw ERR_METHOD_NOT_IMPLEMENTED"
);
assert.throws(
() => new workerThreads.BroadcastChannel("test"),
/ERR_METHOD_NOT_IMPLEMENTED/,
"BroadcastChannel constructor should throw ERR_METHOD_NOT_IMPLEMENTED"
);
}
},
};

/**
Expand Down
Loading
Loading