Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se
return False
return True

async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None:
async def _handle_post_request( # noqa: C901, PLR0915
self, scope: Scope, request: Request, receive: Receive, send: Send
) -> None:
"""Handle POST requests containing JSON-RPC messages."""
writer = self._read_stream_writer
if writer is None: # pragma: no cover
Expand Down Expand Up @@ -531,7 +533,18 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
)

# Extract the request ID outside the try block for proper scope
request_id = str(message.root.id) # pragma: no cover
request_id = str(message.root.id)
# The MCP base protocol requires that "the request ID MUST NOT have been previously
# used by the requestor within the same session". If a client violates this, the
# prior stream would be silently overwritten and the in-flight request would hang,
# so reject the duplicate and leave the existing request untouched.
if request_id in self._request_streams:
response = self._create_error_response(
f"Conflict: request id {request_id!r} is already in flight on this session",
HTTPStatus.CONFLICT,
)
await response(scope, receive, send)
return
# Register this stream for the request ID
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0) # pragma: no cover
request_stream_reader = self._request_streams[request_id][1] # pragma: no cover
Expand Down
84 changes: 83 additions & 1 deletion tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
"""Tests for StreamableHTTPSessionManager."""

import json
from http import HTTPStatus
from typing import Any
from unittest.mock import AsyncMock, patch

import anyio
import pytest
from starlette.requests import Request
from starlette.types import Message

from mcp.server import streamable_http_manager
from mcp.server.lowlevel import Server
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
from mcp.server.streamable_http import (
MCP_SESSION_ID_HEADER,
EventMessage,
StreamableHTTPServerTransport,
)
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.shared.message import SessionMessage
from mcp.types import INVALID_REQUEST


Expand Down Expand Up @@ -390,3 +397,78 @@ def test_session_idle_timeout_rejects_non_positive():
def test_session_idle_timeout_rejects_stateless():
with pytest.raises(RuntimeError, match="not supported in stateless"):
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)


@pytest.mark.anyio
async def test_handle_post_rejects_duplicate_request_id():
"""Reject a POST whose JSON-RPC id matches an in-flight request on the same session.

The MCP base protocol forbids reusing a request ID within a session. Prior to the
fix, the second POST silently overwrote the prior ``_request_streams`` entry,
leaving the first request hanging forever. Now the duplicate is rejected with
409 Conflict and the prior in-flight stream is preserved untouched.
"""
transport = StreamableHTTPServerTransport(mcp_session_id=None)

# The early ``writer is None`` guard reads this; the duplicate-id branch never
# actually sends on it, so a real stream is sufficient.
read_writer, read_reader = anyio.create_memory_object_stream[SessionMessage | Exception](0)
transport._read_stream_writer = read_writer

# Seed an in-flight request with id "1". The duplicate-id check must leave this
# pair in place.
in_flight_pair = anyio.create_memory_object_stream[EventMessage](0)
transport._request_streams["1"] = in_flight_pair

body = json.dumps({"jsonrpc": "2.0", "method": "tools/list", "id": 1, "params": {}}).encode()

body_sent = False

async def mock_receive() -> Message:
nonlocal body_sent
if body_sent: # pragma: no cover
await anyio.sleep_forever()
body_sent = True
return {"type": "http.request", "body": body, "more_body": False}

sent_messages: list[Message] = []
response_body = b""

async def mock_send(message: Message) -> None:
nonlocal response_body
sent_messages.append(message)
if message["type"] == "http.response.body":
response_body += message.get("body", b"")

scope = {
"type": "http",
"method": "POST",
"path": "/mcp",
"headers": [
(b"content-type", b"application/json"),
(b"accept", b"application/json, text/event-stream"),
],
}

request = Request(scope, mock_receive)
await transport._handle_post_request(scope, request, mock_receive, mock_send)

response_start = next(
(msg for msg in sent_messages if msg["type"] == "http.response.start"),
None,
)
assert response_start is not None, "Should have sent a response"
assert response_start["status"] == HTTPStatus.CONFLICT

error = json.loads(response_body)
assert error["jsonrpc"] == "2.0"
assert error["error"]["code"] == INVALID_REQUEST
assert "already in flight" in error["error"]["message"]

# The pre-existing in-flight stream must remain untouched.
assert transport._request_streams["1"] is in_flight_pair

in_flight_pair[0].close()
in_flight_pair[1].close()
read_writer.close()
read_reader.close()
Loading