diff --git a/.autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json b/.autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json new file mode 100644 index 000000000..9ad5afe6e --- /dev/null +++ b/.autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.RuntimeSupport", + "Type": "Minor", + "ChangelogMessages": [ + "Add response streaming support" + ] + } + ] +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs index 0e00f3e7f..6241fb61f 100644 --- a/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs @@ -349,6 +349,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul _logger.LogInformation("Starting InvokeOnceAsync"); var invocation = await Client.GetNextInvocationAsync(cancellationToken); + var isMultiConcurrency = Utils.IsUsingMultiConcurrency(_environmentVariables); Func processingFunc = async () => { @@ -358,6 +359,17 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul SetInvocationTraceId(impl.RuntimeApiHeaders.TraceId); } + // Initialize ResponseStreamFactory — includes RuntimeApiClient reference + var runtimeApiClient = Client as RuntimeApiClient; + if (runtimeApiClient != null) + { + LambdaResponseStreamFactory.InitializeInvocation( + invocation.LambdaContext.AwsRequestId, + isMultiConcurrency, + runtimeApiClient, + cancellationToken); + } + try { InvocationResponse response = null; @@ -372,15 +384,39 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul catch (Exception exception) { WriteUnhandledExceptionToLog(exception); - await Client.ReportInvocationErrorAsync(invocation.LambdaContext.AwsRequestId, exception, cancellationToken); + + var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency); + if (streamIfCreated != null && streamIfCreated.BytesWritten > 0) + { + // Midstream error — report via trailers on the already-open HTTP connection + await streamIfCreated.ReportErrorAsync(exception); + } + else + { + // Error before streaming started — use standard error reporting + await Client.ReportInvocationErrorAsync(invocation.LambdaContext.AwsRequestId, exception, cancellationToken); + } } finally { _logger.LogInformation("Finished invoking handler"); } - if (invokeSucceeded) + // If streaming was started, await the HTTP send task to ensure it completes + var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency); + if (sendTask != null) { + var stream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency); + if (stream != null && !stream.IsCompleted && !stream.HasError) + { + // Handler returned successfully — signal stream completion + stream.MarkCompleted(); + } + await sendTask; // Wait for HTTP request to finish + } + else if (invokeSucceeded) + { + // No streaming — send buffered response _logger.LogInformation("Starting sending response"); try { @@ -415,6 +451,10 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul } finally { + if (runtimeApiClient != null) + { + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency); + } invocation.Dispose(); } }; diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ILambdaResponseStream.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ILambdaResponseStream.cs new file mode 100644 index 000000000..d3565fdbc --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ILambdaResponseStream.cs @@ -0,0 +1,73 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// Interface for writing streaming responses in AWS Lambda functions. + /// Obtained by calling within a handler. + /// + public interface ILambdaResponseStream : IDisposable + { + /// + /// Asynchronously writes a byte array to the response stream. + /// + /// The byte array to write. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has been reported. + Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default); + + /// + /// Asynchronously writes a portion of a byte array to the response stream. + /// + /// The byte array containing data to write. + /// The zero-based byte offset in buffer at which to begin copying bytes. + /// The number of bytes to write. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has been reported. + Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default); + + /// + /// Reports an error that occurred during streaming. + /// This will send error information via HTTP trailing headers. + /// + /// The exception to report. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has already been reported. + Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default); + + /// + /// Gets the total number of bytes written to the stream so far. + /// + long BytesWritten { get; } + + /// + /// Gets whether the stream has been completed. + /// + bool IsCompleted { get; } + + /// + /// Gets whether an error has been reported. + /// + bool HasError { get; } + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs new file mode 100644 index 000000000..7830c81b4 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs @@ -0,0 +1,203 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// A write-only, non-seekable subclass that streams response data + /// to the Lambda Runtime API. Returned by . + /// + public partial class LambdaResponseStream : Stream, ILambdaResponseStream + { + private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n"); + + private long _bytesWritten; + private bool _isCompleted; + private bool _hasError; + private Exception _reportedError; + private readonly object _lock = new object(); + + // The live HTTP output stream, set by StreamingHttpContent when SerializeToStreamAsync is called. + private Stream _httpOutputStream; + private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1); + private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1); + + /// + /// The number of bytes written to the Lambda response stream so far. + /// + public long BytesWritten => _bytesWritten; + + /// + /// Gets a value indicating whether the operation has completed. + /// + public bool IsCompleted => _isCompleted; + + /// + /// Gets a value indicating whether an error has occurred. + /// + public bool HasError => _hasError; + + + internal Exception ReportedError => _reportedError; + + internal LambdaResponseStream() + { + } + + /// + /// Called by StreamingHttpContent.SerializeToStreamAsync to provide the HTTP output stream. + /// + internal void SetHttpOutputStream(Stream httpOutputStream) + { + _httpOutputStream = httpOutputStream; + _httpStreamReady.Release(); + } + + /// + /// Called by StreamingHttpContent.SerializeToStreamAsync to wait until the handler + /// finishes writing (MarkCompleted or ReportErrorAsync). + /// + internal async Task WaitForCompletionAsync() + { + await _completionSignal.WaitAsync(); + } + + /// + /// Asynchronously writes a byte array to the response stream. + /// + /// The byte array to write. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has been reported. + public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default) + { + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + await WriteAsync(buffer, 0, buffer.Length, cancellationToken); + } + + /// + /// Asynchronously writes a portion of a byte array to the response stream. + /// + /// The byte array containing data to write. + /// The zero-based byte offset in buffer at which to begin copying bytes. + /// The number of bytes to write. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has been reported. + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) + { + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + if (offset < 0 || offset > buffer.Length) + throw new ArgumentOutOfRangeException(nameof(offset)); + if (count < 0 || offset + count > buffer.Length) + throw new ArgumentOutOfRangeException(nameof(count)); + + // Wait for the HTTP stream to be ready (first write only blocks) + await _httpStreamReady.WaitAsync(cancellationToken); + try + { + lock (_lock) + { + ThrowIfCompletedOrError(); + _bytesWritten += count; + } + + // Write chunk directly to the HTTP stream: size(hex) + CRLF + data + CRLF + var chunkSizeHex = count.ToString("X"); + var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex); + await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken); + await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken); + await _httpOutputStream.WriteAsync(buffer, offset, count, cancellationToken); + await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken); + await _httpOutputStream.FlushAsync(cancellationToken); + } + finally + { + // Re-release so subsequent writes don't block + _httpStreamReady.Release(); + } + } + + /// + /// Reports an error that occurred during streaming. + /// This will send error information via HTTP trailing headers. + /// + /// The exception to report. + /// Optional cancellation token. + /// A task representing the asynchronous operation. + /// Thrown if the stream is already completed or an error has already been reported. + public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default) + { + if (exception == null) + throw new ArgumentNullException(nameof(exception)); + + lock (_lock) + { + if (_isCompleted) + throw new InvalidOperationException("Cannot report an error after the stream has been completed."); + if (_hasError) + throw new InvalidOperationException("An error has already been reported for this stream."); + + _hasError = true; + _reportedError = exception; + } + + // Signal completion so StreamingHttpContent can write error trailers and finish + _completionSignal.Release(); + + return Task.CompletedTask; + } + + internal void MarkCompleted() + { + lock (_lock) + { + _isCompleted = true; + } + // Signal completion so StreamingHttpContent can write the final chunk and finish + _completionSignal.Release(); + } + + private void ThrowIfCompletedOrError() + { + if (_isCompleted) + throw new InvalidOperationException("Cannot write to a completed stream."); + if (_hasError) + throw new InvalidOperationException("Cannot write to a stream after an error has been reported."); + } + + // ── Dispose ────────────────────────────────────────────────────────── + + /// + protected override void Dispose(bool disposing) + { + if (disposing) + { + try { _completionSignal.Release(); } catch (SemaphoreFullException) { } + } + + base.Dispose(disposing); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.Stream.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.Stream.cs new file mode 100644 index 000000000..5453333e7 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.Stream.cs @@ -0,0 +1,97 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// A write-only, non-seekable subclass that streams response data + /// to the Lambda Runtime API. Returned by . + /// Integrates with standard .NET stream consumers such as . + /// + public partial class LambdaResponseStream : Stream, ILambdaResponseStream + { + // ── System.IO.Stream — capabilities ───────────────────────────────── + + /// Gets a value indicating whether the stream supports reading. Always false. + public override bool CanRead => false; + + /// Gets a value indicating whether the stream supports seeking. Always false. + public override bool CanSeek => false; + + /// Gets a value indicating whether the stream supports writing. Always true. + public override bool CanWrite => true; + + // ── System.IO.Stream — Length / Position ──────────────────────────── + + /// + /// Gets the total number of bytes written to the stream so far. + /// Equivalent to . + /// + public override long Length => BytesWritten; + + /// + /// Getting or setting the position is not supported. + /// + /// Always thrown. + public override long Position + { + get => throw new NotSupportedException("LambdaResponseStream does not support seeking."); + set => throw new NotSupportedException("LambdaResponseStream does not support seeking."); + } + + // ── System.IO.Stream — seek / read (not supported) ────────────────── + + /// Not supported. + /// Always thrown. + public override long Seek(long offset, SeekOrigin origin) + => throw new NotImplementedException("LambdaResponseStream does not support seeking."); + + /// Not supported. + /// Always thrown. + public override int Read(byte[] buffer, int offset, int count) + => throw new NotImplementedException("LambdaResponseStream does not support reading."); + + /// Not supported. + /// Always thrown. + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => throw new NotImplementedException("LambdaResponseStream does not support reading."); + + // ── System.IO.Stream — write ───────────────────────────────────────── + + /// + /// Writes a sequence of bytes to the stream. Delegates to the async path synchronously. + /// Prefer to avoid blocking. + /// + public override void Write(byte[] buffer, int offset, int count) + => WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + + // ── System.IO.Stream — flush / set length ──────────────────────────── + + /// + /// Flush is a no-op; data is sent to the Runtime API immediately on each write. + /// + public override void Flush() { } + + /// Not supported. + /// Always thrown. + public override void SetLength(long value) + => throw new NotSupportedException("LambdaResponseStream does not support SetLength."); + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamContext.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamContext.cs new file mode 100644 index 000000000..c6a58c81d --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamContext.cs @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// Internal context class used by ResponseStreamFactory to track per-invocation streaming state. + /// + internal class LambdaResponseStreamContext + { + /// + /// The AWS request ID for the current invocation. + /// + public string AwsRequestId { get; set; } + + /// + /// Whether CreateStream() has been called for this invocation. + /// + public bool StreamCreated { get; set; } + + /// + /// The ResponseStream instance if created. + /// + public LambdaResponseStream Stream { get; set; } + + /// + /// The RuntimeApiClient used to start the streaming HTTP POST. + /// + public RuntimeApiClient RuntimeApiClient { get; set; } + + /// + /// Cancellation token for the current invocation. + /// + public CancellationToken CancellationToken { get; set; } + + /// + /// The Task representing the in-flight HTTP POST to the Runtime API. + /// Started when CreateStream() is called, completes when the stream is finalized. + /// + public Task SendTask { get; set; } + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamFactory.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamFactory.cs new file mode 100644 index 000000000..84d8c0ebd --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamFactory.cs @@ -0,0 +1,132 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// Factory for creating streaming responses in AWS Lambda functions. + /// Call CreateStream() within your handler to opt into response streaming for that invocation. + /// + public static class LambdaResponseStreamFactory + { + // For on-demand mode (single invocation at a time) + private static LambdaResponseStreamContext _onDemandContext; + + // For multi-concurrency mode (multiple concurrent invocations) + private static readonly AsyncLocal _asyncLocalContext = new AsyncLocal(); + + /// + /// Creates a streaming response for the current invocation. + /// Can only be called once per invocation. + /// + /// + /// A — a subclass — for writing + /// response data. The returned stream also implements . + /// + /// Thrown if called outside an invocation context. + /// Thrown if called more than once per invocation. + public static LambdaResponseStream CreateStream() + { + var context = GetCurrentContext(); + + if (context == null) + { + throw new InvalidOperationException( + "ResponseStreamFactory.CreateStream() can only be called within a Lambda handler invocation."); + } + + if (context.StreamCreated) + { + throw new InvalidOperationException( + "ResponseStreamFactory.CreateStream() can only be called once per invocation."); + } + + var lambdaStream = new LambdaResponseStream(); + context.Stream = lambdaStream; + context.StreamCreated = true; + + // Start the HTTP POST to the Runtime API. + // This runs concurrently — SerializeToStreamAsync will block + // until the handler finishes writing or reports an error. + context.SendTask = context.RuntimeApiClient.StartStreamingResponseAsync( + context.AwsRequestId, lambdaStream, context.CancellationToken); + + return lambdaStream; + } + + // Internal methods for LambdaBootstrap to manage state + + internal static void InitializeInvocation( + string awsRequestId, bool isMultiConcurrency, + RuntimeApiClient runtimeApiClient, CancellationToken cancellationToken) + { + var context = new LambdaResponseStreamContext + { + AwsRequestId = awsRequestId, + StreamCreated = false, + Stream = null, + RuntimeApiClient = runtimeApiClient, + CancellationToken = cancellationToken + }; + + if (isMultiConcurrency) + { + _asyncLocalContext.Value = context; + } + else + { + _onDemandContext = context; + } + } + + internal static LambdaResponseStream GetStreamIfCreated(bool isMultiConcurrency) + { + var context = isMultiConcurrency ? _asyncLocalContext.Value : _onDemandContext; + return context?.Stream; + } + + /// + /// Returns the Task for the in-flight HTTP send, or null if streaming wasn't started. + /// LambdaBootstrap awaits this after the handler returns to ensure the HTTP request completes. + /// + internal static Task GetSendTask(bool isMultiConcurrency) + { + var context = isMultiConcurrency ? _asyncLocalContext.Value : _onDemandContext; + return context?.SendTask; + } + + internal static void CleanupInvocation(bool isMultiConcurrency) + { + if (isMultiConcurrency) + { + _asyncLocalContext.Value = null; + } + else + { + _onDemandContext = null; + } + } + + private static LambdaResponseStreamContext GetCurrentContext() + { + // Check multi-concurrency first (AsyncLocal), then on-demand + return _asyncLocalContext.Value ?? _onDemandContext; + } + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs index daa9fff24..f594d5e56 100644 --- a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs @@ -177,6 +177,47 @@ public Task ReportRestoreErrorAsync(Exception exception, String errorType = null #endif + /// + /// Start sending a streaming response to the Runtime API. + /// This initiates the HTTP POST with streaming headers. The actual data + /// is written by the handler via ResponseStream.WriteAsync, which flows + /// through StreamingHttpContent to the HTTP connection. + /// This Task completes when the stream is finalized (MarkCompleted or error). + /// + /// The ID of the function request being responded to. + /// The ResponseStream that will provide the streaming data. + /// The optional cancellation token to use. + /// A Task representing the in-flight HTTP POST. + internal virtual async Task StartStreamingResponseAsync( + string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default) + { + if (awsRequestId == null) throw new ArgumentNullException(nameof(awsRequestId)); + if (responseStream == null) throw new ArgumentNullException(nameof(responseStream)); + + var url = $"http://{LambdaEnvironment.RuntimeServerHostAndPort}/2018-06-01/runtime/invocation/{awsRequestId}/response"; + + using (var request = new HttpRequestMessage(HttpMethod.Post, url)) + { + request.Headers.Add(StreamingConstants.ResponseModeHeader, StreamingConstants.StreamingResponseMode); + request.Headers.TransferEncodingChunked = true; + + // Declare trailers upfront — we always declare them since we don't know + // at request start time whether an error will occur mid-stream. + request.Headers.Add("Trailer", + $"{StreamingConstants.ErrorTypeTrailer}, {StreamingConstants.ErrorBodyTrailer}"); + + request.Content = new StreamingHttpContent(responseStream); + + // SendAsync calls SerializeToStreamAsync, which blocks until the handler + // finishes writing. This is why this method runs concurrently with the handler. + var response = await _httpClient.SendAsync( + request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + response.EnsureSuccessStatusCode(); + } + + responseStream.MarkCompleted(); + } + /// /// Send a response to a function invocation to the Runtime API as an asynchronous operation. /// diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingConstants.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingConstants.cs new file mode 100644 index 000000000..c1e99ed17 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingConstants.cs @@ -0,0 +1,43 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// Constants used for Lambda response streaming. + /// + internal static class StreamingConstants + { + /// + /// Header name for Lambda response mode. + /// + public const string ResponseModeHeader = "Lambda-Runtime-Function-Response-Mode"; + + /// + /// Value for streaming response mode. + /// + public const string StreamingResponseMode = "streaming"; + + /// + /// Trailer header name for error type. + /// + public const string ErrorTypeTrailer = "Lambda-Runtime-Function-Error-Type"; + + /// + /// Trailer header name for error body. + /// + public const string ErrorBodyTrailer = "Lambda-Runtime-Function-Error-Body"; + } +} diff --git a/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs new file mode 100644 index 000000000..c642873aa --- /dev/null +++ b/Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs @@ -0,0 +1,83 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport +{ + /// + /// HttpContent implementation for streaming responses with chunked transfer encoding. + /// + internal class StreamingHttpContent : HttpContent + { + private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n"); + private static readonly byte[] FinalChunkBytes = Encoding.ASCII.GetBytes("0\r\n"); + + private readonly LambdaResponseStream _responseStream; + + public StreamingHttpContent(LambdaResponseStream responseStream) + { + _responseStream = responseStream ?? throw new ArgumentNullException(nameof(responseStream)); + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + // Hand the HTTP output stream to ResponseStream so WriteAsync calls + // can write chunks directly to it. + _responseStream.SetHttpOutputStream(stream); + + // Wait for the handler to finish writing (MarkCompleted or ReportErrorAsync) + await _responseStream.WaitForCompletionAsync(); + + // Write final chunk + await stream.WriteAsync(FinalChunkBytes, 0, FinalChunkBytes.Length); + + // Write error trailers if present + if (_responseStream.HasError) + { + await WriteErrorTrailersAsync(stream, _responseStream.ReportedError); + } + + // Write final CRLF to end the chunked message + await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length); + await stream.FlushAsync(); + } + + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } + + private async Task WriteErrorTrailersAsync(Stream stream, Exception exception) + { + var exceptionInfo = ExceptionInfo.GetExceptionInfo(exception); + + var errorTypeHeader = $"{StreamingConstants.ErrorTypeTrailer}: {exceptionInfo.ErrorType}\r\n"; + var errorTypeBytes = Encoding.UTF8.GetBytes(errorTypeHeader); + await stream.WriteAsync(errorTypeBytes, 0, errorTypeBytes.Length); + + var errorBodyJson = LambdaJsonExceptionWriter.WriteJson(exceptionInfo); + var errorBodyHeader = $"{StreamingConstants.ErrorBodyTrailer}: {errorBodyJson}\r\n"; + var errorBodyBytes = Encoding.UTF8.GetBytes(errorBodyHeader); + await stream.WriteAsync(errorBodyBytes, 0, errorBodyBytes.Length); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/HandlerTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/HandlerTests.cs index 80f9d13d0..e257b688e 100644 --- a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/HandlerTests.cs +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/HandlerTests.cs @@ -31,7 +31,7 @@ namespace Amazon.Lambda.RuntimeSupport.UnitTests { - [Collection("Bootstrap")] + [Collection("ResponseStreamFactory")] public class HandlerTests { private const string AggregateExceptionTestMarker = "AggregateExceptionTesting"; diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapTests.cs index e1636ff16..ae40b7e2e 100644 --- a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapTests.cs +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/LambdaBootstrapTests.cs @@ -14,9 +14,11 @@ */ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net.Http; using System.Text; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -29,6 +31,7 @@ namespace Amazon.Lambda.RuntimeSupport.UnitTests /// Tests to test LambdaBootstrap when it's constructed using its actual constructor. /// Tests of the static GetLambdaBootstrap methods can be found in LambdaBootstrapWrapperTests. /// + [Collection("ResponseStreamFactory")] public class LambdaBootstrapTests { readonly TestHandler _testFunction; @@ -283,5 +286,159 @@ public void IsCallPreJitTest() environmentVariables.SetEnvironmentVariable(ENVIRONMENT_VARIABLE_AWS_LAMBDA_INITIALIZATION_TYPE, AWS_LAMBDA_INITIALIZATION_TYPE_PC); Assert.True(UserCodeInit.IsCallPreJit(environmentVariables)); } + + // --- Streaming Integration Tests --- + + private TestStreamingRuntimeApiClient CreateStreamingClient() + { + var envVars = new TestEnvironmentVariables(); + var headers = new Dictionary> + { + { RuntimeApiHeaders.HeaderAwsRequestId, new List { "streaming-request-id" } }, + { RuntimeApiHeaders.HeaderInvokedFunctionArn, new List { "invoked_function_arn" } }, + { RuntimeApiHeaders.HeaderAwsTenantId, new List { "tenant_id" } } + }; + return new TestStreamingRuntimeApiClient(envVars, headers); + } + + /// + /// Property 2: CreateStream Enables Streaming Mode + /// When a handler calls ResponseStreamFactory.CreateStream(), the response is transmitted + /// using streaming mode. LambdaBootstrap awaits the send task. + /// **Validates: Requirements 1.4, 6.1, 6.2, 6.3, 6.4** + /// + [Fact] + public async Task StreamingMode_HandlerCallsCreateStream_SendTaskAwaited() + { + var streamingClient = CreateStreamingClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(Encoding.UTF8.GetBytes("hello")); + return new InvocationResponse(Stream.Null, false); + }; + + using (var bootstrap = new LambdaBootstrap(handler, null)) + { + bootstrap.Client = streamingClient; + await bootstrap.InvokeOnceAsync(); + } + + Assert.True(streamingClient.StartStreamingResponseAsyncCalled); + Assert.False(streamingClient.SendResponseAsyncCalled); + } + + /// + /// Property 3: Default Mode Is Buffered + /// When a handler does not call ResponseStreamFactory.CreateStream(), the response + /// is transmitted using buffered mode via SendResponseAsync. + /// **Validates: Requirements 1.5, 7.2** + /// + [Fact] + public async Task BufferedMode_HandlerDoesNotCallCreateStream_UsesSendResponse() + { + var streamingClient = CreateStreamingClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var outputStream = new MemoryStream(Encoding.UTF8.GetBytes("buffered response")); + return new InvocationResponse(outputStream); + }; + + using (var bootstrap = new LambdaBootstrap(handler, null)) + { + bootstrap.Client = streamingClient; + await bootstrap.InvokeOnceAsync(); + } + + Assert.False(streamingClient.StartStreamingResponseAsyncCalled); + Assert.True(streamingClient.SendResponseAsyncCalled); + } + + /// + /// Property 14: Exception After Writes Uses Trailers + /// When a handler throws an exception after writing data to an IResponseStream, + /// the error is reported via trailers (ReportErrorAsync) rather than standard error reporting. + /// **Validates: Requirements 5.6, 5.7** + /// + [Fact] + public async Task MidstreamError_ExceptionAfterWrites_ReportsViaTrailers() + { + var streamingClient = CreateStreamingClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(Encoding.UTF8.GetBytes("partial data")); + throw new InvalidOperationException("midstream failure"); + }; + + using (var bootstrap = new LambdaBootstrap(handler, null)) + { + bootstrap.Client = streamingClient; + await bootstrap.InvokeOnceAsync(); + } + + // Error should be reported via trailers on the stream, not via standard error reporting + Assert.True(streamingClient.StartStreamingResponseAsyncCalled); + Assert.NotNull(streamingClient.LastStreamingResponseStream); + Assert.True(streamingClient.LastStreamingResponseStream.HasError); + Assert.False(streamingClient.ReportInvocationErrorAsyncExceptionCalled); + } + + /// + /// Property 15: Exception Before CreateStream Uses Standard Error + /// When a handler throws an exception before calling ResponseStreamFactory.CreateStream(), + /// the error is reported using the standard Lambda error reporting mechanism. + /// **Validates: Requirements 5.7, 7.1** + /// + [Fact] + public async Task PreStreamError_ExceptionBeforeCreateStream_UsesStandardErrorReporting() + { + var streamingClient = CreateStreamingClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + throw new InvalidOperationException("pre-stream failure"); + }; + + using (var bootstrap = new LambdaBootstrap(handler, null)) + { + bootstrap.Client = streamingClient; + await bootstrap.InvokeOnceAsync(); + } + + Assert.False(streamingClient.StartStreamingResponseAsyncCalled); + Assert.True(streamingClient.ReportInvocationErrorAsyncExceptionCalled); + } + + /// + /// State Isolation: ResponseStreamFactory state is cleared after each invocation. + /// **Validates: Requirements 6.5, 8.9** + /// + [Fact] + public async Task Cleanup_ResponseStreamFactoryStateCleared_AfterInvocation() + { + var streamingClient = CreateStreamingClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(Encoding.UTF8.GetBytes("data")); + return new InvocationResponse(Stream.Null, false); + }; + + using (var bootstrap = new LambdaBootstrap(handler, null)) + { + bootstrap.Client = streamingClient; + await bootstrap.InvokeOnceAsync(); + } + + // After invocation, factory state should be cleaned up + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(false)); + Assert.Null(LambdaResponseStreamFactory.GetSendTask(false)); + } } } diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamFactoryTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamFactoryTests.cs new file mode 100644 index 000000000..9fce99ad5 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamFactoryTests.cs @@ -0,0 +1,282 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + [Collection("ResponseStreamFactory")] + public class ResponseStreamFactoryTests : IDisposable + { + private const long MaxResponseSize = 20 * 1024 * 1024; + + public void Dispose() + { + // Clean up both modes to avoid test pollution + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + } + + /// + /// A minimal RuntimeApiClient subclass for testing that overrides StartStreamingResponseAsync + /// to avoid real HTTP calls while tracking invocations. + /// + private class MockStreamingRuntimeApiClient : RuntimeApiClient + { + public bool StartStreamingCalled { get; private set; } + public string LastAwsRequestId { get; private set; } + public LambdaResponseStream LastResponseStream { get; private set; } + public TaskCompletionSource SendTaskCompletion { get; } = new TaskCompletionSource(); + + public MockStreamingRuntimeApiClient() + : base(new TestEnvironmentVariables(), new TestHelpers.NoOpInternalRuntimeApiClient()) + { + } + + internal override async Task StartStreamingResponseAsync( + string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default) + { + StartStreamingCalled = true; + LastAwsRequestId = awsRequestId; + LastResponseStream = responseStream; + await SendTaskCompletion.Task; + } + } + + private void InitializeWithMock(string requestId, bool isMultiConcurrency, MockStreamingRuntimeApiClient mockClient) + { + LambdaResponseStreamFactory.InitializeInvocation( + requestId, isMultiConcurrency, + mockClient, CancellationToken.None); + } + + // --- Property 1: CreateStream Returns Valid Stream --- + + /// + /// Property 1: CreateStream Returns Valid Stream - on-demand mode. + /// Validates: Requirements 1.3, 2.2, 2.3 + /// + [Fact] + public void CreateStream_OnDemandMode_ReturnsValidStream() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-1", isMultiConcurrency: false, mock); + + var stream = LambdaResponseStreamFactory.CreateStream(); + + Assert.NotNull(stream); + Assert.IsAssignableFrom(stream); + } + + /// + /// Property 1: CreateStream Returns Valid Stream - multi-concurrency mode. + /// Validates: Requirements 1.3, 2.2, 2.3 + /// + [Fact] + public void CreateStream_MultiConcurrencyMode_ReturnsValidStream() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-2", isMultiConcurrency: true, mock); + + var stream = LambdaResponseStreamFactory.CreateStream(); + + Assert.NotNull(stream); + Assert.IsAssignableFrom(stream); + } + + // --- Property 4: Single Stream Per Invocation --- + + /// + /// Property 4: Single Stream Per Invocation - calling CreateStream twice throws. + /// Validates: Requirements 2.5, 2.6 + /// + [Fact] + public void CreateStream_CalledTwice_ThrowsInvalidOperationException() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-3", isMultiConcurrency: false, mock); + LambdaResponseStreamFactory.CreateStream(); + + Assert.Throws(() => LambdaResponseStreamFactory.CreateStream()); + } + + [Fact] + public void CreateStream_OutsideInvocationContext_ThrowsInvalidOperationException() + { + // No InitializeInvocation called + Assert.Throws(() => LambdaResponseStreamFactory.CreateStream()); + } + + // --- CreateStream starts HTTP POST --- + + /// + /// Validates that CreateStream calls StartStreamingResponseAsync on the RuntimeApiClient. + /// Validates: Requirements 1.3, 1.4, 2.2, 2.3, 2.4 + /// + [Fact] + public void CreateStream_CallsStartStreamingResponseAsync() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-start", isMultiConcurrency: false, mock); + + LambdaResponseStreamFactory.CreateStream(); + + Assert.True(mock.StartStreamingCalled); + Assert.Equal("req-start", mock.LastAwsRequestId); + Assert.NotNull(mock.LastResponseStream); + } + + // --- GetSendTask --- + + /// + /// Validates that GetSendTask returns the task from the HTTP POST. + /// Validates: Requirements 5.1, 7.3 + /// + [Fact] + public void GetSendTask_AfterCreateStream_ReturnsNonNullTask() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-send", isMultiConcurrency: false, mock); + + LambdaResponseStreamFactory.CreateStream(); + + var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency: false); + Assert.NotNull(sendTask); + } + + [Fact] + public void GetSendTask_BeforeCreateStream_ReturnsNull() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-nosend", isMultiConcurrency: false, mock); + + var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency: false); + Assert.Null(sendTask); + } + + [Fact] + public void GetSendTask_NoContext_ReturnsNull() + { + Assert.Null(LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency: false)); + } + + // --- Internal methods --- + + [Fact] + public void InitializeInvocation_OnDemand_SetsUpContext() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-4", isMultiConcurrency: false, mock); + + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false)); + + var stream = LambdaResponseStreamFactory.CreateStream(); + Assert.NotNull(stream); + } + + [Fact] + public void InitializeInvocation_MultiConcurrency_SetsUpContext() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-5", isMultiConcurrency: true, mock); + + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true)); + + var stream = LambdaResponseStreamFactory.CreateStream(); + Assert.NotNull(stream); + } + + [Fact] + public void GetStreamIfCreated_AfterCreateStream_ReturnsStream() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-6", isMultiConcurrency: false, mock); + LambdaResponseStreamFactory.CreateStream(); + + var retrieved = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false); + Assert.NotNull(retrieved); + } + + [Fact] + public void GetStreamIfCreated_NoContext_ReturnsNull() + { + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false)); + } + + [Fact] + public void CleanupInvocation_ClearsState() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-7", isMultiConcurrency: false, mock); + LambdaResponseStreamFactory.CreateStream(); + + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); + + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false)); + Assert.Throws(() => LambdaResponseStreamFactory.CreateStream()); + } + + // --- Property 16: State Isolation Between Invocations --- + + /// + /// Property 16: State Isolation Between Invocations - state from one invocation doesn't leak to the next. + /// Validates: Requirements 6.5, 8.9 + /// + [Fact] + public void StateIsolation_SequentialInvocations_NoLeakage() + { + var mock = new MockStreamingRuntimeApiClient(); + + // First invocation - streaming + InitializeWithMock("req-8a", isMultiConcurrency: false, mock); + var stream1 = LambdaResponseStreamFactory.CreateStream(); + Assert.NotNull(stream1); + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); + + // Second invocation - should start fresh + InitializeWithMock("req-8b", isMultiConcurrency: false, mock); + Assert.Null(LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false)); + + var stream2 = LambdaResponseStreamFactory.CreateStream(); + Assert.NotNull(stream2); + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); + } + + /// + /// Property 16: State Isolation - multi-concurrency mode uses AsyncLocal. + /// Validates: Requirements 2.9, 2.10 + /// + [Fact] + public async Task StateIsolation_MultiConcurrency_UsesAsyncLocal() + { + var mock = new MockStreamingRuntimeApiClient(); + InitializeWithMock("req-9", isMultiConcurrency: true, mock); + var stream = LambdaResponseStreamFactory.CreateStream(); + Assert.NotNull(stream); + + bool childSawNull = false; + await Task.Run(() => + { + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + childSawNull = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true) == null; + }); + + Assert.True(childSawNull); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs new file mode 100644 index 000000000..a4d265228 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs @@ -0,0 +1,337 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + public class ResponseStreamTests + { + /// + /// Helper: creates a ResponseStream and wires up a MemoryStream as the HTTP output stream. + /// Returns both so tests can inspect what was written. + /// + private static (LambdaResponseStream stream, MemoryStream httpOutput) CreateWiredStream() + { + var rs = new LambdaResponseStream(); + var output = new MemoryStream(); + rs.SetHttpOutputStream(output); + return (rs, output); + } + + // ---- Basic state tests ---- + + [Fact] + public void Constructor_InitializesStateCorrectly() + { + var stream = new LambdaResponseStream(); + + Assert.Equal(0, stream.BytesWritten); + Assert.False(stream.IsCompleted); + Assert.False(stream.HasError); + Assert.Null(stream.ReportedError); + } + + // ---- Chunked encoding format (Property 9, Property 22) ---- + + /// + /// Property 9: Chunked Encoding Format — each chunk is hex-size + CRLF + data + CRLF. + /// Property 22: CRLF Line Terminators — all line terminators are \r\n. + /// Validates: Requirements 3.2, 10.1, 10.5 + /// + [Theory] + [InlineData(new byte[] { 1, 2, 3 }, "3")] // 3 bytes → "3" + [InlineData(new byte[] { 0xFF }, "1")] // 1 byte → "1" + [InlineData(new byte[0], "0")] // 0 bytes → "0" + public async Task WriteAsync_WritesChunkedEncodingFormat(byte[] data, string expectedHexSize) + { + var (stream, httpOutput) = CreateWiredStream(); + + await stream.WriteAsync(data); + + var written = httpOutput.ToArray(); + var expected = Encoding.ASCII.GetBytes(expectedHexSize + "\r\n") + .Concat(data) + .Concat(Encoding.ASCII.GetBytes("\r\n")) + .ToArray(); + + Assert.Equal(expected, written); + } + + /// + /// Property 9: Chunked Encoding Format — verify with offset/count overload. + /// Validates: Requirements 3.2, 10.1 + /// + [Fact] + public async Task WriteAsync_WithOffset_WritesCorrectSliceAsChunk() + { + var (stream, httpOutput) = CreateWiredStream(); + var data = new byte[] { 0, 1, 2, 3, 0 }; + + await stream.WriteAsync(data, 1, 3); + + var written = httpOutput.ToArray(); + // 3 bytes → hex "3", data is {1,2,3} + var expected = Encoding.ASCII.GetBytes("3\r\n") + .Concat(new byte[] { 1, 2, 3 }) + .Concat(Encoding.ASCII.GetBytes("\r\n")) + .ToArray(); + + Assert.Equal(expected, written); + } + + // ---- Property 5: Written Data Appears in HTTP Response Immediately ---- + + /// + /// Property 5: Written Data Appears in HTTP Response Immediately — + /// each WriteAsync call writes to the HTTP stream before returning. + /// Validates: Requirements 3.2 + /// + [Fact] + public async Task WriteAsync_MultipleWrites_EachAppearsImmediately() + { + var (stream, httpOutput) = CreateWiredStream(); + + await stream.WriteAsync(new byte[] { 0xAA }); + var afterFirst = httpOutput.ToArray().Length; + Assert.True(afterFirst > 0, "First chunk should be on the HTTP stream immediately after WriteAsync returns"); + + await stream.WriteAsync(new byte[] { 0xBB, 0xCC }); + var afterSecond = httpOutput.ToArray().Length; + Assert.True(afterSecond > afterFirst, "Second chunk should appear on the HTTP stream immediately"); + + Assert.Equal(3, stream.BytesWritten); + } + + /// + /// Property 5: Written Data Appears in HTTP Response Immediately — + /// verify with a larger payload that hex size is multi-character. + /// Validates: Requirements 3.2 + /// + [Fact] + public async Task WriteAsync_LargerPayload_HexSizeIsCorrect() + { + var (stream, httpOutput) = CreateWiredStream(); + var data = new byte[256]; // 0x100 + + await stream.WriteAsync(data); + + var written = Encoding.ASCII.GetString(httpOutput.ToArray()); + Assert.StartsWith("100\r\n", written); + } + + // ---- Semaphore coordination: _httpStreamReady blocks until SetHttpOutputStream ---- + + /// + /// Test that WriteAsync blocks until SetHttpOutputStream is called. + /// Validates: Requirements 3.2, 10.1 + /// + [Fact] + public async Task WriteAsync_BlocksUntilSetHttpOutputStream() + { + var rs = new LambdaResponseStream(); + var httpOutput = new MemoryStream(); + var writeStarted = new ManualResetEventSlim(false); + var writeCompleted = new ManualResetEventSlim(false); + + // Start a write on a background thread — it should block + var writeTask = Task.Run(async () => + { + writeStarted.Set(); + await rs.WriteAsync(new byte[] { 1, 2, 3 }); + writeCompleted.Set(); + }); + + // Wait for the write to start, then verify it hasn't completed + writeStarted.Wait(TimeSpan.FromSeconds(2)); + await Task.Delay(100); // give it a moment + Assert.False(writeCompleted.IsSet, "WriteAsync should block until SetHttpOutputStream is called"); + + // Now provide the HTTP stream — the write should complete + rs.SetHttpOutputStream(httpOutput); + await writeTask; + + Assert.True(writeCompleted.IsSet); + Assert.True(httpOutput.ToArray().Length > 0); + } + + // ---- Completion signaling: MarkCompleted releases _completionSignal ---- + + /// + /// Test that MarkCompleted releases the completion signal (WaitForCompletionAsync unblocks). + /// Validates: Requirements 5.5, 8.3 + /// + [Fact] + public async Task MarkCompleted_ReleasesCompletionSignal() + { + var (stream, _) = CreateWiredStream(); + + var waitTask = stream.WaitForCompletionAsync(); + Assert.False(waitTask.IsCompleted, "WaitForCompletionAsync should block before MarkCompleted"); + + stream.MarkCompleted(); + + // Should complete within a reasonable time + var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2))); + Assert.Same(waitTask, completed); + Assert.True(stream.IsCompleted); + } + + // ---- Completion signaling: ReportErrorAsync releases _completionSignal ---- + + /// + /// Test that ReportErrorAsync releases the completion signal. + /// Validates: Requirements 5.5 + /// + [Fact] + public async Task ReportErrorAsync_ReleasesCompletionSignal() + { + var (stream, _) = CreateWiredStream(); + + var waitTask = stream.WaitForCompletionAsync(); + Assert.False(waitTask.IsCompleted, "WaitForCompletionAsync should block before ReportErrorAsync"); + + await stream.ReportErrorAsync(new Exception("test error")); + + var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2))); + Assert.Same(waitTask, completed); + Assert.True(stream.HasError); + } + + // ---- Property 19: Writes After Completion Rejected ---- + + /// + /// Property 19: Writes After Completion Rejected — writes after MarkCompleted throw. + /// Validates: Requirements 8.8 + /// + [Fact] + public async Task WriteAsync_AfterMarkCompleted_Throws() + { + var (stream, _) = CreateWiredStream(); + await stream.WriteAsync(new byte[] { 1 }); + stream.MarkCompleted(); + + await Assert.ThrowsAsync( + () => stream.WriteAsync(new byte[] { 2 })); + } + + /// + /// Property 19: Writes After Completion Rejected — writes after ReportErrorAsync throw. + /// Validates: Requirements 8.8 + /// + [Fact] + public async Task WriteAsync_AfterReportError_Throws() + { + var (stream, _) = CreateWiredStream(); + await stream.WriteAsync(new byte[] { 1 }); + await stream.ReportErrorAsync(new Exception("test")); + + await Assert.ThrowsAsync( + () => stream.WriteAsync(new byte[] { 2 })); + } + + // ---- Error handling tests ---- + + [Fact] + public async Task ReportErrorAsync_SetsErrorState() + { + var stream = new LambdaResponseStream(); + var exception = new InvalidOperationException("something broke"); + + await stream.ReportErrorAsync(exception); + + Assert.True(stream.HasError); + Assert.Same(exception, stream.ReportedError); + } + + [Fact] + public async Task ReportErrorAsync_AfterCompleted_Throws() + { + var stream = new LambdaResponseStream(); + stream.MarkCompleted(); + + await Assert.ThrowsAsync( + () => stream.ReportErrorAsync(new Exception("test"))); + } + + [Fact] + public async Task ReportErrorAsync_CalledTwice_Throws() + { + var stream = new LambdaResponseStream(); + await stream.ReportErrorAsync(new Exception("first")); + + await Assert.ThrowsAsync( + () => stream.ReportErrorAsync(new Exception("second"))); + } + + [Fact] + public void MarkCompleted_SetsCompletionState() + { + var stream = new LambdaResponseStream(); + + stream.MarkCompleted(); + + Assert.True(stream.IsCompleted); + } + + // ---- Argument validation ---- + + [Fact] + public async Task WriteAsync_NullBuffer_ThrowsArgumentNull() + { + var (stream, _) = CreateWiredStream(); + + await Assert.ThrowsAsync(() => stream.WriteAsync((byte[])null)); + } + + [Fact] + public async Task WriteAsync_NullBufferWithOffset_ThrowsArgumentNull() + { + var (stream, _) = CreateWiredStream(); + + await Assert.ThrowsAsync(() => stream.WriteAsync(null, 0, 0)); + } + + [Fact] + public async Task ReportErrorAsync_NullException_ThrowsArgumentNull() + { + var stream = new LambdaResponseStream(); + + await Assert.ThrowsAsync(() => stream.ReportErrorAsync(null)); + } + + // ---- Dispose signals completion ---- + + [Fact] + public async Task Dispose_ReleasesCompletionSignalIfNotAlreadyReleased() + { + var stream = new LambdaResponseStream(); + + var waitTask = stream.WaitForCompletionAsync(); + Assert.False(waitTask.IsCompleted); + + stream.Dispose(); + + var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2))); + Assert.Same(waitTask, completed); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/RuntimeApiClientTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/RuntimeApiClientTests.cs new file mode 100644 index 000000000..fbc4a8ae6 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/RuntimeApiClientTests.cs @@ -0,0 +1,223 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + /// + /// Tests for RuntimeApiClient streaming and buffered behavior. + /// Validates Properties 7, 8, 10, 13, 18. + /// + public class RuntimeApiClientTests + { + private const long MaxResponseSize = 20 * 1024 * 1024; + + /// + /// Mock HttpMessageHandler that captures the request for header inspection. + /// It completes the ResponseStream and returns immediately without reading + /// the content body, avoiding the SerializeToStreamAsync blocking issue. + /// + private class MockHttpMessageHandler : HttpMessageHandler + { + public HttpRequestMessage CapturedRequest { get; private set; } + private readonly LambdaResponseStream _responseStream; + + public MockHttpMessageHandler(LambdaResponseStream responseStream) + { + _responseStream = responseStream; + } + + protected override Task SendAsync( + HttpRequestMessage request, CancellationToken cancellationToken) + { + CapturedRequest = request; + + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)); + } + } + + private static RuntimeApiClient CreateClientWithMockHandler( + LambdaResponseStream stream, out MockHttpMessageHandler handler) + { + handler = new MockHttpMessageHandler(stream); + var httpClient = new HttpClient(handler); + var envVars = new TestEnvironmentVariables(); + envVars.SetEnvironmentVariable("AWS_LAMBDA_RUNTIME_API", "localhost:9001"); + return new RuntimeApiClient(envVars, httpClient); + } + + // --- Property 7: Streaming Response Mode Header --- + + /// + /// Property 7: Streaming Response Mode Header + /// For any streaming response, the HTTP request should include + /// "Lambda-Runtime-Function-Response-Mode: streaming". + /// **Validates: Requirements 4.1** + /// + [Fact] + public async Task StartStreamingResponseAsync_IncludesStreamingResponseModeHeader() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out var handler); + + await client.StartStreamingResponseAsync("req-1", stream, CancellationToken.None); + + Assert.NotNull(handler.CapturedRequest); + Assert.True(handler.CapturedRequest.Headers.Contains(StreamingConstants.ResponseModeHeader)); + var values = handler.CapturedRequest.Headers.GetValues(StreamingConstants.ResponseModeHeader).ToList(); + Assert.Single(values); + Assert.Equal(StreamingConstants.StreamingResponseMode, values[0]); + } + + // --- Property 8: Chunked Transfer Encoding Header --- + + /// + /// Property 8: Chunked Transfer Encoding Header + /// For any streaming response, the HTTP request should include + /// "Transfer-Encoding: chunked". + /// **Validates: Requirements 4.2** + /// + [Fact] + public async Task StartStreamingResponseAsync_IncludesChunkedTransferEncodingHeader() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out var handler); + + await client.StartStreamingResponseAsync("req-2", stream, CancellationToken.None); + + Assert.NotNull(handler.CapturedRequest); + Assert.True(handler.CapturedRequest.Headers.TransferEncodingChunked); + } + + // --- Property 13: Trailer Declaration Header --- + + /// + /// Property 13: Trailer Declaration Header + /// For any streaming response, the HTTP request should include a "Trailer" header + /// declaring the error trailer headers upfront (since we cannot know at request + /// start whether an error will occur). + /// **Validates: Requirements 5.4** + /// + [Fact] + public async Task StartStreamingResponseAsync_DeclaresTrailerHeaderUpfront() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out var handler); + + await client.StartStreamingResponseAsync("req-3", stream, CancellationToken.None); + + Assert.NotNull(handler.CapturedRequest); + Assert.True(handler.CapturedRequest.Headers.Contains("Trailer")); + var trailerValue = string.Join(", ", handler.CapturedRequest.Headers.GetValues("Trailer")); + Assert.Contains(StreamingConstants.ErrorTypeTrailer, trailerValue); + Assert.Contains(StreamingConstants.ErrorBodyTrailer, trailerValue); + } + + // --- Property 18: Stream Finalization --- + + /// + /// Property 18: Stream Finalization + /// For any streaming response that completes successfully, the ResponseStream + /// should be marked as completed (IsCompleted = true) after the HTTP response succeeds. + /// **Validates: Requirements 8.3** + /// + [Fact] + public async Task StartStreamingResponseAsync_MarksStreamCompletedAfterSuccess() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out _); + + await client.StartStreamingResponseAsync("req-4", stream, CancellationToken.None); + + Assert.True(stream.IsCompleted); + } + + // --- Property 10: Buffered Responses Exclude Streaming Headers --- + + /// + /// Mock HttpMessageHandler that captures the request for buffered response header inspection. + /// Returns an Accepted (202) response since that's what the InternalRuntimeApiClient expects. + /// + private class BufferedMockHttpMessageHandler : HttpMessageHandler + { + public HttpRequestMessage CapturedRequest { get; private set; } + + protected override Task SendAsync( + HttpRequestMessage request, CancellationToken cancellationToken) + { + CapturedRequest = request; + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.Accepted)); + } + } + + /// + /// Property 10: Buffered Responses Exclude Streaming Headers + /// For any buffered response (where CreateStream was not called), the HTTP request + /// should not include "Lambda-Runtime-Function-Response-Mode" or + /// "Transfer-Encoding: chunked" or "Trailer" headers. + /// **Validates: Requirements 4.6** + /// + [Fact] + public async Task SendResponseAsync_BufferedResponse_ExcludesStreamingHeaders() + { + var bufferedHandler = new BufferedMockHttpMessageHandler(); + var httpClient = new HttpClient(bufferedHandler); + var envVars = new TestEnvironmentVariables(); + envVars.SetEnvironmentVariable("AWS_LAMBDA_RUNTIME_API", "localhost:9001"); + var client = new RuntimeApiClient(envVars, httpClient); + + var outputStream = new MemoryStream(new byte[] { 1, 2, 3 }); + await client.SendResponseAsync("req-buffered", outputStream, CancellationToken.None); + + Assert.NotNull(bufferedHandler.CapturedRequest); + // Buffered responses must not include streaming-specific headers + Assert.False(bufferedHandler.CapturedRequest.Headers.Contains(StreamingConstants.ResponseModeHeader), + "Buffered response should not include Lambda-Runtime-Function-Response-Mode header"); + Assert.NotEqual(true, bufferedHandler.CapturedRequest.Headers.TransferEncodingChunked); + Assert.False(bufferedHandler.CapturedRequest.Headers.Contains("Trailer"), + "Buffered response should not include Trailer header"); + } + + // --- Argument validation --- + + [Fact] + public async Task StartStreamingResponseAsync_NullRequestId_ThrowsArgumentNullException() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out _); + + await Assert.ThrowsAsync( + () => client.StartStreamingResponseAsync(null, stream, CancellationToken.None)); + } + + [Fact] + public async Task StartStreamingResponseAsync_NullResponseStream_ThrowsArgumentNullException() + { + var stream = new LambdaResponseStream(); + var client = CreateClientWithMockHandler(stream, out _); + + await Assert.ThrowsAsync( + () => client.StartStreamingResponseAsync("req-5", null, CancellationToken.None)); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingHttpContentTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingHttpContentTests.cs new file mode 100644 index 000000000..1f85f47a8 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingHttpContentTests.cs @@ -0,0 +1,347 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + public class StreamingHttpContentTests + { + private const long MaxResponseSize = 20 * 1024 * 1024; + + /// + /// Helper: runs SerializeToStreamAsync concurrently with handler actions. + /// The handlerAction receives the ResponseStream and should write data then signal completion. + /// Returns the bytes written to the HTTP output stream. + /// + private async Task SerializeWithConcurrentHandler( + LambdaResponseStream responseStream, + Func handlerAction) + { + var content = new StreamingHttpContent(responseStream); + var outputStream = new MemoryStream(); + + // Start serialization on a background task (it will call SetHttpOutputStream and wait) + var serializeTask = Task.Run(() => content.CopyToAsync(outputStream)); + + // Give SerializeToStreamAsync a moment to start and call SetHttpOutputStream + await Task.Delay(50); + + // Run the handler action (writes data, signals completion) + await handlerAction(responseStream); + + // Wait for serialization to complete + await serializeTask; + + return outputStream.ToArray(); + } + + // ---- SerializeToStreamAsync hands off HTTP stream ---- + + /// + /// Test that SerializeToStreamAsync calls SetHttpOutputStream on the ResponseStream, + /// enabling writes to flow through. + /// Validates: Requirements 4.3, 10.1 + /// + [Fact] + public async Task SerializeToStreamAsync_HandsOffHttpStream_WritesFlowThrough() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 0xAA, 0xBB }); + stream.MarkCompleted(); + }); + + var outputStr = Encoding.ASCII.GetString(output); + // Should contain the chunk data written by the handler + Assert.Contains("2\r\n", outputStr); + Assert.True(output.Length > 0); + } + + /// + /// Test that SerializeToStreamAsync blocks until MarkCompleted is called. + /// Validates: Requirements 4.3 + /// + [Fact] + public async Task SerializeToStreamAsync_BlocksUntilMarkCompleted() + { + var rs = new LambdaResponseStream(); + var content = new StreamingHttpContent(rs); + var outputStream = new MemoryStream(); + + var serializeTask = Task.Run(() => content.CopyToAsync(outputStream)); + await Task.Delay(50); + + // Serialization should still be running (waiting for completion) + Assert.False(serializeTask.IsCompleted, "SerializeToStreamAsync should block until completion is signaled"); + + // Now signal completion + rs.MarkCompleted(); + await serializeTask; + + Assert.True(serializeTask.IsCompleted); + } + + /// + /// Test that SerializeToStreamAsync blocks until ReportErrorAsync is called. + /// Validates: Requirements 4.3, 5.1 + /// + [Fact] + public async Task SerializeToStreamAsync_BlocksUntilReportErrorAsync() + { + var rs = new LambdaResponseStream(); + var content = new StreamingHttpContent(rs); + var outputStream = new MemoryStream(); + + var serializeTask = Task.Run(() => content.CopyToAsync(outputStream)); + await Task.Delay(50); + + Assert.False(serializeTask.IsCompleted, "SerializeToStreamAsync should block until error is reported"); + + await rs.ReportErrorAsync(new Exception("test error")); + await serializeTask; + + Assert.True(serializeTask.IsCompleted); + } + + // ---- Property 20: Final Chunk Termination ---- + + /// + /// Property 20: Final Chunk Termination — final chunk "0\r\n" is written after completion. + /// Validates: Requirements 4.3, 10.2, 10.3 + /// + [Fact] + public async Task FinalChunk_WrittenAfterCompletion() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + stream.MarkCompleted(); + }); + + var outputStr = Encoding.ASCII.GetString(output); + Assert.Contains("0\r\n", outputStr); + + // Final chunk should appear after the data chunk + var dataChunkEnd = outputStr.IndexOf("1\r\n") + 3 + 1 + 2; // "1\r\n" + 1 byte data + "\r\n" + var finalChunkIndex = outputStr.IndexOf("0\r\n", dataChunkEnd); + Assert.True(finalChunkIndex >= 0, "Final chunk 0\\r\\n should appear after data chunks"); + } + + /// + /// Property 20: Final Chunk Termination — empty stream still gets final chunk. + /// Validates: Requirements 10.2 + /// + [Fact] + public async Task FinalChunk_EmptyStream_StillWritten() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, stream => + { + stream.MarkCompleted(); + return Task.CompletedTask; + }); + + var outputStr = Encoding.ASCII.GetString(output); + Assert.StartsWith("0\r\n", outputStr); + } + + // ---- Property 21: Trailer Ordering ---- + + /// + /// Property 21: Trailer Ordering — trailers appear after final chunk. + /// Validates: Requirements 10.3 + /// + [Fact] + public async Task ErrorTrailers_AppearAfterFinalChunk() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + await stream.ReportErrorAsync(new Exception("fail")); + }); + + var outputStr = Encoding.UTF8.GetString(output); + + // Find the final chunk "0\r\n" that appears after data chunks + var dataEnd = outputStr.IndexOf("1\r\n") + 3 + 1 + 2; + var finalChunkIndex = outputStr.IndexOf("0\r\n", dataEnd); + var errorTypeIndex = outputStr.IndexOf("Lambda-Runtime-Function-Error-Type:"); + var errorBodyIndex = outputStr.IndexOf("Lambda-Runtime-Function-Error-Body:"); + + Assert.True(finalChunkIndex >= 0, "Final chunk not found"); + Assert.True(errorTypeIndex > finalChunkIndex, "Error type trailer should appear after final chunk"); + Assert.True(errorBodyIndex > finalChunkIndex, "Error body trailer should appear after final chunk"); + } + + // ---- Property 11: Midstream Error Type Trailer ---- + + /// + /// Property 11: Midstream Error Type Trailer — error type trailer is included for various exception types. + /// Validates: Requirements 5.1, 5.2 + /// + [Theory] + [InlineData(typeof(InvalidOperationException))] + [InlineData(typeof(ArgumentException))] + [InlineData(typeof(NullReferenceException))] + public async Task ErrorTrailer_IncludesErrorType(Type exceptionType) + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + var exception = (Exception)Activator.CreateInstance(exceptionType, "test error"); + await stream.ReportErrorAsync(exception); + }); + + var outputStr = Encoding.UTF8.GetString(output); + Assert.Contains($"Lambda-Runtime-Function-Error-Type: {exceptionType.Name}", outputStr); + } + + // ---- Property 12: Midstream Error Body Trailer ---- + + /// + /// Property 12: Midstream Error Body Trailer — error body trailer includes JSON exception details. + /// Validates: Requirements 5.3 + /// + [Fact] + public async Task ErrorTrailer_IncludesJsonErrorBody() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + await stream.ReportErrorAsync(new InvalidOperationException("something went wrong")); + }); + + var outputStr = Encoding.UTF8.GetString(output); + Assert.Contains("Lambda-Runtime-Function-Error-Body:", outputStr); + Assert.Contains("something went wrong", outputStr); + Assert.Contains("InvalidOperationException", outputStr); + } + + // ---- Final CRLF termination ---- + + /// + /// Test that the chunked message ends with CRLF after successful completion (no trailers). + /// Validates: Requirements 10.2, 10.5 + /// + [Fact] + public async Task SuccessfulCompletion_EndsWithCrlf() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + stream.MarkCompleted(); + }); + + var outputStr = Encoding.ASCII.GetString(output); + // Should end with "0\r\n" (final chunk) + "\r\n" (end of message) + Assert.EndsWith("0\r\n\r\n", outputStr); + } + + /// + /// Test that the chunked message ends with CRLF after error trailers. + /// Validates: Requirements 10.3, 10.5 + /// + [Fact] + public async Task ErrorCompletion_EndsWithCrlf() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + await stream.ReportErrorAsync(new Exception("fail")); + }); + + var outputStr = Encoding.UTF8.GetString(output); + Assert.EndsWith("\r\n", outputStr); + } + + // ---- No error, no trailers ---- + + [Fact] + public async Task NoError_NoTrailersWritten() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 1 }); + stream.MarkCompleted(); + }); + + var outputStr = Encoding.UTF8.GetString(output); + Assert.DoesNotContain("Lambda-Runtime-Function-Error-Type:", outputStr); + Assert.DoesNotContain("Lambda-Runtime-Function-Error-Body:", outputStr); + } + + // ---- TryComputeLength ---- + + [Fact] + public void TryComputeLength_ReturnsFalse() + { + var stream = new LambdaResponseStream(); + var content = new StreamingHttpContent(stream); + + var result = content.Headers.ContentLength; + Assert.Null(result); + } + + // ---- CRLF correctness ---- + + /// + /// Property 22: CRLF Line Terminators — all line terminators are CRLF, not just LF. + /// Validates: Requirements 10.5 + /// + [Fact] + public async Task CrlfTerminators_NoBareLineFeed() + { + var rs = new LambdaResponseStream(); + + var output = await SerializeWithConcurrentHandler(rs, async stream => + { + await stream.WriteAsync(new byte[] { 65, 66, 67 }); // "ABC" + stream.MarkCompleted(); + }); + + for (int i = 0; i < output.Length; i++) + { + if (output[i] == (byte)'\n') + { + Assert.True(i > 0 && output[i - 1] == (byte)'\r', + $"Found bare LF at position {i} without preceding CR"); + } + } + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingIntegrationTests.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingIntegrationTests.cs new file mode 100644 index 000000000..0f15680f4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingIntegrationTests.cs @@ -0,0 +1,651 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Amazon.Lambda.RuntimeSupport.UnitTests.TestHelpers; +using Xunit; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + [CollectionDefinition("ResponseStreamFactory")] + public class ResponseStreamFactoryCollection { } + + /// + /// End-to-end integration tests for the true-streaming architecture. + /// These tests exercise the full pipeline: LambdaBootstrap → ResponseStreamFactory → + /// ResponseStream → StreamingHttpContent → captured HTTP output stream. + /// + [Collection("ResponseStreamFactory")] + public class StreamingIntegrationTests : IDisposable + { + public void Dispose() + { + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + } + + // ─── Helpers ──────────────────────────────────────────────────────────────── + + private static Dictionary> MakeHeaders(string requestId = "test-request-id") + => new Dictionary> + { + { RuntimeApiHeaders.HeaderAwsRequestId, new List { requestId } }, + { RuntimeApiHeaders.HeaderInvokedFunctionArn, new List { "arn:aws:lambda:us-east-1:123456789012:function:test" } }, + { RuntimeApiHeaders.HeaderAwsTenantId, new List { "tenant-id" } }, + { RuntimeApiHeaders.HeaderTraceId, new List { "trace-id" } }, + { RuntimeApiHeaders.HeaderDeadlineMs, new List { "9999999999999" } }, + }; + + /// + /// A capturing RuntimeApiClient that records the raw bytes written to the HTTP output stream + /// by SerializeToStreamAsync, enabling assertions on chunked-encoding format. + /// + private class CapturingStreamingRuntimeApiClient : RuntimeApiClient, IRuntimeApiClient + { + private readonly IEnvironmentVariables _envVars; + private readonly Dictionary> _headers; + + public bool StartStreamingCalled { get; private set; } + public bool SendResponseCalled { get; private set; } + public bool ReportInvocationErrorCalled { get; private set; } + public byte[] CapturedHttpBytes { get; private set; } + public LambdaResponseStream LastResponseStream { get; private set; } + public Stream LastBufferedOutputStream { get; private set; } + + public new Amazon.Lambda.RuntimeSupport.Helpers.IConsoleLoggerWriter ConsoleLogger { get; } = new Helpers.LogLevelLoggerWriter(new SystemEnvironmentVariables()); + + public CapturingStreamingRuntimeApiClient( + IEnvironmentVariables envVars, + Dictionary> headers) + : base(envVars, new NoOpInternalRuntimeApiClient()) + { + _envVars = envVars; + _headers = headers; + } + + public new async Task GetNextInvocationAsync(CancellationToken cancellationToken = default) + { + _headers[RuntimeApiHeaders.HeaderTraceId] = new List { Guid.NewGuid().ToString() }; + var inputStream = new MemoryStream(new byte[0]); + return new InvocationRequest + { + InputStream = inputStream, + LambdaContext = new LambdaContext( + new RuntimeApiHeaders(_headers), + new LambdaEnvironment(_envVars), + new TestDateTimeHelper(), + new Helpers.SimpleLoggerWriter(_envVars)) + }; + } + + internal override async Task StartStreamingResponseAsync( + string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default) + { + StartStreamingCalled = true; + LastResponseStream = responseStream; + + // Use a real MemoryStream as the HTTP output stream so we capture actual bytes + var captureStream = new MemoryStream(); + var content = new StreamingHttpContent(responseStream); + + // SerializeToStreamAsync hands the stream to ResponseStream and waits for completion + await content.CopyToAsync(captureStream); + CapturedHttpBytes = captureStream.ToArray(); + } + + public new async Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default) + { + SendResponseCalled = true; + if (outputStream != null) + { + var ms = new MemoryStream(); + await outputStream.CopyToAsync(ms); + ms.Position = 0; + LastBufferedOutputStream = ms; + } + } + + public new Task ReportInvocationErrorAsync(string awsRequestId, Exception exception, CancellationToken cancellationToken = default) + { + ReportInvocationErrorCalled = true; + return Task.CompletedTask; + } + + public new Task ReportInitializationErrorAsync(Exception exception, string errorType = null, CancellationToken cancellationToken = default) + => Task.CompletedTask; + + public new Task ReportInitializationErrorAsync(string errorType, CancellationToken cancellationToken = default) + => Task.CompletedTask; + +#if NET8_0_OR_GREATER + public new Task RestoreNextInvocationAsync(CancellationToken cancellationToken = default) => Task.CompletedTask; + public new Task ReportRestoreErrorAsync(Exception exception, string errorType = null, CancellationToken cancellationToken = default) => Task.CompletedTask; +#endif + } + + private static CapturingStreamingRuntimeApiClient CreateClient(string requestId = "test-request-id") + => new CapturingStreamingRuntimeApiClient(new TestEnvironmentVariables(), MakeHeaders(requestId)); + + // ─── 10.1 End-to-end streaming response ───────────────────────────────────── + + /// + /// End-to-end: handler calls CreateStream, writes multiple chunks. + /// Verifies data flows through with correct chunked encoding and stream is finalized. + /// Requirements: 3.2, 4.3, 10.1 + /// + [Fact] + public async Task Streaming_MultipleChunks_FlowThroughWithChunkedEncoding() + { + var client = CreateClient(); + var chunks = new[] { "Hello", ", ", "World" }; + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + foreach (var chunk in chunks) + await stream.WriteAsync(Encoding.UTF8.GetBytes(chunk)); + return new InvocationResponse(Stream.Null, false); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.StartStreamingCalled); + Assert.NotNull(client.CapturedHttpBytes); + + var output = Encoding.UTF8.GetString(client.CapturedHttpBytes); + + // Each chunk should appear as: hex-size\r\ndata\r\n + Assert.Contains("5\r\nHello\r\n", output); + Assert.Contains("2\r\n, \r\n", output); + Assert.Contains("5\r\nWorld\r\n", output); + + // Final chunk terminates the stream + Assert.Contains("0\r\n", output); + Assert.EndsWith("0\r\n\r\n", output); + } + + /// + /// End-to-end: all data is transmitted correctly (content round-trip). + /// Requirements: 3.2, 4.3, 10.1 + /// + [Fact] + public async Task Streaming_AllDataTransmitted_ContentRoundTrip() + { + var client = CreateClient(); + var payload = Encoding.UTF8.GetBytes("integration test payload"); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(payload); + return new InvocationResponse(Stream.Null, false); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + var output = client.CapturedHttpBytes; + Assert.NotNull(output); + + // Decode the single chunk: hex-size\r\ndata\r\n + var outputStr = Encoding.UTF8.GetString(output); + var hexSize = payload.Length.ToString("X"); + Assert.Contains(hexSize + "\r\n", outputStr); + Assert.Contains("integration test payload", outputStr); + } + + /// + /// End-to-end: stream is finalized (final chunk written, BytesWritten matches). + /// Requirements: 3.2, 4.3, 10.1 + /// + [Fact] + public async Task Streaming_StreamFinalized_BytesWrittenMatchesPayload() + { + var client = CreateClient(); + var data = Encoding.UTF8.GetBytes("finalization check"); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(data); + return new InvocationResponse(Stream.Null, false); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.NotNull(client.LastResponseStream); + Assert.Equal(data.Length, client.LastResponseStream.BytesWritten); + Assert.True(client.LastResponseStream.IsCompleted); + } + + // ─── 10.2 End-to-end buffered response ────────────────────────────────────── + + /// + /// End-to-end: handler does NOT call CreateStream — response goes via buffered path. + /// Verifies SendResponseAsync is called and streaming headers are absent. + /// Requirements: 1.5, 4.6, 9.4 + /// + [Fact] + public async Task Buffered_HandlerDoesNotCallCreateStream_UsesSendResponsePath() + { + var client = CreateClient(); + var responseBody = Encoding.UTF8.GetBytes("buffered response body"); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + return new InvocationResponse(new MemoryStream(responseBody)); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.False(client.StartStreamingCalled, "StartStreamingResponseAsync should NOT be called for buffered mode"); + Assert.True(client.SendResponseCalled, "SendResponseAsync should be called for buffered mode"); + Assert.Null(client.CapturedHttpBytes); + } + + /// + /// End-to-end: buffered response body is transmitted correctly. + /// Requirements: 1.5, 4.6, 9.4 + /// + [Fact] + public async Task Buffered_ResponseBodyTransmittedCorrectly() + { + var client = CreateClient(); + var responseBody = Encoding.UTF8.GetBytes("hello buffered world"); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + return new InvocationResponse(new MemoryStream(responseBody)); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.SendResponseCalled); + Assert.NotNull(client.LastBufferedOutputStream); + var received = new MemoryStream(); + await client.LastBufferedOutputStream.CopyToAsync(received); + Assert.Equal(responseBody, received.ToArray()); + } + + // ─── 10.3 Midstream error ──────────────────────────────────────────────────── + + /// + /// End-to-end: handler writes data then throws — error trailers appear after final chunk. + /// Requirements: 5.1, 5.2, 5.3, 5.6 + /// + [Fact] + public async Task MidstreamError_ErrorTrailersIncludedAfterFinalChunk() + { + var client = CreateClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(Encoding.UTF8.GetBytes("partial data")); + throw new InvalidOperationException("midstream failure"); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.StartStreamingCalled); + Assert.NotNull(client.CapturedHttpBytes); + + var output = Encoding.UTF8.GetString(client.CapturedHttpBytes); + + // Data chunk should be present + Assert.Contains("partial data", output); + + // Final chunk must appear + Assert.Contains("0\r\n", output); + + // Error trailers must appear after the final chunk + var finalChunkIdx = output.LastIndexOf("0\r\n"); + var errorTypeIdx = output.IndexOf(StreamingConstants.ErrorTypeTrailer + ":"); + var errorBodyIdx = output.IndexOf(StreamingConstants.ErrorBodyTrailer + ":"); + + Assert.True(errorTypeIdx > finalChunkIdx, "Error-Type trailer should appear after final chunk"); + Assert.True(errorBodyIdx > finalChunkIdx, "Error-Body trailer should appear after final chunk"); + + // Error type should reference the exception type + Assert.Contains("InvalidOperationException", output); + + // Standard error reporting should NOT be used (error went via trailers) + Assert.False(client.ReportInvocationErrorCalled); + } + + /// + /// End-to-end: handler throws before writing any data — standard error reporting is used. + /// Requirements: 5.6, 5.7 + /// + [Fact] + public async Task PreStreamError_ExceptionBeforeAnyWrite_UsesStandardErrorReporting() + { + var client = CreateClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + // Throw before writing anything + throw new ArgumentException("pre-write failure"); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + // BytesWritten == 0, so standard error reporting should be used + Assert.True(client.ReportInvocationErrorCalled, + "Standard error reporting should be used when no bytes were written"); + } + + /// + /// End-to-end: error body trailer contains JSON with exception details. + /// Requirements: 5.2, 5.3 + /// + [Fact] + public async Task MidstreamError_ErrorBodyTrailerContainsJsonDetails() + { + var client = CreateClient(); + const string errorMessage = "something went wrong mid-stream"; + + LambdaBootstrapHandler handler = async (invocation) => + { + var stream = LambdaResponseStreamFactory.CreateStream(); + await stream.WriteAsync(Encoding.UTF8.GetBytes("some data")); + throw new InvalidOperationException(errorMessage); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + var output = Encoding.UTF8.GetString(client.CapturedHttpBytes); + Assert.Contains(StreamingConstants.ErrorBodyTrailer + ":", output); + Assert.Contains(errorMessage, output); + } + + // ─── 10.4 Multi-concurrency ────────────────────────────────────────────────── + + /// + /// Multi-concurrency: concurrent invocations use AsyncLocal for state isolation. + /// Each invocation independently uses streaming or buffered mode without interference. + /// Requirements: 2.9, 6.5, 8.9 + /// + [Fact] + public async Task MultiConcurrency_ConcurrentInvocations_StateIsolated() + { + const int concurrency = 3; + var results = new ConcurrentDictionary(); + var barrier = new SemaphoreSlim(0, concurrency); + var allStarted = new SemaphoreSlim(0, concurrency); + + // Simulate concurrent invocations using AsyncLocal directly + var tasks = new List(); + for (int i = 0; i < concurrency; i++) + { + var requestId = $"req-{i}"; + var payload = $"payload-{i}"; + tasks.Add(Task.Run(async () => + { + var mockClient = new MockMultiConcurrencyStreamingClient(); + LambdaResponseStreamFactory.InitializeInvocation( + requestId, + isMultiConcurrency: true, + mockClient, + CancellationToken.None); + + var stream = LambdaResponseStreamFactory.CreateStream(); + allStarted.Release(); + + // Wait until all tasks have started (to ensure true concurrency) + await barrier.WaitAsync(); + + await stream.WriteAsync(Encoding.UTF8.GetBytes(payload)); + stream.MarkCompleted(); + + // Verify this invocation's stream is still accessible + var retrieved = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true); + results[requestId] = retrieved != null ? payload : "MISSING"; + + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + })); + } + + // Wait for all tasks to start, then release the barrier + for (int i = 0; i < concurrency; i++) + await allStarted.WaitAsync(); + barrier.Release(concurrency); + + await Task.WhenAll(tasks); + + // Each invocation should have seen its own stream + Assert.Equal(concurrency, results.Count); + for (int i = 0; i < concurrency; i++) + Assert.Equal($"payload-{i}", results[$"req-{i}"]); + } + + /// + /// Multi-concurrency: streaming and buffered invocations can run concurrently without interference. + /// Requirements: 2.9, 6.5, 8.9 + /// + [Fact] + public async Task MultiConcurrency_StreamingAndBufferedMixedConcurrently_NoInterference() + { + var streamingResults = new ConcurrentBag(); + var bufferedResults = new ConcurrentBag(); + var barrier = new SemaphoreSlim(0, 4); + var allStarted = new SemaphoreSlim(0, 4); + + var tasks = new List(); + + // 2 streaming invocations + for (int i = 0; i < 2; i++) + { + var requestId = $"stream-{i}"; + tasks.Add(Task.Run(async () => + { + var mockClient = new MockMultiConcurrencyStreamingClient(); + LambdaResponseStreamFactory.InitializeInvocation( + requestId, + isMultiConcurrency: true, mockClient, CancellationToken.None); + + var stream = LambdaResponseStreamFactory.CreateStream(); + allStarted.Release(); + await barrier.WaitAsync(); + + await stream.WriteAsync(Encoding.UTF8.GetBytes("streaming data")); + stream.MarkCompleted(); + + var retrieved = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true); + streamingResults.Add(retrieved != null); + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + })); + } + + // 2 buffered invocations (no CreateStream) + for (int i = 0; i < 2; i++) + { + var requestId = $"buffered-{i}"; + tasks.Add(Task.Run(async () => + { + var mockClient = new MockMultiConcurrencyStreamingClient(); + LambdaResponseStreamFactory.InitializeInvocation( + requestId, + isMultiConcurrency: true, mockClient, CancellationToken.None); + + allStarted.Release(); + await barrier.WaitAsync(); + + // No CreateStream — buffered mode + var retrieved = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true); + bufferedResults.Add(retrieved == null); // should be null (no stream created) + LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true); + })); + } + + for (int i = 0; i < 4; i++) + await allStarted.WaitAsync(); + barrier.Release(4); + + await Task.WhenAll(tasks); + + Assert.Equal(2, streamingResults.Count); + Assert.All(streamingResults, r => Assert.True(r, "Streaming invocation should have a stream")); + + Assert.Equal(2, bufferedResults.Count); + Assert.All(bufferedResults, r => Assert.True(r, "Buffered invocation should have no stream")); + } + + /// + /// Minimal mock RuntimeApiClient for multi-concurrency tests. + /// Accepts StartStreamingResponseAsync calls without real HTTP. + /// + private class MockMultiConcurrencyStreamingClient : RuntimeApiClient + { + public MockMultiConcurrencyStreamingClient() + : base(new TestEnvironmentVariables(), new NoOpInternalRuntimeApiClient()) { } + + internal override async Task StartStreamingResponseAsync( + string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default) + { + // Provide the HTTP output stream so writes don't block + responseStream.SetHttpOutputStream(new MemoryStream()); + await responseStream.WaitForCompletionAsync(); + } + } + + // ─── 10.5 Backward compatibility ──────────────────────────────────────────── + + /// + /// Backward compatibility: existing handler signatures (event + ILambdaContext) work without modification. + /// Requirements: 9.1, 9.2, 9.3 + /// + [Fact] + public async Task BackwardCompat_ExistingHandlerSignature_WorksUnchanged() + { + var client = CreateClient(); + bool handlerCalled = false; + + // Simulate a classic handler that returns a buffered response + LambdaBootstrapHandler handler = async (invocation) => + { + handlerCalled = true; + await Task.Yield(); + return new InvocationResponse(new MemoryStream(Encoding.UTF8.GetBytes("classic response"))); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(handlerCalled); + Assert.True(client.SendResponseCalled); + Assert.False(client.StartStreamingCalled); + } + + /// + /// Backward compatibility: no regression in buffered response behavior — response body is correct. + /// Requirements: 9.4, 9.5 + /// + [Fact] + public async Task BackwardCompat_BufferedResponse_NoRegression() + { + var client = CreateClient(); + var expected = Encoding.UTF8.GetBytes("no regression here"); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + return new InvocationResponse(new MemoryStream(expected)); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.SendResponseCalled); + Assert.NotNull(client.LastBufferedOutputStream); + var received = new MemoryStream(); + await client.LastBufferedOutputStream.CopyToAsync(received); + Assert.Equal(expected, received.ToArray()); + } + + /// + /// Backward compatibility: handler that returns null OutputStream still works. + /// Requirements: 9.4 + /// + [Fact] + public async Task BackwardCompat_NullOutputStream_HandledGracefully() + { + var client = CreateClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + return new InvocationResponse(Stream.Null, false); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + + // Should not throw + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.SendResponseCalled); + } + + /// + /// Backward compatibility: handler that throws before CreateStream uses standard error path. + /// Requirements: 9.5 + /// + [Fact] + public async Task BackwardCompat_HandlerThrows_StandardErrorReportingUsed() + { + var client = CreateClient(); + + LambdaBootstrapHandler handler = async (invocation) => + { + await Task.Yield(); + throw new Exception("classic handler error"); + }; + + using var bootstrap = new LambdaBootstrap(handler, null); + bootstrap.Client = client; + await bootstrap.InvokeOnceAsync(); + + Assert.True(client.ReportInvocationErrorCalled); + Assert.False(client.StartStreamingCalled); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/NoOpInternalRuntimeApiClient.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/NoOpInternalRuntimeApiClient.cs new file mode 100644 index 000000000..9fa0434cd --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/NoOpInternalRuntimeApiClient.cs @@ -0,0 +1,60 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests.TestHelpers +{ + /// + /// A no-op implementation of IInternalRuntimeApiClient for unit tests + /// that need to construct a RuntimeApiClient without real HTTP calls. + /// + internal class NoOpInternalRuntimeApiClient : IInternalRuntimeApiClient + { + private static readonly SwaggerResponse EmptyStatusResponse = + new SwaggerResponse(200, new Dictionary>(), new StatusResponse()); + + public Task> ErrorAsync( + string lambda_Runtime_Function_Error_Type, string errorJson, CancellationToken cancellationToken) + => Task.FromResult(EmptyStatusResponse); + + public Task> NextAsync(CancellationToken cancellationToken) + => Task.FromResult(new SwaggerResponse(200, new Dictionary>(), Stream.Null)); + + public Task> ResponseAsync(string awsRequestId, Stream outputStream) + => Task.FromResult(EmptyStatusResponse); + + public Task> ResponseAsync( + string awsRequestId, Stream outputStream, CancellationToken cancellationToken) + => Task.FromResult(EmptyStatusResponse); + + public Task> ErrorWithXRayCauseAsync( + string awsRequestId, string lambda_Runtime_Function_Error_Type, + string errorJson, string xrayCause, CancellationToken cancellationToken) + => Task.FromResult(EmptyStatusResponse); + +#if NET8_0_OR_GREATER + public Task> RestoreNextAsync(CancellationToken cancellationToken) + => Task.FromResult(new SwaggerResponse(200, new Dictionary>(), Stream.Null)); + + public Task> RestoreErrorAsync( + string lambda_Runtime_Function_Error_Type, string errorJson, CancellationToken cancellationToken) + => Task.FromResult(EmptyStatusResponse); +#endif + } +} diff --git a/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestStreamingRuntimeApiClient.cs b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestStreamingRuntimeApiClient.cs new file mode 100644 index 000000000..da68d2940 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestStreamingRuntimeApiClient.cs @@ -0,0 +1,131 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +using Amazon.Lambda.RuntimeSupport.Helpers; +using Amazon.Lambda.RuntimeSupport.UnitTests.TestHelpers; +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Amazon.Lambda.RuntimeSupport.UnitTests +{ + /// + /// A RuntimeApiClient subclass for testing LambdaBootstrap streaming integration. + /// Extends RuntimeApiClient so the (RuntimeApiClient)Client cast in LambdaBootstrap works. + /// Overrides StartStreamingResponseAsync to avoid real HTTP calls. + /// + internal class TestStreamingRuntimeApiClient : RuntimeApiClient, IRuntimeApiClient + { + private readonly IEnvironmentVariables _environmentVariables; + private readonly Dictionary> _headers; + + public new IConsoleLoggerWriter ConsoleLogger { get; } = new LogLevelLoggerWriter(new SystemEnvironmentVariables()); + + public TestStreamingRuntimeApiClient(IEnvironmentVariables environmentVariables, Dictionary> headers) + : base(environmentVariables, new NoOpInternalRuntimeApiClient()) + { + _environmentVariables = environmentVariables; + _headers = headers; + } + + // Tracking flags + public bool GetNextInvocationAsyncCalled { get; private set; } + public bool ReportInitializationErrorAsyncExceptionCalled { get; private set; } + public bool ReportInvocationErrorAsyncExceptionCalled { get; private set; } + public bool SendResponseAsyncCalled { get; private set; } + public bool StartStreamingResponseAsyncCalled { get; private set; } + + public string LastTraceId { get; private set; } + public byte[] FunctionInput { get; set; } + public Stream LastOutputStream { get; private set; } + public Exception LastRecordedException { get; private set; } + public LambdaResponseStream LastStreamingResponseStream { get; private set; } + + public new async Task GetNextInvocationAsync(CancellationToken cancellationToken = default) + { + GetNextInvocationAsyncCalled = true; + + LastTraceId = Guid.NewGuid().ToString(); + _headers[RuntimeApiHeaders.HeaderTraceId] = new List() { LastTraceId }; + + var inputStream = new MemoryStream(FunctionInput == null ? new byte[0] : FunctionInput); + inputStream.Position = 0; + + return new InvocationRequest() + { + InputStream = inputStream, + LambdaContext = new LambdaContext( + new RuntimeApiHeaders(_headers), + new LambdaEnvironment(_environmentVariables), + new TestDateTimeHelper(), new SimpleLoggerWriter(_environmentVariables)) + }; + } + + public new Task ReportInitializationErrorAsync(Exception exception, String errorType = null, CancellationToken cancellationToken = default) + { + LastRecordedException = exception; + ReportInitializationErrorAsyncExceptionCalled = true; + return Task.CompletedTask; + } + + public new Task ReportInitializationErrorAsync(string errorType, CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + + public new Task ReportInvocationErrorAsync(string awsRequestId, Exception exception, CancellationToken cancellationToken = default) + { + LastRecordedException = exception; + ReportInvocationErrorAsyncExceptionCalled = true; + return Task.CompletedTask; + } + + public new async Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default) + { + if (outputStream != null) + { + LastOutputStream = new MemoryStream((int)outputStream.Length); + outputStream.CopyTo(LastOutputStream); + LastOutputStream.Position = 0; + } + + SendResponseAsyncCalled = true; + } + + internal override async Task StartStreamingResponseAsync( + string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default) + { + StartStreamingResponseAsyncCalled = true; + LastStreamingResponseStream = responseStream; + + // Simulate the HTTP stream being available + responseStream.SetHttpOutputStream(new MemoryStream()); + + // Wait for the handler to finish writing (mirrors real SerializeToStreamAsync behavior) + await responseStream.WaitForCompletionAsync(); + } + +#if NET8_0_OR_GREATER + public new Task RestoreNextInvocationAsync(CancellationToken cancellationToken = default) + => Task.CompletedTask; + + public new Task ReportRestoreErrorAsync(Exception exception, String errorType = null, CancellationToken cancellationToken = default) + => Task.CompletedTask; +#endif + } +}