Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def __init__(
] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
self._terminated = False
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None

@property
def is_terminated(self) -> bool:
Expand Down
49 changes: 42 additions & 7 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class StreamableHTTPSessionManager:
2. Resumability via an optional event store
3. Connection management and lifecycle
4. Request handling and transport setup
5. Idle session cleanup via optional timeout

Important: Only one StreamableHTTPSessionManager instance should be created
per application. The instance cannot be reused after its run() context has
Expand All @@ -56,6 +57,14 @@ class StreamableHTTPSessionManager:
security_settings: Optional transport security settings.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
retry field. Used for SSE polling behavior.
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
If set, sessions that receive no HTTP requests for this
duration will be automatically terminated and removed.
When retry_interval is also configured, ensure the idle
timeout comfortably exceeds the retry interval to avoid
reaping sessions during normal SSE polling gaps.
Default is None (no timeout). A value of 1800
(30 minutes) is recommended for most deployments.
"""

def __init__(
Expand All @@ -66,13 +75,20 @@ def __init__(
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
):
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise ValueError("session_idle_timeout is not supported in stateless mode")

self.app = app
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings
self.retry_interval = retry_interval
self.session_idle_timeout = session_idle_timeout

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
Expand Down Expand Up @@ -184,6 +200,9 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
transport = self._server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
# Push back idle deadline on activity
if transport.idle_scope is not None and self.session_idle_timeout is not None:
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
await transport.handle_request(scope, receive, send)
return

Expand All @@ -210,16 +229,32 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
read_stream, write_stream = streams
task_status.started()
try:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False, # Stateful mode
)
# Use a cancel scope for idle timeout — when the
# deadline passes the scope cancels app.run() and
# execution continues after the ``with`` block.
# Incoming requests push the deadline forward.
idle_scope = anyio.CancelScope()
if self.session_idle_timeout is not None:
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
http_transport.idle_scope = idle_scope

with idle_scope:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False,
)

if idle_scope.cancelled_caught:
session_id = http_transport.mcp_session_id
logger.info(f"Session {session_id} idle timeout")
if session_id is not None: # pragma: no branch
self._server_instances.pop(session_id, None)
await http_transport.terminate()
except Exception:
logger.exception(f"Session {http_transport.mcp_session_id} crashed")
finally:
# Only remove from instances if not terminated
if ( # pragma: no branch
http_transport.mcp_session_id
and http_transport.mcp_session_id in self._server_instances
Expand Down
267 changes: 267 additions & 0 deletions tests/issues/test_1283_idle_session_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up.

Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager
persist indefinitely in ``_server_instances`` even after the client disconnects.
Over time this leaks memory.

The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows
the manager to automatically terminate and remove sessions that have been idle for
longer than the configured duration.
"""

import time
from collections.abc import Callable, Coroutine
from typing import Any

import anyio
import pytest
from starlette.types import Message, Scope

from mcp.server.lowlevel import Server
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager


def _make_scope() -> Scope:
    """Build a minimal ASGI HTTP scope describing a JSON ``POST`` to ``/mcp``.

    A fresh dict (with a fresh header list) is returned on every call, so
    callers may mutate the result without affecting other requests.
    """
    json_headers = [(b"content-type", b"application/json")]
    scope: Scope = {"type": "http", "method": "POST", "path": "/mcp", "headers": json_headers}
    return scope


async def _mock_receive() -> Message:  # pragma: no cover
    """Stand-in ASGI ``receive`` callable: one empty request body with no more chunks."""
    empty_request: Message = {"type": "http.request", "body": b"", "more_body": False}
    return empty_request


def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]:
    """Create an ASGI ``send`` callable that appends every message it is given to ``sent``."""

    async def _record(message: Message) -> None:
        sent.append(message)

    return _record


def _extract_session_id(sent_messages: list[Message]) -> str:
    """Return the MCP session ID advertised in the recorded response headers.

    Scans ``http.response.start`` messages in order and returns the decoded
    value of the first header whose name matches ``MCP_SESSION_ID_HEADER``
    (case-insensitively).

    Raises:
        AssertionError: If no recorded message carries the session ID header.
    """
    wanted_name = MCP_SESSION_ID_HEADER.lower()
    for message in sent_messages:
        if message["type"] != "http.response.start":  # pragma: no cover
            continue
        for raw_name, raw_value in message.get("headers", []):  # pragma: no branch
            if raw_name.decode().lower() == wanted_name:  # pragma: no branch
                return raw_value.decode()
    raise AssertionError("Session ID not found in response headers")  # pragma: no cover


@pytest.mark.anyio
async def test_idle_session_is_reaped():
    """A session left untouched past its idle timeout disappears from ``_server_instances``."""
    manager = StreamableHTTPSessionManager(app=Server("test-idle-reap"), session_idle_timeout=0.15)

    async with manager.run():
        captured: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(captured))
        session_id = _extract_session_id(captured)

        # The first request must have registered the session with the manager.
        assert session_id in manager._server_instances

        # Sleep well past the 0.15s deadline so the cancel scope has fired
        # and the manager has had time to clean up.
        await anyio.sleep(0.4)

        assert session_id not in manager._server_instances


@pytest.mark.anyio
async def test_activity_resets_idle_timer():
    """Requests during the timeout window should prevent the session from being reaped.

    Follow-up requests carry the session ID header so the manager routes them
    to the existing transport; that is the code path on which the manager
    pushes the idle deadline forward. The HTTP outcome of those requests is
    irrelevant here: only the fact that they count as activity matters.
    """
    idle_timeout = 0.3
    app = Server("test-idle-reset")
    manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout)

    async with manager.run():
        sent: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
        session_id = _extract_session_id(sent)

        transport = manager._server_instances[session_id]
        assert transport.idle_scope is not None

        # ASGI header names are lower-cased byte strings.
        session_header = (MCP_SESSION_ID_HEADER.lower().encode(), session_id.encode())

        # Keep the session busy for longer than one full idle timeout
        # (4 x 0.1s > 0.3s) using real requests rather than by writing the
        # deadline directly, so the manager's own reset logic is exercised.
        for _ in range(4):
            await anyio.sleep(0.1)
            deadline_before = transport.idle_scope.deadline

            scope = _make_scope()
            scope["headers"] = [*scope["headers"], session_header]
            await manager.handle_request(scope, _mock_receive, _make_send([]))

            # The manager itself must have moved the deadline forward.
            assert transport.idle_scope.deadline > deadline_before

        # Session should still be alive because we kept it active
        assert session_id in manager._server_instances

        # Now stop activity and let the timeout expire
        await anyio.sleep(0.6)

        assert session_id not in manager._server_instances


@pytest.mark.anyio
async def test_multiple_sessions_reaped_independently():
    """Two sessions opened at different times are both reaped by their own idle timers."""
    manager = StreamableHTTPSessionManager(app=Server("test-multi-idle"), session_idle_timeout=0.15)

    async def open_session() -> str:
        # Issue one session-less request and return the ID the manager assigned.
        captured: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(captured))
        return _extract_session_id(captured)

    async with manager.run():
        first_id = await open_session()

        # Stagger creation so the two deadlines differ.
        await anyio.sleep(0.05)
        second_id = await open_session()

        for live_id in (first_id, second_id):
            assert live_id in manager._server_instances

        # Long enough for both staggered deadlines to have passed.
        await anyio.sleep(0.4)

        for reaped_id in (first_id, second_id):
            assert reaped_id not in manager._server_instances


def test_session_idle_timeout_rejects_negative():
    """A negative ``session_idle_timeout`` is rejected at construction time."""
    server = Server("test")
    expect_value_error = pytest.raises(ValueError, match="positive number")
    with expect_value_error:
        StreamableHTTPSessionManager(app=server, session_idle_timeout=-1)


def test_session_idle_timeout_rejects_zero():
    """A zero ``session_idle_timeout`` is rejected: the value must be strictly positive."""
    server = Server("test")
    expect_value_error = pytest.raises(ValueError, match="positive number")
    with expect_value_error:
        StreamableHTTPSessionManager(app=server, session_idle_timeout=0)


def test_session_idle_timeout_rejects_stateless():
    """Combining ``session_idle_timeout`` with ``stateless=True`` raises ``ValueError``."""
    server = Server("test")
    expect_value_error = pytest.raises(ValueError, match="not supported in stateless")
    with expect_value_error:
        StreamableHTTPSessionManager(app=server, session_idle_timeout=30, stateless=True)


@pytest.mark.anyio
async def test_terminate_idempotency():
    """``terminate()`` may be called repeatedly; the transport simply stays terminated."""
    transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent")

    async with transport.connect():
        # First pass performs the real termination; the second must be a
        # harmless no-op that raises nothing and leaves the flag set.
        for _ in range(2):
            await transport.terminate()
            assert transport.is_terminated


@pytest.mark.anyio
async def test_no_idle_timeout_sessions_persist():
    """With the default ``session_idle_timeout`` (None), an idle session is not removed."""
    manager = StreamableHTTPSessionManager(app=Server("test-no-timeout"))

    async with manager.run():
        captured: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(captured))
        session_id = _extract_session_id(captured)

        # Stay idle for a while; with no timeout configured nothing should reap the session.
        await anyio.sleep(0.3)

        assert session_id in manager._server_instances


@pytest.mark.anyio
async def test_run_server_exits_promptly_after_idle_timeout():
    """``app.run`` must return shortly after the idle timeout fires.

    ``app.run`` is wrapped so the moment it exits is recorded; the test then
    checks that this happens within a small multiple of the idle timeout and
    that the session has been dropped from the manager.
    """
    idle_timeout = 0.5
    app = Server("test-lifecycle")

    run_finished = anyio.Event()
    exit_timestamp: list[float] = []
    wrapped_run = app.run

    async def instrumented_run(*args: Any, **kwargs: Any) -> None:
        try:
            await wrapped_run(*args, **kwargs)
        finally:
            # Runs on normal return and on cancellation alike.
            exit_timestamp.append(time.monotonic())
            run_finished.set()

    app.run = instrumented_run  # type: ignore[assignment]

    manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout)

    async with manager.run():
        captured: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(captured))
        session_id = _extract_session_id(captured)
        assert session_id in manager._server_instances

        pre_reap_time = time.monotonic()

        # Hard upper bound so a hung session fails the test instead of blocking it.
        with anyio.fail_after(idle_timeout * 4):
            await run_finished.wait()

        assert len(exit_timestamp) == 1
        total_elapsed = exit_timestamp[0] - pre_reap_time
        assert total_elapsed < idle_timeout * 3, (
            f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s"
        )
        assert session_id not in manager._server_instances


@pytest.mark.anyio
async def test_run_server_finally_block_runs_after_terminate():
    """Verify that ``app.run`` unwinds (its ``finally`` executes) after ``terminate()``.

    ``app.run`` is wrapped to record when it is entered and when it exits.
    Terminating the transport must cause the wrapped call to finish within a
    bounded time, which shows the server task does not linger after an
    explicit termination.
    """
    app = Server("test-finally")

    lifecycle_events: list[str] = []
    run_exited = anyio.Event()
    original_run = app.run

    async def instrumented_run(*args: Any, **kwargs: Any) -> None:
        lifecycle_events.append("run_entered")
        try:
            await original_run(*args, **kwargs)
        finally:
            lifecycle_events.append("run_exited")
            # Wake the waiter immediately instead of making it poll.
            run_exited.set()

    app.run = instrumented_run  # type: ignore[assignment]

    manager = StreamableHTTPSessionManager(app=app)

    async with manager.run():
        sent: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
        session_id = _extract_session_id(sent)
        transport = manager._server_instances[session_id]

        assert "run_entered" in lifecycle_events
        assert "run_exited" not in lifecycle_events

        await transport.terminate()

        # Bounded wait: a server task that never unwinds fails the test.
        with anyio.fail_after(3.0):
            await run_exited.wait()

        # Exactly one run, entered and then exited, in that order.
        assert lifecycle_events == ["run_entered", "run_exited"]


@pytest.mark.anyio
async def test_idle_timeout_end_to_end():
    """End-to-end: with a real ``Server``, an idle session is eventually cleaned up."""
    idle_timeout = 0.3
    manager = StreamableHTTPSessionManager(app=Server("test-e2e"), session_idle_timeout=idle_timeout)

    async with manager.run():
        captured: list[Message] = []
        await manager.handle_request(_make_scope(), _mock_receive, _make_send(captured))
        session_id = _extract_session_id(captured)
        assert session_id in manager._server_instances

        # Poll until the manager drops the session; fail_after bounds the wait
        # so a session that is never reaped fails the test.
        with anyio.fail_after(idle_timeout + 1.0):
            while True:
                if session_id not in manager._server_instances:
                    break
                await anyio.sleep(0.05)

        assert session_id not in manager._server_instances
Loading