diff --git a/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json new file mode 100644 index 000000000000..bf864237642e --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTbasedHTTPClient-d1be626.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT-based HTTP Client", + "contributor": "", + "description": "Fixed an issue where AwsCrtHttpClient (sync) could deadlock when a request body was sourced from an InputStream that depends on the same CRT event loop, for example when piping a GetObject ResponseInputStream into a PutObject body. The InputStream read now happens on the caller thread instead of the CRT event-loop thread." +} diff --git a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml index 05606dc6d574..446d747b4353 100644 --- a/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml +++ b/build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml @@ -348,6 +348,15 @@ + + + + + + diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 9c6d769e48fa..937e4d5dd1a2 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -21,9 +21,13 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import 
software.amazon.awssdk.crt.http.HttpException; +import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; @@ -35,6 +39,7 @@ import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor; +import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -102,6 +107,7 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { .streamManager(streamManager) .readBufferSize(this.readBufferSize) .request(request) + .connectionAcquisitionTimeoutMillis(this.connectionAcquisitionTimeout) .build(); return new CrtHttpRequest(context); } @@ -109,6 +115,7 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { private static final class CrtHttpRequest implements ExecutableHttpRequest { private final CrtRequestContext context; private volatile CompletableFuture responseFuture; + private volatile SyncRequestBodyPump pump; private CrtHttpRequest(CrtRequestContext context) { this.context = context; @@ -119,7 +126,37 @@ public HttpExecuteResponse call() throws IOException { HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder(); try { - responseFuture = new CrtRequestExecutor().execute(context); + CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context); + responseFuture = result.responseFuture(); + pump = result.pump(); + + // Wake a parked producer when CRT signals request failure via responseFuture so the + // pump's blocked acquireForFill() returns instead of holding the caller thread. 
+ if (pump != null) { + SyncRequestBodyPump pumpRef = pump; + responseFuture.whenComplete((r, t) -> { + if (t != null) { + pumpRef.abort(); + } + }); + } + + boolean streamAcquired = waitForStreamAcquired(result.streamFuture(), + context.connectionAcquisitionTimeoutMillis()); + + if (pump != null) { + if (streamAcquired) { + try { + pump.pump(); + } catch (IOException ioe) { + responseFuture.completeExceptionally(ioe); + throw ioe; + } + } else { + pump.abort(); + } + } + SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture); builder.response(response); builder.responseBody(response.content().orElse(null)); @@ -128,13 +165,17 @@ public HttpExecuteResponse call() throws IOException { Throwable cause = e.getCause(); // Complete the future exceptionally to trigger connection cleanup in the response handler. - // Handles thread-interrupt case where joinInterruptibly throws due to - // InterruptedException. Without this, the - // Ensures that closeConnection() is invoked to prevent leaking the connection from the pool. + // Handles the thread-interrupt case where joinInterruptibly throws due to + // InterruptedException, ensuring closeConnection() is invoked to prevent leaking the + // connection from the pool. if (responseFuture != null) { responseFuture.completeExceptionally(cause != null ? cause : e); } + if (pump != null) { + pump.abort(); + } + if (cause instanceof IOException) { throw (IOException) cause; } @@ -156,6 +197,26 @@ public void abort() { if (responseFuture != null) { responseFuture.completeExceptionally(new IOException("Request was cancelled")); } + if (pump != null) { + pump.abort(); + } + } + + private boolean waitForStreamAcquired(CompletableFuture streamFuture, long timeoutMillis) { + if (streamFuture == null) { + return false; + } + // The eventual exception is propagated by the executor's streamFuture.whenComplete via + // requestFuture.completeExceptionally; here we only decide whether to run the body pump. 
+ try { + streamFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (TimeoutException | ExecutionException e) { + return false; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java index 50689d2236d5..afe19159e3c2 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java @@ -61,6 +61,7 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { protected final long readBufferSize; protected final Protocol protocol; + protected final long connectionAcquisitionTimeout; private final Map connectionPools = new ConcurrentHashMap<>(); private final LinkedList ownedSubResources = new LinkedList<>(); private final ClientBootstrap bootstrap; @@ -70,7 +71,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { private final HttpMonitoringOptions monitoringOptions; private final long maxConnectionIdleInMilliseconds; private final int maxStreamsPerEndpoint; - private final long connectionAcquisitionTimeout; private final TlsContextOptions tlsContextOptions; private boolean isClosed = false; diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index ca9e04fa229a..b3b6c232261c 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -82,7 +82,9 @@ 
private void doExecute(CrtAsyncRequestContext executionContext, long finalAcquireStartTime = acquireStartTime; streamFuture.whenComplete((stream, throwable) -> { - crtResponseHandler.onAcquireStream(stream); + if (throwable == null) { + crtResponseHandler.onAcquireStream(stream); + } if (shouldPublishMetrics) { reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java index ba97cc3466e8..76771f859d0c 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java @@ -26,12 +26,14 @@ public final class CrtRequestContext { private final long readBufferSize; private final HttpStreamManager streamManager; private final MetricCollector metricCollector; + private final long connectionAcquisitionTimeoutMillis; private CrtRequestContext(Builder builder) { this.request = builder.request; this.readBufferSize = builder.readBufferSize; this.streamManager = builder.streamManager; this.metricCollector = request.metricCollector().orElse(null); + this.connectionAcquisitionTimeoutMillis = builder.connectionAcquisitionTimeoutMillis; } public static Builder builder() { @@ -54,10 +56,15 @@ public MetricCollector metricCollector() { return metricCollector; } + public long connectionAcquisitionTimeoutMillis() { + return connectionAcquisitionTimeoutMillis; + } + public static final class Builder { private HttpExecuteRequest request; private long readBufferSize; private HttpStreamManager streamManager; + private long connectionAcquisitionTimeoutMillis; private Builder() { } @@ -77,6 +84,11 @@ public Builder streamManager(HttpStreamManager streamManager) { return this; } 
+ public Builder connectionAcquisitionTimeoutMillis(long connectionAcquisitionTimeoutMillis) { + this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis; + return this; + } + public CrtRequestContext build() { return new CrtRequestContext(this); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index a1283236dc1a..f47b59e7a868 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -20,10 +20,11 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; +import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.SyncCrtRequest; +import software.amazon.awssdk.http.crt.internal.request.SyncRequestBodyPump; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.metrics.MetricCollector; import software.amazon.awssdk.metrics.NoOpMetricCollector; @@ -31,58 +32,82 @@ @SdkInternalApi public final class CrtRequestExecutor { - public CompletableFuture execute(CrtRequestContext executionContext) { + public Result execute(CrtRequestContext executionContext) { CompletableFuture requestFuture = new CompletableFuture<>(); + MetricCollector metricCollector = executionContext.metricCollector(); + boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); + + // get acquireStartTime as 
early as possible for the concurrency timer, but only when metrics are + // enabled since clock_gettime() is a full sys call barrier (multiple mutexes and a hw interrupt). + long acquireStartTime = shouldPublishMetrics ? System.nanoTime() : 0; try { - doExecute(executionContext, requestFuture); + InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = + new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + SyncCrtRequest syncCrtRequest = CrtRequestAdapter.toCrtRequest(executionContext); + CompletableFuture streamFuture = + executionContext.streamManager().acquireStream(syncCrtRequest.httpRequest(), crtResponseHandler); + + // Evict the connection from the pool on failure so it is not reused. + requestFuture.whenComplete((r, t) -> { + if (t != null) { + crtResponseHandler.closeConnection(); + } + }); + + long finalAcquireStartTime = acquireStartTime; + streamFuture.whenComplete((streamBase, throwable) -> { + if (throwable != null) { + requestFuture.completeExceptionally(wrapCrtException(throwable)); + + } else { + crtResponseHandler.onAcquireStream(streamBase); + if (shouldPublishMetrics) { + reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); + } + } + }); + + return new Result(requestFuture, syncCrtRequest.pump(), streamFuture); } catch (Throwable t) { requestFuture.completeExceptionally(t); + return new Result(requestFuture, null, null); } - - return requestFuture; } - private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { - MetricCollector metricCollector = executionContext.metricCollector(); - boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); - - long acquireStartTime = 0; - - if (shouldPublishMetrics) { - // go ahead and get acquireStartTime for the concurrency timer as early as possible, - // so it's as accurate as possible, but only do it in a branch since clock_gettime() - // results in a full sys call 
barrier (multiple mutexes and a hw interrupt). - acquireStartTime = System.nanoTime(); + /** + * Result of {@link #execute(CrtRequestContext)}: bundles the response future with the optional + * caller-thread body pump (null when the request has no body) and the future that completes + * when the CRT stream has been acquired from the connection pool. + */ + public static final class Result { + private final CompletableFuture responseFuture; + private final SyncRequestBodyPump pump; + private final CompletableFuture streamFuture; + + Result(CompletableFuture responseFuture, + SyncRequestBodyPump pump, + CompletableFuture streamFuture) { + this.responseFuture = responseFuture; + this.pump = pump; + this.streamFuture = streamFuture; } - InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler = - new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); - - HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); - - CompletableFuture streamFuture = - executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); - - // Evict the connection from the pool on failure so it is not reused. - requestFuture.whenComplete((r, t) -> { - if (t != null) { - crtResponseHandler.closeConnection(); - } - }); - - long finalAcquireStartTime = acquireStartTime; + public CompletableFuture responseFuture() { + return responseFuture; + } - streamFuture.whenComplete((streamBase, throwable) -> { - crtResponseHandler.onAcquireStream(streamBase); - if (shouldPublishMetrics) { - reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime); - } + public SyncRequestBodyPump pump() { + return pump; + } - if (throwable != null) { - Throwable toThrow = wrapCrtException(throwable); - requestFuture.completeExceptionally(toThrow); - } - }); + /** + * Future that completes when the CRT stream has been acquired (or acquisition has failed). 
+ * The caller blocks on this before running the body pump so per-request body buffers are + * not allocated while a request is queued on the connection pool. + */ + public CompletableFuture streamFuture() { + return streamFuture; + } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java new file mode 100644 index 000000000000..6fc436d5b896 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java @@ -0,0 +1,269 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.utils.Logger; + +/** + * Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer). + * + *

The producer reads from the customer's {@code InputStream} into heap {@link ByteBuffer}s acquired + * via {@link #acquireForFill()}, then {@link #publish(ByteBuffer) publishes} them into a bounded + * {@link ArrayBlockingQueue}. The consumer drains those buffers via {@link #pollDrain(ByteBuffer)}, + * which is non-blocking: if no data is ready the consumer returns 0 bytes and CRT reschedules itself + * via {@code aws_channel_schedule_task_now}. + * + *

Drained buffers are returned to a free {@link ArrayDeque} (LIFO for cache hotness) guarded by a private monitor. + * The producer parks on it when the free deque is empty and all {@code depth} buffers are allocated, providing back-pressure. + * + *

Buffers are heap-allocated lazily, on demand in {@link #acquireForFill()} up to {@code depth}, not in the + * constructor. This keeps per-request heap minimal while a request is queued on the CRT connection + * pool waiting for a stream: the pipe object exists but its backing buffers do not. + * + *

Buffer modes follow standard NIO conventions: {@link #acquireForFill()} returns a buffer in + * write mode (cleared); the producer fills it, calls {@code flip()}, and {@link #publish(ByteBuffer) + * publishes}. The consumer drains in read mode and {@code clear()}s during recycle. + * + *

State machine: {@code OPEN -> {EOF | ERROR | ABORTED}}. Transitions are one-way. + */ +@SdkInternalApi +final class BodyChunkPipe { + private static final Logger LOG = Logger.loggerFor(BodyChunkPipe.class); + + enum State { + OPEN, + EOF, + ERROR, + ABORTED + } + + /** + * Defense-in-depth wait timeout for {@link #acquireForFill()}. Even if a code path forgets + * to call {@link #abort()}, a parked producer wakes every {@value} ms to re-check state. + * Spurious wakeups are harmless. + */ + private static final long ACQUIRE_WAIT_TIMEOUT_MS = 50L; + + private final int depth; + private final int chunkSize; + private final ArrayBlockingQueue ready; + private final Deque free; + private final AtomicReference state = new AtomicReference<>(State.OPEN); + /** + * Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private + * so external code cannot synchronize on the pipe instance and stall the producer. + */ + private final Object freeLock = new Object(); + + private int allocated; + private volatile Throwable error; + private ByteBuffer pendingDrain; + + BodyChunkPipe(int depth, int chunkSize) { + if (depth < 1) { + throw new IllegalArgumentException("depth must be >= 1"); + } + if (chunkSize < 1) { + throw new IllegalArgumentException("chunkSize must be >= 1"); + } + this.depth = depth; + this.chunkSize = chunkSize; + this.ready = new ArrayBlockingQueue<>(depth); + this.free = new ArrayDeque<>(depth); + } + + /** + * Producer side: acquire a buffer in write mode (position=0, limit=capacity). Blocks if all + * buffers are currently in flight. Returns {@code null} only if the pipe was aborted while the + * producer was waiting. + * + *

Allocates the buffer on first use up to the configured depth. This keeps the per-request + * footprint minimal until the producer actually starts pumping (i.e., until after the CRT stream + * has been acquired). + */ + ByteBuffer acquireForFill() throws InterruptedException { + synchronized (freeLock) { + while (true) { + State s = state.get(); + if (s == State.ABORTED || s == State.ERROR) { + LOG.debug(() -> "acquireForFill returning null, state=" + s); + return null; + } + ByteBuffer bb = free.pollFirst(); + if (bb != null) { + return bb; + } + if (allocated < depth) { + allocated++; + return ByteBuffer.allocate(chunkSize); + } + freeLock.wait(ACQUIRE_WAIT_TIMEOUT_MS); + } + } + } + + /** + * Producer side: publish a filled buffer to the consumer. Caller must have called + * {@link ByteBuffer#flip()} so the buffer is in read mode (position=0, limit=N). + * + *

If the buffer has no remaining bytes (zero-length read), it is recycled back to the free + * deque rather than pushed to the ready queue: an empty buffer would otherwise be leaked from + * the bounded pool, and the consumer would interpret it as a no-op anyway. + */ + void publish(ByteBuffer chunk) throws InterruptedException { + if (!chunk.hasRemaining()) { + recycle(chunk); + return; + } + // ready.put() blocks if the queue is full, but the queue capacity == pool size, + // so this can only block briefly while the consumer drains. + ready.put(chunk); + } + + /** + * Producer side: signal end-of-stream. Idempotent. + */ + void signalEof() { + if (state.compareAndSet(State.OPEN, State.EOF)) { + LOG.debug(() -> "state OPEN -> EOF"); + } + } + + /** + * Producer side: signal a fatal producer-side error. Idempotent. + */ + void signalError(Throwable t) { + synchronized (freeLock) { + // Publish the cause BEFORE flipping state so a consumer's lock-free read in pollDrain + // never observes state==ERROR with error==null. The volatile write to `error` is + // harmless if the CAS later loses (idempotent signal). + error = t; + if (state.compareAndSet(State.OPEN, State.ERROR)) { + LOG.debug(() -> "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")"); + } + freeLock.notifyAll(); + } + } + + /** + * External-cancel: clear ready queue, flip state, wake producer. + */ + void abort() { + synchronized (freeLock) { + if (state.compareAndSet(State.OPEN, State.ABORTED)) { + LOG.debug(() -> "state OPEN -> ABORTED"); + ready.clear(); + } + freeLock.notifyAll(); + } + } + + /** + * Consumer side: drain bytes into {@code dst}. NEVER blocks. + * + *

Single-consumer only. {@code pendingDrain} is non-volatile, so this method MUST be + * invoked from a single thread. CRT honors that by scheduling the outgoing-stream task serially + * on one event-loop thread per stream. Concurrent invocation will silently corrupt body delivery. + * + * @return number of bytes drained, or {@code -1} on EOF with no remaining data. + * @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data. + */ + int pollDrain(ByteBuffer dst) { + int totalBytesConsumed = 0; + while (dst.hasRemaining()) { + if (pendingDrain == null) { + pendingDrain = ready.poll(); + } + if (pendingDrain == null) { + State s = state.get(); + if (s == State.OPEN) { + return totalBytesConsumed; + } + // JMM happens-before: the producer's program order is publish() (ready.put) THEN + // signalEof/Error/abort (volatile state CAS). Once we observe the volatile state + // transition here, the producer's prior ready.put is guaranteed visible to a + // subsequent poll on this thread. Re-poll once to drain any chunk that landed in + // ready before the producer's terminal CAS. + pendingDrain = ready.poll(); + if (pendingDrain != null) { + continue; + } + switch (s) { + case ERROR: + throw new RuntimeException("Producer failed", error); + case ABORTED: + throw new RuntimeException("Request body stream was aborted"); + case EOF: + return totalBytesConsumed > 0 ? totalBytesConsumed : -1; + default: + return totalBytesConsumed; + } + } + int n = Math.min(dst.remaining(), pendingDrain.remaining()); + // Cap source's limit so dst.put(src) only consumes n bytes (avoids BufferOverflowException + // when pendingDrain.remaining() > dst.remaining()). 
+ int srcOrigLimit = pendingDrain.limit(); + pendingDrain.limit(pendingDrain.position() + n); + dst.put(pendingDrain); + pendingDrain.limit(srcOrigLimit); + totalBytesConsumed += n; + if (!pendingDrain.hasRemaining()) { + // Buffer fully copied into dst, return it to the free deque (and notify the producer + // in case it was waiting). This is what bounds the pool: buffers only re-enter the + // producer pool after the consumer has drained them. + ByteBuffer drained = pendingDrain; + pendingDrain = null; + recycle(drained); + } + } + return totalBytesConsumed; + } + + /** + * Visible-for-test / test-only helper: current pipe state. + */ + @SdkTestInternalApi + State state() { + return state.get(); + } + + /** + * Visible-for-test / test-only helper: number of buffers minted so far. The pipe lazily allocates + * buffers on the producer's first {@link #acquireForFill()}, so this is 0 until the producer + * starts pumping and grows up to {@code depth}. + */ + @SdkTestInternalApi + int allocatedForTest() { + synchronized (freeLock) { + return allocated; + } + } + + private void recycle(ByteBuffer bb) { + synchronized (freeLock) { + bb.clear(); + free.push(bb); + freeLock.notifyAll(); + } + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 8672d80b0d1b..4a3991c5d44a 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.http.ContentStreamProvider; import 
software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.Protocol; @@ -33,6 +34,16 @@ @SdkInternalApi public final class CrtRequestAdapter { + /** + * Per-chunk size used by the sync request-body pipe. + */ + private static final int CHUNK_SIZE = 128 * 1024; + + /** + * Number of in-flight chunks the pipe holds. + */ + private static final int PIPE_DEPTH = 4; + private CrtRequestAdapter() { } @@ -60,7 +71,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request) crtRequestBodyAdapter); } - public static HttpRequest toCrtRequest(CrtRequestContext request) { + /** + * Build the CRT request for the sync path. When the SDK request has a body, this also constructs the + * {@link BodyChunkPipe} and a {@link SyncRequestBodyPump}; the caller thread is expected to drive + * the pump after the stream is activated. + */ + public static SyncCrtRequest toCrtRequest(CrtRequestContext request) { HttpExecuteRequest sdkExecuteRequest = request.sdkRequest(); SdkHttpRequest sdkRequest = sdkExecuteRequest.httpRequest(); @@ -78,14 +94,39 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); String finalEncodedPath = encodedPath + encodedQueryString; - return sdkExecuteRequest.contentStreamProvider() - .map(provider -> new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, - new CrtRequestInputStreamAdapter(provider))) - .orElse(new HttpRequest(method, - finalEncodedPath, - crtHeaderArray, null)); + + Optional providerOpt = sdkExecuteRequest.contentStreamProvider(); + if (!providerOpt.isPresent()) { + return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null); + } + + BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE); + PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe); + 
SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe); + HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream); + return new SyncCrtRequest(crtRequest, pump); + } + + /** + * Holder returned from {@link #toCrtRequest(CrtRequestContext)} bundling the CRT-side request and the + * caller-thread producer pump (null when the SDK request has no body). + */ + public static final class SyncCrtRequest { + private final HttpRequest httpRequest; + private final SyncRequestBodyPump pump; + + SyncCrtRequest(HttpRequest httpRequest, SyncRequestBodyPump pump) { + this.httpRequest = httpRequest; + this.pump = pump; + } + + public HttpRequest httpRequest() { + return httpRequest; + } + + public SyncRequestBodyPump pump() { + return pump; + } } private static HttpHeader[] asArray(List crtHeaderList) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java deleted file mode 100644 index 68f418b9e1df..000000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ - -package software.amazon.awssdk.http.crt.internal.request; - -import static java.lang.Math.min; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.ContentStreamProvider; - -@SdkInternalApi -final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream { - private static final int READ_BUFFER_SIZE = 16 * 1024; - - private final ContentStreamProvider provider; - private volatile InputStream providerStream; - private final byte[] readBuffer = new byte[READ_BUFFER_SIZE]; - - CrtRequestInputStreamAdapter(ContentStreamProvider provider) { - this.provider = provider; - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - int read; - - try { - if (providerStream == null) { - createNewStream(); - } - - int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining()); - read = providerStream.read(readBuffer, 0, toRead); - - if (read > 0) { - bodyBytesOut.put(readBuffer, 0, read); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return read < 0; - } - - @Override - public boolean resetPosition() { - try { - createNewStream(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - return true; - } - - private void createNewStream() throws IOException { - if (providerStream != null) { - providerStream.close(); - } - providerStream = provider.newStream(); - } -} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java new file mode 100644 index 000000000000..a2dd78eea0c4 --- /dev/null +++ 
b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java @@ -0,0 +1,49 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.nio.ByteBuffer; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; + +/** + * A {@link HttpRequestBodyStream} adapter whose {@link #sendRequestBody(ByteBuffer)} drains bytes from a + * {@link BodyChunkPipe} that is fed by the caller thread. The pull callback NEVER blocks: if no data is ready, + * it returns 0 bytes and CRT reschedules the outgoing-stream task via {@code aws_channel_schedule_task_now}, + * allowing other event-loop tasks (such as a concurrent GET response delivery) to run before the retry. 
+ */ +@SdkInternalApi +final class PipeBackedRequestBodyStream implements HttpRequestBodyStream { + + private final BodyChunkPipe pipe; + + PipeBackedRequestBodyStream(BodyChunkPipe pipe) { + this.pipe = pipe; + } + + @Override + public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + int drained = pipe.pollDrain(bodyBytesOut); + return drained < 0; + } + + @Override + public boolean resetPosition() { + // The SDK retry layer (RetryableStage) handles request-level retries by calling prepareRequest() again, and + // CRT does not currently exercise resetPosition for HTTP/1.1, so opting out is safe in practice. + return false; + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java new file mode 100644 index 000000000000..fa237045ede2 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java @@ -0,0 +1,80 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License.
+ */ + +package software.amazon.awssdk.http.crt.internal.request; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.http.ContentStreamProvider; + +/** + * Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a + * {@link BodyChunkPipe}. Runs on the caller (sync) thread between stream activation and + * {@code responseFuture.join()}, ensuring the blocking {@code read()} happens off the CRT event loop. + */ +@SdkInternalApi +public final class SyncRequestBodyPump { + + private final ContentStreamProvider contentStreamProvider; + private final BodyChunkPipe pipe; + + SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) { + this.contentStreamProvider = contentStreamProvider; + this.pipe = pipe; + } + + /** + * Pump the entire input stream into the pipe. Runs on the caller thread; never invoked on the CRT + * event-loop thread. Signals EOF at end of stream; a checked or unchecked read failure is signaled and rethrown. + */ + public void pump() throws IOException { + try (InputStream in = contentStreamProvider.newStream()) { + while (true) { + ByteBuffer chunk = pipe.acquireForFill(); + if (chunk == null) { + // pipe reached a terminal state (aborted or failed) while we were waiting; stop without signaling EOF. + return; + } + int read; + try { + read = in.read(chunk.array(), chunk.arrayOffset() + chunk.position(), chunk.remaining()); + } catch (IOException | RuntimeException e) { + pipe.signalError(e); + throw e; + } + if (read < 0) { + pipe.signalEof(); + return; + } + chunk.position(chunk.position() + read); + chunk.flip(); + pipe.publish(chunk); + } + } catch (InterruptedException ie) { + pipe.abort(); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while writing request body", ie); + } + } + + /** + * Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
+ */ + public void abort() { + pipe.abort(); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java index ce5d778f06a1..aad5103b287c 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java @@ -17,8 +17,13 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; +import static com.github.tomakehurst.wiremock.client.WireMock.put; +import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; @@ -26,13 +31,22 @@ import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL; import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit.WireMockRule; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.Callable; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -45,6 +59,8 @@ import software.amazon.awssdk.http.HttpMetric; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricCollector; @@ -133,6 +149,421 @@ public void abortRequest_shouldFailTheExceptionWithIOException() throws Exceptio } } + @Test + public void putRequest_withInputStreamBody_serverReceivesBody() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.create()) { + String body = "hello pull pump"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpFullRequest request = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + + HttpExecuteRequest executeRequest = HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(executeRequest).call(); + + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + verify(putRequestedFor(urlPathEqualTo("/sink")) + .withHeader("Content-Length", 
equalTo(Integer.toString(bodyBytes.length))) + .withRequestBody(equalToIgnoreCase(body))); + } + } + + @Test + public void inputStreamThrows_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + // Bound the pool to a single connection: if the failed request leaks its connection, the + // second call() either fails to acquire (with the explicit timeout below) or blocks until + // the test framework times out. Either manifests as a deterministic failure rather than a hang. + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + IOException expected = new IOException("simulated upstream failure"); + SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", "100") + .build(); + HttpExecuteRequest failingExecute = + HttpExecuteRequest.builder() + .request(failingRequest) + .contentStreamProvider(() -> new InputStream() { + @Override + public int read() throws IOException { + throw expected; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throw expected; + } + }) + .build(); + + assertThatThrownBy(() -> client.prepareRequest(failingExecute).call()) + .isInstanceOf(IOException.class); + + // If the previous failure leaked the connection, this second call would fail to acquire + // (bounded by the connectionAcquisitionTimeout configured above) instead of hanging. 
+ String body = "second request body"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void abortMidRequest_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello"))); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpRequest delayedRequest = createRequest(uri); + HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder() + .request(delayedRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + ExecutableHttpRequest abortable = client.prepareRequest(delayedExecute); + executorService.schedule(abortable::abort, 100, TimeUnit.MILLISECONDS); + assertThatThrownBy(abortable::call).isInstanceOf(IOException.class).hasMessageContaining("cancelled"); + + String body = "after abort"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + 
HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void serverResetsConnection_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")) + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))); + + byte[] bodyBytes = randomAlphabetic(64).getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest failingExecute = HttpExecuteRequest.builder() + .request(failingRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + + assertThatThrownBy(() -> client.prepareRequest(failingExecute).call()) + .isInstanceOf(IOException.class); + + stubFor(put(urlPathEqualTo("/sink2")).willReturn(aResponse().withStatus(200))); + byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink2") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(okBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(okBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + 
assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void interruptDuringCall_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello"))); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpRequest delayedRequest = createRequest(uri); + HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder() + .request(delayedRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + CountDownLatch workerDone = new CountDownLatch(1); + AtomicReference workerError = new AtomicReference<>(); + ExecutorService worker = Executors.newSingleThreadExecutor(); + try { + Future inFlight = worker.submit(() -> { + try { + client.prepareRequest(delayedExecute).call(); + } catch (Throwable t) { + workerError.set(t); + } finally { + workerDone.countDown(); + } + }); + + // Give call() time to enter joinInterruptibly() before we interrupt. + Thread.sleep(100); + inFlight.cancel(true); + + assertThat(workerDone.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(workerError.get()) + .isInstanceOf(IOException.class) + .hasMessageContaining("cancelled"); + } finally { + worker.shutdownNow(); + worker.awaitTermination(5, TimeUnit.SECONDS); + } + + // If the interrupt leaked the connection, this second call() would block on acquire and fail + // when connectionAcquisitionTimeout (10s above) elapses. 
+ String body = "after-interrupt"; + byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes)) + .build(); + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + @Test + public void acquireTimeoutThenHolderCancelled_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(2)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(60_000).withBody("hello"))); + + SdkHttpRequest holderRequest = createRequest(uri); + HttpExecuteRequest holderExecute = HttpExecuteRequest.builder() + .request(holderRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + ExecutableHttpRequest holder = client.prepareRequest(holderExecute); + + SdkHttpRequest racerRequest = createRequest(uri); + HttpExecuteRequest racerExecute = HttpExecuteRequest.builder() + .request(racerRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + Future holderFuture = pool.submit(holder::call); + // Give the holder time to acquire the only slot before the racer tries. 
+ Thread.sleep(500); + + Future racerFuture = pool.submit(() -> client.prepareRequest(racerExecute).call()); + // CRT surfaces the acquire-timeout as HttpException; CrtHttpRequest.call() rethrows + // it directly (does not wrap in IOException). + assertThatThrownBy(() -> racerFuture.get(5, TimeUnit.SECONDS)) + .hasMessageContaining("acquire"); + + // Release the slot via the same closeConnection path the other leak tests exercise. + holder.abort(); + assertThatThrownBy(() -> holderFuture.get(5, TimeUnit.SECONDS)) + .hasCauseInstanceOf(IOException.class); + } finally { + pool.shutdownNow(); + pool.awaitTermination(5, TimeUnit.SECONDS); + } + + // If the slot didn't reclaim, this third call() blocks on acquire and fails when the + // 2s connectionAcquisitionTimeout above elapses. + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(okBytes.length)) + .build(); + HttpExecuteRequest okExecute = HttpExecuteRequest.builder() + .request(okRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(okBytes)) + .build(); + + HttpExecuteResponse response = client.prepareRequest(okExecute).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + } + } + + /** + * Regression test for the deadlock the pull-pump fix addresses. On master, the request body's + * {@code InputStream.read(...)} ran on the CRT event-loop thread (via the body callback), which + * meant a body sourced from a {@code GET}'s {@code ResponseInputStream} on the same event loop + * could deadlock: the GET held the event loop while the PUT body waited for it. + * + *

Pull-pump moves the read to the caller (sync) thread. This test verifies that load-bearing + * claim by recording the thread that performs the body read and asserting it is the caller + * thread - not a CRT event-loop thread. Failure of either the assertion or the test timeout + * (a hang) is the deadlock signal. + */ + @Test + public void putBodyReadHappensOnCallerThread_notOnCrtEventLoop() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(10)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + byte[] bodyBytes = "body-on-caller".getBytes(StandardCharsets.UTF_8); + AtomicReference readThreadName = new AtomicReference<>(); + SdkHttpFullRequest request = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(bodyBytes.length)) + .build(); + HttpExecuteRequest executeRequest = + HttpExecuteRequest.builder() + .request(request) + .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes) { + @Override + public synchronized int read(byte[] b, int off, int len) { + readThreadName.compareAndSet(null, Thread.currentThread().getName()); + return super.read(b, off, len); + } + }) + .build(); + + String callerThreadName = Thread.currentThread().getName(); + HttpExecuteResponse response = client.prepareRequest(executeRequest).call(); + assertThat(response.httpResponse().statusCode()).isEqualTo(200); + + String observed = readThreadName.get(); + assertThat(observed) + .as("body read should happen on the caller thread, not the CRT event loop") + .isNotNull() + .isEqualTo(callerThreadName) + .doesNotContainIgnoringCase("AwsEventLoop") + .doesNotContainIgnoringCase("aws-event-loop"); + } + } + + /** + * Stress companion to {@link 
#putBodyReadHappensOnCallerThread_notOnCrtEventLoop}. Issues a + * delayed GET (response delayed server-side) and a PUT in parallel through the same + * {@code maxConcurrency(1)} client. On master, sequencing them through a single connection + * with the body read tied to the event-loop thread could deadlock; here both calls must + * complete within the test timeout. + */ + @Test + public void getInFlight_concurrentPut_bothComplete() throws Exception { + try (SdkHttpClient client = AwsCrtHttpClient.builder() + .maxConcurrency(1) + .connectionAcquisitionTimeout(Duration.ofSeconds(15)) + .build()) { + URI uri = URI.create("http://localhost:" + mockServer.port()); + stubFor(any(urlPathEqualTo("/slow")) + .willReturn(aResponse().withFixedDelay(2_000).withBody("hello"))); + stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200))); + + SdkHttpRequest getRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.GET) + .encodedPath("/slow") + .putHeader("Host", uri.getHost()) + .build(); + HttpExecuteRequest getExecute = HttpExecuteRequest.builder() + .request(getRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])) + .build(); + + byte[] putBytes = "put-body".getBytes(StandardCharsets.UTF_8); + SdkHttpFullRequest putRequest = SdkHttpFullRequest.builder() + .uri(uri) + .method(SdkHttpMethod.PUT) + .encodedPath("/sink") + .putHeader("Host", uri.getHost()) + .putHeader("Content-Length", Integer.toString(putBytes.length)) + .build(); + HttpExecuteRequest putExecute = HttpExecuteRequest.builder() + .request(putRequest) + .contentStreamProvider(() -> new ByteArrayInputStream(putBytes)) + .build(); + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + Callable getTask = () -> client.prepareRequest(getExecute).call(); + Callable putTask = () -> client.prepareRequest(putExecute).call(); + Future getFuture = pool.submit(getTask); + Future putFuture = pool.submit(putTask); + + HttpExecuteResponse 
getResponse = getFuture.get(15, TimeUnit.SECONDS); + HttpExecuteResponse putResponse = putFuture.get(15, TimeUnit.SECONDS); + assertThat(getResponse.httpResponse().statusCode()).isEqualTo(200); + assertThat(putResponse.httpResponse().statusCode()).isEqualTo(200); + } finally { + pool.shutdownNow(); + pool.awaitTermination(5, TimeUnit.SECONDS); + } + } + } + /** * Make a simple request and wait for it to finish. * diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 456000ac1150..8b916ba1ee98 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() { .request(HttpExecuteRequest.builder().build()) .build(); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() { .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable) Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) 
.thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } @@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) .thenReturn(completableFuture); - CompletableFuture executeFuture = requestExecutor.execute(context); + CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture(); assertThatThrownBy(executeFuture::join).hasCause(exception); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java new file mode 100644 index 000000000000..99097487a29b --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java @@ -0,0 +1,402 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. 
This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +class BodyChunkPipeTest { + + @Test + void pollDrain_emptyOpenPipe_returnsZero() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer dst = ByteBuffer.allocate(8); + + int n = pipe.pollDrain(dst); + + assertThat(n).isZero(); + assertThat(dst.position()).isZero(); + } + + @Test + void pollDrain_afterEofWithEmptyQueue_returnsMinusOne() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalEof(); + + int n = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(n).isEqualTo(-1); + } + + @Test + void publish_thenDrain_consumerSeesProducerBytes() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {1, 2, 3, 4, 5}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + ByteBuffer dst = ByteBuffer.allocate(16); + + int first = pipe.pollDrain(dst); + int second = pipe.pollDrain(dst); + + assertThat(first).isEqualTo(payload.length); + assertThat(second).isEqualTo(-1); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + } + + @Test + void signalError_pollDrainThrows() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalError(new RuntimeException("boom")); + + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + 
.hasMessageContaining("Producer failed") + .hasRootCauseMessage("boom"); + } + + @Test + void abort_emptiesReadyAndChangesState() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + bb.put(new byte[]{1, 2, 3, 4}); + bb.flip(); + pipe.publish(bb); + + pipe.abort(); + + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("aborted"); + } + + @Test + void pollDrain_signalErrorWithQueuedChunks_drainsThenThrows() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {7, 8, 9}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalError(new RuntimeException("boom")); + + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + + assertThat(drained).isEqualTo(payload.length); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8))) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("boom"); + } + + @Test + void pollDrain_signalEofWithQueuedChunks_drainsThenReturnsMinusOne() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {10, 20, 30}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(drained).isEqualTo(payload.length); + dst.flip(); + byte[] out = new byte[dst.remaining()]; + dst.get(out); + assertThat(out).containsExactly(payload); + assertThat(afterDrain).isEqualTo(-1); + } + + @Test + void abort_afterSignalEof_leavesStateAsEof() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalEof(); + + pipe.abort(); + + 
assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1); + } + + @Test + void abort_afterSignalEofWithQueuedChunks_doesNotClearReady() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {1, 2, 3}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + + pipe.abort(); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF); + ByteBuffer dst = ByteBuffer.allocate(payload.length); + int drained = pipe.pollDrain(dst); + assertThat(drained).isEqualTo(payload.length); + assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1); + } + + @Test + void recycle_intoEofPipe_doesNotThrowAndDoesNotCorruptPool() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + bb.put(new byte[]{1, 2, 3, 4}); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + + ByteBuffer dst = ByteBuffer.allocate(8); + int drained = pipe.pollDrain(dst); + int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8)); + + assertThat(drained).isEqualTo(4); + assertThat(afterDrain).isEqualTo(-1); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void recycle_intoAbortedPipe_doesNotThrow() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + pipe.abort(); + + bb.flip(); + pipe.publish(bb); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED); + } + + @Test + void recycle_intoErrorPipe_doesNotThrow() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + pipe.signalError(new RuntimeException("boom")); + + bb.flip(); + pipe.publish(bb); + + assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR); + } + + @Test + void constructor_doesNotAllocateChunks() { + BodyChunkPipe pipe = new BodyChunkPipe(4, 16); + + 
assertThat(pipe.allocatedForTest()).isZero(); + } + + @Test + void acquireForFill_firstCall_allocatesOneChunk() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(4, 16); + + ByteBuffer bb = pipe.acquireForFill(); + + assertThat(bb).isNotNull(); + assertThat(bb.capacity()).isEqualTo(16); + assertThat(bb.position()).isZero(); + assertThat(bb.limit()).isEqualTo(16); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void acquireForFill_uniqueChunksUpToDepth_thenStopsAllocating() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(3, 8); + ByteBuffer c1 = pipe.acquireForFill(); + ByteBuffer c2 = pipe.acquireForFill(); + ByteBuffer c3 = pipe.acquireForFill(); + + c1.put((byte) 1); + c1.flip(); + pipe.publish(c1); + pipe.pollDrain(ByteBuffer.allocate(8)); + ByteBuffer reused = pipe.acquireForFill(); + + assertThat(c1).isNotSameAs(c2).isNotSameAs(c3); + assertThat(c2).isNotSameAs(c3); + assertThat(pipe.allocatedForTest()).isEqualTo(3); + assertThat(reused).isSameAs(c1); + } + + @Test + void acquireForFill_recycledChunkReused_noNewAllocation() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + bb.put(new byte[]{1, 2, 3}); + bb.flip(); + pipe.publish(bb); + pipe.pollDrain(ByteBuffer.allocate(8)); + + ByteBuffer reused = pipe.acquireForFill(); + + assertThat(reused).isSameAs(bb); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } + + @Test + void acquireForFill_afterAbort_returnsNull() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.abort(); + + ByteBuffer bb = pipe.acquireForFill(); + + assertThat(bb).isNull(); + } + + @Test + void acquireForFill_afterSignalError_returnsNull() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.signalError(new RuntimeException("boom")); + + ByteBuffer bb = pipe.acquireForFill(); + + assertThat(bb).isNull(); + } + + @Test + void constructor_invalidDepth_throws() { + assertThatThrownBy(() -> new 
BodyChunkPipe(0, 8)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("depth"); + } + + @Test + void constructor_invalidChunkSize_throws() { + assertThatThrownBy(() -> new BodyChunkPipe(2, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("chunkSize"); + } + + /** + * Multi-threaded ordering test: producer races to call {@link BodyChunkPipe#signalError(Throwable)} + * while a consumer is concurrently spinning on {@link BodyChunkPipe#pollDrain(java.nio.ByteBuffer)}. + * The contract is that whenever the consumer observes the ERROR state, the cause must already + * be visible (no {@code RuntimeException("Producer failed", null)}). RepeatedTest amplifies the + * race window. With the cause published before the CAS, this should pass on every iteration. + */ + @RepeatedTest(50) + void signalError_concurrentPollDrain_consumerNeverSeesNullCause() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + IllegalStateException expected = new IllegalStateException("boom"); + CountDownLatch start = new CountDownLatch(1); + AtomicReference consumerError = new AtomicReference<>(); + AtomicReference nullCauseSighting = new AtomicReference<>(); + + Thread consumer = new Thread(() -> { + try { + start.await(); + ByteBuffer dst = ByteBuffer.allocate(16); + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline) { + try { + int n = pipe.pollDrain(dst); + if (n < 0) { + return; + } + dst.clear(); + } catch (RuntimeException re) { + if (re.getCause() == null) { + nullCauseSighting.set(re); + } + return; + } + } + } catch (Throwable t) { + consumerError.set(t); + } + }, "pipe-consumer"); + + Thread producer = new Thread(() -> { + try { + start.await(); + pipe.signalError(expected); + } catch (Throwable t) { + consumerError.set(t); + } + }, "pipe-producer"); + + consumer.start(); + producer.start(); + start.countDown(); + producer.join(5_000); + consumer.join(5_000); + + 
assertThat(consumer.isAlive()).isFalse(); + assertThat(producer.isAlive()).isFalse(); + assertThat(consumerError.get()).isNull(); + assertThat(nullCauseSighting.get()).isNull(); + } + + /** + * Multi-threaded test for the recycle/notify path: producer is forced to block on + * {@link BodyChunkPipe#acquireForFill()} because all chunks are in flight, then the consumer + * drains a chunk which {@code recycle()}s and notifies the producer to wake. This exercises the + * full {@code freeLock.notifyAll()} hand-off rather than relying on the defensive 50ms wakeup. + */ + @Test + void acquireForFill_blocksUntilConsumerRecycles_thenWakesAndCompletes() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(1, 8); + ByteBuffer first = pipe.acquireForFill(); + first.put(new byte[]{1, 2, 3, 4}); + first.flip(); + pipe.publish(first); + + CountDownLatch producerEntered = new CountDownLatch(1); + AtomicReference reused = new AtomicReference<>(); + AtomicReference producerError = new AtomicReference<>(); + Thread producer = new Thread(() -> { + try { + producerEntered.countDown(); + ByteBuffer bb = pipe.acquireForFill(); + reused.set(bb); + } catch (Throwable t) { + producerError.set(t); + } + }, "pipe-producer"); + + producer.start(); + producerEntered.await(); + // Drain so the chunk is recycled and the producer is woken via notifyAll. 
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (reused.get() == null && System.nanoTime() < deadline) { + pipe.pollDrain(ByteBuffer.allocate(8)); + producer.join(50); + } + + assertThat(producerError.get()).isNull(); + assertThat(reused.get()).isSameAs(first); + assertThat(pipe.allocatedForTest()).isEqualTo(1); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java new file mode 100644 index 000000000000..6122adbae2d8 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java @@ -0,0 +1,141 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.http.crt.internal.request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import org.junit.jupiter.api.Test; + +class PipeBackedRequestBodyStreamTest { + + @Test + void sendRequestBody_emptyOpenPipe_returnsFalseAndCopiesNothing() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + ByteBuffer dst = ByteBuffer.allocate(8); + + boolean done = stream.sendRequestBody(dst); + + assertThat(done).isFalse(); + assertThat(dst.position()).isZero(); + } + + @Test + void sendRequestBody_afterEofAndDrained_returnsTrue() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {1, 2, 3}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + ByteBuffer first = ByteBuffer.allocate(8); + boolean firstDone = stream.sendRequestBody(first); + ByteBuffer second = ByteBuffer.allocate(8); + boolean secondDone = stream.sendRequestBody(second); + + assertThat(firstDone).isFalse(); + assertThat(first.position()).isEqualTo(payload.length); + assertThat(secondDone).isTrue(); + assertThat(second.position()).isZero(); + } + + @Test + void sendRequestBody_pipeInError_throwsRuntimeExceptionWithCause() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + IllegalStateException cause = new IllegalStateException("upstream broke"); + pipe.signalError(cause); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Producer failed") + .hasRootCauseMessage("upstream broke"); + } + + @Test + void sendRequestBody_pipeAborted_throwsRuntimeException() { + 
BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + pipe.abort(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("aborted"); + } + + @Test + void resetPosition_returnsFalse() { + BodyChunkPipe pipe = new BodyChunkPipe(2, 8); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + assertThat(stream.resetPosition()).isFalse(); + } + + /** + * When CRT's destination buffer is smaller than the chunk size, draining a single chunk + * requires multiple {@code sendRequestBody} calls. This exercises {@link BodyChunkPipe#pollDrain}'s + * {@code pendingDrain} state being carried across consumer invocations. + */ + @Test + void sendRequestBody_destinationSmallerThanChunk_drainsAcrossMultipleCalls() throws Exception { + BodyChunkPipe pipe = new BodyChunkPipe(2, 16); + ByteBuffer bb = pipe.acquireForFill(); + byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + bb.put(payload); + bb.flip(); + pipe.publish(bb); + pipe.signalEof(); + PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe); + + ByteBuffer first = ByteBuffer.allocate(3); + ByteBuffer second = ByteBuffer.allocate(3); + ByteBuffer third = ByteBuffer.allocate(3); + ByteBuffer fourth = ByteBuffer.allocate(3); + ByteBuffer fifth = ByteBuffer.allocate(3); + boolean firstDone = stream.sendRequestBody(first); + boolean secondDone = stream.sendRequestBody(second); + boolean thirdDone = stream.sendRequestBody(third); + boolean fourthDone = stream.sendRequestBody(fourth); + boolean fifthDone = stream.sendRequestBody(fifth); + + assertThat(firstDone).isFalse(); + assertThat(secondDone).isFalse(); + assertThat(thirdDone).isFalse(); + assertThat(fourthDone).isFalse(); + assertThat(fifthDone).isTrue(); + assertThat(first.position()).isEqualTo(3); + assertThat(second.position()).isEqualTo(3); + 
assertThat(third.position()).isEqualTo(3); + assertThat(fourth.position()).isEqualTo(1); + assertThat(fifth.position()).isZero(); + + byte[] reassembled = new byte[payload.length]; + first.flip(); + first.get(reassembled, 0, 3); + second.flip(); + second.get(reassembled, 3, 3); + third.flip(); + third.get(reassembled, 6, 3); + fourth.flip(); + fourth.get(reassembled, 9, 1); + assertThat(reassembled).containsExactly(payload); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java new file mode 100644 index 000000000000..21d0d4fa1ec6 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java @@ -0,0 +1,239 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.http.SdkHttpFullResponse;
+
+class SyncRequestBodyPumpTest {
+
+    @Test
+    void pump_happyPath_consumerSeesAllProducerBytes() throws Exception {
+        byte[] payload = new byte[200];
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (byte) (i & 0xFF);
+        }
+        BodyChunkPipe pipe = new BodyChunkPipe(2, 32);
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+        AtomicReference<Throwable> producerError = new AtomicReference<>();
+        Thread producer = new Thread(() -> {
+            try {
+                pump.pump();
+            } catch (Throwable t) {
+                producerError.set(t);
+            }
+        }, "pump-producer");
+
+        producer.start();
+        byte[] consumed = drainAll(pipe, payload.length);
+        producer.join(5_000);
+
+        assertThat(producerError.get()).isNull();
+        assertThat(producer.isAlive()).isFalse();
+        assertThat(consumed).containsExactly(payload);
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+    }
+
+    @Test
+    void pump_emptyStream_signalsEofWithoutPublish() throws Exception {
+        BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+        pump.pump();
+
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+        assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+    }
+
+    @Test
+    void pump_inputStreamThrowsIoException_pumpSignalsErrorAndRethrows() {
+        IOException ioe = new IOException("disk gone");
+        BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+        ContentStreamProvider provider = () -> new InputStream() {
+            @Override
+            public int read() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                throw ioe;
+            }
+        };
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(provider, pipe);
+
+        assertThatThrownBy(pump::pump).isSameAs(ioe);
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+        assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+            .hasMessageContaining("Producer failed")
+            .hasRootCauseMessage("disk gone");
+    }
+
+    @Test
+    void pump_abortedWhilePumping_returnsWithoutSignalingEof() throws Exception {
+        // pipe depth 1 + payload larger than chunk forces producer to block on second acquireForFill,
+        // giving the test thread a deterministic point to call abort().
+        BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+        byte[] payload = new byte[64];
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+        AtomicReference<Throwable> producerError = new AtomicReference<>();
+        Thread producer = new Thread(() -> {
+            try {
+                pump.pump();
+            } catch (Throwable t) {
+                producerError.set(t);
+            }
+        }, "pump-producer");
+
+        producer.start();
+        waitUntilStateIsOpenWithChunkInFlight(pipe);
+        pump.abort();
+        producer.join(5_000);
+
+        assertThat(producer.isAlive()).isFalse();
+        assertThat(producerError.get()).isNull();
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+    }
+
+    /**
+     * Regression test for the producer-livelock-on-CRT-failure path.
+     *
+     * <p>If CRT signals request failure (network error, idle/health timeout, etc.) while the
+     * producer is parked in {@link BodyChunkPipe#acquireForFill()}, nothing in the pipe's normal
+     * contract wakes it without a recycle/abort. The fix in {@code AwsCrtHttpClient.CrtHttpRequest.call()}
+     * registers a {@code responseFuture.whenComplete(...)} hook that calls {@code pump.abort()}
+     * when the response future completes exceptionally. This test reproduces that wiring
+     * at the unit level: a pump runs against a pipe with no consumer, the producer parks once the
+     * pipe is full, and we then complete a separate response future exceptionally with the same
+     * hook to verify the producer unblocks and {@code pump()} returns.
+     *
+     * <p>Without the hook (or equivalent abort path), {@code producer.join(5_000)} would time out
+     * and the test would fail.
+     */
+    @Test
+    void pump_responseFutureFailsExceptionally_whileProducerParked_unblocksProducerViaAbortHook() throws Exception {
+        BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+        // Payload larger than depth*chunkSize forces the producer to park on acquireForFill once
+        // both chunks are sitting in the ready queue with no consumer draining.
+        byte[] payload = new byte[128];
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+        CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
+        responseFuture.whenComplete((r, t) -> {
+            if (t != null) {
+                pump.abort();
+            }
+        });
+
+        AtomicReference<Throwable> producerError = new AtomicReference<>();
+        Thread producer = new Thread(() -> {
+            try {
+                pump.pump();
+            } catch (Throwable t) {
+                producerError.set(t);
+            }
+        }, "pump-producer");
+
+        producer.start();
+        waitUntilProducerIsParked(pipe);
+        responseFuture.completeExceptionally(new IOException("simulated CRT failure"));
+        producer.join(5_000);
+
+        assertThat(producer.isAlive()).isFalse();
+        assertThat(producerError.get()).isNull();
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+    }
+
+    @Test
+    void abort_propagatesToPipe() {
+        BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+        SyncRequestBodyPump pump = new SyncRequestBodyPump(
+            ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+        pump.abort();
+
+        assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+    }
+
+    private static byte[] drainAll(BodyChunkPipe pipe, int expected) {
+        byte[] out = new byte[expected];
+        int written = 0;
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        while (written < expected && System.nanoTime() < deadline) {
+            ByteBuffer scratch = ByteBuffer.allocate(Math.min(64, expected - written));
+            int n = pipe.pollDrain(scratch);
+            if (n < 0) {
+                break;
+            }
+            if (n == 0) {
+                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+                continue;
+            }
+            scratch.flip();
+            scratch.get(out, written, n);
+            written += n;
+        }
+        if (written < expected) {
+            throw new AssertionError("Drained only " + written + " of " + expected + " bytes");
+        }
+        return out;
+    }
+
+    private static void waitUntilStateIsOpenWithChunkInFlight(BodyChunkPipe pipe) throws InterruptedException {
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        while (System.nanoTime() < deadline) {
+            if (pipe.allocatedForTest() >= 1) {
+                return;
+            }
+            Thread.sleep(1);
+        }
+        throw new AssertionError("Producer did not allocate a chunk within timeout");
+    }
+
+    /**
+     * Wait for the producer to park on {@code acquireForFill}. Heuristic: the pipe's allocation count is
+     * non-zero and has stayed unchanged for three consecutive re-observations taken 20ms apart
+     * (the producer can't make further progress without a recycle).
+     */
+    private static void waitUntilProducerIsParked(BodyChunkPipe pipe) throws InterruptedException {
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        int stableObservations = 0;
+        int lastAllocated = -1;
+        while (System.nanoTime() < deadline) {
+            int allocated = pipe.allocatedForTest();
+            if (allocated == lastAllocated && allocated > 0) {
+                if (++stableObservations >= 3) {
+                    return;
+                }
+            } else {
+                stableObservations = 0;
+                lastAllocated = allocated;
+            }
+            Thread.sleep(20);
+        }
+        throw new AssertionError("Producer did not park within timeout");
+    }
+}
diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
index f1eb1c6e4f23..943635dd41f0 100644
--- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
+++
b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
@@ -16,12 +16,13 @@
 package software.amazon.awssdk.services.s3.crthttpclient;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import org.assertj.core.api.Assertions;
+import java.time.Duration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
 import software.amazon.awssdk.testutils.RandomTempFile;
 import software.amazon.awssdk.utils.Md5Utils;
@@ -80,4 +82,27 @@ void getObject_toFile_objectSentCorrectly() throws Exception {
 
         assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile));
     }
+
+    /**
+     * Pipes the {@link ResponseInputStream} returned by a GetObject call directly into a PutObject body,
+     * issuing both requests through {@code s3WithCrtHttpClient}. The upload runs under a preemptive
+     * 120-second timeout so that a hang fails the test; success is asserted by a non-blank ETag.
+     */
+    @Test
+    void getObject_responseStreamPipedIntoPutObject_completesWithoutDeadlock() throws Exception {
+        String destinationKey = "piped-" + TEST_KEY;
+        try (ResponseInputStream<GetObjectResponse> sourceStream =
+                 s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
+                                               ResponseTransformer.toInputStream())) {
+            long contentLength = sourceStream.response().contentLength();
+
+            PutObjectResponse putResponse = assertTimeoutPreemptively(
+                Duration.ofSeconds(120),
+                () -> s3WithCrtHttpClient.putObject(
+                    r -> r.bucket(TEST_BUCKET).key(destinationKey).contentLength(contentLength),
+                    RequestBody.fromInputStream(sourceStream, contentLength)));
+
+            assertThat(putResponse.eTag()).isNotBlank();
+        }
+    }
 }