Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 156 additions & 1 deletion apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,72 @@ describe("CloudTaskService", () => {
]);
});

it("stops watching after clean stream completion even when the run remains active", async () => {
vi.useFakeTimers();

const updates: unknown[] = [];
const prUrl = "https://github.com/PostHog/code/pull/123";
let statusFetchCount = 0;
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

const createInProgressRun = (output: Record<string, unknown> | null) =>
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: "build",
output,
error_message: null,
branch: "main",
updated_at: output ? "2026-01-01T00:00:01Z" : "2026-01-01T00:00:00Z",
});

mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
if (url.includes("/session_logs/")) {
return Promise.resolve(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);
}

statusFetchCount += 1;
return Promise.resolve(
createInProgressRun(statusFetchCount === 1 ? null : { pr_url: prUrl }),
);
});

mockStreamFetch.mockResolvedValueOnce(createSseResponse(""));

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() => mockStreamFetch.mock.calls.length === 1);
await waitFor(
() =>
!(
service as unknown as {
watchers: Map<string, unknown>;
}
).watchers.has("task-1:run-1"),
);

expect(updates).toContainEqual(
expect.objectContaining({
taskId: "task-1",
runId: "run-1",
status: "in_progress",
output: { pr_url: prUrl },
}),
);

await vi.advanceTimersByTimeAsync(70_000);

expect(mockStreamFetch).toHaveBeenCalledTimes(1);
});

it("emits a retryable cloud error after repeated stream failures", async () => {
vi.useFakeTimers();

Expand Down Expand Up @@ -558,7 +624,9 @@ describe("CloudTaskService", () => {

mockStreamFetch.mockImplementation(() =>
Promise.resolve(
createSseResponse('event: error\ndata: {"error":"boom"}\n\n'),
createSseResponse(
'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n',
),
),
);

Expand Down Expand Up @@ -597,6 +665,93 @@ describe("CloudTaskService", () => {
});
});

const guardedFetchStatusExpectations = [
[
401,
{
errorTitle: "Cloud authentication expired",
errorMessage: "Please reauthenticate and retry the cloud run stream.",
retryable: true,
},
],
[
403,
{
errorTitle: "Cloud access denied",
errorMessage:
"You no longer have access to this cloud run. Reauthenticate and retry.",
retryable: true,
},
],
[
404,
{
errorTitle: "Cloud run not found",
errorMessage:
"This cloud run could not be found. It may have been deleted or moved.",
retryable: false,
},
],
] as const;

const guardedFetchStatusCases = (
["status fetch", "persisted log fetch"] as const
).flatMap((fetchPhase) =>
guardedFetchStatusExpectations.map(([status, expectedError]) => ({
fetchPhase,
status,
expectedError,
})),
);

it.each(guardedFetchStatusCases)(
"fails the watcher when $fetchPhase returns $status",
async ({ fetchPhase, status, expectedError }) => {
const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

if (fetchPhase === "status fetch") {
mockNetFetch.mockResolvedValueOnce(
createJsonResponse({ detail: "Access denied" }, status),
);
} else {
mockNetFetch
.mockResolvedValueOnce(
createJsonResponse({
id: "run-1",
status: "completed",
stage: null,
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
completed_at: "2026-01-01T00:00:01Z",
}),
)
.mockResolvedValueOnce(
createJsonResponse({ detail: "Access denied" }, status),
);
}

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() => updates.length === 1);

expect(mockStreamFetch).not.toHaveBeenCalled();
expect(updates).toContainEqual({
taskId: "task-1",
runId: "run-1",
kind: "error",
...expectedError,
});
},
);

it("loads paginated persisted logs once for an already terminal run", async () => {
const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));
Expand Down
42 changes: 32 additions & 10 deletions apps/code/src/main/services/cloud-task/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ function createStreamStatusError(status: number): CloudTaskStreamError {
}
}

function shouldFailWatcherForFetchStatus(status: number): boolean {
return status === 401 || status === 403 || status === 404;
}

@injectable()
export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
private watchers = new Map<string, WatcherState>();
Expand Down Expand Up @@ -444,6 +448,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
const run = await this.fetchTaskRun(watcher);
const currentWatcher = this.watchers.get(key);
if (!currentWatcher || currentWatcher !== watcher) return;
if (watcher.failed) return;

if (!run) {
this.failWatcher(key, {
Expand Down Expand Up @@ -650,7 +655,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

await this.handleStreamCompletion(key);
await this.handleStreamCompletion(key, { reconnectIfNonTerminal: false });
} catch (error) {
this.flushLogBatch(key);

Expand All @@ -672,7 +677,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
key,
error: errorMessage,
});
await this.handleStreamCompletion(key);
await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true });
} finally {
const currentWatcher = this.watchers.get(key);
if (currentWatcher?.sseAbortController === controller) {
Expand All @@ -696,8 +701,6 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
throw new Error(message);
}

watcher.reconnectAttempts = 0;

if (
event.event === "keepalive" ||
(typeof event.data === "object" &&
Expand All @@ -708,6 +711,8 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

watcher.reconnectAttempts = 0;

if (isTaskRunStateEvent(event.data)) {
if (this.applyTaskRunState(watcher, event.data)) {
if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) {
Expand Down Expand Up @@ -988,13 +993,18 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
}, delay);
}

private async handleStreamCompletion(key: string): Promise<void> {
private async handleStreamCompletion(
key: string,
options: { reconnectIfNonTerminal: boolean },
): Promise<void> {
const watcher = this.watchers.get(key);
if (!watcher) return;

const { reconnectIfNonTerminal } = options;
const run = await this.fetchTaskRun(watcher);
const currentWatcher = this.watchers.get(key);
if (!currentWatcher || currentWatcher !== watcher) return;
if (watcher.failed) return;

if (watcher.isBootstrapping) {
if (!run) {
Expand All @@ -1003,7 +1013,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
}

this.applyTaskRunState(watcher, run);
if (isTerminalStatus(watcher.lastStatus)) {
if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) {
watcher.needsStopAfterBootstrap = true;
} else {
watcher.needsPostBootstrapReconnect = true;
Expand All @@ -1026,7 +1036,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

this.applyTaskRunState(watcher, run);

if (!isTerminalStatus(watcher.lastStatus)) {
if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) {
log.warn("Cloud task stream ended before terminal status", {
key,
status: watcher.lastStatus,
Expand All @@ -1035,9 +1045,9 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

// Always emit terminal status — processEvent intentionally skips the emit
// for terminal states (to avoid acting on it before the stream fully ends),
// so this is the single place that notifies the renderer of completion.
// Always emit the latest status before stopping. Terminal states are
// intentionally deferred until stream completion; clean EOFs can also mean
// the backend has no more stream events even when the run status remains active.
this.emit(CloudTaskEvent.Update, {
taskId: watcher.taskId,
runId: watcher.runId,
Expand Down Expand Up @@ -1126,6 +1136,12 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
runId: watcher.runId,
offset,
});
if (shouldFailWatcherForFetchStatus(authedResponse.status)) {
this.failWatcher(
watcherKey(watcher.taskId, watcher.runId),
createStreamStatusError(authedResponse.status).details,
);
}
return null;
}

Expand Down Expand Up @@ -1188,6 +1204,12 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
taskId: watcher.taskId,
runId: watcher.runId,
});
if (shouldFailWatcherForFetchStatus(authedResponse.status)) {
this.failWatcher(
watcherKey(watcher.taskId, watcher.runId),
createStreamStatusError(authedResponse.status).details,
);
}
return null;
}

Expand Down
Loading