Skip to content

Commit 5359eda

Browse files
committed
fix(sdk): more PR review findings (round 3) — reconnect backoff + design-note divergence
Three more #3542 review fixes addressing the design-question bucket. sessionStreams/manager.ts + inputStreams/manager.ts — both #runTail loops swallowed errors and the .finally reconnected immediately whenever hasHandlers || hasWaiters. A persistent backend failure (auth rejection, 5xx, DNS) would reconnect in a tight loop with no rate limiting. Both managers now exponentially back off: 1s base, doubling per attempt, capped at 30s, plus 0–1s jitter. A reconnectAttempts counter resets to 0 on every successful #dispatch (any record flowing through = healthy connection), so transient blips don't accumulate. Per-waiter timeouts still bound how long any once() waits regardless. realtimeStreams/streamsWriterV2.ts + .test.ts — extracted the size-check + discriminant-extraction logic into encodeChunkOrError, a pure helper. Tests now exercise it directly, no `vi.mock("@s2-dev/streamstore")` shim. The original vi.mock conflicted with the codebase rule of using testcontainers / not mocking; the new tests are framework-pure and faster. trigger-sdk/v3/shared.ts — added an in-code comment in triggerAndSubscribe explaining the error-shape divergence from triggerAndWait. The SerializedError surfaced by subscribeToRun strips the TaskRunError type discriminator at the server boundary (createJsonErrorObject in errors.ts:274), so the SDK can't reconstruct the discriminator on the receive side. Callers needing exact error-type matching should use triggerAndWait.
1 parent 127530c commit 5359eda

7 files changed

Lines changed: 273 additions & 143 deletions

File tree

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
InputStreamTimeoutError,
77
} from "./types.js";
88
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
9+
import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js";
910

1011
type InputStreamHandler = (data: unknown) => void | Promise<void>;
1112

@@ -29,6 +30,12 @@ export class StandardInputStreamManager implements InputStreamManager {
2930
private seqNums = new Map<string, number>();
3031
private currentRunId: string | null = null;
3132
private streamsVersion: string | undefined;
33+
// Reconnect attempt counter per streamId. Drives the exponential
34+
// backoff applied by `#ensureStreamTailConnected`'s `.finally` so a
35+
// persistent backend failure (auth rejection, 5xx, DNS, etc.) doesn't
36+
// reconnect in a tight loop. Reset to 0 by `#dispatch` whenever a
37+
// record flows through.
38+
private reconnectAttempts = new Map<string, number>();
3239

3340
constructor(
3441
private apiClient: ApiClient,
@@ -204,6 +211,7 @@ export class StandardInputStreamManager implements InputStreamManager {
204211
this.streamsVersion = undefined;
205212
this.seqNums.clear();
206213
this.handlers.clear();
214+
this.reconnectAttempts.clear();
207215

208216
// Reject all pending once waiters
209217
for (const [, waiters] of this.onceWaiters) {
@@ -238,13 +246,29 @@ export class StandardInputStreamManager implements InputStreamManager {
238246
.finally(() => {
239247
this.tails.delete(streamId);
240248

241-
// Auto-reconnect if there are still active handlers or waiters
249+
// Auto-reconnect with exponential backoff if there are still
250+
// active handlers or waiters. Without backoff a persistent
251+
// failure (auth rejected, 5xx, DNS) would reconnect in a tight
252+
// loop because `#runTail`'s error path only logs. `#dispatch`
253+
// resets the counter on every successful record.
242254
const hasHandlers =
243255
this.handlers.has(streamId) && this.handlers.get(streamId)!.size > 0;
244256
const hasWaiters =
245257
this.onceWaiters.has(streamId) && this.onceWaiters.get(streamId)!.length > 0;
246258
if (hasHandlers || hasWaiters) {
247-
this.#ensureStreamTailConnected(streamId);
259+
const attempt = this.reconnectAttempts.get(streamId) ?? 0;
260+
this.reconnectAttempts.set(streamId, attempt + 1);
261+
const delayMs = computeReconnectDelayMs(attempt);
262+
setTimeout(() => {
263+
if (this.tails.has(streamId)) return;
264+
const stillHasHandlers =
265+
this.handlers.has(streamId) && this.handlers.get(streamId)!.size > 0;
266+
const stillHasWaiters =
267+
this.onceWaiters.has(streamId) &&
268+
this.onceWaiters.get(streamId)!.length > 0;
269+
if (!stillHasHandlers && !stillHasWaiters) return;
270+
this.#ensureStreamTailConnected(streamId);
271+
}, delayMs);
248272
}
249273
});
250274
this.tails.set(streamId, { abortController, promise });
@@ -310,6 +334,10 @@ export class StandardInputStreamManager implements InputStreamManager {
310334
}
311335

312336
#dispatch(streamId: string, data: unknown): void {
337+
// Any record flowing through = healthy connection; reset the backoff
338+
// counter so the next disconnect starts fresh.
339+
this.reconnectAttempts.delete(streamId);
340+
313341
// First try to resolve a once waiter
314342
const waiters = this.onceWaiters.get(streamId);
315343
if (waiters && waiters.length > 0) {
Lines changed: 59 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,150 +1,77 @@
1-
import { afterEach, describe, expect, it, vi } from "vitest";
1+
import { describe, expect, it } from "vitest";
22

33
import { ChatChunkTooLargeError, isChatChunkTooLargeError } from "../errors.js";
4+
import { encodeChunkOrError } from "./streamsWriterV2.js";
45

5-
const lastAckedPosition = vi.fn(() => undefined);
6-
7-
const appendSession = vi.fn(async () => {
8-
// A WritableStream that just consumes records — we never reach S2 because
9-
// the size check fires upstream of this for the oversize case, but we still
10-
// need a valid writable for the small-chunk path.
11-
const writable = new WritableStream<unknown>({});
12-
return {
13-
writable,
14-
lastAckedPosition,
15-
};
16-
});
17-
18-
vi.mock("@s2-dev/streamstore", async (importOriginal) => {
19-
const actual = await importOriginal<typeof import("@s2-dev/streamstore")>();
20-
return {
21-
...actual,
22-
S2: class FakeS2 {
23-
basin() {
24-
return {
25-
stream: () => ({
26-
appendSession,
27-
}),
28-
};
29-
}
30-
},
31-
};
32-
});
33-
34-
import { StreamsWriterV2 } from "./streamsWriterV2.js";
35-
36-
afterEach(() => {
37-
vi.clearAllMocks();
38-
});
6+
// The size cap and discriminant extraction are the only S2-independent bits
7+
// of `StreamsWriterV2` that benefit from unit coverage. Both live in the
8+
// `encodeChunkOrError` pure helper, so the tests exercise it directly — no
9+
// `vi.mock("@s2-dev/streamstore", ...)` shim needed.
3910

40-
describe("StreamsWriterV2", () => {
41-
it("rejects with ChatChunkTooLargeError when a single chunk exceeds the per-record cap", async () => {
11+
describe("encodeChunkOrError", () => {
12+
it("flags oversize chunks and carries the chunk's `type` discriminant", () => {
4213
const oversized = {
4314
type: "tool-output-available",
4415
output: { text: "x".repeat(2_000_000) },
4516
};
46-
const source = new ReadableStream<unknown>({
47-
start(controller) {
48-
controller.enqueue(oversized);
49-
controller.close();
50-
},
51-
});
52-
53-
const writer = new StreamsWriterV2({
54-
basin: "test",
55-
stream: "test",
56-
accessToken: "test",
57-
source,
58-
});
59-
60-
await expect(writer.wait()).rejects.toBeInstanceOf(ChatChunkTooLargeError);
61-
62-
let captured: unknown;
63-
try {
64-
await writer.wait();
65-
} catch (err) {
66-
captured = err;
67-
}
68-
expect(isChatChunkTooLargeError(captured)).toBe(true);
69-
const e = captured as ChatChunkTooLargeError;
70-
expect(e.chunkType).toBe("tool-output-available");
71-
expect(e.chunkSize).toBeGreaterThan(1_000_000);
72-
expect(e.maxSize).toBe(1024 * 1024 - 1024);
73-
expect(e.message).toMatch(/tool-output-available/);
74-
expect(e.message).toMatch(/chat\.agent chunk/);
17+
18+
const result = encodeChunkOrError(oversized);
19+
20+
expect(result.ok).toBe(false);
21+
if (result.ok) return; // type guard
22+
expect(isChatChunkTooLargeError(result.error)).toBe(true);
23+
expect(result.error.chunkType).toBe("tool-output-available");
24+
expect(result.error.chunkSize).toBeGreaterThan(1_000_000);
25+
expect(result.error.maxSize).toBe(1024 * 1024 - 1024);
26+
expect(result.error.message).toMatch(/tool-output-available/);
27+
expect(result.error.message).toMatch(/chat\.agent chunk/);
7528
});
7629

77-
it("uses chunk.kind when chunk.type is missing (ChatInputChunk-style)", async () => {
78-
const oversized = {
79-
kind: "action",
80-
payload: "x".repeat(2_000_000),
81-
};
82-
const source = new ReadableStream<unknown>({
83-
start(controller) {
84-
controller.enqueue(oversized);
85-
controller.close();
86-
},
87-
});
88-
89-
const writer = new StreamsWriterV2({
90-
basin: "test",
91-
stream: "test",
92-
accessToken: "test",
93-
source,
94-
});
95-
96-
let captured: unknown;
97-
try {
98-
await writer.wait();
99-
} catch (err) {
100-
captured = err;
101-
}
102-
expect(isChatChunkTooLargeError(captured)).toBe(true);
103-
expect((captured as ChatChunkTooLargeError).chunkType).toBe("action");
30+
it("falls back to chunk.kind when chunk.type is missing (ChatInputChunk-style)", () => {
31+
const oversized = { kind: "action", payload: "x".repeat(2_000_000) };
32+
33+
const result = encodeChunkOrError(oversized);
34+
35+
expect(result.ok).toBe(false);
36+
if (result.ok) return;
37+
expect(result.error.chunkType).toBe("action");
10438
});
10539

106-
it("omits chunkType when chunk has no discriminant", async () => {
40+
it("omits chunkType when the chunk has no discriminant", () => {
10741
const oversized = "x".repeat(2_000_000);
108-
const source = new ReadableStream<unknown>({
109-
start(controller) {
110-
controller.enqueue(oversized);
111-
controller.close();
112-
},
113-
});
114-
115-
const writer = new StreamsWriterV2({
116-
basin: "test",
117-
stream: "test",
118-
accessToken: "test",
119-
source,
120-
});
121-
122-
let captured: unknown;
123-
try {
124-
await writer.wait();
125-
} catch (err) {
126-
captured = err;
127-
}
128-
expect(isChatChunkTooLargeError(captured)).toBe(true);
129-
expect((captured as ChatChunkTooLargeError).chunkType).toBeUndefined();
42+
43+
const result = encodeChunkOrError(oversized);
44+
45+
expect(result.ok).toBe(false);
46+
if (result.ok) return;
47+
expect(result.error.chunkType).toBeUndefined();
13048
});
13149

132-
it("does not reject for chunks under the cap", async () => {
50+
it("returns the encoded body for chunks under the cap", () => {
13351
const small = { type: "text-delta", delta: "hello" };
134-
const source = new ReadableStream<unknown>({
135-
start(controller) {
136-
controller.enqueue(small);
137-
controller.close();
138-
},
139-
});
140-
141-
const writer = new StreamsWriterV2({
142-
basin: "test",
143-
stream: "test",
144-
accessToken: "test",
145-
source,
146-
});
147-
148-
await expect(writer.wait()).resolves.toBeDefined();
52+
53+
const result = encodeChunkOrError(small);
54+
55+
expect(result.ok).toBe(true);
56+
if (!result.ok) return;
57+
const parsed = JSON.parse(result.body) as { data: unknown; id: string };
58+
expect(parsed.data).toEqual(small);
59+
expect(parsed.id).toMatch(/^[A-Za-z0-9_-]{7}$/); // nanoid(7)
60+
});
61+
});
62+
63+
// Cross-check the ChatChunkTooLargeError type-guard helper itself. Trivial,
64+
// but keeps the test surface here exercising the public error helpers a
65+
// consumer would import from the same module.
66+
describe("isChatChunkTooLargeError", () => {
67+
it("recognizes its own error class", () => {
68+
const err = new ChatChunkTooLargeError(2_000_000, 1024 * 1024 - 1024, "x");
69+
expect(isChatChunkTooLargeError(err)).toBe(true);
70+
});
71+
72+
it("rejects unrelated errors", () => {
73+
expect(isChatChunkTooLargeError(new Error("nope"))).toBe(false);
74+
expect(isChatChunkTooLargeError("string")).toBe(false);
75+
expect(isChatChunkTooLargeError(undefined)).toBe(false);
14976
});
15077
});

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
161161
controller.error(new Error("Stream aborted"));
162162
return;
163163
}
164-
const body = JSON.stringify({ data: chunk, id: nanoid(7) });
165-
const size = utf8Encoder.encode(body).length;
166-
if (size > RECORD_BODY_MAX_BYTES) {
167-
const chunkType = extractChunkType(chunk);
168-
controller.error(
169-
new ChatChunkTooLargeError(size, RECORD_BODY_MAX_BYTES, chunkType)
170-
);
164+
const encoded = encodeChunkOrError(chunk);
165+
if (!encoded.ok) {
166+
controller.error(encoded.error);
171167
return;
172168
}
173-
controller.enqueue(AppendRecord.string({ body }));
169+
controller.enqueue(AppendRecord.string({ body: encoded.body }));
174170
},
175171
})
176172
)
@@ -258,3 +254,29 @@ function extractChunkType(chunk: unknown): string | undefined {
258254
if (typeof c.kind === "string") return c.kind;
259255
return undefined;
260256
}
257+
258+
/**
259+
* Encode a chunk as a JSON record body for S2, enforcing the per-record
260+
* size cap. Exported so the size/discriminant logic can be unit-tested
261+
* directly without spinning up an S2 client or mocking `@s2-dev/streamstore`.
262+
*
263+
* Returns `{ ok: true, body }` when the encoded chunk fits within
264+
* `RECORD_BODY_MAX_BYTES`, or `{ ok: false, error }` carrying a
265+
* `ChatChunkTooLargeError` annotated with the chunk's discriminant
266+
* (`type` or `kind`, whichever is present) so the surfaced error is
267+
* useful — "tool-output-available chunk too large" beats a bare
268+
* "chunk too large" by a lot.
269+
*/
270+
export function encodeChunkOrError(
271+
chunk: unknown
272+
): { ok: true; body: string } | { ok: false; error: ChatChunkTooLargeError } {
273+
const body = JSON.stringify({ data: chunk, id: nanoid(7) });
274+
const size = utf8Encoder.encode(body).length;
275+
if (size > RECORD_BODY_MAX_BYTES) {
276+
return {
277+
ok: false,
278+
error: new ChatChunkTooLargeError(size, RECORD_BODY_MAX_BYTES, extractChunkType(chunk)),
279+
};
280+
}
281+
return { ok: true, body };
282+
}

0 commit comments

Comments (0)