Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/ocap-kernel/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use length-prefixed framing for remote messages so payloads larger than the underlying transport's per-frame cutoff (e.g. `@libp2p/webrtc`'s 16 KB datachannel limit) are reassembled correctly on the receiver ([#957](https://github.com/MetaMask/ocap-kernel/pull/957))
- Replace `byteStream` with `lpStream` on every remote channel; the byte-oriented stream did not preserve `write()` boundaries, so any message the transport split into multiple frames was parsed from the first frame only, silently dropped without acknowledgement, and the sender retried until giving up after `MAX_RETRIES`
- Surface receiver-side framing-cap violations (`InvalidDataLengthError`, `InvalidDataLengthLengthError`) as `ResourceLimitError` with `limitType: 'messageSize'` so size errors look the same whether they tripped on the sender's `validateMessageSize` or the receiver's framing decoder
- Restore IO channels for persisted subclusters at kernel init so re-incarnated vats find their IOService references live ([#963](https://github.com/MetaMask/ocap-kernel/pull/963))
- `SubclusterManager.restorePersistedIOChannels()` walks every persisted subcluster, finds those whose config declares `io`, and re-creates the channels via `IOManager` before `initializeAllVats` runs
- Without this, any vat that opened an IO channel via `launchSubcluster` lost its channel across `daemon stop` / `daemon start` and silently held a dead IOService reference

## [0.7.0]

Expand Down
6 changes: 6 additions & 0 deletions packages/ocap-kernel/src/Kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ export class Kernel {
// longer have a config, to ensure that orphaned vats aren't started
this.#subclusterManager.initSystemSubclusters(configs);

// Re-create IO channels for persisted subclusters whose configs
// declare them. This must happen before `initializeAllVats` so
// that re-incarnated vats find their IOService references live
// when they make their first method call.
await this.#subclusterManager.restorePersistedIOChannels();

// Start all vats that were previously running before starting the queue
// This ensures that any messages in the queue have their target vats ready
await this.#vatManager.initializeAllVats();
Expand Down
115 changes: 115 additions & 0 deletions packages/ocap-kernel/src/vats/SubclusterManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -821,4 +821,119 @@ describe('SubclusterManager', () => {
).not.toHaveBeenCalled();
});
});

describe('restorePersistedIOChannels', () => {
const makeMockIOManager = () => ({
createChannels: vi.fn().mockResolvedValue(undefined),
destroyChannels: vi.fn().mockResolvedValue(undefined),
});

const subclusterWithIO = (
id: string,
io: Record<string, { type: 'socket'; path: string }>,
): Subcluster => ({
id,
config: {
bootstrap: 'svc',
vats: { svc: { sourceSpec: 'svc.js' } },
io,
},
vats: { svc: 'v1' as VatId },
});

const subclusterWithoutIO = (id: string): Subcluster => ({
id,
config: createMockClusterConfig(`s-${id}`),
vats: { [`s-${id}Vat`]: 'v1' as VatId },
});

it('re-creates channels for every persisted subcluster declaring io', async () => {
const ioA = { llm: { type: 'socket' as const, path: '/tmp/a.sock' } };
const ioB = { wire: { type: 'socket' as const, path: '/tmp/b.sock' } };
mockKernelStore.getSubclusters.mockReturnValue([
subclusterWithIO('s1', ioA),
subclusterWithIO('s2', ioB),
]);
const ioManager = makeMockIOManager();
const mgr = new SubclusterManager({
kernelStore: mockKernelStore,
kernelQueue: mockKernelQueue,
vatManager: mockVatManager,
getKernelService: mockGetKernelService,
queueMessage: mockQueueMessage,
ioManager: ioManager as never,
});

await mgr.restorePersistedIOChannels();

expect(ioManager.createChannels).toHaveBeenCalledTimes(2);
expect(ioManager.createChannels).toHaveBeenCalledWith('s1', ioA);
expect(ioManager.createChannels).toHaveBeenCalledWith('s2', ioB);
});

it('skips subclusters whose config does not declare io', async () => {
mockKernelStore.getSubclusters.mockReturnValue([
subclusterWithoutIO('s1'),
subclusterWithIO('s2', {
io: { type: 'socket' as const, path: '/tmp/s2.sock' },
}),
]);
const ioManager = makeMockIOManager();
const mgr = new SubclusterManager({
kernelStore: mockKernelStore,
kernelQueue: mockKernelQueue,
vatManager: mockVatManager,
getKernelService: mockGetKernelService,
queueMessage: mockQueueMessage,
ioManager: ioManager as never,
});

await mgr.restorePersistedIOChannels();

expect(ioManager.createChannels).toHaveBeenCalledTimes(1);
expect(ioManager.createChannels).toHaveBeenCalledWith(
's2',
expect.objectContaining({ io: expect.anything() }),
);
});

it('is a no-op when no IOManager was provided', async () => {
mockKernelStore.getSubclusters.mockReturnValue([
subclusterWithIO('s1', {
llm: { type: 'socket' as const, path: '/tmp/a.sock' },
}),
]);
// Default subclusterManager from beforeEach has no ioManager;
// a missing manager must not cause an error here.
expect(
await subclusterManager.restorePersistedIOChannels(),
).toBeUndefined();
});

it('continues past a per-subcluster failure', async () => {
mockKernelStore.getSubclusters.mockReturnValue([
subclusterWithIO('s1', {
llm: { type: 'socket' as const, path: '/tmp/a.sock' },
}),
subclusterWithIO('s2', {
wire: { type: 'socket' as const, path: '/tmp/b.sock' },
}),
]);
const ioManager = makeMockIOManager();
ioManager.createChannels.mockImplementationOnce(async () => {
throw new Error('socket in use');
});
const mgr = new SubclusterManager({
kernelStore: mockKernelStore,
kernelQueue: mockKernelQueue,
vatManager: mockVatManager,
getKernelService: mockGetKernelService,
queueMessage: mockQueueMessage,
ioManager: ioManager as never,
});

expect(await mgr.restorePersistedIOChannels()).toBeUndefined();
expect(ioManager.createChannels).toHaveBeenCalledTimes(2);
});
});
});
50 changes: 50 additions & 0 deletions packages/ocap-kernel/src/vats/SubclusterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,56 @@ export class SubclusterManager {
this.#restorePersistedSystemSubclusters(configs);
}

/**
* Re-create IO channels for every persisted subcluster whose config
* declares one.
*
* IO channels (`config.io`) are external to the kernel's durable
* state — they're live OS-process resources (Unix sockets, pipes,
* etc.) backed by the host's IOManager. `launchSubcluster` creates
* them at subcluster-launch time, but on a kernel restart the vats
* get re-incarnated from persisted state without going through
* `launchSubcluster` again, so any IO channels they were using are
* gone. This method bridges that gap: invoked from the kernel's
* init path before `initializeAllVats`, it walks the persisted
* subcluster table and re-creates each subcluster's declared IO
* channels so the re-incarnated vats find their IOService
* references live and ready when they make their first method call.
*
* If a particular subcluster's `createChannels` call fails (e.g.
* the socket path is now in use by another process), the failure
* is logged but does not abort the broader init — other
* subclusters can still come up. Vats backed by the failed
* subcluster will see method calls on their IOServices fail at
* first use.
*
* No-op when no IOManager was provided to this SubclusterManager.
*/
async restorePersistedIOChannels(): Promise<void> {
if (!this.#ioManager) {
return;
}
for (const subcluster of this.#kernelStore.getSubclusters()) {
if (!subcluster.config.io) {
continue;
}
try {
await this.#ioManager.createChannels(
subcluster.id,
subcluster.config.io,
);
this.#logger.info(
`Restored IO channels for persisted subcluster ${subcluster.id}`,
);
} catch (error) {
this.#logger.error(
`Failed to restore IO channels for subcluster ${subcluster.id}:`,
error,
);
}
}
}

/**
* Launch new system subclusters that aren't already in persistence.
* This must be called after the kernel queue is running since launchSubcluster
Expand Down
Loading