Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions packages/core/src/sessions/pendingPrompt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { ContentBlock } from "@agentclientprotocol/sdk";
import type { Adapter, ExecutionMode } from "@posthog/shared";

/**
* A durable, write-ahead record of a prompt the user is trying to start a
* local task run with.
*
* The prompt is the one thing in the start-a-task flow that the user cannot
* cheaply reproduce, yet today it only exists in memory until a session has
* fully initialized and `session/prompt` has been delivered. If session
* initialization throws or times out (common in large monorepos, where init
* can exceed the 30s `SESSION_VALIDATION_TIMEOUT_MS` budget), or the app is
* reloaded/quit/crashes during the retry window, the prompt is lost.
*
* To make that loss very unlikely we persist this record BEFORE any
* agent/session setup is attempted, and only clear it once the prompt has
* actually been delivered to the agent. (Persistence is async and
* best-effort, so it is not an absolute guarantee — a crash in the first
* moments of a cold start can still race the write.) A persisted record
* therefore means "this prompt is owed delivery and has not been delivered
* yet" — the basis for recovering it on the next connect, whether that connect
* starts a fresh run or resumes the stranded one.
*/
export interface PendingPromptRecord {
taskId: string;
taskTitle: string;
repoPath: string;
initialPrompt: ContentBlock[];
executionMode?: ExecutionMode;
adapter?: Adapter;
model?: string;
reasoningLevel?: string;
/** Epoch ms when the prompt was first written ahead. */
createdAt: number;
}

/**
* Durable storage for {@link PendingPromptRecord}s, keyed by `taskId` (one
* in-flight prompt per task — retries reuse the same key). Implementations
* must survive an app restart.
*/
export interface PendingPromptStore {
/** Write-ahead (or overwrite) the pending prompt for a task. */
save(record: PendingPromptRecord): void;
/** Get the pending prompt for a task, if one is owed delivery. */
get(taskId: string): PendingPromptRecord | undefined;
/** Clear the pending prompt for a task once delivered or abandoned. */
remove(taskId: string): void;
}
113 changes: 99 additions & 14 deletions packages/core/src/sessions/sessionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
OFFLINE_SESSION_MESSAGE,
routeLocalConnect,
} from "./connectRouting";
import type { PendingPromptStore } from "./pendingPrompt";
import {
type PermissionSelectionPlan,
planPermissionResponse,
Expand Down Expand Up @@ -252,6 +253,11 @@ export interface SessionServiceDeps {
setAdapter(taskRunId: string, adapter: Adapter): void;
removeAdapter(taskRunId: string): void;
};
/**
* Durable, write-ahead storage for the prompt a local task run is starting
* with, so session-init timeouts or app restarts can never lose it.
*/
pendingPrompts: PendingPromptStore;
readonly settings: { customInstructions?: string | null };
usageLimit: { show: (...args: any[]) => any };
readonly addDirectoryDialog: { open: boolean };
Expand Down Expand Up @@ -822,6 +828,21 @@ export class SessionService {
),
);
}

// A run stranded before its prompt was ever delivered (e.g. the
// original session-init timed out) routes here on the next connect
// rather than to createNewLocalSession, since it already has a run id
// and log. Its prompt is still owed: if the resumed log carries no
// prompt yet but we have a write-ahead record, deliver it now.
const owed = this.d.pendingPrompts.get(taskId);
if (owed?.initialPrompt.length && !hasSessionPromptEvent(events)) {
this.localRepoPaths.set(taskId, repoPath);
this.d.log.info("Recovered a written-ahead prompt on resume", {
taskId,
ageMs: Date.now() - owed.createdAt,
});
await this.deliverWrittenAheadPrompt(taskId, owed.initialPrompt);
}
return true;
} else {
this.d.log.warn("Reconnect returned null", { taskId, taskRunId });
Expand Down Expand Up @@ -1060,6 +1081,52 @@ export class SessionService {
model?: string,
reasoningLevel?: string,
): Promise<void> {
// Recover a previously written-ahead prompt when this connect didn't
// carry one itself: a persisted record means an earlier attempt for this
// task saved a prompt and never delivered it (init timed out, or the app
// was restarted before delivery). Adopt its prompt and the settings the
// user originally chose, unless the caller overrode them.
const recovered = initialPrompt?.length
? undefined
: this.d.pendingPrompts.get(taskId);
const effectivePrompt = initialPrompt?.length
? initialPrompt
Comment thread
pauldambra marked this conversation as resolved.
: recovered?.initialPrompt;
Comment thread
pauldambra marked this conversation as resolved.
const effectiveExecutionMode = executionMode ?? recovered?.executionMode;
const effectiveAdapter = adapter ?? recovered?.adapter;
const effectiveModel = model ?? recovered?.model;
const effectiveReasoningLevel = reasoningLevel ?? recovered?.reasoningLevel;

if (recovered) {
// Surface when the safety net actually catches something, so we can tell
// it's working (and gauge how long prompts sit undelivered).
this.d.log.info("Recovered a written-ahead prompt for a task", {
taskId,
ageMs: Date.now() - recovered.createdAt,
});
}

// Write-ahead the prompt BEFORE any session/agent work. This is the step
// that makes losing it very unlikely: everything below (run creation, the
// 30s-bounded agent.start, prompt delivery) can throw or be killed by an
// app restart, and the prompt still survives to be recovered on the next
// connect. Cleared only once it has actually been delivered. (Persistence
// is async and best-effort, so a crash in the first moments of a cold
// start can still race it — hence "very unlikely", not "guaranteed".)
if (effectivePrompt?.length) {
this.d.pendingPrompts.save({
taskId,
taskTitle,
repoPath,
initialPrompt: effectivePrompt,
executionMode: effectiveExecutionMode,
adapter: effectiveAdapter,
model: effectiveModel,
reasoningLevel: effectiveReasoningLevel,
createdAt: recovered?.createdAt ?? Date.now(),
});
}

const { client } = auth;
if (!client) {
throw new Error("Unable to reach server. Please check your connection.");
Expand All @@ -1071,26 +1138,26 @@ export class SessionService {
}

const { customInstructions: startCustomInstructions } = this.d.settings;
const preferredModel = model ?? this.d.DEFAULT_GATEWAY_MODEL;
const preferredModel = effectiveModel ?? this.d.DEFAULT_GATEWAY_MODEL;
const result = await this.d.trpc.agent.start.mutate({
taskId,
taskRunId: taskRun.id,
repoPath,
apiHost: auth.apiHost,
projectId: auth.projectId,
permissionMode: executionMode,
adapter,
permissionMode: effectiveExecutionMode,
adapter: effectiveAdapter,
customInstructions: startCustomInstructions || undefined,
effort: effortLevelSchema.safeParse(reasoningLevel).success
? (reasoningLevel as EffortLevel)
effort: effortLevelSchema.safeParse(effectiveReasoningLevel).success
? (effectiveReasoningLevel as EffortLevel)
: undefined,
model: preferredModel,
});

const session = createBaseSession(taskRun.id, taskId, taskTitle);
session.channel = result.channel;
session.status = "connected";
session.adapter = adapter;
session.adapter = effectiveAdapter;
const configOptions = result.configOptions as
| SessionConfigOption[]
| undefined;
Expand All @@ -1102,15 +1169,15 @@ export class SessionService {
}

// Persist the adapter
if (adapter) {
this.d.adapterStore.setAdapter(taskRun.id, adapter);
if (effectiveAdapter) {
this.d.adapterStore.setAdapter(taskRun.id, effectiveAdapter);
}

// Store the initial prompt on the session so retry/reset flows can
// re-send it if the session errors after this point (e.g. subscription
// error, agent crash, or prompt failure).
if (initialPrompt?.length) {
session.initialPrompt = initialPrompt;
if (effectivePrompt?.length) {
session.initialPrompt = effectivePrompt;
}

this.d.store.setSession(session);
Expand All @@ -1119,12 +1186,30 @@ export class SessionService {
this.d.track(ANALYTICS_EVENTS.TASK_RUN_STARTED, {
task_id: taskId,
execution_type: "local",
initial_mode: executionMode,
adapter,
initial_mode: effectiveExecutionMode,
adapter: effectiveAdapter,
});

if (initialPrompt?.length) {
await this.sendPrompt(taskId, initialPrompt);
if (effectivePrompt?.length) {
await this.deliverWrittenAheadPrompt(taskId, effectivePrompt);
}
}

/**
* Deliver a written-ahead prompt and clear its durable record only on
* confirmed delivery. `sendPrompt` can resolve WITHOUT delivering — "queued"
* (session busy) or "rate_limited" (usage limit swallowed by
* `sendLocalPrompt`) — and in those cases the write-ahead copy must survive
* so the prompt isn't lost. On real delivery the prompt now lives in the
* run's session log, so clearing the only-other-copy is safe.
*/
private async deliverWrittenAheadPrompt(
taskId: string,
prompt: ContentBlock[],
): Promise<void> {
const { stopReason } = await this.sendPrompt(taskId, prompt);
if (stopReason !== "queued" && stopReason !== "rate_limited") {
this.d.pendingPrompts.remove(taskId);
}
}

Expand Down
72 changes: 72 additions & 0 deletions packages/ui/src/features/sessions/pendingPromptStore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import type { ContentBlock } from "@agentclientprotocol/sdk";
import type { PendingPromptRecord } from "@posthog/core/sessions/pendingPrompt";
import { beforeEach, describe, expect, it } from "vitest";
import {
pendingPromptStore,
usePendingPromptStore,
} from "./pendingPromptStore";

function record(
taskId: string,
text: string,
overrides: Partial<PendingPromptRecord> = {},
): PendingPromptRecord {
const initialPrompt: ContentBlock[] = [{ type: "text", text }];
return {
taskId,
taskTitle: `Task ${taskId}`,
repoPath: "/repo",
initialPrompt,
createdAt: 1,
...overrides,
};
}

function storedTaskIds(): string[] {
return Object.keys(usePendingPromptStore.getState().promptsByTaskId).sort();
}

describe("pendingPromptStore", () => {
beforeEach(() => {
usePendingPromptStore.setState({ promptsByTaskId: {} });
});

it("saves and reads back a pending prompt by taskId", () => {
pendingPromptStore.save(record("t1", "do the thing"));

expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([
{ type: "text", text: "do the thing" },
]);
});

it("returns undefined when no prompt is owed", () => {
expect(pendingPromptStore.get("missing")).toBeUndefined();
});

it("overwrites the record for a task on a re-save (retry reuses the key)", () => {
pendingPromptStore.save(record("t1", "first"));
pendingPromptStore.save(record("t1", "second"));

expect(storedTaskIds()).toEqual(["t1"]);
expect(pendingPromptStore.get("t1")?.initialPrompt).toEqual([
{ type: "text", text: "second" },
]);
});

it("removes a delivered prompt and leaves others intact", () => {
pendingPromptStore.save(record("t1", "one"));
pendingPromptStore.save(record("t2", "two"));

pendingPromptStore.remove("t1");

expect(pendingPromptStore.get("t1")).toBeUndefined();
expect(pendingPromptStore.get("t2")).toBeDefined();
expect(storedTaskIds()).toEqual(["t2"]);
});

it("remove is a no-op for an unknown task", () => {
pendingPromptStore.save(record("t1", "one"));
pendingPromptStore.remove("nope");
expect(storedTaskIds()).toEqual(["t1"]);
});
});
60 changes: 60 additions & 0 deletions packages/ui/src/features/sessions/pendingPromptStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type {
PendingPromptRecord,
PendingPromptStore,
} from "@posthog/core/sessions/pendingPrompt";
import { electronStorage } from "@posthog/ui/shell/rendererStorage";
import { create } from "zustand";
import { persist } from "zustand/middleware";

interface PendingPromptState {
/** Map of taskId -> the prompt owed delivery for that task. */
promptsByTaskId: Record<string, PendingPromptRecord>;
}

interface PendingPromptActions {
savePrompt: (record: PendingPromptRecord) => void;
getPrompt: (taskId: string) => PendingPromptRecord | undefined;
removePrompt: (taskId: string) => void;
}

type PendingPromptStoreState = PendingPromptState & PendingPromptActions;

export const usePendingPromptStore = create<PendingPromptStoreState>()(
persist(
(set, get) => ({
promptsByTaskId: {},

savePrompt: (record) =>
set((state) => ({
promptsByTaskId: {
...state.promptsByTaskId,
[record.taskId]: record,
},
})),

getPrompt: (taskId) => get().promptsByTaskId[taskId],

removePrompt: (taskId) =>
Comment thread
pauldambra marked this conversation as resolved.
set((state) => {
if (!(taskId in state.promptsByTaskId)) return state;
const { [taskId]: _removed, ...rest } = state.promptsByTaskId;
return { promptsByTaskId: rest };
}),
}),
{
name: "pending-prompt-storage",
storage: electronStorage,
partialize: (state) => ({ promptsByTaskId: state.promptsByTaskId }),
},
),
);

/**
* Non-hook adapter implementing the core {@link PendingPromptStore} contract,
* wired into the session service dependencies.
*/
export const pendingPromptStore: PendingPromptStore = {
save: (record) => usePendingPromptStore.getState().savePrompt(record),
get: (taskId) => usePendingPromptStore.getState().getPrompt(taskId),
remove: (taskId) => usePendingPromptStore.getState().removePrompt(taskId),
};
Loading
Loading