diff --git a/packages/core/src/sessions/pendingPrompt.ts b/packages/core/src/sessions/pendingPrompt.ts new file mode 100644 index 000000000..286bd6f11 --- /dev/null +++ b/packages/core/src/sessions/pendingPrompt.ts @@ -0,0 +1,49 @@ +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import type { Adapter, ExecutionMode } from "@posthog/shared"; + +/** + * A durable, write-ahead record of a prompt the user is trying to start a + * local task run with. + * + * The prompt is the one thing in the start-a-task flow that the user cannot + * cheaply reproduce, yet today it only exists in memory until a session has + * fully initialized and `session/prompt` has been delivered. If session + * initialization throws or times out (common in large monorepos, where init + * can exceed the 30s `SESSION_VALIDATION_TIMEOUT_MS` budget), or the app is + * reloaded/quit/crashes during the retry window, the prompt is lost. + * + * To make that loss very unlikely we persist this record BEFORE any + * agent/session setup is attempted, and only clear it once the prompt has + * actually been delivered to the agent. (Persistence is async and + * best-effort, so it is not an absolute guarantee — a crash in the first + * moments of a cold start can still race the write.) A persisted record + * therefore means "this prompt is owed delivery and has not been delivered + * yet" — the basis for recovering it on the next connect, whether that connect + * starts a fresh run or resumes the stranded one. + */ +export interface PendingPromptRecord { + taskId: string; + taskTitle: string; + repoPath: string; + initialPrompt: ContentBlock[]; + executionMode?: ExecutionMode; + adapter?: Adapter; + model?: string; + reasoningLevel?: string; + /** Epoch ms when the prompt was first written ahead. */ + createdAt: number; +} + +/** + * Durable storage for {@link PendingPromptRecord}s, keyed by `taskId` (one + * in-flight prompt per task — retries reuse the same key). Implementations + * must survive an app restart. + */ +export interface PendingPromptStore { + /** Write-ahead (or overwrite) the pending prompt for a task. */ + save(record: PendingPromptRecord): void; + /** Get the pending prompt for a task, if one is owed delivery. */ + get(taskId: string): PendingPromptRecord | undefined; + /** Clear the pending prompt for a task once delivered or abandoned. */ + remove(taskId: string): void; +} diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 0eb24e96d..ab84985df 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -57,6 +57,7 @@ import { OFFLINE_SESSION_MESSAGE, routeLocalConnect, } from "./connectRouting"; +import type { PendingPromptStore } from "./pendingPrompt"; import { type PermissionSelectionPlan, planPermissionResponse, @@ -252,6 +253,11 @@ export interface SessionServiceDeps { setAdapter(taskRunId: string, adapter: Adapter): void; removeAdapter(taskRunId: string): void; }; + /** + * Durable, write-ahead storage for the prompt a local task run is starting + * with, so session-init timeouts or app restarts can never lose it. + */ + pendingPrompts: PendingPromptStore; readonly settings: { customInstructions?: string | null }; usageLimit: { show: (...args: any[]) => any }; readonly addDirectoryDialog: { open: boolean }; @@ -822,6 +828,21 @@ export class SessionService { ), ); } + + // A run stranded before its prompt was ever delivered (e.g. the + // original session-init timed out) routes here on the next connect + // rather than to createNewLocalSession, since it already has a run id + // and log. Its prompt is still owed: if the resumed log carries no + // prompt yet but we have a write-ahead record, deliver it now. + const owed = this.d.pendingPrompts.get(taskId); + if (owed?.initialPrompt.length && !hasSessionPromptEvent(events)) { + this.localRepoPaths.set(taskId, repoPath); + this.d.log.info("Recovered a written-ahead prompt on resume", { + taskId, + ageMs: Date.now() - owed.createdAt, + }); + await this.deliverWrittenAheadPrompt(taskId, owed.initialPrompt); + } return true; } else { this.d.log.warn("Reconnect returned null", { taskId, taskRunId }); @@ -1060,6 +1081,52 @@ export class SessionService { model?: string, reasoningLevel?: string, ): Promise { + // Recover a previously written-ahead prompt when this connect didn't + // carry one itself: a persisted record means an earlier attempt for this + // task saved a prompt and never delivered it (init timed out, or the app + // was restarted before delivery). Adopt its prompt and the settings the + // user originally chose, unless the caller overrode them. + const recovered = initialPrompt?.length + ? undefined + : this.d.pendingPrompts.get(taskId); + const effectivePrompt = initialPrompt?.length + ? initialPrompt + : recovered?.initialPrompt; + const effectiveExecutionMode = executionMode ?? recovered?.executionMode; + const effectiveAdapter = adapter ?? recovered?.adapter; + const effectiveModel = model ?? recovered?.model; + const effectiveReasoningLevel = reasoningLevel ?? recovered?.reasoningLevel; + + if (recovered) { + // Surface when the safety net actually catches something, so we can tell + // it's working (and gauge how long prompts sit undelivered). + this.d.log.info("Recovered a written-ahead prompt for a task", { + taskId, + ageMs: Date.now() - recovered.createdAt, + }); + } + + // Write-ahead the prompt BEFORE any session/agent work. This is the step + // that makes losing it very unlikely: everything below (run creation, the + // 30s-bounded agent.start, prompt delivery) can throw or be killed by an + // app restart, and the prompt still survives to be recovered on the next + // connect. Cleared only once it has actually been delivered. (Persistence + // is async and best-effort, so a crash in the first moments of a cold + // start can still race it — hence "very unlikely", not "guaranteed".) + if (effectivePrompt?.length) { + this.d.pendingPrompts.save({ + taskId, + taskTitle, + repoPath, + initialPrompt: effectivePrompt, + executionMode: effectiveExecutionMode, + adapter: effectiveAdapter, + model: effectiveModel, + reasoningLevel: effectiveReasoningLevel, + createdAt: recovered?.createdAt ?? Date.now(), + }); + } + const { client } = auth; if (!client) { throw new Error("Unable to reach server. Please check your connection."); @@ -1071,18 +1138,18 @@ export class SessionService { } const { customInstructions: startCustomInstructions } = this.d.settings; - const preferredModel = model ?? this.d.DEFAULT_GATEWAY_MODEL; + const preferredModel = effectiveModel ?? this.d.DEFAULT_GATEWAY_MODEL; const result = await this.d.trpc.agent.start.mutate({ taskId, taskRunId: taskRun.id, repoPath, apiHost: auth.apiHost, projectId: auth.projectId, - permissionMode: executionMode, - adapter, + permissionMode: effectiveExecutionMode, + adapter: effectiveAdapter, customInstructions: startCustomInstructions || undefined, - effort: effortLevelSchema.safeParse(reasoningLevel).success - ? (reasoningLevel as EffortLevel) + effort: effortLevelSchema.safeParse(effectiveReasoningLevel).success + ? (effectiveReasoningLevel as EffortLevel) : undefined, model: preferredModel, }); @@ -1090,7 +1157,7 @@ export class SessionService { const session = createBaseSession(taskRun.id, taskId, taskTitle); session.channel = result.channel; session.status = "connected"; - session.adapter = adapter; + session.adapter = effectiveAdapter; const configOptions = result.configOptions as | SessionConfigOption[] | undefined; @@ -1102,15 +1169,15 @@ export class SessionService { } // Persist the adapter - if (adapter) { - this.d.adapterStore.setAdapter(taskRun.id, adapter); + if (effectiveAdapter) { + this.d.adapterStore.setAdapter(taskRun.id, effectiveAdapter); } // Store the initial prompt on the session so retry/reset flows can // re-send it if the session errors after this point (e.g. subscription // error, agent crash, or prompt failure). - if (initialPrompt?.length) { - session.initialPrompt = initialPrompt; + if (effectivePrompt?.length) { + session.initialPrompt = effectivePrompt; } this.d.store.setSession(session); @@ -1119,12 +1186,30 @@ export class SessionService { this.d.track(ANALYTICS_EVENTS.TASK_RUN_STARTED, { task_id: taskId, execution_type: "local", - initial_mode: executionMode, - adapter, + initial_mode: effectiveExecutionMode, + adapter: effectiveAdapter, }); - if (initialPrompt?.length) { - await this.sendPrompt(taskId, initialPrompt); + if (effectivePrompt?.length) { + await this.deliverWrittenAheadPrompt(taskId, effectivePrompt); + } + } + + /** + * Deliver a written-ahead prompt and clear its durable record only on + * confirmed delivery. `sendPrompt` can resolve WITHOUT delivering — "queued" + * (session busy) or "rate_limited" (usage limit swallowed by + * `sendLocalPrompt`) — and in those cases the write-ahead copy must survive + * so the prompt isn't lost. On real delivery the prompt now lives in the + * run's session log, so clearing the only-other-copy is safe. + */ + private async deliverWrittenAheadPrompt( + taskId: string, + prompt: ContentBlock[], + ): Promise { + const { stopReason } = await this.sendPrompt(taskId, prompt); + if (stopReason !== "queued" && stopReason !== "rate_limited") { + this.d.pendingPrompts.remove(taskId); } } diff --git a/packages/ui/src/features/sessions/pendingPromptStore.test.ts b/packages/ui/src/features/sessions/pendingPromptStore.test.ts new file mode 100644 index 000000000..ffc1b5785 --- /dev/null +++ b/packages/ui/src/features/sessions/pendingPromptStore.test.ts @@ -0,0 +1,72 @@ +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import type { PendingPromptRecord } from "@posthog/core/sessions/pendingPrompt"; +import { beforeEach, describe, expect, it } from "vitest"; +import { + pendingPromptStore, + usePendingPromptStore, +} from "./pendingPromptStore"; + +function record( + taskId: string, + text: string, + overrides: Partial = {}, +): PendingPromptRecord { + const initialPrompt: ContentBlock[] = [{ type: "text", text }]; + return { + taskId, + taskTitle: `Task ${taskId}`, + repoPath: "/repo", + initialPrompt, + createdAt: 1, + ...overrides, + }; +} + +function storedTaskIds(): string[] { + return Object.keys(usePendingPromptStore.getState().promptsByTaskId).sort(); +} + +describe("pendingPromptStore", () => { + beforeEach(() => { + usePendingPromptStore.setState({ promptsByTaskId: {} }); + }); + + it("saves and reads back a pending prompt by taskId", () => { + pendingPromptStore.save(record("t1", "do the thing")); + + expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([ + { type: "text", text: "do the thing" }, + ]); + }); + + it("returns undefined when no prompt is owed", () => { + expect(pendingPromptStore.get("missing")).toBeUndefined(); + }); + + it("overwrites the record for a task on a re-save (retry reuses the key)", () => { + pendingPromptStore.save(record("t1", "first")); + pendingPromptStore.save(record("t1", "second")); + + expect(storedTaskIds()).toEqual(["t1"]); + expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([ + { type: "text", text: "second" }, + ]); + }); + + it("removes a delivered prompt and leaves others intact", () => { + pendingPromptStore.save(record("t1", "one")); + pendingPromptStore.save(record("t2", "two")); + + pendingPromptStore.remove("t1"); + + expect(pendingPromptStore.get("t1")).toBeUndefined(); + expect(pendingPromptStore.get("t2")).toBeDefined(); + expect(storedTaskIds()).toEqual(["t2"]); + }); + + it("remove is a no-op for an unknown task", () => { + pendingPromptStore.save(record("t1", "one")); + pendingPromptStore.remove("nope"); + expect(storedTaskIds()).toEqual(["t1"]); + }); +}); diff --git a/packages/ui/src/features/sessions/pendingPromptStore.ts b/packages/ui/src/features/sessions/pendingPromptStore.ts new file mode 100644 index 000000000..0e4ed4b18 --- /dev/null +++ b/packages/ui/src/features/sessions/pendingPromptStore.ts @@ -0,0 +1,60 @@ +import type { + PendingPromptRecord, + PendingPromptStore, +} from "@posthog/core/sessions/pendingPrompt"; +import { electronStorage } from "@posthog/ui/shell/rendererStorage"; +import { create } from "zustand"; +import { persist } from "zustand/middleware"; + +interface PendingPromptState { + /** Map of taskId -> the prompt owed delivery for that task. */ + promptsByTaskId: Record; +} + +interface PendingPromptActions { + savePrompt: (record: PendingPromptRecord) => void; + getPrompt: (taskId: string) => PendingPromptRecord | undefined; + removePrompt: (taskId: string) => void; +} + +type PendingPromptStoreState = PendingPromptState & PendingPromptActions; + +export const usePendingPromptStore = create()( + persist( + (set, get) => ({ + promptsByTaskId: {}, + + savePrompt: (record) => + set((state) => ({ + promptsByTaskId: { + ...state.promptsByTaskId, + [record.taskId]: record, + }, + })), + + getPrompt: (taskId) => get().promptsByTaskId[taskId], + + removePrompt: (taskId) => + set((state) => { + if (!(taskId in state.promptsByTaskId)) return state; + const { [taskId]: _removed, ...rest } = state.promptsByTaskId; + return { promptsByTaskId: rest }; + }), + }), + { + name: "pending-prompt-storage", + storage: electronStorage, + partialize: (state) => ({ promptsByTaskId: state.promptsByTaskId }), + }, + ), +); + +/** + * Non-hook adapter implementing the core {@link PendingPromptStore} contract, + * wired into the session service dependencies. + */ +export const pendingPromptStore: PendingPromptStore = { + save: (record) => usePendingPromptStore.getState().savePrompt(record), + get: (taskId) => usePendingPromptStore.getState().getPrompt(taskId), + remove: (taskId) => usePendingPromptStore.getState().removePrompt(taskId), +}; diff --git a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts index f2ad8ec4d..28d5a2f4d 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts @@ -135,6 +135,21 @@ vi.mock( () => mockSessionConfigStore, ); +const mockPendingPromptStore = vi.hoisted(() => ({ + pendingPromptStore: { + save: vi.fn(), + get: vi.fn( + (_taskId?: string): PendingPromptRecord | undefined => undefined, + ), + remove: vi.fn(), + }, +})); + +vi.mock( + "@posthog/ui/features/sessions/pendingPromptStore", + () => mockPendingPromptStore, +); + const mockAdapterFns = vi.hoisted(() => ({ setAdapter: vi.fn(), getAdapter: vi.fn(), @@ -279,6 +294,8 @@ vi.mock("@posthog/core/sessions/sessionEvents", async () => { // NOTE: deliberately NOT mocking "@posthog/ui/features/sessions/sessionStore" — // the real Zustand store is the whole point of this test. +import type { PendingPromptRecord } from "@posthog/core/sessions/pendingPrompt"; +import type { Task } from "@posthog/shared/domain-types"; import type { AgentSession } from "@posthog/ui/features/sessions/sessionStore"; import { sessionStoreSetters, @@ -948,3 +965,154 @@ describe("SessionService cloud queue recovery (real store, e2e)", () => { ).toHaveLength(1); }); }); + +describe("SessionService written-ahead prompt recovery (real store, e2e)", () => { + const OWED_PROMPT = [{ type: "text" as const, text: "do the thing" }]; + + function owedRecord() { + return { + taskId: TASK_ID, + taskTitle: "Recovered task", + repoPath: "/repo", + initialPrompt: OWED_PROMPT, + createdAt: 1, + }; + } + + function localTask(overrides: Partial = {}): Task { + return { + id: TASK_ID, + task_number: 1, + slug: "recovered-task", + title: "Recovered task", + description: "", + origin_product: "twig", + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + ...overrides, + }; + } + + beforeEach(() => { + vi.clearAllMocks(); + resetSessionService(); + useSessionStore.setState({ sessions: {}, taskIdIndex: {} }); + mockConvertStoredEntriesToEvents.mockImplementation(() => []); + mockTrpcAgent.onSessionEvent.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(undefined); + mockTrpcAgent.prompt.mutate.mockResolvedValue({ stopReason: "end_turn" }); + }); + + it("delivers an owed prompt on a fresh connect (create-new route) and clears it", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockAuthenticatedClient.createTaskRun.mockResolvedValue({ id: RUN_ID }); + mockTrpcAgent.start.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + + // No initialPrompt on the connect — the prompt must come from the store. + await service.connectToTask({ task: localTask(), repoPath: "/repo" }); + + expect(mockTrpcAgent.start.mutate).toHaveBeenCalled(); + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith( + expect.objectContaining({ sessionId: RUN_ID, prompt: OWED_PROMPT }), + ); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).toHaveBeenCalledWith(TASK_ID); + }); + + it("delivers an owed prompt when resuming a run stranded before delivery (resume-existing route)", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockTrpcWorkspace.verify.query.mockResolvedValue({ exists: true }); + mockTrpcAgent.reconnect.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + + await service.connectToTask({ + task: localTask({ + latest_run: { + id: RUN_ID, + task: TASK_ID, + team: 123, + environment: "local", + status: "queued", + log_url: "https://logs.example.com/run", + error_message: null, + output: null, + state: {}, + branch: null, + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + completed_at: null, + }, + }), + repoPath: "/repo", + }); + + expect(mockTrpcAgent.reconnect.mutate).toHaveBeenCalled(); + // The resumed log carries no prompt event (convert mock → []), so the owed + // prompt is delivered onto the resumed run. + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith( + expect.objectContaining({ sessionId: RUN_ID, prompt: OWED_PROMPT }), + ); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).toHaveBeenCalledWith(TASK_ID); + }); + + it("does not deliver when the resumed run already has a prompt event", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockTrpcWorkspace.verify.query.mockResolvedValue({ exists: true }); + mockTrpcAgent.reconnect.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + // Resumed log already contains a session/prompt request → nothing owed. + mockConvertStoredEntriesToEvents.mockReturnValue([ + { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + id: 1, + method: "session/prompt", + params: {}, + }, + }, + ]); + + await service.connectToTask({ + task: localTask({ + latest_run: { + id: RUN_ID, + task: TASK_ID, + team: 123, + environment: "local", + status: "in_progress", + log_url: "https://logs.example.com/run", + error_message: null, + output: null, + state: {}, + branch: null, + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + completed_at: null, + }, + }), + repoPath: "/repo", + }); + + expect(mockTrpcAgent.prompt.mutate).not.toHaveBeenCalled(); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/ui/src/features/sessions/sessionServiceHost.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.test.ts index 5e68378bc..c94aed608 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.test.ts @@ -186,6 +186,19 @@ vi.mock( () => mockSessionConfigStore, ); +const mockPendingPromptStore = vi.hoisted(() => ({ + pendingPromptStore: { + save: vi.fn(), + get: vi.fn(() => undefined), + remove: vi.fn(), + }, +})); + +vi.mock( + "@posthog/ui/features/sessions/pendingPromptStore", + () => mockPendingPromptStore, +); + const mockAdapterFns = vi.hoisted(() => ({ setAdapter: vi.fn(), getAdapter: vi.fn(), diff --git a/packages/ui/src/features/sessions/sessionServiceHost.ts b/packages/ui/src/features/sessions/sessionServiceHost.ts index 8dd499b72..8b2d23bf9 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.ts @@ -24,6 +24,7 @@ import { fetchAuthState } from "@posthog/ui/features/auth/authQueries"; import { useUsageLimitStore } from "@posthog/ui/features/billing/usageLimitStore"; import { useAddDirectoryDialogStore } from "@posthog/ui/features/folder-picker/addDirectoryDialogStore"; import { TaskNotificationService } from "@posthog/ui/features/notifications/notifications"; +import { pendingPromptStore } from "@posthog/ui/features/sessions/pendingPromptStore"; import { useSessionAdapterStore } from "@posthog/ui/features/sessions/sessionAdapterStore"; import { getPersistedConfigOptions, @@ -106,6 +107,7 @@ function buildSessionServiceDeps(): SessionServiceDeps { removeAdapter: (taskRunId) => useSessionAdapterStore.getState().removeAdapter(taskRunId), }, + pendingPrompts: pendingPromptStore, get settings() { return useSettingsStore.getState(); },