diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 71c366e83..64a160cf7 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -137,33 +137,54 @@ public async Task HandleGetRequestAsync(Stream sseResponseStream, CancellationTo throw new InvalidOperationException("GET requests are not supported in stateless mode."); } - using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false)) + try { - if (_getHttpRequestStarted) + using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false)) { - throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session."); - } + if (_getHttpRequestStarted) + { + throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session."); + } - _getHttpRequestStarted = true; - _httpSseWriter = new SseEventWriter(sseResponseStream); - _httpResponseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false); - if (_storeSseWriter is not null) - { - var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime(), cancellationToken).ConfigureAwait(false); - await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false); + _getHttpRequestStarted = true; + _httpSseWriter = new SseEventWriter(sseResponseStream); + _httpResponseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false); + if (_storeSseWriter is not null) + { + var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime(), cancellationToken).ConfigureAwait(false); + await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false); + } + else + { + // If there's no priming write, flush the stream to ensure HTTP response headers are + // sent to the client now that the transport is ready to accept messages via SendMessageAsync. + await sseResponseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + } } - else + + // Wait for the response to be written before returning from the handler. + // This keeps the HTTP response open until the final response message is sent. + await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + // Release the SseEventWriter's reference to the response stream promptly when the GET + // request ends, regardless of how it exits. Otherwise the response stream (and the + // underlying Kestrel connection and associated memory pool buffers) remains pinned + // in memory until the session itself is disposed (via explicit DELETE or idle timeout). + // Clients that disconnect without sending DELETE — common with long-lived SSE — would + // otherwise accumulate significant unmanaged memory per session during that interval. + using (await _unsolicitedMessageLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { - // If there's no priming write, flush the stream to ensure HTTP response headers are - // sent to the client now that the transport is ready to accept messages via SendMessageAsync. - await sseResponseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + _getHttpResponseCompleted = true; + if (_httpSseWriter is { } writer) + { + _httpSseWriter = null; + writer.Dispose(); + } } } - - // Wait for the response to be written before returning from the handler. - // This keeps the HTTP response open until the final response message is sent. - await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false); } /// @@ -219,7 +240,10 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can return; } - Debug.Assert(_httpSseWriter is not null); + // _httpSseWriter may be null here if the GET request has already ended (e.g. client + // disconnected). _getHttpResponseCompleted is set to true in that case, so the write + // below is correctly skipped. The event store writer (if configured) still captures + // the message so a reconnecting client can replay it via Last-Event-ID. Debug.Assert(_httpResponseTcs is not null); var item = SseItem.Message(message); @@ -229,13 +253,13 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false); } - if (!_getHttpResponseCompleted) + if (!_getHttpResponseCompleted && _httpSseWriter is { } writer) { // Only write the message to the response if the response has not completed. try { - await _httpSseWriter!.WriteAsync(item, cancellationToken).ConfigureAwait(false); + await writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { diff --git a/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs new file mode 100644 index 000000000..77f322b31 --- /dev/null +++ b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs @@ -0,0 +1,47 @@ +using ModelContextProtocol.Server; +using ModelContextProtocol.Tests.Utils; +using System.IO.Pipelines; +using System.Reflection; + +namespace ModelContextProtocol.Tests.Transport; + +public class StreamableHttpServerTransportTests(ITestOutputHelper testOutputHelper) : LoggedTest(testOutputHelper) +{ + [Fact] + public async Task HandleGetRequestAsync_ReleasesStreamReference_AfterRequestEnds() + { + await using var transport = new StreamableHttpServerTransport() + { + SessionId = "test-session", + }; + + var pipe = new Pipe(); + var responseStream = pipe.Writer.AsStream(); + + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(100)); + + try + { + await transport.HandleGetRequestAsync(responseStream, cts.Token); + } + catch (OperationCanceledException) + { + } + + // After the GET request handler returns, the transport must not retain a reference to the + // response stream via _httpSseWriter. Otherwise the Kestrel connection and its associated + // memory pool buffers (which can be ~20MiB per SSE session) stay pinned in unmanaged memory + // until the session is eventually disposed (via explicit DELETE or idle timeout), causing + // steady memory growth for servers whose clients disconnect without sending DELETE. + var httpSseWriterField = typeof(StreamableHttpServerTransport).GetField( + "_httpSseWriter", + BindingFlags.Instance | BindingFlags.NonPublic); + Assert.NotNull(httpSseWriterField); + var httpSseWriterValue = httpSseWriterField.GetValue(transport); + Assert.Null(httpSseWriterValue); + + await pipe.Reader.CompleteAsync(); + await pipe.Writer.CompleteAsync(); + } +}