|
1 | 1 | """Tests for StreamableHTTPSessionManager.""" |
2 | 2 |
|
3 | 3 | import json |
| 4 | +from http import HTTPStatus |
4 | 5 | from typing import Any |
5 | 6 | from unittest.mock import AsyncMock, patch |
6 | 7 |
|
7 | 8 | import anyio |
8 | 9 | import pytest |
| 10 | +from starlette.requests import Request |
9 | 11 | from starlette.types import Message |
10 | 12 |
|
11 | 13 | from mcp.server import streamable_http_manager |
12 | 14 | from mcp.server.lowlevel import Server |
13 | | -from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport |
| 15 | +from mcp.server.streamable_http import ( |
| 16 | + MCP_SESSION_ID_HEADER, |
| 17 | + EventMessage, |
| 18 | + StreamableHTTPServerTransport, |
| 19 | +) |
14 | 20 | from mcp.server.streamable_http_manager import StreamableHTTPSessionManager |
| 21 | +from mcp.shared.message import SessionMessage |
15 | 22 | from mcp.types import INVALID_REQUEST |
16 | 23 |
|
17 | 24 |
|
@@ -390,3 +397,78 @@ def test_session_idle_timeout_rejects_non_positive(): |
390 | 397 | def test_session_idle_timeout_rejects_stateless(): |
391 | 398 | with pytest.raises(RuntimeError, match="not supported in stateless"): |
392 | 399 | StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) |
| 400 | + |
| 401 | + |
| 402 | +@pytest.mark.anyio |
| 403 | +async def test_handle_post_rejects_duplicate_request_id(): |
| 404 | + """Reject a POST whose JSON-RPC id matches an in-flight request on the same session. |
| 405 | +
|
| 406 | + The MCP base protocol forbids reusing a request ID within a session. Prior to the |
| 407 | + fix, the second POST silently overwrote the prior ``_request_streams`` entry, |
| 408 | + leaving the first request hanging forever. Now the duplicate is rejected with |
| 409 | + 409 Conflict and the prior in-flight stream is preserved untouched. |
| 410 | + """ |
| 411 | + transport = StreamableHTTPServerTransport(mcp_session_id=None) |
| 412 | + |
| 413 | + # The early ``writer is None`` guard reads this; the duplicate-id branch never |
| 414 | + # actually sends on it, so a real stream is sufficient. |
| 415 | + read_writer, read_reader = anyio.create_memory_object_stream[SessionMessage | Exception](0) |
| 416 | + transport._read_stream_writer = read_writer |
| 417 | + |
| 418 | + # Seed an in-flight request with id "1". The duplicate-id check must leave this |
| 419 | + # pair in place. |
| 420 | + in_flight_pair = anyio.create_memory_object_stream[EventMessage](0) |
| 421 | + transport._request_streams["1"] = in_flight_pair |
| 422 | + |
| 423 | + body = json.dumps({"jsonrpc": "2.0", "method": "tools/list", "id": 1, "params": {}}).encode() |
| 424 | + |
| 425 | + body_sent = False |
| 426 | + |
| 427 | + async def mock_receive() -> Message: |
| 428 | + nonlocal body_sent |
| 429 | + if body_sent: # pragma: no cover |
| 430 | + await anyio.sleep_forever() |
| 431 | + body_sent = True |
| 432 | + return {"type": "http.request", "body": body, "more_body": False} |
| 433 | + |
| 434 | + sent_messages: list[Message] = [] |
| 435 | + response_body = b"" |
| 436 | + |
| 437 | + async def mock_send(message: Message) -> None: |
| 438 | + nonlocal response_body |
| 439 | + sent_messages.append(message) |
| 440 | + if message["type"] == "http.response.body": |
| 441 | + response_body += message.get("body", b"") |
| 442 | + |
| 443 | + scope = { |
| 444 | + "type": "http", |
| 445 | + "method": "POST", |
| 446 | + "path": "/mcp", |
| 447 | + "headers": [ |
| 448 | + (b"content-type", b"application/json"), |
| 449 | + (b"accept", b"application/json, text/event-stream"), |
| 450 | + ], |
| 451 | + } |
| 452 | + |
| 453 | + request = Request(scope, mock_receive) |
| 454 | + await transport._handle_post_request(scope, request, mock_receive, mock_send) |
| 455 | + |
| 456 | + response_start = next( |
| 457 | + (msg for msg in sent_messages if msg["type"] == "http.response.start"), |
| 458 | + None, |
| 459 | + ) |
| 460 | + assert response_start is not None, "Should have sent a response" |
| 461 | + assert response_start["status"] == HTTPStatus.CONFLICT |
| 462 | + |
| 463 | + error = json.loads(response_body) |
| 464 | + assert error["jsonrpc"] == "2.0" |
| 465 | + assert error["error"]["code"] == INVALID_REQUEST |
| 466 | + assert "already in flight" in error["error"]["message"] |
| 467 | + |
| 468 | + # The pre-existing in-flight stream must remain untouched. |
| 469 | + assert transport._request_streams["1"] is in_flight_pair |
| 470 | + |
| 471 | + in_flight_pair[0].close() |
| 472 | + in_flight_pair[1].close() |
| 473 | + read_writer.close() |
| 474 | + read_reader.close() |
0 commit comments