From 1c8f370a65c5de4838a5b4f12ea07c636304d5a0 Mon Sep 17 00:00:00 2001 From: me-alt Date: Sun, 24 May 2026 19:10:21 +0200 Subject: [PATCH] fix: reconcile ACP assistant chunks before end_turn --- packages/opencode/src/acp/agent.ts | 645 ++++++++++-------- .../test/acp/event-subscription.test.ts | 168 ++++- 2 files changed, 521 insertions(+), 292 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 8b74b9c9bad3..c3e5dbe7a0eb 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -54,6 +54,8 @@ import { ShellID } from "@/tool/shell/id" type ModeOption = { id: string; name: string; description?: string } type ModelOption = { modelId: string; name: string } +type SessionUpdateParams = Parameters[0] +type SendSessionUpdate = (params: SessionUpdateParams) => Promise const decodeTodos = Schema.decodeUnknownResult(Schema.fromJsonString(Schema.Array(Todo.Info))) const DEFAULT_VARIANT_VALUE = "default" @@ -80,7 +82,7 @@ async function getContextLimit( } async function sendUsageUpdate( - connection: AgentSideConnection, + sendUpdate: SendSessionUpdate, sdk: OpencodeClient, sessionID: string, directory: string, @@ -114,19 +116,46 @@ async function sendUsageUpdate( const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0) const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0) - await connection - .sessionUpdate({ - sessionId: sessionID, - update: { - sessionUpdate: "usage_update", - used, - size, - cost: { amount: totalCost, currency: "USD" }, - }, - }) - .catch((error) => { - log.error("failed to send usage update", { error }) - }) + await sendUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "usage_update", + used, + size, + cost: { amount: totalCost, currency: "USD" }, + }, + }).catch((error) => { + log.error("failed to send usage update", { error }) + }) +} + +function sentLength(part: SessionMessageResponse["parts"][number]) { + if (part.type !== "text" && part.type !== "reasoning") return 0 + return part.text.length +} + +function textDeltaFromPart(part: SessionMessageResponse["parts"][number], seen: number): string | undefined { + if (part.type !== "text" && part.type !== "reasoning") return undefined + if (seen >= part.text.length) return undefined + + return part.text.slice(seen) +} + +function liveDeltaFromPart( + part: SessionMessageResponse["parts"][number], + seen: number, + delta: string, +): string | undefined { + if (part.type !== "text" && part.type !== "reasoning") return undefined + if (seen <= 0) return delta + if (part.text.length === 0) return delta + if (seen >= part.text.length) return undefined + + const remaining = part.text.slice(seen) + if (remaining.startsWith(delta)) return delta + if (delta.endsWith(remaining)) return remaining + + return remaining.slice(0, delta.length) } export function init({ sdk: _sdk }: { sdk: OpencodeClient }) { @@ -144,6 +173,8 @@ export class Agent implements ACPAgent { private sessionManager: ACPSessionManager private eventAbort = new AbortController() private eventStarted = false + private sendChain = Promise.resolve() + private streamedPartChars = new Map() private shellSnapshots = new Map() private toolStarts = new Set() private permissionQueues = new Map>() @@ -161,6 +192,58 @@ export class Agent implements ACPAgent { this.startEventSubscription() } + private sendUpdate(params: SessionUpdateParams) { + const send = this.sendChain.then(() => this.connection.sessionUpdate(params)) + this.sendChain = send.catch((error) => { + log.error("failed to send session update to ACP", { error }) + }) + return send + } + + private recordStreamedPart(partID: string, delta: string) { + this.streamedPartChars.set(partID, (this.streamedPartChars.get(partID) ?? 0) + delta.length) + } + + private async reconcileAssistantMessage(sessionID: string, messageID: string, directory: string) { + const message = await this.sdk.session + .message({ sessionID, messageID, directory }, { throwOnError: false }) + .then((x) => x.data) + .catch((error) => { + log.error("failed to fetch assistant message for ACP reconciliation", { error, sessionID, messageID }) + return undefined + }) + + if (!message || message.info.role !== "assistant") return + + for (const part of message.parts) { + const seen = this.streamedPartChars.get(part.id) ?? 0 + const delta = textDeltaFromPart(part, seen) + if (!delta) continue + + if (part.type === "text" && part.ignored !== true) { + await this.sendUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_message_chunk", + messageId: messageID, + content: { type: "text", text: delta }, + }, + }) + this.streamedPartChars.set(part.id, sentLength(part)) + } else if (part.type === "reasoning") { + await this.sendUpdate({ + sessionId: sessionID, + update: { + sessionUpdate: "agent_thought_chunk", + messageId: messageID, + content: { type: "text", text: delta }, + }, + }) + this.streamedPartChars.set(part.id, sentLength(part)) + } + } + } + private startEventSubscription() { if (this.eventStarted) return this.eventStarted = true @@ -293,22 +376,20 @@ export class Agent implements ACPAgent { const hash = Hash.fast(output) if (part.tool === ShellID.ToolID) { if (this.shellSnapshots.get(part.callID) === hash) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((error) => { - log.error("failed to send tool in_progress to ACP", { error }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + kind: toToolKind(part.tool), + title: part.tool, + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }).catch((error) => { + log.error("failed to send tool in_progress to ACP", { error }) + }) return } this.shellSnapshots.set(part.callID, hash) @@ -321,23 +402,21 @@ export class Agent implements ACPAgent { }, }) } - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - ...(content.length > 0 && { content }), - }, - }) - .catch((error) => { - log.error("failed to send tool in_progress to ACP", { error }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + kind: toToolKind(part.tool), + title: part.tool, + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + ...(content.length > 0 && { content }), + }, + }).catch((error) => { + log.error("failed to send tool in_progress to ACP", { error }) + }) return case "completed": { @@ -349,80 +428,74 @@ export class Agent implements ACPAgent { if (part.tool === "todowrite") { const parsedTodos = decodeTodos(part.state.output) if (Result.isSuccess(parsedTodos)) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.success.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((error) => { - log.error("failed to send session update for todo", { error }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.success.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }).catch((error) => { + log.error("failed to send session update for todo", { error }) + }) } else { log.error("failed to parse todo output", { error: parsedTodos.failure }) } } - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: completedToolRawOutput(part), - }, - }) - .catch((error) => { - log.error("failed to send tool completed to ACP", { error }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawInput: part.state.input, + rawOutput: completedToolRawOutput(part), + }, + }).catch((error) => { + log.error("failed to send tool completed to ACP", { error }) + }) return } case "error": this.toolStarts.delete(part.callID) this.shellSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + kind: toToolKind(part.tool), + title: part.tool, + rawInput: part.state.input, + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, }, - ], - rawOutput: { - error: part.state.error, - metadata: part.state.metadata, }, + ], + rawOutput: { + error: part.state.error, + metadata: part.state.metadata, }, - }) - .catch((error) => { - log.error("failed to send tool error to ACP", { error }) - }) + }, + }).catch((error) => { + log.error("failed to send tool error to ACP", { error }) + }) return } } @@ -462,40 +535,48 @@ export class Agent implements ACPAgent { if (!part) return if (part.type === "text" && props.field === "text" && part.ignored !== true) { - await this.connection - .sessionUpdate({ + const delta = liveDeltaFromPart(part, this.streamedPartChars.get(part.id) ?? 0, props.delta) + if (!delta) return + + try { + await this.sendUpdate({ sessionId, update: { sessionUpdate: "agent_message_chunk", messageId: props.messageID, content: { type: "text", - text: props.delta, + text: delta, }, }, }) - .catch((error) => { - log.error("failed to send text delta to ACP", { error }) - }) + this.recordStreamedPart(part.id, delta) + } catch (error) { + log.error("failed to send text delta to ACP", { error }) + } return } if (part.type === "reasoning" && props.field === "text") { - await this.connection - .sessionUpdate({ + const delta = liveDeltaFromPart(part, this.streamedPartChars.get(part.id) ?? 0, props.delta) + if (!delta) return + + try { + await this.sendUpdate({ sessionId, update: { sessionUpdate: "agent_thought_chunk", messageId: props.messageID, content: { type: "text", - text: props.delta, + text: delta, }, }, }) - .catch((error) => { - log.error("failed to send reasoning delta to ACP", { error }) - }) + this.recordStreamedPart(part.id, delta) + } catch (error) { + log.error("failed to send reasoning delta to ACP", { error }) + } } return } @@ -614,7 +695,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionId, directory) return result } catch (e) { @@ -713,7 +794,7 @@ export class Agent implements ACPAgent { await this.processMessage(msg) } - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionId, directory) return mode } catch (e) { @@ -747,7 +828,7 @@ export class Agent implements ACPAgent { sessionId, }) - await sendUsageUpdate(this.connection, this.sdk, sessionId, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionId, directory) return result } catch (e) { @@ -806,23 +887,21 @@ export class Agent implements ACPAgent { }, }) } - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - kind: toToolKind(part.tool), - title: part.tool, - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - ...(runningContent.length > 0 && { content: runningContent }), - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + kind: toToolKind(part.tool), + title: part.tool, + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + ...(runningContent.length > 0 && { content: runningContent }), + }, + }).catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) break case "completed": this.toolStarts.delete(part.callID) @@ -833,100 +912,92 @@ export class Agent implements ACPAgent { if (part.tool === "todowrite") { const parsedTodos = decodeTodos(part.state.output) if (Result.isSuccess(parsedTodos)) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.success.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.success.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }).catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) } else { log.error("failed to parse todo output", { error: parsedTodos.failure }) } } - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawInput: part.state.input, - rawOutput: completedToolRawOutput(part), - }, - }) - .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawInput: part.state.input, + rawOutput: completedToolRawOutput(part), + }, + }).catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) break case "error": this.toolStarts.delete(part.callID) this.shellSnapshots.delete(part.callID) - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - kind: toToolKind(part.tool), - title: part.tool, - rawInput: part.state.input, - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + kind: toToolKind(part.tool), + title: part.tool, + rawInput: part.state.input, + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, }, - ], - rawOutput: { - error: part.state.error, - metadata: part.state.metadata, }, + ], + rawOutput: { + error: part.state.error, + metadata: part.state.metadata, }, - }) - .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) - }) + }, + }).catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) break } } else if (part.type === "text") { if (part.text) { const audience: Role[] | undefined = part.synthetic ? ["assistant"] : part.ignored ? ["user"] : undefined - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk", - messageId: message.info.id, - content: { - type: "text", - text: part.text, - ...(audience && { annotations: { audience } }), - }, + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk", + messageId: message.info.id, + content: { + type: "text", + text: part.text, + ...(audience && { annotations: { audience } }), }, - }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) - }) + }, + }).catch((err) => { + log.error("failed to send text to ACP", { error: err }) + }) } } else if (part.type === "file") { // Replay file attachments as appropriate ACP content blocks. @@ -943,18 +1014,16 @@ export class Agent implements ACPAgent { if (url.startsWith("file://")) { // Local file reference - send as resource_link - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: messageChunk, - messageId: message.info.id, - content: { type: "resource_link", uri: url, name: filename, mimeType: mime }, - }, - }) - .catch((err) => { - log.error("failed to send resource_link to ACP", { error: err }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: messageChunk, + messageId: message.info.id, + content: { type: "resource_link", uri: url, name: filename, mimeType: mime }, + }, + }).catch((err) => { + log.error("failed to send resource_link to ACP", { error: err }) + }) } else if (url.startsWith("data:")) { // Embedded content - parse data URL and send as appropriate block type const base64Match = url.match(/^data:([^;]+);base64,(.*)$/) @@ -965,23 +1034,21 @@ export class Agent implements ACPAgent { if (effectiveMime.startsWith("image/")) { // Image - send as image block - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: messageChunk, - messageId: message.info.id, - content: { - type: "image", - mimeType: effectiveMime, - data: base64Data, - uri: pathToFileURL(filename).href, - }, + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: messageChunk, + messageId: message.info.id, + content: { + type: "image", + mimeType: effectiveMime, + data: base64Data, + uri: pathToFileURL(filename).href, }, - }) - .catch((err) => { - log.error("failed to send image to ACP", { error: err }) - }) + }, + }).catch((err) => { + log.error("failed to send image to ACP", { error: err }) + }) } else { // Non-image: text types get decoded, binary types stay as blob const isText = effectiveMime.startsWith("text/") || effectiveMime === "application/json" @@ -994,38 +1061,34 @@ export class Agent implements ACPAgent { } : { uri: fileUri, mimeType: effectiveMime, blob: base64Data } - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: messageChunk, - messageId: message.info.id, - content: { type: "resource", resource }, - }, - }) - .catch((err) => { - log.error("failed to send resource to ACP", { error: err }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: messageChunk, + messageId: message.info.id, + content: { type: "resource", resource }, + }, + }).catch((err) => { + log.error("failed to send resource to ACP", { error: err }) + }) } } // URLs that don't match file:// or data: are skipped (unsupported) } else if (part.type === "reasoning") { if (part.text) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - messageId: message.info.id, - content: { - type: "text", - text: part.text, - }, + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + messageId: message.info.id, + content: { + type: "text", + text: part.text, }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) + }, + }).catch((err) => { + log.error("failed to send reasoning to ACP", { error: err }) + }) } } } @@ -1042,22 +1105,20 @@ export class Agent implements ACPAgent { private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((error) => { - log.error("failed to send tool pending to ACP", { error }) - }) + await this.sendUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }).catch((error) => { + log.error("failed to send tool pending to ACP", { error }) + }) } private async loadAvailableModes(directory: string): Promise { @@ -1183,7 +1244,7 @@ export class Agent implements ACPAgent { ) setTimeout(() => { - void this.connection.sessionUpdate({ + void this.sendUpdate({ sessionId, update: { sessionUpdate: "available_commands_update", @@ -1231,7 +1292,7 @@ export class Agent implements ACPAgent { ? { availableModes: modeState.availableModes, currentModeId: modeState.currentModeId } : undefined - await this.connection.sessionUpdate({ + await this.sendUpdate({ sessionId: session.id, update: { sessionUpdate: "config_option_update", @@ -1444,7 +1505,8 @@ export class Agent implements ACPAgent { }) const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + if (msg?.id) await this.reconcileAssistantMessage(sessionID, msg.id, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, @@ -1467,7 +1529,8 @@ export class Agent implements ACPAgent { }) const msg = response.data?.info - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + if (msg?.id) await this.reconcileAssistantMessage(sessionID, msg.id, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, @@ -1490,7 +1553,7 @@ export class Agent implements ACPAgent { break } - await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) + await sendUsageUpdate(this.sendUpdate.bind(this), this.sdk, sessionID, directory) return { stopReason: "end_turn" as const, diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index a01680e30596..7f8d922ca1b1 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -222,6 +222,21 @@ function createFakeAgent() { eventSubscribe: 0, sessionCreate: 0, } + const sessionMessages = new Map() + const defaultAssistant = { + id: "msg_prompt", + sessionID: "ses_1", + role: "assistant", + providerID: "opencode", + modelID: "big-pickle", + cost: 0, + tokens: { + input: 1, + output: 1, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + } const sdk = { global: { @@ -251,7 +266,13 @@ function createFakeAgent() { messages: async () => { return { data: [] } }, + prompt: async () => { + return { data: { info: defaultAssistant } } + }, message: async (params?: any) => { + const message = sessionMessages.get(params?.messageID) + if (message) return { data: message } + // Return a message with parts that can be looked up by partID return { data: { @@ -326,7 +347,7 @@ function createFakeAgent() { ;(agent as any).eventAbort.abort() } - return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection } + return { agent, controller, calls, updates, chunks, sessionUpdates, stop, sdk, connection, sessionMessages } } describe("acp.agent event subscription", () => { @@ -426,6 +447,151 @@ describe("acp.agent event subscription", () => { }) }) + test("replays canonical assistant text before prompt returns end_turn", async () => { + await using tmp = await tmpdir() + await provideTestInstance({ + directory: tmp.path, + fn: async () => { + const { agent, chunks, sessionMessages, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const text = "There is only one user directory in /home: `agent`." + + sessionMessages.set("msg_prompt", { + info: { + id: "msg_prompt", + sessionID: sessionId, + role: "assistant", + }, + parts: [ + { + id: "part_prompt", + type: "text", + text, + }, + ], + }) + + const result = await agent.prompt({ + sessionId, + prompt: [{ type: "text", text: "list files" }], + } as any) + + expect(result.stopReason).toBe("end_turn") + expect(chunks.get(sessionId)).toBe(text) + + stop() + }, + }) + }) + + test("reconciles only the unstreamed assistant text suffix", async () => { + await using tmp = await tmpdir() + await provideTestInstance({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, sessionMessages, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const text = "hello world" + + sessionMessages.set("msg_prompt", { + info: { + id: "msg_prompt", + sessionID: sessionId, + role: "assistant", + }, + parts: [ + { + id: "part_prompt", + type: "text", + text, + }, + ], + }) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_prompt", + partID: "part_prompt", + field: "text", + delta: "hello ", + }, + }, + } as any) + + await pollUntil(() => chunks.get(sessionId) === "hello ", "initial delta never arrived") + + const result = await agent.prompt({ + sessionId, + prompt: [{ type: "text", text: "finish" }], + } as any) + + expect(result.stopReason).toBe("end_turn") + expect(chunks.get(sessionId)).toBe(text) + + stop() + }, + }) + }) + + test("does not duplicate text when a live delta arrives after reconciliation", async () => { + await using tmp = await tmpdir() + await provideTestInstance({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, sessionMessages, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const text = "late reply" + + sessionMessages.set("msg_prompt", { + info: { + id: "msg_prompt", + sessionID: sessionId, + role: "assistant", + }, + parts: [ + { + id: "part_prompt", + type: "text", + text, + }, + ], + }) + + await agent.prompt({ + sessionId, + prompt: [{ type: "text", text: "finish" }], + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_prompt", + partID: "part_prompt", + field: "text", + delta: text, + }, + }, + } as any) + + await new Promise((resolve) => setTimeout(resolve, 25)) + + expect(chunks.get(sessionId)).toBe(text) + + stop() + }, + }) + }) + test("keeps concurrent sessions isolated when message.part.delta events are interleaved", async () => { await using tmp = await tmpdir() await provideTestInstance({