From 033dba3a09af30cda29d56562bcdce1d76e00e75 Mon Sep 17 00:00:00 2001 From: karthik Date: Tue, 10 Feb 2026 14:42:45 -0500 Subject: [PATCH] fix: send CONNECTION_CLOSED error when SSE stream closes without event ID When an SSE stream closes without sending any events with IDs, _handle_sse_response silently returns instead of notifying the session. This leaves send_request waiting forever on a response that will never arrive. Send a JSONRPCError with CONNECTION_CLOSED to the read stream writer so the pending request fails with a clear error instead of hanging. Fixes #1811 Co-Authored-By: Claude Opus 4.6 --- src/mcp/client/streamable_http.py | 19 +++++++++-- tests/shared/test_streamable_http.py | 50 +++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9d45bec6e..f5a51ba6a 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -19,6 +19,7 @@ from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( + CONNECTION_CLOSED, INVALID_REQUEST, PARSE_ERROR, ErrorData, @@ -357,12 +358,24 @@ async def _handle_sse_response( await response.aclose() return # Normal completion, no reconnect needed except Exception: - logger.debug("SSE stream ended", exc_info=True) # pragma: no cover + logger.debug("SSE stream ended", exc_info=True) - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch + # Stream ended without response - reconnect if we have an event ID, + # otherwise notify the session that the connection is dead (#1811) + if last_event_id is not None: logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + else: + logger.warning("SSE stream closed without response or event ID, cannot reconnect") + error = JSONRPCError( + jsonrpc="2.0", + id=original_request_id, + error=ErrorData( + code=CONNECTION_CLOSED, + message="SSE stream closed without response", + ), + ) + await ctx.read_stream_writer.send(SessionMessage(error)) async def _handle_reconnection( self, diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index b04b92026..7f82574aa 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -50,7 +50,15 @@ ) from mcp.shared.message import ClientMessageMetadata, ServerMessageMetadata, SessionMessage from mcp.shared.session import RequestResponder -from mcp.types import InitializeResult, JSONRPCRequest, TextContent, TextResourceContents, Tool +from mcp.types import ( + CONNECTION_CLOSED, + InitializeResult, + JSONRPCError, + JSONRPCRequest, + TextContent, + TextResourceContents, + Tool, +) from tests.test_helpers import wait_for_server # Test constants @@ -2239,3 +2247,43 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_handle_sse_response_sends_error_when_stream_closes_without_event_id(): + """SSE stream closing without event ID sends CONNECTION_CLOSED error (#1811).""" + from mcp.client.streamable_http import RequestContext as _TransportRequestContext + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](1) + + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + ctx = _TransportRequestContext( + client=MagicMock(), + session_id=None, + session_message=SessionMessage(message=request), + metadata=None, + read_stream_writer=write_stream, + ) + + # Mock response whose SSE stream yields zero events (simulates abrupt close) + mock_response = MagicMock() + + async def _empty_aiter_lines(): # pragma: no cover + return + yield + + mock_response.aiter_lines = _empty_aiter_lines + + try: + await transport._handle_sse_response(mock_response, ctx) + + result = await read_stream.receive() + assert isinstance(result, SessionMessage) + assert isinstance(result.message, JSONRPCError) + assert result.message.error.code == CONNECTION_CLOSED + assert "SSE stream closed without response" in result.message.error.message + assert result.message.id == 1 + finally: + await write_stream.aclose() + await read_stream.aclose()