From e67c2f8962ebbf9f600ce05241e379ed565ad383 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 11:29:50 +0000 Subject: [PATCH 1/5] feat(server): handleHttp() per-request entry, SessionCompat, dual-conformance Adds handleHttp(mcp): (Request) => Promise for stateless/serverless deployment, driven by Protocol.dispatch(). SessionCompat is opt-in 2025-11 mcp-session-id lifecycle. toNodeHttpHandler adapts to Node http. SSE Last-Event-ID resumption is bound to the issuing session: the streamId is per-session (per-POST UUID or _GET_stream:), tracked in SessionCompat, and replayEvents rejects event IDs that don't belong to the requesting session. Requires EventStore.getStreamIdForEventId. Dual conformance: CI runs the same suite against both connect(transport) and handleHttp() entry points (40/40 each). --- package.json | 2 + packages/core/src/shared/protocol.ts | 9 + .../middleware/node/src/streamableHttp.ts | 29 + .../node/test/streamableHttp.test.ts | 26 +- packages/server/src/index.ts | 4 + packages/server/src/server/handleHttp.ts | 51 + packages/server/src/server/mcp.ts | 17 + packages/server/src/server/sessionCompat.ts | 351 +++++++ packages/server/src/server/shttpHandler.ts | 565 +++++++++++ .../server/test/server/shttpHandler.test.ts | 163 ++++ .../expected-failures-handlehttp.yaml | 7 + test/conformance/package.json | 2 + .../scripts/run-server-conformance.sh | 5 +- test/conformance/src/everythingServer.ts | 912 +----------------- .../src/everythingServerHandleHttp.ts | 54 ++ test/conformance/src/everythingServerSetup.ts | 563 +++++++++++ 16 files changed, 1868 insertions(+), 892 deletions(-) create mode 100644 packages/server/src/server/handleHttp.ts create mode 100644 packages/server/src/server/sessionCompat.ts create mode 100644 packages/server/src/server/shttpHandler.ts create mode 100644 packages/server/test/server/shttpHandler.test.ts create mode 100644 test/conformance/expected-failures-handlehttp.yaml create mode 100644 test/conformance/src/everythingServerHandleHttp.ts create mode 100644 test/conformance/src/everythingServerSetup.ts diff --git a/package.json b/package.json index a2cb93f629..6b4aae17f8 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,8 @@ "test:conformance:server": "pnpm --filter @modelcontextprotocol/test-conformance run test:conformance:server", "test:conformance:server:all": "pnpm --filter @modelcontextprotocol/test-conformance run test:conformance:server:all", "test:conformance:server:run": "pnpm --filter @modelcontextprotocol/test-conformance run test:conformance:server:run", + "test:conformance:server:handlehttp": "pnpm --filter @modelcontextprotocol/test-conformance run test:conformance:server:handlehttp", + "test:conformance:server:dual": "pnpm --filter @modelcontextprotocol/test-conformance run test:conformance:server:dual", "test:conformance:all": "pnpm run test:conformance:client:all && pnpm run test:conformance:server:all" }, "devDependencies": { diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index fc60d8eb66..f84ea7b4a0 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -8,6 +8,7 @@ import { SdkError, SdkErrorCode } from '../errors/sdkErrors.js'; import type { + JSONRPCNotification, JSONRPCRequest, MessageExtraInfo, Notification, @@ -179,6 +180,14 @@ export abstract class Protocol { return this._dispatcher.dispatch(request, env); } + /** + * Dispatch one inbound notification to its registered handler. Transport-free + * counterpart to {@linkcode Protocol.dispatch}; consumed by `handleHttp`. + */ + dispatchNotification(notification: JSONRPCNotification): Promise { + return this._dispatcher.dispatchNotification(notification); + } + /** * Registers a handler to invoke when this protocol object receives a request with the given method. * diff --git a/packages/middleware/node/src/streamableHttp.ts b/packages/middleware/node/src/streamableHttp.ts index 68a0c224f0..bfbd3f8074 100644 --- a/packages/middleware/node/src/streamableHttp.ts +++ b/packages/middleware/node/src/streamableHttp.ts @@ -21,6 +21,35 @@ import { WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/ */ export type StreamableHTTPServerTransportOptions = WebStandardStreamableHTTPServerTransportOptions; +/** + * Converts a web-standard `(Request) => Response` handler into a Node.js + * `(IncomingMessage, ServerResponse) => void` handler suitable for express, + * `http.createServer`, etc. + * + * The third parameter (express's `next`) is accepted for middleware compatibility but + * not invoked; errors are written to the response (`@hono/node-server` swallows handler + * rejections internally). Auth info is read from `req.auth`; a pre-parsed body is read from `req.body` + * (e.g. when `express.json()` ran before this handler). + * + * ```ts + * import { handleHttp } from '@modelcontextprotocol/server'; + * import { toNodeHttpHandler } from '@modelcontextprotocol/node'; + * + * app.all('/mcp', toNodeHttpHandler(handleHttp(mcp, { session }))); + * ``` + */ +export function toNodeHttpHandler( + handler: (req: Request, extra?: { authInfo?: AuthInfo; parsedBody?: unknown }) => Response | Promise +): (req: IncomingMessage & { auth?: AuthInfo; body?: unknown }, res: ServerResponse, next?: (err?: unknown) => void) => Promise { + return async (req, res, _next) => { + void _next; + const parsedBody = req.body; + const extra = req.auth !== undefined || parsedBody !== undefined ? { authInfo: req.auth, parsedBody } : undefined; + const listener = getRequestListener(webReq => handler(webReq, extra), { overrideGlobalObjects: false }); + await listener(req, res); + }; +} + /** * Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. * It supports both SSE streaming and direct HTTP responses. diff --git a/packages/middleware/node/test/streamableHttp.test.ts b/packages/middleware/node/test/streamableHttp.test.ts index c427aa2eea..37d66ac2f4 100644 --- a/packages/middleware/node/test/streamableHttp.test.ts +++ b/packages/middleware/node/test/streamableHttp.test.ts @@ -18,7 +18,31 @@ import { listenOnRandomPort } from '@modelcontextprotocol/test-helpers'; import * as z from 'zod/v4'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { NodeStreamableHTTPServerTransport } from '../src/streamableHttp.js'; +import { NodeStreamableHTTPServerTransport, toNodeHttpHandler } from '../src/streamableHttp.js'; + +describe('toNodeHttpHandler', () => { + it('does not treat express next() as parsedBody; reads req.body instead', async () => { + let capturedExtra: { parsedBody?: unknown } | undefined; + const handler = toNodeHttpHandler(async (_req, extra) => { + capturedExtra = extra; + return Response.json({ ok: true }); + }); + const port = await getFreePort(); + const server = createServer((req, res) => { + (req as IncomingMessage & { body?: unknown }).body = { jsonrpc: '2.0', method: 'ping', id: 1 }; + // Express-style call: third arg is the next() function. + void handler(req as IncomingMessage & { auth?: AuthInfo; body?: unknown }, res, () => {}); + }); + await new Promise(r => server.listen(port, r)); + try { + await fetch(`http://localhost:${port}/`, { method: 'POST' }); + expect(capturedExtra?.parsedBody).toEqual({ jsonrpc: '2.0', method: 'ping', id: 1 }); + expect(typeof capturedExtra?.parsedBody).not.toBe('function'); + } finally { + await new Promise(r => server.close(() => r())); + } + }); +}); async function getFreePort() { return new Promise(res => { diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 95566bbb4d..10ebc06544 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -31,6 +31,10 @@ export { Server } from './server/server.js'; // StdioServerTransport is exported from the './stdio' subpath — server stdio has only type-level Node // imports (erased at compile time), but matching the client's `./stdio` subpath gives consumers a // consistent shape across packages. +export type { Dispatchable, HandleHttpOptions, HandleHttpRequestExtra } from './server/handleHttp.js'; +export { handleHttp } from './server/handleHttp.js'; +export type { SessionCompatOptions, SessionValidation } from './server/sessionCompat.js'; +export { SessionCompat } from './server/sessionCompat.js'; export type { EventId, EventStore, diff --git a/packages/server/src/server/handleHttp.ts b/packages/server/src/server/handleHttp.ts new file mode 100644 index 0000000000..9f0b9ab6fb --- /dev/null +++ b/packages/server/src/server/handleHttp.ts @@ -0,0 +1,51 @@ +import type { DispatchOutput, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, RequestEnv } from '@modelcontextprotocol/core'; + +import type { ShttpHandlerOptions, ShttpRequestExtra } from './shttpHandler.js'; +import { shttpHandler } from './shttpHandler.js'; + +async function* unwrap(gen: AsyncIterable): AsyncGenerator { + for await (const out of gen) yield out.message; +} + +/** + * Minimal contract {@linkcode handleHttp} requires. Satisfied by `McpServer`, + * `Server`, and any `Protocol` subclass. + */ +export interface Dispatchable { + dispatch(request: JSONRPCRequest, env?: RequestEnv): AsyncIterable; + dispatchNotification(notification: JSONRPCNotification): Promise; +} + +/** + * Mounts an `McpServer` (or any `Protocol`) as a web-standard + * `(Request) => Response` handler. Use this to drive a server from an HTTP framework + * without instantiating a transport class: + * + * ```ts + * import { McpServer, handleHttp, SessionCompat } from '@modelcontextprotocol/server'; + * import { toNodeHttpHandler } from '@modelcontextprotocol/node'; + * + * const mcp = new McpServer({ name: 's', version: '1.0.0' }); + * mcp.tool('search', schema, handler); + * + * app.all('/mcp', toNodeHttpHandler(handleHttp(mcp, { session: new SessionCompat() }))); + * ``` + * + * `mcp.connect(transport)` is not called; each HTTP request flows through + * `mcp.dispatch()` directly. Supply a `SessionCompat` via `options.session` + * to serve clients that send `Mcp-Session-Id` (the pre-2026-06 stateful flow). + */ +export function handleHttp( + mcp: Dispatchable, + options: ShttpHandlerOptions = {} +): (req: Request, extra?: ShttpRequestExtra) => Promise { + return shttpHandler( + { + onrequest: (req, env?: RequestEnv) => unwrap(mcp.dispatch(req, env)), + onnotification: n => mcp.dispatchNotification(n) + }, + options + ); +} + +export { type ShttpHandlerOptions as HandleHttpOptions, type ShttpRequestExtra as HandleHttpRequestExtra } from './shttpHandler.js'; diff --git a/packages/server/src/server/mcp.ts b/packages/server/src/server/mcp.ts index 393ee48604..3660da7051 100644 --- a/packages/server/src/server/mcp.ts +++ b/packages/server/src/server/mcp.ts @@ -6,8 +6,11 @@ import type { CompleteRequestResourceTemplate, CompleteResult, CreateTaskResult, + DispatchOutput, GetPromptResult, Implementation, + JSONRPCNotification, + JSONRPCRequest, ListPromptsResult, ListResourcesResult, ListToolsResult, @@ -15,6 +18,7 @@ import type { Prompt, PromptReference, ReadResourceResult, + RequestEnv, Resource, ResourceTemplateReference, Result, @@ -111,6 +115,19 @@ export class McpServer { return await this.server.connect(transport); } + /** + * Transport-free per-request entry; forwards to {@linkcode Server}`.dispatch`. + * Exposed so `handleHttp(mcp, ...)` accepts an {@linkcode McpServer} directly. + */ + dispatch(request: JSONRPCRequest, env?: RequestEnv): AsyncGenerator { + return this.server.dispatch(request, env); + } + + /** Forwards to {@linkcode Server}`.dispatchNotification` for the `handleHttp` path. */ + dispatchNotification(notification: JSONRPCNotification): Promise { + return this.server.dispatchNotification(notification); + } + /** * Closes the connection. */ diff --git a/packages/server/src/server/sessionCompat.ts b/packages/server/src/server/sessionCompat.ts new file mode 100644 index 0000000000..c5b09a11b6 --- /dev/null +++ b/packages/server/src/server/sessionCompat.ts @@ -0,0 +1,351 @@ +import type { ClientCapabilities, JSONRPCMessage } from '@modelcontextprotocol/core'; +import { isInitializeRequest } from '@modelcontextprotocol/core'; + +/** + * Upper bound on resumable stream IDs tracked per session for `Last-Event-ID` replay + * authorisation. Oldest entries are evicted on overflow; resumption of streams older than + * the most-recent N falls back to 403. Tradeoff vs. tracking every POST forever. + */ +const MAX_STREAM_IDS_PER_SESSION = 256; + +/** + * Options for {@linkcode SessionCompat}. + */ +export interface SessionCompatOptions { + /** + * Function that generates a session ID. SHOULD be globally unique and cryptographically secure + * (e.g., a securely generated UUID). + * + * @default `() => crypto.randomUUID()` + */ + sessionIdGenerator?: () => string; + + /** + * Maximum number of concurrent sessions to retain. New `initialize` requests beyond this cap + * are rejected with HTTP 503 + `Retry-After`. Idle sessions are evicted LRU when at the cap. + * + * @default 10000 + */ + maxSessions?: number; + + /** + * Sessions idle (no request received) for longer than this are evicted on the next sweep. + * + * @default 30 * 60_000 (30 minutes) + */ + idleTtlMs?: number; + + /** + * Suggested `Retry-After` value (seconds) returned with 503 when at {@linkcode maxSessions}. + * + * @default 30 + */ + retryAfterSeconds?: number; + + /** Called when a new session is minted. */ + onsessioninitialized?: (sessionId: string) => void | Promise; + + /** Called when a session is deleted (via DELETE) or evicted. */ + onsessionclosed?: (sessionId: string) => void | Promise; + + /** + * When `true`, this instance allows at most one session: a second `initialize` + * is rejected with "Server already initialized". Matches the per-transport-instance + * v1 behaviour where each `WebStandardStreamableHTTPServerTransport` holds one session. + * + * @default false + */ + singleSession?: boolean; + + /** Called for validation failures (re-init, missing/unknown session header). */ + onerror?: (error: Error) => void; +} + +interface SessionEntry { + createdAt: number; + lastSeen: number; + /** Standalone GET subscription stream controller, if one is open. */ + sseController?: ReadableStreamDefaultController; + /** Protocol version requested by the client in `initialize.params.protocolVersion`. */ + protocolVersion?: string; + /** Capabilities the client declared in `initialize.params.capabilities`. */ + clientCapabilities?: ClientCapabilities; + /** + * EventStore stream IDs minted for this session (per-POST SSE streams plus the standalone + * GET stream). Used to reject `Last-Event-ID` replay for streams the session does not own. + * Bounded to {@linkcode MAX_STREAM_IDS_PER_SESSION}; oldest entries evicted on overflow. + */ + streamIds: Set; + /** Subset of {@linkcode streamIds} exempt from FIFO eviction (e.g. the standalone GET stream). */ + protectedStreamIds: Set; +} + +/** Result of {@linkcode SessionCompat.validate}. */ +export type SessionValidation = { ok: true; sessionId: string | undefined; isInitialize: boolean } | { ok: false; response: Response }; + +function jsonError(status: number, code: number, message: string, headers?: Record): Response { + return Response.json( + { jsonrpc: '2.0', error: { code, message }, id: null }, + { status, headers: { 'Content-Type': 'application/json', ...headers } } + ); +} + +/** + * Bounded, in-memory `mcp-session-id` lifecycle for the pre-2026-06 stateful Streamable HTTP + * protocol. One instance is shared across all requests to a given `shttpHandler`. + * + * Sessions are minted when an `initialize` request arrives and validated on every subsequent + * request via the `mcp-session-id` header. Storage is LRU with {@linkcode SessionCompatOptions.maxSessions} + * cap and {@linkcode SessionCompatOptions.idleTtlMs} idle eviction. + */ +export class SessionCompat { + private readonly _sessions = new Map(); + private readonly _generate: () => string; + private readonly _maxSessions: number; + private readonly _idleTtlMs: number; + private readonly _retryAfterSeconds: number; + private readonly _onsessioninitialized?: (sessionId: string) => void | Promise; + private readonly _closeListeners: Array<(sessionId: string) => void | Promise> = []; + private readonly _singleSession: boolean; + private readonly _onerror?: (error: Error) => void; + + constructor(options: SessionCompatOptions = {}) { + this._generate = options.sessionIdGenerator ?? (() => crypto.randomUUID()); + this._maxSessions = options.maxSessions ?? 10_000; + this._idleTtlMs = options.idleTtlMs ?? 30 * 60_000; + this._retryAfterSeconds = options.retryAfterSeconds ?? 30; + this._onsessioninitialized = options.onsessioninitialized; + if (options.onsessionclosed) this._closeListeners.push(options.onsessionclosed); + this._singleSession = options.singleSession ?? false; + this._onerror = options.onerror; + } + + /** + * Registers an additional listener fired when a session is deleted or evicted. Callers + * may wire this to per-session resources (e.g. `BackchannelCompat.closeSession`) so + * pending state is rejected on idle/capacity eviction, not only on explicit DELETE. + */ + addCloseListener(listener: (sessionId: string) => void | Promise): void { + this._closeListeners.push(listener); + } + + /** + * Validates the `mcp-session-id` header for a parsed POST body. If the body contains an + * `initialize` request, mints a new session instead. Ported from + * `WebStandardStreamableHTTPServerTransport.validateSession` + the initialize-detection + * block of `handlePostRequest`. + */ + async validate(req: Request, messages: JSONRPCMessage[]): Promise { + const isInit = messages.some(m => isInitializeRequest(m)); + + if (isInit) { + if (messages.length > 1) { + this._onerror?.(new Error('Invalid Request: Only one initialization request is allowed')); + return { + ok: false, + response: jsonError(400, -32_600, 'Invalid Request: Only one initialization request is allowed') + }; + } + this._evictIdle(); + if (this._singleSession && this._sessions.size > 0) { + this._onerror?.(new Error('Invalid Request: Server already initialized')); + return { + ok: false, + response: jsonError(400, -32_600, 'Invalid Request: Server already initialized') + }; + } + if (this._sessions.size >= this._maxSessions) { + this._evictOldest(); + } + if (this._sessions.size >= this._maxSessions) { + return { + ok: false, + response: jsonError(503, -32_000, 'Server at session capacity', { + 'Retry-After': String(this._retryAfterSeconds) + }) + }; + } + const id = this._generate(); + const now = Date.now(); + const initMsg = messages.find(m => isInitializeRequest(m)); + const initParams = initMsg && isInitializeRequest(initMsg) ? initMsg.params : undefined; + this._sessions.set(id, { + createdAt: now, + lastSeen: now, + protocolVersion: initParams?.protocolVersion, + clientCapabilities: initParams?.capabilities, + streamIds: new Set(), + protectedStreamIds: new Set() + }); + try { + await Promise.resolve(this._onsessioninitialized?.(id)); + } catch (error) { + this._sessions.delete(id); + throw error; + } + return { ok: true, sessionId: id, isInitialize: true }; + } + + return this.validateHeader(req); + } + + /** + * Validates the `mcp-session-id` header without inspecting a body (for GET/DELETE). + */ + validateHeader(req: Request): SessionValidation { + if (this._singleSession && this._sessions.size === 0) { + this._onerror?.(new Error('Bad Request: Server not initialized')); + return { ok: false, response: jsonError(400, -32_000, 'Bad Request: Server not initialized') }; + } + const headerId = req.headers.get('mcp-session-id'); + if (!headerId) { + this._onerror?.(new Error('Bad Request: Mcp-Session-Id header is required')); + return { + ok: false, + response: jsonError(400, -32_000, 'Bad Request: Mcp-Session-Id header is required') + }; + } + const entry = this._sessions.get(headerId); + if (!entry) { + this._onerror?.(new Error('Session not found')); + return { ok: false, response: jsonError(404, -32_001, 'Session not found') }; + } + entry.lastSeen = Date.now(); + // Re-insert to maintain Map iteration order as LRU. + this._sessions.delete(headerId); + this._sessions.set(headerId, entry); + return { ok: true, sessionId: headerId, isInitialize: false }; + } + + /** Deletes a session (via DELETE request). */ + async delete(sessionId: string): Promise { + const entry = this._sessions.get(sessionId); + if (!entry) return; + try { + entry.sseController?.close(); + } catch { + // Already closed. + } + this._sessions.delete(sessionId); + for (const cb of this._closeListeners) await Promise.resolve(cb(sessionId)); + } + + /** Protocol version the client requested in `initialize` for this session, if known. */ + negotiatedVersion(sessionId: string): string | undefined { + return this._sessions.get(sessionId)?.protocolVersion; + } + + /** Capabilities the client declared in `initialize` for this session, if known. */ + clientCapabilities(sessionId: string): ClientCapabilities | undefined { + return this._sessions.get(sessionId)?.clientCapabilities; + } + + /** + * Records an EventStore stream ID as belonging to this session so {@linkcode ownsStreamId} + * can authorise `Last-Event-ID` replay. The set is bounded; once it reaches + * {@linkcode MAX_STREAM_IDS_PER_SESSION} the oldest entry is evicted, so very old POST + * streams become non-resumable in favour of bounding memory. + */ + addStreamId(sessionId: string, streamId: string, opts?: { protected?: boolean }): void { + const entry = this._sessions.get(sessionId); + if (!entry) return; + entry.streamIds.add(streamId); + if (opts?.protected) entry.protectedStreamIds.add(streamId); + while (entry.streamIds.size > MAX_STREAM_IDS_PER_SESSION) { + let evicted: string | undefined; + for (const id of entry.streamIds) { + if (!entry.protectedStreamIds.has(id)) { + evicted = id; + break; + } + } + if (evicted === undefined) break; + entry.streamIds.delete(evicted); + } + } + + /** + * Forgets a stream ID. Exposed for store/subclass implementations that want explicit + * cleanup; `shttpHandler` itself does not call this (stream IDs are kept for + * `Last-Event-ID` resumability and bounded by {@linkcode addStreamId}'s LRU cap). + */ + removeStreamId(sessionId: string, streamId: string): void { + this._sessions.get(sessionId)?.streamIds.delete(streamId); + } + + /** True if `streamId` was minted for `sessionId`. Used to authorise SSE replay. */ + ownsStreamId(sessionId: string, streamId: string): boolean { + return this._sessions.get(sessionId)?.streamIds.has(streamId) ?? false; + } + + /** Returns true if a standalone GET stream is already open for this session. */ + hasStandaloneStream(sessionId: string): boolean { + return this._sessions.get(sessionId)?.sseController !== undefined; + } + + /** + * Registers the open standalone GET stream controller for this session. Closes any + * previously-registered controller so a `Last-Event-ID` reconnect supersedes it + * cleanly instead of leaking the prior stream. + */ + setStandaloneStream(sessionId: string, controller: ReadableStreamDefaultController | undefined): void { + const entry = this._sessions.get(sessionId); + if (!entry) return; + if (entry.sseController && entry.sseController !== controller) { + try { + entry.sseController.close(); + } catch { + // Already closed. + } + } + entry.sseController = controller; + } + + /** + * Clears the standalone stream registration only if `owner` is still the registered controller. + * Guards against a stale `cancel` callback (from a superseded reconnect) clearing the new stream. + */ + clearStandaloneStream(sessionId: string, owner: ReadableStreamDefaultController): void { + const entry = this._sessions.get(sessionId); + if (entry?.sseController === owner) entry.sseController = undefined; + } + + /** Closes the standalone GET stream for this session if one is open. */ + closeStandaloneStream(sessionId: string): void { + const entry = this._sessions.get(sessionId); + try { + entry?.sseController?.close(); + } catch { + // Already closed. + } + if (entry) entry.sseController = undefined; + } + + /** Number of live sessions. */ + get size(): number { + return this._sessions.size; + } + + private _evict(id: string): void { + const entry = this._sessions.get(id); + try { + entry?.sseController?.close(); + } catch { + // Already closed. + } + this._sessions.delete(id); + for (const cb of this._closeListeners) void Promise.resolve(cb(id)); + } + + private _evictIdle(): void { + const cutoff = Date.now() - this._idleTtlMs; + for (const [id, entry] of this._sessions) { + if (entry.lastSeen < cutoff) this._evict(id); + } + } + + private _evictOldest(): void { + const oldest = this._sessions.keys().next(); + if (!oldest.done) this._evict(oldest.value); + } +} diff --git a/packages/server/src/server/shttpHandler.ts b/packages/server/src/server/shttpHandler.ts new file mode 100644 index 0000000000..5ebaf092e7 --- /dev/null +++ b/packages/server/src/server/shttpHandler.ts @@ -0,0 +1,565 @@ +import type { + AuthInfo, + JSONRPCErrorResponse, + JSONRPCMessage, + JSONRPCNotification, + JSONRPCRequest, + JSONRPCResultResponse, + MessageExtraInfo, + RequestEnv +} from '@modelcontextprotocol/core'; +import { + DEFAULT_NEGOTIATED_PROTOCOL_VERSION, + isInitializeRequest, + isJSONRPCErrorResponse, + isJSONRPCNotification, + isJSONRPCRequest, + isJSONRPCResultResponse, + JSONRPCMessageSchema, + SUPPORTED_PROTOCOL_VERSIONS +} from '@modelcontextprotocol/core'; + +import type { SessionCompat } from './sessionCompat.js'; +import type { EventId, EventStore } from './streamableHttp.js'; + +/** + * Callback bundle {@linkcode shttpHandler} uses to route inbound messages. + */ +export interface ShttpCallbacks { + /** Called per inbound JSON-RPC request; yields notifications then one terminal response. */ + onrequest?: ((request: JSONRPCRequest, env?: RequestEnv) => AsyncIterable) | undefined; + /** Called per inbound JSON-RPC notification. */ + onnotification?: (notification: JSONRPCNotification) => void | Promise; + /** Called per inbound JSON-RPC response (client POSTing back to a server-initiated request). Returns `true` if claimed. */ + onresponse?: (response: JSONRPCResultResponse | JSONRPCErrorResponse) => boolean; +} + +/** + * Options for {@linkcode shttpHandler}. + */ +export interface ShttpHandlerOptions { + /** + * If `true`, return a single `application/json` response instead of an SSE stream. + * Progress notifications yielded by handlers are dropped in this mode. + * + * @default false + */ + enableJsonResponse?: boolean; + + /** + * Pre-2026-06 session compatibility. When provided, the handler validates the + * `mcp-session-id` header, mints a session on `initialize`, and supports the + * standalone GET subscription stream and DELETE session termination. When omitted, + * the handler is stateless: GET/DELETE return 405. + */ + session?: SessionCompat; + + /** + * Event store for SSE resumability via `Last-Event-ID`. When configured, every + * outgoing SSE event is persisted and a priming event is sent at stream start. + */ + eventStore?: EventStore; + + /** + * Retry interval in milliseconds, sent in the SSE `retry` field of priming events. + */ + retryInterval?: number; + + /** + * Protocol versions accepted in the `mcp-protocol-version` header. + * + * @default {@linkcode SUPPORTED_PROTOCOL_VERSIONS} + */ + supportedProtocolVersions?: string[]; + + /** Called for non-fatal errors (validation failures, stream write errors). */ + onerror?: (error: Error) => void; +} + +/** + * Per-request extras passed alongside the web `Request`. + */ +export interface ShttpRequestExtra { + /** Pre-parsed body (e.g. from `express.json()`). When omitted, `req.json()` is used. */ + parsedBody?: unknown; + /** Validated auth token info from upstream middleware. */ + authInfo?: AuthInfo; +} + +/** + * RequestEnv augmented with the {@linkcode MessageExtraInfo} slot Protocol's + * `buildContext` adapter reads to populate `ctx.http.{req, closeSSE, closeStandaloneSSE}`. + * + * @internal + */ +type ShttpRequestEnv = RequestEnv & { + _transportExtra?: MessageExtraInfo; + /** Per-session capabilities from {@linkcode SessionCompat}; consumed by `Server.buildContext`. */ + clientCapabilities?: import('@modelcontextprotocol/core').ClientCapabilities; +}; + +function jsonError(status: number, code: number, message: string, extra?: { headers?: Record; data?: string }): Response { + const error: { code: number; message: string; data?: string } = { code, message }; + if (extra?.data !== undefined) error.data = extra.data; + return Response.json( + { jsonrpc: '2.0', error, id: null }, + { status, headers: { 'Content-Type': 'application/json', ...extra?.headers } } + ); +} + +function writeSSEEvent( + controller: ReadableStreamDefaultController, + encoder: InstanceType, + message: JSONRPCMessage, + eventId?: string +): boolean { + try { + let data = 'event: message\n'; + if (eventId) data += `id: ${eventId}\n`; + data += `data: ${JSON.stringify(message)}\n\n`; + controller.enqueue(encoder.encode(data)); + return true; + } catch { + return false; + } +} + +/** Compound key for {@linkcode shttpHandler}'s in-flight abort map: `(sessionId, requestId)`. */ +function abortKey(sessionId: string | undefined, id: JSONRPCRequest['id']): string { + return `${sessionId ?? ''}\u0000${String(id)}`; +} + +/** + * EventStore stream-ID prefix for the standalone GET stream (matches v1 `_standaloneSseStreamId`). + * Suffixed with the session ID so each session's standalone-stream events are isolated in the + * event store and the replay ownership check is meaningful. + */ +const STANDALONE_STREAM_ID_PREFIX = '_GET_stream'; +function standaloneStreamId(sessionId: string): string { + return `${STANDALONE_STREAM_ID_PREFIX}:${sessionId}`; +} + +const SSE_HEADERS: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive' +}; + +/** + * Creates a Web-standard `(Request) => Promise` handler for the MCP Streamable HTTP + * transport, driven by {@linkcode ShttpCallbacks.onrequest} per request. + * + * No `_streamMapping`, `_requestToStreamMapping`, or `relatedRequestId` routing — the response + * stream is in lexical scope of the request that opened it. Session lifecycle (when enabled) + * lives in the supplied {@linkcode SessionCompat}, not on this handler. + * + * @internal Use `handleHttp` for the public entry point. + */ +export function shttpHandler( + cb: ShttpCallbacks, + options: ShttpHandlerOptions = {} +): (req: Request, extra?: ShttpRequestExtra) => Promise { + const enableJsonResponse = options.enableJsonResponse ?? false; + const session = options.session; + const eventStore = options.eventStore; + const retryInterval = options.retryInterval; + const supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS; + const onerror = options.onerror; + + /** + * Per-request abort controllers for `notifications/cancelled`. Keyed by + * `(sessionId, requestId)` so concurrent sessions reusing the same JSON-RPC id don't collide. + * In stateless mode the session component is empty; cross-POST cancellation is best-effort + * (matches v1, which required per-request transport instances in stateless mode). + */ + const inflightAborts = new Map(); + + function validateProtocolVersion(req: Request): Response | undefined { + const v = req.headers.get('mcp-protocol-version'); + if (v !== null && !supportedProtocolVersions.includes(v)) { + const msg = `Bad Request: Unsupported protocol version: ${v} (supported versions: ${supportedProtocolVersions.join(', ')})`; + onerror?.(new Error(msg)); + return jsonError(400, -32_000, msg); + } + return undefined; + } + + async function writePrimingEvent( + controller: ReadableStreamDefaultController, + encoder: InstanceType, + streamId: string, + protocolVersion: string + ): Promise { + if (!eventStore) return; + if (protocolVersion < '2025-11-25') return; + const primingId = await eventStore.storeEvent(streamId, {} as JSONRPCMessage); + const retry = retryInterval === undefined ? '' : `retry: ${retryInterval}\n`; + controller.enqueue(encoder.encode(`id: ${primingId}\n${retry}data: \n\n`)); + } + + async function emit( + controller: ReadableStreamDefaultController, + encoder: InstanceType, + streamId: string, + message: JSONRPCMessage + ): Promise { + const eventId = eventStore ? await eventStore.storeEvent(streamId, message) : undefined; + if (!writeSSEEvent(controller, encoder, message, eventId)) { + onerror?.(new Error('Failed to write SSE event')); + } + } + + async function handlePost(req: Request, extra?: ShttpRequestExtra): Promise { + const accept = req.headers.get('accept'); + if (!accept?.includes('application/json') || !accept.includes('text/event-stream')) { + onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream')); + return jsonError(406, -32_000, 'Not Acceptable: Client must accept both application/json and text/event-stream'); + } + + const ct = req.headers.get('content-type'); + if (!ct?.includes('application/json')) { + onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json')); + return jsonError(415, -32_000, 'Unsupported Media Type: Content-Type must be application/json'); + } + + let raw: unknown; + if (extra?.parsedBody === undefined) { + try { + raw = await req.json(); + } catch (error) { + onerror?.(error as Error); + return jsonError(400, -32_700, 'Parse error: Invalid JSON'); + } + } else { + raw = extra.parsedBody; + } + + const isBatch = Array.isArray(raw); + let messages: JSONRPCMessage[]; + try { + messages = isBatch ? (raw as unknown[]).map(m => JSONRPCMessageSchema.parse(m)) : [JSONRPCMessageSchema.parse(raw)]; + } catch (error) { + onerror?.(error as Error); + return jsonError(400, -32_700, 'Parse error: Invalid JSON-RPC message'); + } + + let sessionId: string | undefined; + let isInitialize = false; + if (session) { + const v = await session.validate(req, messages); + if (!v.ok) return v.response; + sessionId = v.sessionId; + isInitialize = v.isInitialize; + } + if (!isInitialize) { + const protoErr = validateProtocolVersion(req); + if (protoErr) return protoErr; + } + + const requests = messages.filter(m => isJSONRPCRequest(m)); + const notifications = messages.filter(m => isJSONRPCNotification(m)); + const responses = messages.filter( + (m): m is JSONRPCResultResponse | JSONRPCErrorResponse => isJSONRPCResultResponse(m) || isJSONRPCErrorResponse(m) + ); + + // Register abort controllers up-front so a `notifications/cancelled` in the same batch + // (or arriving on a concurrent POST before dispatch starts) can find them. + const ctrls = new Map(); + for (const r of requests) { + const key = abortKey(sessionId, r.id); + const ctrl = new AbortController(); + ctrls.set(key, ctrl); + inflightAborts.set(key, ctrl); + } + + for (const n of notifications) { + if (n.method === 'notifications/cancelled') { + const requestId = (n.params as { requestId?: JSONRPCRequest['id'] } | undefined)?.requestId; + if (requestId !== undefined) { + inflightAborts.get(abortKey(sessionId, requestId))?.abort((n.params as { reason?: string } | undefined)?.reason); + } + } + void Promise.resolve(cb.onnotification?.(n)).catch(error => onerror?.(error as Error)); + } + + for (const r of responses) { + if (cb.onresponse?.(r) === false) { + onerror?.(new Error(`Unclaimed JSON-RPC response (id=${String(r.id)}); no pending server-initiated request matched.`)); + } + } + + if (requests.length === 0) { + return new Response(null, { status: 202 }); + } + + if (!cb.onrequest) { + return jsonError(500, -32_603, 'Handler not wired — pass an onrequest callback.'); + } + const onrequest = cb.onrequest; + + const initReq = messages.find(m => isInitializeRequest(m)); + const initParams = initReq && isInitializeRequest(initReq) ? initReq.params : undefined; + const clientProtocolVersion = + initParams?.protocolVersion ?? req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION; + + const baseEnv: ShttpRequestEnv = { + authInfo: extra?.authInfo, + httpReq: req, + sessionId, + clientCapabilities: + initParams?.capabilities ?? (sessionId !== undefined ? session?.clientCapabilities(sessionId) : undefined), + _transportExtra: { request: req, authInfo: extra?.authInfo } + }; + + if (enableJsonResponse) { + const perReq = await Promise.all( + requests.map(async r => { + const key = abortKey(sessionId, r.id); + const ctrl = ctrls.get(key)!; + const collected: JSONRPCMessage[] = []; + try { + for await (const msg of onrequest(r, { ...baseEnv, signal: ctrl.signal })) { + if ((isJSONRPCResultResponse(msg) || isJSONRPCErrorResponse(msg)) && !ctrl.signal.aborted) { + collected.push(msg); + } + } + } finally { + if (inflightAborts.get(key) === ctrl) inflightAborts.delete(key); + } + return collected; + }) + ); + const out = perReq.flat(); + const headers: Record = { 'Content-Type': 'application/json' }; + if (sessionId !== undefined) headers['mcp-session-id'] = sessionId; + // JSON-RPC 2.0: batch input MUST yield an array response, even if it has one entry. + const body = !isBatch && out.length === 1 ? out[0] : out; + return Response.json(body, { status: 200, headers }); + } + + const streamId = crypto.randomUUID(); + if (session && sessionId !== undefined) session.addStreamId(sessionId, streamId); + const encoder = new TextEncoder(); + const headers: Record = { ...SSE_HEADERS }; + if (sessionId !== undefined) headers['mcp-session-id'] = sessionId; + + const readable = new ReadableStream({ + start: controller => { + const closeStream = () => { + try { + controller.close(); + } catch { + // Already closed. + } + }; + const supportsPolling = eventStore !== undefined && clientProtocolVersion >= '2025-11-25'; + const transportExtra: MessageExtraInfo = { + request: req, + authInfo: extra?.authInfo, + closeSSEStream: supportsPolling ? closeStream : undefined, + closeStandaloneSSEStream: + supportsPolling && sessionId !== undefined ? () => session?.closeStandaloneStream(sessionId) : undefined + }; + const env: ShttpRequestEnv = { ...baseEnv, _transportExtra: transportExtra }; + void (async () => { + try { + await writePrimingEvent(controller, encoder, streamId, clientProtocolVersion); + await Promise.all( + requests.map(async r => { + const key = abortKey(sessionId, r.id); + const ctrl = ctrls.get(key)!; + try { + for await (const msg of onrequest(r, { ...env, signal: ctrl.signal })) { + if ((isJSONRPCResultResponse(msg) || isJSONRPCErrorResponse(msg)) && ctrl.signal.aborted) { + continue; + } + await emit(controller, encoder, streamId, msg); + } + } catch (error) { + onerror?.(error as Error); + } + }) + ); + } catch (error) { + onerror?.(error as Error); + } finally { + for (const [key, ctrl] of ctrls) { + if (inflightAborts.get(key) === ctrl) inflightAborts.delete(key); + } + try { + controller.close(); + } catch { + // Already closed. + } + } + })(); + }, + cancel: () => { + for (const [key, ctrl] of ctrls) { + ctrl.abort(new Error('Client closed SSE stream')); + if (inflightAborts.get(key) === ctrl) inflightAborts.delete(key); + } + // streamId stays in session.streamIds so the client can resume via Last-Event-ID; + // the bounded set in addStreamId caps growth. + } + }); + + return new Response(readable, { status: 200, headers }); + } + + async function handleGet(req: Request): Promise { + if (!session) { + return jsonError(405, -32_000, 'Method Not Allowed: stateless handler does not support GET stream', { + headers: { Allow: 'POST' } + }); + } + + const accept = req.headers.get('accept'); + if (!accept?.includes('text/event-stream')) { + onerror?.(new Error('Not Acceptable: Client must accept text/event-stream')); + return jsonError(406, -32_000, 'Not Acceptable: Client must accept text/event-stream'); + } + + const v = session.validateHeader(req); + if (!v.ok) return v.response; + const sessionId = v.sessionId!; + const protoErr = validateProtocolVersion(req); + if (protoErr) return protoErr; + + if (eventStore) { + const lastEventId = req.headers.get('last-event-id'); + if (lastEventId) { + return replayEvents(lastEventId, sessionId, session, eventStore); + } + } + + if (session.hasStandaloneStream(sessionId)) { + onerror?.(new Error('Conflict: Only one SSE stream is allowed per session')); + return jsonError(409, -32_000, 'Conflict: Only one SSE stream is allowed per session'); + } + + const streamId = standaloneStreamId(sessionId); + session.addStreamId(sessionId, streamId, { protected: true }); + const clientProtocolVersion = req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION; + const encoder = new TextEncoder(); + const headers: Record = { ...SSE_HEADERS, 'mcp-session-id': sessionId }; + let registeredController: ReadableStreamDefaultController | undefined; + const readable = new ReadableStream({ + start: controller => { + registeredController = controller; + session.setStandaloneStream(sessionId, controller); + void writePrimingEvent(controller, encoder, streamId, clientProtocolVersion).catch(error => onerror?.(error as Error)); + }, + cancel: () => { + if (registeredController) session.clearStandaloneStream(sessionId, registeredController); + } + }); + return new Response(readable, { headers }); + } + + async function replayEvents(lastEventId: string, sessionId: string, session: SessionCompat, eventStore: EventStore): Promise { + if (!eventStore.getStreamIdForEventId) { + return jsonError( + 403, + -32_000, + 'Forbidden: event store does not support session-scoped replay (getStreamIdForEventId required)' + ); + } + const eventStreamId = await eventStore.getStreamIdForEventId(lastEventId); + if (eventStreamId === undefined) { + return jsonError(404, -32_001, 'Event not found'); + } + if (!session.ownsStreamId(sessionId, eventStreamId)) { + return jsonError(403, -32_000, 'Forbidden: event ID does not belong to this session'); + } + // Only resuming the standalone GET stream takes over the session's standalone slot; + // resuming a per-POST stream is replay-only (the POST that owned it has finished). + const isStandaloneReplay = eventStreamId === standaloneStreamId(sessionId); + + const encoder = new TextEncoder(); + const headers: Record = { ...SSE_HEADERS, 'mcp-session-id': sessionId }; + let registeredController: ReadableStreamDefaultController | undefined; + let cancelled = false; + const readable = new ReadableStream({ + start: controller => { + if (isStandaloneReplay) { + // Claim synchronously so a concurrent GET hits the 409 path during replay. + registeredController = controller; + session.setStandaloneStream(sessionId, controller); + session.addStreamId(sessionId, eventStreamId); + } + void (async () => { + let failed = false; + try { + await eventStore.replayEventsAfter(lastEventId, { + send: async (eventId: EventId, message: JSONRPCMessage) => { + if (!writeSSEEvent(controller, encoder, message, eventId)) { + throw new Error('Replay write failed: client disconnected'); + } + } + }); + } catch (error) { + failed = true; + onerror?.(error as Error); + } + if (failed || !isStandaloneReplay || cancelled) { + if (registeredController) session.clearStandaloneStream(sessionId, registeredController); + try { + controller.close(); + } catch { + // Already closed. + } + } + })(); + }, + cancel: () => { + cancelled = true; + if (registeredController) session.clearStandaloneStream(sessionId, registeredController); + } + }); + return new Response(readable, { headers }); + } + + async function handleDelete(req: Request): Promise { + if (!session) { + return jsonError(405, -32_000, 'Method Not Allowed: stateless handler does not support session DELETE', { + headers: { Allow: 'POST' } + }); + } + const v = session.validateHeader(req); + if (!v.ok) return v.response; + const protoErr = validateProtocolVersion(req); + if (protoErr) return protoErr; + try { + await session.delete(v.sessionId!); + } catch (error) { + onerror?.(error as Error); + return jsonError(500, -32_603, 'Internal server error'); + } + return new Response(null, { status: 200 }); + } + + return async (req: Request, extra?: ShttpRequestExtra): Promise => { + try { + switch (req.method) { + case 'POST': { + return await handlePost(req, extra); + } + case 'GET': { + return await handleGet(req); + } + case 'DELETE': { + return await handleDelete(req); + } + default: { + return jsonError(405, -32_000, 'Method not allowed.', { headers: { Allow: 'GET, POST, DELETE' } }); + } + } + } catch (error) { + onerror?.(error as Error); + return jsonError(500, -32_603, 'Internal server error'); + } + }; +} + +export { type EventId, type EventStore, type StreamId } from './streamableHttp.js'; diff --git a/packages/server/test/server/shttpHandler.test.ts b/packages/server/test/server/shttpHandler.test.ts new file mode 100644 index 0000000000..64810b9d46 --- /dev/null +++ b/packages/server/test/server/shttpHandler.test.ts @@ -0,0 +1,163 @@ +import type { JSONRPCMessage, JSONRPCRequest } from '@modelcontextprotocol/core'; +import { LATEST_PROTOCOL_VERSION } from '@modelcontextprotocol/core'; +import { describe, expect, test } from 'vitest'; + +import { SessionCompat } from '../../src/server/sessionCompat.js'; +import { shttpHandler } from '../../src/server/shttpHandler.js'; +import type { EventId, EventStore, StreamId } from '../../src/server/streamableHttp.js'; + +function makeEventStore(): { store: EventStore; events: Map } { + const events = new Map(); + let n = 0; + const store: EventStore = { + async storeEvent(streamId, message) { + const id = `${streamId}::${n++}`; + events.set(id, { streamId, message }); + return id; + }, + async getStreamIdForEventId(eventId) { + return events.get(eventId)?.streamId; + }, + async replayEventsAfter(lastEventId, { send }) { + const last = events.get(lastEventId); + if (!last) throw new Error('unknown event'); + let after = false; + for (const [id, ev] of events) { + if (id === lastEventId) { + after = true; + continue; + } + if (after && ev.streamId === last.streamId) await send(id, ev.message); + } + return last.streamId; + } + }; + return { store, events }; +} + +function initRequest(): Request { + return new Request('http://localhost/mcp', { + method: 'POST', + headers: { 'content-type': 'application/json', accept: 'application/json, text/event-stream' }, + body: JSON.stringify({ + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { protocolVersion: LATEST_PROTOCOL_VERSION, capabilities: {}, clientInfo: { name: 't', version: '0' } } + }) + }); +} + +function pingRequest(sessionId: string): Request { + return new Request('http://localhost/mcp', { + method: 'POST', + headers: { + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': LATEST_PROTOCOL_VERSION + }, + body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'ping' }) + }); +} + +function getWithLastEventId(sessionId: string, lastEventId: string): Request { + return new Request('http://localhost/mcp', { + method: 'GET', + headers: { + accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': LATEST_PROTOCOL_VERSION, + 'last-event-id': lastEventId + } + }); +} + +const onrequest = async function* (r: JSONRPCRequest): AsyncIterable { + yield { jsonrpc: '2.0', id: r.id, result: {} }; +}; + +async function firstEventId(res: Response): Promise { + const reader = res.body!.getReader(); + const dec = new TextDecoder(); + let buf = ''; + for (;;) { + const { value, done } = await reader.read(); + if (value) buf += dec.decode(value, { stream: true }); + const m = /^id: (.+)$/m.exec(buf); + if (m) { + await reader.cancel(); + return m[1]!; + } + if (done) throw new Error('stream ended without an id: line'); + } +} + +describe('shttpHandler — Last-Event-ID replay session binding', () => { + test('rejects replay of an event ID belonging to another session', async () => { + const { store } = makeEventStore(); + const session = new SessionCompat(); + const handler = shttpHandler({ onrequest }, { session, eventStore: store }); + + // Session A: initialize, then make a POST whose SSE stream gets a stored event ID. + const initA = await handler(initRequest()); + const sidA = initA.headers.get('mcp-session-id')!; + await initA.body?.cancel(); + const postA = await handler(pingRequest(sidA)); + expect(postA.headers.get('content-type')).toBe('text/event-stream'); + const eventIdA = await firstEventId(postA); + + // Session B: initialize. + const initB = await handler(initRequest()); + const sidB = initB.headers.get('mcp-session-id')!; + await initB.body?.cancel(); + expect(sidB).not.toBe(sidA); + + // B attempts to replay A's event — must be rejected. + const replayCross = await handler(getWithLastEventId(sidB, eventIdA)); + expect(replayCross.status).toBe(403); + + // A replaying its own event is permitted. + const replayOwn = await handler(getWithLastEventId(sidA, eventIdA)); + expect(replayOwn.status).toBe(200); + await replayOwn.body?.cancel(); + }); + + test('rejects replay of an unknown event ID', async () => { + const { store } = makeEventStore(); + const session = new SessionCompat(); + const handler = shttpHandler({ onrequest }, { session, eventStore: store }); + + const init = await handler(initRequest()); + const sid = init.headers.get('mcp-session-id')!; + await init.body?.cancel(); + + const res = await handler(getWithLastEventId(sid, 'no-such-stream::42')); + expect(res.status).toBe(404); + }); + + test('fails closed when eventStore lacks getStreamIdForEventId', async () => { + const events = new Map(); + const store: EventStore = { + async storeEvent(streamId, message) { + const id = `${streamId}::${events.size}`; + events.set(id, { streamId, message }); + return id; + }, + async replayEventsAfter() { + return 'x'; + } + }; + const session = new SessionCompat(); + const handler = shttpHandler({ onrequest }, { session, eventStore: store }); + + const init = await handler(initRequest()); + const sid = init.headers.get('mcp-session-id')!; + await init.body?.cancel(); + const post = await handler(pingRequest(sid)); + const eventId = await firstEventId(post); + + const res = await handler(getWithLastEventId(sid, eventId)); + expect(res.status).toBe(403); + }); +}); diff --git a/test/conformance/expected-failures-handlehttp.yaml b/test/conformance/expected-failures-handlehttp.yaml new file mode 100644 index 0000000000..89da3a51f2 --- /dev/null +++ b/test/conformance/expected-failures-handlehttp.yaml @@ -0,0 +1,7 @@ +# Baseline for the `handleHttp` conformance target. +# Currently empty: all server scenarios pass on the dispatch() path. The +# sampling/elicitation tools catch the NotConnected error and surface it in the +# tool result, which satisfies the conformance check. R4 (BackchannelCompat) makes +# them succeed for real. + +server: [] diff --git a/test/conformance/package.json b/test/conformance/package.json index db0f04a4db..eef9e2df40 100644 --- a/test/conformance/package.json +++ b/test/conformance/package.json @@ -34,6 +34,8 @@ "test:conformance:server": "scripts/run-server-conformance.sh --expected-failures ./expected-failures.yaml", "test:conformance:server:all": "scripts/run-server-conformance.sh --suite all --expected-failures ./expected-failures.yaml", "test:conformance:server:run": "npx tsx ./src/everythingServer.ts", + "test:conformance:server:handlehttp": "SERVER_SCRIPT=./src/everythingServerHandleHttp.ts scripts/run-server-conformance.sh --expected-failures ./expected-failures-handlehttp.yaml", + "test:conformance:server:dual": "scripts/run-server-conformance.sh --expected-failures ./expected-failures.yaml && SERVER_SCRIPT=./src/everythingServerHandleHttp.ts scripts/run-server-conformance.sh --expected-failures ./expected-failures-handlehttp.yaml", "test:conformance:all": "pnpm run test:conformance:client:all && pnpm run test:conformance:server:all" }, "devDependencies": { diff --git a/test/conformance/scripts/run-server-conformance.sh b/test/conformance/scripts/run-server-conformance.sh index 203a0145b5..809ea7e84d 100755 --- a/test/conformance/scripts/run-server-conformance.sh +++ b/test/conformance/scripts/run-server-conformance.sh @@ -6,14 +6,15 @@ set -e PORT="${PORT:-3000}" SERVER_URL="http://localhost:${PORT}/mcp" +SERVER_SCRIPT="${SERVER_SCRIPT:-./src/everythingServer.ts}" # Navigate to the repo root SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR/.." # Start the server in the background -echo "Starting conformance test server on port ${PORT}..." -npx tsx ./src/everythingServer.ts & +echo "Starting conformance test server (${SERVER_SCRIPT}) on port ${PORT}..." +npx tsx "${SERVER_SCRIPT}" & SERVER_PID=$! # Function to cleanup on exit diff --git a/test/conformance/src/everythingServer.ts b/test/conformance/src/everythingServer.ts index f3925aeea8..897b13fb16 100644 --- a/test/conformance/src/everythingServer.ts +++ b/test/conformance/src/everythingServer.ts @@ -1,886 +1,35 @@ #!/usr/bin/env node /** - * MCP Conformance Test Server + * MCP conformance server — `transport.connect()` API path. * - * Server implementing all MCP features for conformance testing. - * This server is designed to pass all conformance test scenarios. + * Per-session `NodeStreamableHTTPServerTransport` instances created on `initialize` and + * looked up by `mcp-session-id` thereafter (the v1.x API surface). Registrations come + * from {@linkcode ./everythingServerSetup.ts}; this file is the express + transport + * wiring only. + * + * Sibling: {@linkcode ./everythingServerHandleHttp.ts} drives the same registrations via + * `handleHttp()` / `shttpHandler` so CI can prove both API surfaces stay conformant. */ import { randomUUID } from 'node:crypto'; import { localhostHostValidation } from '@modelcontextprotocol/express'; import { NodeStreamableHTTPServerTransport } from '@modelcontextprotocol/node'; -import type { CallToolResult, EventId, EventStore, GetPromptResult, ReadResourceResult, StreamId } from '@modelcontextprotocol/server'; -import { isInitializeRequest, McpServer, ResourceTemplate } from '@modelcontextprotocol/server'; +import type { McpServer } from '@modelcontextprotocol/server'; +import { isInitializeRequest } from '@modelcontextprotocol/server'; import cors from 'cors'; import type { Request, Response } from 'express'; import express from 'express'; -import * as z from 'zod/v4'; -// Server state -const resourceSubscriptions = new Set(); -const watchedResourceContent = 'Watched resource content'; +import { createEventStore, createMcpServer } from './everythingServerSetup.js'; -// Session management const transports: { [sessionId: string]: NodeStreamableHTTPServerTransport } = {}; const servers: { [sessionId: string]: McpServer } = {}; -// In-memory event store for SEP-1699 resumability -const eventStoreData = new Map(); - -function createEventStore(): EventStore { - return { - async storeEvent(streamId: StreamId, message: unknown): Promise { - const eventId = `${streamId}::${Date.now()}_${randomUUID()}`; - eventStoreData.set(eventId, { eventId, message, streamId }); - return eventId; - }, - async replayEventsAfter( - lastEventId: EventId, - { send }: { send: (eventId: EventId, message: unknown) => Promise } - ): Promise { - const streamId = lastEventId.split('::')[0] || lastEventId; - const eventsToReplay: Array<[string, { message: unknown }]> = []; - for (const [eventId, data] of eventStoreData.entries()) { - if (data.streamId === streamId && eventId > lastEventId) { - eventsToReplay.push([eventId, data]); - } - } - eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); - for (const [eventId, { message }] of eventsToReplay) { - if (message && typeof message === 'object' && Object.keys(message).length > 0) { - await send(eventId, message); - } - } - return streamId; - } - }; -} - -// Sample base64 encoded 1x1 red PNG pixel for testing -const TEST_IMAGE_BASE64 = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=='; - -// Sample base64 encoded minimal WAV file for testing -const TEST_AUDIO_BASE64 = 'UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA='; - -// Function to create a new MCP server instance (one per session) -function createMcpServer() { - const mcpServer = new McpServer( - { - name: 'mcp-conformance-test-server', - version: '1.0.0' - }, - { - capabilities: { - tools: { - listChanged: true - }, - resources: { - subscribe: true, - listChanged: true - }, - prompts: { - listChanged: true - }, - logging: {}, - completions: {} - } - } - ); - - // Helper to send log messages using the underlying server - function sendLog( - level: 'debug' | 'info' | 'notice' | 'warning' | 'error' | 'critical' | 'alert' | 'emergency', - message: string, - _data?: unknown - ) { - mcpServer.server - .notification({ - method: 'notifications/message', - params: { - level, - logger: 'conformance-test-server', - data: _data || message - } - }) - .catch(() => { - // Ignore error if no client is connected - }); - } - - // ===== TOOLS ===== - - // Simple text tool - mcpServer.registerTool( - 'test_simple_text', - { - description: 'Tests simple text content response' - }, - async (): Promise => { - return { - content: [{ type: 'text', text: 'This is a simple text response for testing.' }] - }; - } - ); - - // Image content tool - mcpServer.registerTool( - 'test_image_content', - { - description: 'Tests image content response' - }, - async (): Promise => { - return { - content: [{ type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' }] - }; - } - ); - - // Audio content tool - mcpServer.registerTool( - 'test_audio_content', - { - description: 'Tests audio content response' - }, - async (): Promise => { - return { - content: [{ type: 'audio', data: TEST_AUDIO_BASE64, mimeType: 'audio/wav' }] - }; - } - ); - - // Embedded resource tool - mcpServer.registerTool( - 'test_embedded_resource', - { - description: 'Tests embedded resource content response' - }, - async (): Promise => { - return { - content: [ - { - type: 'resource', - resource: { - uri: 'test://embedded-resource', - mimeType: 'text/plain', - text: 'This is an embedded resource content.' - } - } - ] - }; - } - ); - - // Multiple content types tool - mcpServer.registerTool( - 'test_multiple_content_types', - { - description: 'Tests response with multiple content types (text, image, resource)' - }, - async (): Promise => { - return { - content: [ - { type: 'text', text: 'Multiple content types test:' }, - { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' }, - { - type: 'resource', - resource: { - uri: 'test://mixed-content-resource', - mimeType: 'application/json', - text: JSON.stringify({ test: 'data', value: 123 }) - } - } - ] - }; - } - ); - - // Tool with logging - mcpServer.registerTool( - 'test_tool_with_logging', - { - description: 'Tests tool that emits log messages during execution', - inputSchema: z.object({}) - }, - async (_args, ctx): Promise => { - await ctx.mcpReq.notify({ - method: 'notifications/message', - params: { - level: 'info', - data: 'Tool execution started' - } - }); - await new Promise(resolve => setTimeout(resolve, 50)); - - await ctx.mcpReq.notify({ - method: 'notifications/message', - params: { - level: 'info', - data: 'Tool processing data' - } - }); - await new Promise(resolve => setTimeout(resolve, 50)); - - await ctx.mcpReq.notify({ - method: 'notifications/message', - params: { - level: 'info', - data: 'Tool execution completed' - } - }); - return { - content: [{ type: 'text', text: 'Tool with logging executed successfully' }] - }; - } - ); - - // Tool with progress - mcpServer.registerTool( - 'test_tool_with_progress', - { - description: 'Tests tool that reports progress notifications', - inputSchema: z.object({}) - }, - async (_args, ctx): Promise => { - const progressToken = ctx.mcpReq._meta?.progressToken ?? 0; - console.log('Progress token:', progressToken); - await ctx.mcpReq.notify({ - method: 'notifications/progress', - params: { - progressToken, - progress: 0, - total: 100, - message: `Completed step ${0} of ${100}` - } - }); - await new Promise(resolve => setTimeout(resolve, 50)); - - await ctx.mcpReq.notify({ - method: 'notifications/progress', - params: { - progressToken, - progress: 50, - total: 100, - message: `Completed step ${50} of ${100}` - } - }); - await new Promise(resolve => setTimeout(resolve, 50)); - - await ctx.mcpReq.notify({ - method: 'notifications/progress', - params: { - progressToken, - progress: 100, - total: 100, - message: `Completed step ${100} of ${100}` - } - }); - - return { - content: [{ type: 'text', text: String(progressToken) }] - }; - } - ); - - // Error handling tool - mcpServer.registerTool( - 'test_error_handling', - { - description: 'Tests error response handling' - }, - async (): Promise => { - throw new Error('This tool intentionally returns an error for testing'); - } - ); - - // SEP-1699: Reconnection test tool - closes SSE stream mid-call to test client reconnection - mcpServer.registerTool( - 'test_reconnection', - { - description: - 'Tests SSE stream disconnection and client reconnection (SEP-1699). Server will close the stream mid-call and send the result after client reconnects.', - inputSchema: z.object({}) - }, - async (_args, ctx): Promise => { - const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); - - console.log(`[${ctx.sessionId}] Starting test_reconnection tool...`); - - // Get the transport for this session - const transport = ctx.sessionId ? transports[ctx.sessionId] : undefined; - if (transport && ctx.mcpReq.id) { - // Close the SSE stream to trigger client reconnection - console.log(`[${ctx.sessionId}] Closing SSE stream to trigger client polling...`); - transport.closeSSEStream(ctx.mcpReq.id); - } - - // Wait for client to reconnect (should respect retry field) - await sleep(100); - - console.log(`[${ctx.sessionId}] test_reconnection tool complete`); - - return { - content: [ - { - type: 'text', - text: 'Reconnection test completed successfully. If you received this, the client properly reconnected after stream closure.' - } - ] - }; - } - ); - - // Sampling tool - requests LLM completion from client - mcpServer.registerTool( - 'test_sampling', - { - description: 'Tests server-initiated sampling (LLM completion request)', - inputSchema: z.object({ - prompt: z.string().describe('The prompt to send to the LLM') - }) - }, - async (args: { prompt: string }, ctx): Promise => { - try { - // Request sampling from client - const result = (await ctx.mcpReq.send({ - method: 'sampling/createMessage', - params: { - messages: [ - { - role: 'user', - content: { - type: 'text', - text: args.prompt - } - } - ], - maxTokens: 100 - } - })) as { content?: { text?: string }; message?: { content?: { text?: string } } }; - - const modelResponse = result.content?.text || result.message?.content?.text || 'No response'; - - return { - content: [ - { - type: 'text', - text: `LLM response: ${modelResponse}` - } - ] - }; - } catch (error) { - return { - content: [ - { - type: 'text', - text: `Sampling not supported or error: ${error instanceof Error ? error.message : String(error)}` - } - ] - }; - } - } - ); - - // Elicitation tool - requests user input from client - mcpServer.registerTool( - 'test_elicitation', - { - description: 'Tests server-initiated elicitation (user input request)', - inputSchema: z.object({ - message: z.string().describe('The message to show the user') - }) - }, - async (args: { message: string }, ctx): Promise => { - try { - // Request user input from client - const result = await ctx.mcpReq.send({ - method: 'elicitation/create', - params: { - message: args.message, - requestedSchema: { - type: 'object', - properties: { - response: { - type: 'string', - description: "User's response" - } - }, - required: ['response'] - } - } - }); - - const elicitResult = result as { action?: string; content?: unknown }; - return { - content: [ - { - type: 'text', - text: `User response: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` - } - ] - }; - } catch (error) { - return { - content: [ - { - type: 'text', - text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` - } - ] - }; - } - } - ); - - // SEP-1034: Elicitation with default values for all primitive types - mcpServer.registerTool( - 'test_elicitation_sep1034_defaults', - { - description: 'Tests elicitation with default values per SEP-1034', - inputSchema: z.object({}) - }, - async (_args, ctx): Promise => { - try { - // Request user input with default values for all primitive types - const result = await ctx.mcpReq.send({ - method: 'elicitation/create', - params: { - message: 'Please review and update the form fields with defaults', - requestedSchema: { - type: 'object', - properties: { - name: { - type: 'string', - description: 'User name', - default: 'John Doe' - }, - age: { - type: 'integer', - description: 'User age', - default: 30 - }, - score: { - type: 'number', - description: 'User score', - default: 95.5 - }, - status: { - type: 'string', - description: 'User status', - enum: ['active', 'inactive', 'pending'], - default: 'active' - }, - verified: { - type: 'boolean', - description: 'Verification status', - default: true - } - }, - required: [] - } - } - }); - - const elicitResult = result as { action?: string; content?: unknown }; - return { - content: [ - { - type: 'text', - text: `Elicitation completed: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` - } - ] - }; - } catch (error) { - return { - content: [ - { - type: 'text', - text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` - } - ] - }; - } - } - ); - - // SEP-1330: Elicitation with enum schema improvements - mcpServer.registerTool( - 'test_elicitation_sep1330_enums', - { - description: 'Tests elicitation with enum schema improvements per SEP-1330', - inputSchema: z.object({}) - }, - async (_args, ctx): Promise => { - try { - // Request user input with all 5 enum schema variants - const result = await ctx.mcpReq.send({ - method: 'elicitation/create', - params: { - message: 'Please select options from the enum fields', - requestedSchema: { - type: 'object', - properties: { - // Untitled single-select enum (basic) - untitledSingle: { - type: 'string', - description: 'Select one option', - enum: ['option1', 'option2', 'option3'] - }, - // Titled single-select enum (using oneOf with const/title) - titledSingle: { - type: 'string', - description: 'Select one option with titles', - oneOf: [ - { const: 'value1', title: 'First Option' }, - { const: 'value2', title: 'Second Option' }, - { const: 'value3', title: 'Third Option' } - ] - }, - // Legacy titled enum (using enumNames - deprecated) - legacyEnum: { - type: 'string', - description: 'Select one option (legacy)', - enum: ['opt1', 'opt2', 'opt3'], - enumNames: ['Option One', 'Option Two', 'Option Three'] - }, - // Untitled multi-select enum - untitledMulti: { - type: 'array', - description: 'Select multiple options', - minItems: 1, - maxItems: 3, - items: { - type: 'string', - enum: ['option1', 'option2', 'option3'] - } - }, - // Titled multi-select enum (using anyOf with const/title) - titledMulti: { - type: 'array', - description: 'Select multiple options with titles', - minItems: 1, - maxItems: 3, - items: { - anyOf: [ - { const: 'value1', title: 'First Choice' }, - { const: 'value2', title: 'Second Choice' }, - { const: 'value3', title: 'Third Choice' } - ] - } - } - }, - required: [] - } - } - }); - - const elicitResult = result as { action?: string; content?: unknown }; - return { - content: [ - { - type: 'text', - text: `Elicitation completed: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` - } - ] - }; - } catch (error) { - return { - content: [ - { - type: 'text', - text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` - } - ] - }; - } - } - ); - - // SEP-1613: JSON Schema 2020-12 conformance test tool - mcpServer.registerTool( - 'json_schema_2020_12_tool', - { - description: 'Tool with JSON Schema 2020-12 features for conformance testing (SEP-1613)', - inputSchema: z.object({ - name: z.string().optional(), - address: z - .object({ - street: z.string().optional(), - city: z.string().optional() - }) - .optional() - }) - }, - async (args: { name?: string; address?: { street?: string; city?: string } }): Promise => { - return { - content: [ - { - type: 'text', - text: `JSON Schema 2020-12 tool called with: ${JSON.stringify(args)}` - } - ] - }; - } - ); - - // ===== RESOURCES ===== - - // Static text resource - mcpServer.registerResource( - 'static-text', - 'test://static-text', - { - title: 'Static Text Resource', - description: 'A static text resource for testing', - mimeType: 'text/plain' - }, - async (): Promise => { - return { - contents: [ - { - uri: 'test://static-text', - mimeType: 'text/plain', - text: 'This is the content of the static text resource.' - } - ] - }; - } - ); - - // Static binary resource - mcpServer.registerResource( - 'static-binary', - 'test://static-binary', - { - title: 'Static Binary Resource', - description: 'A static binary resource (image) for testing', - mimeType: 'image/png' - }, - async (): Promise => { - return { - contents: [ - { - uri: 'test://static-binary', - mimeType: 'image/png', - blob: TEST_IMAGE_BASE64 - } - ] - }; - } - ); - - // Resource template - mcpServer.registerResource( - 'template', - new ResourceTemplate('test://template/{id}/data', { list: undefined }), - { - title: 'Resource Template', - description: 'A resource template with parameter substitution', - mimeType: 'application/json' - }, - async (uri, variables): Promise => { - const id = variables.id; - return { - contents: [ - { - uri: uri.toString(), - mimeType: 'application/json', - text: JSON.stringify({ - id, - templateTest: true, - data: `Data for ID: ${id}` - }) - } - ] - }; - } - ); - - // Watched resource - mcpServer.registerResource( - 'watched-resource', - 'test://watched-resource', - { - title: 'Watched Resource', - description: 'A resource that auto-updates every 3 seconds', - mimeType: 'text/plain' - }, - async (): Promise => { - return { - contents: [ - { - uri: 'test://watched-resource', - mimeType: 'text/plain', - text: watchedResourceContent - } - ] - }; - } - ); - - // Subscribe/Unsubscribe handlers - mcpServer.server.setRequestHandler('resources/subscribe', async request => { - const uri = request.params.uri; - resourceSubscriptions.add(uri); - sendLog('info', `Subscribed to resource: ${uri}`); - return {}; - }); - - mcpServer.server.setRequestHandler('resources/unsubscribe', async request => { - const uri = request.params.uri; - resourceSubscriptions.delete(uri); - sendLog('info', `Unsubscribed from resource: ${uri}`); - return {}; - }); - - // ===== PROMPTS ===== - - // Simple prompt - mcpServer.registerPrompt( - 'test_simple_prompt', - { - title: 'Simple Test Prompt', - description: 'A simple prompt without arguments' - }, - async (): Promise => { - return { - messages: [ - { - role: 'user', - content: { - type: 'text', - text: 'This is a simple prompt for testing.' - } - } - ] - }; - } - ); - - // Prompt with arguments - mcpServer.registerPrompt( - 'test_prompt_with_arguments', - { - title: 'Prompt With Arguments', - description: 'A prompt with required arguments', - argsSchema: z.object({ - arg1: z.string().describe('First test argument'), - arg2: z.string().describe('Second test argument') - }) - }, - async (args: { arg1: string; arg2: string }): Promise => { - return { - messages: [ - { - role: 'user', - content: { - type: 'text', - text: `Prompt with arguments: arg1='${args.arg1}', arg2='${args.arg2}'` - } - } - ] - }; - } - ); - - // Prompt with embedded resource - mcpServer.registerPrompt( - 'test_prompt_with_embedded_resource', - { - title: 'Prompt With Embedded Resource', - description: 'A prompt that includes an embedded resource', - argsSchema: z.object({ - resourceUri: z.string().describe('URI of the resource to embed') - }) - }, - async (args: { resourceUri: string }): Promise => { - return { - messages: [ - { - role: 'user', - content: { - type: 'resource', - resource: { - uri: args.resourceUri, - mimeType: 'text/plain', - text: 'Embedded resource content for testing.' - } - } - }, - { - role: 'user', - content: { - type: 'text', - text: 'Please process the embedded resource above.' - } - } - ] - }; - } - ); - - // Prompt with image - mcpServer.registerPrompt( - 'test_prompt_with_image', - { - title: 'Prompt With Image', - description: 'A prompt that includes image content' - }, - async (): Promise => { - return { - messages: [ - { - role: 'user', - content: { - type: 'image', - data: TEST_IMAGE_BASE64, - mimeType: 'image/png' - } - }, - { - role: 'user', - content: { type: 'text', text: 'Please analyze the image above.' } - } - ] - }; - } - ); - - // ===== LOGGING ===== - - mcpServer.server.setRequestHandler('logging/setLevel', async request => { - const level = request.params.level; - sendLog('info', `Log level set to: ${level}`); - return {}; - }); - - // ===== COMPLETION ===== - - mcpServer.server.setRequestHandler('completion/complete', async () => { - // Basic completion support - returns empty array for conformance - // Real implementations would provide contextual suggestions - return { - completion: { - values: [], - total: 0, - hasMore: false - } - }; - }); - - return mcpServer; -} - -// ===== EXPRESS APP ===== - const app = express(); app.use(express.json()); - -// DNS rebinding protection: reject non-localhost Host headers app.use(localhostHostValidation()); - -// Configure CORS to expose Mcp-Session-Id header for browser-based clients app.use( cors({ origin: '*', @@ -889,7 +38,6 @@ app.use( }) ); -// Handle POST requests - stateful mode app.post('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -897,16 +45,20 @@ app.post('/mcp', async (req: Request, res: Response) => { let transport: NodeStreamableHTTPServerTransport; if (sessionId && transports[sessionId]) { - // Reuse existing transport for established sessions transport = transports[sessionId]; } else if (!sessionId && isInitializeRequest(req.body)) { - // Create new transport for initialization requests - const mcpServer = createMcpServer(); + const mcpServer = createMcpServer({ + closeSSEForReconnectTest: ctx => { + const sid = ctx.sessionId; + const t = sid ? transports[sid] : undefined; + if (t && ctx.mcpReq.id) t.closeSSEStream(ctx.mcpReq.id); + } + }); transport = new NodeStreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), eventStore: createEventStore(), - retryInterval: 5000, // 5 second retry interval for SEP-1699 + retryInterval: 5000, onsessioninitialized: (newSessionId: string) => { transports[newSessionId] = transport; servers[newSessionId] = mcpServer; @@ -930,18 +82,10 @@ app.post('/mcp', async (req: Request, res: Response) => { await transport.handleRequest(req, res, req.body); return; } else if (sessionId) { - res.status(404).json({ - jsonrpc: '2.0', - error: { code: -32_001, message: 'Session not found' }, - id: null - }); + res.status(404).json({ jsonrpc: '2.0', error: { code: -32_001, message: 'Session not found' }, id: null }); return; } else { - res.status(400).json({ - jsonrpc: '2.0', - error: { code: -32_000, message: 'Bad Request: Session ID required' }, - id: null - }); + res.status(400).json({ jsonrpc: '2.0', error: { code: -32_000, message: 'Bad Request: Session ID required' }, id: null }); return; } @@ -949,19 +93,11 @@ app.post('/mcp', async (req: Request, res: Response) => { } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { - res.status(500).json({ - jsonrpc: '2.0', - error: { - code: -32_603, - message: 'Internal server error' - }, - id: null - }); + res.status(500).json({ jsonrpc: '2.0', error: { code: -32_603, message: 'Internal server error' }, id: null }); } } }); -// Handle GET requests - SSE streams for sessions app.get('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -992,7 +128,6 @@ app.get('/mcp', async (req: Request, res: Response) => { } }); -// Handle DELETE requests - session termination app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -1018,9 +153,8 @@ app.delete('/mcp', async (req: Request, res: Response) => { } }); -// Start server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { - console.log(`MCP Conformance Test Server running on http://localhost:${PORT}`); + console.log(`MCP Conformance Test Server (transport.connect path) running on http://localhost:${PORT}`); console.log(` - MCP endpoint: http://localhost:${PORT}/mcp`); }); diff --git a/test/conformance/src/everythingServerHandleHttp.ts b/test/conformance/src/everythingServerHandleHttp.ts new file mode 100644 index 0000000000..4fd48e672e --- /dev/null +++ b/test/conformance/src/everythingServerHandleHttp.ts @@ -0,0 +1,54 @@ +#!/usr/bin/env node + +/** + * MCP conformance server — `handleHttp()` API path. + * + * One shared {@linkcode McpServer} driven by `handleHttp(mcp, { session, eventStore })` + * (which mounts the internal `shttpHandler`) and adapted to express via `toNodeHttpHandler`. + * No transport class, no per-session server map. Registrations from + * {@linkcode ./everythingServerSetup.ts}. + * + * Sibling: {@linkcode ./everythingServer.ts} drives the same registrations via the + * `transport.connect()` API surface so CI proves both API surfaces stay conformant. + */ + +import { localhostHostValidation } from '@modelcontextprotocol/express'; +import { toNodeHttpHandler } from '@modelcontextprotocol/node'; +import { handleHttp, SessionCompat } from '@modelcontextprotocol/server'; +import cors from 'cors'; +import express from 'express'; + +import { createEventStore, createMcpServer } from './everythingServerSetup.js'; + +const mcp = createMcpServer({ + closeSSEForReconnectTest: ctx => ctx.http?.closeSSE?.() +}); + +const handler = toNodeHttpHandler( + handleHttp(mcp, { + session: new SessionCompat({ + onsessioninitialized: sid => console.log(`Session initialized with ID: ${sid}`), + onsessionclosed: sid => console.log(`Session ${sid} closed`) + }), + eventStore: createEventStore(), + retryInterval: 5000, + onerror: err => console.error('handleHttp error:', err) + }) +); + +const app = express(); +app.use(localhostHostValidation()); +app.use( + cors({ + origin: '*', + exposedHeaders: ['Mcp-Session-Id'], + allowedHeaders: ['Content-Type', 'mcp-session-id', 'last-event-id'] + }) +); +app.all('/mcp', handler); + +const PORT = process.env.PORT || 3000; +app.listen(PORT, () => { + console.log(`MCP Conformance Test Server (handleHttp path) running on http://localhost:${PORT}`); + console.log(` - MCP endpoint: http://localhost:${PORT}/mcp`); +}); diff --git a/test/conformance/src/everythingServerSetup.ts b/test/conformance/src/everythingServerSetup.ts new file mode 100644 index 0000000000..a2536f673f --- /dev/null +++ b/test/conformance/src/everythingServerSetup.ts @@ -0,0 +1,563 @@ +/** + * Shared MCP conformance server setup. Registers all tools/resources/prompts on a + * fresh {@linkcode McpServer}. Consumed by both conformance entry points so the + * dual-target harness exercises the same handler set against: + * - {@linkcode ./everythingServer.ts} — `transport.connect()` / `NodeStreamableHTTPServerTransport` + * - {@linkcode ./everythingServerHandleHttp.ts} — `handleHttp()` / `shttpHandler` + */ + +import { randomUUID } from 'node:crypto'; + +import type { + CallToolResult, + EventId, + EventStore, + GetPromptResult, + ReadResourceResult, + ServerContext, + StreamId +} from '@modelcontextprotocol/server'; +import { McpServer, ResourceTemplate } from '@modelcontextprotocol/server'; +import * as z from 'zod/v4'; + +const resourceSubscriptions = new Set(); +const watchedResourceContent = 'Watched resource content'; + +/** Sample base64-encoded 1×1 red PNG pixel for testing. */ +export const TEST_IMAGE_BASE64 = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=='; + +/** Sample base64-encoded minimal WAV file for testing. */ +export const TEST_AUDIO_BASE64 = 'UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA='; + +/** In-memory {@linkcode EventStore} for SEP-1699 SSE resumability. */ +export function createEventStore(): EventStore { + const data = new Map(); + return { + async storeEvent(streamId: StreamId, message: unknown): Promise { + const eventId = `${streamId}::${Date.now()}_${randomUUID()}`; + data.set(eventId, { eventId, message, streamId }); + return eventId; + }, + async getStreamIdForEventId(eventId: EventId): Promise { + return data.get(eventId)?.streamId; + }, + async replayEventsAfter( + lastEventId: EventId, + { send }: { send: (eventId: EventId, message: unknown) => Promise } + ): Promise { + const streamId = lastEventId.split('::')[0] || lastEventId; + const eventsToReplay: Array<[string, { message: unknown }]> = []; + for (const [eventId, ev] of data.entries()) { + if (ev.streamId === streamId && eventId > lastEventId) { + eventsToReplay.push([eventId, ev]); + } + } + eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); + for (const [eventId, { message }] of eventsToReplay) { + if (message && typeof message === 'object' && Object.keys(message).length > 0) { + await send(eventId, message); + } + } + return streamId; + } + }; +} + +export interface SetupOptions { + /** + * Hook for the SEP-1699 reconnection test. Called mid-handler to forcibly close the + * current SSE stream so the client must reconnect with `Last-Event-ID`. + * The two entry points wire this differently (transport-map lookup vs `ctx.http?.closeSSE`). + */ + closeSSEForReconnectTest: (ctx: ServerContext) => void; +} + +/** + * Builds a fully-registered conformance {@linkcode McpServer}. All registrations are + * stateless on the server instance, so the entry point decides whether to create one + * per session (transport.connect path) or one shared instance (handleHttp path). + */ +export function createMcpServer(opts: SetupOptions): McpServer { + const mcpServer = new McpServer( + { name: 'mcp-conformance-test-server', version: '1.0.0' }, + { + capabilities: { + tools: { listChanged: true }, + resources: { subscribe: true, listChanged: true }, + prompts: { listChanged: true }, + logging: {}, + completions: {} + } + } + ); + + function sendLog( + level: 'debug' | 'info' | 'notice' | 'warning' | 'error' | 'critical' | 'alert' | 'emergency', + message: string, + _data?: unknown + ) { + mcpServer.server + .notification({ method: 'notifications/message', params: { level, logger: 'conformance-test-server', data: _data || message } }) + .catch(() => { + // Ignore error if no client is connected. + }); + } + + // ===== TOOLS ===== + + mcpServer.registerTool('test_simple_text', { description: 'Tests simple text content response' }, async (): Promise => { + return { content: [{ type: 'text', text: 'This is a simple text response for testing.' }] }; + }); + + mcpServer.registerTool('test_image_content', { description: 'Tests image content response' }, async (): Promise => { + return { content: [{ type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' }] }; + }); + + mcpServer.registerTool('test_audio_content', { description: 'Tests audio content response' }, async (): Promise => { + return { content: [{ type: 'audio', data: TEST_AUDIO_BASE64, mimeType: 'audio/wav' }] }; + }); + + mcpServer.registerTool( + 'test_embedded_resource', + { description: 'Tests embedded resource content response' }, + async (): Promise => { + return { + content: [ + { + type: 'resource', + resource: { uri: 'test://embedded-resource', mimeType: 'text/plain', text: 'This is an embedded resource content.' } + } + ] + }; + } + ); + + mcpServer.registerTool( + 'test_multiple_content_types', + { description: 'Tests response with multiple content types (text, image, resource)' }, + async (): Promise => { + return { + content: [ + { type: 'text', text: 'Multiple content types test:' }, + { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' }, + { + type: 'resource', + resource: { + uri: 'test://mixed-content-resource', + mimeType: 'application/json', + text: JSON.stringify({ test: 'data', value: 123 }) + } + } + ] + }; + } + ); + + mcpServer.registerTool( + 'test_tool_with_logging', + { description: 'Tests tool that emits log messages during execution', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + await ctx.mcpReq.notify({ method: 'notifications/message', params: { level: 'info', data: 'Tool execution started' } }); + await new Promise(resolve => setTimeout(resolve, 50)); + await ctx.mcpReq.notify({ method: 'notifications/message', params: { level: 'info', data: 'Tool processing data' } }); + await new Promise(resolve => setTimeout(resolve, 50)); + await ctx.mcpReq.notify({ method: 'notifications/message', params: { level: 'info', data: 'Tool execution completed' } }); + return { content: [{ type: 'text', text: 'Tool with logging executed successfully' }] }; + } + ); + + mcpServer.registerTool( + 'test_tool_with_progress', + { description: 'Tests tool that reports progress notifications', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + const progressToken = ctx.mcpReq._meta?.progressToken ?? 0; + console.log('Progress token:', progressToken); + for (const progress of [0, 50, 100]) { + await ctx.mcpReq.notify({ + method: 'notifications/progress', + params: { progressToken, progress, total: 100, message: `Completed step ${progress} of ${100}` } + }); + if (progress !== 100) await new Promise(resolve => setTimeout(resolve, 50)); + } + return { content: [{ type: 'text', text: String(progressToken) }] }; + } + ); + + mcpServer.registerTool('test_error_handling', { description: 'Tests error response handling' }, async (): Promise => { + throw new Error('This tool intentionally returns an error for testing'); + }); + + mcpServer.registerTool( + 'test_reconnection', + { + description: + 'Tests SSE stream disconnection and client reconnection (SEP-1699). Server will close the stream mid-call and send the result after client reconnects.', + inputSchema: z.object({}) + }, + async (_args, ctx): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + const sid = ctx.sessionId; + console.log(`[${sid}] Starting test_reconnection tool...`); + console.log(`[${sid}] Closing SSE stream to trigger client polling...`); + opts.closeSSEForReconnectTest(ctx); + await sleep(100); + console.log(`[${sid}] test_reconnection tool complete`); + return { + content: [ + { + type: 'text', + text: 'Reconnection test completed successfully. If you received this, the client properly reconnected after stream closure.' + } + ] + }; + } + ); + + mcpServer.registerTool( + 'test_sampling', + { + description: 'Tests server-initiated sampling (LLM completion request)', + inputSchema: z.object({ prompt: z.string().describe('The prompt to send to the LLM') }) + }, + async (args: { prompt: string }, ctx): Promise => { + try { + const result = (await ctx.mcpReq.send({ + method: 'sampling/createMessage', + params: { messages: [{ role: 'user', content: { type: 'text', text: args.prompt } }], maxTokens: 100 } + })) as { content?: { text?: string }; message?: { content?: { text?: string } } }; + const modelResponse = result.content?.text || result.message?.content?.text || 'No response'; + return { content: [{ type: 'text', text: `LLM response: ${modelResponse}` }] }; + } catch (error) { + return { + content: [ + { type: 'text', text: `Sampling not supported or error: ${error instanceof Error ? error.message : String(error)}` } + ] + }; + } + } + ); + + mcpServer.registerTool( + 'test_elicitation', + { + description: 'Tests server-initiated elicitation (user input request)', + inputSchema: z.object({ message: z.string().describe('The message to show the user') }) + }, + async (args: { message: string }, ctx): Promise => { + try { + const result = await ctx.mcpReq.send({ + method: 'elicitation/create', + params: { + message: args.message, + requestedSchema: { + type: 'object', + properties: { response: { type: 'string', description: "User's response" } }, + required: ['response'] + } + } + }); + const elicitResult = result as { action?: string; content?: unknown }; + return { + content: [ + { + type: 'text', + text: `User response: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` + } + ] + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` + } + ] + }; + } + } + ); + + mcpServer.registerTool( + 'test_elicitation_sep1034_defaults', + { description: 'Tests elicitation with default values per SEP-1034', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + try { + const result = await ctx.mcpReq.send({ + method: 'elicitation/create', + params: { + message: 'Please review and update the form fields with defaults', + requestedSchema: { + type: 'object', + properties: { + name: { type: 'string', description: 'User name', default: 'John Doe' }, + age: { type: 'integer', description: 'User age', default: 30 }, + score: { type: 'number', description: 'User score', default: 95.5 }, + status: { + type: 'string', + description: 'User status', + enum: ['active', 'inactive', 'pending'], + default: 'active' + }, + verified: { type: 'boolean', description: 'Verification status', default: true } + }, + required: [] + } + } + }); + const elicitResult = result as { action?: string; content?: unknown }; + return { + content: [ + { + type: 'text', + text: `Elicitation completed: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` + } + ] + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` + } + ] + }; + } + } + ); + + mcpServer.registerTool( + 'test_elicitation_sep1330_enums', + { description: 'Tests elicitation with enum schema improvements per SEP-1330', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + try { + const result = await ctx.mcpReq.send({ + method: 'elicitation/create', + params: { + message: 'Please select options from the enum fields', + requestedSchema: { + type: 'object', + properties: { + untitledSingle: { + type: 'string', + description: 'Select one option', + enum: ['option1', 'option2', 'option3'] + }, + titledSingle: { + type: 'string', + description: 'Select one option with titles', + oneOf: [ + { const: 'value1', title: 'First Option' }, + { const: 'value2', title: 'Second Option' }, + { const: 'value3', title: 'Third Option' } + ] + }, + legacyEnum: { + type: 'string', + description: 'Select one option (legacy)', + enum: ['opt1', 'opt2', 'opt3'], + enumNames: ['Option One', 'Option Two', 'Option Three'] + }, + untitledMulti: { + type: 'array', + description: 'Select multiple options', + minItems: 1, + maxItems: 3, + items: { type: 'string', enum: ['option1', 'option2', 'option3'] } + }, + titledMulti: { + type: 'array', + description: 'Select multiple options with titles', + minItems: 1, + maxItems: 3, + items: { + anyOf: [ + { const: 'value1', title: 'First Choice' }, + { const: 'value2', title: 'Second Choice' }, + { const: 'value3', title: 'Third Choice' } + ] + } + } + }, + required: [] + } + } + }); + const elicitResult = result as { action?: string; content?: unknown }; + return { + content: [ + { + type: 'text', + text: `Elicitation completed: action=${elicitResult.action}, content=${JSON.stringify(elicitResult.content || {})}` + } + ] + }; + } catch (error) { + return { + content: [ + { + type: 'text', + text: `Elicitation not supported or error: ${error instanceof Error ? error.message : String(error)}` + } + ] + }; + } + } + ); + + mcpServer.registerTool( + 'json_schema_2020_12_tool', + { + description: 'Tool with JSON Schema 2020-12 features for conformance testing (SEP-1613)', + inputSchema: z.object({ + name: z.string().optional(), + address: z.object({ street: z.string().optional(), city: z.string().optional() }).optional() + }) + }, + async (args: { name?: string; address?: { street?: string; city?: string } }): Promise => { + return { content: [{ type: 'text', text: `JSON Schema 2020-12 tool called with: ${JSON.stringify(args)}` }] }; + } + ); + + // ===== RESOURCES ===== + + mcpServer.registerResource( + 'static-text', + 'test://static-text', + { title: 'Static Text Resource', description: 'A static text resource for testing', mimeType: 'text/plain' }, + async (): Promise => { + return { + contents: [{ uri: 'test://static-text', mimeType: 'text/plain', text: 'This is the content of the static text resource.' }] + }; + } + ); + + mcpServer.registerResource( + 'static-binary', + 'test://static-binary', + { title: 'Static Binary Resource', description: 'A static binary resource (image) for testing', mimeType: 'image/png' }, + async (): Promise => { + return { contents: [{ uri: 'test://static-binary', mimeType: 'image/png', blob: TEST_IMAGE_BASE64 }] }; + } + ); + + mcpServer.registerResource( + 'template', + new ResourceTemplate('test://template/{id}/data', { list: undefined }), + { title: 'Resource Template', description: 'A resource template with parameter substitution', mimeType: 'application/json' }, + async (uri, variables): Promise => { + const id = variables.id; + return { + contents: [ + { + uri: uri.toString(), + mimeType: 'application/json', + text: JSON.stringify({ id, templateTest: true, data: `Data for ID: ${id}` }) + } + ] + }; + } + ); + + mcpServer.registerResource( + 'watched-resource', + 'test://watched-resource', + { title: 'Watched Resource', description: 'A resource that auto-updates every 3 seconds', mimeType: 'text/plain' }, + async (): Promise => { + return { contents: [{ uri: 'test://watched-resource', mimeType: 'text/plain', text: watchedResourceContent }] }; + } + ); + + mcpServer.server.setRequestHandler('resources/subscribe', async request => { + const uri = request.params.uri; + resourceSubscriptions.add(uri); + sendLog('info', `Subscribed to resource: ${uri}`); + return {}; + }); + + mcpServer.server.setRequestHandler('resources/unsubscribe', async request => { + const uri = request.params.uri; + resourceSubscriptions.delete(uri); + sendLog('info', `Unsubscribed from resource: ${uri}`); + return {}; + }); + + // ===== PROMPTS ===== + + mcpServer.registerPrompt( + 'test_simple_prompt', + { title: 'Simple Test Prompt', description: 'A simple prompt without arguments' }, + async (): Promise => { + return { messages: [{ role: 'user', content: { type: 'text', text: 'This is a simple prompt for testing.' } }] }; + } + ); + + mcpServer.registerPrompt( + 'test_prompt_with_arguments', + { + title: 'Prompt With Arguments', + description: 'A prompt with required arguments', + argsSchema: z.object({ arg1: z.string().describe('First test argument'), arg2: z.string().describe('Second test argument') }) + }, + async (args: { arg1: string; arg2: string }): Promise => { + return { + messages: [ + { role: 'user', content: { type: 'text', text: `Prompt with arguments: arg1='${args.arg1}', arg2='${args.arg2}'` } } + ] + }; + } + ); + + mcpServer.registerPrompt( + 'test_prompt_with_embedded_resource', + { + title: 'Prompt With Embedded Resource', + description: 'A prompt that includes an embedded resource', + argsSchema: z.object({ resourceUri: z.string().describe('URI of the resource to embed') }) + }, + async (args: { resourceUri: string }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'resource', + resource: { uri: args.resourceUri, mimeType: 'text/plain', text: 'Embedded resource content for testing.' } + } + }, + { role: 'user', content: { type: 'text', text: 'Please process the embedded resource above.' } } + ] + }; + } + ); + + mcpServer.registerPrompt( + 'test_prompt_with_image', + { title: 'Prompt With Image', description: 'A prompt that includes image content' }, + async (): Promise => { + return { + messages: [ + { role: 'user', content: { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' } }, + { role: 'user', content: { type: 'text', text: 'Please analyze the image above.' } } + ] + }; + } + ); + + // ===== LOGGING ===== + + mcpServer.server.setRequestHandler('logging/setLevel', async request => { + const level = request.params.level; + sendLog('info', `Log level set to: ${level}`); + return {}; + }); + + // ===== COMPLETION ===== + + mcpServer.server.setRequestHandler('completion/complete', async () => { + return { completion: { values: [], total: 0, hasMore: false } }; + }); + + return mcpServer; +} From 8c8082690445ca6e99fec375e91f4fe4016b3c45 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 14:19:27 +0000 Subject: [PATCH 2/5] fix(sessionCompat): reject initialize POST that carries Mcp-Session-Id (v1 parity); ci: run dual-target server conformance --- .github/workflows/conformance.yml | 2 +- packages/server/src/server/sessionCompat.ts | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 2179f02729..ef3b11c3a3 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -48,4 +48,4 @@ jobs: cache-dependency-path: pnpm-lock.yaml - run: pnpm install - run: pnpm run build:all - - run: pnpm run test:conformance:server + - run: pnpm run test:conformance:server:dual diff --git a/packages/server/src/server/sessionCompat.ts b/packages/server/src/server/sessionCompat.ts index c5b09a11b6..305b46af1d 100644 --- a/packages/server/src/server/sessionCompat.ts +++ b/packages/server/src/server/sessionCompat.ts @@ -146,6 +146,18 @@ export class SessionCompat { response: jsonError(400, -32_600, 'Invalid Request: Only one initialization request is allowed') }; } + // v1 parity: an initialize POST that already carries an Mcp-Session-Id is a + // re-init against an existing session and is rejected (the v1 app pattern + // routed it to the existing transport, which returned "Server already + // initialized"). Without this check a confused/retrying client mints a new + // session per retry in multi-session mode. + if (req.headers.get('mcp-session-id') !== null) { + this._onerror?.(new Error('Invalid Request: initialize must not carry Mcp-Session-Id')); + return { + ok: false, + response: jsonError(400, -32_600, 'Invalid Request: Server already initialized') + }; + } this._evictIdle(); if (this._singleSession && this._sessions.size > 0) { this._onerror?.(new Error('Invalid Request: Server already initialized')); From f15bcb8c6d92d48efa743d241aaa49b86286b0ce Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 15:25:29 +0000 Subject: [PATCH 3/5] fix(sessionCompat): catch close-listener rejections in _evict; resolve typedoc warnings --- packages/server/src/server/sessionCompat.ts | 12 +++++++++--- packages/server/src/server/shttpHandler.ts | 3 +-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/server/src/server/sessionCompat.ts b/packages/server/src/server/sessionCompat.ts index 305b46af1d..98a7e7dce8 100644 --- a/packages/server/src/server/sessionCompat.ts +++ b/packages/server/src/server/sessionCompat.ts @@ -73,7 +73,7 @@ interface SessionEntry { /** * EventStore stream IDs minted for this session (per-POST SSE streams plus the standalone * GET stream). Used to reject `Last-Event-ID` replay for streams the session does not own. - * Bounded to {@linkcode MAX_STREAM_IDS_PER_SESSION}; oldest entries evicted on overflow. + * Bounded to `MAX_STREAM_IDS_PER_SESSION` (256); oldest entries evicted on overflow. */ streamIds: Set; /** Subset of {@linkcode streamIds} exempt from FIFO eviction (e.g. the standalone GET stream). */ @@ -255,7 +255,7 @@ export class SessionCompat { /** * Records an EventStore stream ID as belonging to this session so {@linkcode ownsStreamId} * can authorise `Last-Event-ID` replay. The set is bounded; once it reaches - * {@linkcode MAX_STREAM_IDS_PER_SESSION} the oldest entry is evicted, so very old POST + * `MAX_STREAM_IDS_PER_SESSION` (256) the oldest entry is evicted, so very old POST * streams become non-resumable in favour of bounding memory. */ addStreamId(sessionId: string, streamId: string, opts?: { protected?: boolean }): void { @@ -346,7 +346,13 @@ export class SessionCompat { // Already closed. } this._sessions.delete(id); - for (const cb of this._closeListeners) void Promise.resolve(cb(id)); + for (const cb of this._closeListeners) { + try { + void Promise.resolve(cb(id)).catch(error => this._onerror?.(error as Error)); + } catch (error) { + this._onerror?.(error as Error); + } + } } private _evictIdle(): void { diff --git a/packages/server/src/server/shttpHandler.ts b/packages/server/src/server/shttpHandler.ts index 5ebaf092e7..3c608720b2 100644 --- a/packages/server/src/server/shttpHandler.ts +++ b/packages/server/src/server/shttpHandler.ts @@ -306,8 +306,7 @@ export function shttpHandler( authInfo: extra?.authInfo, httpReq: req, sessionId, - clientCapabilities: - initParams?.capabilities ?? (sessionId !== undefined ? session?.clientCapabilities(sessionId) : undefined), + clientCapabilities: initParams?.capabilities ?? (sessionId === undefined ? undefined : session?.clientCapabilities(sessionId)), _transportExtra: { request: req, authInfo: extra?.authInfo } }; From 339211fa1b340ef32fc9b20ee806d8438371fa4f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 17:13:17 +0000 Subject: [PATCH 4/5] fix(sessionCompat): isolate close listeners in delete() (sibling of _evict fix; one bad listener no longer skips the rest) --- packages/server/src/server/sessionCompat.ts | 25 ++++++++++++++------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/packages/server/src/server/sessionCompat.ts b/packages/server/src/server/sessionCompat.ts index 98a7e7dce8..e0375560b1 100644 --- a/packages/server/src/server/sessionCompat.ts +++ b/packages/server/src/server/sessionCompat.ts @@ -239,7 +239,22 @@ export class SessionCompat { // Already closed. } this._sessions.delete(sessionId); - for (const cb of this._closeListeners) await Promise.resolve(cb(sessionId)); + this._fireCloseListeners(sessionId); + } + + /** + * Fires all registered close listeners with per-listener isolation: a throw or rejection + * from one listener is routed to {@linkcode SessionCompatOptions.onerror | onerror} and + * never prevents the remaining listeners from running. + */ + private _fireCloseListeners(sessionId: string): void { + for (const cb of this._closeListeners) { + try { + void Promise.resolve(cb(sessionId)).catch(error => this._onerror?.(error as Error)); + } catch (error) { + this._onerror?.(error as Error); + } + } } /** Protocol version the client requested in `initialize` for this session, if known. */ @@ -346,13 +361,7 @@ export class SessionCompat { // Already closed. } this._sessions.delete(id); - for (const cb of this._closeListeners) { - try { - void Promise.resolve(cb(id)).catch(error => this._onerror?.(error as Error)); - } catch (error) { - this._onerror?.(error as Error); - } - } + this._fireCloseListeners(id); } private _evictIdle(): void { From f34a8ff8408a2521f0f1cb0ddec8f3548abfdbfb Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 17:14:55 +0000 Subject: [PATCH 5/5] fix(shttpHandler): unknown Last-Event-ID returns 400, not 404 (404 means session terminated per spec) --- packages/server/src/server/shttpHandler.ts | 7 ++++++- packages/server/test/server/shttpHandler.test.ts | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/server/src/server/shttpHandler.ts b/packages/server/src/server/shttpHandler.ts index 3c608720b2..cf1ef09c52 100644 --- a/packages/server/src/server/shttpHandler.ts +++ b/packages/server/src/server/shttpHandler.ts @@ -466,7 +466,12 @@ export function shttpHandler( } const eventStreamId = await eventStore.getStreamIdForEventId(lastEventId); if (eventStreamId === undefined) { - return jsonError(404, -32_001, 'Event not found'); + // 400, not 404: per the Streamable HTTP spec a 404 on a request with + // Mcp-Session-Id signals "session terminated, reinitialize". This branch + // runs for a live session whose event store does not (or no longer) have + // that event id; clients should retry without Last-Event-ID, not abandon + // the session. + return jsonError(400, -32_000, 'Unknown or expired Last-Event-ID'); } if (!session.ownsStreamId(sessionId, eventStreamId)) { return jsonError(403, -32_000, 'Forbidden: event ID does not belong to this session'); diff --git a/packages/server/test/server/shttpHandler.test.ts b/packages/server/test/server/shttpHandler.test.ts index 64810b9d46..5e0673c80f 100644 --- a/packages/server/test/server/shttpHandler.test.ts +++ b/packages/server/test/server/shttpHandler.test.ts @@ -133,7 +133,8 @@ describe('shttpHandler — Last-Event-ID replay session binding', () => { await init.body?.cancel(); const res = await handler(getWithLastEventId(sid, 'no-such-stream::42')); - expect(res.status).toBe(404); + // 400, not 404: 404 with Mcp-Session-Id signals "session terminated" per spec. + expect(res.status).toBe(400); }); test('fails closed when eventStore lacks getStreamIdForEventId', async () => {