From 6056de446551b99d88768e1e63170d2a45c8eee0 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 11:29:53 +0000 Subject: [PATCH 1/3] fix(server): bind SSE replay to issuing transport instance in WebStandardStreamableHTTPServerTransport Per-instance _standaloneSseStreamId (lazy, prefers sessionId) and _issuedStreamIds set; replayEventsAfter rejects event IDs from a different instance. Same shape as the new shttpHandler path. --- docs/migration.md | 13 +++ examples/server/src/inMemoryEventStore.ts | 10 +++ .../node/test/streamableHttp.test.ts | 48 +++++++++++ packages/server/src/server/streamableHttp.ts | 80 ++++++++++++++----- test/conformance/src/everythingServer.ts | 3 + .../integration/test/taskResumability.test.ts | 5 ++ 6 files changed, 141 insertions(+), 18 deletions(-) diff --git a/docs/migration.md b/docs/migration.md index fecf185996..687eb6016e 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -962,6 +962,19 @@ a skill. ## Need Help? +### EventStore: `getStreamIdForEventId` required for SSE replay + +`EventStore.getStreamIdForEventId(eventId)` is now required for `Last-Event-ID` resumption. +The transport must verify the event belongs to a stream the requesting session issued +*before* replaying; without this method, replay returns HTTP 403. Implement it on your +event store (typically a lookup from event id to the `streamId` you stored it under). + +```typescript +async getStreamIdForEventId(eventId: string): Promise { + return this.events.get(eventId)?.streamId; +} +``` + If you encounter issues during migration: 1. Check the [FAQ](faq.md) for common questions about v2 changes diff --git a/examples/server/src/inMemoryEventStore.ts b/examples/server/src/inMemoryEventStore.ts index 604b84d39c..8861c772ab 100644 --- a/examples/server/src/inMemoryEventStore.ts +++ b/examples/server/src/inMemoryEventStore.ts @@ -33,6 +33,16 @@ export class InMemoryEventStore implements EventStore { return eventId; } + /** + * Resolves an event ID to its source stream. Required for the transport's + * replay-ownership check (a Last-Event-ID may only replay a stream the requesting + * session issued). + * Implements EventStore.getStreamIdForEventId + */ + async getStreamIdForEventId(eventId: string): Promise { + return this.events.get(eventId)?.streamId; + } + /** * Replays events that occurred after a specific event ID * Implements EventStore.replayEventsAfter diff --git a/packages/middleware/node/test/streamableHttp.test.ts b/packages/middleware/node/test/streamableHttp.test.ts index c427aa2eea..892c23f375 100644 --- a/packages/middleware/node/test/streamableHttp.test.ts +++ b/packages/middleware/node/test/streamableHttp.test.ts @@ -1377,6 +1377,12 @@ describe('Zod v4', () => { return eventId; }, + // Required for replay ownership: see WebStandardStreamableHTTPServerTransport.replayEvents. + async getStreamIdForEventId(eventId: string): Promise { + const sep = eventId.lastIndexOf('_'); + return sep > 0 ? eventId.slice(0, sep) : undefined; + }, + async replayEventsAfter( lastEventId: EventId, { @@ -1594,6 +1600,48 @@ describe('Zod v4', () => { expect(allText).toContain('Missed notification 2'); expect(allText).toContain('Missed notification 3'); }); + + it('rejects Last-Event-ID replay for a stream issued by a different transport instance (cross-session IDOR)', async () => { + // Obtain an event id from this transport's standalone GET stream. + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { Accept: 'text/event-stream', 'mcp-session-id': sessionId, 'mcp-protocol-version': '2025-11-25' } + }); + expect(sseResponse.status).toBe(200); + const reader = sseResponse.body?.getReader(); + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'victim session payload' }); + const text = new TextDecoder().decode((await reader!.read()).value); + const victimEventId = text.match(/id: ([^\n]+)/)![1]!; + await reader!.cancel(); + + // A second transport instance shares the SAME eventStore. It opens its own GET + // stream first (so its _issuedStreamIds is non-empty) to prove the check is + // per-instance-id, not just "has any stream". + const other = await createTestServer({ sessionIdGenerator: () => randomUUID(), eventStore }); + try { + const otherInit = await sendPostRequest(other.baseUrl, TEST_MESSAGES.initialize); + const otherSid = otherInit.headers.get('mcp-session-id') as string; + const otherSse = await fetch(other.baseUrl, { + method: 'GET', + headers: { Accept: 'text/event-stream', 'mcp-session-id': otherSid, 'mcp-protocol-version': '2025-11-25' } + }); + expect(otherSse.status).toBe(200); + await otherSse.body?.getReader().cancel(); + + const attack = await fetch(other.baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': otherSid, + 'mcp-protocol-version': '2025-11-25', + 'last-event-id': victimEventId + } + }); + expect(attack.status).toBe(403); + } finally { + await stopTestServer({ server: other.server, transport: other.transport }); + } + }); }); // Test stateless mode diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fd3563a077..14bab0e48f 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -38,8 +38,8 @@ export interface EventStore { * @param eventId The event ID to look up * @returns The stream ID, or `undefined` if not found * - * Optional: If not provided, the SDK will use the `streamId` returned by - * {@linkcode replayEventsAfter} for stream mapping. + * Required for `Last-Event-ID` resumption: the SDK uses this to verify the requesting + * caller owns the stream BEFORE replaying. If omitted, replay returns 403 (fail-closed). */ getStreamIdForEventId?(eventId: EventId): Promise; @@ -229,9 +229,25 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _streamMapping: Map = new Map(); private _requestToStreamMapping: Map = new Map(); private _requestResponseMap: Map = new Map(); + /** + * StreamIds this transport instance has minted (for SSE replay ownership; see + * {@linkcode replayEvents}). Bounded; oldest entries evicted FIFO. Kept across the + * source POST stream's close so a Last-Event-ID resumption can still verify ownership. + */ + private _issuedStreamIds = new Set(); + private static readonly _MAX_ISSUED_STREAM_IDS = 256; private _initialized: boolean = false; private _enableJsonResponse: boolean = false; - private _standaloneSseStreamId: string = '_GET_stream'; + /** + * Per-instance standalone-GET stream id. Suffixed so two transports sharing one + * EventStore cannot satisfy each other's `_issuedStreamIds` ownership check. + * Lazy: minted on first use (inside a request handler) so Cloudflare Workers does + * not reject `crypto.randomUUID()` at construction-in-global-scope. + */ + private __standaloneSseStreamId: string | undefined; + private get _standaloneSseStreamId(): string { + return (this.__standaloneSseStreamId ??= `_GET_stream:${this.sessionId ?? crypto.randomUUID()}`); + } private _eventStore?: EventStore; private _onsessioninitialized?: ((sessionId: string) => void | Promise) | undefined; private _onsessionclosed?: ((sessionId: string) => void | Promise) | undefined; @@ -461,6 +477,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { headers['mcp-session-id'] = this.sessionId; } + // Standalone GET stream is implicitly owned by this transport instance. + this._addIssuedStreamId(this._standaloneSseStreamId); + // Store the stream mapping with the controller for pushing data this._streamMapping.set(this._standaloneSseStreamId, { controller: streamController!, @@ -489,22 +508,33 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } try { - // If getStreamIdForEventId is available, use it for conflict checking - let streamId: string | undefined; - if (this._eventStore.getStreamIdForEventId) { - streamId = await this._eventStore.getStreamIdForEventId(lastEventId); - - if (!streamId) { - this.onerror?.(new Error('Invalid event ID format')); - return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format'); - } - - // Check conflict with the SAME streamId we'll use for mapping - if (this._streamMapping.get(streamId) !== undefined) { - this.onerror?.(new Error('Conflict: Stream already has an active connection')); - return this.createJsonErrorResponse(409, -32_000, 'Conflict: Stream already has an active connection'); - } + // Ownership check: a Last-Event-ID may only replay a stream this transport + // instance minted. Without this, a caller can replay any stream the (often + // shared) event store holds, leaking cross-session response bodies. The check + // requires getStreamIdForEventId so the stream identity is known BEFORE + // replayEventsAfter starts writing; event stores that do not implement it + // cannot support secure replay (fail-closed with 403). + if (!this._eventStore.getStreamIdForEventId) { + return this.createJsonErrorResponse( + 403, + -32_000, + 'Forbidden: event store does not support session-scoped replay (getStreamIdForEventId required)' + ); + } + const streamId = await this._eventStore.getStreamIdForEventId(lastEventId); + if (!streamId) { + this.onerror?.(new Error('Invalid event ID format')); + return this.createJsonErrorResponse(404, -32_001, 'Event not found'); } + if (!this._issuedStreamIds.has(streamId)) { + return this.createJsonErrorResponse(403, -32_000, 'Forbidden: event ID does not belong to this session'); + } + + // Tear down a stale stream entry for this id (e.g. server-side controller from a + // GET stream the client disconnected from). Reconnect-with-Last-Event-ID + // semantically replaces the prior stream; the prior code path (before + // getStreamIdForEventId became required) overwrote the mapping anyway. + this._streamMapping.get(streamId)?.cleanup(); const headers: Record = { 'Content-Type': 'text/event-stream', @@ -713,6 +743,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // The default behavior is to use SSE streaming // but in some cases server will return JSON responses const streamId = crypto.randomUUID(); + this._addIssuedStreamId(streamId); // Extract protocol version for priming event decision. // For initialize requests, get from request params. @@ -840,6 +871,19 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { return new Response(null, { status: 200 }); } + /** Record a streamId this transport instance issued, evicting the oldest non-standalone id if at the cap. */ + private _addIssuedStreamId(streamId: string): void { + if (this._issuedStreamIds.size >= WebStandardStreamableHTTPServerTransport._MAX_ISSUED_STREAM_IDS) { + for (const id of this._issuedStreamIds) { + if (id !== this._standaloneSseStreamId) { + this._issuedStreamIds.delete(id); + break; + } + } + } + this._issuedStreamIds.add(streamId); + } + /** * Validates session ID for non-initialization requests. * Returns `Response` error if invalid, `undefined` otherwise diff --git a/test/conformance/src/everythingServer.ts b/test/conformance/src/everythingServer.ts index f3925aeea8..4ab56ed572 100644 --- a/test/conformance/src/everythingServer.ts +++ b/test/conformance/src/everythingServer.ts @@ -36,6 +36,9 @@ function createEventStore(): EventStore { eventStoreData.set(eventId, { eventId, message, streamId }); return eventId; }, + async getStreamIdForEventId(eventId: EventId): Promise { + return eventStoreData.get(eventId)?.streamId; + }, async replayEventsAfter( lastEventId: EventId, { send }: { send: (eventId: EventId, message: unknown) => Promise } diff --git a/test/integration/test/taskResumability.test.ts b/test/integration/test/taskResumability.test.ts index f7b4174d18..2eeba0ad2e 100644 --- a/test/integration/test/taskResumability.test.ts +++ b/test/integration/test/taskResumability.test.ts @@ -21,6 +21,11 @@ class InMemoryEventStore implements EventStore { return eventId; } + /** Required for replay ownership: see WebStandardStreamableHTTPServerTransport.replayEvents. */ + async getStreamIdForEventId(eventId: string): Promise { + return this.events.get(eventId)?.streamId; + } + async replayEventsAfter( lastEventId: string, { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } From 4b8b389810540ccdae9f2558a203484bf84e327f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 15:32:17 +0000 Subject: [PATCH 2/3] fix(streamableHttp): register replay connection under ownership-verified streamId, not replayEventsAfter return --- packages/server/src/server/streamableHttp.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 14bab0e48f..28d40e70bd 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -560,8 +560,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } }); - // Replay events - returns the streamId for backwards compatibility - const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { + // Replay events. The returned stream id is ignored: `streamId` (from + // getStreamIdForEventId above) is the ownership-verified key and is what `send()` + // looks up; using the replay return value risks a divergent EventStore registering + // the new connection under a key that nothing reads. + await this._eventStore.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { const success = this.writeSSEEvent(streamController!, encoder, message, eventId); if (!success) { @@ -574,11 +577,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } }); - this._streamMapping.set(replayedStreamId, { + this._streamMapping.set(streamId, { controller: streamController!, encoder, cleanup: () => { - this._streamMapping.delete(replayedStreamId); + this._streamMapping.delete(streamId); try { streamController!.close(); } catch { From 60244304452ebd134d8bf4adc76de2855214e7f2 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 12 May 2026 17:14:12 +0000 Subject: [PATCH 3/3] fix(streamableHttp): unknown Last-Event-ID returns 400, not 404 (404 means session terminated per spec; this branch runs for a live session) --- packages/server/src/server/streamableHttp.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 28d40e70bd..850e920262 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -523,8 +523,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } const streamId = await this._eventStore.getStreamIdForEventId(lastEventId); if (!streamId) { - this.onerror?.(new Error('Invalid event ID format')); - return this.createJsonErrorResponse(404, -32_001, 'Event not found'); + // 400, not 404: per the Streamable HTTP spec a 404 on a request with + // Mcp-Session-Id signals "session terminated, reinitialize". This branch + // runs for a live session whose event store does not (or no longer) have + // that event id; clients should retry without Last-Event-ID, not abandon + // the session. + this.onerror?.(new Error(`Unknown Last-Event-ID '${lastEventId}'`)); + return this.createJsonErrorResponse(400, -32_000, 'Unknown or expired Last-Event-ID'); } if (!this._issuedStreamIds.has(streamId)) { return this.createJsonErrorResponse(403, -32_000, 'Forbidden: event ID does not belong to this session');