From 424ddb4faf71dae6fc14b4ac1dab472df9031a85 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Thu, 30 Apr 2026 08:49:31 +0100 Subject: [PATCH 1/3] fix(cloud-agent): handle stream churn --- .../main/services/cloud-task/service.test.ts | 73 ++++++++++++++++++- .../src/main/services/cloud-task/service.ts | 22 +++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 4bdd21f9b..3f05ac566 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -558,7 +558,9 @@ describe("CloudTaskService", () => { mockStreamFetch.mockImplementation(() => Promise.resolve( - createSseResponse('event: error\ndata: {"error":"boom"}\n\n'), + createSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), ), ); @@ -597,6 +599,75 @@ describe("CloudTaskService", () => { }); }); + it("fails the watcher when the status fetch loses access", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockResolvedValueOnce( + createJsonResponse({ detail: "Forbidden" }, 403), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length === 1); + + expect(mockStreamFetch).not.toHaveBeenCalled(); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud access denied", + errorMessage: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + }); + }); + + it("fails the watcher when persisted log fetch loses access", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:01Z", + }), + ) + .mockResolvedValueOnce(createJsonResponse({ detail: "Forbidden" }, 403)); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length === 1); + + expect(mockStreamFetch).not.toHaveBeenCalled(); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + errorTitle: "Cloud access denied", + errorMessage: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + }); + }); + it("loads paginated persisted logs once for an already terminal run", async () => { const updates: unknown[] = []; service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index bfdb942fa..34c58030d 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -204,6 +204,10 @@ function createStreamStatusError(status: number): CloudTaskStreamError { } } +function shouldFailWatcherForFetchStatus(status: number): boolean { + return status === 401 || status === 403 || status === 404; +} + @injectable() export class CloudTaskService extends TypedEventEmitter { private watchers = new Map(); @@ -444,6 +448,7 @@ export class CloudTaskService extends TypedEventEmitter { const run = await this.fetchTaskRun(watcher); const currentWatcher = this.watchers.get(key); if (!currentWatcher || currentWatcher !== watcher) return; + if (watcher.failed) return; if (!run) { this.failWatcher(key, { @@ -696,8 +701,6 @@ export class CloudTaskService extends TypedEventEmitter { throw new Error(message); } - watcher.reconnectAttempts = 0; - if ( event.event === "keepalive" || (typeof event.data === "object" && @@ -708,6 +711,8 @@ export class CloudTaskService extends TypedEventEmitter { return; } + watcher.reconnectAttempts = 0; + if (isTaskRunStateEvent(event.data)) { if (this.applyTaskRunState(watcher, event.data)) { if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) { @@ -995,6 +1000,7 @@ export class CloudTaskService extends TypedEventEmitter { const run = await this.fetchTaskRun(watcher); const currentWatcher = this.watchers.get(key); if (!currentWatcher || currentWatcher !== watcher) return; + if (watcher.failed) return; if (watcher.isBootstrapping) { if (!run) { @@ -1126,6 +1132,12 @@ export class CloudTaskService extends TypedEventEmitter { runId: watcher.runId, offset, }); + if (shouldFailWatcherForFetchStatus(authedResponse.status)) { + this.failWatcher( + watcherKey(watcher.taskId, watcher.runId), + createStreamStatusError(authedResponse.status).details, + ); + } return null; } @@ -1188,6 +1200,12 @@ export class CloudTaskService extends TypedEventEmitter { taskId: watcher.taskId, runId: watcher.runId, }); + if (shouldFailWatcherForFetchStatus(authedResponse.status)) { + this.failWatcher( + watcherKey(watcher.taskId, watcher.runId), + createStreamStatusError(authedResponse.status).details, + ); + } return null; } From 7b00415ceef39f0e0fb3e59176652085f3aa9610 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Thu, 30 Apr 2026 08:58:49 +0100 Subject: [PATCH 2/3] test(cloud-agent): cover guarded fetch statuses --- .../main/services/cloud-task/service.test.ts | 150 ++++++++++-------- 1 file changed, 84 insertions(+), 66 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 3f05ac566..ffb88dd97 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -599,74 +599,92 @@ describe("CloudTaskService", () => { }); }); - it("fails the watcher when the status fetch loses access", async () => { - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - mockNetFetch.mockResolvedValueOnce( - createJsonResponse({ detail: "Forbidden" }, 403), - ); - - service.watch({ - taskId: "task-1", - runId: "run-1", - apiHost: "https://app.example.com", - teamId: 2, - }); - - await waitFor(() => updates.length === 1); - - expect(mockStreamFetch).not.toHaveBeenCalled(); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud access denied", - errorMessage: - "You no longer have access to this cloud run. Reauthenticate and retry.", - retryable: true, - }); - }); - - it("fails the watcher when persisted log fetch loses access", async () => { - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - mockNetFetch - .mockResolvedValueOnce( - createJsonResponse({ - id: "run-1", - status: "completed", - stage: null, - output: null, - error_message: null, - branch: "main", - updated_at: "2026-01-01T00:00:00Z", - completed_at: "2026-01-01T00:00:01Z", - }), - ) - .mockResolvedValueOnce(createJsonResponse({ detail: "Forbidden" }, 403)); - - service.watch({ - taskId: "task-1", - runId: "run-1", - apiHost: "https://app.example.com", - teamId: 2, - }); + const guardedFetchStatusExpectations = [ + [ + 401, + { + errorTitle: "Cloud authentication expired", + errorMessage: "Please reauthenticate and retry the cloud run stream.", + retryable: true, + }, + ], + [ + 403, + { + errorTitle: "Cloud access denied", + errorMessage: + "You no longer have access to this cloud run. Reauthenticate and retry.", + retryable: true, + }, + ], + [ + 404, + { + errorTitle: "Cloud run not found", + errorMessage: + "This cloud run could not be found. It may have been deleted or moved.", + retryable: false, + }, + ], + ] as const; + + const guardedFetchStatusCases = ( + ["status fetch", "persisted log fetch"] as const + ).flatMap((fetchPhase) => + guardedFetchStatusExpectations.map(([status, expectedError]) => ({ + fetchPhase, + status, + expectedError, + })), + ); + + it.each(guardedFetchStatusCases)( + "fails the watcher when $fetchPhase returns $status", + async ({ fetchPhase, status, expectedError }) => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + if (fetchPhase === "status fetch") { + mockNetFetch.mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } else { + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "completed", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + completed_at: "2026-01-01T00:00:01Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse({ detail: "Access denied" }, status), + ); + } + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); - await waitFor(() => updates.length === 1); + await waitFor(() => updates.length === 1); - expect(mockStreamFetch).not.toHaveBeenCalled(); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud access denied", - errorMessage: - "You no longer have access to this cloud run. Reauthenticate and retry.", - retryable: true, - }); - }); + expect(mockStreamFetch).not.toHaveBeenCalled(); + expect(updates).toContainEqual({ + taskId: "task-1", + runId: "run-1", + kind: "error", + ...expectedError, + }); + }, + ); it("loads paginated persisted logs once for an already terminal run", async () => { const updates: unknown[] = []; From 02ab49f64bf30fb33f3832734e05dca0c10a34c5 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Thu, 30 Apr 2026 10:30:21 +0100 Subject: [PATCH 3/3] fix(cloud-agent): stop watching after clean eof --- .../main/services/cloud-task/service.test.ts | 66 +++++++++++++++++++ .../src/main/services/cloud-task/service.ts | 20 +++--- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index ffb88dd97..76229e9d6 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -531,6 +531,72 @@ describe("CloudTaskService", () => { ]); }); + it("stops watching after clean stream completion even when the run remains active", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + const prUrl = "https://github.com/PostHog/code/pull/123"; + let statusFetchCount = 0; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const createInProgressRun = (output: Record | null) => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output, + error_message: null, + branch: "main", + updated_at: output ? "2026-01-01T00:00:01Z" : "2026-01-01T00:00:00Z", + }); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + + statusFetchCount += 1; + return Promise.resolve( + createInProgressRun(statusFetchCount === 1 ? null : { pr_url: prUrl }), + ); + }); + + mockStreamFetch.mockResolvedValueOnce(createSseResponse("")); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await waitFor( + () => + !( + service as unknown as { + watchers: Map; + } + ).watchers.has("task-1:run-1"), + ); + + expect(updates).toContainEqual( + expect.objectContaining({ + taskId: "task-1", + runId: "run-1", + status: "in_progress", + output: { pr_url: prUrl }, + }), + ); + + await vi.advanceTimersByTimeAsync(70_000); + + expect(mockStreamFetch).toHaveBeenCalledTimes(1); + }); + it("emits a retryable cloud error after repeated stream failures", async () => { vi.useFakeTimers(); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 34c58030d..094bba43d 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -655,7 +655,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - await this.handleStreamCompletion(key); + await this.handleStreamCompletion(key, { reconnectIfNonTerminal: false }); } catch (error) { this.flushLogBatch(key); @@ -677,7 +677,7 @@ export class CloudTaskService extends TypedEventEmitter { key, error: errorMessage, }); - await this.handleStreamCompletion(key); + await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); } finally { const currentWatcher = this.watchers.get(key); if (currentWatcher?.sseAbortController === controller) { @@ -993,10 +993,14 @@ export class CloudTaskService extends TypedEventEmitter { }, delay); } - private async handleStreamCompletion(key: string): Promise { + private async handleStreamCompletion( + key: string, + options: { reconnectIfNonTerminal: boolean }, + ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; + const { reconnectIfNonTerminal } = options; const run = await this.fetchTaskRun(watcher); const currentWatcher = this.watchers.get(key); if (!currentWatcher || currentWatcher !== watcher) return; @@ -1009,7 +1013,7 @@ export class CloudTaskService extends TypedEventEmitter { } this.applyTaskRunState(watcher, run); - if (isTerminalStatus(watcher.lastStatus)) { + if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) { watcher.needsStopAfterBootstrap = true; } else { watcher.needsPostBootstrapReconnect = true; @@ -1032,7 +1036,7 @@ export class CloudTaskService extends TypedEventEmitter { this.applyTaskRunState(watcher, run); - if (!isTerminalStatus(watcher.lastStatus)) { + if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { log.warn("Cloud task stream ended before terminal status", { key, status: watcher.lastStatus, @@ -1041,9 +1045,9 @@ export class CloudTaskService extends TypedEventEmitter { return; } - // Always emit terminal status — processEvent intentionally skips the emit - // for terminal states (to avoid acting on it before the stream fully ends), - // so this is the single place that notifies the renderer of completion. + // Always emit the latest status before stopping. Terminal states are + // intentionally deferred until stream completion; clean EOFs can also mean + // the backend has no more stream events even when the run status remains active. this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId,