From 0a3326949ed6e1cf94527441c972d53717aa4688 Mon Sep 17 00:00:00 2001 From: m0n99 Date: Sun, 24 May 2026 09:44:41 +0700 Subject: [PATCH 1/2] fix(acp): stabilize ACP connection and surface errors to clients --- packages/opencode/src/acp/agent.ts | 474 +++++++++++++----- packages/opencode/src/acp/session.ts | 36 ++ packages/opencode/src/cli/cmd/acp.ts | 23 +- .../test/acp/event-subscription.test.ts | 10 + 4 files changed, 417 insertions(+), 126 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 8b74b9c9bad3..c1c47f6b5a22 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -58,8 +58,28 @@ const decodeTodos = Schema.decodeUnknownResult(Schema.fromJsonString(Schema.Arra const DEFAULT_VARIANT_VALUE = "default" +const PERMISSION_TIMEOUT_MS = 60_000 +const EVENT_RETRY_INITIAL_MS = 250 +const EVENT_RETRY_MAX_MS = 10_000 + const log = Log.create({ service: "acp-agent" }) +/** Check if an error indicates the ACP connection was closed. */ +function isConnectionClosedError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase() + return ( + msg.includes("connection closed") || + msg.includes("connection aborted") || + msg.includes("pipe closed") || + msg.includes("broken pipe") || + msg.includes("stream is not writable") || + msg.includes("channel closed") + ) + } + return false +} + async function getContextLimit( sdk: OpencodeClient, providerID: ProviderID, @@ -79,56 +99,6 @@ async function getContextLimit( return model?.limit.context ?? null } -async function sendUsageUpdate( - connection: AgentSideConnection, - sdk: OpencodeClient, - sessionID: string, - directory: string, -): Promise { - const messages = await sdk.session - .messages({ sessionID, directory }, { throwOnError: true }) - .then((x) => x.data) - .catch((error) => { - log.error("failed to fetch messages for usage update", { error }) - return undefined - }) - - if (!messages) return - - const assistantMessages = messages.filter( - (m): m is { info: AssistantMessage; parts: SessionMessageResponse["parts"] } => m.info.role === "assistant", - ) - - const lastAssistant = assistantMessages[assistantMessages.length - 1] - if (!lastAssistant) return - - const msg = lastAssistant.info - if (!msg.providerID || !msg.modelID) return - const size = await getContextLimit(sdk, ProviderID.make(msg.providerID), ModelID.make(msg.modelID), directory) - - if (!size) { - // Cannot calculate usage without known context size - return - } - - const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) - const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) - - await connection - .sessionUpdate({ - sessionId: sessionID, - update: { - sessionUpdate: "usage_update", - used, - size, - cost: { amount: totalCost, currency: "USD" }, - }, - }) - .catch((error) => { - log.error("failed to send usage update", { error }) - }) -} - export function init({ sdk: _sdk }: { sdk: OpencodeClient }) { return { create: (connection: AgentSideConnection, fullConfig: ACPConfig) => { @@ -152,12 +122,32 @@ export class Agent implements ACPAgent { { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] + /** Tracks sessions that have been cancelled via ACP cancel notification. */ + private cancelledSessions = new Set() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + + // Bind agent lifecycle to ACP connection. When the connection closes, + // abort event subscription and clean up pending state so the agent + // doesn't continue running against a dead client. + connection.signal.addEventListener( + "abort", + () => { + log.info("ACP connection closed, cleaning up agent") + this.eventAbort.abort(connection.signal.reason ?? new Error("ACP connection closed")) + this.permissionQueues.clear() + // Abort all tracked OpenCode sessions to prevent orphaned prompts. + void this.sessionManager.abortAll().catch((error) => { + log.error("failed to abort sessions on disconnect", { error }) + }) + }, + { once: true }, + ) + this.startEventSubscription() } @@ -166,27 +156,125 @@ export class Agent implements ACPAgent { this.eventStarted = true this.runEventSubscription().catch((error) => { if (this.eventAbort.signal.aborted) return - log.error("event subscription failed", { error }) + log.error("event subscription failed permanently", { error }) }) } private async runEventSubscription() { - while (true) { - if (this.eventAbort.signal.aborted) return - const events = await this.sdk.global.event({ - signal: this.eventAbort.signal, - }) - for await (const event of events.stream) { - if (this.eventAbort.signal.aborted) return - const payload = event?.payload - if (!payload) continue - await this.handleEvent(payload as Event).catch((error) => { - log.error("failed to handle event", { error, type: payload.type }) + let retryDelayMs = EVENT_RETRY_INITIAL_MS + + while (!this.eventAbort.signal.aborted && !this.connection.signal.aborted) { + try { + const events = await this.sdk.global.event({ + signal: this.eventAbort.signal, }) + + // Reset retry delay on successful connection. + retryDelayMs = EVENT_RETRY_INITIAL_MS + + for await (const event of events.stream) { + if (this.eventAbort.signal.aborted || this.connection.signal.aborted) return + const payload = event?.payload + if (!payload) continue + await this.handleEvent(payload as Event).catch((error) => { + log.error("failed to handle event", { error, type: payload.type }) + }) + } + } catch (error) { + if (this.eventAbort.signal.aborted || this.connection.signal.aborted) return + log.error("event subscription error; retrying", { error, retryDelayMs }) + await new Promise((resolve) => { + const timer = setTimeout(resolve, retryDelayMs) + this.eventAbort.signal.addEventListener("abort", () => { + clearTimeout(timer) + resolve() + }, { once: true }) + }) + retryDelayMs = Math.min(retryDelayMs * 2, EVENT_RETRY_MAX_MS) } } } + /** + * Central helper for sending session updates to the ACP client. + * Detects connection closure and aborts the event subscription to + * prevent further writes to a dead transport. + */ +private async sendSessionUpdate( + params: Parameters[0], + ): Promise { + if (this.connection.signal.aborted) { + log.warn("skipping session update; ACP connection closed", { + sessionId: params.sessionId, + updateType: params.update?.sessionUpdate, + }) + return + } + + try { + await this.connection.sessionUpdate(params) + } catch (error) { + // Treat any send failure as a potential connection death. + // If the connection is already aborted, clean up. Otherwise, + // abort the event subscription to prevent further writes to + // a potentially broken transport. + if (this.connection.signal.aborted || isConnectionClosedError(error)) { + this.eventAbort.abort( + this.connection.signal.reason ?? new Error("ACP connection closed"), + ) + return + } + // For unexpected errors, abort event subscription to prevent + // cascading failures from repeated writes to a broken transport. + log.error("failed to send ACP session update; aborting event subscription", { error }) + this.eventAbort.abort(error instanceof Error ? error : new Error(String(error))) + } + } + + private async sendUsageUpdate( + sessionID: string, + directory: string, + ): Promise { + const messages = await this.sdk.session + .messages({ sessionID, directory }, { throwOnError: true }) + .then((x) => x.data) + .catch((error) => { + log.error("failed to fetch messages for usage update", { error }) + return undefined + }) + + if (!messages) return + + const assistantMessages = messages.filter( + (m): m is { info: AssistantMessage; parts: SessionMessageResponse["parts"] } => m.info.role === "assistant", + ) + + const lastAssistant = assistantMessages[assistantMessages.length - 1] + if (!lastAssistant) return + + const msg = lastAssistant.info + if (!msg.providerID || !msg.modelID) return + const size = await getContextLimit(this.sdk, ProviderID.make(msg.providerID), ModelID.make(msg.modelID), directory) + + if (!size) { + // Cannot calculate usage without known context size + return + } + + const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) + const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) + + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "usage_update", + used, + size, + cost: { amount: totalCost, currency: "USD" }, + }, + }) + } + private async handleEvent(event: Event) { switch (event.type) { case "permission.asked": { @@ -199,7 +287,10 @@ export class Agent implements ACPAgent { .then(async () => { const directory = session.cwd - const res = await this.connection + // Race permission request against timeout and connection close. + // This prevents indefinite hangs when the ACP client stops + // responding (e.g., Zed becomes unresponsive or disconnects). + const permissionPromise = this.connection .requestPermission({ sessionId: permission.sessionID, toolCall: { @@ -212,19 +303,54 @@ export class Agent implements ACPAgent { }, options: this.permissionOptions, }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, + + // Race permission request against timeout and connection close. + // This prevents indefinite hangs when the ACP client stops + // responding (e.g., Zed becomes unresponsive or disconnects). + let timeoutId: ReturnType | undefined + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => reject(new Error(`Permission request timed out after ${PERMISSION_TIMEOUT_MS}ms`)), PERMISSION_TIMEOUT_MS) + }) + + let onAbort: (() => void) | undefined + const disconnectPromise = new Promise((_, reject) => { + if (this.connection.signal.aborted) { + reject(new Error("ACP connection closed")) + return + } + onAbort = () => reject(new Error("ACP connection closed")) + this.connection.signal.addEventListener("abort", onAbort, { once: true }) + }) + + let res: Awaited | undefined + try { + res = await Promise.race([permissionPromise, timeoutPromise, disconnectPromise]) + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error) + log.error("permission request failed", { + error: errorMsg, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + + // Auto-reject the permission in OpenCode so the session + // doesn't hang forever waiting for a reply. + await this.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }).catch((replyError) => { + log.error("failed to reject permission after failure", { + error: replyError, permissionID: permission.id, - sessionID: permission.sessionID, - }) - await this.sdk.permission.reply({ - requestID: permission.id, - reply: "reject", - directory, }) - return undefined }) + return + } finally { + // Clean up timeout and abort listener to prevent leaks. + if (timeoutId !== undefined) clearTimeout(timeoutId) + if (onAbort) this.connection.signal.removeEventListener("abort", onAbort) + } if (!res) return if (res.outcome.outcome !== "selected") { @@ -248,6 +374,8 @@ export class Agent implements ACPAgent { sessionId: session.id, path: filepath, content: newContent, + }).catch((error) => { + log.error("failed to write ACP client text file", { error, sessionID: session.id, filepath }) }) } } @@ -614,7 +742,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return result } catch (e) { @@ -713,7 +841,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return mode } catch (e) { @@ -747,7 +875,7 @@ export class Agent implements ACPAgent { sessionId, }) - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return result } catch (e) { @@ -806,8 +934,7 @@ export class Agent implements ACPAgent { }, }) } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -857,8 +984,7 @@ export class Agent implements ACPAgent { } } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -878,8 +1004,7 @@ export class Agent implements ACPAgent { case "error": this.toolStarts.delete(part.callID) this.shellSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -965,8 +1090,7 @@ export class Agent implements ACPAgent { if (effectiveMime.startsWith("image/")) { // Image - send as image block - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -994,8 +1118,7 @@ export class Agent implements ACPAgent { } : { uri: fileUri, mimeType: effectiveMime, blob: base64Data } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -1182,13 +1305,15 @@ export class Agent implements ACPAgent { }), ) - setTimeout(() => { - void this.connection.sessionUpdate({ +setTimeout(() => { + void this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "available_commands_update", availableCommands, }, + }).catch((error) => { + log.error("failed to send available commands update to ACP", { error }) }) }, 0) @@ -1231,7 +1356,7 @@ export class Agent implements ACPAgent { ? { availableModes: modeState.availableModes, currentModeId: modeState.currentModeId } : undefined - await this.connection.sessionUpdate({ + await this.sendSessionUpdate({ sessionId: session.id, update: { sessionUpdate: "config_option_update", @@ -1315,11 +1440,29 @@ export class Agent implements ACPAgent { } } + /** + * Check if the session was cancelled or the connection was lost during + * a prompt, and return the appropriate stop reason. + */ + private checkStopReason(sessionID: string): "cancelled" | undefined { + if (this.cancelledSessions.has(sessionID)) { + this.cancelledSessions.delete(sessionID) + return "cancelled" + } + if (this.connection.signal.aborted) { + return "cancelled" + } + return undefined + } + async prompt(params: PromptRequest) { const sessionID = params.sessionId const session = this.sessionManager.get(sessionID) const directory = session.cwd + // Clear any prior cancelled state for this session. + this.cancelledSessions.delete(sessionID) + const current = session.model const model = current ?? (await defaultModel(this.config, directory)) if (!current) { @@ -1431,20 +1574,59 @@ export class Agent implements ACPAgent { }) if (!cmd) { - const response = await this.sdk.session.prompt({ - sessionID, - model: { - providerID: model.providerID, - modelID: model.modelID, - }, - variant: this.sessionManager.getVariant(sessionID), - parts, - agent, - directory, - }) + let response + try { + response = await this.sdk.session.prompt({ + sessionID, + model: { + providerID: model.providerID, + modelID: model.modelID, + }, + variant: this.sessionManager.getVariant(sessionID), + parts, + agent, + directory, + }, { throwOnError: true }) + } catch (error) { + // Surface OpenCode errors (rate limits, auth failures, etc.) to Zed. + // Without this, Zed shows "loading" forever or a cryptic JSON-RPC error. + const errorMessage = error instanceof Error ? error.message : String(error) + log.error("prompt failed", { error: errorMessage, sessionID }) + + // Check if the connection was lost during the prompt. + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } + + // Send the error as a text message so Zed displays it to the user. + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_message_chunk", + messageId: `error-${Date.now()}`, + content: { + type: "text", + text: `Error: ${errorMessage}`, + }, + }, + }).catch(() => { + // If we can't even send the error message, the connection is likely dead. + }) + + return { + stopReason: "end_turn" as const, + _meta: {}, + } + } const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1457,17 +1639,50 @@ export class Agent implements ACPAgent { .list({ directory }, { throwOnError: true }) .then((x) => x.data!.find((c) => c.name === cmd.name)) if (command) { - const response = await this.sdk.session.command({ - sessionID, - command: command.name, - arguments: cmd.args, - model: model.providerID + "/" + model.modelID, - agent, - directory, - }) + let response + try { + response = await this.sdk.session.command({ + sessionID, + command: command.name, + arguments: cmd.args, + model: model.providerID + "/" + model.modelID, + agent, + directory, + }, { throwOnError: true }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + log.error("command failed", { error: errorMessage, sessionID }) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } + + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_message_chunk", + messageId: `error-${Date.now()}`, + content: { + type: "text", + text: `Error: ${errorMessage}`, + }, + }, + }).catch(() => {}) + + return { + stopReason: "end_turn" as const, + _meta: {}, + } + } const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1490,7 +1705,12 @@ export class Agent implements ACPAgent { break } - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1499,14 +1719,32 @@ export class Agent implements ACPAgent { } async cancel(params: CancelNotification) { - const session = this.sessionManager.get(params.sessionId) - await this.config.sdk.session.abort( - { + const session = this.sessionManager.tryGet(params.sessionId) + if (!session) { + log.warn("cancel for unknown ACP session", { sessionID: params.sessionId }) + return + } + + // Mark session as cancelled so prompt() can return "cancelled" stop reason. + this.cancelledSessions.add(params.sessionId) + + // Clear any pending permission queues for this session. + this.permissionQueues.delete(params.sessionId) + + try { + await this.config.sdk.session.abort( + { + sessionID: params.sessionId, + directory: session.cwd, + }, + { throwOnError: true }, + ) + } catch (error) { + log.error("failed to abort ACP session on cancel", { + error, sessionID: params.sessionId, - directory: session.cwd, - }, - { throwOnError: true }, - ) + }) + } } private async loadSessionMessages(directory: string, sessionId: string, limit?: number) { diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index cc1ed0be3098..1a0b00fcac17 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -119,4 +119,40 @@ export class ACPSessionManager { this.sessions.delete(sessionId) return session } + + /** + * Remove all tracked sessions. Called when the ACP connection closes + * to prevent stale state from persisting. + */ + clear(): void { + this.sessions.clear() + } + + /** + * Abort all tracked OpenCode sessions and clear state. + * Called when the ACP connection closes unexpectedly. + */ + async abortAll(): Promise { + const sessions = [...this.sessions.values()] + this.sessions.clear() + + await Promise.allSettled( + sessions.map((session) => + this.sdk.session + .abort( + { + sessionID: session.id, + directory: session.cwd, + }, + { throwOnError: true }, + ) + .catch((error) => { + log.error("failed to abort session on disconnect", { + error, + sessionID: session.id, + }) + }), + ), + ) + } } diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index b3b7df486b31..bf2da70db6c2 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -56,18 +56,25 @@ export const AcpCommand = effectCmd({ const stream = ndJsonStream(input, output) const agent = ACP.init({ sdk }) - new AgentSideConnection((conn) => { + const connection = new AgentSideConnection((conn) => { return agent.create(conn, { sdk }) }, stream) log.info("setup connection") process.stdin.resume() - yield* Effect.promise( - () => - new Promise((resolve, reject) => { - process.stdin.on("end", () => resolve()) - process.stdin.on("error", reject) - }), - ) + + // Wait for either the ACP connection to close or stdin to end. + // This ensures the process exits when the client disconnects, + // preventing orphaned sessions and event subscriptions. + yield* Effect.promise(async () => { + const connectionClosed = connection.closed.catch(() => undefined) + const stdinEnded = new Promise((resolve) => { + process.stdin.on("end", () => resolve()) + process.stdin.on("error", () => resolve()) + }) + + await Promise.race([connectionClosed, stdinEnded]) + log.info("ACP connection or stdin ended, shutting down") + }) }), }) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index a01680e30596..5fcccbcd7740 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -199,6 +199,8 @@ function createFakeAgent() { updates.set(sessionId, list) } + const connectionAbort = new AbortController() + const connection = { async sessionUpdate(params: SessionUpdateParams) { sessionUpdates.push(params) @@ -215,6 +217,13 @@ function createFakeAgent() { async requestPermission(_params: RequestPermissionParams): Promise { return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult }, + async writeTextFile(_params: any): Promise {}, + get signal() { + return connectionAbort.signal + }, + get closed(): Promise { + return new Promise(() => {}) + }, } as unknown as AgentSideConnection const { controller, stream } = createEventStream() @@ -324,6 +333,7 @@ function createFakeAgent() { const stop = () => { controller.close() ;(agent as any).eventAbort.abort() + connectionAbort.abort() } return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection } From 97e826a1d06a3ee386d25c15d2e477c87eb2f837 Mon Sep 17 00:00:00 2001 From: m0n99 Date: Sun, 24 May 2026 19:23:43 +0700 Subject: [PATCH 2/2] fix(acp): defer signal access until AgentSideConnection inner Connection is ready The AgentSideConnection constructor calls the toAgent factory before its inner Connection is initialised, so accessing connection.signal synchronously in the Agent constructor crashes. Wrap both the abort listener and startEventSubscription in queueMicrotask to defer until the Connection is available. --- packages/opencode/src/acp/agent.ts | 72 ++++++++++++++++++------------ 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index c1c47f6b5a22..ad4d6acdf060 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -131,24 +131,26 @@ export class Agent implements ACPAgent { this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) - // Bind agent lifecycle to ACP connection. When the connection closes, - // abort event subscription and clean up pending state so the agent - // doesn't continue running against a dead client. - connection.signal.addEventListener( - "abort", - () => { - log.info("ACP connection closed, cleaning up agent") - this.eventAbort.abort(connection.signal.reason ?? new Error("ACP connection closed")) - this.permissionQueues.clear() - // Abort all tracked OpenCode sessions to prevent orphaned prompts. - void this.sessionManager.abortAll().catch((error) => { - log.error("failed to abort sessions on disconnect", { error }) - }) - }, - { once: true }, - ) + // Defer with queueMicrotask because AgentSideConnection's constructor + // calls the toAgent factory (this constructor) *before* its inner + // Connection is initialised, so connection.signal is unavailable + // synchronously. + queueMicrotask(() => { + connection.signal.addEventListener( + "abort", + () => { + log.info("ACP connection closed, cleaning up agent") + this.eventAbort.abort(connection.signal.reason ?? new Error("ACP connection closed")) + this.permissionQueues.clear() + void this.sessionManager.abortAll().catch((error) => { + log.error("failed to abort sessions on disconnect", { error }) + }) + }, + { once: true }, + ) - this.startEventSubscription() + this.startEventSubscription() + }) } private startEventSubscription() { @@ -296,7 +298,7 @@ private async sendSessionUpdate( toolCall: { toolCallId: permission.tool?.callID ?? permission.id, status: "pending", - title: permission.permission, + title: toolTitle(permission.permission, permission.metadata), rawInput: permission.metadata, kind: toToolKind(permission.permission), locations: toLocations(permission.permission, permission.metadata), @@ -429,7 +431,7 @@ private async sendSessionUpdate( toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, }, @@ -457,7 +459,7 @@ private async sendSessionUpdate( toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, ...(content.length > 0 && { content }), @@ -510,7 +512,7 @@ private async sendSessionUpdate( status: "completed", kind, content, - title: part.state.title, + title: toolTitle(part.tool, part.state.input, part.state.title), rawInput: part.state.input, rawOutput: completedToolRawOutput(part), }, @@ -531,7 +533,7 @@ private async sendSessionUpdate( toolCallId: part.callID, status: "failed", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), rawInput: part.state.input, content: [ { @@ -941,7 +943,7 @@ private async sendSessionUpdate( toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, ...(runningContent.length > 0 && { content: runningContent }), @@ -992,7 +994,7 @@ private async sendSessionUpdate( status: "completed", kind, content, - title: part.state.title, + title: toolTitle(part.tool, part.state.input, part.state.title), rawInput: part.state.input, rawOutput: completedToolRawOutput(part), }, @@ -1011,7 +1013,7 @@ private async sendSessionUpdate( toolCallId: part.callID, status: "failed", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), rawInput: part.state.input, content: [ { @@ -1165,17 +1167,18 @@ private async sendSessionUpdate( private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) + const input = part.state.input ?? {} await this.connection .sessionUpdate({ sessionId, update: { sessionUpdate: "tool_call", toolCallId: part.callID, - title: part.tool, + title: toolTitle(part.tool, input), kind: toToolKind(part.tool), status: "pending", - locations: [], - rawInput: {}, + locations: toLocations(part.tool, input), + rawInput: input, }, }) .catch((error) => { @@ -1810,6 +1813,19 @@ function toToolKind(toolName: string): ToolKind { } } +function taskToolTitle(input: Record): string { + const type = typeof input["subagent_type"] === "string" ? input["subagent_type"] : "" + const desc = typeof input["description"] === "string" ? input["description"] : "" + const kind = type ? type.charAt(0).toUpperCase() + type.slice(1) : "Unknown" + if (desc) return `${kind} — ${desc}` + return `${kind} Task` +} + +function toolTitle(toolName: string, input: Record, stateTitle?: string): string { + if (toolName === "task") return taskToolTitle(input) + return stateTitle ?? toolName +} + function toLocations(toolName: string, input: Record): { path: string }[] { const tool = toolName.toLocaleLowerCase()