diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 8b74b9c9bad3..ad4d6acdf060 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -58,8 +58,28 @@ const decodeTodos = Schema.decodeUnknownResult(Schema.fromJsonString(Schema.Arra const DEFAULT_VARIANT_VALUE = "default" +const PERMISSION_TIMEOUT_MS = 60_000 +const EVENT_RETRY_INITIAL_MS = 250 +const EVENT_RETRY_MAX_MS = 10_000 + const log = Log.create({ service: "acp-agent" }) +/** Check if an error indicates the ACP connection was closed. */ +function isConnectionClosedError(error: unknown): boolean { + if (error instanceof Error) { + const msg = error.message.toLowerCase() + return ( + msg.includes("connection closed") || + msg.includes("connection aborted") || + msg.includes("pipe closed") || + msg.includes("broken pipe") || + msg.includes("stream is not writable") || + msg.includes("channel closed") + ) + } + return false +} + async function getContextLimit( sdk: OpencodeClient, providerID: ProviderID, @@ -79,56 +99,6 @@ async function getContextLimit( return model?.limit.context ?? null } -async function sendUsageUpdate( - connection: AgentSideConnection, - sdk: OpencodeClient, - sessionID: string, - directory: string, -): Promise { - const messages = await sdk.session - .messages({ sessionID, directory }, { throwOnError: true }) - .then((x) => x.data) - .catch((error) => { - log.error("failed to fetch messages for usage update", { error }) - return undefined - }) - - if (!messages) return - - const assistantMessages = messages.filter( - (m): m is { info: AssistantMessage; parts: SessionMessageResponse["parts"] } => m.info.role === "assistant", - ) - - const lastAssistant = assistantMessages[assistantMessages.length - 1] - if (!lastAssistant) return - - const msg = lastAssistant.info - if (!msg.providerID || !msg.modelID) return - const size = await getContextLimit(sdk, ProviderID.make(msg.providerID), ModelID.make(msg.modelID), directory) - - if (!size) { - // Cannot calculate usage without known context size - return - } - - const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) - const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) - - await connection - .sessionUpdate({ - sessionId: sessionID, - update: { - sessionUpdate: "usage_update", - used, - size, - cost: { amount: totalCost, currency: "USD" }, - }, - }) - .catch((error) => { - log.error("failed to send usage update", { error }) - }) -} - export function init({ sdk: _sdk }: { sdk: OpencodeClient }) { return { create: (connection: AgentSideConnection, fullConfig: ACPConfig) => { @@ -152,13 +122,35 @@ export class Agent implements ACPAgent { { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] + /** Tracks sessions that have been cancelled via ACP cancel notification. */ + private cancelledSessions = new Set() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) - this.startEventSubscription() + + // Defer with queueMicrotask because AgentSideConnection's constructor + // calls the toAgent factory (this constructor) *before* its inner + // Connection is initialised, so connection.signal is unavailable + // synchronously. + queueMicrotask(() => { + connection.signal.addEventListener( + "abort", + () => { + log.info("ACP connection closed, cleaning up agent") + this.eventAbort.abort(connection.signal.reason ?? new Error("ACP connection closed")) + this.permissionQueues.clear() + void this.sessionManager.abortAll().catch((error) => { + log.error("failed to abort sessions on disconnect", { error }) + }) + }, + { once: true }, + ) + + this.startEventSubscription() + }) } private startEventSubscription() { @@ -166,27 +158,125 @@ export class Agent implements ACPAgent { this.eventStarted = true this.runEventSubscription().catch((error) => { if (this.eventAbort.signal.aborted) return - log.error("event subscription failed", { error }) + log.error("event subscription failed permanently", { error }) }) } private async runEventSubscription() { - while (true) { - if (this.eventAbort.signal.aborted) return - const events = await this.sdk.global.event({ - signal: this.eventAbort.signal, - }) - for await (const event of events.stream) { - if (this.eventAbort.signal.aborted) return - const payload = event?.payload - if (!payload) continue - await this.handleEvent(payload as Event).catch((error) => { - log.error("failed to handle event", { error, type: payload.type }) + let retryDelayMs = EVENT_RETRY_INITIAL_MS + + while (!this.eventAbort.signal.aborted && !this.connection.signal.aborted) { + try { + const events = await this.sdk.global.event({ + signal: this.eventAbort.signal, }) + + // Reset retry delay on successful connection. + retryDelayMs = EVENT_RETRY_INITIAL_MS + + for await (const event of events.stream) { + if (this.eventAbort.signal.aborted || this.connection.signal.aborted) return + const payload = event?.payload + if (!payload) continue + await this.handleEvent(payload as Event).catch((error) => { + log.error("failed to handle event", { error, type: payload.type }) + }) + } + } catch (error) { + if (this.eventAbort.signal.aborted || this.connection.signal.aborted) return + log.error("event subscription error; retrying", { error, retryDelayMs }) + await new Promise((resolve) => { + const timer = setTimeout(resolve, retryDelayMs) + this.eventAbort.signal.addEventListener("abort", () => { + clearTimeout(timer) + resolve() + }, { once: true }) + }) + retryDelayMs = Math.min(retryDelayMs * 2, EVENT_RETRY_MAX_MS) } } } + /** + * Central helper for sending session updates to the ACP client. + * Detects connection closure and aborts the event subscription to + * prevent further writes to a dead transport. + */ +private async sendSessionUpdate( + params: Parameters[0], + ): Promise { + if (this.connection.signal.aborted) { + log.warn("skipping session update; ACP connection closed", { + sessionId: params.sessionId, + updateType: params.update?.sessionUpdate, + }) + return + } + + try { + await this.connection.sessionUpdate(params) + } catch (error) { + // Treat any send failure as a potential connection death. + // If the connection is already aborted, clean up. Otherwise, + // abort the event subscription to prevent further writes to + // a potentially broken transport. + if (this.connection.signal.aborted || isConnectionClosedError(error)) { + this.eventAbort.abort( + this.connection.signal.reason ?? new Error("ACP connection closed"), + ) + return + } + // For unexpected errors, abort event subscription to prevent + // cascading failures from repeated writes to a broken transport. + log.error("failed to send ACP session update; aborting event subscription", { error }) + this.eventAbort.abort(error instanceof Error ? error : new Error(String(error))) + } + } + + private async sendUsageUpdate( + sessionID: string, + directory: string, + ): Promise { + const messages = await this.sdk.session + .messages({ sessionID, directory }, { throwOnError: true }) + .then((x) => x.data) + .catch((error) => { + log.error("failed to fetch messages for usage update", { error }) + return undefined + }) + + if (!messages) return + + const assistantMessages = messages.filter( + (m): m is { info: AssistantMessage; parts: SessionMessageResponse["parts"] } => m.info.role === "assistant", + ) + + const lastAssistant = assistantMessages[assistantMessages.length - 1] + if (!lastAssistant) return + + const msg = lastAssistant.info + if (!msg.providerID || !msg.modelID) return + const size = await getContextLimit(this.sdk, ProviderID.make(msg.providerID), ModelID.make(msg.modelID), directory) + + if (!size) { + // Cannot calculate usage without known context size + return + } + + const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) + const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) + + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "usage_update", + used, + size, + cost: { amount: totalCost, currency: "USD" }, + }, + }) + } + private async handleEvent(event: Event) { switch (event.type) { case "permission.asked": { @@ -199,32 +289,70 @@ export class Agent implements ACPAgent { .then(async () => { const directory = session.cwd - const res = await this.connection + // Race permission request against timeout and connection close. + // This prevents indefinite hangs when the ACP client stops + // responding (e.g., Zed becomes unresponsive or disconnects). + const permissionPromise = this.connection .requestPermission({ sessionId: permission.sessionID, toolCall: { toolCallId: permission.tool?.callID ?? permission.id, status: "pending", - title: permission.permission, + title: toolTitle(permission.permission, permission.metadata), rawInput: permission.metadata, kind: toToolKind(permission.permission), locations: toLocations(permission.permission, permission.metadata), }, options: this.permissionOptions, }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, + + // Race permission request against timeout and connection close. + // This prevents indefinite hangs when the ACP client stops + // responding (e.g., Zed becomes unresponsive or disconnects). + let timeoutId: ReturnType | undefined + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => reject(new Error(`Permission request timed out after ${PERMISSION_TIMEOUT_MS}ms`)), PERMISSION_TIMEOUT_MS) + }) + + let onAbort: (() => void) | undefined + const disconnectPromise = new Promise((_, reject) => { + if (this.connection.signal.aborted) { + reject(new Error("ACP connection closed")) + return + } + onAbort = () => reject(new Error("ACP connection closed")) + this.connection.signal.addEventListener("abort", onAbort, { once: true }) + }) + + let res: Awaited | undefined + try { + res = await Promise.race([permissionPromise, timeoutPromise, disconnectPromise]) + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error) + log.error("permission request failed", { + error: errorMsg, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + + // Auto-reject the permission in OpenCode so the session + // doesn't hang forever waiting for a reply. + await this.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }).catch((replyError) => { + log.error("failed to reject permission after failure", { + error: replyError, permissionID: permission.id, - sessionID: permission.sessionID, }) - await this.sdk.permission.reply({ - requestID: permission.id, - reply: "reject", - directory, - }) - return undefined }) + return + } finally { + // Clean up timeout and abort listener to prevent leaks. + if (timeoutId !== undefined) clearTimeout(timeoutId) + if (onAbort) this.connection.signal.removeEventListener("abort", onAbort) + } if (!res) return if (res.outcome.outcome !== "selected") { @@ -248,6 +376,8 @@ export class Agent implements ACPAgent { sessionId: session.id, path: filepath, content: newContent, + }).catch((error) => { + log.error("failed to write ACP client text file", { error, sessionID: session.id, filepath }) }) } } @@ -301,7 +431,7 @@ export class Agent implements ACPAgent { toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, }, @@ -329,7 +459,7 @@ export class Agent implements ACPAgent { toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, ...(content.length > 0 && { content }), @@ -382,7 +512,7 @@ export class Agent implements ACPAgent { status: "completed", kind, content, - title: part.state.title, + title: toolTitle(part.tool, part.state.input, part.state.title), rawInput: part.state.input, rawOutput: completedToolRawOutput(part), }, @@ -403,7 +533,7 @@ export class Agent implements ACPAgent { toolCallId: part.callID, status: "failed", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), rawInput: part.state.input, content: [ { @@ -614,7 +744,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return result } catch (e) { @@ -713,7 +843,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return mode } catch (e) { @@ -747,7 +877,7 @@ export class Agent implements ACPAgent { sessionId, }) - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await this.sendUsageUpdate(sessionId, directory) return result } catch (e) { @@ -806,15 +936,14 @@ export class Agent implements ACPAgent { }, }) } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "in_progress", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), locations: toLocations(part.tool, part.state.input), rawInput: part.state.input, ...(runningContent.length > 0 && { content: runningContent }), @@ -857,8 +986,7 @@ export class Agent implements ACPAgent { } } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", @@ -866,7 +994,7 @@ export class Agent implements ACPAgent { status: "completed", kind, content, - title: part.state.title, + title: toolTitle(part.tool, part.state.input, part.state.title), rawInput: part.state.input, rawOutput: completedToolRawOutput(part), }, @@ -878,15 +1006,14 @@ export class Agent implements ACPAgent { case "error": this.toolStarts.delete(part.callID) this.shellSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "tool_call_update", toolCallId: part.callID, status: "failed", kind: toToolKind(part.tool), - title: part.tool, + title: toolTitle(part.tool, part.state.input), rawInput: part.state.input, content: [ { @@ -965,8 +1092,7 @@ export class Agent implements ACPAgent { if (effectiveMime.startsWith("image/")) { // Image - send as image block - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -994,8 +1120,7 @@ export class Agent implements ACPAgent { } : { uri: fileUri, mimeType: effectiveMime, blob: base64Data } - await this.connection - .sessionUpdate({ + await this.sendSessionUpdate({ sessionId, update: { sessionUpdate: messageChunk, @@ -1042,17 +1167,18 @@ export class Agent implements ACPAgent { private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) + const input = part.state.input ?? {} await this.connection .sessionUpdate({ sessionId, update: { sessionUpdate: "tool_call", toolCallId: part.callID, - title: part.tool, + title: toolTitle(part.tool, input), kind: toToolKind(part.tool), status: "pending", - locations: [], - rawInput: {}, + locations: toLocations(part.tool, input), + rawInput: input, }, }) .catch((error) => { @@ -1182,13 +1308,15 @@ export class Agent implements ACPAgent { }), ) - setTimeout(() => { - void this.connection.sessionUpdate({ +setTimeout(() => { + void this.sendSessionUpdate({ sessionId, update: { sessionUpdate: "available_commands_update", availableCommands, }, + }).catch((error) => { + log.error("failed to send available commands update to ACP", { error }) }) }, 0) @@ -1231,7 +1359,7 @@ export class Agent implements ACPAgent { ? { availableModes: modeState.availableModes, currentModeId: modeState.currentModeId } : undefined - await this.connection.sessionUpdate({ + await this.sendSessionUpdate({ sessionId: session.id, update: { sessionUpdate: "config_option_update", @@ -1315,11 +1443,29 @@ export class Agent implements ACPAgent { } } + /** + * Check if the session was cancelled or the connection was lost during + * a prompt, and return the appropriate stop reason. + */ + private checkStopReason(sessionID: string): "cancelled" | undefined { + if (this.cancelledSessions.has(sessionID)) { + this.cancelledSessions.delete(sessionID) + return "cancelled" + } + if (this.connection.signal.aborted) { + return "cancelled" + } + return undefined + } + async prompt(params: PromptRequest) { const sessionID = params.sessionId const session = this.sessionManager.get(sessionID) const directory = session.cwd + // Clear any prior cancelled state for this session. + this.cancelledSessions.delete(sessionID) + const current = session.model const model = current ?? (await defaultModel(this.config, directory)) if (!current) { @@ -1431,20 +1577,59 @@ export class Agent implements ACPAgent { }) if (!cmd) { - const response = await this.sdk.session.prompt({ - sessionID, - model: { - providerID: model.providerID, - modelID: model.modelID, - }, - variant: this.sessionManager.getVariant(sessionID), - parts, - agent, - directory, - }) + let response + try { + response = await this.sdk.session.prompt({ + sessionID, + model: { + providerID: model.providerID, + modelID: model.modelID, + }, + variant: this.sessionManager.getVariant(sessionID), + parts, + agent, + directory, + }, { throwOnError: true }) + } catch (error) { + // Surface OpenCode errors (rate limits, auth failures, etc.) to Zed. + // Without this, Zed shows "loading" forever or a cryptic JSON-RPC error. + const errorMessage = error instanceof Error ? error.message : String(error) + log.error("prompt failed", { error: errorMessage, sessionID }) + + // Check if the connection was lost during the prompt. + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } + + // Send the error as a text message so Zed displays it to the user. + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_message_chunk", + messageId: `error-${Date.now()}`, + content: { + type: "text", + text: `Error: ${errorMessage}`, + }, + }, + }).catch(() => { + // If we can't even send the error message, the connection is likely dead. + }) + + return { + stopReason: "end_turn" as const, + _meta: {}, + } + } const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1457,17 +1642,50 @@ export class Agent implements ACPAgent { .list({ directory }, { throwOnError: true }) .then((x) => x.data!.find((c) => c.name === cmd.name)) if (command) { - const response = await this.sdk.session.command({ - sessionID, - command: command.name, - arguments: cmd.args, - model: model.providerID + "/" + model.modelID, - agent, - directory, - }) + let response + try { + response = await this.sdk.session.command({ + sessionID, + command: command.name, + arguments: cmd.args, + model: model.providerID + "/" + model.modelID, + agent, + directory, + }, { throwOnError: true }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + log.error("command failed", { error: errorMessage, sessionID }) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } + + await this.sendSessionUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_message_chunk", + messageId: `error-${Date.now()}`, + content: { + type: "text", + text: `Error: ${errorMessage}`, + }, + }, + }).catch(() => {}) + + return { + stopReason: "end_turn" as const, + _meta: {}, + } + } const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1490,7 +1708,12 @@ export class Agent implements ACPAgent { break } - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await this.sendUsageUpdate(sessionID, directory) + + const cancelled = this.checkStopReason(sessionID) + if (cancelled) { + return { stopReason: cancelled, _meta: {} } + } return { stopReason: "end_turn" as const, @@ -1499,14 +1722,32 @@ export class Agent implements ACPAgent { } async cancel(params: CancelNotification) { - const session = this.sessionManager.get(params.sessionId) - await this.config.sdk.session.abort( - { + const session = this.sessionManager.tryGet(params.sessionId) + if (!session) { + log.warn("cancel for unknown ACP session", { sessionID: params.sessionId }) + return + } + + // Mark session as cancelled so prompt() can return "cancelled" stop reason. + this.cancelledSessions.add(params.sessionId) + + // Clear any pending permission queues for this session. + this.permissionQueues.delete(params.sessionId) + + try { + await this.config.sdk.session.abort( + { + sessionID: params.sessionId, + directory: session.cwd, + }, + { throwOnError: true }, + ) + } catch (error) { + log.error("failed to abort ACP session on cancel", { + error, sessionID: params.sessionId, - directory: session.cwd, - }, - { throwOnError: true }, - ) + }) + } } private async loadSessionMessages(directory: string, sessionId: string, limit?: number) { @@ -1572,6 +1813,19 @@ function toToolKind(toolName: string): ToolKind { } } +function taskToolTitle(input: Record): string { + const type = typeof input["subagent_type"] === "string" ? input["subagent_type"] : "" + const desc = typeof input["description"] === "string" ? input["description"] : "" + const kind = type ? type.charAt(0).toUpperCase() + type.slice(1) : "Unknown" + if (desc) return `${kind} — ${desc}` + return `${kind} Task` +} + +function toolTitle(toolName: string, input: Record, stateTitle?: string): string { + if (toolName === "task") return taskToolTitle(input) + return stateTitle ?? toolName +} + function toLocations(toolName: string, input: Record): { path: string }[] { const tool = toolName.toLocaleLowerCase() diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index cc1ed0be3098..1a0b00fcac17 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -119,4 +119,40 @@ export class ACPSessionManager { this.sessions.delete(sessionId) return session } + + /** + * Remove all tracked sessions. Called when the ACP connection closes + * to prevent stale state from persisting. + */ + clear(): void { + this.sessions.clear() + } + + /** + * Abort all tracked OpenCode sessions and clear state. + * Called when the ACP connection closes unexpectedly. + */ + async abortAll(): Promise { + const sessions = [...this.sessions.values()] + this.sessions.clear() + + await Promise.allSettled( + sessions.map((session) => + this.sdk.session + .abort( + { + sessionID: session.id, + directory: session.cwd, + }, + { throwOnError: true }, + ) + .catch((error) => { + log.error("failed to abort session on disconnect", { + error, + sessionID: session.id, + }) + }), + ), + ) + } } diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index b3b7df486b31..bf2da70db6c2 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -56,18 +56,25 @@ export const AcpCommand = effectCmd({ const stream = ndJsonStream(input, output) const agent = ACP.init({ sdk }) - new AgentSideConnection((conn) => { + const connection = new AgentSideConnection((conn) => { return agent.create(conn, { sdk }) }, stream) log.info("setup connection") process.stdin.resume() - yield* Effect.promise( - () => - new Promise((resolve, reject) => { - process.stdin.on("end", () => resolve()) - process.stdin.on("error", reject) - }), - ) + + // Wait for either the ACP connection to close or stdin to end. + // This ensures the process exits when the client disconnects, + // preventing orphaned sessions and event subscriptions. + yield* Effect.promise(async () => { + const connectionClosed = connection.closed.catch(() => undefined) + const stdinEnded = new Promise((resolve) => { + process.stdin.on("end", () => resolve()) + process.stdin.on("error", () => resolve()) + }) + + await Promise.race([connectionClosed, stdinEnded]) + log.info("ACP connection or stdin ended, shutting down") + }) }), }) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index a01680e30596..5fcccbcd7740 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -199,6 +199,8 @@ function createFakeAgent() { updates.set(sessionId, list) } + const connectionAbort = new AbortController() + const connection = { async sessionUpdate(params: SessionUpdateParams) { sessionUpdates.push(params) @@ -215,6 +217,13 @@ function createFakeAgent() { async requestPermission(_params: RequestPermissionParams): Promise { return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult }, + async writeTextFile(_params: any): Promise {}, + get signal() { + return connectionAbort.signal + }, + get closed(): Promise { + return new Promise(() => {}) + }, } as unknown as AgentSideConnection const { controller, stream } = createEventStream() @@ -324,6 +333,7 @@ function createFakeAgent() { const stop = () => { controller.close() ;(agent as any).eventAbort.abort() + connectionAbort.abort() } return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection }