diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 4bdd21f9b..76229e9d6 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -531,6 +531,72 @@ describe("CloudTaskService", () => { ]); }); + it("stops watching after clean stream completion even when the run remains active", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + const prUrl = "https://github.com/PostHog/code/pull/123"; + let statusFetchCount = 0; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const createInProgressRun = (output: Record | null) => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output, + error_message: null, + branch: "main", + updated_at: output ? "2026-01-01T00:00:01Z" : "2026-01-01T00:00:00Z", + }); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + + statusFetchCount += 1; + return Promise.resolve( + createInProgressRun(statusFetchCount === 1 ? null : { pr_url: prUrl }), + ); + }); + + mockStreamFetch.mockResolvedValueOnce(createSseResponse("")); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await waitFor( + () => + !( + service as unknown as { + watchers: Map; + } + ).watchers.has("task-1:run-1"), + ); + + expect(updates).toContainEqual( + expect.objectContaining({ + taskId: "task-1", + runId: "run-1", + status: "in_progress", + output: { pr_url: prUrl }, + }), + ); + + await vi.advanceTimersByTimeAsync(70_000); + + expect(mockStreamFetch).toHaveBeenCalledTimes(1); + }); + it("emits a retryable cloud error after repeated stream failures", async () => { vi.useFakeTimers(); @@ -558,7 +624,9 @@ describe("CloudTaskService", () => { mockStreamFetch.mockImplementation(() => Promise.resolve( - createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + createSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), ), ); @@ -597,6 +665,93 @@ describe("CloudTaskService", () => { }); }); + const guardedFetchStatusExpectations = [ + [ + 401, + { + errorTitle: "Cloud authentication expired", + errorMessage: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + }, + ], + [ + 403, + { + errorTitle: "Cloud access denied", + errorMessage: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + }, + ], + [ + 404, + { + errorTitle: "Cloud run not found", + errorMessage: + "This cloud run could not be found. It may have been deleted or moved.", + retryable: false, + }, + ], + ] as const; + + const guardedFetchStatusCases = ( + ["status fetch", "persisted log fetch"] as const + ).flatMap((fetchPhase) => + guardedFetchStatusExpectations.map(([status, expectedError]) => ({ + fetchPhase, + status, + expectedError, + })), + ); + + it.each(guardedFetchStatusCases)( + "fails the watcher when $fetchPhase returns $status", + async ({ fetchPhase, status, expectedError }) => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + if (fetchPhase === "status fetch") { + mockNetFetch.mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } else { + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:01Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length === 1); + + expect(mockStreamFetch).not.toHaveBeenCalled(); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + ...expectedError, + }); + }, + ); + it("loads paginated persisted logs once for an already terminal run", async () => { const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index bfdb942fa..094bba43d 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -204,6 +204,10 @@ function createStreamStatusError(status: number): CloudTaskStreamError { } } +function shouldFailWatcherForFetchStatus(status: number): boolean { + return status === 401 || status === 403 || status === 404; +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -444,6 +448,7 @@ export class CloudTaskService extends TypedEventEmitter { const run = await this.fetchTaskRun(watcher); const currentWatcher = this.watchers.get(key); if (!currentWatcher || currentWatcher !== watcher) return; + if (watcher.failed) return; if (!run) { this.failWatcher(key, { @@ -650,7 +655,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - await this.handleStreamCompletion(key); + await this.handleStreamCompletion(key, { reconnectIfNonTerminal: false }); } catch (error) { this.flushLogBatch(key); @@ -672,7 +677,7 @@ export class CloudTaskService extends TypedEventEmitter { key, error: errorMessage, }); - await this.handleStreamCompletion(key); + await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); } finally { const currentWatcher = this.watchers.get(key); if (currentWatcher?.sseAbortController === controller) { @@ -696,8 +701,6 @@ export class CloudTaskService extends TypedEventEmitter { throw new Error(message); } - watcher.reconnectAttempts = 0; - if ( event.event === "keepalive" || (typeof event.data === "object" && @@ -708,6 +711,8 @@ export class CloudTaskService extends TypedEventEmitter { return; } + watcher.reconnectAttempts = 0; + if (isTaskRunStateEvent(event.data)) { if (this.applyTaskRunState(watcher, event.data)) { if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) { @@ -988,13 +993,18 @@ export class CloudTaskService extends TypedEventEmitter { }, delay); } - private async handleStreamCompletion(key: string): Promise { + private async handleStreamCompletion( + key: string, + options: { reconnectIfNonTerminal: boolean }, + ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; + const { reconnectIfNonTerminal } = options; const run = await this.fetchTaskRun(watcher); const currentWatcher = this.watchers.get(key); if (!currentWatcher || currentWatcher !== watcher) return; + if (watcher.failed) return; if (watcher.isBootstrapping) { if (!run) { @@ -1003,7 +1013,7 @@ export class CloudTaskService extends TypedEventEmitter { } this.applyTaskRunState(watcher, run); - if (isTerminalStatus(watcher.lastStatus)) { + if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) { watcher.needsStopAfterBootstrap = true; } else { watcher.needsPostBootstrapReconnect = true; @@ -1026,7 +1036,7 @@ export class CloudTaskService extends TypedEventEmitter { this.applyTaskRunState(watcher, run); - if (!isTerminalStatus(watcher.lastStatus)) { + if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { log.warn("Cloud task stream ended before terminal status", { key, status: watcher.lastStatus, @@ -1035,9 +1045,9 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // Always emit terminal status — processEvent intentionally skips the emit - // for terminal states (to avoid acting on it before the stream fully ends), - // so this is the single place that notifies the renderer of completion. + // Always emit the latest status before stopping. Terminal states are + // intentionally deferred until stream completion; clean EOFs can also mean + // the backend has no more stream events even when the run status remains active. this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -1126,6 +1136,12 @@ export class CloudTaskService extends TypedEventEmitter { runId: watcher.runId, offset, }); + if (shouldFailWatcherForFetchStatus(authedResponse.status)) { + this.failWatcher( + watcherKey(watcher.taskId, watcher.runId), + createStreamStatusError(authedResponse.status).details, + ); + } return null; } @@ -1188,6 +1204,12 @@ export class CloudTaskService extends TypedEventEmitter { taskId: watcher.taskId, runId: watcher.runId, }); + if (shouldFailWatcherForFetchStatus(authedResponse.status)) { + this.failWatcher( + watcherKey(watcher.taskId, watcher.runId), + createStreamStatusError(authedResponse.status).details, + ); + } return null; }