Skip to content

Commit bfe2be0

Browse files
committed
fix: buffer stdio server writes during progress notifications
1 parent e8e6484 commit bfe2be0

2 files changed

Lines changed: 43 additions & 2 deletions

File tree

src/mcp/server/stdio.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.
4444
stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8"))
4545

4646
read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)
47-
write_stream, write_stream_reader = create_context_streams[SessionMessage](0)
47+
# Let a handler queue its final response while stdout_writer is still
48+
# flushing an earlier notification, such as progress.
49+
write_stream, write_stream_reader = create_context_streams[SessionMessage](1)
4850

4951
async def stdin_reader():
5052
try:

tests/server/test_stdio.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
import io
22
import sys
33
from io import TextIOWrapper
4+
from typing import cast
45

56
import anyio
67
import pytest
78

89
from mcp.server.stdio import stdio_server
910
from mcp.shared.message import SessionMessage
10-
from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter
11+
from mcp.types import JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter
12+
13+
14+
class BlockingStdout:
15+
def __init__(self) -> None:
16+
self.lines: list[str] = []
17+
self.write_started = anyio.Event()
18+
self.release_write = anyio.Event()
19+
20+
async def write(self, text: str) -> None:
21+
self.lines.append(text)
22+
self.write_started.set()
23+
await self.release_write.wait()
24+
25+
async def flush(self) -> None:
26+
pass
1127

1228

1329
@pytest.mark.anyio
@@ -92,3 +108,26 @@ async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch):
92108
second = await read_stream.receive()
93109
assert isinstance(second, SessionMessage)
94110
assert second.message == valid
111+
112+
113+
@pytest.mark.anyio
114+
async def test_stdio_server_write_stream_allows_response_after_slow_notification():
115+
"""A slow stdout write for a notification must not block the next response."""
116+
stdout = BlockingStdout()
117+
typed_stdout = cast(anyio.AsyncFile[str], stdout)
118+
119+
async with stdio_server(stdin=anyio.AsyncFile(io.StringIO()), stdout=typed_stdout) as (
120+
read_stream,
121+
write_stream,
122+
):
123+
notification = JSONRPCNotification(jsonrpc="2.0", method="notifications/progress")
124+
await write_stream.send(SessionMessage(notification))
125+
await stdout.write_started.wait()
126+
127+
with anyio.move_on_after(0.1) as scope:
128+
await write_stream.send(SessionMessage(JSONRPCResponse(jsonrpc="2.0", id=2, result={})))
129+
130+
assert not scope.cancel_called
131+
stdout.release_write.set()
132+
await write_stream.aclose()
133+
await read_stream.aclose()

0 commit comments

Comments
 (0)