diff --git a/.gitignore b/.gitignore index 6171d4a15..98db10d42 100644 --- a/.gitignore +++ b/.gitignore @@ -91,5 +91,5 @@ test-results .turbo # Claude -.claude/settings.local.json +**/.claude/settings.local.json .playwright-mcp/ \ No newline at end of file diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index fc1e8f8bf..c56dee697 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -290,6 +290,17 @@ export class PlatformServicesClient implements PlatformServices { await this.#rpcClient.call('resetAllBackoffs', []); } + /** + * Get the listen addresses of the libp2p node. + * In the browser runtime, this always returns an empty array since + * direct transport is only supported in Node.js. + * + * @returns An empty array. + */ + getListenAddresses(): string[] { + return []; + } + /** * Handle a remote message from a peer. * diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 6b6eba5e3..abe83cf05 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -50,6 +50,7 @@ vi.mock('@metamask/ocap-kernel', () => ({ closeConnection: mockCloseConnection, registerLocationHints: mockRegisterLocationHints, reconnectPeer: mockReconnectPeer, + getListenAddresses: vi.fn(() => []), }; }, ), diff --git a/packages/kernel-test/src/remote-comms.test.ts b/packages/kernel-test/src/remote-comms.test.ts index 63ae1badf..aae2e1250 100644 --- a/packages/kernel-test/src/remote-comms.test.ts +++ b/packages/kernel-test/src/remote-comms.test.ts @@ -139,6 +139,10 @@ class DirectNetworkService { // Mock implementation - in direct network, connections are always available return Promise.resolve(); }, + + getListenAddresses() { + return []; + }, }; } } @@ -286,10 +290,12 @@ describe('Remote Communications (Integration Tests)', () => { expect(status1.remoteComms).toStrictEqual({ state: 'connected', peerId: expect.any(String), + listenAddresses: [], }); expect(status2.remoteComms).toStrictEqual({ state: 'connected', peerId: expect.any(String), + listenAddresses: [], }); // Each kernel should have a unique peer ID const peerId1 = diff --git a/packages/nodejs/package.json b/packages/nodejs/package.json index db394666d..b84481062 100644 --- a/packages/nodejs/package.json +++ b/packages/nodejs/package.json @@ -49,9 +49,11 @@ "test:dev:quiet": "yarn test:dev --reporter @ocap/repo-tools/vitest-reporters/silent" }, "dependencies": { + "@chainsafe/libp2p-quic": "^1.1.8", "@endo/eventual-send": "^1.3.4", "@endo/promise-kit": "^1.1.13", "@libp2p/interface": "2.11.0", + "@libp2p/tcp": "10.1.19", "@libp2p/webrtc": "5.2.24", "@metamask/kernel-shims": "workspace:^", "@metamask/kernel-store": "workspace:^", diff --git a/packages/nodejs/src/kernel/PlatformServices.test.ts b/packages/nodejs/src/kernel/PlatformServices.test.ts index a1cc433c1..17bcd3f8e 100644 --- a/packages/nodejs/src/kernel/PlatformServices.test.ts +++ b/packages/nodejs/src/kernel/PlatformServices.test.ts @@ -73,6 +73,14 @@ vi.mock('node:worker_threads', () => ({ }), })); +vi.mock('@chainsafe/libp2p-quic', () => ({ + quic: () => ({ tag: 'mock-quic-transport' }), +})); + +vi.mock('@libp2p/tcp', () => ({ + tcp: () => ({ tag: 'mock-tcp-transport' }), +})); + vi.mock('@metamask/ocap-kernel', async (importOriginal) => { const actual = await importOriginal(); return { @@ -83,6 +91,10 @@ vi.mock('@metamask/ocap-kernel', async (importOriginal) => { closeConnection: mockCloseConnection, registerLocationHints: mockRegisterLocationHints, reconnectPeer: mockReconnectPeer, + resetAllBackoffs: vi.fn(), + getListenAddresses: vi.fn(() => [ + '/ip4/127.0.0.1/udp/12345/quic-v1/p2p/mock-peer-id', + ]), })), }; }); @@ -385,6 +397,163 @@ describe('NodejsPlatformServices', () => { // This is tested through integration tests expect(service).toBeInstanceOf(NodejsPlatformServices); }); + + it('injects QUIC directTransports when directListenAddresses contain /quic-v1', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + const remoteHandler = vi.fn(async () => 'response'); + + await service.initializeRemoteComms( + keySeed, + { + relays, + directListenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + remoteHandler, + ); + + const { initTransport } = await import('@metamask/ocap-kernel'); + expect(initTransport).toHaveBeenCalledWith( + keySeed, + expect.objectContaining({ + relays, + directTransports: [ + { + transport: { tag: 'mock-quic-transport' }, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + ], + }), + expect.any(Function), + undefined, + undefined, + undefined, + ); + }); + + it('injects TCP directTransports when directListenAddresses contain /tcp/', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const remoteHandler = vi.fn(async () => 'response'); + + await service.initializeRemoteComms( + keySeed, + { + directListenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + remoteHandler, + ); + + const { initTransport } = await import('@metamask/ocap-kernel'); + expect(initTransport).toHaveBeenCalledWith( + keySeed, + expect.objectContaining({ + directTransports: [ + { + transport: { tag: 'mock-tcp-transport' }, + listenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + ], + }), + expect.any(Function), + undefined, + undefined, + undefined, + ); + }); + + it('injects both QUIC and TCP when directListenAddresses contain both', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const remoteHandler = vi.fn(async () => 'response'); + + await service.initializeRemoteComms( + keySeed, + { + directListenAddresses: [ + '/ip4/0.0.0.0/udp/0/quic-v1', + '/ip4/0.0.0.0/tcp/4001', + ], + }, + remoteHandler, + ); + + const { initTransport } = await import('@metamask/ocap-kernel'); + expect(initTransport).toHaveBeenCalledWith( + keySeed, + expect.objectContaining({ + directTransports: [ + { + transport: { tag: 'mock-quic-transport' }, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + { + transport: { tag: 'mock-tcp-transport' }, + listenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + ], + }), + expect.any(Function), + undefined, + undefined, + undefined, + ); + }); + + it('throws for unsupported direct listen addresses', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const remoteHandler = vi.fn(async () => 'response'); + + await expect( + service.initializeRemoteComms( + '0xtest', + { + directListenAddresses: ['/ip4/0.0.0.0/udp/0/webrtc'], + }, + remoteHandler, + ), + ).rejects.toThrowError('Unsupported direct listen address'); + }); + + it('does not inject directTransports when no directListenAddresses provided', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + const remoteHandler = vi.fn(async () => 'response'); + + await service.initializeRemoteComms(keySeed, { relays }, remoteHandler); + + const { initTransport } = await import('@metamask/ocap-kernel'); + const callArgs = ( + initTransport as unknown as ReturnType + ).mock.calls.at(-1); + // Second argument is the options + expect(callArgs?.[1]).not.toHaveProperty('directTransports'); + }); + }); + + describe('getListenAddresses', () => { + it('returns listen addresses after initialization', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const remoteHandler = vi.fn(async () => ''); + + await service.initializeRemoteComms( + '0xabcd', + { relays: [] }, + remoteHandler, + ); + + const addresses = service.getListenAddresses(); + expect(addresses).toStrictEqual([ + '/ip4/127.0.0.1/udp/12345/quic-v1/p2p/mock-peer-id', + ]); + }); + + it('returns empty array before initialization', () => { + const service = new NodejsPlatformServices({ workerFilePath }); + expect(service.getListenAddresses()).toStrictEqual([]); + }); }); describe('sendRemoteMessage', () => { diff --git a/packages/nodejs/src/kernel/PlatformServices.ts b/packages/nodejs/src/kernel/PlatformServices.ts index efe9e8e38..71fcf80bf 100644 --- a/packages/nodejs/src/kernel/PlatformServices.ts +++ b/packages/nodejs/src/kernel/PlatformServices.ts @@ -1,8 +1,11 @@ +import { quic } from '@chainsafe/libp2p-quic'; import { makePromiseKit } from '@endo/promise-kit'; +import { tcp } from '@libp2p/tcp'; import { isJsonRpcMessage } from '@metamask/kernel-utils'; import type { JsonRpcMessage } from '@metamask/kernel-utils'; import { Logger } from '@metamask/logger'; import type { + DirectTransport, PlatformServices, VatId, RemoteMessageHandler, @@ -47,6 +50,8 @@ export class NodejsPlatformServices implements PlatformServices { #resetAllBackoffsFunc: (() => void) | null = null; + #getListenAddressesFunc: (() => string[]) | null = null; + #remoteMessageHandler: RemoteMessageHandler | undefined = undefined; readonly #workerFilePath: string; @@ -253,6 +258,51 @@ export class NodejsPlatformServices implements PlatformServices { throw Error('remote comms already initialized'); } this.#remoteMessageHandler = remoteMessageHandler; + + const { directListenAddresses, ...restOptions } = options; + + const directTransports: DirectTransport[] = []; + + if (directListenAddresses && directListenAddresses.length > 0) { + const quicAddresses: string[] = []; + const tcpAddresses: string[] = []; + + for (const addr of directListenAddresses) { + const isQuic = addr.includes('/quic-v1'); + const isTcp = addr.includes('/tcp/'); + + if (isQuic) { + quicAddresses.push(addr); + } else if (isTcp) { + tcpAddresses.push(addr); + } else { + throw new Error( + `Unsupported direct listen address: ${addr}. ` + + `Only QUIC (/quic-v1) and TCP (/tcp/) addresses are supported.`, + ); + } + } + + if (quicAddresses.length > 0) { + directTransports.push({ + transport: quic(), + listenAddresses: quicAddresses, + }); + } + + if (tcpAddresses.length > 0) { + directTransports.push({ + transport: tcp(), + listenAddresses: tcpAddresses, + }); + } + } + + const enhancedOptions: RemoteCommsOptions = { + ...restOptions, + ...(directTransports.length > 0 ? { directTransports } : {}), + }; + const { sendRemoteMessage, stop, @@ -260,9 +310,10 @@ export class NodejsPlatformServices implements PlatformServices { registerLocationHints, reconnectPeer, resetAllBackoffs, + getListenAddresses, } = await initTransport( keySeed, - options, + enhancedOptions, this.#handleRemoteMessage.bind(this), onRemoteGiveUp, incarnationId, @@ -274,6 +325,7 @@ export class NodejsPlatformServices implements PlatformServices { this.#registerLocationHintsFunc = registerLocationHints; this.#reconnectPeerFunc = reconnectPeer; this.#resetAllBackoffsFunc = resetAllBackoffs; + this.#getListenAddressesFunc = getListenAddresses; } /** @@ -293,6 +345,7 @@ export class NodejsPlatformServices implements PlatformServices { this.#registerLocationHintsFunc = null; this.#reconnectPeerFunc = null; this.#resetAllBackoffsFunc = null; + this.#getListenAddressesFunc = null; } /** @@ -347,5 +400,18 @@ export class NodejsPlatformServices implements PlatformServices { } this.#resetAllBackoffsFunc(); } + + /** + * Get the listen addresses of the libp2p node. + * Returns multiaddr strings that other peers can use to dial this node directly. + * + * @returns The listen address strings, or empty array if remote comms not initialized. + */ + getListenAddresses(): string[] { + if (!this.#getListenAddressesFunc) { + return []; + } + return this.#getListenAddressesFunc(); + } } harden(NodejsPlatformServices); diff --git a/packages/nodejs/test/e2e/quic-transport.test.ts b/packages/nodejs/test/e2e/quic-transport.test.ts new file mode 100644 index 000000000..3fec07e3b --- /dev/null +++ b/packages/nodejs/test/e2e/quic-transport.test.ts @@ -0,0 +1,330 @@ +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { Kernel, makeKernelStore } from '@metamask/ocap-kernel'; +import { delay } from '@ocap/repo-tools/test-utils'; +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import { makeTestKernel } from '../helpers/kernel.ts'; +import { + getVatRootRef, + launchVatAndGetURL, + makeRemoteVatConfig, + sendRemoteMessage, +} from '../helpers/remote-comms.ts'; + +// Increase timeout for network operations +const NETWORK_TIMEOUT = 30_000; + +/** + * Stop an operation with a timeout to prevent hangs during cleanup. + * + * @param stopFn - The stop function to call. + * @param timeoutMs - The timeout in milliseconds. + * @param label - A label for logging. + */ +async function stopWithTimeout( + stopFn: () => Promise, + timeoutMs: number, + label: string, +): Promise { + try { + await Promise.race([ + stopFn(), + new Promise((_resolve, reject) => + setTimeout(() => reject(new Error(`${label} timed out`)), timeoutMs), + ), + ]); + } catch { + // Ignore timeout errors during cleanup + } +} + +// Listen addresses for each kernel (port 0 = OS-assigned) +const quicListenAddress = '/ip4/127.0.0.1/udp/0/quic-v1'; +const tcpListenAddress = '/ip4/127.0.0.1/tcp/0'; + +/** + * Get the connected remote comms info from a kernel's status. + * + * @param kernel - The kernel to get info from. + * @returns The peer ID and listen addresses. + */ +async function getConnectedInfo(kernel: Kernel): Promise<{ + peerId: string; + listenAddresses: string[]; + quicAddresses: string[]; + tcpAddresses: string[]; +}> { + const status = await kernel.getStatus(); + if (status.remoteComms?.state !== 'connected') { + throw new Error('Remote comms not connected'); + } + const { peerId, listenAddresses } = status.remoteComms; + return { + peerId, + listenAddresses, + quicAddresses: listenAddresses.filter((addr) => addr.includes('/quic-v1/')), + tcpAddresses: listenAddresses.filter( + (addr) => addr.includes('/tcp/') && !addr.includes('/ws'), + ), + }; +} + +describe.sequential('Direct Transport E2E', () => { + let kernel1: Kernel; + let kernel2: Kernel; + let kernelDatabase1: Awaited>; + let kernelDatabase2: Awaited>; + let kernelStore1: ReturnType; + let kernelStore2: ReturnType; + + beforeEach(async () => { + kernelDatabase1 = await makeSQLKernelDatabase({ dbFilename: ':memory:' }); + kernelStore1 = makeKernelStore(kernelDatabase1); + + kernelDatabase2 = await makeSQLKernelDatabase({ dbFilename: ':memory:' }); + kernelStore2 = makeKernelStore(kernelDatabase2); + + kernel1 = await makeTestKernel(kernelDatabase1); + kernel2 = await makeTestKernel(kernelDatabase2); + }); + + afterEach(async () => { + const STOP_TIMEOUT = 3000; + await Promise.all([ + kernel1 && + stopWithTimeout( + async () => kernel1.stop(), + STOP_TIMEOUT, + 'kernel1.stop', + ), + kernel2 && + stopWithTimeout( + async () => kernel2.stop(), + STOP_TIMEOUT, + 'kernel2.stop', + ), + ]); + if (kernelDatabase1) { + kernelDatabase1.close(); + } + if (kernelDatabase2) { + kernelDatabase2.close(); + } + await delay(200); + }); + + describe('Initialization', () => { + it( + 'initializes remote comms with QUIC transport', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + + const info1 = await getConnectedInfo(kernel1); + const info2 = await getConnectedInfo(kernel2); + + // Each kernel should have QUIC listen addresses + expect(info1.quicAddresses.length).toBeGreaterThan(0); + expect(info2.quicAddresses.length).toBeGreaterThan(0); + + // Peer IDs should be distinct + expect(info1.peerId).not.toBe(info2.peerId); + }, + NETWORK_TIMEOUT, + ); + + it( + 'initializes remote comms with TCP transport', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [tcpListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [tcpListenAddress], + }); + + const info1 = await getConnectedInfo(kernel1); + const info2 = await getConnectedInfo(kernel2); + + // Each kernel should have TCP listen addresses + expect(info1.tcpAddresses.length).toBeGreaterThan(0); + expect(info2.tcpAddresses.length).toBeGreaterThan(0); + + // Peer IDs should be distinct + expect(info1.peerId).not.toBe(info2.peerId); + }, + NETWORK_TIMEOUT, + ); + + it( + 'initializes remote comms with both QUIC and TCP', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [quicListenAddress, tcpListenAddress], + }); + + const info1 = await getConnectedInfo(kernel1); + + expect(info1.quicAddresses.length).toBeGreaterThan(0); + expect(info1.tcpAddresses.length).toBeGreaterThan(0); + }, + NETWORK_TIMEOUT, + ); + }); + + describe('Direct Connectivity', () => { + it( + 'sends a message via direct QUIC', + async () => { + // Initialize both kernels with QUIC only + await kernel1.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + + // Get kernel2's QUIC addresses and register them on kernel1 + const info2 = await getConnectedInfo(kernel2); + await kernel1.registerLocationHints(info2.peerId, info2.quicAddresses); + + // Launch vats + const aliceConfig = makeRemoteVatConfig('Alice'); + const bobConfig = makeRemoteVatConfig('Bob'); + await launchVatAndGetURL(kernel1, aliceConfig); + const bobURL = await launchVatAndGetURL(kernel2, bobConfig); + + const aliceRef = getVatRootRef(kernel1, kernelStore1, 'Alice'); + + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'establishes bidirectional QUIC communication', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + + // Exchange QUIC addresses + const info1 = await getConnectedInfo(kernel1); + const info2 = await getConnectedInfo(kernel2); + + await kernel1.registerLocationHints(info2.peerId, info2.quicAddresses); + await kernel2.registerLocationHints(info1.peerId, info1.quicAddresses); + + // Launch vats + const aliceConfig = makeRemoteVatConfig('Alice'); + const bobConfig = makeRemoteVatConfig('Bob'); + const aliceURL = await launchVatAndGetURL(kernel1, aliceConfig); + const bobURL = await launchVatAndGetURL(kernel2, bobConfig); + + const aliceRef = getVatRootRef(kernel1, kernelStore1, 'Alice'); + const bobRef = getVatRootRef(kernel2, kernelStore2, 'Bob'); + + // Alice → Bob + const aliceToBob = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(aliceToBob).toContain('vat Bob got "hello" from Alice'); + + // Bob → Alice + const bobToAlice = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob'], + ); + expect(bobToAlice).toContain('vat Alice got "hello" from Bob'); + }, + NETWORK_TIMEOUT, + ); + + it( + 'sends multiple sequential messages via QUIC', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [quicListenAddress], + }); + + const info2 = await getConnectedInfo(kernel2); + await kernel1.registerLocationHints(info2.peerId, info2.quicAddresses); + + const aliceConfig = makeRemoteVatConfig('Alice'); + const bobConfig = makeRemoteVatConfig('Bob'); + await launchVatAndGetURL(kernel1, aliceConfig); + const bobURL = await launchVatAndGetURL(kernel2, bobConfig); + + const aliceRef = getVatRootRef(kernel1, kernelStore1, 'Alice'); + + for (let i = 0; i < 5; i++) { + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + } + }, + NETWORK_TIMEOUT, + ); + + it( + 'sends a message via direct TCP', + async () => { + await kernel1.initRemoteComms({ + directListenAddresses: [tcpListenAddress], + }); + await kernel2.initRemoteComms({ + directListenAddresses: [tcpListenAddress], + }); + + const info2 = await getConnectedInfo(kernel2); + await kernel1.registerLocationHints(info2.peerId, info2.tcpAddresses); + + const aliceConfig = makeRemoteVatConfig('Alice'); + const bobConfig = makeRemoteVatConfig('Bob'); + await launchVatAndGetURL(kernel1, aliceConfig); + const bobURL = await launchVatAndGetURL(kernel2, bobConfig); + + const aliceRef = getVatRootRef(kernel1, kernelStore1, 'Alice'); + + const response = await sendRemoteMessage( + kernel1, + aliceRef, + bobURL, + 'hello', + ['Alice'], + ); + expect(response).toContain('vat Bob got "hello" from Alice'); + }, + NETWORK_TIMEOUT, + ); + }); +}); diff --git a/packages/ocap-kernel/src/Kernel.test.ts b/packages/ocap-kernel/src/Kernel.test.ts index 021f52fc6..336398c71 100644 --- a/packages/ocap-kernel/src/Kernel.test.ts +++ b/packages/ocap-kernel/src/Kernel.test.ts @@ -54,6 +54,8 @@ const mocks = vi.hoisted(() => { reconnectPeer = vi.fn().mockResolvedValue(undefined); + registerLocationHints = vi.fn().mockResolvedValue(undefined); + getPeerId = vi.fn().mockReturnValue('mock-peer-id'); constructor() { @@ -111,6 +113,7 @@ describe('Kernel', () => { terminateAll: async () => undefined, stopRemoteComms: vi.fn(async () => undefined), resetAllBackoffs: vi.fn(async () => undefined), + getListenAddresses: vi.fn(() => []), } as unknown as PlatformServices; launchWorkerMock = vi @@ -1050,5 +1053,23 @@ describe('Kernel', () => { ); }); }); + + describe('registerLocationHints()', () => { + it('registers location hints via RemoteManager', async () => { + const kernel = await Kernel.make( + mockPlatformServices, + mockKernelDatabase, + ); + const remoteManagerInstance = mocks.RemoteManager.lastInstance; + await kernel.registerLocationHints('peer-123', [ + '/ip4/192.168.1.1/udp/4001/quic-v1/p2p/peer-123', + ]); + expect( + remoteManagerInstance.registerLocationHints, + ).toHaveBeenCalledWith('peer-123', [ + '/ip4/192.168.1.1/udp/4001/quic-v1/p2p/peer-123', + ]); + }); + }); }); }); diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index 6b864b03b..e66e535a2 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -327,6 +327,17 @@ export class Kernel { await this.#remoteManager.reconnectPeer(peerId, hints); } + /** + * Register location hints for a remote peer. + * Hints are multiaddr strings that can be used to dial the peer directly. + * + * @param peerId - The peer ID to register hints for. + * @param hints - The location hint multiaddr strings. + */ + async registerLocationHints(peerId: string, hints: string[]): Promise { + await this.#remoteManager.registerLocationHints(peerId, hints); + } + /** * Send a message from the kernel to an object in a vat. * @@ -583,6 +594,7 @@ export class Kernel { status.remoteComms = { state: 'connected', peerId: this.#remoteManager.getPeerId(), + listenAddresses: this.#platformServices.getListenAddresses(), }; } else if (this.#remoteManager.isIdentityInitialized()) { status.remoteComms = { diff --git a/packages/ocap-kernel/src/index.ts b/packages/ocap-kernel/src/index.ts index 4e402dca9..c9e1b804b 100644 --- a/packages/ocap-kernel/src/index.ts +++ b/packages/ocap-kernel/src/index.ts @@ -18,6 +18,7 @@ export type { SystemSubclusterConfig, } from './types.ts'; export type { + DirectTransport, RemoteIdentity, RemoteMessageHandler, SendRemoteMessage, diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts index 054e0e6a2..0fc082abe 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts @@ -98,7 +98,18 @@ vi.mock('@metamask/logger', () => ({ })); vi.mock('@multiformats/multiaddr', () => ({ - multiaddr: (addr: string) => addr, + multiaddr: (addr: string) => { + // Real multiaddr() throws on malformed addresses + if (!addr.startsWith('/')) { + throw new Error(`invalid multiaddr "${addr}"`); + } + // Extract the last /p2p/ segment if present + const peerIdMatch = addr.match(/\/p2p\/([^/]+)$/u); + return { + toString: () => addr, + getPeerId: () => peerIdMatch?.[1] ?? null, + }; + }, })); // Simple ByteStream mock @@ -162,6 +173,10 @@ describe('ConnectionFactory', () => { toString: () => 'test-peer-id', }, addEventListener: vi.fn(), + getMultiaddrs: vi.fn(() => [ + { toString: () => '/ip4/127.0.0.1/udp/12345/quic-v1/p2p/test-peer-id' }, + { toString: () => '/ip4/127.0.0.1/tcp/9001/ws/p2p/test-peer-id' }, + ]), dialProtocol: vi.fn( async ( addr: string, @@ -194,14 +209,17 @@ describe('ConnectionFactory', () => { /** * Create a new ConnectionFactory. * - * @param signal - The signal to use for the ConnectionFactory. - * @param maxRetryAttempts - Maximum number of retry attempts. + * @param options - Options for the factory. + * @param options.signal - The signal to use for the ConnectionFactory. + * @param options.maxRetryAttempts - Maximum number of retry attempts. + * @param options.directTransports - Optional direct transports with listen addresses. * @returns The ConnectionFactory. */ - async function createFactory( - signal?: AbortSignal, - maxRetryAttempts?: number, - ): Promise< + async function createFactory(options?: { + signal?: AbortSignal; + maxRetryAttempts?: number; + directTransports?: import('../types.ts').DirectTransport[]; + }): Promise< Awaited< ReturnType< typeof import('./connection-factory.ts').ConnectionFactory.make @@ -210,13 +228,14 @@ describe('ConnectionFactory', () => { > { const { ConnectionFactory } = await import('./connection-factory.ts'); const { Logger } = await import('@metamask/logger'); - return ConnectionFactory.make( + return ConnectionFactory.make({ keySeed, knownRelays, - new Logger(), - signal ?? new AbortController().signal, - maxRetryAttempts, - ); + logger: new Logger(), + signal: options?.signal ?? new AbortController().signal, + maxRetryAttempts: options?.maxRetryAttempts, + directTransports: options?.directTransports, + }); } describe('initialize', () => { @@ -258,9 +277,22 @@ describe('ConnectionFactory', () => { expect(callArgs.peerDiscovery).toBeDefined(); }); + it('omits bootstrap when no relays are provided', async () => { + const { ConnectionFactory } = await import('./connection-factory.ts'); + factory = await ConnectionFactory.make({ + keySeed, + knownRelays: [], + logger: new (await import('@metamask/logger')).Logger(), + signal: new AbortController().signal, + }); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.peerDiscovery).toBeUndefined(); + }); + it('accepts maxRetryAttempts parameter', async () => { const maxRetryAttempts = 5; - factory = await createFactory(undefined, maxRetryAttempts); + factory = await createFactory({ maxRetryAttempts }); expect(createLibp2p).toHaveBeenCalledOnce(); expect(libp2pState.startCalled).toBe(true); @@ -392,6 +424,20 @@ describe('ConnectionFactory', () => { ); expect(relay1Addresses).toHaveLength(2); // Just WebRTC and circuit }); + + it('returns empty array when no relays and no hints', async () => { + const { ConnectionFactory } = await import('./connection-factory.ts'); + factory = await ConnectionFactory.make({ + keySeed, + knownRelays: [], + logger: new (await import('@metamask/logger')).Logger(), + signal: new AbortController().signal, + }); + + const addresses = factory.candidateAddressStrings('peer123', []); + + expect(addresses).toStrictEqual([]); + }); }); describe('openChannelOnce', () => { @@ -442,7 +488,7 @@ describe('ConnectionFactory', () => { const controller = new AbortController(); controller.abort(); - factory = await createFactory(controller.signal); + factory = await createFactory({ signal: controller.signal }); await expect(factory.openChannelOnce('peer123')).rejects.toThrow( AbortError, @@ -472,7 +518,7 @@ describe('ConnectionFactory', () => { handle: vi.fn(), })); - factory = await createFactory(controller.signal); + factory = await createFactory({ signal: controller.signal }); // The error is caught, then on retry signal.aborted is checked and AbortError is thrown await expect(factory.openChannelOnce('peer123')).rejects.toThrow( @@ -587,6 +633,20 @@ describe('ConnectionFactory', () => { expect.stringContaining('opened channel to peer123'), ); }); + + it('throws fallback error when no relays and no hints', async () => { + const { ConnectionFactory } = await import('./connection-factory.ts'); + factory = await ConnectionFactory.make({ + keySeed, + knownRelays: [], + logger: new (await import('@metamask/logger')).Logger(), + signal: new AbortController().signal, + }); + + await expect(factory.openChannelOnce('peer123')).rejects.toThrow( + 'unable to open channel to peer123', + ); + }); }); describe('openChannelWithRetry', () => { @@ -698,12 +758,12 @@ describe('ConnectionFactory', () => { vi.resetModules(); const { ConnectionFactory } = await import('./connection-factory.ts'); const { Logger } = await import('@metamask/logger'); - factory = await ConnectionFactory.make( + factory = await ConnectionFactory.make({ keySeed, knownRelays, - new Logger(), - new AbortController().signal, - ); + logger: new Logger(), + signal: new AbortController().signal, + }); await factory.openChannelWithRetry('peer123'); @@ -746,12 +806,12 @@ describe('ConnectionFactory', () => { vi.resetModules(); const { ConnectionFactory } = await import('./connection-factory.ts'); const { Logger } = await import('@metamask/logger'); - factory = await ConnectionFactory.make( + factory = await ConnectionFactory.make({ keySeed, knownRelays, - new Logger(), - new AbortController().signal, - ); + logger: new Logger(), + signal: new AbortController().signal, + }); await factory.openChannelWithRetry('peer123'); @@ -797,13 +857,13 @@ describe('ConnectionFactory', () => { vi.resetModules(); const { ConnectionFactory } = await import('./connection-factory.ts'); const { Logger } = await import('@metamask/logger'); - factory = await ConnectionFactory.make( + factory = await ConnectionFactory.make({ keySeed, knownRelays, - new Logger(), - new AbortController().signal, + logger: new Logger(), + signal: new AbortController().signal, maxRetryAttempts, - ); + }); await factory.openChannelWithRetry('peer123'); @@ -1209,7 +1269,7 @@ describe('ConnectionFactory', () => { // Verify dial was made expect(libp2pState.dials).toHaveLength(1); - expect(libp2pState.dials[0]?.addr).toContain('hint.example'); + expect(libp2pState.dials[0]?.addr.toString()).toContain('hint.example'); // Clean up await factory.stop(); @@ -1239,4 +1299,181 @@ describe('ConnectionFactory', () => { expect(outboundChannel.peerId).toBe('outbound-peer'); }); }); + + describe('directTransports', () => { + it('includes a single direct transport in libp2p config', async () => { + const mockTransport = { tag: 'quic-transport' }; + factory = await createFactory({ + directTransports: [ + { + transport: mockTransport, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + ], + }); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.transports).toHaveLength(5); // 4 default + 1 direct + expect(callArgs.transports[4]).toBe(mockTransport); + }); + + it('includes multiple direct transports in libp2p config', async () => { + const mockQuic = { tag: 'quic-transport' }; + const mockTcp = { tag: 'tcp-transport' }; + factory = await createFactory({ + directTransports: [ + { + transport: mockQuic, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + { + transport: mockTcp, + listenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + ], + }); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.transports).toHaveLength(6); // 4 default + 2 direct + expect(callArgs.transports[4]).toBe(mockQuic); + expect(callArgs.transports[5]).toBe(mockTcp); + }); + + it('merges direct listen addresses with default addresses', async () => { + factory = await createFactory({ + directTransports: [ + { + transport: {}, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + ], + }); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.addresses.listen).toStrictEqual([ + '/webrtc', + '/p2p-circuit', + '/ip4/0.0.0.0/udp/0/quic-v1', + ]); + }); + + it('merges multiple transport listen addresses', async () => { + factory = await createFactory({ + directTransports: [ + { + transport: {}, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + { + transport: {}, + listenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + ], + }); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.addresses.listen).toStrictEqual([ + '/webrtc', + '/p2p-circuit', + '/ip4/0.0.0.0/udp/0/quic-v1', + '/ip4/0.0.0.0/tcp/4001', + ]); + }); + + it('does not add direct transports when not provided', async () => { + factory = await createFactory(); + + const callArgs = createLibp2p.mock.calls[0]?.[0]; + expect(callArgs.transports).toHaveLength(4); + expect(callArgs.addresses.listen).toStrictEqual([ + '/webrtc', + '/p2p-circuit', + ]); + }); + }); + + describe('getListenAddresses', () => { + it('returns multiaddr strings from libp2p', async () => { + factory = await createFactory(); + + const addresses = factory.getListenAddresses(); + + expect(addresses).toStrictEqual([ + '/ip4/127.0.0.1/udp/12345/quic-v1/p2p/test-peer-id', + '/ip4/127.0.0.1/tcp/9001/ws/p2p/test-peer-id', + ]); + }); + + it('returns empty array after stop', async () => { + factory = await createFactory(); + await factory.stop(); + + const addresses = factory.getListenAddresses(); + + expect(addresses).toStrictEqual([]); + }); + }); + + describe('candidateAddressStrings with direct addresses', () => { + it('places direct address hints first', async () => { + factory = await createFactory(); + + const directHint = '/ip4/192.168.1.1/udp/4001/quic-v1/p2p/peer123'; + const addresses = factory.candidateAddressStrings('peer123', [ + directHint, + ]); + + expect(addresses[0]).toBe(directHint); + // Relay addresses follow + expect(addresses[1]).toContain('/p2p-circuit/'); + }); + + it('does not wrap direct address hints in relay pattern', async () => { + factory = await createFactory(); + + const directHint = '/ip4/192.168.1.1/udp/4001/quic-v1/p2p/peer123'; + const addresses = factory.candidateAddressStrings('peer123', [ + directHint, + ]); + + // The direct address should appear exactly as provided + expect(addresses).toContain(directHint); + // It should NOT be wrapped in a relay circuit + const wrappedDirectAddresses = addresses.filter( + (addr: string) => + addr.includes('/p2p-circuit/') && addr.includes('quic-v1'), + ); + expect(wrappedDirectAddresses).toHaveLength(0); + }); + + it('handles mix of direct and relay hints', async () => { + factory = await createFactory(); + + const directHint = '/ip4/192.168.1.1/udp/4001/quic-v1/p2p/peer123'; + const relayHint = '/dns4/hint.example/tcp/443/wss/p2p/hint'; + const addresses = factory.candidateAddressStrings('peer123', [ + directHint, + relayHint, + ]); + + // Direct addresses come first + expect(addresses[0]).toBe(directHint); + // Relay hint addresses follow + expect(addresses[1]).toContain('hint.example'); + expect(addresses[1]).toContain('/p2p-circuit/'); + }); + + it('skips malformed hints and still generates relay addresses', async () => { + factory = await createFactory(); + + const addresses = factory.candidateAddressStrings('peer123', [ + 'not-a-valid-multiaddr', + ]); + + // Malformed hint is skipped, relay addresses still generated + expect(addresses).not.toContain('not-a-valid-multiaddr'); + expect(addresses.length).toBeGreaterThan(0); + expect(addresses.every((a) => a.includes('/p2p-circuit/'))).toBe(true); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts index 796196ac7..eece4d3fb 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts @@ -17,7 +17,12 @@ import { multiaddr } from '@multiformats/multiaddr'; import { byteStream } from 'it-byte-stream'; import { createLibp2p } from 'libp2p'; -import type { Channel, InboundConnectionHandler } from '../types.ts'; +import type { + Channel, + ConnectionFactoryOptions, + DirectTransport, + InboundConnectionHandler, +} from '../types.ts'; /** * Connection factory for libp2p network operations. @@ -38,56 +43,47 @@ export class ConnectionFactory { readonly #maxRetryAttempts: number; + readonly #directTransports: DirectTransport[]; + #inboundHandler?: InboundConnectionHandler; /** * Constructor for the ConnectionFactory. * - * @param keySeed - The key seed to use for the libp2p node. - * @param knownRelays - The known relays to use for the libp2p node. - * @param logger - The logger to use for the libp2p node. - * @param signal - The signal to use for the libp2p node. - * @param maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options - The options for the ConnectionFactory. + * @param options.keySeed - The key seed to use for the libp2p node. + * @param options.knownRelays - The known relays to use for the libp2p node. + * @param options.logger - The logger to use for the libp2p node. + * @param options.signal - The signal to use for the libp2p node. + * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses. */ // eslint-disable-next-line no-restricted-syntax - private constructor( - keySeed: string, - knownRelays: string[], - logger: Logger, - signal: AbortSignal, - maxRetryAttempts?: number, - ) { - this.#keySeed = keySeed; - this.#knownRelays = knownRelays; - this.#logger = logger; - this.#signal = signal; - this.#maxRetryAttempts = maxRetryAttempts ?? 0; + private constructor(options: ConnectionFactoryOptions) { + this.#keySeed = options.keySeed; + this.#knownRelays = options.knownRelays; + this.#logger = options.logger; + this.#signal = options.signal; + this.#maxRetryAttempts = options.maxRetryAttempts ?? 0; + this.#directTransports = options.directTransports ?? []; } /** * Create a new ConnectionFactory instance. * - * @param keySeed - The key seed to use for the libp2p node. - * @param knownRelays - The known relays to use for the libp2p node. - * @param logger - The logger to use for the libp2p node. - * @param signal - The signal to use for the libp2p node. - * @param maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options - The options for the ConnectionFactory. + * @param options.keySeed - The key seed to use for the libp2p node. + * @param options.knownRelays - The known relays to use for the libp2p node. + * @param options.logger - The logger to use for the libp2p node. + * @param options.signal - The signal to use for the libp2p node. + * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses. * @returns A promise for the new ConnectionFactory instance. */ static async make( - keySeed: string, - knownRelays: string[], - logger: Logger, - signal: AbortSignal, - maxRetryAttempts?: number, + options: ConnectionFactoryOptions, ): Promise { - const factory = new ConnectionFactory( - keySeed, - knownRelays, - logger, - signal, - maxRetryAttempts, - ); + const factory = new ConnectionFactory(options); await factory.#init(); return factory; } @@ -101,7 +97,11 @@ export class ConnectionFactory { this.#libp2p = await createLibp2p({ privateKey, addresses: { - listen: ['/webrtc', '/p2p-circuit'], + listen: [ + '/webrtc', + '/p2p-circuit', + ...this.#directTransports.flatMap((dt) => dt.listenAddresses), + ], appendAnnounce: ['/webrtc'], }, transports: [ @@ -120,6 +120,9 @@ export class ConnectionFactory { }, }), circuitRelayTransport(), + ...this.#directTransports.map( + (dt) => dt.transport as ReturnType, + ), ], connectionEncrypters: [noise()], streamMuxers: [yamux()], @@ -127,11 +130,16 @@ export class ConnectionFactory { // Allow private addresses for local testing denyDialMultiaddr: async () => false, }, - peerDiscovery: [ - bootstrap({ - list: this.#knownRelays, - }), - ], + // No peer discovery in direct connection mode + ...(this.#knownRelays.length > 0 + ? { + peerDiscovery: [ + bootstrap({ + list: this.#knownRelays, + }), + ], + } + : {}), services: { identify: identify(), ping: ping(), @@ -174,6 +182,19 @@ export class ConnectionFactory { this.#inboundHandler = handler; } + /** + * Get the listen addresses of the libp2p node. + * These are the multiaddr strings that other peers can use to dial this node. + * + * @returns The listen address strings. + */ + getListenAddresses(): string[] { + if (!this.#libp2p) { + return []; + } + return this.#libp2p.getMultiaddrs().map((ma) => ma.toString()); + } + /** * Generate key info from the seed. * @@ -195,14 +216,33 @@ export class ConnectionFactory { * @returns The candidate address strings. */ candidateAddressStrings(peerId: string, hints: string[]): string[] { - const possibleContacts = hints.concat( - ...this.#knownRelays.filter((relay) => !hints.includes(relay)), + const directAddresses: string[] = []; + const relayHints: string[] = []; + + for (const hint of hints) { + try { + if (multiaddr(hint).getPeerId() === peerId) { + directAddresses.push(hint); + } else { + relayHints.push(hint); + } + } catch { + // Skip malformed hints so relay fallback still works. + this.#logger.log(`skipping malformed hint: ${hint}`); + } + } + + const possibleRelays = relayHints.concat( + ...this.#knownRelays.filter((relay) => !relayHints.includes(relay)), ); - // Try WebRTC via relay first, then WebSocket via relay. - return possibleContacts.flatMap((relay) => [ + + // Direct addresses first, then WebRTC via relay, then WebSocket via relay. + const relayAddresses = possibleRelays.flatMap((relay) => [ `${relay}/p2p-circuit/webrtc/p2p/${peerId}`, `${relay}/p2p-circuit/p2p/${peerId}`, ]); + + return [...directAddresses, ...relayAddresses]; } /** @@ -392,15 +432,16 @@ export class ConnectionFactory { try { // Add a timeout to prevent hanging if libp2p.stop() doesn't complete const STOP_TIMEOUT_MS = 2000; + let timeoutId: ReturnType; await Promise.race([ this.#libp2p.stop(), - new Promise((_resolve, reject) => - setTimeout( + new Promise((_resolve, reject) => { + timeoutId = setTimeout( () => reject(new Error('libp2p.stop() timed out')), STOP_TIMEOUT_MS, - ), - ), - ]); + ); + }), + ]).finally(() => clearTimeout(timeoutId)); } catch (error) { this.#logger.error('libp2p.stop() failed or timed out:', error); // Continue anyway - we'll clear the reference diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 439f1449b..3c28b6667 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -80,6 +80,7 @@ const mockConnectionFactory = { onInboundConnection: vi.fn(), stop: vi.fn().mockResolvedValue(undefined), closeChannel: vi.fn().mockResolvedValue(undefined), + getListenAddresses: vi.fn().mockReturnValue([]), }; vi.mock('./connection-factory.ts', () => { @@ -241,13 +242,14 @@ describe('transport.initTransport', () => { await initTransport(keySeed, { relays: knownRelays }, vi.fn()); - expect(ConnectionFactory.make).toHaveBeenCalledWith( + expect(ConnectionFactory.make).toHaveBeenCalledWith({ keySeed, knownRelays, - expect.any(Object), // Logger instance - expect.any(AbortSignal), // signal from AbortController - undefined, // maxRetryAttempts (optional) - ); + logger: expect.any(Object), + signal: expect.any(AbortSignal), + maxRetryAttempts: undefined, + directTransports: undefined, + }); }); it('passes maxRetryAttempts to ConnectionFactory.make', async () => { @@ -257,16 +259,45 @@ describe('transport.initTransport', () => { await initTransport(keySeed, { relays: [], maxRetryAttempts }, vi.fn()); - expect(ConnectionFactory.make).toHaveBeenCalledWith( + expect(ConnectionFactory.make).toHaveBeenCalledWith({ keySeed, - [], - expect.any(Object), - expect.any(AbortSignal), + knownRelays: [], + logger: expect.any(Object), + signal: expect.any(AbortSignal), maxRetryAttempts, - ); + directTransports: undefined, + }); + }); + + it('passes directTransports to ConnectionFactory.make', async () => { + const { ConnectionFactory } = await import('./connection-factory.ts'); + const keySeed = '0xabcd'; + const mockQuic = { tag: 'quic' }; + const mockTcp = { tag: 'tcp' }; + const directTransports = [ + { + transport: mockQuic, + listenAddresses: ['/ip4/0.0.0.0/udp/0/quic-v1'], + }, + { + transport: mockTcp, + listenAddresses: ['/ip4/0.0.0.0/tcp/4001'], + }, + ]; + + await initTransport(keySeed, { relays: [], directTransports }, vi.fn()); + + expect(ConnectionFactory.make).toHaveBeenCalledWith({ + keySeed, + knownRelays: [], + logger: expect.any(Object), + signal: expect.any(AbortSignal), + maxRetryAttempts: undefined, + directTransports, + }); }); - it('returns sendRemoteMessage, stop, closeConnection, registerLocationHints, and reconnectPeer', async () => { + it('returns sendRemoteMessage, stop, closeConnection, registerLocationHints, reconnectPeer, and getListenAddresses', async () => { const result = await initTransport('0x1234', {}, vi.fn()); expect(result).toHaveProperty('sendRemoteMessage'); @@ -274,11 +305,13 @@ describe('transport.initTransport', () => { expect(result).toHaveProperty('closeConnection'); expect(result).toHaveProperty('registerLocationHints'); expect(result).toHaveProperty('reconnectPeer'); + expect(result).toHaveProperty('getListenAddresses'); expect(typeof result.sendRemoteMessage).toBe('function'); expect(typeof result.stop).toBe('function'); expect(typeof result.closeConnection).toBe('function'); expect(typeof result.registerLocationHints).toBe('function'); expect(typeof result.reconnectPeer).toBe('function'); + expect(typeof result.getListenAddresses).toBe('function'); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 015b54413..312e1b59b 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -76,6 +76,7 @@ export async function initTransport( registerLocationHints: (peerId: string, hints: string[]) => void; reconnectPeer: (peerId: string, hints?: string[]) => Promise; resetAllBackoffs: () => void; + getListenAddresses: () => string[]; }> { const { relays = [], @@ -86,6 +87,7 @@ export async function initTransport( stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, maxMessagesPerSecond = DEFAULT_MESSAGE_RATE_LIMIT, maxConnectionAttemptsPerMinute = DEFAULT_CONNECTION_RATE_LIMIT, + directTransports, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -115,13 +117,14 @@ export async function initTransport( } connectionLossHolder.impl(peerId); }; - const connectionFactory = await ConnectionFactory.make( + const connectionFactory = await ConnectionFactory.make({ keySeed, - relays, + knownRelays: relays, logger, signal, maxRetryAttempts, - ); + directTransports, + }); // Create handshake dependencies (only if incarnation ID is configured). // The incarnation ID is optional primarily for unit tests that don't need @@ -728,5 +731,6 @@ export async function initTransport( registerLocationHints, reconnectPeer, resetAllBackoffs, + getListenAddresses: () => connectionFactory.getListenAddresses(), }; } diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 84b8ddc93..93d474f38 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -1,3 +1,4 @@ +import type { Logger } from '@metamask/logger'; import type { ByteStream } from 'it-byte-stream'; export type InboundConnectionHandler = (channel: Channel) => void; @@ -99,6 +100,42 @@ export type RemoteCommsOptions = { * Uses a sliding 1-minute window. */ maxConnectionAttemptsPerMinute?: number | undefined; + /** + * Direct listen addresses for non-relay transports (e.g. QUIC, TCP). + * Example: `['/ip4/0.0.0.0/udp/0/quic-v1', '/ip4/0.0.0.0/tcp/4001']` + * + * The platform layer detects the required transports from the address strings + * and injects them automatically. Users never need to import transport packages. + */ + directListenAddresses?: string[] | undefined; + /** + * Internal option injected by platform services. Bundles direct transport + * implementations with their listen addresses. Users should use + * `directListenAddresses` instead. + * + * @internal + */ + directTransports?: DirectTransport[]; +}; + +/** + * A direct transport implementation bundled with its listen addresses. + */ +export type DirectTransport = { + transport: unknown; + listenAddresses: string[]; +}; + +/** + * Options for creating a ConnectionFactory instance. + */ +export type ConnectionFactoryOptions = { + keySeed: string; + knownRelays: string[]; + logger: Logger; + signal: AbortSignal; + maxRetryAttempts?: number | undefined; + directTransports?: DirectTransport[] | undefined; }; export type RemoteInfo = { diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index 83e52362e..c77d6ff89 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -377,6 +377,14 @@ export type PlatformServices = { * @returns A promise that resolves when backoffs have been reset. */ resetAllBackoffs: () => Promise; + /** + * Get the listen addresses of the libp2p node. + * Returns multiaddr strings that other peers can use to dial this node directly. + * Returns an empty array if remote comms is not initialized. + * + * @returns The listen address strings. + */ + getListenAddresses: () => string[]; }; // Cluster configuration @@ -469,6 +477,7 @@ const RemoteCommsIdentityOnlyStruct = object({ const RemoteCommsConnectedStruct = object({ state: literal('connected'), peerId: string(), + listenAddresses: array(string()), }); export const KernelStatusStruct = type({ diff --git a/packages/ocap-kernel/src/vats/VatSupervisor.ts b/packages/ocap-kernel/src/vats/VatSupervisor.ts index c8ddb25ce..98ae4f51d 100644 --- a/packages/ocap-kernel/src/vats/VatSupervisor.ts +++ b/packages/ocap-kernel/src/vats/VatSupervisor.ts @@ -43,7 +43,6 @@ import { isVatConfig, coerceVatSyscallObject } from '../types.ts'; const makeLiveSlots: MakeLiveSlotsFn = localMakeLiveSlots; -// eslint-disable-next-line n/no-unsupported-features/node-builtins export type FetchBlob = (bundleURL: string) => Promise; type SupervisorRpcClient = Pick< diff --git a/packages/ocap-kernel/test/remotes-mocks.ts b/packages/ocap-kernel/test/remotes-mocks.ts index 56bfa3659..5e27e9c62 100644 --- a/packages/ocap-kernel/test/remotes-mocks.ts +++ b/packages/ocap-kernel/test/remotes-mocks.ts @@ -62,6 +62,7 @@ export class MockRemotesFactory { registerLocationHints: vi.fn(), reconnectPeer: vi.fn(), resetAllBackoffs: vi.fn(), + getListenAddresses: vi.fn().mockReturnValue([]), }; } diff --git a/yarn.lock b/yarn.lock index e3b364372..cc2c6e1c8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -480,6 +480,93 @@ __metadata: languageName: node linkType: hard +"@chainsafe/libp2p-quic-darwin-arm64@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-darwin-arm64@npm:1.1.8" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-darwin-x64@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-darwin-x64@npm:1.1.8" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-linux-arm64-gnu@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-linux-arm64-gnu@npm:1.1.8" + conditions: os=linux & cpu=arm64 & libc=glibc + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-linux-arm64-musl@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-linux-arm64-musl@npm:1.1.8" + conditions: os=linux & cpu=arm64 & libc=musl + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-linux-x64-gnu@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-linux-x64-gnu@npm:1.1.8" + conditions: os=linux & cpu=x64 & libc=glibc + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-linux-x64-musl@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-linux-x64-musl@npm:1.1.8" + conditions: os=linux & cpu=x64 & libc=musl + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic-win32-x64-msvc@npm:1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic-win32-x64-msvc@npm:1.1.8" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + +"@chainsafe/libp2p-quic@npm:^1.1.8": + version: 1.1.8 + resolution: "@chainsafe/libp2p-quic@npm:1.1.8" + dependencies: + "@chainsafe/libp2p-quic-darwin-arm64": "npm:1.1.8" + "@chainsafe/libp2p-quic-darwin-x64": "npm:1.1.8" + "@chainsafe/libp2p-quic-linux-arm64-gnu": "npm:1.1.8" + "@chainsafe/libp2p-quic-linux-arm64-musl": "npm:1.1.8" + "@chainsafe/libp2p-quic-linux-x64-gnu": "npm:1.1.8" + "@chainsafe/libp2p-quic-linux-x64-musl": "npm:1.1.8" + "@chainsafe/libp2p-quic-win32-x64-msvc": "npm:1.1.8" + "@libp2p/crypto": "npm:^5.1.7" + "@libp2p/interface": "npm:^2.10.5" + "@libp2p/utils": "npm:^6.7.1" + "@multiformats/multiaddr": "npm:^12.4.0" + "@multiformats/multiaddr-matcher": "npm:^2.0.1" + it-stream-types: "npm:^2.0.2" + race-signal: "npm:^1.1.3" + uint8arraylist: "npm:^2.4.8" + dependenciesMeta: + "@chainsafe/libp2p-quic-darwin-arm64": + optional: true + "@chainsafe/libp2p-quic-darwin-x64": + optional: true + "@chainsafe/libp2p-quic-linux-arm64-gnu": + optional: true + "@chainsafe/libp2p-quic-linux-arm64-musl": + optional: true + "@chainsafe/libp2p-quic-linux-x64-gnu": + optional: true + "@chainsafe/libp2p-quic-linux-x64-musl": + optional: true + "@chainsafe/libp2p-quic-win32-x64-msvc": + optional: true + checksum: 10/7db59436903a62788c99b1be076231cbc21f7742f177d61757730a39b97037c002713e239a6f7d3f80088252c87e3048e684eeb9fba33b6de8e6a0374d380dd0 + languageName: node + linkType: hard + "@chainsafe/libp2p-yamux@npm:7.0.4": version: 7.0.4 resolution: "@chainsafe/libp2p-yamux@npm:7.0.4" @@ -1672,18 +1759,18 @@ __metadata: languageName: node linkType: hard -"@libp2p/crypto@npm:^5.0.0, @libp2p/crypto@npm:^5.1.8": - version: 5.1.12 - resolution: "@libp2p/crypto@npm:5.1.12" +"@libp2p/crypto@npm:^5.0.0, @libp2p/crypto@npm:^5.1.7, @libp2p/crypto@npm:^5.1.8": + version: 5.1.13 + resolution: "@libp2p/crypto@npm:5.1.13" dependencies: - "@libp2p/interface": "npm:^3.0.2" + "@libp2p/interface": "npm:^3.1.0" "@noble/curves": "npm:^2.0.1" "@noble/hashes": "npm:^2.0.1" multiformats: "npm:^13.4.0" protons-runtime: "npm:^5.6.0" uint8arraylist: "npm:^2.4.8" uint8arrays: "npm:^5.1.0" - checksum: 10/ff88cca89b27087654415054b54aca9e1c13b59ce3739f759a528b431956b59b19aa1259ed64887eda48d9390ba633a5f5dc89864d90c2fcf9722348a52931ee + checksum: 10/8e4a248ab34f668d2ad55e7c579df6043750e13ab2008e488a6ece7a6b3c4271728691da0f28eee52d134a6044fa8b8dc662af7c0a4a17fac8fc0a17b5bf19e3 languageName: node linkType: hard @@ -1883,7 +1970,7 @@ __metadata: languageName: node linkType: hard -"@libp2p/utils@npm:^6.0.0, @libp2p/utils@npm:^6.7.2": +"@libp2p/utils@npm:^6.0.0, @libp2p/utils@npm:^6.7.1, @libp2p/utils@npm:^6.7.2": version: 6.7.2 resolution: "@libp2p/utils@npm:6.7.2" dependencies: @@ -3062,7 +3149,7 @@ __metadata: languageName: node linkType: hard -"@multiformats/multiaddr-matcher@npm:^2.0.0": +"@multiformats/multiaddr-matcher@npm:^2.0.0, @multiformats/multiaddr-matcher@npm:^2.0.1": version: 2.0.2 resolution: "@multiformats/multiaddr-matcher@npm:2.0.2" dependencies: @@ -3872,9 +3959,11 @@ __metadata: resolution: "@ocap/nodejs@workspace:packages/nodejs" dependencies: "@arethetypeswrong/cli": "npm:^0.17.4" + "@chainsafe/libp2p-quic": "npm:^1.1.8" "@endo/eventual-send": "npm:^1.3.4" "@endo/promise-kit": "npm:^1.1.13" "@libp2p/interface": "npm:2.11.0" + "@libp2p/tcp": "npm:10.1.19" "@libp2p/webrtc": "npm:5.2.24" "@metamask/auto-changelog": "npm:^5.3.0" "@metamask/eslint-config": "npm:^15.0.0"