diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 04c4cd24d..1a1e79a3f 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -20,6 +20,7 @@ import { } from "./buildConversationItems"; import { GitActionMessage } from "./GitActionMessage"; import { GitActionResult } from "./GitActionResult"; +import { mergeConversationItems } from "./mergeConversationItems"; import { SessionFooter } from "./SessionFooter"; import { QueuedMessageView } from "./session-update/QueuedMessageView"; import { @@ -102,13 +103,18 @@ export function ConversationView({ [queuedMessages], ); - const items = useMemo(() => { - const result: ConversationItem[] = [ - ...conversationItems, - ...optimisticItems, - ]; - return queuedItems.length > 0 ? [...result, ...queuedItems] : result; - }, [conversationItems, optimisticItems, queuedItems]); + const isCloud = session?.isCloud ?? false; + + const items = useMemo( + () => + mergeConversationItems({ + conversationItems, + optimisticItems, + queuedItems, + isCloud, + }), + [conversationItems, optimisticItems, queuedItems, isCloud], + ); // Keep MCP App tool call items mounted so their iframes and bridges // survive scrolling out of the virtualized viewport. diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts new file mode 100644 index 000000000..a8434c718 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts @@ -0,0 +1,102 @@ +import type { QueuedMessage } from "@features/sessions/stores/sessionStore"; +import { describe, expect, it } from "vitest"; +import type { ConversationItem } from "./buildConversationItems"; +import { mergeConversationItems } from "./mergeConversationItems"; + +function userMessage( + id: string, + content: string, +): Extract { + return { type: "user_message", id, content, timestamp: 0 }; +} + +function queuedItem( + id: string, + content: string, +): Extract { + const message: QueuedMessage = { + id, + content, + rawPrompt: [{ type: "text", text: content }], + queuedAt: 0, + }; + return { type: "queued", id, message }; +} + +describe("mergeConversationItems", () => { + it("local: appends optimistic at the chronological end", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("a", "first")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["a", "opt"]); + }); + + it("local: queued items always come last", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("a", "first")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [queuedItem("q1", "later")], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["a", "opt", "q1"]); + }); + + it("local: does NOT dedupe — duplicate echoes are intentionally retained", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("echo", "hello")], + optimisticItems: [userMessage("opt", "hello")], + queuedItems: [], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["echo", "opt"]); + }); + + it("cloud: pins optimistic at the top", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("setup", "setup info")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "setup"]); + }); + + it("cloud: filters echoed user_message that matches optimistic content", () => { + const result = mergeConversationItems({ + conversationItems: [ + userMessage("echo", "hello"), + userMessage("other", "different"), + ], + optimisticItems: [userMessage("opt", "hello")], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "other"]); + }); + + it("cloud: dedupe is no-op when there are no optimistic items", () => { + const result = mergeConversationItems({ + conversationItems: [ + userMessage("a", "first"), + userMessage("b", "second"), + ], + optimisticItems: [], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["a", "b"]); + }); + + it("cloud: queued items always come last", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("setup", "setup")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [queuedItem("q1", "later")], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "setup", "q1"]); + }); +}); diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts new file mode 100644 index 000000000..8aa755cb1 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts @@ -0,0 +1,53 @@ +import type { ConversationItem } from "./buildConversationItems"; + +type QueuedItem = Extract; + +interface MergeConversationItemsArgs { + conversationItems: ConversationItem[]; + optimisticItems: ConversationItem[]; + queuedItems: QueuedItem[]; + isCloud: boolean; +} + +// Cloud's initial optimistic is pinned to the top so the user's prompt stays +// visible above setup progress. When the agent echoes it back via +// `session/prompt`, the duplicate `user_message` is filtered out by content +// match so the bubble doesn't disappear-then-reappear when the echo lands. +// +// Local sessions keep optimistic at the chronological end — they rely on +// `replaceOptimisticWithEvent` to swap optimistic↔real in place. +export function mergeConversationItems({ + conversationItems, + optimisticItems, + queuedItems, + isCloud, +}: MergeConversationItemsArgs): ConversationItem[] { + if (!isCloud) { + const result: ConversationItem[] = [ + ...conversationItems, + ...optimisticItems, + ]; + return queuedItems.length > 0 ? [...result, ...queuedItems] : result; + } + + const optimisticUserContents = new Set( + optimisticItems + .filter( + (item): item is Extract => + item.type === "user_message", + ) + .map((item) => item.content), + ); + const dedupedConversation = + optimisticUserContents.size === 0 + ? conversationItems + : conversationItems.filter((item) => { + if (item.type !== "user_message") return true; + return !optimisticUserContents.has(item.content); + }); + const result: ConversationItem[] = [ + ...optimisticItems, + ...dedupedConversation, + ]; + return queuedItems.length > 0 ? [...result, ...queuedItems] : result; +} diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index cc4ff9e71..7de8a5f2a 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -91,6 +91,7 @@ export function useSessionConnection({ initialMode, adapter, initialModel, + task.description ?? undefined, ); return cleanup; }, [ @@ -106,6 +107,7 @@ export function useSessionConnection({ task.latest_run?.model, task.latest_run?.runtime_adapter, task.latest_run?.state?.initial_permission_mode, + task.description, ]); useEffect(() => { diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index e64e4c4a5..a76eefdd9 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -62,7 +62,16 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ removeQueuedMessage: vi.fn(), clearMessageQueue: vi.fn(), dequeueMessagesAsText: vi.fn(() => null), - dequeueMessages: vi.fn(() => []), + dequeueMessages: vi.fn( + () => + [] as Array<{ + id: string; + content: string; + rawPrompt?: unknown; + queuedAt: number; + }>, + ), + prependQueuedMessages: vi.fn(), setPendingPermissions: vi.fn(), getSessionByTaskId: vi.fn(), getSessions: vi.fn(() => ({})), @@ -881,6 +890,127 @@ describe("SessionService", () => { }); }); + it("flushes queued cloud messages on _posthog/turn_complete", async () => { + const service = getSessionService(); + // Reset auth client (a prior test may have set it to null). + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + }); + + it("re-enqueues queued cloud messages when the dispatch fails", async () => { + const service = getSessionService(); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockRejectedValue( + new Error("transient backend failure"), + ); + + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.prependQueuedMessages, + ).toHaveBeenCalledWith("task-123", [queuedMessage]); + }); + }); + it("clears isPromptPending from structured turn completion logs on hydration", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ @@ -1241,51 +1371,335 @@ describe("SessionService", () => { 8, ); }); - - it("ignores stale async starts when the same watcher is replaced", async () => { + it("flips status to connected on _posthog/run_started", async () => { const service = getSessionService(); - let resolveFirstWatchStart!: () => void; - let resolveSecondWatchStart!: () => void; + const hydratedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + hydratedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": hydratedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); - mockTrpcCloudTask.watch.mutate - .mockImplementationOnce( - () => - new Promise((resolve) => { - resolveFirstWatchStart = resolve; - }), - ) - .mockImplementationOnce( - () => - new Promise((resolve) => { - resolveSecondWatchStart = resolve; - }), + const runStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([runStartedEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + { status: "connected" }, ); + }); + }); + + it("does not re-flip status when run_started arrives but session is already connected", async () => { + const service = getSessionService(); + const connectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + connectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": connectedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + const runStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([runStartedEvent]); service.watchCloudTask( "task-123", "run-123", "https://api.anthropic.com", 123, + undefined, + "https://logs.example.com/run-123", ); - service.stopCloudTaskWatch("task-123"); + + // Wait long enough for the hydration callback to run; assert the + // store was never told to set status: "connected" again. + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + { status: "connected" }, + ); + }); + + it("seeds an optimistic user-message when hydrating a brand-new task with no prior history", async () => { + const service = getSessionService(); + const freshSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(freshSession); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": freshSession, + }); + // Empty history — fetchSessionLogs returns no entries. + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(""); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + service.watchCloudTask( "task-123", "run-123", "https://api.anthropic.com", 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "build me a thing", ); - resolveSecondWatchStart(); - await Promise.resolve(); - await Promise.resolve(); + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "build me a thing", + }), + ); + }); + }); - resolveFirstWatchStart(); - await Promise.resolve(); - await Promise.resolve(); + it("seeds an optimistic user-message when persisted entries exist but no session/prompt yet (agent emitted lifecycle notifications first)", async () => { + const service = getSessionService(); + const freshSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(freshSession); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": freshSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + JSON.stringify({ + jsonrpc: "2.0", + method: "_posthog/run_started", + params: { + sessionId: "acp-session-1", + runId: "run-123", + taskId: "task-123", + }, + }), + ); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + // Lifecycle notification only — no session/prompt request yet. + const lifecycleNotification = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session-1", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([ + lifecycleNotification, + ]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "build me a thing", + ); - expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledTimes(2); + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "build me a thing", + }), + ); + }); }); + it("does NOT seed an optimistic user-message when hydration finds prior history", async () => { + const service = getSessionService(); + const reopenedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + reopenedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": reopenedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + // Non-empty history: a prior session/prompt exists. + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + JSON.stringify({ + type: "request", + timestamp: "2024-01-01T00:00:00Z", + request: { + jsonrpc: "2.0", + id: 1, + method: "session/prompt", + params: { prompt: [{ type: "text", text: "hello there" }] }, + }, + }), + ); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + const priorPrompt = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + id: 1, + method: "session/prompt", + params: { prompt: [{ type: "text", text: "hello there" }] }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([priorPrompt]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "hello there", + ); + + // Wait for hydration to run. + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ events: [priorPrompt] }), + ); + }); + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).not.toHaveBeenCalled(); + }); + it("ignores stale async starts when the same watcher is replaced", async () => { + const service = getSessionService(); + let resolveFirstWatchStart!: () => void; + let resolveSecondWatchStart!: () => void; + + mockTrpcCloudTask.watch.mutate + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFirstWatchStart = resolve; + }), + ) + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSecondWatchStart = resolve; + }), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + service.stopCloudTaskWatch("task-123"); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + resolveSecondWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + resolveFirstWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledTimes(2); + }); + it("sends a compensating unwatch if teardown wins the race after watch starts", async () => { const service = getSessionService(); let resolveWatchStart!: () => void; @@ -1577,6 +1991,29 @@ describe("SessionService", () => { ); }); + it("queues cloud prompt when session.status is not connected (agent not ready)", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + status: "disconnected", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }]; + const result = await service.sendPrompt("task-123", prompt); + + expect(result.stopReason).toBe("queued"); + expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( + "task-123", + "wake me up", + prompt, + ); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + }); + it("preserves cloud attachment prompts when queueing a follow-up", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index eda53cbbb..a9645edfa 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -226,6 +226,8 @@ export class SessionService { private connectingTasks = new Map>(); private localRepoPaths = new Map(); private localRecoveryAttempts = new Map>(); + /** Re-entrance guard for cloud queue dispatch (per taskId). */ + private dispatchingCloudQueues = new Set(); private nextCloudTaskWatchToken = 0; private subscriptions = new Map< string, @@ -1002,6 +1004,7 @@ export class SessionService { this.localRecoveryAttempts.clear(); this.cloudPermissionRequestIds.clear(); this.cloudLogGapReconciles.clear(); + this.dispatchingCloudQueues.clear(); this.idleKilledSubscription?.unsubscribe(); this.idleKilledSubscription = null; } @@ -1047,6 +1050,46 @@ export class SessionService { currentPromptId: null, }); } + // Lifecycle handshake from the agent — flip status to "connected" + // so the UI can release the queue-while-not-ready guard. This is + // the explicit "agent is up and accepting user messages" signal, + // emitted by `agent-server.ts` once the ACP session is fully + // wired. We deliberately do NOT drain the queue here: the agent + // is about to start `sendInitialTaskMessage` (or `sendResumeMessage`), + // and dispatching a queued user_message right now would race with + // its `clientConnection.prompt()` and one of the prompts would end + // up cancelled. The `turn_complete` handler below drains once the + // agent's initial / resume turn is actually finished. + if ( + "method" in msg && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.RUN_STARTED) + ) { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (session?.isCloud && session.status !== "connected") { + sessionStoreSetters.updateSession(taskRunId, { + status: "connected", + }); + } + } + // Canonical "turn boundary" — flush any queued cloud messages now + // that the agent is idle and accepting the next prompt. + if ( + "method" in msg && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE) + ) { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (session?.isCloud && session.messageQueue.length > 0) { + const taskId = session.taskId; + setTimeout(() => { + this.sendQueuedCloudMessages(taskId).catch((err) => + log.error("turn_complete-driven cloud queue flush failed", { + taskId, + error: err, + }), + ); + }, 0); + } + } } } @@ -1553,6 +1596,31 @@ export class SessionService { return { stopReason: "queued" }; } + // Agent-readiness guard: until we've received `_posthog/run_started` + // (which flips `session.status` to `"connected"`), the agent may + // still be booting / restoring after a sandbox restart, or mid- + // initial-prompt — sending now would race with its + // `clientConnection.prompt(initialPrompt)` on the same ACP session. + // Funnel through the queue; the run_started or turn_complete + // handlers will drain it once the agent is provably ready. + if ( + !options?.skipQueueGuard && + session.isCloud && + session.status !== "connected" + ) { + sessionStoreSetters.enqueueMessage( + session.taskId, + transport.promptText, + prompt, + ); + log.info("Cloud message queued (agent not ready)", { + taskId: session.taskId, + sessionStatus: session.status, + queueLength: session.messageQueue.length + 1, + }); + return { stopReason: "queued" }; + } + if (!options?.skipQueueGuard && session.isPromptPending) { sessionStoreSetters.enqueueMessage( session.taskId, @@ -1642,71 +1710,58 @@ export class SessionService { } } - private async sendQueuedCloudMessages( - taskId: string, - attempt = 0, - pendingPrompt?: string | ContentBlock[], - ): Promise<{ stopReason: string }> { - // First attempt: atomically dequeue. Retries reuse the already-dequeued prompt. - const combinedPrompt = - pendingPrompt ?? - combineQueuedCloudPrompts(sessionStoreSetters.dequeueMessages(taskId)); - if (!combinedPrompt) return { stopReason: "skipped" }; + /** + * Dispatches all currently queued cloud messages as a single combined + * prompt. Drains the queue up-front and rolls it back on failure so the + * next dispatch trigger (turn_complete, cloudStatus flip) can retry. A + * per-taskId re-entrance guard prevents concurrent triggers from + * double-dispatching. + * + * Pre-flight conditions match what `sendCloudPrompt` would otherwise + * silently re-queue on (sandbox not in_progress, prompt already pending). + * Skipping early lets the next trigger retry instead of re-queueing the + * already-dequeued prompt back into the same queue. + */ + private async sendQueuedCloudMessages(taskId: string): Promise { + if (this.dispatchingCloudQueues.has(taskId)) return; - const session = sessionStoreSetters.getSessionByTaskId(taskId); - if (!session) { - log.warn("No session found for queued cloud messages, message lost", { + this.dispatchingCloudQueues.add(taskId); + try { + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session?.isCloud || session.messageQueue.length === 0) return; + // Terminal cloud runs route through `resumeCloudRun`, which spins a + // new run and consumes the prompt itself — so dispatch is fine. + // Otherwise gate on the agent-ready handshake (`run_started` flips + // status to "connected") to avoid racing with `sendInitialTaskMessage`. + const isTerminal = isTerminalStatus(session.cloudStatus); + const canSendNow = + isTerminal || + (session.cloudStatus === "in_progress" && + session.status === "connected"); + if (!canSendNow || session.isPromptPending) return; + + const drained = sessionStoreSetters.dequeueMessages(taskId); + const combined = combineQueuedCloudPrompts(drained); + if (!combined) return; + + log.info("Sending queued cloud messages", { taskId, + drainedCount: drained.length, }); - return { stopReason: "no_session" }; - } - - log.info("Sending queued cloud messages", { - taskId, - promptLength: combinedPrompt.length, - attempt, - }); - try { - return await this.sendCloudPrompt(session, combinedPrompt, { - skipQueueGuard: true, - }); - } catch (error) { - const maxRetries = 5; - if (attempt < maxRetries) { - const delayMs = Math.min(1000 * 2 ** attempt, 10_000); - log.warn("Cloud message send failed, scheduling retry", { - taskId, - attempt, - delayMs, - error: String(error), + try { + await this.sendCloudPrompt(session, combined, { + skipQueueGuard: true, }); - return new Promise((resolve) => { - setTimeout(() => { - resolve( - this.sendQueuedCloudMessages( - taskId, - attempt + 1, - combinedPrompt, - ).catch((err) => { - log.error("Queued cloud message retry failed", { - taskId, - attempt: attempt + 1, - error: err, - }); - return { stopReason: "error" }; - }), - ); - }, delayMs); + } catch (err) { + log.warn("Cloud queue dispatch failed; re-enqueueing", { + taskId, + error: String(err), }); + sessionStoreSetters.prependQueuedMessages(taskId, drained); } - - log.error("Queued cloud message send failed after max retries", { - taskId, - attempts: attempt + 1, - }); - toast.error("Failed to send follow-up message. Please try again."); - return { stopReason: "error" }; + } finally { + this.dispatchingCloudQueues.delete(taskId); } } @@ -2421,6 +2476,7 @@ export class SessionService { initialMode?: string, adapter: Adapter = "claude", initialModel?: string, + taskDescription?: string, ): () => void { const taskRunId = runId; const existingWatcher = this.cloudTaskWatchers.get(taskId); @@ -2497,6 +2553,10 @@ export class SessionService { adapter, ); sessionStoreSetters.setSession(session); + // Optimistic seeding for the initial task description is deferred + // until `hydrateCloudTaskSessionFromLogs` confirms there's no prior + // conversation. Otherwise reopening a task with history would flash + // the description at top until hydration replaced it. } else { // Ensure cloud flag and configOptions are set on existing sessions const updates: Partial = {}; @@ -2527,7 +2587,12 @@ export class SessionService { ); if (shouldHydrateSession) { - this.hydrateCloudTaskSessionFromLogs(taskId, taskRunId, logUrl); + this.hydrateCloudTaskSessionFromLogs( + taskId, + taskRunId, + logUrl, + taskDescription, + ); } // Subscribe before starting the main-process watcher so the first replayed @@ -2595,18 +2660,38 @@ export class SessionService { taskId: string, taskRunId: string, logUrl?: string, + taskDescription?: string, ): void { void (async () => { const { rawEntries } = await this.fetchSessionLogs(logUrl, taskRunId); - if (rawEntries.length === 0) { - return; - } const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session || session.taskRunId !== taskRunId) { return; } + const events = convertStoredEntriesToEvents(rawEntries); + const hasUserPrompt = events.some( + (e) => + isJsonRpcRequest(e.message) && e.message.method === "session/prompt", + ); + + // Seed the optimistic user-message bubble whenever the agent has + // not yet recorded an initial `session/prompt` request — covers the + // brand-new task case as well as "agent has emitted lifecycle + // notifications but hasn't received its first prompt yet". + if (!hasUserPrompt && taskDescription?.trim()) { + sessionStoreSetters.appendOptimisticItem(taskRunId, { + type: "user_message", + content: taskDescription, + timestamp: Date.now(), + }); + } + + if (rawEntries.length === 0) { + return; + } + // If live updates already populated a processed count, don't overwrite // that newer state with the persisted baseline fetched during startup. if ( @@ -2616,7 +2701,6 @@ export class SessionService { return; } - const events = convertStoredEntriesToEvents(rawEntries); sessionStoreSetters.updateSession(taskRunId, { events, isCloud: true, @@ -3002,20 +3086,13 @@ export class SessionService { } } - // Flush queued messages when a cloud turn completes (detected via live log updates) - const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; - if ( - sessionAfterLogs && - !sessionAfterLogs.isPromptPending && - sessionAfterLogs.messageQueue.length > 0 - ) { - this.sendQueuedCloudMessages(sessionAfterLogs.taskId).catch((err) => { - log.error("Failed to send queued cloud messages after turn complete", { - taskId: sessionAfterLogs.taskId, - error: err, - }); - }); - } + // NOTE: Don't auto-flush on `!isPromptPending && queue.length > 0` here. + // Setup-phase log batches (`_posthog/progress`, `_posthog/console`) stream + // in BEFORE the agent emits its initial `session/prompt` request, so + // `isPromptPending` is still false during those batches — firing the + // dispatcher then races with the agent's initial `clientConnection.prompt`. + // The canonical "agent is idle" signal is `_posthog/turn_complete`, which + // is handled in `updatePromptStateFromEvents`. // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { @@ -3027,23 +3104,10 @@ export class SessionService { branch: update.branch, }); - // Auto-send queued messages when a resumed run becomes active - if (update.status === "in_progress") { - const session = sessionStoreSetters.getSessions()[taskRunId]; - if (session && session.messageQueue.length > 0) { - // Clear the pending flag first — resumeCloudRun sets it as a guard - // while waiting for the run to start. Now that the run is active, - // sendCloudPrompt needs the flag clear to actually send. - sessionStoreSetters.updateSession(taskRunId, { - isPromptPending: false, - }); - this.sendQueuedCloudMessages(session.taskId).catch(() => { - // Retries exhausted — message was re-enqueued by - // sendQueuedCloudMessages, future stream-based completion detection - // will keep trying - }); - } - } + // No cloudStatus="in_progress" auto-flush here. `run_started` from + // the agent-server is the canonical "agent is ready" trigger and + // handles both initial boot and post-restart recovery; firing + // earlier would race with `sendInitialTaskMessage`. if (isTerminalStatus(update.status)) { // Clean up any pending resume messages that couldn't be sent diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts index 96aea7c42..8839274f2 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts @@ -1,6 +1,10 @@ import type { SessionConfigOption } from "@agentclientprotocol/sdk"; -import { describe, expect, it } from "vitest"; -import { cycleModeOption } from "./sessionStore"; +import { beforeEach, describe, expect, it } from "vitest"; +import { + cycleModeOption, + sessionStoreSetters, + useSessionStore, +} from "./sessionStore"; function createModeOption( currentValue: string, @@ -65,3 +69,92 @@ describe("cycleModeOption", () => { expect(cycleModeOption(option, { allowBypassPermissions })).toBe(expected); }); }); + +describe("dequeueMessages", () => { + beforeEach(() => { + useSessionStore.setState((state) => { + state.sessions = {}; + state.taskIdIndex = {}; + }); + }); + + it("returns plain objects that survive after the immer setState exits", () => { + sessionStoreSetters.setSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Test", + channel: "agent-event:run-123", + events: [], + startedAt: 0, + status: "connected", + isPromptPending: false, + isCompacting: false, + promptStartedAt: null, + pendingPermissions: new Map(), + pausedDurationMs: 0, + messageQueue: [], + optimisticItems: [], + }); + sessionStoreSetters.enqueueMessage("task-123", "first", [ + { type: "text", text: "first" }, + ]); + sessionStoreSetters.enqueueMessage("task-123", "second", [ + { type: "text", text: "second" }, + ]); + + const drained = sessionStoreSetters.dequeueMessages("task-123"); + + // Reading members of drained items must NOT throw "Cannot perform 'get' + // on a proxy that has been revoked" — the silent root cause behind the + // cloud-queue dispatcher losing messages. Items returned must be plain + // objects, not immer drafts that get revoked when setState exits. + expect(() => drained.map((m) => m.content)).not.toThrow(); + expect(drained.map((m) => m.content)).toEqual(["first", "second"]); + expect(useSessionStore.getState().sessions["run-123"].messageQueue).toEqual( + [], + ); + }); +}); + +describe("prependQueuedMessages", () => { + beforeEach(() => { + useSessionStore.setState((state) => { + state.sessions = {}; + state.taskIdIndex = {}; + }); + }); + + it("splices messages back at the head of the queue", () => { + sessionStoreSetters.setSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Test", + channel: "agent-event:run-123", + events: [], + startedAt: 0, + status: "connected", + isPromptPending: false, + isCompacting: false, + promptStartedAt: null, + pendingPermissions: new Map(), + pausedDurationMs: 0, + messageQueue: [], + optimisticItems: [], + }); + sessionStoreSetters.enqueueMessage("task-123", "live", [ + { type: "text", text: "live" }, + ]); + + sessionStoreSetters.prependQueuedMessages("task-123", [ + { + id: "rolled-back", + content: "rolled-back", + rawPrompt: [{ type: "text", text: "rolled-back" }], + queuedAt: 0, + }, + ]); + + const queue = useSessionStore.getState().sessions["run-123"].messageQueue; + expect(queue.map((m) => m.content)).toEqual(["rolled-back", "live"]); + }); +}); diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 08b7c7f11..c4079c91b 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -376,18 +376,43 @@ export const sessionStoreSetters = { }, dequeueMessages: (taskId: string): QueuedMessage[] => { - let queuedMessages: QueuedMessage[] = []; + // Read the queue from the frozen committed state BEFORE entering the + // immer draft, otherwise the items returned are proxies that get + // revoked when setState exits and any later access throws + // "Cannot perform 'get' on a proxy that has been revoked". + const state = useSessionStore.getState(); + const taskRunId = state.taskIdIndex[taskId]; + if (!taskRunId) return []; + const session = state.sessions[taskRunId]; + if (!session || session.messageQueue.length === 0) return []; + + const queuedMessages = [...session.messageQueue]; + + useSessionStore.setState((draft) => { + const trid = draft.taskIdIndex[taskId]; + if (!trid) return; + const draftSession = draft.sessions[trid]; + if (draftSession) { + draftSession.messageQueue = []; + } + }); + + return queuedMessages; + }, + + /** + * Splice messages back at the head of the queue. Used to roll back a + * dispatch attempt that drained the queue but failed before delivery. + */ + prependQueuedMessages: (taskId: string, messages: QueuedMessage[]) => { + if (messages.length === 0) return; useSessionStore.setState((state) => { const taskRunId = state.taskIdIndex[taskId]; if (!taskRunId) return; - const session = state.sessions[taskRunId]; - if (!session || session.messageQueue.length === 0) return; - - queuedMessages = [...session.messageQueue]; - session.messageQueue = []; + if (!session) return; + session.messageQueue = [...messages, ...session.messageQueue]; }); - return queuedMessages; }, appendOptimisticItem: ( diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index f61aa293d..f18e4ebb7 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -382,6 +382,35 @@ describe("AgentServer HTTP Mode", () => { }, 20000); }); + describe("session lifecycle", () => { + it("emits _posthog/run_started after session initialization", async () => { + await createServer().start(); + + // The notification is persisted via `logWriter.appendRawLine` which the + // mock backend's append_log handler captures into `appendLogCalls`. + await vi.waitFor( + () => { + const allEntries = appendLogCalls.flat() as Array<{ + type?: string; + notification?: { + method?: string; + params?: Record; + }; + }>; + const runStarted = allEntries.find( + (e) => e?.notification?.method === "_posthog/run_started", + ); + expect(runStarted).toBeDefined(); + expect(runStarted?.notification?.params).toMatchObject({ + runId: "test-run-id", + taskId: "test-task-id", + }); + }, + { timeout: 15000, interval: 100 }, + ); + }, 30000); + }); + describe("getInitialPromptOverride", () => { it("returns override string from run state", () => { const s = createServer(); diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 5d7a2a713..4774d2c84 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -958,6 +958,29 @@ export class AgentServer { await logAgentshRuntimeInfo(this.logger); this.logger.debug(`Initial permission mode: ${initialPermissionMode}`); + // Lifecycle handshake: clients gate "agent is ready to accept user + // messages" on this notification. Persisted to the session log so + // warm reconnects (sandbox restart with snapshot resume) replay it + // and see the agent come online again. + const runStartedNotification = { + jsonrpc: "2.0" as const, + method: POSTHOG_NOTIFICATIONS.RUN_STARTED, + params: { + sessionId: acpSessionId, + runId: payload.run_id, + taskId: payload.task_id, + }, + }; + this.broadcastEvent({ + type: "notification", + timestamp: new Date().toISOString(), + notification: runStartedNotification, + }); + this.session.logWriter.appendRawLine( + payload.run_id, + JSON.stringify(runStartedNotification), + ); + // Signal in_progress so the UI can start polling for updates this.posthogAPI .updateTaskRun(payload.task_id, payload.run_id, {