refactor(core): add internal Dispatcher and StreamDriver classes #2056
felixweinberger wants to merge 1 commit into
@claude review
The two inline nits are minor, but this is ~900 lines of new core dispatch/lifecycle machinery (middleware chain, async-generator dispatch, new Outbound/RequestEnv abstractions, ext added to public BaseContext, three new export * lines in the barrel) that R2 will build on — the design and export-surface choices here need a maintainer's sign-off, not a bot's.
Extended reasoning...
Overview
Adds three new files under packages/core/src/shared/ — dispatcher.ts (~400 LOC: stateless handler registry, onion middleware via use(), async-generator dispatch()), streamDriver.ts (~450 LOC: per-pipe correlation/timeouts/progress/cancellation, largely mirrored from Protocol), and context.ts (RequestEnv, Outbound). Adds ext?: Record<string, unknown> to the public BaseContext type in protocol.ts. Re-exports all three new modules from packages/core/src/index.ts. ~520 LOC of new vitest coverage. Nothing wires the new classes into Protocol yet (that's R2).
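To make the shape described in the overview concrete, here is a minimal sketch of an onion-style middleware chain feeding an async-generator dispatch. `MiniDispatcher`, `Middleware`, `Req`, and `Out` are hypothetical names invented for illustration and are not the PR's actual types. For brevity the sketch buffers notifications until the handler finishes, rather than streaming them as the PR's queue/wake pattern does.

```typescript
// Hypothetical, simplified shapes; the real Dispatcher in the PR is richer.
type Req = { id: number; method: string };
type Out =
  | { kind: "notification"; method: string }
  | { kind: "response"; id: number; result: unknown };
type Handler = (req: Req, notify: (method: string) => void) => Promise<unknown>;
type Middleware = (req: Req, next: () => Promise<unknown>) => Promise<unknown>;

class MiniDispatcher {
  private handlers = new Map<string, Handler>();
  private middleware: Middleware[] = [];

  setRequestHandler(method: string, handler: Handler): void {
    this.handlers.set(method, handler);
  }

  // Middleware registered first becomes the outermost layer of the onion.
  use(mw: Middleware): void {
    this.middleware.push(mw);
  }

  // Yields every notification the handler emitted, then exactly one response.
  async *dispatch(req: Req): AsyncGenerator<Out> {
    const handler = this.handlers.get(req.method);
    if (!handler) throw new Error(`No handler for ${req.method}`);
    const notes: string[] = [];
    const core = () => handler(req, (m) => notes.push(m));
    // Wrap from the innermost layer outwards so the first-registered runs first.
    const chain = this.middleware.reduceRight<() => Promise<unknown>>(
      (next, mw) => () => mw(req, next),
      core
    );
    const result = await chain();
    for (const method of notes) yield { kind: "notification", method };
    yield { kind: "response", id: req.id, result };
  }
}
```

Keeping the dispatcher free of per-connection state is what lets a separate driver own id correlation and timeouts, which is the split the overview describes.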
Security risks
None identified. No auth, crypto, or trust-boundary changes; authInfo/httpReq are passed through opaquely. The async-generator + queue/wake pattern in _dispatchCore and the timeout/abort bookkeeping in StreamDriver are the trickiest parts, but they mirror existing Protocol semantics and the finally block correctly aborts the local controller and detaches the env.signal listener on early consumer return.
Level of scrutiny
High — this is foundational architecture for the v2-stateless track, not a mechanical change. Per the repo's review conventions, design justification comes first: the PR introduces a middleware engine and two new abstraction layers, and the principles section explicitly calls out middleware engines as belonging in userland unless strongly justified. The justification lives in an external gist review-guide and the R2 follow-up, so a maintainer who has that context needs to validate the shape before it becomes load-bearing.
Other factors
- Export surface vs. description: the PR body says "Not exported publicly", but index.ts gains three export * lines. Everything is @internal-tagged, which may be the project's convention for cross-package internals, but per REVIEW.md ("Every new export is intentional") a human should confirm that's intended rather than gated behind a subpath.
- Public type change: BaseContext.ext is a new field on an exported, non-@internal type; small and additive, but still public API.
- Inline findings: both are nits (missing _closed guard parity in StreamDriver.request(); Error vs ProtocolError in the 3-arg setNotificationHandler path). Neither blocks; neither is the reason to defer.
- No changeset: probably intentional for an internal-only refactor, but worth a maintainer confirming given the barrel-export additions.
- Test coverage for the new code is solid (dispatch yield ordering, middleware composition order, abort, correlation, timeout, progress, debounce), which raises confidence in correctness; the open question is design, not bugs.
    request<T extends StandardSchemaV1>(req: Request, resultSchema: T, options?: RequestOptions): Promise<StandardSchemaV1.InferOutput<T>> {
        const { relatedRequestId, resumptionToken, onresumptiontoken } = options ?? {};
        let onAbort: (() => void) | undefined;
        let cleanupId: number | undefined;

        let responseReceived = false;

        return new Promise<StandardSchemaV1.InferOutput<T>>((resolve, reject) => {
            if (options?.signal?.aborted) {
                const reason = options.signal.reason;
                throw reason instanceof Error ? reason : new DOMException('Request was aborted before send', 'AbortError');
            }

            const messageId = this._requestMessageId++;
            cleanupId = messageId;
🟡 notification() early-returns when this._closed is true, and Protocol._requestWithSchema (which this mirrors) rejects with "Not connected" when !this._transport, but StreamDriver.request() has no equivalent guard. In practice the impact is limited — real transports reject send() after close, and the ctx.mcpReq.send path is already covered by the aborted-signal check — so the only exposure is a direct Outbound.request() call after close on a transport whose send() resolves silently, which would hang until the request timeout. A one-line if (this._closed) return Promise.reject(new SdkError(SdkErrorCode.ConnectionClosed, ...)) would restore parity with both notification() and Protocol before R2.
Extended reasoning...
What the gap is
StreamDriver.notification() (line 260) starts with if (this._closed) return;, and Protocol._requestWithSchema — which the PR description says StreamDriver.request() mirrors and which R2 will replace — early-rejects with new Error('Not connected') when !this._transport. StreamDriver.request() has no equivalent check on this._closed. So a direct call to the Outbound.request() contract after _onclose() has run will proceed: it increments _requestMessageId, registers entries in _responseHandlers / _progressHandlers / _timeoutInfo, and calls pipe.send(). Since _onclose() already swept those maps and won't run again, those entries are only cleaned up when the promise settles.
Why the practical impact is small
Three of the obvious exposure paths are already covered, which is why this is a nit rather than a correctness bug:
- The ctx.mcpReq.send path is guarded. _onclose() aborts every controller in _requestHandlerAbortControllers; that controller's signal is env.signal, which Dispatcher._dispatchCore propagates to localAbort, which is threaded as options.signal into env.send → StreamDriver.request(). The first thing request() does (lines 184–187) is check options?.signal?.aborted and throw early. So a handler calling ctx.mcpReq.send after the pipe closed throws immediately.
- Real transports reject send() after close. InMemoryTransport.send() throws SdkError(NotConnected) once _otherTransport is undefined; stdio/WebSocket similarly fail on write-after-close. That rejection hits pipe.send(...).catch(reject), which settles the promise and runs the .finally() block that deletes _responseHandlers[cleanupId] and clears the timeout. _progressHandlers is deleted in the same .catch.
- Even if send() resolved silently, nothing leaks permanently. The request would sit until DEFAULT_REQUEST_TIMEOUT_MSEC (or the caller's timeout), then cancel() rejects it, deletes _progressHandlers, and .finally() deletes _responseHandlers and the timer. So the worst case is a bounded hang, not an unbounded leak.
The remaining exposure
The one path that does behave worse than Protocol is a direct owner call to driver.request() (the public Outbound contract) with no signal, after close, on a transport whose send() happens to resolve silently when closed. Walkthrough:
1. pipe.onclose fires → _onclose() sets _closed = true, clears all maps.
2. Owner calls driver.request({ method: 'ping' }, schema) with no options.signal.
3. options?.signal?.aborted is falsy → no early throw.
4. messageId = 0 is allocated; handler/timeout are registered in the (now-fresh) maps.
5. pipe.send(jsonrpcRequest) resolves silently (hypothetical transport).
6. No response will ever arrive (_onclose already ran), so the promise sits until the 60s default timeout, then rejects with RequestTimeout and self-cleans.
Under Protocol._requestWithSchema the same call rejects immediately with "Not connected", and under StreamDriver.notification() the analogous call returns immediately. So this is a behavioral parity gap that will become observable when R2 swaps Protocol's request path onto StreamDriver.
Suggested fix
Add at the top of request() (or as the first line inside the executor, before the signal.aborted check):
    if (this._closed) {
        return Promise.reject(new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'));
    }

This matches notification()'s _closed guard and Protocol._requestWithSchema's !this._transport guard, and costs nothing.
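The effect of the guard can be sketched in isolation. The `MiniDriver`, `Pipe`, and simplified `SdkError` below are hypothetical stand-ins invented for this example, not the SDK's classes; the pipe's `send()` resolves silently to model the worst-case transport discussed in the walkthrough.

```typescript
// Hypothetical stand-in for the SDK's error type (assumption: a string code).
class SdkError extends Error {
  readonly code: string;
  constructor(code: string, message: string) {
    super(message);
    this.name = "SdkError";
    this.code = code;
  }
}

interface Pipe {
  send(message: unknown): Promise<void>;
}

class MiniDriver {
  private closed = false;
  private nextId = 0;
  // Pending resolvers keyed by request id, analogous to a response-handler map.
  readonly pending = new Map<number, (value: unknown) => void>();
  private readonly pipe: Pipe;

  constructor(pipe: Pipe) {
    this.pipe = pipe;
  }

  close(): void {
    this.closed = true;
    this.pending.clear();
  }

  request(method: string): Promise<unknown> {
    // Fail fast after close instead of registering state that only a
    // timeout could ever clean up.
    if (this.closed) {
      return Promise.reject(new SdkError("ConnectionClosed", "Connection closed"));
    }
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      this.pending.set(id, resolve);
      this.pipe.send({ jsonrpc: "2.0", id, method }).catch((error) => {
        this.pending.delete(id);
        reject(error);
      });
    });
  }
}
```

With the guard in place, a post-close call rejects on the same tick and leaves `pending` empty, so nothing waits for the request timeout.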
    if (!parsed.success) {
        throw new Error(`Invalid params for notification ${method}: ${parsed.error}`);
    }
🟡 The 3-arg setNotificationHandler path throws new Error(...) on params validation failure, while the sibling 3-arg setRequestHandler (just above) and the Protocol.setNotificationHandler this mirrors both throw new ProtocolError(ProtocolErrorCode.InvalidParams, ...). Since Dispatcher is being extracted to compose into Protocol in R2, this should match: change to throw new ProtocolError(ProtocolErrorCode.InvalidParams, `Invalid params for notification ${method}: ${parsed.error}`). Impact is minimal (notification errors don't reach the wire and StreamDriver re-wraps them anyway), so just a consistency nit.
Extended reasoning...
What the bug is
In Dispatcher.setNotificationHandler's 3-arg (custom-method) overload at dispatcher.ts:346-348, when validateStandardSchema fails on the user-supplied params schema, the stored handler throws:
    throw new Error(`Invalid params for notification ${method}: ${parsed.error}`);

This diverges from two directly comparable code paths:
- The sibling Dispatcher.setRequestHandler 3-arg path (same file, ~30 lines up) throws new ProtocolError(ProtocolErrorCode.InvalidParams, `Invalid params for ${method}: ...`) on the identical validation failure.
- The Protocol.setNotificationHandler 3-arg path in protocol.ts, which this PR explicitly mirrors as preparation for R2's Protocol composition, also throws new ProtocolError(ProtocolErrorCode.InvalidParams, ...).
Since the PR description states Dispatcher is being extracted so that Protocol can compose around it in R2, the new code should match the behavior it's about to replace.
Step-by-step reproduction
1. const d = new Dispatcher()
2. d.setNotificationHandler('acme/ping', { params: z.object({ ts: z.number() }) }, () => {})
3. await d.dispatchNotification({ jsonrpc: '2.0', method: 'acme/ping', params: { ts: 'not-a-number' } })
4. Inside the stored handler, validateStandardSchema returns { success: false, error: ... }.
5. Line 347 executes throw new Error('Invalid params for notification acme/ping: ...').
6. dispatchNotification rejects with a plain Error. Compare: doing the equivalent with setRequestHandler and dispatchToResponse produces a JSON-RPC error with code: -32602 (InvalidParams) because that path throws ProtocolError.
Why existing code doesn't prevent it
There's no normalization layer between the thrown error and dispatchNotification's rejection: dispatchNotification just does await Promise.resolve().then(() => handler(notification)), so whatever the handler throws propagates verbatim. The only consumer in this PR, StreamDriver._onnotification, catches the rejection and wraps it in new Error(`Uncaught error in notification handler: ${error}`) before passing it to onerror, which discards the original type either way. That is a property of one specific consumer, not of Dispatcher itself.
Impact
Minimal. Notifications have no id and never produce a wire-level JSON-RPC response, so the InvalidParams error code is never serialized to the peer. And as noted, StreamDriver re-wraps the error before it reaches onerror, so downstream instanceof ProtocolError checks wouldn't see it regardless. The only place the difference is observable is to code that calls dispatcher.dispatchNotification() directly and inspects the rejection — which nothing does yet. This is purely a consistency issue: when a PR replicates an existing pattern across sibling paths, the paths should match, especially since R2 will swap this in for Protocol's implementation.
Fix
One-line change at dispatcher.ts:347:
    throw new ProtocolError(ProtocolErrorCode.InvalidParams, `Invalid params for notification ${method}: ${parsed.error}`);

ProtocolError and ProtocolErrorCode are already imported in this file.
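What a direct caller gains from the typed error can be shown with a small sketch. The `ProtocolError` class, the `validateTs` validator, and the helper names here are simplified stand-ins written for this example; only the numeric code -32602 (JSON-RPC "Invalid params") comes from the discussion above.

```typescript
// JSON-RPC 2.0 "Invalid params" error code.
const INVALID_PARAMS = -32602;

// Hypothetical minimal ProtocolError; the SDK's class carries more detail.
class ProtocolError extends Error {
  readonly code: number;
  constructor(code: number, message: string) {
    super(message);
    this.name = "ProtocolError";
    this.code = code;
  }
}

type Validation = { success: true } | { success: false; error: string };

// Stand-in validator that accepts only { ts: number }.
function validateTs(params: unknown): Validation {
  const ok =
    typeof params === "object" &&
    params !== null &&
    typeof (params as { ts?: unknown }).ts === "number";
  return ok ? { success: true } : { success: false, error: "ts must be a number" };
}

// Notification handler that reports validation failures with a typed error.
function handleNotification(method: string, params: unknown): void {
  const parsed = validateTs(params);
  if (!parsed.success) {
    throw new ProtocolError(INVALID_PARAMS, `Invalid params for notification ${method}: ${parsed.error}`);
  }
}

// A caller can branch on the code only when the thrown error is typed.
function errorCodeOf(fn: () => void): number | undefined {
  try {
    fn();
  } catch (e) {
    return e instanceof ProtocolError ? e.code : undefined;
  }
  return undefined;
}
```

Had the handler thrown a plain `Error`, `errorCodeOf` would return `undefined` for the same invalid input, which is the inconsistency the nit points at.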
1916e69 to 22ba1b0
@claude review
    _meta: request.params?._meta,
    signal: localAbort.signal,
    send: (async (r: Request, schemaOrOptions?: unknown, maybeOptions?: RequestOptions) => {
        const isSchema = schemaOrOptions != null && typeof schemaOrOptions === 'object' && '~standard' in schemaOrOptions;
🟡 nit: this reimplements isStandardSchema() inline — Protocol's identical mcpReq.send overload-dispatch (which this is being extracted to replace in R2) uses the exported helper, and the inline version is slightly weaker (rejects function-typed schemas, doesn't check ~standard.validate). Since validateStandardSchema is already imported from the same module, just add isStandardSchema to that import and write const isSchema = isStandardSchema(schemaOrOptions);.
Extended reasoning...
What the issue is
At dispatcher.ts:170, mcpReq.send disambiguates its overloaded second argument (schema vs. RequestOptions) with an inline check:
    const isSchema = schemaOrOptions != null && typeof schemaOrOptions === 'object' && '~standard' in schemaOrOptions;

The codebase already exports isStandardSchema() (util/standardSchema.ts:128) for exactly this purpose, and Protocol._onrequest's identical mcpReq.send overload-dispatch (the code path this PR is explicitly extracting so R2 can compose Dispatcher into Protocol) uses the helper directly:

    send: ((r, schemaOrOptions, maybeOptions) => {
        if (isStandardSchema(schemaOrOptions)) { ... }
        ...
    })

So the extracted copy and the original it's about to replace now disambiguate the same overload two different ways.
How the inline check is weaker
isStandardSchema() accepts both typeof v === 'object' and typeof v === 'function', and additionally verifies that v['~standard'].validate is a function. The inline check requires typeof === 'object' only and doesn't look at .validate. The Standard Schema spec permits callable schemas (a function carrying a ~standard property), and isStandardSchema was written to accept them; the inline check rejects them.
Step-by-step divergence example
Take a callable Standard Schema — e.g. a schema library whose schema instances are functions with a ~standard property:
1. A handler calls ctx.mcpReq.send({ method: 'acme/ping' }, callableSchema).
2. At line 170, typeof callableSchema === 'object' is false (it's 'function'), so isSchema is false.
3. schemaOrOptions is therefore treated as RequestOptions, and getResultSchema('acme/ping') is called.
4. Since acme/ping isn't a spec method, line 174 throws TypeError: 'acme/ping' is not a spec method; pass a result schema... even though the caller did pass one.
5. Under Protocol's path (and under isStandardSchema), step 2 evaluates true and the call proceeds with the supplied schema.
If the method were a spec method, the divergence is quieter: the spec result schema would be used instead of the caller's, and the callable schema would be spread into options as garbage keys.
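The divergence between the two checks can be reproduced with a short sketch. `isStandardSchemaSketch` is a simplified stand-in written from the description above (accepts objects and functions, requires `~standard.validate` to be a function); it is an assumption about the helper's behavior, not a copy of the code in util/standardSchema.ts. `callableSchema` and `plainSchema` are invented test values.

```typescript
// Assumed behavior of the exported helper, reconstructed from the review text.
function isStandardSchemaSketch(value: unknown): boolean {
  if (value == null) return false;
  if (typeof value !== "object" && typeof value !== "function") return false;
  const std = (value as Record<string, unknown>)["~standard"];
  return (
    typeof std === "object" &&
    std !== null &&
    typeof (std as { validate?: unknown }).validate === "function"
  );
}

// The inline check quoted from the diff: objects only, no look at validate.
function inlineCheck(value: unknown): boolean {
  return value != null && typeof value === "object" && "~standard" in value;
}

// A callable schema: a function that also carries a ~standard property.
const callableSchema = Object.assign((input: unknown) => input, {
  "~standard": {
    version: 1,
    vendor: "acme",
    validate: (v: unknown) => ({ value: v }),
  },
});

// A plain-object schema sharing the same ~standard payload.
const plainSchema = { "~standard": callableSchema["~standard"] };
```

For `plainSchema` both checks agree; for `callableSchema` the inline check reports false while the helper-style check reports true, which is the case that sends a caller-supplied schema down the RequestOptions branch.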
Why existing code doesn't prevent it
Nothing normalizes schemaOrOptions before line 170 — it's the first thing the implementation does with the argument. The only guard is the inline check itself.
Impact
Near-zero in practice, hence nit. Zod, Valibot, ArkType, and the RAW_RESULT_SCHEMA defined in this PR are all plain objects, and RequestOptions never carries a ~standard key, so the weaker check still produces the right answer for every schema anyone is likely to pass today. The concern is consistency: when R2 swaps Protocol's dispatch onto Dispatcher, the two should already disambiguate identically, and the repo's "one way to do things" convention argues against having two divergent definitions of "is a Standard Schema" in the same package.
Fix
dispatcher.ts already imports validateStandardSchema from '../util/standardSchema.js', so add isStandardSchema to that import line and replace line 170 with:
    const isSchema = isStandardSchema(schemaOrOptions);

Dispatcher is a stateless handler registry with a dispatch(req, env) async-generator that yields notifications then one terminal response. StreamDriver owns per-connection state (id correlation, timeouts, progress, cancellation) over a persistent transport. Additive: new files, not yet wired. Protocol composes them in R2.
22ba1b0 to 6aa731b
@claude review
        return;
    }
    this.dispatcher
        .dispatchNotification(notification)
🟡 A peer sending {jsonrpc:'2.0', method:'notifications/progress'} with no params will throw a synchronous TypeError here, since _onprogress's first line destructures progressParams. Protocol contains the same input via the setNotificationHandler → schema.parse → .catch(_onerror) chain, and the sibling notifications/cancelled branch just above is already defensive ((params as ... | undefined)?.requestId). Suggest guarding with if (!notification.params) return; (or defaulting progressParams to {}) to keep parity before R2 swaps Protocol onto this.
Extended reasoning...
What the bug is
StreamDriver._onnotification special-cases notifications/progress and forwards notification.params straight to _onprogress with an unchecked cast:
    if (notification.method === 'notifications/progress') {
        this._onprogress(notification.params as ProgressNotification['params']);
        return;
    }

The first line of _onprogress is const { progressToken, ...params } = progressParams;. If a peer sends {jsonrpc:'2.0', method:'notifications/progress'} with no params (or params: null), progressParams is undefined and the destructure throws a synchronous TypeError: Cannot destructure property 'progressToken' of 'undefined'.
The code path that triggers it
pipe.onmessage is wired in StreamDriver.start() with no try/catch. isJSONRPCNotification validates against JSONRPCNotificationSchema, whose params field is optional, so the no-params message passes the type guard and reaches _onnotification. _onnotification matches 'notifications/progress' and calls _onprogress(undefined) synchronously. The destructure throws, and the throw escapes pipe.onmessage. For InMemoryTransport that means the peer's send() rejects; for stdio it surfaces as an uncaught exception in the data event handler.
Why existing code doesn't prevent it
This is the only path in _onnotification that isn't guarded. The sibling notifications/cancelled branch immediately above is defensive — (notification.params as { requestId?: RequestId } | undefined)?.requestId — and the fall-through path goes to dispatcher.dispatchNotification(...).catch(error => this._onerror(...)), which contains any throw. Only the progress branch executes synchronously with no guard and no catch.
This is a parity regression vs Protocol. In Protocol, notifications/progress is registered via setNotificationHandler, so Protocol._onnotification invokes the stored handler inside Promise.resolve().then(() => handler(notification)).catch(error => this._onerror(...)). The stored handler runs schema.parse(notification) (which rejects the missing params), and the resulting ZodError is caught and surfaced as a contained _onerror() call rather than a synchronous throw out of onmessage.
Step-by-step proof
1. const [a, b] = InMemoryTransport.createLinkedPair();
2. const driver = new StreamDriver(new Dispatcher(), a);
3. await driver.start(); await b.start();
4. Peer calls await b.send({ jsonrpc: '2.0', method: 'notifications/progress' }).
5. InMemoryTransport.send synchronously invokes a.onmessage(message) (= StreamDriver's wired handler).
6. isJSONRPCNotification(message) → true (params is optional in the envelope schema) → _onnotification(message).
7. notification.method === 'notifications/progress' → _onprogress(undefined).
8. const { progressToken, ...params } = undefined → TypeError thrown synchronously.
9. The throw propagates out of a.onmessage, so the peer's b.send() promise rejects with the TypeError. Under Protocol the same input would instead resolve b.send() and surface a contained error via onerror.
Impact
Low — hence nit. It requires a misbehaving peer to send a spec-invalid progress notification, and StreamDriver is @internal and not yet wired (R1 of decomposition). But since R2 will swap Protocol onto this code path, it's worth restoring the defensive behavior now so the swap doesn't quietly weaken robustness against malformed peer input.
Fix
Apply the same pattern as the notifications/cancelled branch — either guard before the call:
    if (notification.method === 'notifications/progress') {
        if (notification.params) this._onprogress(notification.params as ProgressNotification['params']);
        return;
    }

or default the destructure target in _onprogress: const { progressToken, ...params } = progressParams ?? {}; (then early-return on progressToken === undefined).
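The failure mode and the second fix option can be sketched side by side. `ProgressParams`, the `seen` array, and both function names are hypothetical, chosen only to mirror the destructure under discussion.

```typescript
// Hypothetical, trimmed-down shape of the progress notification params.
type ProgressParams = {
  progressToken: string | number;
  progress: number;
  total?: number;
};

// Records every progress update that was accepted.
const seen: ProgressParams[] = [];

// Unguarded: mirrors the destructure described above and throws on undefined.
function onProgressUnguarded(progressParams: ProgressParams): void {
  const { progressToken, ...params } = progressParams;
  seen.push({ progressToken, ...params });
}

// Guarded: tolerates a missing params object and silently drops the message.
function onProgressGuarded(progressParams: ProgressParams | undefined): void {
  const { progressToken, ...params } =
    progressParams ?? ({} as Partial<ProgressParams>);
  if (progressToken === undefined) return;
  seen.push({ progressToken, ...params } as ProgressParams);
}
```

Passing `undefined` (forced through a cast, as an unchecked `as` would allow) makes the unguarded version throw a `TypeError`, while the guarded version returns without recording anything.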
|
@claude review |
    /**
     * Extension slot. Adapters and middleware populate keys here; handlers cast to the
     * extension's declared type to read them. Core never reads or writes this field.
     */
    ext?: Record<string, unknown>;
🟡 nit (docs): this PR adds ext?: Record<string, unknown> to the publicly-exported BaseContext, but the root CLAUDE.md § "Request Handler Context" (~lines 188–204) — which exhaustively enumerates every BaseContext field — does not gain a bullet for it. Worth adding e.g. "ext?: Extension slot for adapter/middleware-populated keys" to keep the onboarding doc in sync. While there, the same list still includes task?, which was removed from BaseContext in 39e10ab (R0), so a quick pass on the whole list may be warranted.
Extended reasoning...
What the issue is
This PR adds a new field to the publicly-exported BaseContext type in protocol.ts:206-211:
    /**
     * Extension slot. Adapters and middleware populate keys here; handlers cast to the
     * extension's declared type to read them. Core never reads or writes this field.
     */
    ext?: Record<string, unknown>;

and threads it through Dispatcher._dispatchCore (base.ext = env.ext). BaseContext is the handler-facing context type that ServerContext/ClientContext extend, so ext is now part of the public surface every request handler can read.
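How a handler might consume the slot, following the doc comment's "cast to the extension's declared type" wording, can be sketched as below. `BaseContextSketch`, `TenantExt`, the `tenant` key, and both helpers are invented for illustration; only the `ext?: Record<string, unknown>` field mirrors the PR.

```typescript
// Hypothetical, trimmed-down context; only `ext` mirrors the field in the PR.
interface BaseContextSketch {
  sessionId?: string;
  ext?: Record<string, unknown>;
}

// An adapter-declared extension type (invented for this example).
interface TenantExt {
  tenantId: string;
}

// Adapter or middleware side: populate the slot without mutating the input.
function withTenant(ctx: BaseContextSketch, tenantId: string): BaseContextSketch {
  const tenant: TenantExt = { tenantId };
  return { ...ctx, ext: { ...ctx.ext, tenant } };
}

// Handler side: cast to the extension's declared type, tolerating absence.
function readTenant(ctx: BaseContextSketch): string | undefined {
  const tenant = ctx.ext?.["tenant"] as TenantExt | undefined;
  return tenant?.tenantId;
}
```

Because the slot is typed as `Record<string, unknown>`, the cast is unchecked; a handler that needs stronger guarantees would validate the value before using it.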
The root CLAUDE.md has a "Request Handler Context" section (~lines 188–204) that explicitly enumerates BaseContext field-by-field as a bulleted list:
BaseContext (common to both Server and Client), fields organized into nested groups:
- sessionId?: Transport session identifier
- mcpReq: Request-level concerns
  - id / method / _meta? / signal / send(...) / notify(...)
- http?: HTTP transport info (undefined for stdio)
  - authInfo?: Validated auth token info
- task?: Task context ({ id?, store, requestedTtl? }) when task storage is configured
The list is presented as exhaustive ("fields organized into nested groups:") and is the canonical contributor onboarding reference for the handler-context shape. After this PR it under-describes the implementation: ext is missing.
Why this is worth flagging
Per the repo's REVIEW.md doc-sync checklist ("behavior change: check whether docs/**/*.md describes the old behavior and needs updating; flag prose that now contradicts the implementation"), CLAUDE.md should track public-shape changes. BaseContext is exported via index.ts → protocol.js, and ext is specifically the seam adapters/middleware will use to populate handler-visible state — i.e., exactly the kind of field a contributor reading CLAUDE.md to understand the handler context would want to know about.
Step-by-step
1. Before this PR, BaseContext had four top-level keys: sessionId?, mcpReq, http?, and (until 39e10ab) task?. CLAUDE.md lists exactly those four.
2. This PR adds a fifth top-level key, ext?, at protocol.ts:211.
3. CLAUDE.md is not touched in the diff (not in the changed-files list).
4. Result: the enumeration at CLAUDE.md:191-204 now omits a top-level BaseContext field that exists in the type and is populated at runtime by Dispatcher (dispatcher.ts, base.ext = env.ext).
Pre-existing drift in the same section
The same bulleted list still includes task?: Task context ({ id?, store, requestedTtl? }) when task storage is configured. That field was removed from BaseContext by base-branch commit 39e10ab ("remove experimental tasks interception from Protocol", R0). That staleness is pre-existing and not introduced by this PR, but since this PR is touching BaseContext and the section is already due for a sync, it's worth fixing both in one pass.
Impact
Doc-only; zero runtime effect. Hence nit.
Fix
Add one bullet under the BaseContext list in CLAUDE.md:
    - `ext?`: Extension slot for adapter/middleware-populated keys; core never reads or writes this

and (optionally, while there) drop the stale task? bullet.
Adds two new internal classes in packages/core/src/shared/: Dispatcher (stateless handler registry + middleware chain + dispatch() async-generator) and StreamDriver (per-connection state: id correlation, timeouts, progress, cancellation). Not exported publicly. Nothing wires them yet.

Motivation and Context
Prepares R2's Protocol composition. Separating stateless dispatch from per-connection state is the structural change that lets the SEP seams land additively and avoids the GHSA-345p shape (shared id-keyed maps in the dispatch path).

How Has This Been Tested?
New unit tests for Dispatcher (dispatch() yield, middleware composition, abort) and StreamDriver (correlation, timeout, progress). All existing tests unchanged.

Breaking Changes
None.

Types of changes
Checklist
Additional context
R1 of decomposition; depends on R0. Review guide: https://gist.github.com/felixweinberger/5a48e0f14d5aced39ed6a91b61940711. New files only: Protocol is untouched until R2.