From 1a879bfad84dc14c7ad120b67f09048313ed3a8e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 11:29:53 +0000 Subject: [PATCH 1/6] feat(server): ContinuationCompat (SEP-2322 Option H suspend/resume) Opt-in stateful path for handleHttp: a handler's await ctx.mcpReq.send(...) parks the live frame; current response carries IncompleteResult{requestState}; a later POST with {requestState, inputResponses} resumes the same frame. Single-process only. Frames bound to the validated principal (authInfo.token or sessionId); anonymous suspension refused by default. Per-principal frame cap prevents one tenant exhausting maxContinuations. --- packages/server/src/index.ts | 2 + .../server/src/server/continuationCompat.ts | 483 ++++++++++++++++++ packages/server/src/server/shttpHandler.ts | 13 +- .../test/server/continuationCompat.test.ts | 370 ++++++++++++++ 4 files changed, 867 insertions(+), 1 deletion(-) create mode 100644 packages/server/src/server/continuationCompat.ts create mode 100644 packages/server/test/server/continuationCompat.test.ts diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 8e2d59fd5..36a438508 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -32,6 +32,8 @@ export { Server } from './server/server.js'; // imports (erased at compile time), but matching the client's `./stdio` subpath gives consumers a // consistent shape across packages. export { BackchannelCompat } from './server/backchannelCompat.js'; +export type { ContinuationCompatOptions } from './server/continuationCompat.js'; +export { ContinuationCompat } from './server/continuationCompat.js'; export type { Dispatchable, HandleHttpOptions, HandleHttpRequestExtra } from './server/handleHttp.js'; export { handleHttp } from './server/handleHttp.js'; export type { SessionCompatOptions, SessionValidation } from './server/sessionCompat.js'; diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts new file mode 100644 index 000000000..26bb6b6f6 --- /dev/null +++ b/packages/server/src/server/continuationCompat.ts @@ -0,0 +1,483 @@ +import type { + IncompleteResult, + InputRequest, + JSONRPCErrorResponse, + JSONRPCMessage, + JSONRPCRequest, + JSONRPCResultResponse, + Request, + RequestEnv, + RequestOptions, + Result +} from '@modelcontextprotocol/core'; +import { isJSONRPCErrorResponse, isJSONRPCResultResponse, ProtocolErrorCode, SdkError, SdkErrorCode } from '@modelcontextprotocol/core'; + +import type { ShttpCallbacks } from './shttpHandler.js'; + +/** Map of input-request slot key → outbound request. */ +type InputRequests = Record; +/** + * Map of slot key → answer. Flat `Result` per {@linkcode InputResponseRequestParams}; the + * client services each {@linkcode InputRequest} and sends back the bare result. A missing + * key rejects the parked `env.send` with {@linkcode SdkErrorCode.SendFailed}. + */ +type InputResponses = Record; + +/** Options for {@linkcode ContinuationCompat}. */ +export interface ContinuationCompatOptions { + /** + * Maximum number of suspended handler frames to retain. New suspensions beyond this + * cap throw, surfacing as a 500 from {@linkcode shttpHandler}. Defaults to 1000. + */ + maxContinuations?: number; + /** + * How long a suspended frame waits for the client to retry before being aborted. + * Reset on each retry. Defaults to 5 minutes. + */ + ttlMs?: number; + /** + * Generates the opaque `requestState` token. SHOULD be unguessable. + * @default `() => crypto.randomUUID()` + */ + requestStateGenerator?: () => string; + /** Called when a frame is evicted on TTL. */ + onexpired?: (requestState: string) => void; + /** + * If `false` (default), {@linkcode ContinuationCompat.drive} throws when no principal can + * be derived (no `authInfo.token` and no `mcp-session-id`). Anonymous suspension means any + * caller can resume any frame; only enable this for trusted single-tenant deployments. + */ + allowAnonymousSuspend?: boolean; + /** + * Maximum suspended frames a single principal may hold. New suspensions beyond this throw. + * Prevents one tenant exhausting `maxContinuations`. Defaults to `Math.ceil(maxContinuations / 10)`. + */ + perPrincipalMax?: number; +} + +type Ask = + | { kind: 'message'; msg: JSONRPCMessage } + | { kind: 'incomplete'; inputRequests: InputRequests } + | { kind: 'done' } + | { kind: 'runnerError'; error: unknown }; + +interface Channel { + next: Promise; + resolve: (value: T) => void; + reject: (reason?: unknown) => void; +} +function channel(): Channel { + let resolve!: (v: T) => void; + let reject!: (r?: unknown) => void; + const next = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { next, resolve, reject }; +} + +/** + * One suspended handler frame: a detached `onrequest` iterator plus the two + * channels that thread {@linkcode InputRequests} out and {@linkcode InputResponses} back in. + */ +class Continuation { + private askCh: Channel = channel(); + private answerCh: Channel = channel(); + private askQueue: Ask[] = []; + readonly abort = new AbortController(); + + push(a: Ask): void { + this.askQueue.push(a); + const ch = this.askCh; + this.askCh = channel(); + ch.resolve(a); + } + + async nextAsk(): Promise { + if (this.askQueue.length > 0) { + const a = this.askQueue.shift()!; + return a; + } + const a = await this.askCh.next; + // The producer pushed into the queue at the same time as resolving; drop that copy. + const idx = this.askQueue.indexOf(a); + if (idx !== -1) this.askQueue.splice(idx, 1); + return a; + } + + answer(responses: InputResponses): void { + const ch = this.answerCh; + this.answerCh = channel(); + ch.resolve(responses); + } + + nextAnswer(): Promise { + return this.answerCh.next; + } + + fail(reason: Error): void { + this.abort.abort(reason); + this.answerCh.reject(reason); + } +} + +/** + * Opt-in suspend/resume continuation store for {@linkcode shttpHandler}, the SEP-2322 + * "Option H" stateful server path. + * + * When configured, a handler's `await ctx.mcpReq.send(...)` (and the higher-level + * `elicitInput`/`requestSampling`) becomes a real suspension point: the call parks the + * handler's frame in this store, the current HTTP response carries an + * `IncompleteResult` with `inputRequests` and a `requestState` token, and a later POST + * with `params.{requestState, inputResponses}` resumes the same frame from where it + * stopped. The handler runs once, front-to-back; nothing above the `await` re-executes. + * + * Single-process only. The frame lives in this instance's memory; horizontal scale + * needs sticky routing on the `requestState` token. Mutually exclusive with + * {@linkcode BackchannelCompat} (both supply `env.send`; this one wins when both are set). + * + * Tenant isolation: each frame is bound to the validated principal that created it + * (`env.authInfo.token`, falling back to `env.sessionId`). Resume from a different + * principal is rejected. By default, requests with no derivable principal are refused + * (see {@linkcode ContinuationCompatOptions.allowAnonymousSuspend}); deploy behind an + * authenticating middleware or `SessionCompat`. A per-principal frame cap + * ({@linkcode ContinuationCompatOptions.perPrincipalMax}) prevents any one tenant from + * exhausting the global `maxContinuations`. + */ +/** + * Validated identity to bind a frame to. Prefers the auth token (server-validated) over + * the legacy `mcp-session-id`. Never derived from `_meta` (client-asserted). + */ +function principalOf(env: RequestEnv): string | undefined { + return env.authInfo?.token ?? env.sessionId ?? (env.ext?.sessionId as string | undefined); +} + +interface FrameEntry { + cont: Continuation; + timer: ReturnType; + /** Validated identity that created the frame; resume requires the same. */ + owner: string | undefined; + /** + * Set while a `_drive` loop is currently consuming this continuation. A second + * concurrent resume for the same `requestState` is rejected to prevent both drives + * draining the same ask channel (which would duplicate messages and drop the second + * resume's `inputResponses`). + */ + draining: boolean; +} + +export class ContinuationCompat { + private readonly _frames = new Map(); + private readonly _perPrincipalCount = new Map(); + private readonly _max: number; + private readonly _perPrincipalMax: number; + private readonly _ttlMs: number; + private readonly _generate: () => string; + private readonly _onexpired?: (requestState: string) => void; + private readonly _allowAnonymous: boolean; + + constructor(options: ContinuationCompatOptions = {}) { + this._max = options.maxContinuations ?? 1000; + this._perPrincipalMax = options.perPrincipalMax ?? Math.ceil(this._max / 10); + this._ttlMs = options.ttlMs ?? 5 * 60_000; + this._generate = options.requestStateGenerator ?? (() => crypto.randomUUID()); + this._onexpired = options.onexpired; + this._allowAnonymous = options.allowAnonymousSuspend ?? false; + } + + private _resolvePrincipal(env: RequestEnv): string | undefined { + const p = principalOf(env); + if (p === undefined && !this._allowAnonymous) { + throw new SdkError( + SdkErrorCode.CapabilityNotSupported, + 'ContinuationCompat: refusing to suspend without a principal (no authInfo.token or mcp-session-id). ' + + 'Deploy behind authenticating middleware, configure SessionCompat, or set allowAnonymousSuspend: true.' + ); + } + return p; + } + + private _decPrincipal(owner: string | undefined): void { + if (owner === undefined) return; + const n = (this._perPrincipalCount.get(owner) ?? 1) - 1; + if (n <= 0) this._perPrincipalCount.delete(owner); + else this._perPrincipalCount.set(owner, n); + } + + /** Number of currently suspended frames. */ + get size(): number { + return this._frames.size; + } + + /** True if `requestState` matches a live suspended frame. */ + has(requestState: string): boolean { + return this._frames.has(requestState); + } + + /** + * Aborts and forgets a frame. Any parked `env.send` rejects with + * {@linkcode SdkErrorCode.ConnectionClosed}. + */ + abort(requestState: string, reason?: Error): void { + const entry = this._frames.get(requestState); + if (!entry) return; + clearTimeout(entry.timer); + this._frames.delete(requestState); + this._decPrincipal(entry.owner); + const err = reason ?? new SdkError(SdkErrorCode.ConnectionClosed, 'Continuation aborted'); + entry.cont.fail(err); + // Signal an active drain loop (awaiting nextAsk) so it does not hang. + entry.cont.push({ kind: 'runnerError', error: err }); + } + + /** Aborts all frames. Call from server shutdown. */ + close(): void { + for (const token of this._frames.keys()) this.abort(token); + } + + /** + * Wraps `onrequest` so it suspends on `env.send` instead of needing a live + * peer channel. Called by {@linkcode shttpHandler} when this instance is configured + * via `ShttpHandlerOptions.continuations`. + */ + wrap(onrequest: NonNullable): NonNullable { + return (request, env) => this._drive(onrequest, request, env ?? {}); + } + + private async *_drive( + onrequest: NonNullable, + request: JSONRPCRequest, + env: RequestEnv + ): AsyncGenerator { + const params = (request.params ?? {}) as { requestState?: unknown; inputResponses?: unknown }; + const incomingState = typeof params.requestState === 'string' ? params.requestState : undefined; + + let token: string; + let cont: Continuation; + + if (incomingState !== undefined && this._frames.has(incomingState)) { + token = incomingState; + const entry = this._frames.get(token)!; + if (entry.owner !== this._resolvePrincipal(env)) { + yield { + jsonrpc: '2.0', + id: request.id, + error: { code: -32_600, message: 'Invalid requestState: does not belong to this caller' } + } satisfies JSONRPCErrorResponse; + return; + } + if (entry.draining) { + yield { + jsonrpc: '2.0', + id: request.id, + error: { code: -32_600, message: 'Invalid requestState: resume already in progress' } + } satisfies JSONRPCErrorResponse; + return; + } + entry.draining = true; + cont = entry.cont; + clearTimeout(entry.timer); + entry.timer = this._arm(token); + const responses = (params.inputResponses ?? {}) as InputResponses; + cont.answer(responses); + } else if (incomingState === undefined) { + const owner = this._resolvePrincipal(env); + if (this._frames.size >= this._max) { + throw new Error(`ContinuationCompat at capacity (maxContinuations=${this._max})`); + } + if (owner !== undefined) { + const n = this._perPrincipalCount.get(owner) ?? 0; + if (n >= this._perPrincipalMax) { + throw new Error(`ContinuationCompat: principal at per-principal capacity (perPrincipalMax=${this._perPrincipalMax})`); + } + this._perPrincipalCount.set(owner, n + 1); + } + token = this._generate(); + cont = new Continuation(); + this._frames.set(token, { cont, timer: this._arm(token), owner, draining: true }); + this._startRunner(onrequest, request, env, cont, token); + } else { + yield { + jsonrpc: '2.0', + id: request.id, + error: { code: -32_600, message: 'Invalid requestState: continuation expired or unknown' } + } satisfies JSONRPCErrorResponse; + return; + } + + // Drain the ask channel into the current HTTP response stream until the + // handler either finishes or parks for input. The finally clears `draining` + // so a client disconnect mid-stream (caller stops iterating this generator) + // does not leave the frame permanently locked against retry. + try { + for (;;) { + const a = await cont.nextAsk(); + if (a.kind === 'message') { + yield this._rewriteId(a.msg, request.id); + continue; + } + if (a.kind === 'incomplete') { + const result: IncompleteResult = { + resultType: 'incomplete', + inputRequests: a.inputRequests, + requestState: token + }; + yield { jsonrpc: '2.0', id: request.id, result } satisfies JSONRPCResultResponse; + return; + } + if (a.kind === 'runnerError') { + this._delete(token); + yield { + jsonrpc: '2.0', + id: request.id, + error: { + code: ProtocolErrorCode.InternalError, + message: a.error instanceof Error ? a.error.message : String(a.error) + } + } satisfies JSONRPCErrorResponse; + return; + } + // done + this._delete(token); + return; + } + } finally { + const entry = this._frames.get(token); + if (entry) entry.draining = false; + } + } + + private _startRunner( + onrequest: NonNullable, + request: JSONRPCRequest, + env: RequestEnv, + cont: Continuation, + token: string + ): void { + const signal = + env.signal === undefined + ? cont.abort.signal + : (anySignal([env.signal, cont.abort.signal]) ?? linkSignals(env.signal, cont.abort.signal)); + const runnerEnv: RequestEnv = { ...env, signal, send: this._suspendingSend(cont) }; + + void (async () => { + try { + for await (const msg of onrequest(request, runnerEnv)) { + cont.push({ kind: 'message', msg }); + } + cont.push({ kind: 'done' }); + } catch (error) { + if (this._frames.has(token)) { + cont.push({ kind: 'runnerError', error }); + } + } + })(); + } + + /** + * Builds the `env.send` backing function that parks the handler instead of needing a + * live peer channel. Calls in the same microtask are batched into one + * {@linkcode IncompleteResult}; the next microtask flushes the batch as a single + * `incomplete` ask. + * + * Positional keys (`r0`, `r1`, ...) are **per round**: each new batch resets to `r0`, + * matching the dispatcher's `ephemeralSend` so a given `inputResponses` map is shaped + * identically on either path. + */ + private _suspendingSend(cont: Continuation): NonNullable { + let counter = 0; + let batch: { inputs: InputRequests; settle: Promise; flushed: boolean } | undefined; + + return (req: Request, opts?: RequestOptions): Promise => { + if (opts?.signal?.aborted) { + const r = opts.signal.reason; + return Promise.reject(r instanceof Error ? r : new Error(String(r))); + } + + if (batch === undefined || batch.flushed) { + counter = 0; + const b: { inputs: InputRequests; settle: Promise; flushed: boolean } = { + inputs: {}, + settle: cont.nextAnswer(), + flushed: false + }; + batch = b; + queueMicrotask(() => { + b.flushed = true; + cont.push({ kind: 'incomplete', inputRequests: b.inputs }); + }); + } + const key = `r${counter++}`; + batch.inputs[key] = { method: req.method, ...(req.params === undefined ? {} : { params: req.params }) }; + const settle = batch.settle; + + return new Promise((resolve, reject) => { + const onAbort = () => { + const r = opts!.signal!.reason; + reject(r instanceof Error ? r : new Error(String(r))); + }; + opts?.signal?.addEventListener('abort', onAbort, { once: true }); + settle.then( + responses => { + opts?.signal?.removeEventListener('abort', onAbort); + if (!(key in responses)) { + reject(new SdkError(SdkErrorCode.SendFailed, `inputResponses missing entry for slot "${key}"`)); + return; + } + resolve(responses[key]!); + }, + error => { + opts?.signal?.removeEventListener('abort', onAbort); + reject(error instanceof Error ? error : new Error(String(error))); + } + ); + }); + }; + } + + private _rewriteId(msg: JSONRPCMessage, id: JSONRPCRequest['id']): JSONRPCMessage { + if (isJSONRPCResultResponse(msg) || isJSONRPCErrorResponse(msg)) { + return { ...msg, id }; + } + return msg; + } + + private _arm(token: string): ReturnType { + return setTimeout(() => { + const entry = this._frames.get(token); + if (!entry) return; + this._frames.delete(token); + this._decPrincipal(entry.owner); + const reason = new SdkError(SdkErrorCode.RequestTimeout, `Continuation ${token} expired after ${this._ttlMs}ms`); + // Signal an active drain loop (awaiting nextAsk) so it does not hang. + entry.cont.push({ kind: 'runnerError', error: reason }); + // Reject the runner's parked answer-await so the handler unwinds. + entry.cont.fail(reason); + this._onexpired?.(token); + }, this._ttlMs); + } + + private _delete(token: string): void { + const entry = this._frames.get(token); + if (!entry) return; + clearTimeout(entry.timer); + this._frames.delete(token); + this._decPrincipal(entry.owner); + } +} + +type SignalAny = (signals: AbortSignal[]) => AbortSignal; +function anySignal(signals: AbortSignal[]): AbortSignal | undefined { + const fn = (AbortSignal as { any?: SignalAny }).any; + return fn ? fn(signals) : undefined; +} +function linkSignals(a: AbortSignal, b: AbortSignal): AbortSignal { + const c = new AbortController(); + const fwd = (s: AbortSignal) => { + if (s.aborted) c.abort(s.reason); + else s.addEventListener('abort', () => c.abort(s.reason), { once: true }); + }; + fwd(a); + fwd(b); + return c.signal; +} diff --git a/packages/server/src/server/shttpHandler.ts b/packages/server/src/server/shttpHandler.ts index a2a732b29..12ac8e312 100644 --- a/packages/server/src/server/shttpHandler.ts +++ b/packages/server/src/server/shttpHandler.ts @@ -20,6 +20,7 @@ import { } from '@modelcontextprotocol/core'; import type { BackchannelCompat } from './backchannelCompat.js'; +import type { ContinuationCompat } from './continuationCompat.js'; import type { SessionCompat } from './sessionCompat.js'; import type { EventId, EventStore } from './streamableHttp.js'; @@ -64,6 +65,15 @@ export interface ShttpHandlerOptions { */ backchannel?: BackchannelCompat; + /** + * SEP-2322 Option H suspend/resume continuation store. When provided, a handler's + * `await ctx.mcpReq.send(...)` parks the live frame and the current response carries + * an `IncompleteResult{requestState}`; a later POST with `params.{requestState, + * inputResponses}` resumes the same frame. Single-process only. When both this and + * `backchannel` are set, this takes precedence (it supplies `env.send` first). + */ + continuations?: ContinuationCompat; + /** * Event store for SSE resumability via `Last-Event-ID`. When configured, every * outgoing SSE event is persisted and a priming event is sent at stream start. @@ -174,6 +184,7 @@ export function shttpHandler( const enableJsonResponse = options.enableJsonResponse ?? false; const session = options.session; const backchannel = options.backchannel; + const continuations = options.continuations; const eventStore = options.eventStore; const retryInterval = options.retryInterval; const supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS; @@ -321,7 +332,7 @@ export function shttpHandler( if (!cb.onrequest) { return jsonError(500, -32_603, 'Handler not wired — pass an onrequest callback.'); } - const onrequest = cb.onrequest; + const onrequest = continuations ? continuations.wrap(cb.onrequest) : cb.onrequest; const initReq = messages.find(m => isInitializeRequest(m)); const initParams = initReq && isInitializeRequest(initReq) ? initReq.params : undefined; diff --git a/packages/server/test/server/continuationCompat.test.ts b/packages/server/test/server/continuationCompat.test.ts new file mode 100644 index 000000000..90972ec72 --- /dev/null +++ b/packages/server/test/server/continuationCompat.test.ts @@ -0,0 +1,370 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import type { IncompleteResult, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, RequestEnv } from '@modelcontextprotocol/core'; + +import { ContinuationCompat } from '../../src/server/continuationCompat.js'; +import type { ShttpCallbacks } from '../../src/server/shttpHandler.js'; +import { shttpHandler } from '../../src/server/shttpHandler.js'; + +const ACCEPT_BOTH = 'application/json, text/event-stream'; + +function post(body: unknown, headers: Record = {}): Request { + return new Request('http://localhost/mcp', { + method: 'POST', + headers: { 'content-type': 'application/json', accept: ACCEPT_BOTH, ...headers }, + body: JSON.stringify(body) + }); +} + +async function readSSE(res: Response): Promise { + const text = await res.text(); + const out: JSONRPCMessage[] = []; + for (const block of text.split('\n\n')) { + const dataLine = block.split('\n').find(l => l.startsWith('data: ')); + if (!dataLine) continue; + const payload = dataLine.slice('data: '.length); + if (payload.trim() === '') continue; + out.push(JSON.parse(payload)); + } + return out; +} + +function asIncomplete(msg: JSONRPCMessage): Required { + expect(msg).toMatchObject({ result: { resultType: 'incomplete' } }); + const r = (msg as JSONRPCMessage & { result?: unknown }).result; + return r as Required; +} + +/** + * Handler that calls `env.send` mid-dispatch and returns a result that includes the + * answered content. Tracks how many times the body executed so the test can assert + * the handler runs once across the suspend/resume. + */ +function suspendingServer(): { cb: ShttpCallbacks; bodyRuns: () => number } { + let runs = 0; + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest, env?: RequestEnv): AsyncIterable { + runs++; + const ask = (env?.send ?? + (async () => { + throw new Error('env.send not provided'); + })) as NonNullable; + const answer = await ask( + { method: 'elicitation/create', params: { mode: 'form', message: 'units?', requestedSchema: { type: 'object' } } }, + undefined + ); + yield { + jsonrpc: '2.0', + id: req.id, + result: { content: [{ type: 'text', text: `got:${JSON.stringify(answer)}` }] } + }; + } + }; + return { cb, bodyRuns: () => runs }; +} + +describe('ContinuationCompat — suspend/resume via shttpHandler', () => { + let continuations: ContinuationCompat; + afterEach(() => { + continuations?.close(); + }); + + it('round 1 returns IncompleteResult; round 2 resumes and yields the final result without re-running the handler', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const { cb, bodyRuns } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 'weather' } })); + expect(r1.status).toBe(200); + const m1 = (await r1.json()) as JSONRPCMessage; + const inc = asIncomplete(m1); + expect(Object.values(inc.inputRequests)).toHaveLength(1); + const slot = Object.keys(inc.inputRequests)[0]!; + expect(inc.inputRequests[slot]!.method).toBe('elicitation/create'); + expect(typeof inc.requestState).toBe('string'); + expect(continuations.size).toBe(1); + expect(bodyRuns()).toBe(1); + + const r2 = await handler( + post({ + jsonrpc: '2.0', + id: 99, + method: 'tools/call', + params: { + name: 'weather', + requestState: inc.requestState, + inputResponses: { [slot]: { action: 'accept', content: { units: 'metric' } } } + } + }) + ); + const m2 = (await r2.json()) as JSONRPCMessage; + expect(m2).toMatchObject({ + id: 99, + result: { content: [{ type: 'text', text: expect.stringContaining('metric') }] } + }); + expect(bodyRuns()).toBe(1); + expect(continuations.size).toBe(0); + }); + + it('SSE mode: progress notifications before suspend reach round 1; final result reaches round 2', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + let runs = 0; + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest, env?: RequestEnv): AsyncIterable { + runs++; + yield { jsonrpc: '2.0', method: 'notifications/progress', params: { progressToken: 'p', progress: 1 } }; + const r = await env!.send!({ method: 'elicitation/create', params: { mode: 'form', message: 'q?' } }); + yield { jsonrpc: '2.0', id: req.id, result: { ok: true, echoed: r } }; + } + }; + const handler = shttpHandler(cb, { continuations }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const msgs1 = await readSSE(r1); + expect(msgs1).toHaveLength(2); + expect((msgs1[0] as JSONRPCNotification).method).toBe('notifications/progress'); + const inc = asIncomplete(msgs1[1]!); + const slot = Object.keys(inc.inputRequests)[0]!; + + const r2 = await handler( + post({ + jsonrpc: '2.0', + id: 2, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { [slot]: { v: 7 } } } + }) + ); + const msgs2 = await readSSE(r2); + expect(msgs2).toHaveLength(1); + expect(msgs2[0]).toMatchObject({ id: 2, result: { ok: true, echoed: { v: 7 } } }); + expect(runs).toBe(1); + }); + + it('batches concurrent env.send calls into one IncompleteResult and resumes both', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest, env?: RequestEnv): AsyncIterable { + const [a, b] = await Promise.all([ + env!.send!({ method: 'elicitation/create', params: { mode: 'form', message: 'a' } }), + env!.send!({ method: 'sampling/createMessage', params: { messages: [] } }) + ]); + yield { jsonrpc: '2.0', id: req.id, result: { a, b } }; + } + }; + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + const keys = Object.keys(inc.inputRequests); + expect(keys).toHaveLength(2); + const methods = keys.map(k => inc.inputRequests[k]!.method).sort(); + expect(methods).toEqual(['elicitation/create', 'sampling/createMessage']); + + const r2 = await handler( + post({ + jsonrpc: '2.0', + id: 2, + method: 'x', + params: { + requestState: inc.requestState, + inputResponses: { [keys[0]!]: { tag: 'A' }, [keys[1]!]: { tag: 'B' } } + } + }) + ); + const m2 = (await r2.json()) as JSONRPCMessage; + expect(m2).toMatchObject({ id: 2, result: { a: { tag: 'A' }, b: { tag: 'B' } } }); + }); + + it('rejects resume from a different authenticated principal', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + + const auth = (token: string) => ({ authInfo: { token, clientId: 'c', scopes: [] } }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' }), auth('alice-token')); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + const slot = Object.keys(inc.inputRequests)[0]!; + + const evil = await handler( + post({ + jsonrpc: '2.0', + id: 2, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { [slot]: { stolen: true } } } + }), + auth('mallory-token') + ); + expect((await evil.json()) as JSONRPCMessage).toMatchObject({ + id: 2, + error: { code: -32_600, message: expect.stringContaining('does not belong') } + }); + expect(continuations.size).toBe(1); + + const ok = await handler( + post({ + jsonrpc: '2.0', + id: 3, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { [slot]: { units: 'metric' } } } + }), + auth('alice-token') + ); + expect((await ok.json()) as JSONRPCMessage).toMatchObject({ id: 3, result: {} }); + expect(continuations.size).toBe(0); + }); + + it('returns -32600 when requestState is unknown/expired', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + const r = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x', params: { requestState: 'nope', inputResponses: {} } })); + const m = (await r.json()) as JSONRPCMessage; + expect(m).toMatchObject({ id: 1, error: { code: -32_600 } }); + }); + + it('inputResponses missing the requested slot rejects the parked env.send', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest, env?: RequestEnv): AsyncIterable { + try { + await env!.send!({ method: 'elicitation/create' }); + yield { jsonrpc: '2.0', id: req.id, result: { reached: 'nope' } }; + } catch (error) { + yield { + jsonrpc: '2.0', + id: req.id, + result: { isError: true, content: [{ type: 'text', text: (error as Error).message }] } + }; + } + } + }; + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + const slot = Object.keys(inc.inputRequests)[0]!; + + const r2 = await handler( + post({ + jsonrpc: '2.0', + id: 2, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { wrongKey: {} } } + }) + ); + const m2 = (await r2.json()) as JSONRPCMessage; + expect(m2).toMatchObject({ + id: 2, + result: { isError: true, content: [{ type: 'text', text: expect.stringContaining(`slot "${slot}"`) }] } + }); + }); + + it('rejects a second concurrent resume for the same requestState while the first is draining', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest, env?: RequestEnv): AsyncIterable { + const a = await env!.send!({ method: 'elicitation/create' }); + // Second suspension point so the first resume is still draining when the + // duplicate arrives. + const b = await env!.send!({ method: 'elicitation/create' }); + yield { jsonrpc: '2.0', id: req.id, result: { a, b } }; + } + }; + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + const slot = Object.keys(inc.inputRequests)[0]!; + + const resume = (id: number) => + handler( + post({ + jsonrpc: '2.0', + id, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { [slot]: { v: id } } } + }) + ); + + const [first, dup] = await Promise.all([resume(10), resume(11)]); + const mFirst = (await first.json()) as JSONRPCMessage; + const mDup = (await dup.json()) as JSONRPCMessage; + // First resume parks again at the second send (round 2's IncompleteResult). + expect(mFirst).toMatchObject({ id: 10, result: { resultType: 'incomplete' } }); + // Concurrent duplicate is rejected, not silently merged or duplicated. + expect(mDup).toMatchObject({ id: 11, error: { code: -32_600, message: expect.stringContaining('already in progress') } }); + expect(continuations.size).toBe(1); + }); + + describe('TTL eviction', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it('expires a parked frame after ttlMs and rejects later resume', async () => { + const expired: string[] = []; + continuations = new ContinuationCompat({ ttlMs: 1000, onexpired: t => expired.push(t), allowAnonymousSuspend: true }); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + expect(continuations.size).toBe(1); + + vi.advanceTimersByTime(1001); + expect(continuations.size).toBe(0); + expect(expired).toEqual([inc.requestState]); + + const r2 = await handler( + post({ + jsonrpc: '2.0', + id: 2, + method: 'x', + params: { requestState: inc.requestState, inputResponses: { r0: {} } } + }) + ); + const m2 = (await r2.json()) as JSONRPCMessage; + expect(m2).toMatchObject({ id: 2, error: { code: -32_600 } }); + }); + }); + + it('refuses to suspend without a principal by default', async () => { + continuations = new ContinuationCompat(); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + const r = await handler(post({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 't' } })); + expect(r.status).toBe(500); + expect(continuations.size).toBe(0); + }); + + it('rejects new suspension when principal is at perPrincipalMax', async () => { + continuations = new ContinuationCompat({ perPrincipalMax: 2 }); + const auth = (token: string) => ({ authInfo: { token, clientId: 'c', scopes: [] } }); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + await handler(post({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 't' } }), auth('alice')); + await handler(post({ jsonrpc: '2.0', id: 2, method: 'tools/call', params: { name: 't' } }), auth('alice')); + const r3 = await handler(post({ jsonrpc: '2.0', id: 3, method: 'tools/call', params: { name: 't' } }), auth('alice')); + expect(r3.status).toBe(500); + // bob is unaffected + const rb = await handler(post({ jsonrpc: '2.0', id: 4, method: 'tools/call', params: { name: 't' } }), auth('bob')); + expect(rb.status).toBe(200); + expect(continuations.size).toBe(3); + }); + + it('handler that never calls env.send passes through unchanged', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const cb: ShttpCallbacks = { + async *onrequest(req: JSONRPCRequest): AsyncIterable { + yield { jsonrpc: '2.0', id: req.id, result: { ok: true } }; + } + }; + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + const r = await handler(post({ jsonrpc: '2.0', id: 5, method: 'ping' })); + expect(await r.json()).toMatchObject({ id: 5, result: { ok: true } }); + expect(continuations.size).toBe(0); + }); +}); From e6af0cf84270044eaa68a5e22c495b00e1443b37 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 14:27:14 +0000 Subject: [PATCH 2/6] fix(continuationCompat): yield JSON-RPC error (not throw) for principal/capacity guards so SSE clients receive it --- .../server/src/server/continuationCompat.ts | 41 ++++++++++++------- .../test/server/continuationCompat.test.ts | 9 +++- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts index 26bb6b6f6..92989859b 100644 --- a/packages/server/src/server/continuationCompat.ts +++ b/packages/server/src/server/continuationCompat.ts @@ -27,7 +27,7 @@ type InputResponses = Record; export interface ContinuationCompatOptions { /** * Maximum number of suspended handler frames to retain. New suspensions beyond this - * cap throw, surfacing as a 500 from {@linkcode shttpHandler}. Defaults to 1000. + * cap yield a JSON-RPC `-32000` error response for the request. Defaults to 1000. */ maxContinuations?: number; /** @@ -185,16 +185,14 @@ export class ContinuationCompat { this._allowAnonymous = options.allowAnonymousSuspend ?? false; } - private _resolvePrincipal(env: RequestEnv): string | undefined { - const p = principalOf(env); - if (p === undefined && !this._allowAnonymous) { - throw new SdkError( - SdkErrorCode.CapabilityNotSupported, - 'ContinuationCompat: refusing to suspend without a principal (no authInfo.token or mcp-session-id). ' + - 'Deploy behind authenticating middleware, configure SessionCompat, or set allowAnonymousSuspend: true.' - ); - } - return p; + /** + * Returns a JSON-RPC error response for the given message; the three new-frame + * guards yield this instead of throwing so SSE-mode clients receive the error + * (a throw inside `_drive` is swallowed by `shttpHandler`'s outer catch and the + * stream just closes empty). + */ + private _capacityError(id: JSONRPCRequest['id'], message: string): JSONRPCErrorResponse { + return { jsonrpc: '2.0', id, error: { code: -32_000, message } }; } private _decPrincipal(owner: string | undefined): void { @@ -258,7 +256,7 @@ export class ContinuationCompat { if (incomingState !== undefined && this._frames.has(incomingState)) { token = incomingState; const entry = this._frames.get(token)!; - if (entry.owner !== this._resolvePrincipal(env)) { + if (entry.owner !== principalOf(env)) { yield { jsonrpc: '2.0', id: request.id, @@ -281,14 +279,27 @@ export class ContinuationCompat { const responses = (params.inputResponses ?? {}) as InputResponses; cont.answer(responses); } else if (incomingState === undefined) { - const owner = this._resolvePrincipal(env); + const owner = principalOf(env); + if (owner === undefined && !this._allowAnonymous) { + yield this._capacityError( + request.id, + 'ContinuationCompat: refusing to suspend without a principal (no authInfo.token or mcp-session-id). ' + + 'Deploy behind authenticating middleware, configure SessionCompat, or set allowAnonymousSuspend: true.' + ); + return; + } if (this._frames.size >= this._max) { - throw new Error(`ContinuationCompat at capacity (maxContinuations=${this._max})`); + yield this._capacityError(request.id, `ContinuationCompat at capacity (maxContinuations=${this._max})`); + return; } if (owner !== undefined) { const n = this._perPrincipalCount.get(owner) ?? 0; if (n >= this._perPrincipalMax) { - throw new Error(`ContinuationCompat: principal at per-principal capacity (perPrincipalMax=${this._perPrincipalMax})`); + yield this._capacityError( + request.id, + `ContinuationCompat: principal at per-principal capacity (perPrincipalMax=${this._perPrincipalMax})` + ); + return; } this._perPrincipalCount.set(owner, n + 1); } diff --git a/packages/server/test/server/continuationCompat.test.ts b/packages/server/test/server/continuationCompat.test.ts index 90972ec72..130023164 100644 --- a/packages/server/test/server/continuationCompat.test.ts +++ b/packages/server/test/server/continuationCompat.test.ts @@ -336,7 +336,8 @@ describe('ContinuationCompat — suspend/resume via shttpHandler', () => { const { cb } = suspendingServer(); const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); const r = await handler(post({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 't' } })); - expect(r.status).toBe(500); + expect(r.status).toBe(200); + expect(await r.json()).toMatchObject({ id: 1, error: { code: -32_000, message: expect.stringContaining('without a principal') } }); expect(continuations.size).toBe(0); }); @@ -348,7 +349,11 @@ describe('ContinuationCompat — suspend/resume via shttpHandler', () => { await handler(post({ jsonrpc: '2.0', id: 1, method: 'tools/call', params: { name: 't' } }), auth('alice')); await handler(post({ jsonrpc: '2.0', id: 2, method: 'tools/call', params: { name: 't' } }), auth('alice')); const r3 = await handler(post({ jsonrpc: '2.0', id: 3, method: 'tools/call', params: { name: 't' } }), auth('alice')); - expect(r3.status).toBe(500); + expect(r3.status).toBe(200); + expect(await r3.json()).toMatchObject({ + id: 3, + error: { code: -32_000, message: expect.stringContaining('per-principal capacity') } + }); // bob is unaffected const rb = await handler(post({ jsonrpc: '2.0', id: 4, method: 'tools/call', params: { name: 't' } }), auth('bob')); expect(rb.status).toBe(200); From 789e14fc6ffae80475a368a468def3e4cea26ca0 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 15:32:25 +0000 Subject: [PATCH 3/6] fix(continuationCompat): defang answerCh before reject (avoid unhandled rejection on early close); fix typedoc links --- packages/server/src/server/continuationCompat.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts index 92989859b..0db41b4f1 100644 --- a/packages/server/src/server/continuationCompat.ts +++ b/packages/server/src/server/continuationCompat.ts @@ -43,7 +43,7 @@ export interface ContinuationCompatOptions { /** Called when a frame is evicted on TTL. */ onexpired?: (requestState: string) => void; /** - * If `false` (default), {@linkcode ContinuationCompat.drive} throws when no principal can + * If `false` (default), {@linkcode ContinuationCompat.wrap} rejects when no principal can * be derived (no `authInfo.token` and no `mcp-session-id`). Anonymous suspension means any * caller can resume any frame; only enable this for trusted single-tenant deployments. */ @@ -117,12 +117,16 @@ class Continuation { fail(reason: Error): void { this.abort.abort(reason); + // Defang the channel before rejecting: nextAnswer() may not have been awaited yet + // (handler is in non-send async work, or answer() just rotated in a fresh channel). + // Without this the reject lands as an unhandled rejection. + this.answerCh.next.catch(() => {}); this.answerCh.reject(reason); } } /** - * Opt-in suspend/resume continuation store for {@linkcode shttpHandler}, the SEP-2322 + * Opt-in suspend/resume continuation store for {@linkcode handleHttp}, the SEP-2322 * "Option H" stateful server path. * * When configured, a handler's `await ctx.mcpReq.send(...)` (and the higher-level @@ -235,7 +239,7 @@ export class ContinuationCompat { /** * Wraps `onrequest` so it suspends on `env.send` instead of needing a live - * peer channel. Called by {@linkcode shttpHandler} when this instance is configured + * peer channel. Called by {@linkcode handleHttp} when this instance is configured * via `ShttpHandlerOptions.continuations`. */ wrap(onrequest: NonNullable): NonNullable { From 42da66e6264590599809c1c8c7e74af514168f1e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 15:50:25 +0000 Subject: [PATCH 4/6] docs: resolve typedoc link warnings (treatWarningsAsErrors) --- packages/server/src/server/continuationCompat.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts index 0db41b4f1..2a5498fe3 100644 --- a/packages/server/src/server/continuationCompat.ts +++ b/packages/server/src/server/continuationCompat.ts @@ -126,7 +126,7 @@ class Continuation { } /** - * Opt-in suspend/resume continuation store for {@linkcode handleHttp}, the SEP-2322 + * Opt-in suspend/resume continuation store for {@linkcode @modelcontextprotocol/server!index.handleHttp | handleHttp}, the SEP-2322 * "Option H" stateful server path. * * When configured, a handler's `await ctx.mcpReq.send(...)` (and the higher-level @@ -239,7 +239,7 @@ export class ContinuationCompat { /** * Wraps `onrequest` so it suspends on `env.send` instead of needing a live - * peer channel. Called by {@linkcode handleHttp} when this instance is configured + * peer channel. Called by {@linkcode @modelcontextprotocol/server!index.handleHttp | handleHttp} when this instance is configured * via `ShttpHandlerOptions.continuations`. */ wrap(onrequest: NonNullable): NonNullable { From bd3442e0e667910dd92be9ae5748a9b45446d7d8 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 17:18:58 +0000 Subject: [PATCH 5/6] fix(continuationCompat): runner observes only the frame-lifetime signal; bridge per-round HTTP abort during each drain --- .../server/src/server/continuationCompat.ts | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts index 2a5498fe3..f0d9f9615 100644 --- a/packages/server/src/server/continuationCompat.ts +++ b/packages/server/src/server/continuationCompat.ts @@ -320,10 +320,22 @@ export class ContinuationCompat { return; } + // Bridge this round's per-HTTP-request abort to the frame-lifetime controller + // for the duration of this drain. The runner observes only `cont.abort.signal` + // (frame-lifetime); without this bridge, `notifications/cancelled` or a client + // disconnect on a *resume* round would never reach the runner. The listener is + // detached in the finally so an abort that lands after `_drive` returns (i.e. + // the frame is parked between rounds) does not kill it. + const onAbort = () => cont.abort.abort(env.signal?.reason); + if (env.signal?.aborted) { + cont.abort.abort(env.signal.reason); + } else { + env.signal?.addEventListener('abort', onAbort, { once: true }); + } + // Drain the ask channel into the current HTTP response stream until the // handler either finishes or parks for input. The finally clears `draining` - // so a client disconnect mid-stream (caller stops iterating this generator) - // does not leave the frame permanently locked against retry. + // and detaches the per-round abort bridge. try { for (;;) { const a = await cont.nextAsk(); @@ -357,6 +369,7 @@ export class ContinuationCompat { return; } } finally { + env.signal?.removeEventListener('abort', onAbort); const entry = this._frames.get(token); if (entry) entry.draining = false; } @@ -369,11 +382,10 @@ export class ContinuationCompat { cont: Continuation, token: string ): void { - const signal = - env.signal === undefined - ? cont.abort.signal - : (anySignal([env.signal, cont.abort.signal]) ?? linkSignals(env.signal, cont.abort.signal)); - const runnerEnv: RequestEnv = { ...env, signal, send: this._suspendingSend(cont) }; + // The runner's lifetime is the frame's lifetime, so it observes only + // `cont.abort.signal`. Per-HTTP-request aborts are bridged into `cont.abort` + // by `_drive` for the duration of each round. + const runnerEnv: RequestEnv = { ...env, signal: cont.abort.signal, send: this._suspendingSend(cont) }; void (async () => { try { @@ -480,19 +492,3 @@ export class ContinuationCompat { this._decPrincipal(entry.owner); } } - -type SignalAny = (signals: AbortSignal[]) => AbortSignal; -function anySignal(signals: AbortSignal[]): AbortSignal | undefined { - const fn = (AbortSignal as { any?: SignalAny }).any; - return fn ? fn(signals) : undefined; -} -function linkSignals(a: AbortSignal, b: AbortSignal): AbortSignal { - const c = new AbortController(); - const fwd = (s: AbortSignal) => { - if (s.aborted) c.abort(s.reason); - else s.addEventListener('abort', () => c.abort(s.reason), { once: true }); - }; - fwd(a); - fwd(b); - return c.signal; -} From 72f15f7b52a3d6a5553edb80285c55eaf6c3a730 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 18:28:14 +0000 Subject: [PATCH 6/6] fix(continuationCompat): type-guard client-supplied inputResponses; defensive try/catch in _suspendingSend onFulfilled (primitive value caused unhandled rejection) --- .../server/src/server/continuationCompat.ts | 22 ++++++++++++++----- .../test/server/continuationCompat.test.ts | 16 ++++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/packages/server/src/server/continuationCompat.ts b/packages/server/src/server/continuationCompat.ts index f0d9f9615..ac10a8ba7 100644 --- a/packages/server/src/server/continuationCompat.ts +++ b/packages/server/src/server/continuationCompat.ts @@ -280,7 +280,11 @@ export class ContinuationCompat { cont = entry.cont; clearTimeout(entry.timer); entry.timer = this._arm(token); - const responses = (params.inputResponses ?? {}) as InputResponses; + // Untrusted client input: ?? only guards null/undefined. A primitive (42, "x") + // would reach `key in responses` in _suspendingSend and throw a TypeError that + // surfaces as an unhandled rejection. Coerce to a plain object at the boundary. + const raw = params.inputResponses; + const responses = (typeof raw === 'object' && raw !== null && !Array.isArray(raw) ? raw : {}) as InputResponses; cont.answer(responses); } else if (incomingState === undefined) { const owner = principalOf(env); @@ -446,12 +450,18 @@ export class ContinuationCompat { opts?.signal?.addEventListener('abort', onAbort, { once: true }); settle.then( responses => { - opts?.signal?.removeEventListener('abort', onAbort); - if (!(key in responses)) { - reject(new SdkError(SdkErrorCode.SendFailed, `inputResponses missing entry for slot "${key}"`)); - return; + // Any throw inside this onFulfilled would reject the discarded + // .then() promise (unhandled rejection). Route to the executor's reject. + try { + opts?.signal?.removeEventListener('abort', onAbort); + if (!(key in responses)) { + reject(new SdkError(SdkErrorCode.SendFailed, `inputResponses missing entry for slot "${key}"`)); + return; + } + resolve(responses[key]!); + } catch (error) { + reject(error instanceof Error ? error : new Error(String(error))); } - resolve(responses[key]!); }, error => { opts?.signal?.removeEventListener('abort', onAbort); diff --git a/packages/server/test/server/continuationCompat.test.ts b/packages/server/test/server/continuationCompat.test.ts index 130023164..d8c2151e7 100644 --- a/packages/server/test/server/continuationCompat.test.ts +++ b/packages/server/test/server/continuationCompat.test.ts @@ -214,6 +214,22 @@ describe('ContinuationCompat — suspend/resume via shttpHandler', () => { expect(continuations.size).toBe(0); }); + it('malformed inputResponses (primitive) is coerced to {} and rejects the parked send, not the process', async () => { + continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); + const { cb } = suspendingServer(); + const handler = shttpHandler(cb, { continuations, enableJsonResponse: true }); + const r1 = await handler(post({ jsonrpc: '2.0', id: 1, method: 'x' })); + const inc = asIncomplete((await r1.json()) as JSONRPCMessage); + // inputResponses: 42 would throw `'r0' in 42` inside _suspendingSend without the boundary guard. + const r2 = await handler( + post({ jsonrpc: '2.0', id: 2, method: 'x', params: { requestState: inc.requestState, inputResponses: 42 } }) + ); + // Coerced to {}, so the parked env.send rejects with SendFailed and the handler surfaces it as an error result. + const m2 = (await r2.json()) as JSONRPCMessage; + expect(m2).toMatchObject({ id: 2 }); + expect(continuations.size).toBe(0); + }); + it('returns -32600 when requestState is unknown/expired', async () => { continuations = new ContinuationCompat({ allowAnonymousSuspend: true }); const { cb } = suspendingServer();