From 3d20afd2f0e961121458c2e0a8da902ee4b47716 Mon Sep 17 00:00:00 2001 From: Advait Shinde Date: Sat, 28 Mar 2026 15:36:13 +0000 Subject: [PATCH 1/2] Handle ACP connection transport failures cleanly --- src/acp.test.ts | 59 +++++++++++++++++++++++++++++++++++++++++++++++++ src/acp.ts | 30 +++++++++++++++++++++---- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/src/acp.test.ts b/src/acp.test.ts index 74c7a0b..20fc8ee 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -25,6 +25,7 @@ import { PROTOCOL_VERSION, ndJsonStream, } from "./acp.js"; +import type { AnyMessage } from "./acp.js"; describe("Connection", () => { let clientToAgent: TransformStream; @@ -971,6 +972,64 @@ describe("Connection", () => { expect(closeLog).toContain("client connection closed (signal)"); }); + it("rejects pending requests when the stream errors", async () => { + let readableController!: ReadableStreamDefaultController; + + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + + const connection = new ClientSideConnection( + () => new TestClient(), + { + readable: new ReadableStream({ + start(controller) { + readableController = controller; + }, + }), + writable: new WritableStream({ + async write() { + // no-op + }, + }), + }, + ); + + const requestPromise = connection.newSession({ + cwd: "/test", + mcpServers: [], + }); + const error = new Error("stream exploded"); + + readableController.error(error); + + await expect(requestPromise).rejects.toThrow("stream exploded"); + await expect(connection.closed).resolves.toBeUndefined(); + expect(connection.signal.aborted).toBe(true); + }); + it("supports removing signal event listeners", async () => { const closeLog: string[] = []; diff --git a/src/acp.ts b/src/acp.ts index 75ace72..6567434 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -11,11 +11,15 @@ import type { AnyResponse, Result, ErrorResponse, - PendingResponse, RequestHandler, NotificationHandler, } from "./jsonrpc.js"; +type ConnectionPendingResponse = { + resolve: (response: unknown) => void; + reject: (error: unknown) => void; +}; + /** * An agent-side connection to a client. * @@ -931,7 +935,8 @@ export class ClientSideConnection implements Agent { export type { AnyMessage } from "./jsonrpc.js"; class Connection { - #pendingResponses: Map = new Map(); + #pendingResponses: Map = + new Map(); #nextRequestId: number = 0; #requestHandler: RequestHandler; #notificationHandler: NotificationHandler; @@ -951,7 +956,7 @@ class Connection { this.#closedPromise = new Promise((resolve) => { this.#abortController.signal.addEventListener("abort", () => resolve()); }); - this.#receive(); + void this.#receive(); } /** @@ -986,6 +991,8 @@ class Connection { async #receive() { const reader = this.#stream.readable.getReader(); + let closeError: unknown = undefined; + try { while (true) { const { value: message, done } = await reader.read(); @@ -1017,10 +1024,25 @@ class Connection { } } } + } catch (error) { + closeError = error; } finally { reader.releaseLock(); - this.#abortController.abort(); + this.#close(closeError); + } + } + + #close(error?: unknown) { + if (this.#abortController.signal.aborted) { + return; + } + + const closeError: unknown = error ?? new Error("ACP connection closed"); + for (const pendingResponse of this.#pendingResponses.values()) { + pendingResponse.reject(closeError); } + this.#pendingResponses.clear(); + this.#abortController.abort(closeError); } async #processMessage(message: AnyMessage) { From a0e7f3658dc255af8c65558d6e4ce718b856c318 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 1 Apr 2026 13:50:01 +0200 Subject: [PATCH 2/2] Harden a few more cases --- src/acp.test.ts | 125 ++++++++++++++++++++++++++++++++---------------- src/acp.ts | 76 +++++++++++++++++------------ src/jsonrpc.ts | 5 -- 3 files changed, 129 insertions(+), 77 deletions(-) diff --git a/src/acp.test.ts b/src/acp.test.ts index 20fc8ee..a04c623 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -972,50 +972,45 @@ describe("Connection", () => { expect(closeLog).toContain("client connection closed (signal)"); }); + class MinimalTestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile(_: ReadTextFileRequest): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + it("rejects pending requests when the stream errors", async () => { let readableController!: ReadableStreamDefaultController; - class TestClient implements Client { - async writeTextFile( - _: WriteTextFileRequest, - ): Promise { - return {}; - } - async readTextFile( - _: ReadTextFileRequest, - ): Promise { - return { content: "test" }; - } - async requestPermission( - _: RequestPermissionRequest, - ): Promise { - return { - outcome: { - outcome: "selected", - optionId: "allow", - }, - }; - } - async sessionUpdate(_: SessionNotification): Promise { - // no-op - } - } - - const connection = new ClientSideConnection( - () => new TestClient(), - { - readable: new ReadableStream({ - start(controller) { - readableController = controller; - }, - }), - writable: new WritableStream({ - async write() { - // no-op - }, - }), - }, - ); + const connection = new ClientSideConnection(() => new MinimalTestClient(), { + readable: new ReadableStream({ + start(controller) { + readableController = controller; + }, + }), + writable: new WritableStream({ + async write() { + // no-op + }, + }), + }); const requestPromise = connection.newSession({ cwd: "/test", @@ -1030,6 +1025,54 @@ describe("Connection", () => { expect(connection.signal.aborted).toBe(true); }); + it("rejects pending requests when the writable stream errors", async () => { + const writeError = new Error("write failed"); + + const connection = new ClientSideConnection(() => new MinimalTestClient(), { + readable: new ReadableStream({ + // Never produces messages; stays open. + start() {}, + }), + writable: new WritableStream({ + async write() { + throw writeError; + }, + }), + }); + + const requestPromise = connection.newSession({ + cwd: "/test", + mcpServers: [], + }); + + await expect(requestPromise).rejects.toThrow("write failed"); + await expect(connection.closed).resolves.toBeUndefined(); + expect(connection.signal.aborted).toBe(true); + }); + + it("rejects requests issued after the connection is closed", async () => { + const connection = new ClientSideConnection(() => new MinimalTestClient(), { + readable: new ReadableStream({ + start(controller) { + // Close the readable stream immediately so the connection closes. + controller.close(); + }, + }), + writable: new WritableStream({ + async write() { + // no-op + }, + }), + }); + + await connection.closed; + expect(connection.signal.aborted).toBe(true); + + await expect( + connection.newSession({ cwd: "/test", mcpServers: [] }), + ).rejects.toThrow("ACP connection closed"); + }); + it("supports removing signal event listeners", async () => { const closeLog: string[] = []; diff --git a/src/acp.ts b/src/acp.ts index 6567434..fdc365b 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -990,44 +990,47 @@ class Connection { } async #receive() { - const reader = this.#stream.readable.getReader(); let closeError: unknown = undefined; try { - while (true) { - const { value: message, done } = await reader.read(); - if (done) { - break; - } - if (!message) { - continue; - } + const reader = this.#stream.readable.getReader(); + try { + while (!this.#abortController.signal.aborted) { + const { value: message, done } = await reader.read(); + if (done) { + break; + } + if (!message) { + continue; + } - try { - this.#processMessage(message); - } catch (err) { - console.error( - "Unexpected error during message processing:", - message, - err, - ); - // Only send error response if the message had an id (was a request) - if ("id" in message && message.id !== undefined) { - this.#sendMessage({ - jsonrpc: "2.0", - id: message.id, - error: { - code: -32700, - message: "Parse error", - }, - }); + try { + this.#processMessage(message); + } catch (err) { + console.error( + "Unexpected error during message processing:", + message, + err, + ); + // Only send error response if the message had an id (was a request) + if ("id" in message && message.id !== undefined) { + this.#sendMessage({ + jsonrpc: "2.0", + id: message.id, + error: { + code: -32700, + message: "Parse error", + }, + }); + } } } + } finally { + reader.releaseLock(); } } catch (error) { closeError = error; } finally { - reader.releaseLock(); this.#close(closeError); } } @@ -1162,7 +1165,8 @@ class Connection { if ("result" in response) { pendingResponse.resolve(response.result); } else if ("error" in response) { - pendingResponse.reject(response.error); + const { code, message, data } = response.error; + pendingResponse.reject(new RequestError(code, message, data)); } this.#pendingResponses.delete(response.id); } else { @@ -1171,6 +1175,7 @@ class Connection { } async sendRequest(method: string, params?: Req): Promise { + this.#throwIfClosed(); const id = this.#nextRequestId++; const responsePromise = new Promise((resolve, reject) => { this.#pendingResponses.set(id, { resolve, reject }); @@ -1180,9 +1185,19 @@ class Connection { } async sendNotification(method: string, params?: N): Promise { + this.#throwIfClosed(); await this.#sendMessage({ jsonrpc: "2.0", method, params }); } + #throwIfClosed() { + if (this.#abortController.signal.aborted) { + throw ( + this.#abortController.signal.reason ?? + new Error("ACP connection closed") + ); + } + } + async #sendMessage(message: AnyMessage) { this.#writeQueue = this.#writeQueue .then(async () => { @@ -1194,8 +1209,7 @@ class Connection { } }) .catch((error) => { - // Continue processing writes on error - console.error("ACP write error:", error); + this.#close(error); }); return this.#writeQueue; } diff --git a/src/jsonrpc.ts b/src/jsonrpc.ts index b5d8214..6f556d6 100644 --- a/src/jsonrpc.ts +++ b/src/jsonrpc.ts @@ -36,11 +36,6 @@ export type ErrorResponse = { data?: unknown; }; -export type PendingResponse = { - resolve: (response: unknown) => void; - reject: (error: ErrorResponse) => void; -}; - export type RequestHandler = ( method: string, params: unknown,