From 1e9cc0082f4959162ee24c3bc18c0a1961af2db0 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sun, 14 Jun 2026 14:28:37 +0100 Subject: [PATCH 1/4] fix(sessions): durably write-ahead the start prompt so it can't be lost Starting a local task run can lose the user's prompt: it only exists in memory until the session has fully initialized and `session/prompt` has been delivered. If `agent.start` exceeds the 30s `SESSION_VALIDATION_TIMEOUT_MS` (common in large monorepos) or the app is reloaded/quit/crashes during the retry window, the prompt is gone and the run is stranded in `queued`. Persist the prompt as a durable write-ahead record BEFORE any session/agent work, and clear it only once it has actually been delivered: - New `PendingPromptStore` contract in core and an `electronStorage`-backed zustand store in ui (keyed by taskId, mirrors sessionConfigStore). - `createNewLocalSession` writes the prompt ahead before `createTaskRun` / `agent.start`, records the taskRunId once the run exists, recovers a saved prompt when a later connect arrives without one, and clears the record after `sendPrompt` resolves. A persisted record means "this prompt is owed delivery", which is also the hook for a future server-side reconciler to re-drive orphaned runs. Generated-By: PostHog Code Task-Id: 22d61751-8d18-4d56-a79e-4741045320c1 --- packages/core/src/sessions/pendingPrompt.ts | 51 +++++++++++ packages/core/src/sessions/sessionService.ts | 90 ++++++++++++++++--- .../sessions/pendingPromptStore.test.ts | 78 ++++++++++++++++ .../features/sessions/pendingPromptStore.ts | 64 +++++++++++++ ...onServiceHost.recovery.integration.test.ts | 14 +++ .../sessions/sessionServiceHost.test.ts | 14 +++ .../features/sessions/sessionServiceHost.ts | 2 + 7 files changed, 299 insertions(+), 14 deletions(-) create mode 100644 packages/core/src/sessions/pendingPrompt.ts create mode 100644 packages/ui/src/features/sessions/pendingPromptStore.test.ts create mode 100644 packages/ui/src/features/sessions/pendingPromptStore.ts diff --git a/packages/core/src/sessions/pendingPrompt.ts b/packages/core/src/sessions/pendingPrompt.ts new file mode 100644 index 000000000..64dde181e --- /dev/null +++ b/packages/core/src/sessions/pendingPrompt.ts @@ -0,0 +1,51 @@ +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import type { Adapter, ExecutionMode } from "@posthog/shared"; + +/** + * A durable, write-ahead record of a prompt the user is trying to start a + * local task run with. + * + * The prompt is the one thing in the start-a-task flow that the user cannot + * cheaply reproduce, yet today it only exists in memory until a session has + * fully initialized and `session/prompt` has been delivered. If session + * initialization throws or times out (common in large monorepos, where init + * can exceed the 30s `SESSION_VALIDATION_TIMEOUT_MS` budget), or the app is + * reloaded/quit/crashes during the retry window, the prompt is lost. + * + * To make that impossible we persist this record BEFORE any agent/session + * setup is attempted, and only clear it once the prompt has actually been + * delivered to the agent. A persisted record therefore means "this prompt is + * owed delivery and has not been delivered yet" — the basis for recovering it + * on the next connect, and (in a later change) for a server-side reconciler to + * re-drive orphaned runs. + */ +export interface PendingPromptRecord { + taskId: string; + taskTitle: string; + repoPath: string; + initialPrompt: ContentBlock[]; + /** Latest run id this prompt was attached to, if a run has been created. */ + taskRunId?: string; + executionMode?: ExecutionMode; + adapter?: Adapter; + model?: string; + reasoningLevel?: string; + /** Epoch ms when the prompt was first written ahead. */ + createdAt: number; +} + +/** + * Durable storage for {@link PendingPromptRecord}s, keyed by `taskId` (one + * in-flight prompt per task — retries reuse the same key). Implementations + * must survive an app restart. + */ +export interface PendingPromptStore { + /** Write-ahead (or overwrite) the pending prompt for a task. */ + save(record: PendingPromptRecord): void; + /** Get the pending prompt for a task, if one is owed delivery. */ + get(taskId: string): PendingPromptRecord | undefined; + /** Clear the pending prompt for a task once delivered or abandoned. */ + remove(taskId: string): void; + /** All outstanding pending prompts, e.g. for a recovery sweep on startup. */ + list(): PendingPromptRecord[]; +} diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 0eb24e96d..40fbe955d 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -57,6 +57,7 @@ import { OFFLINE_SESSION_MESSAGE, routeLocalConnect, } from "./connectRouting"; +import type { PendingPromptStore } from "./pendingPrompt"; import { type PermissionSelectionPlan, planPermissionResponse, @@ -252,6 +253,11 @@ export interface SessionServiceDeps { setAdapter(taskRunId: string, adapter: Adapter): void; removeAdapter(taskRunId: string): void; }; + /** + * Durable, write-ahead storage for the prompt a local task run is starting + * with, so session-init timeouts or app restarts can never lose it. + */ + pendingPrompts: PendingPromptStore; readonly settings: { customInstructions?: string | null }; usageLimit: { show: (...args: any[]) => any }; readonly addDirectoryDialog: { open: boolean }; @@ -1060,6 +1066,41 @@ export class SessionService { model?: string, reasoningLevel?: string, ): Promise { + // Recover a previously written-ahead prompt when this connect didn't + // carry one itself: a persisted record means an earlier attempt for this + // task saved a prompt and never delivered it (init timed out, or the app + // was restarted before delivery). Adopt its prompt and the settings the + // user originally chose, unless the caller overrode them. + const recovered = initialPrompt?.length + ? undefined + : this.d.pendingPrompts.get(taskId); + const effectivePrompt = initialPrompt?.length + ? initialPrompt + : recovered?.initialPrompt; + const effectiveExecutionMode = executionMode ?? recovered?.executionMode; + const effectiveAdapter = adapter ?? recovered?.adapter; + const effectiveModel = model ?? recovered?.model; + const effectiveReasoningLevel = reasoningLevel ?? recovered?.reasoningLevel; + + // Write-ahead the prompt BEFORE any session/agent work. This is the one + // step that makes losing it impossible: everything below (run creation, + // the 30s-bounded agent.start, prompt delivery) can throw or be killed by + // an app restart, and the prompt still survives on disk to be recovered on + // the next connect. Cleared only once it has actually been delivered. + if (effectivePrompt?.length) { + this.d.pendingPrompts.save({ + taskId, + taskTitle, + repoPath, + initialPrompt: effectivePrompt, + executionMode: effectiveExecutionMode, + adapter: effectiveAdapter, + model: effectiveModel, + reasoningLevel: effectiveReasoningLevel, + createdAt: recovered?.createdAt ?? Date.now(), + }); + } + const { client } = auth; if (!client) { throw new Error("Unable to reach server. Please check your connection."); @@ -1070,19 +1111,37 @@ export class SessionService { throw new Error("Failed to create task run. Please try again."); } + // Record the run id on the write-ahead entry so it points at the run the + // prompt is being delivered to (a server-side reconciler can use this to + // re-drive an orphaned run). + if (effectivePrompt?.length) { + this.d.pendingPrompts.save({ + taskId, + taskTitle, + repoPath, + initialPrompt: effectivePrompt, + taskRunId: taskRun.id, + executionMode: effectiveExecutionMode, + adapter: effectiveAdapter, + model: effectiveModel, + reasoningLevel: effectiveReasoningLevel, + createdAt: recovered?.createdAt ?? Date.now(), + }); + } + const { customInstructions: startCustomInstructions } = this.d.settings; - const preferredModel = model ?? this.d.DEFAULT_GATEWAY_MODEL; + const preferredModel = effectiveModel ?? this.d.DEFAULT_GATEWAY_MODEL; const result = await this.d.trpc.agent.start.mutate({ taskId, taskRunId: taskRun.id, repoPath, apiHost: auth.apiHost, projectId: auth.projectId, - permissionMode: executionMode, - adapter, + permissionMode: effectiveExecutionMode, + adapter: effectiveAdapter, customInstructions: startCustomInstructions || undefined, - effort: effortLevelSchema.safeParse(reasoningLevel).success - ? (reasoningLevel as EffortLevel) + effort: effortLevelSchema.safeParse(effectiveReasoningLevel).success + ? (effectiveReasoningLevel as EffortLevel) : undefined, model: preferredModel, }); @@ -1090,7 +1149,7 @@ export class SessionService { const session = createBaseSession(taskRun.id, taskId, taskTitle); session.channel = result.channel; session.status = "connected"; - session.adapter = adapter; + session.adapter = effectiveAdapter; const configOptions = result.configOptions as | SessionConfigOption[] | undefined; @@ -1102,15 +1161,15 @@ export class SessionService { } // Persist the adapter - if (adapter) { - this.d.adapterStore.setAdapter(taskRun.id, adapter); + if (effectiveAdapter) { + this.d.adapterStore.setAdapter(taskRun.id, effectiveAdapter); } // Store the initial prompt on the session so retry/reset flows can // re-send it if the session errors after this point (e.g. subscription // error, agent crash, or prompt failure). - if (initialPrompt?.length) { - session.initialPrompt = initialPrompt; + if (effectivePrompt?.length) { + session.initialPrompt = effectivePrompt; } this.d.store.setSession(session); @@ -1119,12 +1178,15 @@ export class SessionService { this.d.track(ANALYTICS_EVENTS.TASK_RUN_STARTED, { task_id: taskId, execution_type: "local", - initial_mode: executionMode, - adapter, + initial_mode: effectiveExecutionMode, + adapter: effectiveAdapter, }); - if (initialPrompt?.length) { - await this.sendPrompt(taskId, initialPrompt); + if (effectivePrompt?.length) { + await this.sendPrompt(taskId, effectivePrompt); + // Delivered: the prompt now lives in the run's session log, so the + // write-ahead copy is no longer the only record of it and can be cleared. + this.d.pendingPrompts.remove(taskId); } } diff --git a/packages/ui/src/features/sessions/pendingPromptStore.test.ts b/packages/ui/src/features/sessions/pendingPromptStore.test.ts new file mode 100644 index 000000000..ca750f77d --- /dev/null +++ b/packages/ui/src/features/sessions/pendingPromptStore.test.ts @@ -0,0 +1,78 @@ +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import type { PendingPromptRecord } from "@posthog/core/sessions/pendingPrompt"; +import { beforeEach, describe, expect, it } from "vitest"; +import { + pendingPromptStore, + usePendingPromptStore, +} from "./pendingPromptStore"; + +function record( + taskId: string, + text: string, + overrides: Partial = {}, +): PendingPromptRecord { + const initialPrompt: ContentBlock[] = [{ type: "text", text }]; + return { + taskId, + taskTitle: `Task ${taskId}`, + repoPath: "/repo", + initialPrompt, + createdAt: 1, + ...overrides, + }; +} + +describe("pendingPromptStore", () => { + beforeEach(() => { + usePendingPromptStore.setState({ promptsByTaskId: {} }); + }); + + it("saves and reads back a pending prompt by taskId", () => { + pendingPromptStore.save(record("t1", "do the thing")); + + expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([ + { type: "text", text: "do the thing" }, + ]); + }); + + it("returns undefined when no prompt is owed", () => { + expect(pendingPromptStore.get("missing")).toBeUndefined(); + }); + + it("overwrites the record for a task on a re-save (retry reuses the key)", () => { + pendingPromptStore.save(record("t1", "first")); + pendingPromptStore.save(record("t1", "first", { taskRunId: "run-2" })); + + expect(pendingPromptStore.list()).toHaveLength(1); + expect(pendingPromptStore.get("t1")?.taskRunId).toBe("run-2"); + }); + + it("removes a delivered prompt and leaves others intact", () => { + pendingPromptStore.save(record("t1", "one")); + pendingPromptStore.save(record("t2", "two")); + + pendingPromptStore.remove("t1"); + + expect(pendingPromptStore.get("t1")).toBeUndefined(); + expect(pendingPromptStore.get("t2")).toBeDefined(); + expect(pendingPromptStore.list().map((r) => r.taskId)).toEqual(["t2"]); + }); + + it("remove is a no-op for an unknown task", () => { + pendingPromptStore.save(record("t1", "one")); + pendingPromptStore.remove("nope"); + expect(pendingPromptStore.list()).toHaveLength(1); + }); + + it("lists all outstanding prompts for a recovery sweep", () => { + pendingPromptStore.save(record("t1", "one")); + pendingPromptStore.save(record("t2", "two")); + + expect( + pendingPromptStore + .list() + .map((r) => r.taskId) + .sort(), + ).toEqual(["t1", "t2"]); + }); +}); diff --git a/packages/ui/src/features/sessions/pendingPromptStore.ts b/packages/ui/src/features/sessions/pendingPromptStore.ts new file mode 100644 index 000000000..ad7a060a3 --- /dev/null +++ b/packages/ui/src/features/sessions/pendingPromptStore.ts @@ -0,0 +1,64 @@ +import type { + PendingPromptRecord, + PendingPromptStore, +} from "@posthog/core/sessions/pendingPrompt"; +import { electronStorage } from "@posthog/ui/shell/rendererStorage"; +import { create } from "zustand"; +import { persist } from "zustand/middleware"; + +interface PendingPromptState { + /** Map of taskId -> the prompt owed delivery for that task. */ + promptsByTaskId: Record; +} + +interface PendingPromptActions { + savePrompt: (record: PendingPromptRecord) => void; + getPrompt: (taskId: string) => PendingPromptRecord | undefined; + removePrompt: (taskId: string) => void; + listPrompts: () => PendingPromptRecord[]; +} + +type PendingPromptStoreState = PendingPromptState & PendingPromptActions; + +export const usePendingPromptStore = create()( + persist( + (set, get) => ({ + promptsByTaskId: {}, + + savePrompt: (record) => + set((state) => ({ + promptsByTaskId: { + ...state.promptsByTaskId, + [record.taskId]: record, + }, + })), + + getPrompt: (taskId) => get().promptsByTaskId[taskId], + + removePrompt: (taskId) => + set((state) => { + if (!(taskId in state.promptsByTaskId)) return state; + const { [taskId]: _removed, ...rest } = state.promptsByTaskId; + return { promptsByTaskId: rest }; + }), + + listPrompts: () => Object.values(get().promptsByTaskId), + }), + { + name: "pending-prompt-storage", + storage: electronStorage, + partialize: (state) => ({ promptsByTaskId: state.promptsByTaskId }), + }, + ), +); + +/** + * Non-hook adapter implementing the core {@link PendingPromptStore} contract, + * wired into the session service dependencies. + */ +export const pendingPromptStore: PendingPromptStore = { + save: (record) => usePendingPromptStore.getState().savePrompt(record), + get: (taskId) => usePendingPromptStore.getState().getPrompt(taskId), + remove: (taskId) => usePendingPromptStore.getState().removePrompt(taskId), + list: () => usePendingPromptStore.getState().listPrompts(), +}; diff --git a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts index f2ad8ec4d..67c04f3aa 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts @@ -135,6 +135,20 @@ vi.mock( () => mockSessionConfigStore, ); +const mockPendingPromptStore = vi.hoisted(() => ({ + pendingPromptStore: { + save: vi.fn(), + get: vi.fn(() => undefined), + remove: vi.fn(), + list: vi.fn(() => []), + }, +})); + +vi.mock( + "@posthog/ui/features/sessions/pendingPromptStore", + () => mockPendingPromptStore, +); + const mockAdapterFns = vi.hoisted(() => ({ setAdapter: vi.fn(), getAdapter: vi.fn(), diff --git a/packages/ui/src/features/sessions/sessionServiceHost.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.test.ts index 5e68378bc..480093f09 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.test.ts @@ -186,6 +186,20 @@ vi.mock( () => mockSessionConfigStore, ); +const mockPendingPromptStore = vi.hoisted(() => ({ + pendingPromptStore: { + save: vi.fn(), + get: vi.fn(() => undefined), + remove: vi.fn(), + list: vi.fn(() => []), + }, +})); + +vi.mock( + "@posthog/ui/features/sessions/pendingPromptStore", + () => mockPendingPromptStore, +); + const mockAdapterFns = vi.hoisted(() => ({ setAdapter: vi.fn(), getAdapter: vi.fn(), diff --git a/packages/ui/src/features/sessions/sessionServiceHost.ts b/packages/ui/src/features/sessions/sessionServiceHost.ts index 8dd499b72..8b2d23bf9 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.ts @@ -24,6 +24,7 @@ import { fetchAuthState } from "@posthog/ui/features/auth/authQueries"; import { useUsageLimitStore } from "@posthog/ui/features/billing/usageLimitStore"; import { useAddDirectoryDialogStore } from "@posthog/ui/features/folder-picker/addDirectoryDialogStore"; import { TaskNotificationService } from "@posthog/ui/features/notifications/notifications"; +import { pendingPromptStore } from "@posthog/ui/features/sessions/pendingPromptStore"; import { useSessionAdapterStore } from "@posthog/ui/features/sessions/sessionAdapterStore"; import { getPersistedConfigOptions, @@ -106,6 +107,7 @@ function buildSessionServiceDeps(): SessionServiceDeps { removeAdapter: (taskRunId) => useSessionAdapterStore.getState().removeAdapter(taskRunId), }, + pendingPrompts: pendingPromptStore, get settings() { return useSettingsStore.getState(); }, From bc252dbd4b3439a20467f8edf310b269a811fb35 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sun, 14 Jun 2026 14:39:15 +0100 Subject: [PATCH 2/4] =?UTF-8?q?refactor(sessions):=20address=20review=20?= =?UTF-8?q?=E2=80=94=20dedupe=20write-ahead=20save,=20log=20recovery,=20so?= =?UTF-8?q?ften=20wording?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From qa-swarm review of this PR: - Build the PendingPromptRecord once and re-save with `{ ...pending, taskRunId }` instead of assembling the 9-field payload twice (convergent paul + xp). - Log when a written-ahead prompt is actually recovered, so the safety net is observable in the wild. - Soften the "impossible to lose" wording in the doc + comment: persistence is async/best-effort, so it's "very unlikely", not guaranteed. Deferred to a follow-up: an integration test exercising the recovery branch of createNewLocalSession (currently the store mock returns undefined everywhere). Generated-By: PostHog Code Task-Id: 22d61751-8d18-4d56-a79e-4741045320c1 --- packages/core/src/sessions/pendingPrompt.ts | 9 ++-- packages/core/src/sessions/sessionService.ts | 51 +++++++++++--------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/packages/core/src/sessions/pendingPrompt.ts b/packages/core/src/sessions/pendingPrompt.ts index 64dde181e..b629821aa 100644 --- a/packages/core/src/sessions/pendingPrompt.ts +++ b/packages/core/src/sessions/pendingPrompt.ts @@ -12,9 +12,12 @@ import type { Adapter, ExecutionMode } from "@posthog/shared"; * can exceed the 30s `SESSION_VALIDATION_TIMEOUT_MS` budget), or the app is * reloaded/quit/crashes during the retry window, the prompt is lost. * - * To make that impossible we persist this record BEFORE any agent/session - * setup is attempted, and only clear it once the prompt has actually been - * delivered to the agent. A persisted record therefore means "this prompt is + * To make that loss very unlikely we persist this record BEFORE any + * agent/session setup is attempted, and only clear it once the prompt has + * actually been delivered to the agent. (Persistence is async and + * best-effort, so it is not an absolute guarantee — a crash in the first + * moments of a cold start can still race the write.) A persisted record + * therefore means "this prompt is * owed delivery and has not been delivered yet" — the basis for recovering it * on the next connect, and (in a later change) for a server-side reconciler to * re-drive orphaned runs. diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 40fbe955d..9f091866b 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -57,7 +57,7 @@ import { OFFLINE_SESSION_MESSAGE, routeLocalConnect, } from "./connectRouting"; -import type { PendingPromptStore } from "./pendingPrompt"; +import type { PendingPromptRecord, PendingPromptStore } from "./pendingPrompt"; import { type PermissionSelectionPlan, planPermissionResponse, @@ -1082,13 +1082,25 @@ export class SessionService { const effectiveModel = model ?? recovered?.model; const effectiveReasoningLevel = reasoningLevel ?? recovered?.reasoningLevel; - // Write-ahead the prompt BEFORE any session/agent work. This is the one - // step that makes losing it impossible: everything below (run creation, - // the 30s-bounded agent.start, prompt delivery) can throw or be killed by - // an app restart, and the prompt still survives on disk to be recovered on - // the next connect. Cleared only once it has actually been delivered. + if (recovered) { + // Surface when the safety net actually catches something, so we can tell + // it's working (and gauge how long prompts sit undelivered). + this.d.log.info("Recovered a written-ahead prompt for a task", { + taskId, + ageMs: Date.now() - recovered.createdAt, + }); + } + + // Write-ahead the prompt BEFORE any session/agent work. This is the step + // that makes losing it very unlikely: everything below (run creation, the + // 30s-bounded agent.start, prompt delivery) can throw or be killed by an + // app restart, and the prompt still survives to be recovered on the next + // connect. Cleared only once it has actually been delivered. (Persistence + // is async and best-effort, so a crash in the first moments of a cold + // start can still race it — hence "very unlikely", not "guaranteed".) + let pending: PendingPromptRecord | undefined; if (effectivePrompt?.length) { - this.d.pendingPrompts.save({ + pending = { taskId, taskTitle, repoPath, @@ -1098,7 +1110,8 @@ export class SessionService { model: effectiveModel, reasoningLevel: effectiveReasoningLevel, createdAt: recovered?.createdAt ?? Date.now(), - }); + }; + this.d.pendingPrompts.save(pending); } const { client } = auth; @@ -1111,22 +1124,12 @@ export class SessionService { throw new Error("Failed to create task run. Please try again."); } - // Record the run id on the write-ahead entry so it points at the run the - // prompt is being delivered to (a server-side reconciler can use this to - // re-drive an orphaned run). - if (effectivePrompt?.length) { - this.d.pendingPrompts.save({ - taskId, - taskTitle, - repoPath, - initialPrompt: effectivePrompt, - taskRunId: taskRun.id, - executionMode: effectiveExecutionMode, - adapter: effectiveAdapter, - model: effectiveModel, - reasoningLevel: effectiveReasoningLevel, - createdAt: recovered?.createdAt ?? Date.now(), - }); + // Stamp the same write-ahead entry with the run id it's now being + // delivered to (a server-side reconciler can use this to re-drive an + // orphaned run). + if (pending) { + pending = { ...pending, taskRunId: taskRun.id }; + this.d.pendingPrompts.save(pending); } const { customInstructions: startCustomInstructions } = this.d.settings; From ebc75e6b308c04f8dae395b973d96591f409d8ae Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sun, 14 Jun 2026 14:42:29 +0100 Subject: [PATCH 3/4] fix(sessions): keep written-ahead prompt when send resolves without delivering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From Codex review of this PR: sendPrompt can resolve without actually delivering — "queued" (session busy) or "rate_limited" (usage-limit error swallowed by sendLocalPrompt) — but we cleared the write-ahead record unconditionally after it resolved, reintroducing a narrow loss window. Only clear the record on confirmed delivery. Generated-By: PostHog Code Task-Id: 22d61751-8d18-4d56-a79e-4741045320c1 --- packages/core/src/sessions/sessionService.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index 9f091866b..ce8a56a4e 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -1186,10 +1186,16 @@ export class SessionService { }); if (effectivePrompt?.length) { - await this.sendPrompt(taskId, effectivePrompt); - // Delivered: the prompt now lives in the run's session log, so the - // write-ahead copy is no longer the only record of it and can be cleared. - this.d.pendingPrompts.remove(taskId); + const { stopReason } = await this.sendPrompt(taskId, effectivePrompt); + // Clear only once the prompt was actually handed to the agent. sendPrompt + // can resolve WITHOUT delivering — "queued" (session busy) or + // "rate_limited" (usage limit swallowed by sendLocalPrompt) — and in + // those cases the write-ahead copy must survive so the prompt isn't lost. + // On real delivery it now lives in the run's session log, so clearing the + // only-other-copy here is safe. + if (stopReason !== "queued" && stopReason !== "rate_limited") { + this.d.pendingPrompts.remove(taskId); + } } } From 9ec4d33c5ec2b5b280b6c30d34cfab4e7a6a7253 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sun, 14 Jun 2026 15:00:15 +0100 Subject: [PATCH 4/4] feat(sessions): recover owed prompts on resume + cover both routes with tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the remaining review findings on this PR: - Codex P1: a run stranded before its prompt was delivered routes to the resume-existing path on the next connect, not createNewLocalSession. Deliver the owed write-ahead prompt there too, once the resumed log is confirmed to carry no prompt yet. Extracted a shared deliverWrittenAheadPrompt() helper so the deliver-then-clear-on-confirmed-delivery logic lives once. - xp YAGNI: dropped the speculative list()/taskRunId surface (and the second save that only existed to populate taskRunId) — the reconciler that would consume them isn't in this PR. This also removes the duplicated save entirely. - Tests: cover the recovery branch end-to-end against the real store — delivery on the create-new route, delivery on the resume-existing route, and no delivery when the resumed run already has a prompt event. removePrompt's guard divergence from sessionConfigStore is intentional (the guard is the safer side), left as-is. Generated-By: PostHog Code Task-Id: 22d61751-8d18-4d56-a79e-4741045320c1 --- packages/core/src/sessions/pendingPrompt.ts | 11 +- packages/core/src/sessions/sessionService.ts | 60 ++++--- .../sessions/pendingPromptStore.test.ts | 28 ++-- .../features/sessions/pendingPromptStore.ts | 4 - ...onServiceHost.recovery.integration.test.ts | 158 +++++++++++++++++- .../sessions/sessionServiceHost.test.ts | 1 - 6 files changed, 207 insertions(+), 55 deletions(-) diff --git a/packages/core/src/sessions/pendingPrompt.ts b/packages/core/src/sessions/pendingPrompt.ts index b629821aa..286bd6f11 100644 --- a/packages/core/src/sessions/pendingPrompt.ts +++ b/packages/core/src/sessions/pendingPrompt.ts @@ -17,18 +17,15 @@ import type { Adapter, ExecutionMode } from "@posthog/shared"; * actually been delivered to the agent. (Persistence is async and * best-effort, so it is not an absolute guarantee — a crash in the first * moments of a cold start can still race the write.) A persisted record - * therefore means "this prompt is - * owed delivery and has not been delivered yet" — the basis for recovering it - * on the next connect, and (in a later change) for a server-side reconciler to - * re-drive orphaned runs. + * therefore means "this prompt is owed delivery and has not been delivered + * yet" — the basis for recovering it on the next connect, whether that connect + * starts a fresh run or resumes the stranded one. */ export interface PendingPromptRecord { taskId: string; taskTitle: string; repoPath: string; initialPrompt: ContentBlock[]; - /** Latest run id this prompt was attached to, if a run has been created. */ - taskRunId?: string; executionMode?: ExecutionMode; adapter?: Adapter; model?: string; @@ -49,6 +46,4 @@ export interface PendingPromptStore { get(taskId: string): PendingPromptRecord | undefined; /** Clear the pending prompt for a task once delivered or abandoned. */ remove(taskId: string): void; - /** All outstanding pending prompts, e.g. for a recovery sweep on startup. */ - list(): PendingPromptRecord[]; } diff --git a/packages/core/src/sessions/sessionService.ts b/packages/core/src/sessions/sessionService.ts index ce8a56a4e..ab84985df 100644 --- a/packages/core/src/sessions/sessionService.ts +++ b/packages/core/src/sessions/sessionService.ts @@ -57,7 +57,7 @@ import { OFFLINE_SESSION_MESSAGE, routeLocalConnect, } from "./connectRouting"; -import type { PendingPromptRecord, PendingPromptStore } from "./pendingPrompt"; +import type { PendingPromptStore } from "./pendingPrompt"; import { type PermissionSelectionPlan, planPermissionResponse, @@ -828,6 +828,21 @@ export class SessionService { ), ); } + + // A run stranded before its prompt was ever delivered (e.g. the + // original session-init timed out) routes here on the next connect + // rather than to createNewLocalSession, since it already has a run id + // and log. Its prompt is still owed: if the resumed log carries no + // prompt yet but we have a write-ahead record, deliver it now. + const owed = this.d.pendingPrompts.get(taskId); + if (owed?.initialPrompt.length && !hasSessionPromptEvent(events)) { + this.localRepoPaths.set(taskId, repoPath); + this.d.log.info("Recovered a written-ahead prompt on resume", { + taskId, + ageMs: Date.now() - owed.createdAt, + }); + await this.deliverWrittenAheadPrompt(taskId, owed.initialPrompt); + } return true; } else { this.d.log.warn("Reconnect returned null", { taskId, taskRunId }); @@ -1098,9 +1113,8 @@ export class SessionService { // connect. Cleared only once it has actually been delivered. (Persistence // is async and best-effort, so a crash in the first moments of a cold // start can still race it — hence "very unlikely", not "guaranteed".) - let pending: PendingPromptRecord | undefined; if (effectivePrompt?.length) { - pending = { + this.d.pendingPrompts.save({ taskId, taskTitle, repoPath, @@ -1110,8 +1124,7 @@ export class SessionService { model: effectiveModel, reasoningLevel: effectiveReasoningLevel, createdAt: recovered?.createdAt ?? Date.now(), - }; - this.d.pendingPrompts.save(pending); + }); } const { client } = auth; @@ -1124,14 +1137,6 @@ export class SessionService { throw new Error("Failed to create task run. Please try again."); } - // Stamp the same write-ahead entry with the run id it's now being - // delivered to (a server-side reconciler can use this to re-drive an - // orphaned run). - if (pending) { - pending = { ...pending, taskRunId: taskRun.id }; - this.d.pendingPrompts.save(pending); - } - const { customInstructions: startCustomInstructions } = this.d.settings; const preferredModel = effectiveModel ?? this.d.DEFAULT_GATEWAY_MODEL; const result = await this.d.trpc.agent.start.mutate({ @@ -1186,16 +1191,25 @@ export class SessionService { }); if (effectivePrompt?.length) { - const { stopReason } = await this.sendPrompt(taskId, effectivePrompt); - // Clear only once the prompt was actually handed to the agent. sendPrompt - // can resolve WITHOUT delivering — "queued" (session busy) or - // "rate_limited" (usage limit swallowed by sendLocalPrompt) — and in - // those cases the write-ahead copy must survive so the prompt isn't lost. - // On real delivery it now lives in the run's session log, so clearing the - // only-other-copy here is safe. - if (stopReason !== "queued" && stopReason !== "rate_limited") { - this.d.pendingPrompts.remove(taskId); - } + await this.deliverWrittenAheadPrompt(taskId, effectivePrompt); + } + } + + /** + * Deliver a written-ahead prompt and clear its durable record only on + * confirmed delivery. `sendPrompt` can resolve WITHOUT delivering — "queued" + * (session busy) or "rate_limited" (usage limit swallowed by + * `sendLocalPrompt`) — and in those cases the write-ahead copy must survive + * so the prompt isn't lost. On real delivery the prompt now lives in the + * run's session log, so clearing the only-other-copy is safe. + */ + private async deliverWrittenAheadPrompt( + taskId: string, + prompt: ContentBlock[], + ): Promise { + const { stopReason } = await this.sendPrompt(taskId, prompt); + if (stopReason !== "queued" && stopReason !== "rate_limited") { + this.d.pendingPrompts.remove(taskId); } } diff --git a/packages/ui/src/features/sessions/pendingPromptStore.test.ts b/packages/ui/src/features/sessions/pendingPromptStore.test.ts index ca750f77d..ffc1b5785 100644 --- a/packages/ui/src/features/sessions/pendingPromptStore.test.ts +++ b/packages/ui/src/features/sessions/pendingPromptStore.test.ts @@ -22,6 +22,10 @@ function record( }; } +function storedTaskIds(): string[] { + return Object.keys(usePendingPromptStore.getState().promptsByTaskId).sort(); +} + describe("pendingPromptStore", () => { beforeEach(() => { usePendingPromptStore.setState({ promptsByTaskId: {} }); @@ -41,10 +45,12 @@ describe("pendingPromptStore", () => { it("overwrites the record for a task on a re-save (retry reuses the key)", () => { pendingPromptStore.save(record("t1", "first")); - pendingPromptStore.save(record("t1", "first", { taskRunId: "run-2" })); + pendingPromptStore.save(record("t1", "second")); - expect(pendingPromptStore.list()).toHaveLength(1); - expect(pendingPromptStore.get("t1")?.taskRunId).toBe("run-2"); + expect(storedTaskIds()).toEqual(["t1"]); + expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([ + { type: "text", text: "second" }, + ]); }); it("removes a delivered prompt and leaves others intact", () => { @@ -55,24 +61,12 @@ describe("pendingPromptStore", () => { expect(pendingPromptStore.get("t1")).toBeUndefined(); expect(pendingPromptStore.get("t2")).toBeDefined(); - expect(pendingPromptStore.list().map((r) => r.taskId)).toEqual(["t2"]); + expect(storedTaskIds()).toEqual(["t2"]); }); it("remove is a no-op for an unknown task", () => { pendingPromptStore.save(record("t1", "one")); pendingPromptStore.remove("nope"); - expect(pendingPromptStore.list()).toHaveLength(1); - }); - - it("lists all outstanding prompts for a recovery sweep", () => { - pendingPromptStore.save(record("t1", "one")); - pendingPromptStore.save(record("t2", "two")); - - expect( - pendingPromptStore - .list() - .map((r) => r.taskId) - .sort(), - ).toEqual(["t1", "t2"]); + expect(storedTaskIds()).toEqual(["t1"]); }); }); diff --git a/packages/ui/src/features/sessions/pendingPromptStore.ts b/packages/ui/src/features/sessions/pendingPromptStore.ts index ad7a060a3..0e4ed4b18 100644 --- a/packages/ui/src/features/sessions/pendingPromptStore.ts +++ b/packages/ui/src/features/sessions/pendingPromptStore.ts @@ -15,7 +15,6 @@ interface PendingPromptActions { savePrompt: (record: PendingPromptRecord) => void; getPrompt: (taskId: string) => PendingPromptRecord | undefined; removePrompt: (taskId: string) => void; - listPrompts: () => PendingPromptRecord[]; } type PendingPromptStoreState = PendingPromptState & PendingPromptActions; @@ -41,8 +40,6 @@ export const usePendingPromptStore = create()( const { [taskId]: _removed, ...rest } = state.promptsByTaskId; return { promptsByTaskId: rest }; }), - - listPrompts: () => Object.values(get().promptsByTaskId), }), { name: "pending-prompt-storage", @@ -60,5 +57,4 @@ export const pendingPromptStore: PendingPromptStore = { save: (record) => usePendingPromptStore.getState().savePrompt(record), get: (taskId) => usePendingPromptStore.getState().getPrompt(taskId), remove: (taskId) => usePendingPromptStore.getState().removePrompt(taskId), - list: () => usePendingPromptStore.getState().listPrompts(), }; diff --git a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts index 67c04f3aa..28d5a2f4d 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.recovery.integration.test.ts @@ -138,9 +138,10 @@ vi.mock( const mockPendingPromptStore = vi.hoisted(() => ({ pendingPromptStore: { save: vi.fn(), - get: vi.fn(() => undefined), + get: vi.fn( + (_taskId?: string): PendingPromptRecord | undefined => undefined, + ), remove: vi.fn(), - list: vi.fn(() => []), }, })); @@ -293,6 +294,8 @@ vi.mock("@posthog/core/sessions/sessionEvents", async () => { // NOTE: deliberately NOT mocking "@posthog/ui/features/sessions/sessionStore" — // the real Zustand store is the whole point of this test. +import type { PendingPromptRecord } from "@posthog/core/sessions/pendingPrompt"; +import type { Task } from "@posthog/shared/domain-types"; import type { AgentSession } from "@posthog/ui/features/sessions/sessionStore"; import { sessionStoreSetters, @@ -962,3 +965,154 @@ describe("SessionService cloud queue recovery (real store, e2e)", () => { ).toHaveLength(1); }); }); + +describe("SessionService written-ahead prompt recovery (real store, e2e)", () => { + const OWED_PROMPT = [{ type: "text" as const, text: "do the thing" }]; + + function owedRecord() { + return { + taskId: TASK_ID, + taskTitle: "Recovered task", + repoPath: "/repo", + initialPrompt: OWED_PROMPT, + createdAt: 1, + }; + } + + function localTask(overrides: Partial = {}): Task { + return { + id: TASK_ID, + task_number: 1, + slug: "recovered-task", + title: "Recovered task", + description: "", + origin_product: "twig", + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + ...overrides, + }; + } + + beforeEach(() => { + vi.clearAllMocks(); + resetSessionService(); + useSessionStore.setState({ sessions: {}, taskIdIndex: {} }); + mockConvertStoredEntriesToEvents.mockImplementation(() => []); + mockTrpcAgent.onSessionEvent.subscribe.mockReturnValue({ + unsubscribe: vi.fn(), + }); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(undefined); + mockTrpcAgent.prompt.mutate.mockResolvedValue({ stopReason: "end_turn" }); + }); + + it("delivers an owed prompt on a fresh connect (create-new route) and clears it", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockAuthenticatedClient.createTaskRun.mockResolvedValue({ id: RUN_ID }); + mockTrpcAgent.start.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + + // No initialPrompt on the connect — the prompt must come from the store. + await service.connectToTask({ task: localTask(), repoPath: "/repo" }); + + expect(mockTrpcAgent.start.mutate).toHaveBeenCalled(); + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith( + expect.objectContaining({ sessionId: RUN_ID, prompt: OWED_PROMPT }), + ); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).toHaveBeenCalledWith(TASK_ID); + }); + + it("delivers an owed prompt when resuming a run stranded before delivery (resume-existing route)", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockTrpcWorkspace.verify.query.mockResolvedValue({ exists: true }); + mockTrpcAgent.reconnect.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + + await service.connectToTask({ + task: localTask({ + latest_run: { + id: RUN_ID, + task: TASK_ID, + team: 123, + environment: "local", + status: "queued", + log_url: "https://logs.example.com/run", + error_message: null, + output: null, + state: {}, + branch: null, + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + completed_at: null, + }, + }), + repoPath: "/repo", + }); + + expect(mockTrpcAgent.reconnect.mutate).toHaveBeenCalled(); + // The resumed log carries no prompt event (convert mock → []), so the owed + // prompt is delivered onto the resumed run. + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith( + expect.objectContaining({ sessionId: RUN_ID, prompt: OWED_PROMPT }), + ); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).toHaveBeenCalledWith(TASK_ID); + }); + + it("does not deliver when the resumed run already has a prompt event", async () => { + const service = getSessionService(); + mockPendingPromptStore.pendingPromptStore.get.mockReturnValue(owedRecord()); + mockTrpcWorkspace.verify.query.mockResolvedValue({ exists: true }); + mockTrpcAgent.reconnect.mutate.mockResolvedValue({ + channel: `agent-event:${RUN_ID}`, + configOptions: [], + }); + // Resumed log already contains a session/prompt request → nothing owed. + mockConvertStoredEntriesToEvents.mockReturnValue([ + { + type: "acp_message" as const, + ts: Date.now(), + message: { + jsonrpc: "2.0" as const, + id: 1, + method: "session/prompt", + params: {}, + }, + }, + ]); + + await service.connectToTask({ + task: localTask({ + latest_run: { + id: RUN_ID, + task: TASK_ID, + team: 123, + environment: "local", + status: "in_progress", + log_url: "https://logs.example.com/run", + error_message: null, + output: null, + state: {}, + branch: null, + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + completed_at: null, + }, + }), + repoPath: "/repo", + }); + + expect(mockTrpcAgent.prompt.mutate).not.toHaveBeenCalled(); + expect( + mockPendingPromptStore.pendingPromptStore.remove, + ).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/ui/src/features/sessions/sessionServiceHost.test.ts b/packages/ui/src/features/sessions/sessionServiceHost.test.ts index 480093f09..c94aed608 100644 --- a/packages/ui/src/features/sessions/sessionServiceHost.test.ts +++ b/packages/ui/src/features/sessions/sessionServiceHost.test.ts @@ -191,7 +191,6 @@ const mockPendingPromptStore = vi.hoisted(() => ({ save: vi.fn(), get: vi.fn(() => undefined), remove: vi.fn(), - list: vi.fn(() => []), }, }));