Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,5 @@ test-results
.turbo

# Claude
.claude/settings.local.json
**/.claude/settings.local.json
.playwright-mcp/
11 changes: 11 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ export class PlatformServicesClient implements PlatformServices {
await this.#rpcClient.call('resetAllBackoffs', []);
}

/**
* Get the listen addresses of the libp2p node.
* In the browser runtime, this always returns an empty array since
* direct transport is only supported in Node.js.
*
* @returns An empty array.
*/
getListenAddresses(): string[] {
return [];
}

/**
* Handle a remote message from a peer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ vi.mock('@metamask/ocap-kernel', () => ({
closeConnection: mockCloseConnection,
registerLocationHints: mockRegisterLocationHints,
reconnectPeer: mockReconnectPeer,
getListenAddresses: vi.fn(() => []),
};
},
),
Expand Down
6 changes: 6 additions & 0 deletions packages/kernel-test/src/remote-comms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ class DirectNetworkService {
// Mock implementation - in direct network, connections are always available
return Promise.resolve();
},

getListenAddresses() {
return [];
},
};
}
}
Expand Down Expand Up @@ -286,10 +290,12 @@ describe('Remote Communications (Integration Tests)', () => {
expect(status1.remoteComms).toStrictEqual({
state: 'connected',
peerId: expect.any(String),
listenAddresses: [],
});
expect(status2.remoteComms).toStrictEqual({
state: 'connected',
peerId: expect.any(String),
listenAddresses: [],
});
// Each kernel should have a unique peer ID
const peerId1 =
Expand Down
2 changes: 2 additions & 0 deletions packages/nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@
"test:dev:quiet": "yarn test:dev --reporter @ocap/repo-tools/vitest-reporters/silent"
},
"dependencies": {
"@chainsafe/libp2p-quic": "^1.1.8",
"@endo/eventual-send": "^1.3.4",
"@endo/promise-kit": "^1.1.13",
"@libp2p/interface": "2.11.0",
"@libp2p/tcp": "10.1.19",
"@libp2p/webrtc": "5.2.24",
"@metamask/kernel-shims": "workspace:^",
"@metamask/kernel-store": "workspace:^",
Expand Down
169 changes: 169 additions & 0 deletions packages/nodejs/src/kernel/PlatformServices.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ vi.mock('node:worker_threads', () => ({
}),
}));

vi.mock('@chainsafe/libp2p-quic', () => ({
quic: () => ({ tag: 'mock-quic-transport' }),
}));

vi.mock('@libp2p/tcp', () => ({
tcp: () => ({ tag: 'mock-tcp-transport' }),
}));

vi.mock('@metamask/ocap-kernel', async (importOriginal) => {
const actual = await importOriginal<typeof import('@metamask/ocap-kernel')>();
return {
Expand All @@ -83,6 +91,10 @@ vi.mock('@metamask/ocap-kernel', async (importOriginal) => {
closeConnection: mockCloseConnection,
registerLocationHints: mockRegisterLocationHints,
reconnectPeer: mockReconnectPeer,
resetAllBackoffs: vi.fn(),
getListenAddresses: vi.fn(() => [
'/ip4/127.0.0.1/udp/12345/quic-v1/p2p/mock-peer-id',
]),
})),
};
});
Expand Down Expand Up @@ -385,6 +397,163 @@ describe('NodejsPlatformServices', () => {
// This is tested through integration tests
expect(service).toBeInstanceOf(NodejsPlatformServices);
});

it('injects QUIC directTransports when directListenAddresses contain /quic-v1', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const keySeed = '0x1234567890abcdef';
const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'];
const remoteHandler = vi.fn(async () => 'response');

await service.initializeRemoteComms(
keySeed,
{
relays,
directListenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'],
},
remoteHandler,
);

const { initTransport } = await import('@metamask/ocap-kernel');
expect(initTransport).toHaveBeenCalledWith(
keySeed,
expect.objectContaining({
relays,
directTransports: [
{
transport: { tag: 'mock-quic-transport' },
listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'],
},
],
}),
expect.any(Function),
undefined,
undefined,
undefined,
);
});

it('injects TCP directTransports when directListenAddresses contain /tcp/', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const keySeed = '0x1234567890abcdef';
const remoteHandler = vi.fn(async () => 'response');

await service.initializeRemoteComms(
keySeed,
{
directListenAddresses: ['/ip4/0.0.0.0/tcp/4001'],
},
remoteHandler,
);

const { initTransport } = await import('@metamask/ocap-kernel');
expect(initTransport).toHaveBeenCalledWith(
keySeed,
expect.objectContaining({
directTransports: [
{
transport: { tag: 'mock-tcp-transport' },
listenAddresses: ['/ip4/0.0.0.0/tcp/4001'],
},
],
}),
expect.any(Function),
undefined,
undefined,
undefined,
);
});

it('injects both QUIC and TCP when directListenAddresses contain both', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const keySeed = '0x1234567890abcdef';
const remoteHandler = vi.fn(async () => 'response');

await service.initializeRemoteComms(
keySeed,
{
directListenAddresses: [
'/ip4/0.0.0.0/udp/0/quic-v1',
'/ip4/0.0.0.0/tcp/4001',
],
},
remoteHandler,
);

const { initTransport } = await import('@metamask/ocap-kernel');
expect(initTransport).toHaveBeenCalledWith(
keySeed,
expect.objectContaining({
directTransports: [
{
transport: { tag: 'mock-quic-transport' },
listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'],
},
{
transport: { tag: 'mock-tcp-transport' },
listenAddresses: ['/ip4/0.0.0.0/tcp/4001'],
},
],
}),
expect.any(Function),
undefined,
undefined,
undefined,
);
});

it('throws for unsupported direct listen addresses', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const remoteHandler = vi.fn(async () => 'response');

await expect(
service.initializeRemoteComms(
'0xtest',
{
directListenAddresses: ['/ip4/0.0.0.0/udp/0/webrtc'],
},
remoteHandler,
),
).rejects.toThrowError('Unsupported direct listen address');
});

it('does not inject directTransports when no directListenAddresses provided', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const keySeed = '0x1234567890abcdef';
const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'];
const remoteHandler = vi.fn(async () => 'response');

await service.initializeRemoteComms(keySeed, { relays }, remoteHandler);

const { initTransport } = await import('@metamask/ocap-kernel');
const callArgs = (
initTransport as unknown as ReturnType<typeof vi.fn>
).mock.calls.at(-1);
// Second argument is the options
expect(callArgs?.[1]).not.toHaveProperty('directTransports');
});
});

describe('getListenAddresses', () => {
it('returns listen addresses after initialization', async () => {
const service = new NodejsPlatformServices({ workerFilePath });
const remoteHandler = vi.fn(async () => '');

await service.initializeRemoteComms(
'0xabcd',
{ relays: [] },
remoteHandler,
);

const addresses = service.getListenAddresses();
expect(addresses).toStrictEqual([
'/ip4/127.0.0.1/udp/12345/quic-v1/p2p/mock-peer-id',
]);
});

it('returns empty array before initialization', () => {
const service = new NodejsPlatformServices({ workerFilePath });
expect(service.getListenAddresses()).toStrictEqual([]);
});
});

describe('sendRemoteMessage', () => {
Expand Down
68 changes: 67 additions & 1 deletion packages/nodejs/src/kernel/PlatformServices.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { quic } from '@chainsafe/libp2p-quic';
import { makePromiseKit } from '@endo/promise-kit';
import { tcp } from '@libp2p/tcp';
import { isJsonRpcMessage } from '@metamask/kernel-utils';
import type { JsonRpcMessage } from '@metamask/kernel-utils';
import { Logger } from '@metamask/logger';
import type {
DirectTransport,
PlatformServices,
VatId,
RemoteMessageHandler,
Expand Down Expand Up @@ -47,6 +50,8 @@ export class NodejsPlatformServices implements PlatformServices {

#resetAllBackoffsFunc: (() => void) | null = null;

#getListenAddressesFunc: (() => string[]) | null = null;

#remoteMessageHandler: RemoteMessageHandler | undefined = undefined;

readonly #workerFilePath: string;
Expand Down Expand Up @@ -253,16 +258,62 @@ export class NodejsPlatformServices implements PlatformServices {
throw Error('remote comms already initialized');
}
this.#remoteMessageHandler = remoteMessageHandler;

const { directListenAddresses, ...restOptions } = options;

const directTransports: DirectTransport[] = [];

if (directListenAddresses && directListenAddresses.length > 0) {
const quicAddresses: string[] = [];
const tcpAddresses: string[] = [];

for (const addr of directListenAddresses) {
const isQuic = addr.includes('/quic-v1');
const isTcp = addr.includes('/tcp/');

if (isQuic) {
quicAddresses.push(addr);
} else if (isTcp) {
tcpAddresses.push(addr);
} else {
throw new Error(
`Unsupported direct listen address: ${addr}. ` +
`Only QUIC (/quic-v1) and TCP (/tcp/) addresses are supported.`,
);
}
}

if (quicAddresses.length > 0) {
directTransports.push({
transport: quic(),
listenAddresses: quicAddresses,
});
}

if (tcpAddresses.length > 0) {
directTransports.push({
transport: tcp(),
listenAddresses: tcpAddresses,
});
}
}

const enhancedOptions: RemoteCommsOptions = {
...restOptions,
...(directTransports.length > 0 ? { directTransports } : {}),
};

const {
sendRemoteMessage,
stop,
closeConnection,
registerLocationHints,
reconnectPeer,
resetAllBackoffs,
getListenAddresses,
} = await initTransport(
keySeed,
options,
enhancedOptions,
this.#handleRemoteMessage.bind(this),
onRemoteGiveUp,
incarnationId,
Expand All @@ -274,6 +325,7 @@ export class NodejsPlatformServices implements PlatformServices {
this.#registerLocationHintsFunc = registerLocationHints;
this.#reconnectPeerFunc = reconnectPeer;
this.#resetAllBackoffsFunc = resetAllBackoffs;
this.#getListenAddressesFunc = getListenAddresses;
}

/**
Expand All @@ -293,6 +345,7 @@ export class NodejsPlatformServices implements PlatformServices {
this.#registerLocationHintsFunc = null;
this.#reconnectPeerFunc = null;
this.#resetAllBackoffsFunc = null;
this.#getListenAddressesFunc = null;
}

/**
Expand Down Expand Up @@ -347,5 +400,18 @@ export class NodejsPlatformServices implements PlatformServices {
}
this.#resetAllBackoffsFunc();
}

/**
* Get the listen addresses of the libp2p node.
* Returns multiaddr strings that other peers can use to dial this node directly.
*
* @returns The listen address strings, or empty array if remote comms not initialized.
*/
getListenAddresses(): string[] {
if (!this.#getListenAddressesFunc) {
return [];
}
return this.#getListenAddressesFunc();
}
}
harden(NodejsPlatformServices);
Loading
Loading