diff --git a/.gitignore b/.gitignore index 1279a097f..6171d4a15 100644 --- a/.gitignore +++ b/.gitignore @@ -92,3 +92,4 @@ test-results # Claude .claude/settings.local.json +.playwright-mcp/ \ No newline at end of file diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index 165d93a62..28a75a2d6 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -8,6 +8,7 @@ import type { VatId, VatConfig, RemoteCommsOptions, + OnIncarnationChange, } from '@metamask/ocap-kernel'; import { platformServicesMethodSpecs, @@ -62,6 +63,8 @@ export class PlatformServicesClient implements PlatformServices { #remoteGiveUpHandler: ((peerId: string) => void) | undefined = undefined; + #remoteIncarnationChangeHandler: OnIncarnationChange | undefined = undefined; + /** * **ATTN:** Prefer {@link PlatformServicesClient.make} over constructing * this class directly. @@ -96,6 +99,7 @@ export class PlatformServicesClient implements PlatformServices { this.#rpcServer = new RpcService(kernelRemoteHandlers, { remoteDeliver: this.#remoteDeliver.bind(this), remoteGiveUp: this.#remoteGiveUp.bind(this), + remoteIncarnationChange: this.#remoteIncarnationChange.bind(this), }); // Start draining messages immediately after construction @@ -195,6 +199,7 @@ export class PlatformServicesClient implements PlatformServices { * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -204,9 +209,11 @@ export class PlatformServicesClient implements PlatformServices { remoteMessageHandler: (from: string, message: string) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { this.#remoteMessageHandler = remoteMessageHandler; this.#remoteGiveUpHandler = onRemoteGiveUp; + this.#remoteIncarnationChangeHandler = onIncarnationChange; await this.#rpcClient.call('initializeRemoteComms', { keySeed, ...Object.fromEntries( @@ -297,6 +304,19 @@ export class PlatformServicesClient implements PlatformServices { return null; } + /** + * Handle a remote incarnation change notification from the server. + * + * @param peerId - The peer ID of the remote that restarted. + * @returns A promise that resolves when handling is complete. + */ + async #remoteIncarnationChange(peerId: string): Promise { + if (this.#remoteIncarnationChangeHandler) { + this.#remoteIncarnationChangeHandler(peerId); + } + return null; + } + /** * Send a message to the server. * diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 8336102ce..d15109e41 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -407,6 +407,7 @@ describe('PlatformServicesServer', () => { expect.any(Function), expect.any(Function), undefined, + expect.any(Function), ); }); @@ -430,6 +431,7 @@ describe('PlatformServicesServer', () => { expect.any(Function), expect.any(Function), undefined, + expect.any(Function), ); }); diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index cf0bdea7f..e6d12bfef 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -293,6 +293,7 @@ export class PlatformServicesServer { this.#handleRemoteMessage.bind(this), this.#handleRemoteGiveUp.bind(this), incarnationId, + this.#handleRemoteIncarnationChange.bind(this), ); this.#sendRemoteMessageFunc = sendRemoteMessage; this.#stopRemoteCommsFunc = stop; @@ -404,5 +405,22 @@ export class PlatformServicesServer { this.#logger.error('Error notifying kernel of remote give up:', error); }); } + + /** + * Handle when a remote peer's incarnation changes (peer restarted). + * Notifies the kernel worker via RPC to reset the RemoteHandle state. + * + * @param peerId - The peer ID of the remote that restarted. + */ + #handleRemoteIncarnationChange(peerId: string): void { + this.#rpcClient + .call('remoteIncarnationChange', { peerId }) + .catch((error) => { + this.#logger.error( + 'Error notifying kernel of remote incarnation change:', + error, + ); + }); + } } harden(PlatformServicesServer); diff --git a/packages/nodejs/src/kernel/PlatformServices.test.ts b/packages/nodejs/src/kernel/PlatformServices.test.ts index e0c6d0e24..a1cc433c1 100644 --- a/packages/nodejs/src/kernel/PlatformServices.test.ts +++ b/packages/nodejs/src/kernel/PlatformServices.test.ts @@ -250,6 +250,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), undefined, undefined, + undefined, ); }); @@ -272,6 +273,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), undefined, undefined, + undefined, ); }); @@ -296,6 +298,7 @@ describe('NodejsPlatformServices', () => { expect.any(Function), giveUpHandler, undefined, + undefined, ); }); @@ -322,6 +325,36 @@ describe('NodejsPlatformServices', () => { expect.any(Function), giveUpHandler, incarnationId, + undefined, + ); + }); + + it('initializes remote comms with onIncarnationChange callback', async () => { + const service = new NodejsPlatformServices({ workerFilePath }); + const keySeed = '0x1234567890abcdef'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + const remoteHandler = vi.fn(async () => 'response'); + const giveUpHandler = vi.fn(); + const incarnationId = 'test-incarnation-id'; + const incarnationChangeHandler = vi.fn(); + + await service.initializeRemoteComms( + keySeed, + { relays }, + remoteHandler, + giveUpHandler, + incarnationId, + incarnationChangeHandler, + ); + + const { initTransport } = await import('@metamask/ocap-kernel'); + expect(initTransport).toHaveBeenCalledWith( + keySeed, + { relays }, + expect.any(Function), + giveUpHandler, + incarnationId, + incarnationChangeHandler, ); }); diff --git a/packages/nodejs/src/kernel/PlatformServices.ts b/packages/nodejs/src/kernel/PlatformServices.ts index 160dea58c..3c7202295 100644 --- a/packages/nodejs/src/kernel/PlatformServices.ts +++ b/packages/nodejs/src/kernel/PlatformServices.ts @@ -9,6 +9,7 @@ import type { SendRemoteMessage, StopRemoteComms, RemoteCommsOptions, + OnIncarnationChange, } from '@metamask/ocap-kernel'; import { initTransport } from '@metamask/ocap-kernel'; import { NodeWorkerDuplexStream } from '@metamask/streams'; @@ -228,6 +229,7 @@ export class NodejsPlatformServices implements PlatformServices { * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - This kernel's incarnation ID for handshake protocol. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -237,6 +239,7 @@ export class NodejsPlatformServices implements PlatformServices { remoteMessageHandler: (from: string, message: string) => Promise, onRemoteGiveUp?: (peerId: string) => void, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { if (this.#sendRemoteMessageFunc) { throw Error('remote comms already initialized'); @@ -254,6 +257,7 @@ export class NodejsPlatformServices implements PlatformServices { this.#handleRemoteMessage.bind(this), onRemoteGiveUp, incarnationId, + onIncarnationChange, ); this.#sendRemoteMessageFunc = sendRemoteMessage; this.#stopRemoteCommsFunc = stop; diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 772905244..104a05e99 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -917,8 +917,8 @@ describe.sequential('Remote Communications E2E', () => { const response = kunser(result); // The message should fail because incarnation changed. - // The handshake detects the new incarnation and triggers onRemoteGiveUp, - // which rejects pending promises with a "Remote connection lost" error. + // The handshake detects the new incarnation and triggers onIncarnationChange, + // which resets RemoteHandle state and rejects pending work. expect(response).toBeInstanceOf(Error); expect((response as Error).message).toMatch(/Remote connection lost/u); }, diff --git a/packages/ocap-kernel/src/index.ts b/packages/ocap-kernel/src/index.ts index 2e4aa3532..13e0eca34 100644 --- a/packages/ocap-kernel/src/index.ts +++ b/packages/ocap-kernel/src/index.ts @@ -19,6 +19,7 @@ export type { SendRemoteMessage, StopRemoteComms, RemoteCommsOptions, + OnIncarnationChange, } from './remotes/types.ts'; export type { RemoteMessageBase } from './remotes/kernel/RemoteHandle.ts'; export { diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 59896146e..d3e4dea59 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1166,4 +1166,71 @@ describe('RemoteHandle', () => { expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq }); }); + + describe('handlePeerRestart', () => { + it('resets sequence numbers for fresh start', async () => { + const remote = makeRemote(); + + // Build up some state by sending and receiving messages + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 5, + method: 'deliver', + params: ['notify', resolutions], + }), + ); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // Send a new message - should start from seq=1 + vi.mocked(mockRemoteComms.sendRemoteMessage).mockClear(); + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]![1]; + const parsed = JSON.parse(sentString); + expect(parsed.seq).toBe(1); + // ack should not be included since highestReceivedSeq was reset to 0 + expect(parsed.ack).toBeUndefined(); + }); + + it('clears persisted sequence state', async () => { + const remote = makeRemote(); + + // Build up state + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + // Verify state exists before restart + expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeDefined(); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // Verify state was cleared + expect(mockKernelStore.getRemoteSeqState(mockRemoteId)).toBeUndefined(); + }); + + it('rejects pending URL redemptions', async () => { + const remote = makeRemote(); + + // Start a redemption but don't resolve it + const redeemPromise = remote.redeemOcapURL('ocap:test@peer,relay'); + + // Call handlePeerRestart + remote.handlePeerRestart(); + + // The pending redemption should be rejected + await expect(redeemPromise).rejects.toThrow('Remote peer restarted'); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index a8aa71a10..e1e47688b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -931,4 +931,40 @@ export class RemoteHandle implements EndpointHandle { this.#clearDelayedAck(); this.rejectPendingRedemptions('Remote connection cleanup'); } + + /** + * Handle a peer restart (incarnation change). + * Resets all state for a fresh start: clears timers, rejects pending messages + * and redemptions, resets sequence numbers, and clears persisted seq state. + * Called when the handshake detects that the remote peer has restarted. + */ + handlePeerRestart(): void { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: handling peer restart, resetting state`, + ); + + // Clear timers + this.#clearAckTimeout(); + this.#clearDelayedAck(); + + // Reject all pending messages - they will never be ACKed by the restarted peer + if (this.#hasPendingMessages()) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: rejecting ${this.#getPendingCount()} pending messages due to peer restart`, + ); + this.#rejectAllPending('Remote peer restarted'); + } + + // Reject pending URL redemptions - the remote won't have context for them + this.rejectPendingRedemptions('Remote peer restarted'); + + // Reset sequence numbers for fresh start + this.#nextSendSeq = 0; + this.#highestReceivedSeq = 0; + this.#startSeq = 0; + this.#retryCount = 0; + + // Clear persisted sequence state + this.#kernelStore.clearRemoteSeqState(this.remoteId); + } } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 232f758e1..4d7b1f67b 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -22,6 +22,7 @@ describe('RemoteManager', () => { let remoteManager: RemoteManager; let mockPlatformServices: PlatformServices; let kernelStore: ReturnType; + let kernelKVStore: ReturnType['kernelKVStore']; let mockKernelQueue: KernelQueue; let logger: Logger; let mockRemoteComms: RemoteComms; @@ -29,6 +30,7 @@ describe('RemoteManager', () => { beforeEach(() => { const kernelDatabase = makeMapKernelDatabase(); + kernelKVStore = kernelDatabase.kernelKVStore; kernelStore = makeKernelStore(kernelDatabase); logger = new Logger('test'); @@ -81,6 +83,7 @@ describe('RemoteManager', () => { undefined, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); @@ -109,6 +112,7 @@ describe('RemoteManager', () => { undefined, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); @@ -137,6 +141,7 @@ describe('RemoteManager', () => { keySeed, expect.any(Function), kernelStore.provideIncarnationId(), + expect.any(Function), // onIncarnationChange ); }); @@ -564,4 +569,67 @@ describe('RemoteManager', () => { expect(resolvePromisesSpy).not.toHaveBeenCalled(); }); }); + + describe('handleIncarnationChange', () => { + beforeEach(async () => { + const messageHandler = vi.fn(); + vi.mocked(remoteComms.initRemoteComms).mockResolvedValue(mockRemoteComms); + remoteManager.setMessageHandler(messageHandler); + await remoteManager.initRemoteComms(); + }); + + it('calls handlePeerRestart on remote when incarnation changes', () => { + const peerId = 'peer-that-restarted'; + const remote = remoteManager.establishRemote(peerId); + const handlePeerRestartSpy = vi.spyOn(remote, 'handlePeerRestart'); + // Get the onIncarnationChange callback (9th argument, index 8) + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + expect(handlePeerRestartSpy).toHaveBeenCalled(); + }); + + it('rejects kernel promises where remote is decider', () => { + const peerId = 'peer-with-promises'; + const remote = remoteManager.establishRemote(peerId); + const { remoteId } = remote; + + // Set up a promise where the remote is the decider + const [kpid] = kernelStore.initKernelPromise(); + kernelStore.setPromiseDecider(kpid, remoteId); + + // Set up the cle. key that getPromisesByDecider looks for + // The key format is cle.{decider}.{eref} = kpid + kernelKVStore.set(`cle.${remoteId}.p+1`, kpid); + + const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); + + // Trigger incarnation change + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + + // Should reject the promise with incarnation change error + expect(resolvePromisesSpy).toHaveBeenCalledWith(remoteId, [ + [kpid, true, expect.objectContaining({ body: expect.any(String) })], + ]); + }); + + it('does nothing when remote does not exist', () => { + const peerId = 'non-existent-peer'; + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + expect(() => onIncarnationChange(peerId)).not.toThrow(); + }); + + it('does not reject promises when there are none', () => { + const peerId = 'peer-without-promises'; + remoteManager.establishRemote(peerId); + const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + const onIncarnationChange = initCall?.[8] as (peerId: string) => void; + onIncarnationChange(peerId); + expect(resolvePromisesSpy).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index f2fefef97..ac87f3cae 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -134,6 +134,40 @@ export class RemoteManager { } } + /** + * Handle when a remote peer's incarnation changes (peer restarted). + * Resets the RemoteHandle state and rejects kernel promises for a fresh start. + * + * @param peerId - The peer ID of the remote that restarted. + */ + #handleIncarnationChange(peerId: string): void { + const remote = this.#remotesByPeer.get(peerId); + if (!remote) { + // Remote not found - might not have been established yet + this.#logger?.log( + `Incarnation change for unknown peer ${peerId.slice(0, 8)}, ignoring`, + ); + return; + } + + this.#logger?.log( + `Handling incarnation change for peer ${peerId.slice(0, 8)}`, + ); + + // Reset RemoteHandle state (pending messages, redemptions, seq numbers) + remote.handlePeerRestart(); + + // Reject all kernel promises where this remote is the decider + // The restarted peer has lost its state and won't resolve these promises + const { remoteId } = remote; + const failure = kser( + Error(`Remote peer restarted: ${peerId} (incarnation changed)`), + ); + for (const kpid of this.#kernelStore.getPromisesByDecider(remoteId)) { + this.#kernelQueue.resolvePromises(remoteId, [[kpid, true, failure]]); + } + } + /** * Initialize the remote comms object at kernel startup. * @@ -162,6 +196,7 @@ export class RemoteManager { this.#keySeed, this.#handleRemoteGiveUp.bind(this), this.#incarnationId, + this.#handleIncarnationChange.bind(this), ); // Restore all remotes that were previously established diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts index 724e36748..486aa6a2d 100644 --- a/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.test.ts @@ -144,6 +144,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, // onRemoteGiveUp undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -167,6 +168,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -187,6 +189,7 @@ describe('remote-comms', () => { mockRemoteMessageHandler, onRemoteGiveUp, undefined, // incarnationId + undefined, // onIncarnationChange ); }); @@ -208,6 +211,30 @@ describe('remote-comms', () => { mockRemoteMessageHandler, undefined, incarnationId, + undefined, // onIncarnationChange + ); + }); + + it('passes onIncarnationChange callback to platformServices', async () => { + const onIncarnationChange = vi.fn(); + await initRemoteComms( + mockKernelStore, + mockPlatformServices, + mockRemoteMessageHandler, + {}, + undefined, // logger + undefined, // keySeed + undefined, // onRemoteGiveUp + undefined, // incarnationId + onIncarnationChange, + ); + expect(mockPlatformServices.initializeRemoteComms).toHaveBeenCalledWith( + expect.any(String), + expect.any(Object), + mockRemoteMessageHandler, + undefined, + undefined, // incarnationId + onIncarnationChange, ); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts index fe952c195..00aa32614 100644 --- a/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts +++ b/packages/ocap-kernel/src/remotes/kernel/remote-comms.ts @@ -13,6 +13,7 @@ import type { RemoteComms, RemoteMessageHandler, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from '../types.ts'; @@ -105,6 +106,7 @@ export function getKnownRelays(kv: KVStore): string[] { * @param keySeed - Optional seed for libp2p key generation. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * * @returns the initialized remote comms object. */ @@ -117,6 +119,7 @@ export async function initRemoteComms( keySeed?: string, onRemoteGiveUp?: OnRemoteGiveUp, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise { let peerId: string; let ocapURLKey: Uint8Array; @@ -170,6 +173,7 @@ export async function initRemoteComms( remoteMessageHandler, onRemoteGiveUp, incarnationId, + onIncarnationChange, ); /** diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index a9a923e4f..8ae97d042 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -90,7 +90,9 @@ describe('reconnection-lifecycle', () => { checkConnectionRateLimit: vi.fn(), closeChannel: vi.fn().mockResolvedValue(undefined), registerChannel: vi.fn(), - doOutboundHandshake: vi.fn().mockResolvedValue(true), + doOutboundHandshake: vi + .fn() + .mockResolvedValue({ success: true, incarnationChanged: false }), } as unknown as ReconnectionLifecycleDeps; }); @@ -238,9 +240,10 @@ describe('reconnection-lifecycle', () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) .mockReturnValueOnce(false); - (deps.doOutboundHandshake as ReturnType).mockResolvedValue( - false, - ); + (deps.doOutboundHandshake as ReturnType).mockResolvedValue({ + success: false, + incarnationChanged: false, + }); const lifecycle = makeReconnectionLifecycle(deps); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 5de8187b9..34ba7206b 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -38,8 +38,10 @@ export type ReconnectionLifecycleDeps = { channel: Channel, errorContext?: string, ) => void; - /** Perform outbound handshake. Returns true if successful. */ - doOutboundHandshake: (channel: Channel) => Promise; + /** Perform outbound handshake. Returns success status and whether incarnation changed. */ + doOutboundHandshake: ( + channel: Channel, + ) => Promise<{ success: boolean; incarnationChanged: boolean }>; }; /** @@ -236,9 +238,9 @@ export function makeReconnectionLifecycle( throw error; } // Perform handshake before registering the channel - let handshakeOk; + let handshakeResult; try { - handshakeOk = await doOutboundHandshake(channel); + handshakeResult = await doOutboundHandshake(channel); } catch (handshakeError) { // Handshake threw (e.g., onRemoteGiveUp callback failed) - close channel to prevent leak try { @@ -248,7 +250,7 @@ export function makeReconnectionLifecycle( } throw handshakeError; } - if (!handshakeOk) { + if (!handshakeResult.success) { // Handshake failures are retryable (could be transient network issues) // Return null to signal retry instead of throwing non-retryable error logger.log( @@ -257,6 +259,8 @@ export function makeReconnectionLifecycle( await closeChannel(channel, peerId); return null; } + // Note: incarnationChanged is handled by the callback in doOutboundHandshake + // For reconnection, we still register the channel - messages will be fresh registerChannel(peerId, channel, 'reading channel to'); } diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index e52cf10ed..83c0e4bbf 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -2582,7 +2582,7 @@ describe('transport.initTransport', () => { ); }); - it('calls onRemoteGiveUp when incarnation changes', async () => { + it('calls onIncarnationChange when incarnation changes', async () => { let inboundHandler: ((channel: MockChannel) => void) | undefined; mockConnectionFactory.onInboundConnection.mockImplementation( (handler: (channel: MockChannel) => void) => { @@ -2590,14 +2590,15 @@ describe('transport.initTransport', () => { }, ); - const onRemoteGiveUp = vi.fn(); + const onIncarnationChange = vi.fn(); const localIncarnationId = 'local-incarnation'; await initTransport( '0x1234', {}, vi.fn().mockResolvedValue(''), - onRemoteGiveUp, + undefined, // onRemoteGiveUp localIncarnationId, + onIncarnationChange, ); // First handshake from remote peer @@ -2623,8 +2624,8 @@ describe('transport.initTransport', () => { ); }); - // First incarnation should not trigger onRemoteGiveUp - expect(onRemoteGiveUp).not.toHaveBeenCalled(); + // First incarnation should not trigger onIncarnationChange + expect(onIncarnationChange).not.toHaveBeenCalled(); // Second handshake with different incarnation (simulating peer restart) const mockInboundChannel2 = createMockChannel('remote-peer'); @@ -2649,8 +2650,8 @@ describe('transport.initTransport', () => { ); }); - // Changed incarnation should trigger onRemoteGiveUp - expect(onRemoteGiveUp).toHaveBeenCalledWith('remote-peer'); + // Changed incarnation should trigger onIncarnationChange + expect(onIncarnationChange).toHaveBeenCalledWith('remote-peer'); }); it('passes regular messages to remoteMessageHandler after handshake', async () => { diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 52d11a178..9315f2a02 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -37,6 +37,7 @@ import type { StopRemoteComms, Channel, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from '../types.ts'; @@ -57,6 +58,7 @@ import type { * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @param localIncarnationId - This kernel's incarnation ID for handshake protocol. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes (peer restarted). * * @returns a function to send messages **and** a `stop()` to cancel/release everything. */ @@ -66,6 +68,7 @@ export async function initTransport( remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, localIncarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ): Promise<{ sendRemoteMessage: SendRemoteMessage; stop: StopRemoteComms; @@ -133,31 +136,33 @@ export async function initTransport( /** * Perform outbound handshake and handle incarnation changes. - * Returns true if handshake succeeded (or was skipped), false if it failed. * * @param channel - The channel to perform handshake on. - * @returns True if handshake succeeded or was skipped. + * @returns Object with success status and whether incarnation changed. */ - async function doOutboundHandshake(channel: Channel): Promise { + async function doOutboundHandshake( + channel: Channel, + ): Promise<{ success: boolean; incarnationChanged: boolean }> { if (!handshakeDeps) { - return true; // No handshake configured, skip + return { success: true, incarnationChanged: false }; // No handshake configured, skip } let result; try { result = await performOutboundHandshake(channel, handshakeDeps); } catch (problem) { outputError(channel.peerId, 'outbound handshake', problem); - return false; + return { success: false, incarnationChanged: false }; } // Handle incarnation change outside try-catch so callback errors // don't incorrectly mark the handshake as failed - if (result.incarnationChanged && onRemoteGiveUp) { + if (result.incarnationChanged) { logger.log( - `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, triggering promise rejection`, + `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, resetting remote state`, ); - onRemoteGiveUp(channel.peerId); + // Call incarnation change callback first to reset RemoteHandle state + onIncarnationChange?.(channel.peerId); } - return true; + return { success: true, incarnationChanged: result.incarnationChanged }; } /** @@ -180,11 +185,12 @@ export async function initTransport( } // Handle incarnation change outside try-catch so callback errors // don't incorrectly mark the handshake as failed - if (result.incarnationChanged && onRemoteGiveUp) { + if (result.incarnationChanged) { logger.log( - `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, triggering promise rejection`, + `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, resetting remote state`, ); - onRemoteGiveUp(channel.peerId); + // Call incarnation change callback first to reset RemoteHandle state + onIncarnationChange?.(channel.peerId); } return true; } @@ -439,6 +445,15 @@ export async function initTransport( // Get or establish channel let { channel } = state; if (!channel) { + // Clear permanent failure status - user is explicitly trying to communicate + // This allows user-initiated messages to "resurrect" a permanently-failed peer + if (reconnectionManager.isPermanentlyFailed(targetPeerId)) { + logger.log( + `${targetPeerId.slice(0, 8)}:: clearing permanent failure status on user-initiated send`, + ); + reconnectionManager.clearPermanentFailure(targetPeerId); + } + // Check connection limit before attempting to dial checkConnectionLimit(); @@ -479,9 +494,9 @@ export async function initTransport( throw error; } // Perform handshake before registering the channel - let handshakeOk; + let handshakeResult; try { - handshakeOk = await doOutboundHandshake(channel); + handshakeResult = await doOutboundHandshake(channel); } catch (handshakeError) { // Handshake threw (e.g., onRemoteGiveUp callback failed) - close channel to prevent leak try { @@ -491,10 +506,17 @@ export async function initTransport( } throw handshakeError; } - if (!handshakeOk) { + if (!handshakeResult.success) { await connectionFactory.closeChannel(channel, targetPeerId); throw Error('Handshake failed'); } + if (handshakeResult.incarnationChanged) { + // Peer restarted - don't send stale message, let caller retry with fresh state + registerChannel(targetPeerId, channel, 'reading channel to'); + throw Error( + 'Remote peer restarted: message not sent to avoid stale delivery', + ); + } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index f278ff93a..586eb1d15 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -26,6 +26,13 @@ export type RemoteComms = { export type OnRemoteGiveUp = (peerId: string) => void; +/** + * Callback invoked when a remote peer's incarnation ID changes (peer restarted). + * + * @param peerId - The peer ID whose incarnation changed. + */ +export type OnIncarnationChange = (peerId: string) => void; + /** * Options for initializing remote communications. */ diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/index.ts b/packages/ocap-kernel/src/rpc/kernel-remote/index.ts index 1221bafa1..1042cac17 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/index.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/index.ts @@ -5,21 +5,33 @@ import type { } from './remoteDeliver.ts'; import { remoteGiveUpSpec, remoteGiveUpHandler } from './remoteGiveUp.ts'; import type { RemoteGiveUpSpec, RemoteGiveUpHandler } from './remoteGiveUp.ts'; +import { + remoteIncarnationChangeSpec, + remoteIncarnationChangeHandler, +} from './remoteIncarnationChange.ts'; +import type { + RemoteIncarnationChangeSpec, + RemoteIncarnationChangeHandler, +} from './remoteIncarnationChange.ts'; export const kernelRemoteHandlers = { remoteDeliver: remoteDeliverHandler, remoteGiveUp: remoteGiveUpHandler, + remoteIncarnationChange: remoteIncarnationChangeHandler, } as { remoteDeliver: RemoteDeliverHandler; remoteGiveUp: RemoteGiveUpHandler; + remoteIncarnationChange: RemoteIncarnationChangeHandler; }; export const kernelRemoteMethodSpecs = { remoteDeliver: remoteDeliverSpec, remoteGiveUp: remoteGiveUpSpec, + remoteIncarnationChange: remoteIncarnationChangeSpec, } as { remoteDeliver: RemoteDeliverSpec; remoteGiveUp: RemoteGiveUpSpec; + remoteIncarnationChange: RemoteIncarnationChangeSpec; }; type Handlers = diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts new file mode 100644 index 000000000..f2655c556 --- /dev/null +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts @@ -0,0 +1,43 @@ +import type { MethodSpec, Handler } from '@metamask/kernel-rpc-methods'; +import { object, string, literal } from '@metamask/superstruct'; +import type { Infer } from '@metamask/superstruct'; + +const paramsStruct = object({ + peerId: string(), +}); + +type Params = Infer; + +export type RemoteIncarnationChangeSpec = MethodSpec< + 'remoteIncarnationChange', + { peerId: string }, + null +>; + +export const remoteIncarnationChangeSpec: RemoteIncarnationChangeSpec = { + method: 'remoteIncarnationChange', + params: paramsStruct, + result: literal(null), +}; + +export type HandleRemoteIncarnationChange = (peerId: string) => Promise; + +type RemoteIncarnationChangeHooks = { + remoteIncarnationChange: HandleRemoteIncarnationChange; +}; + +export type RemoteIncarnationChangeHandler = Handler< + 'remoteIncarnationChange', + Params, + Promise, + RemoteIncarnationChangeHooks +>; + +export const remoteIncarnationChangeHandler: RemoteIncarnationChangeHandler = { + ...remoteIncarnationChangeSpec, + hooks: { remoteIncarnationChange: true }, + implementation: async ({ remoteIncarnationChange }, params) => { + await remoteIncarnationChange(params.peerId); + return null; + }, +}; diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index af0cb55a1..fef585f41 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -52,6 +52,7 @@ describe('kernel store', () => { 'clear', 'clearEmptySubclusters', 'clearReachableFlag', + 'clearRemoteSeqState', 'collectGarbage', 'createCrankSavepoint', 'decRefCount', diff --git a/packages/ocap-kernel/src/store/methods/remote.test.ts b/packages/ocap-kernel/src/store/methods/remote.test.ts index 82abbeda0..86660cf79 100644 --- a/packages/ocap-kernel/src/store/methods/remote.test.ts +++ b/packages/ocap-kernel/src/store/methods/remote.test.ts @@ -341,4 +341,46 @@ describe('remote store methods', () => { expect(deleted).toBe(0); }); }); + + describe('clearRemoteSeqState', () => { + it('deletes all seq state and pending messages', () => { + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockKV.set(`remoteSeq.${remoteId1}.highestReceivedSeq`, '5'); + mockKV.set(`remoteSeq.${remoteId1}.startSeq`, '2'); + mockKV.set(`remotePending.${remoteId1}.2`, '{"seq":2}'); + mockKV.set(`remotePending.${remoteId1}.3`, '{"seq":3}'); + mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.2`, + `remotePending.${remoteId1}.3`, + ]); + + remoteMethods.clearRemoteSeqState(remoteId1); + + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + expect(mockKV.has(`remoteSeq.${remoteId1}.highestReceivedSeq`)).toBe( + false, + ); + expect(mockKV.has(`remoteSeq.${remoteId1}.startSeq`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); + }); + + it('does not affect remote relationship info', () => { + mockKV.set(`remote.${remoteId1}`, JSON.stringify(remoteInfo1)); + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockGetPrefixedKeys.mockReturnValue([]); + + remoteMethods.clearRemoteSeqState(remoteId1); + + // Remote info should still exist + expect(mockKV.has(`remote.${remoteId1}`)).toBe(true); + // Seq state should be cleared + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + }); + + it('does nothing when no state exists', () => { + mockGetPrefixedKeys.mockReturnValue([]); + expect(() => remoteMethods.clearRemoteSeqState(remoteId1)).not.toThrow(); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/methods/remote.ts b/packages/ocap-kernel/src/store/methods/remote.ts index 14c6e1059..f968c9387 100644 --- a/packages/ocap-kernel/src/store/methods/remote.ts +++ b/packages/ocap-kernel/src/store/methods/remote.ts @@ -230,6 +230,27 @@ export function getRemoteMethods(ctx: StoreContext) { return deletedCount; } + /** + * Clear all sequence state for a remote (seq counters + all pending messages). + * Called when a remote peer restarts (incarnation changes) to reset for fresh communication. + * Unlike deleteRemotePendingState, this does NOT delete the remote relationship itself. + * + * @param remoteId - The remote whose sequence state is to be cleared. + */ + function clearRemoteSeqState(remoteId: RemoteId): void { + // Delete seq state + const seqPrefix = `${REMOTE_SEQ_BASE}${remoteId}.`; + kv.delete(`${seqPrefix}nextSendSeq`); + kv.delete(`${seqPrefix}highestReceivedSeq`); + kv.delete(`${seqPrefix}startSeq`); + + // Delete all pending messages + const pendingPrefix = `${REMOTE_PENDING_BASE}${remoteId}.`; + for (const key of getPrefixedKeys(pendingPrefix)) { + kv.delete(key); + } + } + return { getAllRemoteRecords, getRemoteInfo, @@ -245,5 +266,6 @@ export function getRemoteMethods(ctx: StoreContext) { deletePendingMessage, deleteRemotePendingState, cleanupOrphanMessages, + clearRemoteSeqState, }; } diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index e23f14421..5ed09797b 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -34,6 +34,7 @@ import type { SendRemoteMessage, StopRemoteComms, OnRemoteGiveUp, + OnIncarnationChange, RemoteCommsOptions, } from './remotes/types.ts'; import { Fail } from './utils/assert.ts'; @@ -325,6 +326,7 @@ export type PlatformServices = { * @param remoteMessageHandler - A handler function to receive remote messages. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote. * @param incarnationId - Unique identifier for this kernel instance. + * @param onIncarnationChange - Optional callback when a remote peer's incarnation changes. * @returns A promise that resolves once network access has been established * or rejects if there is some problem doing so. */ @@ -334,6 +336,7 @@ export type PlatformServices = { remoteMessageHandler: RemoteMessageHandler, onRemoteGiveUp?: OnRemoteGiveUp, incarnationId?: string, + onIncarnationChange?: OnIncarnationChange, ) => Promise; /** * Stop network communications.