From d0130f562b1e095fc7a7803c02f49d2951a93f12 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Wed, 29 Apr 2026 09:27:08 +0200 Subject: [PATCH 1/5] chore(agent): emit _posthog/run_started lifecycle notification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire up the long-declared `_posthog/run_started` notification after the ACP session is fully initialized. Documented in `acp-extensions.ts` as the canonical "agent is up and accepting user messages" handshake. Persisted to the session log so warm reconnects (sandbox restart with snapshot resume) replay it. Adapter-agnostic — emitted at the server layer rather than per-adapter. Generated-By: PostHog Code Task-Id: 8228d7eb-50f0-4148-bbc3-d47617e982f7 --- packages/agent/src/server/agent-server.ts | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 5d7a2a713..4774d2c84 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -958,6 +958,29 @@ export class AgentServer { await logAgentshRuntimeInfo(this.logger); this.logger.debug(`Initial permission mode: ${initialPermissionMode}`); + // Lifecycle handshake: clients gate "agent is ready to accept user + // messages" on this notification. Persisted to the session log so + // warm reconnects (sandbox restart with snapshot resume) replay it + // and see the agent come online again. + const runStartedNotification = { + jsonrpc: "2.0" as const, + method: POSTHOG_NOTIFICATIONS.RUN_STARTED, + params: { + sessionId: acpSessionId, + runId: payload.run_id, + taskId: payload.task_id, + }, + }; + this.broadcastEvent({ + type: "notification", + timestamp: new Date().toISOString(), + notification: runStartedNotification, + }); + this.session.logWriter.appendRawLine( + payload.run_id, + JSON.stringify(runStartedNotification), + ); + // Signal in_progress so the UI can start polling for updates this.posthogAPI .updateTaskRun(payload.task_id, payload.run_id, { From c1784a6da15c60e62b31f17655df49dda5b7a97b Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Wed, 29 Apr 2026 09:27:50 +0200 Subject: [PATCH 2/5] fix(code): make cloud message-queue dispatch reliable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cloud follow-up messages typed during sandbox setup, sandbox death, or restore could be silently lost: the dispatcher's old retry-loop drained the queue up-front, held the prompt in a local var, and on retry exhaustion surfaced a generic toast without re-enqueuing. Multiple auto-flush triggers also raced with the agent's initial `prompt()` call, producing `stopReason: cancelled` on otherwise-good user messages. - Dispatcher refactored to peek-and-confirm: drain → send → re-prepend on failure. Per-taskId re-entrance guard (`dispatchingCloudQueues`) prevents two concurrent triggers from double-dispatching. - `_posthog/run_started` flips `session.status` to `"connected"` (the explicit agent-ready handshake from #1). - `_posthog/turn_complete` is the only queue-drain trigger now — it's the safe boundary, fires when the agent has actually finished a turn. - `sendCloudPrompt` queues if `status !== "connected"` — covers the initial-boot window and sandbox restart/restore window with one signal instead of the previous optimistic-pending heuristic. - Removed the cloudStatus="in_progress" and post-log `!isPromptPending` auto-flushes; both raced with `sendInitialTaskMessage`. - `dequeueMessages` now reads the frozen committed state before entering the immer draft. Drained items used to be revoked proxies that crashed `combineQueuedCloudPrompts` once the setState callback exited — the silent root cause of "queue clears, message lost". - `prependQueuedMessages` setter rolls the queue back when a dispatch fails so the next trigger retries. Generated-By: PostHog Code Task-Id: 8228d7eb-50f0-4148-bbc3-d47617e982f7 --- .../features/sessions/service/service.test.ts | 132 +++++++++- .../features/sessions/service/service.ts | 246 +++++++++++------- .../features/sessions/stores/sessionStore.ts | 39 ++- 3 files changed, 316 insertions(+), 101 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index e64e4c4a5..0e74a4c53 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -62,7 +62,16 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ removeQueuedMessage: vi.fn(), clearMessageQueue: vi.fn(), dequeueMessagesAsText: vi.fn(() => null), - dequeueMessages: vi.fn(() => []), + dequeueMessages: vi.fn( + () => + [] as Array<{ + id: string; + content: string; + rawPrompt?: unknown; + queuedAt: number; + }>, + ), + prependQueuedMessages: vi.fn(), setPendingPermissions: vi.fn(), getSessionByTaskId: vi.fn(), getSessions: vi.fn(() => ({})), @@ -881,6 +890,127 @@ describe("SessionService", () => { }); }); + it("flushes queued cloud messages on _posthog/turn_complete", async () => { + const service = getSessionService(); + // Reset auth client (a prior test may have set it to null). + mockBuildAuthenticatedClient.mockReturnValue(mockAuthenticatedClient); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); + + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-123", + method: "user_message", + params: expect.objectContaining({ content: "follow up" }), + }), + ); + }); + }); + + it("re-enqueues queued cloud messages when the dispatch fails", async () => { + const service = getSessionService(); + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + const sessionWithQueue = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + cloudStatus: "in_progress", + events: [], + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + sessionWithQueue, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": sessionWithQueue, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + mockTrpcCloudTask.sendCommand.mutate.mockRejectedValue( + new Error("transient backend failure"), + ); + + const turnCompleteEvent = { + type: "acp_message" as const, + ts: 1700000001, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([turnCompleteEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.prependQueuedMessages, + ).toHaveBeenCalledWith("task-123", [queuedMessage]); + }); + }); + it("clears isPromptPending from structured turn completion logs on hydration", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index eda53cbbb..67fc75578 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -226,6 +226,8 @@ export class SessionService { private connectingTasks = new Map>(); private localRepoPaths = new Map(); private localRecoveryAttempts = new Map>(); + /** Re-entrance guard for cloud queue dispatch (per taskId). */ + private dispatchingCloudQueues = new Set(); private nextCloudTaskWatchToken = 0; private subscriptions = new Map< string, @@ -1002,6 +1004,7 @@ export class SessionService { this.localRecoveryAttempts.clear(); this.cloudPermissionRequestIds.clear(); this.cloudLogGapReconciles.clear(); + this.dispatchingCloudQueues.clear(); this.idleKilledSubscription?.unsubscribe(); this.idleKilledSubscription = null; } @@ -1047,6 +1050,46 @@ export class SessionService { currentPromptId: null, }); } + // Lifecycle handshake from the agent — flip status to "connected" + // so the UI can release the queue-while-not-ready guard. This is + // the explicit "agent is up and accepting user messages" signal, + // emitted by `agent-server.ts` once the ACP session is fully + // wired. We deliberately do NOT drain the queue here: the agent + // is about to start `sendInitialTaskMessage` (or `sendResumeMessage`), + // and dispatching a queued user_message right now would race with + // its `clientConnection.prompt()` and one of the prompts would end + // up cancelled. The `turn_complete` handler below drains once the + // agent's initial / resume turn is actually finished. + if ( + "method" in msg && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.RUN_STARTED) + ) { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (session?.isCloud && session.status !== "connected") { + sessionStoreSetters.updateSession(taskRunId, { + status: "connected", + }); + } + } + // Canonical "turn boundary" — flush any queued cloud messages now + // that the agent is idle and accepting the next prompt. + if ( + "method" in msg && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE) + ) { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (session?.isCloud && session.messageQueue.length > 0) { + const taskId = session.taskId; + setTimeout(() => { + this.sendQueuedCloudMessages(taskId).catch((err) => + log.error("turn_complete-driven cloud queue flush failed", { + taskId, + error: err, + }), + ); + }, 0); + } + } } } @@ -1553,6 +1596,31 @@ export class SessionService { return { stopReason: "queued" }; } + // Agent-readiness guard: until we've received `_posthog/run_started` + // (which flips `session.status` to `"connected"`), the agent may + // still be booting / restoring after a sandbox restart, or mid- + // initial-prompt — sending now would race with its + // `clientConnection.prompt(initialPrompt)` on the same ACP session. + // Funnel through the queue; the run_started or turn_complete + // handlers will drain it once the agent is provably ready. + if ( + !options?.skipQueueGuard && + session.isCloud && + session.status !== "connected" + ) { + sessionStoreSetters.enqueueMessage( + session.taskId, + transport.promptText, + prompt, + ); + log.info("Cloud message queued (agent not ready)", { + taskId: session.taskId, + sessionStatus: session.status, + queueLength: session.messageQueue.length + 1, + }); + return { stopReason: "queued" }; + } + if (!options?.skipQueueGuard && session.isPromptPending) { sessionStoreSetters.enqueueMessage( session.taskId, @@ -1642,71 +1710,58 @@ export class SessionService { } } - private async sendQueuedCloudMessages( - taskId: string, - attempt = 0, - pendingPrompt?: string | ContentBlock[], - ): Promise<{ stopReason: string }> { - // First attempt: atomically dequeue. Retries reuse the already-dequeued prompt. - const combinedPrompt = - pendingPrompt ?? - combineQueuedCloudPrompts(sessionStoreSetters.dequeueMessages(taskId)); - if (!combinedPrompt) return { stopReason: "skipped" }; + /** + * Dispatches all currently queued cloud messages as a single combined + * prompt. Drains the queue up-front and rolls it back on failure so the + * next dispatch trigger (turn_complete, cloudStatus flip) can retry. A + * per-taskId re-entrance guard prevents concurrent triggers from + * double-dispatching. + * + * Pre-flight conditions match what `sendCloudPrompt` would otherwise + * silently re-queue on (sandbox not in_progress, prompt already pending). + * Skipping early lets the next trigger retry instead of re-queueing the + * already-dequeued prompt back into the same queue. + */ + private async sendQueuedCloudMessages(taskId: string): Promise { + if (this.dispatchingCloudQueues.has(taskId)) return; - const session = sessionStoreSetters.getSessionByTaskId(taskId); - if (!session) { - log.warn("No session found for queued cloud messages, message lost", { + this.dispatchingCloudQueues.add(taskId); + try { + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session?.isCloud || session.messageQueue.length === 0) return; + // Terminal cloud runs route through `resumeCloudRun`, which spins a + // new run and consumes the prompt itself — so dispatch is fine. + // Otherwise gate on the agent-ready handshake (`run_started` flips + // status to "connected") to avoid racing with `sendInitialTaskMessage`. + const isTerminal = isTerminalStatus(session.cloudStatus); + const canSendNow = + isTerminal || + (session.cloudStatus === "in_progress" && + session.status === "connected"); + if (!canSendNow || session.isPromptPending) return; + + const drained = sessionStoreSetters.dequeueMessages(taskId); + const combined = combineQueuedCloudPrompts(drained); + if (!combined) return; + + log.info("Sending queued cloud messages", { taskId, + drainedCount: drained.length, }); - return { stopReason: "no_session" }; - } - - log.info("Sending queued cloud messages", { - taskId, - promptLength: combinedPrompt.length, - attempt, - }); - try { - return await this.sendCloudPrompt(session, combinedPrompt, { - skipQueueGuard: true, - }); - } catch (error) { - const maxRetries = 5; - if (attempt < maxRetries) { - const delayMs = Math.min(1000 * 2 ** attempt, 10_000); - log.warn("Cloud message send failed, scheduling retry", { - taskId, - attempt, - delayMs, - error: String(error), + try { + await this.sendCloudPrompt(session, combined, { + skipQueueGuard: true, }); - return new Promise((resolve) => { - setTimeout(() => { - resolve( - this.sendQueuedCloudMessages( - taskId, - attempt + 1, - combinedPrompt, - ).catch((err) => { - log.error("Queued cloud message retry failed", { - taskId, - attempt: attempt + 1, - error: err, - }); - return { stopReason: "error" }; - }), - ); - }, delayMs); + } catch (err) { + log.warn("Cloud queue dispatch failed; re-enqueueing", { + taskId, + error: String(err), }); + sessionStoreSetters.prependQueuedMessages(taskId, drained); } - - log.error("Queued cloud message send failed after max retries", { - taskId, - attempts: attempt + 1, - }); - toast.error("Failed to send follow-up message. Please try again."); - return { stopReason: "error" }; + } finally { + this.dispatchingCloudQueues.delete(taskId); } } @@ -2421,6 +2476,7 @@ export class SessionService { initialMode?: string, adapter: Adapter = "claude", initialModel?: string, + taskDescription?: string, ): () => void { const taskRunId = runId; const existingWatcher = this.cloudTaskWatchers.get(taskId); @@ -2497,6 +2553,10 @@ export class SessionService { adapter, ); sessionStoreSetters.setSession(session); + // Optimistic seeding for the initial task description is deferred + // until `hydrateCloudTaskSessionFromLogs` confirms there's no prior + // conversation. Otherwise reopening a task with history would flash + // the description at top until hydration replaced it. } else { // Ensure cloud flag and configOptions are set on existing sessions const updates: Partial = {}; @@ -2527,7 +2587,12 @@ export class SessionService { ); if (shouldHydrateSession) { - this.hydrateCloudTaskSessionFromLogs(taskId, taskRunId, logUrl); + this.hydrateCloudTaskSessionFromLogs( + taskId, + taskRunId, + logUrl, + taskDescription, + ); } // Subscribe before starting the main-process watcher so the first replayed @@ -2595,18 +2660,30 @@ export class SessionService { taskId: string, taskRunId: string, logUrl?: string, + taskDescription?: string, ): void { void (async () => { const { rawEntries } = await this.fetchSessionLogs(logUrl, taskRunId); - if (rawEntries.length === 0) { - return; - } const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session || session.taskRunId !== taskRunId) { return; } + // No persisted history yet → brand-new task. Seed the optimistic + // user-message bubble with the task description so it anchors the + // top of the conversation while the agent boots. + if (rawEntries.length === 0) { + if (taskDescription?.trim()) { + sessionStoreSetters.appendOptimisticItem(taskRunId, { + type: "user_message", + content: taskDescription, + timestamp: Date.now(), + }); + } + return; + } + // If live updates already populated a processed count, don't overwrite // that newer state with the persisted baseline fetched during startup. if ( @@ -2627,6 +2704,9 @@ export class SessionService { // baseline already contains an in-flight session/prompt — the live delta // path otherwise sees delta <= 0 and never re-evaluates the tail. this.updatePromptStateFromEvents(taskRunId, events); + + // History present → seed nothing. The real first user prompt is + // already in `events` and renders chronologically. })().catch((err: unknown) => { log.warn("Failed to hydrate cloud task session from logs", { taskId, @@ -3002,20 +3082,13 @@ export class SessionService { } } - // Flush queued messages when a cloud turn completes (detected via live log updates) - const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; - if ( - sessionAfterLogs && - !sessionAfterLogs.isPromptPending && - sessionAfterLogs.messageQueue.length > 0 - ) { - this.sendQueuedCloudMessages(sessionAfterLogs.taskId).catch((err) => { - log.error("Failed to send queued cloud messages after turn complete", { - taskId: sessionAfterLogs.taskId, - error: err, - }); - }); - } + // NOTE: Don't auto-flush on `!isPromptPending && queue.length > 0` here. + // Setup-phase log batches (`_posthog/progress`, `_posthog/console`) stream + // in BEFORE the agent emits its initial `session/prompt` request, so + // `isPromptPending` is still false during those batches — firing the + // dispatcher then races with the agent's initial `clientConnection.prompt`. + // The canonical "agent is idle" signal is `_posthog/turn_complete`, which + // is handled in `updatePromptStateFromEvents`. // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { @@ -3027,23 +3100,10 @@ export class SessionService { branch: update.branch, }); - // Auto-send queued messages when a resumed run becomes active - if (update.status === "in_progress") { - const session = sessionStoreSetters.getSessions()[taskRunId]; - if (session && session.messageQueue.length > 0) { - // Clear the pending flag first — resumeCloudRun sets it as a guard - // while waiting for the run to start. Now that the run is active, - // sendCloudPrompt needs the flag clear to actually send. - sessionStoreSetters.updateSession(taskRunId, { - isPromptPending: false, - }); - this.sendQueuedCloudMessages(session.taskId).catch(() => { - // Retries exhausted — message was re-enqueued by - // sendQueuedCloudMessages, future stream-based completion detection - // will keep trying - }); - } - } + // No cloudStatus="in_progress" auto-flush here. `run_started` from + // the agent-server is the canonical "agent is ready" trigger and + // handles both initial boot and post-restart recovery; firing + // earlier would race with `sendInitialTaskMessage`. if (isTerminalStatus(update.status)) { // Clean up any pending resume messages that couldn't be sent diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 08b7c7f11..c4079c91b 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -376,18 +376,43 @@ export const sessionStoreSetters = { }, dequeueMessages: (taskId: string): QueuedMessage[] => { - let queuedMessages: QueuedMessage[] = []; + // Read the queue from the frozen committed state BEFORE entering the + // immer draft, otherwise the items returned are proxies that get + // revoked when setState exits and any later access throws + // "Cannot perform 'get' on a proxy that has been revoked". + const state = useSessionStore.getState(); + const taskRunId = state.taskIdIndex[taskId]; + if (!taskRunId) return []; + const session = state.sessions[taskRunId]; + if (!session || session.messageQueue.length === 0) return []; + + const queuedMessages = [...session.messageQueue]; + + useSessionStore.setState((draft) => { + const trid = draft.taskIdIndex[taskId]; + if (!trid) return; + const draftSession = draft.sessions[trid]; + if (draftSession) { + draftSession.messageQueue = []; + } + }); + + return queuedMessages; + }, + + /** + * Splice messages back at the head of the queue. Used to roll back a + * dispatch attempt that drained the queue but failed before delivery. + */ + prependQueuedMessages: (taskId: string, messages: QueuedMessage[]) => { + if (messages.length === 0) return; useSessionStore.setState((state) => { const taskRunId = state.taskIdIndex[taskId]; if (!taskRunId) return; - const session = state.sessions[taskRunId]; - if (!session || session.messageQueue.length === 0) return; - - queuedMessages = [...session.messageQueue]; - session.messageQueue = []; + if (!session) return; + session.messageQueue = [...messages, ...session.messageQueue]; }); - return queuedMessages; }, appendOptimisticItem: ( From d3c4ad4f62743da609e1ef8394fb498d3369ffab Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Wed, 29 Apr 2026 09:28:55 +0200 Subject: [PATCH 3/5] feat(code): pin user prompt at top during cloud sandbox setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Render the optimistic user-message bubble immediately on cloud task creation instead of waiting for the agent to echo it back via SSE (which can be many seconds while the sandbox provisions, clones, and boots). The optimistic seed itself is plumbed in #2 — this commit makes it visible in the right place. - ConversationView pins optimistic items above conversationItems for cloud sessions and content-dedups the agent's eventual echo so the bubble doesn't disappear-then-reappear when the real `session/prompt` event lands. Local sessions are unchanged: optimistic stays at chronological end and `replaceOptimisticWithEvent` swaps in place. - `useSessionConnection` plumbs `task.description` into `watchCloudTask` so the seed has content. Generated-By: PostHog Code Task-Id: 8228d7eb-50f0-4148-bbc3-d47617e982f7 --- .../sessions/components/ConversationView.tsx | 37 ++++++++++++++++++- .../sessions/hooks/useSessionConnection.ts | 2 + 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 04c4cd24d..7dd5620bf 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -102,13 +102,46 @@ export function ConversationView({ [queuedMessages], ); + const isCloud = session?.isCloud ?? false; + const items = useMemo(() => { + // Cloud's initial optimistic is pinned to the top so the user's + // prompt stays visible above setup progress / agent activity. When + // the agent echoes it back via `session/prompt`, the duplicate + // `user_message` is filtered here by content match so the bubble + // doesn't disappear-then-reappear when the echo lands. + // + // Local sessions keep optimistic at chronological end — they rely on + // `replaceOptimisticWithEvent` to swap optimistic↔real in place. + if (!isCloud) { + const result: ConversationItem[] = [ + ...conversationItems, + ...optimisticItems, + ]; + return queuedItems.length > 0 ? [...result, ...queuedItems] : result; + } + + const optimisticUserContents = new Set( + optimisticItems + .filter( + (item): item is Extract => + item.type === "user_message", + ) + .map((item) => item.content), + ); + const dedupedConversation = + optimisticUserContents.size === 0 + ? conversationItems + : conversationItems.filter((item) => { + if (item.type !== "user_message") return true; + return !optimisticUserContents.has(item.content); + }); const result: ConversationItem[] = [ - ...conversationItems, ...optimisticItems, + ...dedupedConversation, ]; return queuedItems.length > 0 ? [...result, ...queuedItems] : result; - }, [conversationItems, optimisticItems, queuedItems]); + }, [conversationItems, optimisticItems, queuedItems, isCloud]); // Keep MCP App tool call items mounted so their iframes and bridges // survive scrolling out of the virtualized viewport. diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index cc4ff9e71..7de8a5f2a 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -91,6 +91,7 @@ export function useSessionConnection({ initialMode, adapter, initialModel, + task.description ?? undefined, ); return cleanup; }, [ @@ -106,6 +107,7 @@ export function useSessionConnection({ task.latest_run?.model, task.latest_run?.runtime_adapter, task.latest_run?.state?.initial_permission_mode, + task.description, ]); useEffect(() => { From d47081cf3e535656cc8eae13f59be83c4269b4d6 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Wed, 29 Apr 2026 09:50:33 +0200 Subject: [PATCH 4/5] test: cover cloud queue dispatch, run_started, and optimistic merge Extracts the cloud-vs-local item merge logic from ConversationView into a pure helper so it can be unit-tested without rendering. Adds coverage for the run_started lifecycle notification, queue dispatch reliability (immer-proxy revocation, rollback on failure, status-gated send), and hydrate-time optimistic seeding. Generated-By: PostHog Code Task-Id: 8228d7eb-50f0-4148-bbc3-d47617e982f7 --- .../sessions/components/ConversationView.tsx | 49 +-- .../components/mergeConversationItems.test.ts | 102 +++++++ .../components/mergeConversationItems.ts | 53 ++++ .../features/sessions/service/service.test.ts | 284 ++++++++++++++++-- .../sessions/stores/sessionStore.test.ts | 97 +++++- .../agent/src/server/agent-server.test.ts | 29 ++ 6 files changed, 550 insertions(+), 64 deletions(-) create mode 100644 apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts create mode 100644 apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 7dd5620bf..1a1e79a3f 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -20,6 +20,7 @@ import { } from "./buildConversationItems"; import { GitActionMessage } from "./GitActionMessage"; import { GitActionResult } from "./GitActionResult"; +import { mergeConversationItems } from "./mergeConversationItems"; import { SessionFooter } from "./SessionFooter"; import { QueuedMessageView } from "./session-update/QueuedMessageView"; import { @@ -104,44 +105,16 @@ export function ConversationView({ const isCloud = session?.isCloud ?? false; - const items = useMemo(() => { - // Cloud's initial optimistic is pinned to the top so the user's - // prompt stays visible above setup progress / agent activity. When - // the agent echoes it back via `session/prompt`, the duplicate - // `user_message` is filtered here by content match so the bubble - // doesn't disappear-then-reappear when the echo lands. - // - // Local sessions keep optimistic at chronological end — they rely on - // `replaceOptimisticWithEvent` to swap optimistic↔real in place. - if (!isCloud) { - const result: ConversationItem[] = [ - ...conversationItems, - ...optimisticItems, - ]; - return queuedItems.length > 0 ? [...result, ...queuedItems] : result; - } - - const optimisticUserContents = new Set( - optimisticItems - .filter( - (item): item is Extract => - item.type === "user_message", - ) - .map((item) => item.content), - ); - const dedupedConversation = - optimisticUserContents.size === 0 - ? conversationItems - : conversationItems.filter((item) => { - if (item.type !== "user_message") return true; - return !optimisticUserContents.has(item.content); - }); - const result: ConversationItem[] = [ - ...optimisticItems, - ...dedupedConversation, - ]; - return queuedItems.length > 0 ? [...result, ...queuedItems] : result; - }, [conversationItems, optimisticItems, queuedItems, isCloud]); + const items = useMemo( + () => + mergeConversationItems({ + conversationItems, + optimisticItems, + queuedItems, + isCloud, + }), + [conversationItems, optimisticItems, queuedItems, isCloud], + ); // Keep MCP App tool call items mounted so their iframes and bridges // survive scrolling out of the virtualized viewport. diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts new file mode 100644 index 000000000..a8434c718 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.test.ts @@ -0,0 +1,102 @@ +import type { QueuedMessage } from "@features/sessions/stores/sessionStore"; +import { describe, expect, it } from "vitest"; +import type { ConversationItem } from "./buildConversationItems"; +import { mergeConversationItems } from "./mergeConversationItems"; + +function userMessage( + id: string, + content: string, +): Extract { + return { type: "user_message", id, content, timestamp: 0 }; +} + +function queuedItem( + id: string, + content: string, +): Extract { + const message: QueuedMessage = { + id, + content, + rawPrompt: [{ type: "text", text: content }], + queuedAt: 0, + }; + return { type: "queued", id, message }; +} + +describe("mergeConversationItems", () => { + it("local: appends optimistic at the chronological end", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("a", "first")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["a", "opt"]); + }); + + it("local: queued items always come last", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("a", "first")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [queuedItem("q1", "later")], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["a", "opt", "q1"]); + }); + + it("local: does NOT dedupe — duplicate echoes are intentionally retained", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("echo", "hello")], + optimisticItems: [userMessage("opt", "hello")], + queuedItems: [], + isCloud: false, + }); + expect(result.map((i) => i.id)).toEqual(["echo", "opt"]); + }); + + it("cloud: pins optimistic at the top", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("setup", "setup info")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "setup"]); + }); + + it("cloud: filters echoed user_message that matches optimistic content", () => { + const result = mergeConversationItems({ + conversationItems: [ + userMessage("echo", "hello"), + userMessage("other", "different"), + ], + optimisticItems: [userMessage("opt", "hello")], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "other"]); + }); + + it("cloud: dedupe is no-op when there are no optimistic items", () => { + const result = mergeConversationItems({ + conversationItems: [ + userMessage("a", "first"), + userMessage("b", "second"), + ], + optimisticItems: [], + queuedItems: [], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["a", "b"]); + }); + + it("cloud: queued items always come last", () => { + const result = mergeConversationItems({ + conversationItems: [userMessage("setup", "setup")], + optimisticItems: [userMessage("opt", "draft")], + queuedItems: [queuedItem("q1", "later")], + isCloud: true, + }); + expect(result.map((i) => i.id)).toEqual(["opt", "setup", "q1"]); + }); +}); diff --git a/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts new file mode 100644 index 000000000..8aa755cb1 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/components/mergeConversationItems.ts @@ -0,0 +1,53 @@ +import type { ConversationItem } from "./buildConversationItems"; + +type QueuedItem = Extract; + +interface MergeConversationItemsArgs { + conversationItems: ConversationItem[]; + optimisticItems: ConversationItem[]; + queuedItems: QueuedItem[]; + isCloud: boolean; +} + +// Cloud's initial optimistic is pinned to the top so the user's prompt stays +// visible above setup progress. When the agent echoes it back via +// `session/prompt`, the duplicate `user_message` is filtered out by content +// match so the bubble doesn't disappear-then-reappear when the echo lands. +// +// Local sessions keep optimistic at the chronological end — they rely on +// `replaceOptimisticWithEvent` to swap optimistic↔real in place. +export function mergeConversationItems({ + conversationItems, + optimisticItems, + queuedItems, + isCloud, +}: MergeConversationItemsArgs): ConversationItem[] { + if (!isCloud) { + const result: ConversationItem[] = [ + ...conversationItems, + ...optimisticItems, + ]; + return queuedItems.length > 0 ? [...result, ...queuedItems] : result; + } + + const optimisticUserContents = new Set( + optimisticItems + .filter( + (item): item is Extract => + item.type === "user_message", + ) + .map((item) => item.content), + ); + const dedupedConversation = + optimisticUserContents.size === 0 + ? conversationItems + : conversationItems.filter((item) => { + if (item.type !== "user_message") return true; + return !optimisticUserContents.has(item.content); + }); + const result: ConversationItem[] = [ + ...optimisticItems, + ...dedupedConversation, + ]; + return queuedItems.length > 0 ? [...result, ...queuedItems] : result; +} diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 0e74a4c53..bec4edfd9 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1371,50 +1371,263 @@ describe("SessionService", () => { 8, ); }); - - it("ignores stale async starts when the same watcher is replaced", async () => { + it("flips status to connected on _posthog/run_started", async () => { const service = getSessionService(); - let resolveFirstWatchStart!: () => void; - let resolveSecondWatchStart!: () => void; + const hydratedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + hydratedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": hydratedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); - mockTrpcCloudTask.watch.mutate - .mockImplementationOnce( - () => - new Promise((resolve) => { - resolveFirstWatchStart = resolve; - }), - ) - .mockImplementationOnce( - () => - new Promise((resolve) => { - resolveSecondWatchStart = resolve; - }), + const runStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([runStartedEvent]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + { status: "connected" }, ); + }); + }); + + it("does not re-flip status when run_started arrives but session is already connected", async () => { + const service = getSessionService(); + const connectedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + connectedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": connectedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue("{}"); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + const runStartedEvent = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([runStartedEvent]); service.watchCloudTask( "task-123", "run-123", "https://api.anthropic.com", 123, + undefined, + "https://logs.example.com/run-123", ); - service.stopCloudTaskWatch("task-123"); + + // Wait long enough for the hydration callback to run; assert the + // store was never told to set status: "connected" again. + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + { status: "connected" }, + ); + }); + + it("seeds an optimistic user-message when hydrating a brand-new task with no prior history", async () => { + const service = getSessionService(); + const freshSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(freshSession); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": freshSession, + }); + // Empty history — fetchSessionLogs returns no entries. + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(""); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + service.watchCloudTask( "task-123", "run-123", "https://api.anthropic.com", 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "build me a thing", ); - resolveSecondWatchStart(); - await Promise.resolve(); - await Promise.resolve(); + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "build me a thing", + }), + ); + }); + }); - resolveFirstWatchStart(); - await Promise.resolve(); - await Promise.resolve(); + it("does NOT seed an optimistic user-message when hydration finds prior history", async () => { + const service = getSessionService(); + const reopenedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + reopenedSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": reopenedSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + // Non-empty history: a prior session/prompt exists. + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + JSON.stringify({ + type: "request", + timestamp: "2024-01-01T00:00:00Z", + request: { + jsonrpc: "2.0", + id: 1, + method: "session/prompt", + params: { prompt: [{ type: "text", text: "hello there" }] }, + }, + }), + ); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + const priorPrompt = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + id: 1, + method: "session/prompt", + params: { prompt: [{ type: "text", text: "hello there" }] }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([priorPrompt]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "hello there", + ); - expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledTimes(2); + // Wait for hydration to run. + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ events: [priorPrompt] }), + ); + }); + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).not.toHaveBeenCalled(); }); + it("ignores stale async starts when the same watcher is replaced", async () => { + const service = getSessionService(); + let resolveFirstWatchStart!: () => void; + let resolveSecondWatchStart!: () => void; + + mockTrpcCloudTask.watch.mutate + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFirstWatchStart = resolve; + }), + ) + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSecondWatchStart = resolve; + }), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + service.stopCloudTaskWatch("task-123"); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + resolveSecondWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + resolveFirstWatchStart(); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockTrpcCloudTask.watch.mutate).toHaveBeenCalledTimes(2); + }); it("sends a compensating unwatch if teardown wins the race after watch starts", async () => { const service = getSessionService(); @@ -1707,6 +1920,29 @@ describe("SessionService", () => { ); }); + it("queues cloud prompt when session.status is not connected (agent not ready)", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + status: "disconnected", + isPromptPending: false, + }), + ); + + const prompt: ContentBlock[] = [{ type: "text", text: "wake me up" }]; + const result = await service.sendPrompt("task-123", prompt); + + expect(result.stopReason).toBe("queued"); + expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( + "task-123", + "wake me up", + prompt, + ); + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + }); + it("preserves cloud attachment prompts when queueing a follow-up", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts index 96aea7c42..8839274f2 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.test.ts @@ -1,6 +1,10 @@ import type { SessionConfigOption } from "@agentclientprotocol/sdk"; -import { describe, expect, it } from "vitest"; -import { cycleModeOption } from "./sessionStore"; +import { beforeEach, describe, expect, it } from "vitest"; +import { + cycleModeOption, + sessionStoreSetters, + useSessionStore, +} from "./sessionStore"; function createModeOption( currentValue: string, @@ -65,3 +69,92 @@ describe("cycleModeOption", () => { expect(cycleModeOption(option, { allowBypassPermissions })).toBe(expected); }); }); + +describe("dequeueMessages", () => { + beforeEach(() => { + useSessionStore.setState((state) => { + state.sessions = {}; + state.taskIdIndex = {}; + }); + }); + + it("returns plain objects that survive after the immer setState exits", () => { + sessionStoreSetters.setSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Test", + channel: "agent-event:run-123", + events: [], + startedAt: 0, + status: "connected", + isPromptPending: false, + isCompacting: false, + promptStartedAt: null, + pendingPermissions: new Map(), + pausedDurationMs: 0, + messageQueue: [], + optimisticItems: [], + }); + sessionStoreSetters.enqueueMessage("task-123", "first", [ + { type: "text", text: "first" }, + ]); + sessionStoreSetters.enqueueMessage("task-123", "second", [ + { type: "text", text: "second" }, + ]); + + const drained = sessionStoreSetters.dequeueMessages("task-123"); + + // Reading members of drained items must NOT throw "Cannot perform 'get' + // on a proxy that has been revoked" — the silent root cause behind the + // cloud-queue dispatcher losing messages. Items returned must be plain + // objects, not immer drafts that get revoked when setState exits. + expect(() => drained.map((m) => m.content)).not.toThrow(); + expect(drained.map((m) => m.content)).toEqual(["first", "second"]); + expect(useSessionStore.getState().sessions["run-123"].messageQueue).toEqual( + [], + ); + }); +}); + +describe("prependQueuedMessages", () => { + beforeEach(() => { + useSessionStore.setState((state) => { + state.sessions = {}; + state.taskIdIndex = {}; + }); + }); + + it("splices messages back at the head of the queue", () => { + sessionStoreSetters.setSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Test", + channel: "agent-event:run-123", + events: [], + startedAt: 0, + status: "connected", + isPromptPending: false, + isCompacting: false, + promptStartedAt: null, + pendingPermissions: new Map(), + pausedDurationMs: 0, + messageQueue: [], + optimisticItems: [], + }); + sessionStoreSetters.enqueueMessage("task-123", "live", [ + { type: "text", text: "live" }, + ]); + + sessionStoreSetters.prependQueuedMessages("task-123", [ + { + id: "rolled-back", + content: "rolled-back", + rawPrompt: [{ type: "text", text: "rolled-back" }], + queuedAt: 0, + }, + ]); + + const queue = useSessionStore.getState().sessions["run-123"].messageQueue; + expect(queue.map((m) => m.content)).toEqual(["rolled-back", "live"]); + }); +}); diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index f61aa293d..f18e4ebb7 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -382,6 +382,35 @@ describe("AgentServer HTTP Mode", () => { }, 20000); }); + describe("session lifecycle", () => { + it("emits _posthog/run_started after session initialization", async () => { + await createServer().start(); + + // The notification is persisted via `logWriter.appendRawLine` which the + // mock backend's append_log handler captures into `appendLogCalls`. + await vi.waitFor( + () => { + const allEntries = appendLogCalls.flat() as Array<{ + type?: string; + notification?: { + method?: string; + params?: Record; + }; + }>; + const runStarted = allEntries.find( + (e) => e?.notification?.method === "_posthog/run_started", + ); + expect(runStarted).toBeDefined(); + expect(runStarted?.notification?.params).toMatchObject({ + runId: "test-run-id", + taskId: "test-task-id", + }); + }, + { timeout: 15000, interval: 100 }, + ); + }, 30000); + }); + describe("getInitialPromptOverride", () => { it("returns override string from run state", () => { const s = createServer(); From d7949695056555d7331b9b9b9a7496d91c3b603c Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Wed, 29 Apr 2026 10:41:16 +0200 Subject: [PATCH 5/5] fix(code): seed cloud optimistic when only lifecycle entries exist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hydrate-time seed only fired when persisted log entries were literally empty. By the time the hydrate fetch resolves, the agent has usually already emitted `_posthog/run_started` and setup-progress notifications, so a brand-new task with no user prompt yet would skip seeding and the task description would no longer be pinned at the top. Switch the gate to "no `session/prompt` request in events" — covers both the empty-log case and the lifecycle-notifications-only case. Generated-By: PostHog Code Task-Id: 8228d7eb-50f0-4148-bbc3-d47617e982f7 --- .../features/sessions/service/service.test.ts | 71 +++++++++++++++++++ .../features/sessions/service/service.ts | 32 +++++---- 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index bec4edfd9..a76eefdd9 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1518,6 +1518,77 @@ describe("SessionService", () => { }); }); + it("seeds an optimistic user-message when persisted entries exist but no session/prompt yet (agent emitted lifecycle notifications first)", async () => { + const service = getSessionService(); + const freshSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "disconnected", + isCloud: true, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(freshSession); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": freshSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + JSON.stringify({ + jsonrpc: "2.0", + method: "_posthog/run_started", + params: { + sessionId: "acp-session-1", + runId: "run-123", + taskId: "task-123", + }, + }), + ); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + // Lifecycle notification only — no session/prompt request yet. + const lifecycleNotification = { + type: "acp_message" as const, + ts: 1700000000, + message: { + jsonrpc: "2.0" as const, + method: "_posthog/run_started", + params: { + sessionId: "acp-session-1", + runId: "run-123", + taskId: "task-123", + }, + }, + }; + mockConvertStoredEntriesToEvents.mockReturnValueOnce([ + lifecycleNotification, + ]); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + undefined, + "claude", + undefined, + "build me a thing", + ); + + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "build me a thing", + }), + ); + }); + }); + it("does NOT seed an optimistic user-message when hydration finds prior history", async () => { const service = getSessionService(); const reopenedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 67fc75578..a9645edfa 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2670,17 +2670,25 @@ export class SessionService { return; } - // No persisted history yet → brand-new task. Seed the optimistic - // user-message bubble with the task description so it anchors the - // top of the conversation while the agent boots. + const events = convertStoredEntriesToEvents(rawEntries); + const hasUserPrompt = events.some( + (e) => + isJsonRpcRequest(e.message) && e.message.method === "session/prompt", + ); + + // Seed the optimistic user-message bubble whenever the agent has + // not yet recorded an initial `session/prompt` request — covers the + // brand-new task case as well as "agent has emitted lifecycle + // notifications but hasn't received its first prompt yet". + if (!hasUserPrompt && taskDescription?.trim()) { + sessionStoreSetters.appendOptimisticItem(taskRunId, { + type: "user_message", + content: taskDescription, + timestamp: Date.now(), + }); + } + if (rawEntries.length === 0) { - if (taskDescription?.trim()) { - sessionStoreSetters.appendOptimisticItem(taskRunId, { - type: "user_message", - content: taskDescription, - timestamp: Date.now(), - }); - } return; } @@ -2693,7 +2701,6 @@ export class SessionService { return; } - const events = convertStoredEntriesToEvents(rawEntries); sessionStoreSetters.updateSession(taskRunId, { events, isCloud: true, @@ -2704,9 +2711,6 @@ export class SessionService { // baseline already contains an in-flight session/prompt — the live delta // path otherwise sees delta <= 0 and never re-evaluates the tail. this.updatePromptStateFromEvents(taskRunId, events); - - // History present → seed nothing. The real first user prompt is - // already in `events` and renders chronologically. })().catch((err: unknown) => { log.warn("Failed to hydrate cloud task session from logs", { taskId,