Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Minor",
"ChangelogMessages": [
"Add response streaming support"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
_logger.LogInformation("Starting InvokeOnceAsync");

var invocation = await Client.GetNextInvocationAsync(cancellationToken);
var isMultiConcurrency = Utils.IsUsingMultiConcurrency(_environmentVariables);

Func<Task> processingFunc = async () =>
{
Expand All @@ -358,6 +359,17 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
SetInvocationTraceId(impl.RuntimeApiHeaders.TraceId);
}

// Initialize ResponseStreamFactory — includes RuntimeApiClient reference
var runtimeApiClient = Client as RuntimeApiClient;
if (runtimeApiClient != null)
{
LambdaResponseStreamFactory.InitializeInvocation(
invocation.LambdaContext.AwsRequestId,
isMultiConcurrency,
runtimeApiClient,
cancellationToken);
}

try
{
InvocationResponse response = null;
Expand All @@ -372,15 +384,39 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
catch (Exception exception)
{
WriteUnhandledExceptionToLog(exception);
await Client.ReportInvocationErrorAsync(invocation.LambdaContext.AwsRequestId, exception, cancellationToken);

var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
if (streamIfCreated != null && streamIfCreated.BytesWritten > 0)
{
// Midstream error — report via trailers on the already-open HTTP connection
await streamIfCreated.ReportErrorAsync(exception);
}
else
{
// Error before streaming started — use standard error reporting
await Client.ReportInvocationErrorAsync(invocation.LambdaContext.AwsRequestId, exception, cancellationToken);
}
}
finally
{
_logger.LogInformation("Finished invoking handler");
}

if (invokeSucceeded)
// If streaming was started, await the HTTP send task to ensure it completes
var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency);
if (sendTask != null)
{
var stream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
if (stream != null && !stream.IsCompleted && !stream.HasError)
{
// Handler returned successfully — signal stream completion
stream.MarkCompleted();
}
await sendTask; // Wait for HTTP request to finish
}
else if (invokeSucceeded)
{
// No streaming — send buffered response
_logger.LogInformation("Starting sending response");
try
{
Expand Down Expand Up @@ -415,6 +451,10 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
}
finally
{
if (runtimeApiClient != null)
{
LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency);
}
invocation.Dispose();
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.RuntimeSupport
{
/// <summary>
/// Interface for writing streaming responses in AWS Lambda functions.
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
/// </summary>
public interface ILambdaResponseStream : IDisposable
{
/// <summary>
/// Asynchronously writes a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);

/// <summary>
/// Reports an error that occurred during streaming.
/// This will send error information via HTTP trailing headers.
/// </summary>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has already been reported.</exception>
Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default);

/// <summary>
/// Gets the total number of bytes written to the stream so far.
/// </summary>
long BytesWritten { get; }

/// <summary>
/// Gets whether the stream has been completed.
/// </summary>
bool IsCompleted { get; }

/// <summary>
/// Gets whether an error has been reported.
/// </summary>
bool HasError { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.RuntimeSupport
{
/// <summary>
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
/// </summary>
public partial class LambdaResponseStream : Stream, ILambdaResponseStream
{
private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n");

private long _bytesWritten;
private bool _isCompleted;
private bool _hasError;
private Exception _reportedError;
private readonly object _lock = new object();

// The live HTTP output stream, set by StreamingHttpContent when SerializeToStreamAsync is called.
private Stream _httpOutputStream;
private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1);
private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1);

/// <summary>
/// The number of bytes written to the Lambda response stream so far.
/// </summary>
public long BytesWritten => _bytesWritten;

/// <summary>
/// Gets a value indicating whether the operation has completed.
/// </summary>
public bool IsCompleted => _isCompleted;

/// <summary>
/// Gets a value indicating whether an error has occurred.
/// </summary>
public bool HasError => _hasError;


internal Exception ReportedError => _reportedError;

internal LambdaResponseStream()
{
}

/// <summary>
/// Called by StreamingHttpContent.SerializeToStreamAsync to provide the HTTP output stream.
/// </summary>
internal void SetHttpOutputStream(Stream httpOutputStream)
{
_httpOutputStream = httpOutputStream;
_httpStreamReady.Release();
}

/// <summary>
/// Called by StreamingHttpContent.SerializeToStreamAsync to wait until the handler
/// finishes writing (MarkCompleted or ReportErrorAsync).
/// </summary>
internal async Task WaitForCompletionAsync()
{
await _completionSignal.WaitAsync();
}

/// <summary>
/// Asynchronously writes a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));

await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));

// Wait for the HTTP stream to be ready (first write only blocks)
await _httpStreamReady.WaitAsync(cancellationToken);
try
{
lock (_lock)
{
ThrowIfCompletedOrError();
_bytesWritten += count;
}

// Write chunk directly to the HTTP stream: size(hex) + CRLF + data + CRLF
var chunkSizeHex = count.ToString("X");
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
await _httpOutputStream.WriteAsync(buffer, offset, count, cancellationToken);
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
await _httpOutputStream.FlushAsync(cancellationToken);
}
finally
{
// Re-release so subsequent writes don't block
_httpStreamReady.Release();
}
}

/// <summary>
/// Reports an error that occurred during streaming.
/// This will send error information via HTTP trailing headers.
/// </summary>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has already been reported.</exception>
public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default)
{
if (exception == null)
throw new ArgumentNullException(nameof(exception));

lock (_lock)
{
if (_isCompleted)
throw new InvalidOperationException("Cannot report an error after the stream has been completed.");
if (_hasError)
throw new InvalidOperationException("An error has already been reported for this stream.");

_hasError = true;
_reportedError = exception;
}

// Signal completion so StreamingHttpContent can write error trailers and finish
_completionSignal.Release();

return Task.CompletedTask;
}

internal void MarkCompleted()
{
lock (_lock)
{
_isCompleted = true;
}
// Signal completion so StreamingHttpContent can write the final chunk and finish
_completionSignal.Release();
}

private void ThrowIfCompletedOrError()
{
if (_isCompleted)
throw new InvalidOperationException("Cannot write to a completed stream.");
if (_hasError)
throw new InvalidOperationException("Cannot write to a stream after an error has been reported.");
}

// ── Dispose ──────────────────────────────────────────────────────────

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (disposing)
{
try { _completionSignal.Release(); } catch (SemaphoreFullException) { }
}

base.Dispose(disposing);
}
}
}
Loading