diff --git a/apps/code/src/main/services/cloud-task/schemas.ts b/apps/code/src/main/services/cloud-task/schemas.ts index 69512afb7..ecf5e6985 100644 --- a/apps/code/src/main/services/cloud-task/schemas.ts +++ b/apps/code/src/main/services/cloud-task/schemas.ts @@ -56,6 +56,7 @@ export const sendCommandInput = z.object({ "close", "permission_response", "set_config_option", + "shell_execute", ]), params: z.record(z.string(), z.unknown()).optional(), }); diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index d1032cd15..476123b37 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -571,4 +571,152 @@ describe("CloudTaskService", () => { ]); expect(mockNetFetch).toHaveBeenCalledTimes(3); }); + + it("routes shell_output and shell_exit notifications to typed cloud updates without log batching", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + + const shellOutputFrame = { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/shell_output", + params: { + executionId: "exec-1", + stream: "stdout", + chunk: "vojta\n", + }, + }, + }; + const shellExitFrame = { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/shell_exit", + params: { executionId: "exec-1", exitCode: 0, signal: null }, + }, + }; + + mockStreamFetch.mockResolvedValueOnce( + createOpenSseResponse( + `id: 1\ndata: ${JSON.stringify(shellOutputFrame)}\n\n` + + `id: 2\ndata: ${JSON.stringify(shellExitFrame)}\n\n`, + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => + updates.some((u) => (u as { kind?: string }).kind === "shell_exit"), + ); + + const shellUpdates = updates.filter((u) => { + const kind = (u as { kind?: string }).kind; + return kind === "shell_output" || kind === "shell_exit"; + }); + + expect(shellUpdates).toEqual([ + { + taskId: "task-1", + runId: "run-1", + kind: "shell_output", + executionId: "exec-1", + stream: "stdout", + chunk: "vojta\n", + }, + { + taskId: "task-1", + runId: "run-1", + kind: "shell_exit", + executionId: "exec-1", + exitCode: 0, + signal: null, + }, + ]); + + // Shell notifications must not also be batched as log entries, otherwise + // the renderer re-processes the same chunks through the log conversion + // pipeline and the transcript gets noisy duplicates. + const logUpdates = updates.filter( + (u) => (u as { kind?: string }).kind === "logs", + ); + expect(logUpdates).toEqual([]); + }); + + it("ignores shell_output with invalid params", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + + // Missing executionId — extractor should reject and the event falls + // through to normal log batching. + const malformedFrame = { + type: "notification", + timestamp: "2026-01-01T00:00:01Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/shell_output", + params: { stream: "stdout", chunk: "orphan\n" }, + }, + }; + + mockStreamFetch.mockResolvedValueOnce( + createOpenSseResponse( + `id: 1\ndata: ${JSON.stringify(malformedFrame)}\n\n`, + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => + updates.some((u) => (u as { kind?: string }).kind === "logs"), + ); + + expect( + updates.some((u) => (u as { kind?: string }).kind === "shell_output"), + ).toBe(false); + }); }); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 01b9f2326..3fe77ebc2 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -1,3 +1,4 @@ +import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import type { CloudTaskPermissionRequestUpdate } from "@shared/types"; import type { StoredLogEntry } from "@shared/types/session-events"; import { net } from "electron"; @@ -141,6 +142,78 @@ function isPermissionRequestEvent( ); } +interface ShellOutputNotification { + executionId: string; + stream: "stdout" | "stderr"; + chunk: string; +} + +interface ShellExitNotification { + executionId: string; + exitCode: number | null; + signal: string | null; +} + +function extractShellNotification( + data: unknown, +): + | { kind: "output"; params: ShellOutputNotification } + | { kind: "exit"; params: ShellExitNotification } + | null { + if (typeof data !== "object" || data === null) return null; + const d = data as Record; + if (d.type !== "notification") return null; + const notification = d.notification as + | { method?: unknown; params?: unknown } + | undefined; + const method = notification?.method; + if ( + method !== POSTHOG_NOTIFICATIONS.SHELL_OUTPUT && + method !== POSTHOG_NOTIFICATIONS.SHELL_EXIT + ) { + return null; + } + const params = notification?.params as Record | undefined; + if (!params || typeof params !== "object") return null; + + if (method === POSTHOG_NOTIFICATIONS.SHELL_OUTPUT) { + const executionId = params.executionId; + const stream = params.stream; + const chunk = params.chunk; + if ( + typeof executionId === "string" && + (stream === "stdout" || stream === "stderr") && + typeof chunk === "string" + ) { + return { kind: "output", params: { executionId, stream, chunk } }; + } + return null; + } + + if (method === POSTHOG_NOTIFICATIONS.SHELL_EXIT) { + const executionId = params.executionId; + const exitCode = params.exitCode; + const signal = params.signal; + if ( + typeof executionId === "string" && + (exitCode === null || typeof exitCode === "number") && + (signal === null || typeof signal === "string") + ) { + return { + kind: "exit", + params: { + executionId, + exitCode: exitCode as number | null, + signal: signal as string | null, + }, + }; + } + return null; + } + + return null; +} + function createStreamStatusError(status: number): CloudTaskStreamError { switch (status) { case 401: @@ -713,6 +786,34 @@ export class CloudTaskService extends TypedEventEmitter { return; } + const shellNotification = extractShellNotification(event.data); + if (shellNotification) { + log.info("Shell notification received", { + kind: shellNotification.kind, + executionId: shellNotification.params.executionId, + }); + if (shellNotification.kind === "output") { + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "shell_output" as const, + executionId: shellNotification.params.executionId, + stream: shellNotification.params.stream, + chunk: shellNotification.params.chunk, + }); + } else { + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "shell_exit" as const, + executionId: shellNotification.params.executionId, + exitCode: shellNotification.params.exitCode, + signal: shellNotification.params.signal, + }); + } + return; + } + watcher.pendingLogEntries.push(event.data as StoredLogEntry); if (watcher.pendingLogEntries.length >= EVENT_BATCH_MAX_SIZE) { this.flushLogBatch(key); diff --git a/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx b/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx index e518b2487..598fbfc80 100644 --- a/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx +++ b/apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx @@ -31,6 +31,7 @@ export function CommandCenterSessionView({ promptStartedAt, isInitializing, cloudBranch, + canBash, errorTitle, errorMessage, } = useSessionViewState(taskId, task); @@ -58,7 +59,7 @@ export function CommandCenterSessionView({ isPromptPending={isPromptPending} promptStartedAt={promptStartedAt} onSendPrompt={handleSendPrompt} - onBashCommand={isCloud ? undefined : handleBashCommand} + onBashCommand={canBash ? handleBashCommand : undefined} onCancelPrompt={handleCancelPrompt} repoPath={repoPath} cloudBranch={cloudBranch} diff --git a/apps/code/src/renderer/features/message-editor/components/MessageEditor.tsx b/apps/code/src/renderer/features/message-editor/components/MessageEditor.tsx index a6a9a7634..994dab6ce 100644 --- a/apps/code/src/renderer/features/message-editor/components/MessageEditor.tsx +++ b/apps/code/src/renderer/features/message-editor/components/MessageEditor.tsx @@ -182,7 +182,7 @@ export const MessageEditor = forwardRef( autoFocus, context: { taskId, repoPath }, getPromptHistory, - capabilities: { bashMode: !isCloud }, + capabilities: { bashMode: !!onBashCommand }, onSubmit, onBashCommand, onBashModeChange, diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts index 167aa6ab0..cfe325c6a 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts @@ -112,26 +112,34 @@ export function useSessionCallbacks({ const handleBashCommand = useCallback( async (command: string) => { - if (!repoPath) return; + const currentSession = sessionRef.current; + const isCloud = currentSession?.isCloud === true; + + if (isCloud && !currentSession?.sandboxEnvironmentId) return; + if (!isCloud && !repoPath) return; const execId = `user-shell-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; + const displayCwd = repoPath ?? "cloud-sandbox"; + await getSessionService().startUserShellExecute( taskId, execId, command, - repoPath, + displayCwd, ); try { - const result = await trpcClient.shell.execute.mutate({ - cwd: repoPath, - command, - }); + const result = isCloud + ? await getSessionService().executeCloudShellCommand(taskId, command) + : await trpcClient.shell.execute.mutate({ + cwd: repoPath as string, + command, + }); await getSessionService().completeUserShellExecute( taskId, execId, command, - repoPath, + displayCwd, result, ); } catch (error) { @@ -140,7 +148,7 @@ export function useSessionCallbacks({ taskId, execId, command, - repoPath, + displayCwd, { stdout: "", stderr: error instanceof Error ? error.message : "Command failed", diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index 1a73f33b3..a4e379c56 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -78,6 +78,10 @@ export function useSessionConnection({ : undefined; const adapter = task.latest_run.runtime_adapter === "codex" ? "codex" : "claude"; + const sandboxEnvId = + typeof task.latest_run.state?.sandbox_environment_id === "string" + ? task.latest_run.state.sandbox_environment_id + : undefined; const cleanup = getSessionService().watchCloudTask( task.id, runId, @@ -89,6 +93,7 @@ export function useSessionConnection({ task.latest_run?.log_url, initialMode, adapter, + sandboxEnvId, ); return cleanup; }, [ @@ -103,6 +108,7 @@ export function useSessionConnection({ task.latest_run?.log_url, task.latest_run?.runtime_adapter, task.latest_run?.state?.initial_permission_mode, + task.latest_run?.state?.sandbox_environment_id, ]); useEffect(() => { diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts index 49302ecb4..a2336a711 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts @@ -41,6 +41,8 @@ export function useSessionViewState(taskId: string, task: Task) { ? (workspace?.baseBranch ?? task.latest_run?.branch ?? null) : null; + const canBash = !isCloud || !!session?.sandboxEnvironmentId; + return { session, repoPath, @@ -55,6 +57,7 @@ export function useSessionViewState(taskId: string, task: Task) { promptStartedAt, isInitializing, cloudBranch, + canBash, errorTitle: session?.errorTitle, errorMessage: session?.errorMessage ?? diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 7682d60a8..ed2f0519a 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1518,4 +1518,292 @@ describe("SessionService", () => { vi.useRealTimers(); }); }); + + describe("executeCloudShellCommand", () => { + function createOnDataCapturer() { + let capturedOnData: + | ((update: Record) => void) + | undefined; + mockTrpcCloudTask.onUpdate.subscribe.mockImplementation( + (_input: unknown, callbacks: { onData: (u: unknown) => void }) => { + capturedOnData = callbacks.onData as typeof capturedOnData; + return { unsubscribe: vi.fn() }; + }, + ); + return () => capturedOnData; + } + + it("throws when no active session exists", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(undefined); + + await expect( + service.executeCloudShellCommand("task-123", "echo vojta"), + ).rejects.toThrow("No active session for task"); + }); + + it("registers the pending listener before the RPC round-trip resolves", async () => { + const service = getSessionService(); + const getOnData = createOnDataCapturer(); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ isCloud: true }), + ); + + let resolveSendCommand: + | ((value: { success: boolean; result?: unknown }) => void) + | undefined; + mockTrpcCloudTask.sendCommand.mutate.mockImplementation( + () => + new Promise((resolve) => { + resolveSendCommand = resolve; + }), + ); + + const execPromise = service.executeCloudShellCommand( + "task-123", + "echo vojta", + ); + + await vi.waitFor(() => + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1), + ); + const callArgs = mockTrpcCloudTask.sendCommand.mutate.mock + .calls[0][0] as { + method: string; + params: { command: string; executionId: string }; + }; + expect(callArgs.method).toBe("shell_execute"); + const executionId = callArgs.params.executionId; + expect(executionId).toMatch(/.+/); + + // Deliver the shell_exit *before* the sendCommand RPC resolves — + // this mimics the real-world race where `echo` finishes faster than + // the HTTP round-trip. The pending listener must already be + // registered for the exit event to land. + const onData = getOnData(); + expect(onData).toBeDefined(); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_output", + executionId, + stream: "stdout", + chunk: "vojta\n", + }); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_exit", + executionId, + exitCode: 0, + signal: null, + }); + + resolveSendCommand?.({ success: true, result: { executionId } }); + + await expect(execPromise).resolves.toEqual({ + stdout: "vojta\n", + stderr: "", + exitCode: 0, + }); + }); + + it("accumulates multiple stdout and stderr chunks across shell_output events", async () => { + const service = getSessionService(); + const getOnData = createOnDataCapturer(); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ isCloud: true }), + ); + mockTrpcCloudTask.sendCommand.mutate.mockImplementation( + async (input: { params: { executionId: string } }) => ({ + success: true, + result: { executionId: input.params.executionId }, + }), + ); + + const execPromise = service.executeCloudShellCommand("task-123", "pipe"); + await vi.waitFor(() => + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1), + ); + const executionId = ( + mockTrpcCloudTask.sendCommand.mutate.mock.calls[0][0] as { + params: { executionId: string }; + } + ).params.executionId; + + const onData = getOnData(); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_output", + executionId, + stream: "stdout", + chunk: "line 1\n", + }); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_output", + executionId, + stream: "stderr", + chunk: "warning\n", + }); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_output", + executionId, + stream: "stdout", + chunk: "line 2\n", + }); + onData?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_exit", + executionId, + exitCode: 2, + signal: null, + }); + + await expect(execPromise).resolves.toEqual({ + stdout: "line 1\nline 2\n", + stderr: "warning\n", + exitCode: 2, + }); + }); + + it("treats shell_exit with only a signal as a failure", async () => { + const service = getSessionService(); + const getOnData = createOnDataCapturer(); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ isCloud: true }), + ); + mockTrpcCloudTask.sendCommand.mutate.mockImplementation( + async (input: { params: { executionId: string } }) => ({ + success: true, + result: { executionId: input.params.executionId }, + }), + ); + + const execPromise = service.executeCloudShellCommand( + "task-123", + "sleep 1", + ); + await vi.waitFor(() => + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1), + ); + const executionId = ( + mockTrpcCloudTask.sendCommand.mutate.mock.calls[0][0] as { + params: { executionId: string }; + } + ).params.executionId; + + getOnData()?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_exit", + executionId, + exitCode: null, + signal: "SIGTERM", + }); + + await expect(execPromise).resolves.toEqual({ + stdout: "", + stderr: "", + exitCode: 143, // 128 + SIGTERM(15) + }); + }); + + it("rejects the pending promise if sendCommand fails", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ isCloud: true }), + ); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: false, + error: "Backend exploded", + }); + + await expect( + service.executeCloudShellCommand("task-123", "echo hi"), + ).rejects.toThrow("Backend exploded"); + }); + + it("resolves hanging shell executions when the session is torn down", async () => { + const service = getSessionService(); + const getOnData = createOnDataCapturer(); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + const session = createMockSession({ isCloud: true }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": session, + }); + mockTrpcCloudTask.sendCommand.mutate.mockImplementation( + async (input: { params: { executionId: string } }) => ({ + success: true, + result: { executionId: input.params.executionId }, + }), + ); + mockTrpcAgent.cancel.mutate.mockResolvedValue(undefined); + + const execPromise = service.executeCloudShellCommand( + "task-123", + "sleep 60", + ); + await vi.waitFor(() => + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1), + ); + const executionId = ( + mockTrpcCloudTask.sendCommand.mutate.mock.calls[0][0] as { + params: { executionId: string }; + } + ).params.executionId; + + // Some stdout arrived before teardown — should still flush into the + // final result so the transcript has partial output. + getOnData()?.({ + taskId: "task-123", + runId: "run-123", + kind: "shell_output", + executionId, + stream: "stdout", + chunk: "partial\n", + }); + + await service.disconnectFromTask("task-123"); + + await expect(execPromise).resolves.toEqual({ + stdout: "partial\n", + stderr: "Session ended", + exitCode: -1, + }); + }); + }); }); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 9ffc673f0..f177712c9 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -82,6 +82,25 @@ const LOCAL_SESSION_RECONNECT_BACKOFF = { initialDelayMs: 1_000, maxDelayMs: 5_000, }; + +/** Maps a Unix signal name to exit code using the 128 + N convention. */ +const SIGNAL_NUMBERS: Record = { + SIGHUP: 1, + SIGINT: 2, + SIGQUIT: 3, + SIGABRT: 6, + SIGKILL: 9, + SIGPIPE: 13, + SIGALRM: 14, + SIGTERM: 15, +}; + +function signalToExitCode(signal: string | null): number { + if (!signal) return 0; + const num = SIGNAL_NUMBERS[signal]; + return num ? 128 + num : 128; +} + const LOCAL_SESSION_RECOVERY_MESSAGE = "Lost connection to the agent. Reconnecting…"; const LOCAL_SESSION_RECOVERY_FAILED_MESSAGE = @@ -185,6 +204,22 @@ export class SessionService { >(); /** Maps toolCallId → cloud requestId for routing permission responses */ private cloudPermissionRequestIds = new Map(); + private static readonly MAX_PENDING_CLOUD_SHELLS = 5; + private pendingCloudShells = new Map< + string, + { + taskRunId: string; + stdout: string; + stderr: string; + resolve: (result: { + stdout: string; + stderr: string; + exitCode: number; + }) => void; + } + >(); + /** Sandbox environment IDs set during task creation, keyed by taskId. */ + private taskSandboxEnvironments = new Map(); private idleKilledSubscription: { unsubscribe: () => void } | null = null; constructor() { @@ -555,6 +590,7 @@ export class SessionService { } this.unsubscribeFromChannel(taskRunId); + this.rejectPendingCloudShells(taskRunId, "Session ended"); sessionStoreSetters.removeSession(taskRunId); if (session) { this.localRepoPaths.delete(session.taskId); @@ -564,6 +600,18 @@ export class SessionService { removePersistedConfigOptions(taskRunId); } + private rejectPendingCloudShells(taskRunId: string, reason: string): void { + for (const [executionId, pending] of this.pendingCloudShells) { + if (pending.taskRunId !== taskRunId) continue; + this.pendingCloudShells.delete(executionId); + pending.resolve({ + stdout: pending.stdout, + stderr: pending.stderr || reason, + exitCode: -1, + }); + } + } + /** * Handle an idle-kill from the main process without destroying session state. * The main process already cleaned up the agent, so we only need to @@ -1707,6 +1755,7 @@ export class SessionService { newRun.log_url, initialMode, newRun.runtime_adapter ?? session.adapter ?? "claude", + session.sandboxEnvironmentId, ); // Invalidate task queries so the UI picks up the new run metadata @@ -1789,14 +1838,14 @@ export class SessionService { */ private async sendCloudCommand( session: AgentSession, - method: "permission_response" | "set_config_option", + method: "permission_response" | "set_config_option" | "shell_execute", params: Record, - ): Promise { + ) { const auth = await this.getCloudCommandAuth(); if (!auth) { throw new Error("No cloud auth credentials available"); } - await trpcClient.cloudTask.sendCommand.mutate({ + return trpcClient.cloudTask.sendCommand.mutate({ taskId: session.taskId, runId: session.taskRunId, apiHost: auth.apiHost, @@ -2046,6 +2095,66 @@ export class SessionService { await this.setSessionConfigOption(taskId, configOption.id, value); } + /** + * Records a sandbox environment ID for a task. Called during task creation + * so that `watchCloudTask` can propagate it to the session. + */ + setSandboxEnvironmentId(taskId: string, sandboxEnvironmentId: string): void { + this.taskSandboxEnvironments.set(taskId, sandboxEnvironmentId); + } + + async executeCloudShellCommand( + taskId: string, + command: string, + ): Promise<{ stdout: string; stderr: string; exitCode: number }> { + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session) { + throw new Error("No active session for task"); + } + + if ( + this.pendingCloudShells.size >= SessionService.MAX_PENDING_CLOUD_SHELLS + ) { + throw new Error( + `Too many concurrent shell executions (max ${SessionService.MAX_PENDING_CLOUD_SHELLS})`, + ); + } + + // Pre-register the pending listener before the RPC round-trip — fast + // commands (`echo`) emit `shell_exit` on SSE before the HTTP response + // returns, so the exit event would otherwise have nowhere to land. + const executionId = crypto.randomUUID(); + log.info("Starting cloud shell execute", { taskId, executionId, command }); + const resultPromise = new Promise<{ + stdout: string; + stderr: string; + exitCode: number; + }>((resolve) => { + this.pendingCloudShells.set(executionId, { + taskRunId: session.taskRunId, + stdout: "", + stderr: "", + resolve, + }); + }); + + try { + const response = await this.sendCloudCommand(session, "shell_execute", { + command, + executionId, + }); + if (!response.success) { + this.pendingCloudShells.delete(executionId); + throw new Error(response.error ?? "Failed to start shell execution"); + } + } catch (err) { + this.pendingCloudShells.delete(executionId); + throw err; + } + + return resultPromise; + } + /** * Start a user shell execute event (shows command as running). * Call completeUserShellExecute with the same id when the command finishes. @@ -2235,6 +2344,7 @@ export class SessionService { logUrl?: string, initialMode?: string, adapter: Adapter = "claude", + sandboxEnvironmentId?: string, ): () => void { const taskRunId = runId; const startToken = ++this.nextCloudTaskWatchToken; @@ -2299,6 +2409,8 @@ export class SessionService { session.status = "disconnected"; session.isCloud = true; session.adapter = adapter; + session.sandboxEnvironmentId = + sandboxEnvironmentId ?? this.taskSandboxEnvironments.get(taskId); session.configOptions = buildCloudDefaultConfigOptions( initialMode, adapter, @@ -2518,6 +2630,43 @@ export class SessionService { return; } + if (update.kind === "shell_output") { + const pending = this.pendingCloudShells.get(update.executionId); + log.info("Cloud shell output", { + executionId: update.executionId, + stream: update.stream, + chunkLen: update.chunk.length, + hasPending: !!pending, + }); + if (pending) { + if (update.stream === "stdout") { + pending.stdout += update.chunk; + } else { + pending.stderr += update.chunk; + } + } + return; + } + + if (update.kind === "shell_exit") { + const pending = this.pendingCloudShells.get(update.executionId); + log.info("Cloud shell exit", { + executionId: update.executionId, + exitCode: update.exitCode, + signal: update.signal, + hasPending: !!pending, + }); + if (pending) { + this.pendingCloudShells.delete(update.executionId); + pending.resolve({ + stdout: pending.stdout, + stderr: pending.stderr, + exitCode: update.exitCode ?? signalToExitCode(update.signal), + }); + } + return; + } + // Append new log entries with dedup guard if ( (update.kind === "logs" || update.kind === "snapshot") && diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 59610e4a7..4011aea10 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -57,6 +57,8 @@ export interface AgentSession { messageQueue: QueuedMessage[]; /** Whether this session is for a cloud run */ isCloud?: boolean; + /** Sandbox environment ID attached to this cloud run (enables bash execution) */ + sandboxEnvironmentId?: string; /** Cloud task run status (only set for cloud sessions) */ cloudStatus?: TaskRunStatus; /** Cloud task current stage */ diff --git a/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx b/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx index 965df1b36..1743d4d55 100644 --- a/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx +++ b/apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx @@ -55,6 +55,7 @@ export function TaskLogsPanel({ taskId, task, hideInput }: TaskLogsPanelProps) { promptStartedAt, isInitializing, cloudBranch, + canBash, errorTitle, errorMessage, } = useSessionViewState(taskId, task); @@ -127,7 +128,7 @@ export function TaskLogsPanel({ taskId, task, hideInput }: TaskLogsPanelProps) { isPromptPending={isPromptPending} promptStartedAt={promptStartedAt} onSendPrompt={handleSendPrompt} - onBashCommand={isCloud ? undefined : handleBashCommand} + onBashCommand={canBash ? handleBashCommand : undefined} onCancelPrompt={handleCancelPrompt} repoPath={repoPath} cloudBranch={cloudBranch} diff --git a/apps/code/src/renderer/sagas/task/task-creation.ts b/apps/code/src/renderer/sagas/task/task-creation.ts index 8d4f9003f..4695ea4e5 100644 --- a/apps/code/src/renderer/sagas/task/task-creation.ts +++ b/apps/code/src/renderer/sagas/task/task-creation.ts @@ -318,6 +318,13 @@ export class TaskCreationSaga extends Saga< }, }); + if (input.sandboxEnvironmentId) { + getSessionService().setSandboxEnvironmentId( + task.id, + input.sandboxEnvironmentId, + ); + } + if (!hasProvisioning && this.deps.onTaskReady) { this.deps.onTaskReady({ task, workspace }); } diff --git a/apps/code/src/shared/types.ts b/apps/code/src/shared/types.ts index ecd9077a6..529bd397c 100644 --- a/apps/code/src/shared/types.ts +++ b/apps/code/src/shared/types.ts @@ -171,12 +171,28 @@ export interface CloudTaskPermissionRequestUpdate extends CloudTaskUpdateBase { options: CloudPermissionOption[]; } +export interface CloudTaskShellOutputUpdate extends CloudTaskUpdateBase { + kind: "shell_output"; + executionId: string; + stream: "stdout" | "stderr"; + chunk: string; +} + +export interface CloudTaskShellExitUpdate extends CloudTaskUpdateBase { + kind: "shell_exit"; + executionId: string; + exitCode: number | null; + signal: string | null; +} + export type CloudTaskUpdatePayload = | CloudTaskLogsUpdate | CloudTaskStatusUpdate | CloudTaskSnapshotUpdate | CloudTaskErrorUpdate - | CloudTaskPermissionRequestUpdate; + | CloudTaskPermissionRequestUpdate + | CloudTaskShellOutputUpdate + | CloudTaskShellExitUpdate; // Mention types for editors type MentionType = diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 3cfeba297..484d7fd19 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -66,6 +66,12 @@ export const POSTHOG_NOTIFICATIONS = { /** Response to a relayed permission request (plan approval, question) */ PERMISSION_RESPONSE: "_posthog/permission_response", + + /** Streamed stdout/stderr chunk from an inline `!command` execution */ + SHELL_OUTPUT: "_posthog/shell_output", + + /** Terminal event for an inline `!command` execution (exit code + signal) */ + SHELL_EXIT: "_posthog/shell_exit", } as const; type NotificationMethod = diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index e873336bb..957c6a32b 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -1,3 +1,4 @@ +import { type ChildProcess, spawn } from "node:child_process"; import type { ContentBlock, RequestPermissionRequest, @@ -209,6 +210,10 @@ export class AgentServer { }) => void; } >(); + private shellExecutions = new Map(); + private static readonly SHELL_DEFAULT_TIMEOUT_MS = 60_000; + private static readonly SHELL_MAX_TIMEOUT_MS = 600_000; + private static readonly MAX_CONCURRENT_SHELLS = 5; private detachSseController(controller: SseController): void { if (this.session?.sseController === controller) { @@ -644,6 +649,26 @@ export class AgentServer { }; } + case "posthog/shell_execute": + case "shell_execute": { + const command = params.command as string; + const cwd = (params.cwd as string | undefined) ?? this.getShellCwd(); + const requestedTimeout = params.timeoutMs as number | undefined; + const timeoutMs = Math.min( + requestedTimeout && requestedTimeout > 0 + ? requestedTimeout + : AgentServer.SHELL_DEFAULT_TIMEOUT_MS, + AgentServer.SHELL_MAX_TIMEOUT_MS, + ); + const executionId = params.executionId as string | undefined; + return this.startShellExecution({ + command, + cwd, + timeoutMs, + executionId, + }); + } + case POSTHOG_NOTIFICATIONS.PERMISSION_RESPONSE: case "permission_response": { const requestId = params.requestId as string; @@ -1912,6 +1937,8 @@ ${attributionInstructions} } this.pendingPermissions.clear(); + this.killAllShellExecutions(); + try { await this.session.acpConnection.cleanup(); } catch (error) { @@ -1980,6 +2007,119 @@ ${attributionInstructions} }); } + private getShellCwd(): string { + return this.config.repositoryPath ?? "/tmp/workspace"; + } + + private emitShellNotification( + method: + | typeof POSTHOG_NOTIFICATIONS.SHELL_OUTPUT + | typeof POSTHOG_NOTIFICATIONS.SHELL_EXIT, + params: Record, + ): void { + if (!this.session) return; + const notification = { + jsonrpc: "2.0" as const, + method, + params, + }; + this.broadcastEvent({ + type: "notification", + timestamp: new Date().toISOString(), + notification, + }); + this.session.logWriter.appendRawLine( + this.session.payload.run_id, + JSON.stringify(notification), + ); + } + + private startShellExecution(params: { + command: string; + cwd: string; + timeoutMs: number; + executionId?: string; + }): { executionId: string } { + if (this.shellExecutions.size >= AgentServer.MAX_CONCURRENT_SHELLS) { + throw new Error( + `Too many concurrent shell executions (max ${AgentServer.MAX_CONCURRENT_SHELLS})`, + ); + } + + const executionId = params.executionId ?? crypto.randomUUID(); + const { command, cwd, timeoutMs } = params; + + this.logger.info("Starting shell execution", { + executionId, + cwd, + command, + }); + + const child = spawn("bash", ["-c", command], { + cwd, + env: { PATH: process.env.PATH, HOME: process.env.HOME }, + stdio: ["ignore", "pipe", "pipe"], + }); + + this.shellExecutions.set(executionId, child); + + const timeout = setTimeout(() => { + if (!child.killed) { + this.logger.warn("Shell execution timed out", { + executionId, + timeoutMs, + }); + child.kill("SIGTERM"); + } + }, timeoutMs); + + const emitOutput = (stream: "stdout" | "stderr", chunk: string): void => { + this.emitShellNotification(POSTHOG_NOTIFICATIONS.SHELL_OUTPUT, { + executionId, + stream, + chunk, + }); + }; + + child.stdout?.on("data", (data: Buffer) => + emitOutput("stdout", data.toString("utf8")), + ); + child.stderr?.on("data", (data: Buffer) => + emitOutput("stderr", data.toString("utf8")), + ); + + child.on("error", (err) => { + this.logger.error("Shell execution error", { executionId, error: err }); + emitOutput("stderr", `${err.message}\n`); + }); + + child.on("close", (code, signal) => { + clearTimeout(timeout); + this.shellExecutions.delete(executionId); + this.logger.info("Shell execution finished", { + executionId, + exitCode: code, + signal, + }); + this.emitShellNotification(POSTHOG_NOTIFICATIONS.SHELL_EXIT, { + executionId, + exitCode: code, + signal: signal ?? null, + }); + }); + + return { executionId }; + } + + private killAllShellExecutions(): void { + for (const [, child] of this.shellExecutions) { + if (!child.killed) { + child.kill("SIGTERM"); + } + } + this.shellExecutions.clear(); + } + private broadcastEvent(event: Record): void { if (this.session?.sseController) { this.sendSseEvent(this.session.sseController, event); diff --git a/packages/agent/src/server/schemas.test.ts b/packages/agent/src/server/schemas.test.ts index af47fdaa1..87054494a 100644 --- a/packages/agent/src/server/schemas.test.ts +++ b/packages/agent/src/server/schemas.test.ts @@ -184,4 +184,61 @@ describe("validateCommandParams", () => { expect(result.success).toBe(false); }); + + it("accepts valid shell_execute", () => { + const result = validateCommandParams("shell_execute", { + command: "echo vojta", + }); + + expect(result.success).toBe(true); + }); + + it("accepts shell_execute with cwd and timeoutMs", () => { + const result = validateCommandParams("shell_execute", { + command: "ls", + cwd: "/workspace", + timeoutMs: 5000, + }); + + expect(result.success).toBe(true); + }); + + it("accepts shell_execute with caller-provided executionId", () => { + const result = validateCommandParams("shell_execute", { + command: "echo hi", + executionId: "abc-123", + }); + + expect(result.success).toBe(true); + }); + + it("rejects shell_execute with an empty executionId", () => { + const result = validateCommandParams("shell_execute", { + command: "echo hi", + executionId: "", + }); + + expect(result.success).toBe(false); + }); + + it("rejects shell_execute without command", () => { + const result = validateCommandParams("shell_execute", {}); + + expect(result.success).toBe(false); + }); + + it("rejects shell_execute with empty command", () => { + const result = validateCommandParams("shell_execute", { command: "" }); + + expect(result.success).toBe(false); + }); + + it("rejects shell_execute with timeoutMs above the cap", () => { + const result = validateCommandParams("shell_execute", { + command: "echo hi", + timeoutMs: 600_001, + }); + + expect(result.success).toBe(false); + }); }); diff --git a/packages/agent/src/server/schemas.ts b/packages/agent/src/server/schemas.ts index 30e7a3633..343948889 100644 --- a/packages/agent/src/server/schemas.ts +++ b/packages/agent/src/server/schemas.ts @@ -60,6 +60,19 @@ export const setConfigOptionParamsSchema = z.object({ value: z.string().min(1, "value is required"), }); +export const shellExecuteParamsSchema = z.object({ + command: z.string().min(1, "command is required"), + cwd: z.string().optional(), + timeoutMs: z.number().int().positive().max(600_000).optional(), + /** + * Caller-provided identifier used to tag streamed output events. Lets the + * renderer register a pending listener before the HTTP response returns — + * otherwise fast commands (`echo`) emit `shell_exit` faster than the RPC + * round-trip and the exit notification is lost. + */ + executionId: z.string().min(1).max(128).optional(), +}); + export const commandParamsSchemas = { user_message: userMessageParamsSchema, "posthog/user_message": userMessageParamsSchema, @@ -71,6 +84,8 @@ export const commandParamsSchemas = { "posthog/permission_response": permissionResponseParamsSchema, set_config_option: setConfigOptionParamsSchema, "posthog/set_config_option": setConfigOptionParamsSchema, + shell_execute: shellExecuteParamsSchema, + "posthog/shell_execute": shellExecuteParamsSchema, } as const; export type CommandMethod = keyof typeof commandParamsSchemas;