-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(server): BackchannelCompat; Server.buildContext outbound via ctx.mcpReq.send #2059
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6dfcc47
15a47e8
716039a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| import type { | ||
| JSONRPCErrorResponse, | ||
| JSONRPCMessage, | ||
| JSONRPCRequest, | ||
| JSONRPCResultResponse, | ||
| Request, | ||
| RequestOptions, | ||
| Result | ||
| } from '@modelcontextprotocol/core'; | ||
| import { DEFAULT_REQUEST_TIMEOUT_MSEC, isJSONRPCErrorResponse, ProtocolError, SdkError, SdkErrorCode } from '@modelcontextprotocol/core'; | ||
|
|
||
| /** | ||
| * Isolated 2025-11 server-to-client request backchannel for `handleHttp`. | ||
| * | ||
| * The 2025-11 protocol allows a server to send `elicitation/create` and | ||
| * `sampling/createMessage` requests to the client mid-tool-call by writing them as | ||
| * SSE events on the open POST response stream and waiting for the client to POST | ||
| * the response back. This class owns the per-session `{requestId -> resolver}` | ||
| * map that correlation requires. | ||
| * | ||
| * It exists so this stateful behaviour is in one removable file once MRTR | ||
| * (SEP-2322) is the protocol floor and `env.send` becomes a hard error in | ||
| * stateless paths. | ||
| */ | ||
| export class BackchannelCompat { | ||
| private _pending = new Map<string, Map<number, { resolve: (r: Result) => void; reject: (e: Error) => void }>>(); | ||
| private _nextId = 0; | ||
|
|
||
| /** | ||
| * Returns an `env.send` implementation bound to the given session and POST-stream writer. | ||
| * The returned function writes the outbound JSON-RPC request to `writeSSE` and resolves when | ||
| * {@linkcode handleResponse} is called for the same id on the same session. | ||
| * | ||
| * `writeSSE` returns `false` when the underlying stream is closed; the returned promise then | ||
| * rejects immediately with `SendFailed` instead of waiting for the timeout. | ||
| * | ||
| * Backchannel writes are not persisted to the configured event store, so a client that | ||
| * disconnects mid-elicitation and resumes via `Last-Event-ID` will not see the outbound | ||
| * request again; the awaiting handler will time out. This is a known limitation of the | ||
| * legacy backchannel path; SEP-2322 (`ContinuationCompat`) is the resumable alternative. | ||
| */ | ||
| makeEnvSend(sessionId: string, writeSSE: (msg: JSONRPCMessage) => boolean): (req: Request, opts?: RequestOptions) => Promise<Result> { | ||
| return (req: Request, opts?: RequestOptions): Promise<Result> => { | ||
| return new Promise<Result>((resolve, reject) => { | ||
| if (opts?.signal?.aborted) { | ||
| reject(opts.signal.reason instanceof Error ? opts.signal.reason : new Error(String(opts.signal.reason))); | ||
| return; | ||
| } | ||
|
|
||
| const id = this._nextId++; | ||
| const sessionMap = this._pending.get(sessionId) ?? new Map(); | ||
| this._pending.set(sessionId, sessionMap); | ||
|
|
||
| // eslint-disable-next-line prefer-const -- forward-referenced by cleanup() before assignment site | ||
| let timer: ReturnType<typeof setTimeout> | undefined; | ||
| const onAbort = () => { | ||
| // Tell the client to stop processing, then reject locally. | ||
| writeSSE({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: id } }); | ||
| settle.reject(opts!.signal!.reason instanceof Error ? opts!.signal!.reason : new Error(String(opts!.signal!.reason))); | ||
| }; | ||
| const cleanup = () => { | ||
| if (timer !== undefined) clearTimeout(timer); | ||
| sessionMap.delete(id); | ||
| if (sessionMap.size === 0) this._pending.delete(sessionId); | ||
| opts?.signal?.removeEventListener('abort', onAbort); | ||
| }; | ||
| const settle = { | ||
| resolve: (r: Result) => { | ||
| cleanup(); | ||
| resolve(r); | ||
| }, | ||
| reject: (e: Error) => { | ||
| cleanup(); | ||
| reject(e); | ||
| } | ||
| }; | ||
| sessionMap.set(id, settle); | ||
|
|
||
| const timeoutMs = opts?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC; | ||
| timer = setTimeout( | ||
| () => settle.reject(new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { timeout: timeoutMs })), | ||
| timeoutMs | ||
| ); | ||
|
|
||
| opts?.signal?.addEventListener('abort', onAbort, { once: true }); | ||
|
|
||
| const wire: JSONRPCRequest = { jsonrpc: '2.0', id, method: req.method, params: req.params }; | ||
| if (!writeSSE(wire)) { | ||
|
Comment on lines
+79
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 nit: Extended reasoning...What the gap is
Why the dispatcher doesn't cover thisOne might expect the shared Step-by-step proof
Impact and why it's a nitThe behavioural divergence is real but narrow: server→client sampling-with-progress over the legacy SSE backchannel is an uncommon path, and before this PR Suggested fixAdd one sentence to the
That documents the limitation for users of this public class without expanding the shim's scope. |
||
| settle.reject(new SdkError(SdkErrorCode.SendFailed, 'Backchannel stream closed')); | ||
| } | ||
| }); | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Routes an incoming JSON-RPC response (from a client POST) to the waiting `env.send` promise. | ||
| * @returns true if a pending request matched and was settled. | ||
| */ | ||
| handleResponse(sessionId: string, response: JSONRPCResultResponse | JSONRPCErrorResponse): boolean { | ||
| // We only mint numeric ids; a non-numeric id cannot be ours, so do not coerce | ||
| // (string "0" must not claim numeric pending id 0). | ||
| if (typeof response.id !== 'number') return false; | ||
| const sessionMap = this._pending.get(sessionId); | ||
| const settle = sessionMap?.get(response.id); | ||
| if (!settle) return false; | ||
| if (isJSONRPCErrorResponse(response)) { | ||
| settle.reject(ProtocolError.fromError(response.error.code, response.error.message, response.error.data)); | ||
| } else { | ||
| settle.resolve(response.result); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| /** Rejects all pending requests for a session and forgets it. */ | ||
| closeSession(sessionId: string): void { | ||
| const sessionMap = this._pending.get(sessionId); | ||
| if (!sessionMap) return; | ||
| const err = new SdkError(SdkErrorCode.ConnectionClosed, 'Session closed'); | ||
| for (const s of sessionMap.values()) s.reject(err); | ||
| this._pending.delete(sessionId); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 nit:
onAbortsendsnotifications/cancelledwith only{ requestId: id }, dropping the abort reason even thoughopts!.signal!.reasonis read on the very next line to build the local rejection. The connected-transport path (StreamDriver.request, streamDriver.ts:187-188) includesreason: String(reason)on the same notification — addingreason: String(opts!.signal!.reason)here would give the client the same diagnostic context (e.g. "Client closed SSE stream") and keep the twoenv.sendproviders in parity. Spec-optional, so non-blocking.Extended reasoning...
What the gap is
BackchannelCompat.makeEnvSend'sonAborthandler (backchannelCompat.ts:56-59) writes:The wire notification carries only
requestId. The abort reason is in scope — it's dereferenced on the immediately following line to construct the local rejection — but it never reaches the client.The sibling path includes it
BackchannelCompatis one of twoenv.sendproviders. The other,StreamDriver.request(packages/core/src/shared/streamDriver.ts:184-191), handles the same abort-→-cancel case for the connected-transport path and sends:So a server-initiated request cancelled via
StreamDrivertells the client why; the same request cancelled viaBackchannelCompatdoes not. Same protocol message, same semantic event, two different payloads depending on which transport adapter the server is mounted through.Why nothing prevents it
CancelledNotification.params.reasonisz.string().optional()in the spec schema, so the message validates either way. There's no shared helper that builds this notification — each path constructs the literal independently — so nothing forces them to agree on shape.Step-by-step
handleHttp(mcp, { session, backchannel }); a tool handler callsctx.mcpReq.elicitInput(params, { signal }).ctx.mcpReq.sendresolves tobackchannel.makeEnvSend(sessionId, writeSSE). The elicitation request is written to the POST's SSE stream with id0;signal.addEventListener('abort', onAbort)is registered.shttpHandler'scancel()runsctrl.abort(new Error('Client closed SSE stream')), which propagates to the handler'sctx.mcpReq.signalif the handler wired it through).onAbortfires. It writes{ method: 'notifications/cancelled', params: { requestId: 0 } }to the SSE stream — noreasonfield. Then it rejects the local promise withError('Client closed SSE stream').0was cancelled but not whether the server timed out, the user aborted, or the stream broke. Had the same flow gone throughStreamDriver, the client would have receivedreason: "Error: Client closed SSE stream".Impact
Not a protocol violation —
reasonis optional — so this is a consistency/diagnostics nit, not a correctness bug. The client loses human-readable cancel context on thehandleHttpbackchannel path only, and the twoenv.sendimplementations diverge on wire shape for no stated reason.Fix
One-token addition:
This matches
StreamDriverexactly and reuses the value already being read on the next line.