From a9cf0da0ab8f20ba69ec97b0f0dd37ab0e213d95 Mon Sep 17 00:00:00 2001 From: truffle Date: Thu, 21 May 2026 22:27:59 +0000 Subject: [PATCH] [v1.x] fix(streamable-http): reject duplicate JSON-RPC ids with 409 The MCP base protocol requires that a request ID "MUST NOT have been previously used by the requestor within the same session". Before this change a duplicate POST silently overwrote the prior _request_streams entry, leaving the original in-flight request hanging forever. Mirror the existing GET_STREAM_KEY collision branch and return 409 Conflict, keeping the prior stream untouched. Closes #2655 --- src/mcp/server/streamable_http.py | 17 +++- tests/server/test_streamable_http_manager.py | 84 +++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index c241e831a..1fda60659 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -443,7 +443,9 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se return False return True - async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: + async def _handle_post_request( # noqa: C901, PLR0915 + self, scope: Scope, request: Request, receive: Receive, send: Send + ) -> None: """Handle POST requests containing JSON-RPC messages.""" writer = self._read_stream_writer if writer is None: # pragma: no cover @@ -531,7 +533,18 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re ) # Extract the request ID outside the try block for proper scope - request_id = str(message.root.id) # pragma: no cover + request_id = str(message.root.id) + # The MCP base protocol requires that "the request ID MUST NOT have been previously + # used by the requestor within the same session". If a client violates this, the + # prior stream would be silently overwritten and the in-flight request would hang, + # so reject the duplicate and leave the existing request untouched. + if request_id in self._request_streams: + response = self._create_error_response( + f"Conflict: request id {request_id!r} is already in flight on this session", + HTTPStatus.CONFLICT, + ) + await response(scope, receive, send) + return # Register this stream for the request ID self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0) # pragma: no cover request_stream_reader = self._request_streams[request_id][1] # pragma: no cover diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 33bcb5f2a..712adb71d 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -1,17 +1,24 @@ """Tests for StreamableHTTPSessionManager.""" import json +from http import HTTPStatus from typing import Any from unittest.mock import AsyncMock, patch import anyio import pytest +from starlette.requests import Request from starlette.types import Message from mcp.server import streamable_http_manager from mcp.server.lowlevel import Server -from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport +from mcp.server.streamable_http import ( + MCP_SESSION_ID_HEADER, + EventMessage, + StreamableHTTPServerTransport, +) from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.shared.message import SessionMessage from mcp.types import INVALID_REQUEST @@ -390,3 +397,78 @@ def test_session_idle_timeout_rejects_non_positive(): def test_session_idle_timeout_rejects_stateless(): with pytest.raises(RuntimeError, match="not supported in stateless"): StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_handle_post_rejects_duplicate_request_id(): + """Reject a POST whose JSON-RPC id matches an in-flight request on the same session. + + The MCP base protocol forbids reusing a request ID within a session. Prior to the + fix, the second POST silently overwrote the prior ``_request_streams`` entry, + leaving the first request hanging forever. Now the duplicate is rejected with + 409 Conflict and the prior in-flight stream is preserved untouched. + """ + transport = StreamableHTTPServerTransport(mcp_session_id=None) + + # The early ``writer is None`` guard reads this; the duplicate-id branch never + # actually sends on it, so a real stream is sufficient. + read_writer, read_reader = anyio.create_memory_object_stream[SessionMessage | Exception](0) + transport._read_stream_writer = read_writer + + # Seed an in-flight request with id "1". The duplicate-id check must leave this + # pair in place. + in_flight_pair = anyio.create_memory_object_stream[EventMessage](0) + transport._request_streams["1"] = in_flight_pair + + body = json.dumps({"jsonrpc": "2.0", "method": "tools/list", "id": 1, "params": {}}).encode() + + body_sent = False + + async def mock_receive() -> Message: + nonlocal body_sent + if body_sent: # pragma: no cover + await anyio.sleep_forever() + body_sent = True + return {"type": "http.request", "body": body, "more_body": False} + + sent_messages: list[Message] = [] + response_body = b"" + + async def mock_send(message: Message) -> None: + nonlocal response_body + sent_messages.append(message) + if message["type"] == "http.response.body": + response_body += message.get("body", b"") + + scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [ + (b"content-type", b"application/json"), + (b"accept", b"application/json, text/event-stream"), + ], + } + + request = Request(scope, mock_receive) + await transport._handle_post_request(scope, request, mock_receive, mock_send) + + response_start = next( + (msg for msg in sent_messages if msg["type"] == "http.response.start"), + None, + ) + assert response_start is not None, "Should have sent a response" + assert response_start["status"] == HTTPStatus.CONFLICT + + error = json.loads(response_body) + assert error["jsonrpc"] == "2.0" + assert error["error"]["code"] == INVALID_REQUEST + assert "already in flight" in error["error"]["message"] + + # The pre-existing in-flight stream must remain untouched. + assert transport._request_streams["1"] is in_flight_pair + + in_flight_pair[0].close() + in_flight_pair[1].close() + read_writer.close() + read_reader.close()