streamFuture() {
+ return streamFuture;
+ }
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
new file mode 100644
index 000000000000..6fc436d5b896
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipe.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * Bounded producer/consumer hand-off between the caller thread (producer) and the CRT event-loop thread (consumer).
+ *
+ * The producer reads from the customer's {@code InputStream} into heap {@link ByteBuffer}s acquired
+ * via {@link #acquireForFill()}, then {@link #publish(ByteBuffer) publishes} them into a bounded
+ * {@link ArrayBlockingQueue}. The consumer drains those buffers via {@link #pollDrain(ByteBuffer)},
+ * which is non-blocking: if no data is ready the consumer returns 0 bytes and CRT reschedules itself
+ * via {@code aws_channel_schedule_task_now}.
+ *
+ *
Drained buffers are returned to a free {@link ArrayDeque} (LIFO for cache hotness) guarded by a
+ * private monitor. The producer parks on this monitor when the free deque is empty, providing back-pressure.
+ *
+ *
Buffers are heap-allocated lazily on the producer's first {@link #acquireForFill()}, not in the
+ * constructor. This keeps per-request heap minimal while a request is queued on the CRT connection
+ * pool waiting for a stream: the pipe object exists but its backing buffers do not.
+ *
+ *
Buffer modes follow standard NIO conventions: {@link #acquireForFill()} returns a buffer in
+ * write mode (cleared); the producer fills it, calls {@code flip()}, and {@link #publish(ByteBuffer)
+ * publishes}. The consumer drains in read mode and {@code clear()}s during recycle.
+ *
+ *
State machine: {@code OPEN -> {EOF | ERROR | ABORTED}}. Transitions are one-way.
+ */
+@SdkInternalApi
+final class BodyChunkPipe {
+ private static final Logger LOG = Logger.loggerFor(BodyChunkPipe.class);
+
+ enum State {
+ OPEN,
+ EOF,
+ ERROR,
+ ABORTED
+ }
+
+ /**
+ * Defense-in-depth wait timeout for {@link #acquireForFill()}. Even if a code path forgets
+ * to call {@link #abort()}, a parked producer wakes every {@value} ms to re-check state.
+ * Spurious wakeups are harmless.
+ */
+ private static final long ACQUIRE_WAIT_TIMEOUT_MS = 50L;
+
+ private final int depth;
+ private final int chunkSize;
+ private final ArrayBlockingQueue ready;
+ private final Deque free;
+ private final AtomicReference state = new AtomicReference<>(State.OPEN);
+ /**
+ * Guards the free deque, allocated counter, and producer wait/notify protocol. Kept private
+ * so external code cannot synchronize on the pipe instance and stall the producer.
+ */
+ private final Object freeLock = new Object();
+
+ private int allocated;
+ private volatile Throwable error;
+ private ByteBuffer pendingDrain;
+
+ BodyChunkPipe(int depth, int chunkSize) {
+ if (depth < 1) {
+ throw new IllegalArgumentException("depth must be >= 1");
+ }
+ if (chunkSize < 1) {
+ throw new IllegalArgumentException("chunkSize must be >= 1");
+ }
+ this.depth = depth;
+ this.chunkSize = chunkSize;
+ this.ready = new ArrayBlockingQueue<>(depth);
+ this.free = new ArrayDeque<>(depth);
+ }
+
+ /**
+ * Producer side: acquire a buffer in write mode (position=0, limit=capacity). Blocks if all
+ * buffers are currently in flight. Returns {@code null} only if the pipe was aborted while the
+ * producer was waiting.
+ *
+ * Allocates the buffer on first use up to the configured depth. This keeps the per-request
+ * footprint minimal until the producer actually starts pumping (i.e., until after the CRT stream
+ * has been acquired).
+ */
+ ByteBuffer acquireForFill() throws InterruptedException {
+ synchronized (freeLock) {
+ while (true) {
+ State s = state.get();
+ if (s == State.ABORTED || s == State.ERROR) {
+ LOG.debug(() -> "acquireForFill returning null, state=" + s);
+ return null;
+ }
+ ByteBuffer bb = free.pollFirst();
+ if (bb != null) {
+ return bb;
+ }
+ if (allocated < depth) {
+ allocated++;
+ return ByteBuffer.allocate(chunkSize);
+ }
+ freeLock.wait(ACQUIRE_WAIT_TIMEOUT_MS);
+ }
+ }
+ }
+
+ /**
+ * Producer side: publish a filled buffer to the consumer. Caller must have called
+ * {@link ByteBuffer#flip()} so the buffer is in read mode (position=0, limit=N).
+ *
+ *
If the buffer has no remaining bytes (zero-length read), it is recycled back to the free
+ * deque rather than pushed to the ready queue: an empty buffer would otherwise be leaked from
+ * the bounded pool, and the consumer would interpret it as a no-op anyway.
+ */
+ void publish(ByteBuffer chunk) throws InterruptedException {
+ if (!chunk.hasRemaining()) {
+ recycle(chunk);
+ return;
+ }
+ // ready.put() blocks if the queue is full, but the queue capacity == pool size,
+ // so this can only block briefly while the consumer drains.
+ ready.put(chunk);
+ }
+
+ /**
+ * Producer side: signal end-of-stream. Idempotent.
+ */
+ void signalEof() {
+ if (state.compareAndSet(State.OPEN, State.EOF)) {
+ LOG.debug(() -> "state OPEN -> EOF");
+ }
+ }
+
+ /**
+ * Producer side: signal a fatal producer-side error. Idempotent.
+ */
+ void signalError(Throwable t) {
+ synchronized (freeLock) {
+ // Publish the cause BEFORE flipping state so a consumer's lock-free read in pollDrain
+ // never observes state==ERROR with error==null. The volatile write to `error` is
+ // harmless if the CAS later loses (idempotent signal).
+ error = t;
+ if (state.compareAndSet(State.OPEN, State.ERROR)) {
+ LOG.debug(() -> "state OPEN -> ERROR (" + t.getClass().getSimpleName() + ")");
+ }
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * External-cancel: clear ready queue, flip state, wake producer.
+ */
+ void abort() {
+ synchronized (freeLock) {
+ if (state.compareAndSet(State.OPEN, State.ABORTED)) {
+ LOG.debug(() -> "state OPEN -> ABORTED");
+ ready.clear();
+ }
+ freeLock.notifyAll();
+ }
+ }
+
+ /**
+ * Consumer side: drain bytes into {@code dst}. NEVER blocks.
+ *
+ *
Single-consumer only. {@code pendingDrain} is non-volatile, so this method MUST be
+ * invoked from a single thread. CRT honors that by scheduling the outgoing-stream task serially
+ * on one event-loop thread per stream. Concurrent invocation will silently corrupt body delivery.
+ *
+ * @return number of bytes drained, or {@code -1} on EOF with no remaining data.
+ * @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data.
+ */
+ int pollDrain(ByteBuffer dst) {
+ int totalBytesConsumed = 0;
+ while (dst.hasRemaining()) {
+ if (pendingDrain == null) {
+ pendingDrain = ready.poll();
+ }
+ if (pendingDrain == null) {
+ State s = state.get();
+ if (s == State.OPEN) {
+ return totalBytesConsumed;
+ }
+ // JMM happens-before: the producer's program order is publish() (ready.put) THEN
+ // signalEof/Error/abort (volatile state CAS). Once we observe the volatile state
+ // transition here, the producer's prior ready.put is guaranteed visible to a
+ // subsequent poll on this thread. Re-poll once to drain any chunk that landed in
+ // ready before the producer's terminal CAS.
+ pendingDrain = ready.poll();
+ if (pendingDrain != null) {
+ continue;
+ }
+ switch (s) {
+ case ERROR:
+ throw new RuntimeException("Producer failed", error);
+ case ABORTED:
+ throw new RuntimeException("Request body stream was aborted");
+ case EOF:
+ return totalBytesConsumed > 0 ? totalBytesConsumed : -1;
+ default:
+ return totalBytesConsumed;
+ }
+ }
+ int n = Math.min(dst.remaining(), pendingDrain.remaining());
+ // Cap source's limit so dst.put(src) only consumes n bytes (avoids BufferOverflowException
+ // when pendingDrain.remaining() > dst.remaining()).
+ int srcOrigLimit = pendingDrain.limit();
+ pendingDrain.limit(pendingDrain.position() + n);
+ dst.put(pendingDrain);
+ pendingDrain.limit(srcOrigLimit);
+ totalBytesConsumed += n;
+ if (!pendingDrain.hasRemaining()) {
+ // Buffer fully copied into dst, return it to the free deque (and notify the producer
+ // in case it was waiting). This is what bounds the pool: buffers only re-enter the
+ // producer pool after the consumer has drained them.
+ ByteBuffer drained = pendingDrain;
+ pendingDrain = null;
+ recycle(drained);
+ }
+ }
+ return totalBytesConsumed;
+ }
+
+ /**
+ * Visible-for-test / test-only helper: current pipe state.
+ */
+ @SdkTestInternalApi
+ State state() {
+ return state.get();
+ }
+
+ /**
+ * Visible-for-test / test-only helper: number of buffers minted so far. The pipe lazily allocates
+ * buffers on the producer's first {@link #acquireForFill()}, so this is 0 until the producer
+ * starts pumping and grows up to {@code depth}.
+ */
+ @SdkTestInternalApi
+ int allocatedForTest() {
+ synchronized (freeLock) {
+ return allocated;
+ }
+ }
+
+ private void recycle(ByteBuffer bb) {
+ synchronized (freeLock) {
+ bb.clear();
+ free.push(bb);
+ freeLock.notifyAll();
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
index 8672d80b0d1b..4a3991c5d44a 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
@@ -23,6 +23,7 @@
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBase;
+import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.Protocol;
@@ -33,6 +34,16 @@
@SdkInternalApi
public final class CrtRequestAdapter {
+ /**
+ * Per-chunk size used by the sync request-body pipe.
+ */
+ private static final int CHUNK_SIZE = 128 * 1024;
+
+ /**
+ * Number of in-flight chunks the pipe holds.
+ */
+ private static final int PIPE_DEPTH = 4;
+
private CrtRequestAdapter() {
}
@@ -60,7 +71,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request)
crtRequestBodyAdapter);
}
- public static HttpRequest toCrtRequest(CrtRequestContext request) {
+ /**
+ * Build the CRT request for the sync path. When the SDK request has a body, this also constructs the
+ * {@link BodyChunkPipe} and a {@link SyncRequestBodyPump}; the caller thread is expected to drive
+ * the pump after the stream is activated.
+ */
+ public static SyncCrtRequest toCrtRequest(CrtRequestContext request) {
HttpExecuteRequest sdkExecuteRequest = request.sdkRequest();
SdkHttpRequest sdkRequest = sdkExecuteRequest.httpRequest();
@@ -78,14 +94,39 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));
String finalEncodedPath = encodedPath + encodedQueryString;
- return sdkExecuteRequest.contentStreamProvider()
- .map(provider -> new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray,
- new CrtRequestInputStreamAdapter(provider)))
- .orElse(new HttpRequest(method,
- finalEncodedPath,
- crtHeaderArray, null));
+
+ Optional providerOpt = sdkExecuteRequest.contentStreamProvider();
+ if (!providerOpt.isPresent()) {
+ return new SyncCrtRequest(new HttpRequest(method, finalEncodedPath, crtHeaderArray, null), null);
+ }
+
+ BodyChunkPipe pipe = new BodyChunkPipe(PIPE_DEPTH, CHUNK_SIZE);
+ PipeBackedRequestBodyStream bodyStream = new PipeBackedRequestBodyStream(pipe);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(providerOpt.get(), pipe);
+ HttpRequest crtRequest = new HttpRequest(method, finalEncodedPath, crtHeaderArray, bodyStream);
+ return new SyncCrtRequest(crtRequest, pump);
+ }
+
+ /**
+ * Holder returned from {@link #toCrtRequest(CrtRequestContext)} bundling the CRT-side request and the
+ * caller-thread producer pump (null when the SDK request has no body).
+ */
+ public static final class SyncCrtRequest {
+ private final HttpRequest httpRequest;
+ private final SyncRequestBodyPump pump;
+
+ SyncCrtRequest(HttpRequest httpRequest, SyncRequestBodyPump pump) {
+ this.httpRequest = httpRequest;
+ this.pump = pump;
+ }
+
+ public HttpRequest httpRequest() {
+ return httpRequest;
+ }
+
+ public SyncRequestBodyPump pump() {
+ return pump;
+ }
}
private static HttpHeader[] asArray(List crtHeaderList) {
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
deleted file mode 100644
index 68f418b9e1df..000000000000
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.http.crt.internal.request;
-
-import static java.lang.Math.min;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
-import software.amazon.awssdk.http.ContentStreamProvider;
-
-@SdkInternalApi
-final class CrtRequestInputStreamAdapter implements HttpRequestBodyStream {
- private static final int READ_BUFFER_SIZE = 16 * 1024;
-
- private final ContentStreamProvider provider;
- private volatile InputStream providerStream;
- private final byte[] readBuffer = new byte[READ_BUFFER_SIZE];
-
- CrtRequestInputStreamAdapter(ContentStreamProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
- int read;
-
- try {
- if (providerStream == null) {
- createNewStream();
- }
-
- int toRead = min(READ_BUFFER_SIZE, bodyBytesOut.remaining());
- read = providerStream.read(readBuffer, 0, toRead);
-
- if (read > 0) {
- bodyBytesOut.put(readBuffer, 0, read);
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return read < 0;
- }
-
- @Override
- public boolean resetPosition() {
- try {
- createNewStream();
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- return true;
- }
-
- private void createNewStream() throws IOException {
- if (providerStream != null) {
- providerStream.close();
- }
- providerStream = provider.newStream();
- }
-}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
new file mode 100644
index 000000000000..a2dd78eea0c4
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStream.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.nio.ByteBuffer;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
+
+/**
+ * A {@link HttpRequestBodyStream} adapter whose {@link #sendRequestBody(ByteBuffer)} drains bytes from a
+ * {@link BodyChunkPipe} that is fed by the caller thread. The pull callback NEVER blocks: if no data is ready,
+ * it returns 0 bytes and CRT reschedules the outgoing-stream task via {@code aws_channel_schedule_task_now},
+ * allowing other event-loop tasks (such as a concurrent GET response delivery) to run before the retry.
+ */
+@SdkInternalApi
+final class PipeBackedRequestBodyStream implements HttpRequestBodyStream {
+
+ private final BodyChunkPipe pipe;
+
+ PipeBackedRequestBodyStream(BodyChunkPipe pipe) {
+ this.pipe = pipe;
+ }
+
+ @Override
+ public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
+ int drained = pipe.pollDrain(bodyBytesOut);
+ return drained < 0;
+ }
+
+ @Override
+ public boolean resetPosition() {
+ // The SDK retry layer (RetryableStage) handles request-level retries by calling prepareRequest() again,
+ // CRT does not currently exercise resetPosition for HTTP/1.1, so opting out is safe in practice.
+ return false;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
new file mode 100644
index 000000000000..fa237045ede2
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPump.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+/**
+ * Caller-thread producer that reads from the customer's {@link InputStream} and publishes chunks to a
+ * {@link BodyChunkPipe}. Runs on the caller (sync) thread between stream activation and
+ * {@code responseFuture.join()}, ensuring the blocking {@code read()} happens off the CRT event loop.
+ */
+@SdkInternalApi
+public final class SyncRequestBodyPump {
+
+ private final ContentStreamProvider contentStreamProvider;
+ private final BodyChunkPipe pipe;
+
+ SyncRequestBodyPump(ContentStreamProvider contentStreamProvider, BodyChunkPipe pipe) {
+ this.contentStreamProvider = contentStreamProvider;
+ this.pipe = pipe;
+ }
+
+ /**
+ * Pump the entire input stream into the pipe. Runs on the caller thread; never invoked on the CRT
+ * event-loop thread. On EOF signals the pipe normally; on {@link IOException} signals an error and rethrows.
+ */
+ public void pump() throws IOException {
+ try (InputStream in = contentStreamProvider.newStream()) {
+ while (true) {
+ ByteBuffer chunk = pipe.acquireForFill();
+ if (chunk == null) {
+ // pipe was aborted while we were waiting; stop without signaling EOF.
+ return;
+ }
+ int read;
+ try {
+ read = in.read(chunk.array(), chunk.arrayOffset() + chunk.position(), chunk.remaining());
+ } catch (IOException ioe) {
+ pipe.signalError(ioe);
+ throw ioe;
+ }
+ if (read < 0) {
+ pipe.signalEof();
+ return;
+ }
+ chunk.position(chunk.position() + read);
+ chunk.flip();
+ pipe.publish(chunk);
+ }
+ } catch (InterruptedException ie) {
+ pipe.abort();
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing request body", ie);
+ }
+ }
+
+ /**
+ * Abort the underlying pipe (e.g., when the caller's {@code call()} is cancelled).
+ */
+ public void abort() {
+ pipe.abort();
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
index ce5d778f06a1..aad5103b287c 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java
@@ -17,8 +17,13 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase;
+import static com.github.tomakehurst.wiremock.client.WireMock.put;
+import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
@@ -26,13 +31,22 @@
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
+import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -45,6 +59,8 @@
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricCollector;
@@ -133,6 +149,421 @@ public void abortRequest_shouldFailTheExceptionWithIOException() throws Exceptio
}
}
+ @Test
+ public void putRequest_withInputStreamBody_serverReceivesBody() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.create()) {
+ String body = "hello pull pump";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+
+ HttpExecuteRequest executeRequest = HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ verify(putRequestedFor(urlPathEqualTo("/sink"))
+ .withHeader("Content-Length", equalTo(Integer.toString(bodyBytes.length)))
+ .withRequestBody(equalToIgnoreCase(body)));
+ }
+ }
+
+ @Test
+ public void inputStreamThrows_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ // Bound the pool to a single connection: if the failed request leaks its connection, the
+ // second call() either fails to acquire (with the explicit timeout below) or blocks until
+ // the test framework times out. Either manifests as a deterministic failure rather than a hang.
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ IOException expected = new IOException("simulated upstream failure");
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", "100")
+ .build();
+ HttpExecuteRequest failingExecute =
+ HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new InputStream() {
+ @Override
+ public int read() throws IOException {
+ throw expected;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw expected;
+ }
+ })
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ // If the previous failure leaked the connection, this second call would fail to acquire
+ // (bounded by the connectionAcquisitionTimeout configured above) instead of hanging.
+ String body = "second request body";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void abortMidRequest_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest delayedRequest = createRequest(uri);
+ HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder()
+ .request(delayedRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ ExecutableHttpRequest abortable = client.prepareRequest(delayedExecute);
+ executorService.schedule(abortable::abort, 100, TimeUnit.MILLISECONDS);
+ assertThatThrownBy(abortable::call).isInstanceOf(IOException.class).hasMessageContaining("cancelled");
+
+ String body = "after abort";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void serverResetsConnection_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink"))
+ .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));
+
+ byte[] bodyBytes = randomAlphabetic(64).getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest failingRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest failingExecute = HttpExecuteRequest.builder()
+ .request(failingRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+
+ assertThatThrownBy(() -> client.prepareRequest(failingExecute).call())
+ .isInstanceOf(IOException.class);
+
+ stubFor(put(urlPathEqualTo("/sink2")).willReturn(aResponse().withStatus(200)));
+ byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink2")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(okBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(okBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void interruptDuringCall_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(2000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest delayedRequest = createRequest(uri);
+ HttpExecuteRequest delayedExecute = HttpExecuteRequest.builder()
+ .request(delayedRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ CountDownLatch workerDone = new CountDownLatch(1);
+ AtomicReference workerError = new AtomicReference<>();
+ ExecutorService worker = Executors.newSingleThreadExecutor();
+ try {
+ Future> inFlight = worker.submit(() -> {
+ try {
+ client.prepareRequest(delayedExecute).call();
+ } catch (Throwable t) {
+ workerError.set(t);
+ } finally {
+ workerDone.countDown();
+ }
+ });
+
+ // Give call() time to enter joinInterruptibly() before we interrupt.
+ Thread.sleep(100);
+ inFlight.cancel(true);
+
+ assertThat(workerDone.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(workerError.get())
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("cancelled");
+ } finally {
+ worker.shutdownNow();
+ worker.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ // If the interrupt leaked the connection, this second call() would block on acquire and fail
+ // when connectionAcquisitionTimeout (10s above) elapses.
+ String body = "after-interrupt";
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes))
+ .build();
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ @Test
+ public void acquireTimeoutThenHolderCancelled_connectionReturnedToPool_subsequentRequestSucceeds() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(2))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(60_000).withBody("hello")));
+
+ SdkHttpRequest holderRequest = createRequest(uri);
+ HttpExecuteRequest holderExecute = HttpExecuteRequest.builder()
+ .request(holderRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+ ExecutableHttpRequest holder = client.prepareRequest(holderExecute);
+
+ SdkHttpRequest racerRequest = createRequest(uri);
+ HttpExecuteRequest racerExecute = HttpExecuteRequest.builder()
+ .request(racerRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Future holderFuture = pool.submit(holder::call);
+ // Give the holder time to acquire the only slot before the racer tries.
+ Thread.sleep(500);
+
+ Future racerFuture = pool.submit(() -> client.prepareRequest(racerExecute).call());
+ // CRT surfaces the acquire-timeout as HttpException; CrtHttpRequest.call() rethrows
+ // it directly (does not wrap in IOException).
+ assertThatThrownBy(() -> racerFuture.get(5, TimeUnit.SECONDS))
+ .hasMessageContaining("acquire");
+
+ // Release the slot via the same closeConnection path the other leak tests exercise.
+ holder.abort();
+ assertThatThrownBy(() -> holderFuture.get(5, TimeUnit.SECONDS))
+ .hasCauseInstanceOf(IOException.class);
+ } finally {
+ pool.shutdownNow();
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ // If the slot didn't reclaim, this third call() blocks on acquire and fails when the
+ // 2s connectionAcquisitionTimeout above elapses.
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+ byte[] okBytes = "ok".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest okRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(okBytes.length))
+ .build();
+ HttpExecuteRequest okExecute = HttpExecuteRequest.builder()
+ .request(okRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(okBytes))
+ .build();
+
+ HttpExecuteResponse response = client.prepareRequest(okExecute).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+ }
+ }
+
+ /**
+ * Regression test for the deadlock the pull-pump fix addresses. On master, the request body's
+ * {@code InputStream.read(...)} ran on the CRT event-loop thread (via the body callback), which
+ * meant a body sourced from a {@code GET}'s {@code ResponseInputStream} on the same event loop
+ * could deadlock: the GET held the event loop while the PUT body waited for it.
+ *
+ * Pull-pump moves the read to the caller (sync) thread. This test verifies that load-bearing
+ * claim by recording the thread that performs the body read and asserting it is the caller
+ * thread - not a CRT event-loop thread. Failure of either the assertion or the test timeout
+ * (a hang) is the deadlock signal.
+ */
+ @Test
+ public void putBodyReadHappensOnCallerThread_notOnCrtEventLoop() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(10))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ byte[] bodyBytes = "body-on-caller".getBytes(StandardCharsets.UTF_8);
+ AtomicReference readThreadName = new AtomicReference<>();
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(bodyBytes.length))
+ .build();
+ HttpExecuteRequest executeRequest =
+ HttpExecuteRequest.builder()
+ .request(request)
+ .contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes) {
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ readThreadName.compareAndSet(null, Thread.currentThread().getName());
+ return super.read(b, off, len);
+ }
+ })
+ .build();
+
+ String callerThreadName = Thread.currentThread().getName();
+ HttpExecuteResponse response = client.prepareRequest(executeRequest).call();
+ assertThat(response.httpResponse().statusCode()).isEqualTo(200);
+
+ String observed = readThreadName.get();
+ assertThat(observed)
+ .as("body read should happen on the caller thread, not the CRT event loop")
+ .isNotNull()
+ .isEqualTo(callerThreadName)
+ .doesNotContainIgnoringCase("AwsEventLoop")
+ .doesNotContainIgnoringCase("aws-event-loop");
+ }
+ }
+
+ /**
+ * Stress companion to {@link #putBodyReadHappensOnCallerThread_notOnCrtEventLoop}. Issues a
+ * delayed GET (response delayed server-side) and a PUT in parallel through the same
+ * {@code maxConcurrency(1)} client. On master, sequencing them through a single connection
+ * with the body read tied to the event-loop thread could deadlock; here both calls must
+ * complete within the test timeout.
+ */
+ @Test
+ public void getInFlight_concurrentPut_bothComplete() throws Exception {
+ try (SdkHttpClient client = AwsCrtHttpClient.builder()
+ .maxConcurrency(1)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(15))
+ .build()) {
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ stubFor(any(urlPathEqualTo("/slow"))
+ .willReturn(aResponse().withFixedDelay(2_000).withBody("hello")));
+ stubFor(put(urlPathEqualTo("/sink")).willReturn(aResponse().withStatus(200)));
+
+ SdkHttpRequest getRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.GET)
+ .encodedPath("/slow")
+ .putHeader("Host", uri.getHost())
+ .build();
+ HttpExecuteRequest getExecute = HttpExecuteRequest.builder()
+ .request(getRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]))
+ .build();
+
+ byte[] putBytes = "put-body".getBytes(StandardCharsets.UTF_8);
+ SdkHttpFullRequest putRequest = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.PUT)
+ .encodedPath("/sink")
+ .putHeader("Host", uri.getHost())
+ .putHeader("Content-Length", Integer.toString(putBytes.length))
+ .build();
+ HttpExecuteRequest putExecute = HttpExecuteRequest.builder()
+ .request(putRequest)
+ .contentStreamProvider(() -> new ByteArrayInputStream(putBytes))
+ .build();
+
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Callable getTask = () -> client.prepareRequest(getExecute).call();
+ Callable putTask = () -> client.prepareRequest(putExecute).call();
+ Future getFuture = pool.submit(getTask);
+ Future putFuture = pool.submit(putTask);
+
+ HttpExecuteResponse getResponse = getFuture.get(15, TimeUnit.SECONDS);
+ HttpExecuteResponse putResponse = putFuture.get(15, TimeUnit.SECONDS);
+ assertThat(getResponse.httpResponse().statusCode()).isEqualTo(200);
+ assertThat(putResponse.httpResponse().statusCode()).isEqualTo(200);
+ } finally {
+ pool.shutdownNow();
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ }
+
/**
* Make a simple request and wait for it to finish.
*
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
index 456000ac1150..8b916ba1ee98 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
@@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() {
.request(HttpExecuteRequest.builder().build())
.build();
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
}
@@ -102,7 +102,7 @@ public void execute_acquireStreamFails_wrapsWithIOException() {
.thenReturn(completableFuture);
completableFuture.completeExceptionally(exception);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
@@ -116,7 +116,7 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable)
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
}
@@ -133,7 +133,7 @@ public void execute_httpException_mapsToCorrectException(Entry executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}
@@ -146,7 +146,7 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
.thenReturn(completableFuture);
- CompletableFuture executeFuture = requestExecutor.execute(context);
+ CompletableFuture executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCause(exception);
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
new file mode 100644
index 000000000000..99097487a29b
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/BodyChunkPipeTest.java
@@ -0,0 +1,402 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+class BodyChunkPipeTest {
+
+ @Test
+ void pollDrain_emptyOpenPipe_returnsZero() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ int n = pipe.pollDrain(dst);
+
+ assertThat(n).isZero();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void pollDrain_afterEofWithEmptyQueue_returnsMinusOne() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ int n = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(n).isEqualTo(-1);
+ }
+
+ @Test
+ void publish_thenDrain_consumerSeesProducerBytes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+
+ int first = pipe.pollDrain(dst);
+ int second = pipe.pollDrain(dst);
+
+ assertThat(first).isEqualTo(payload.length);
+ assertThat(second).isEqualTo(-1);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ }
+
+ @Test
+ void signalError_pollDrainThrows() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void abort_emptiesReadyAndChangesState() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ bb.put(new byte[]{1, 2, 3, 4});
+ bb.flip();
+ pipe.publish(bb);
+
+ pipe.abort();
+
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void pollDrain_signalErrorWithQueuedChunks_drainsThenThrows() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {7, 8, 9};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalError(new RuntimeException("boom"));
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("boom");
+ }
+
+ @Test
+ void pollDrain_signalEofWithQueuedChunks_drainsThenReturnsMinusOne() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {10, 20, 30};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(payload.length);
+ dst.flip();
+ byte[] out = new byte[dst.remaining()];
+ dst.get(out);
+ assertThat(out).containsExactly(payload);
+ assertThat(afterDrain).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEof_leavesStateAsEof() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void abort_afterSignalEofWithQueuedChunks_doesNotClearReady() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+
+ pipe.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ ByteBuffer dst = ByteBuffer.allocate(payload.length);
+ int drained = pipe.pollDrain(dst);
+ assertThat(drained).isEqualTo(payload.length);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void recycle_intoEofPipe_doesNotThrowAndDoesNotCorruptPool() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ bb.put(new byte[]{1, 2, 3, 4});
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+
+ ByteBuffer dst = ByteBuffer.allocate(8);
+ int drained = pipe.pollDrain(dst);
+ int afterDrain = pipe.pollDrain(ByteBuffer.allocate(8));
+
+ assertThat(drained).isEqualTo(4);
+ assertThat(afterDrain).isEqualTo(-1);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void recycle_intoAbortedPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ pipe.abort();
+
+ bb.flip();
+ pipe.publish(bb);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void recycle_intoErrorPipe_doesNotThrow() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ pipe.signalError(new RuntimeException("boom"));
+
+ bb.flip();
+ pipe.publish(bb);
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ }
+
+ @Test
+ void constructor_doesNotAllocateChunks() {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ assertThat(pipe.allocatedForTest()).isZero();
+ }
+
+ @Test
+ void acquireForFill_firstCall_allocatesOneChunk() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(4, 16);
+
+ ByteBuffer bb = pipe.acquireForFill();
+
+ assertThat(bb).isNotNull();
+ assertThat(bb.capacity()).isEqualTo(16);
+ assertThat(bb.position()).isZero();
+ assertThat(bb.limit()).isEqualTo(16);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_uniqueChunksUpToDepth_thenStopsAllocating() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(3, 8);
+ ByteBuffer c1 = pipe.acquireForFill();
+ ByteBuffer c2 = pipe.acquireForFill();
+ ByteBuffer c3 = pipe.acquireForFill();
+
+ c1.put((byte) 1);
+ c1.flip();
+ pipe.publish(c1);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ ByteBuffer reused = pipe.acquireForFill();
+
+ assertThat(c1).isNotSameAs(c2).isNotSameAs(c3);
+ assertThat(c2).isNotSameAs(c3);
+ assertThat(pipe.allocatedForTest()).isEqualTo(3);
+ assertThat(reused).isSameAs(c1);
+ }
+
+ @Test
+ void acquireForFill_recycledChunkReused_noNewAllocation() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ bb.put(new byte[]{1, 2, 3});
+ bb.flip();
+ pipe.publish(bb);
+ pipe.pollDrain(ByteBuffer.allocate(8));
+
+ ByteBuffer reused = pipe.acquireForFill();
+
+ assertThat(reused).isSameAs(bb);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+
+ @Test
+ void acquireForFill_afterAbort_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+
+ ByteBuffer bb = pipe.acquireForFill();
+
+ assertThat(bb).isNull();
+ }
+
+ @Test
+ void acquireForFill_afterSignalError_returnsNull() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.signalError(new RuntimeException("boom"));
+
+ ByteBuffer bb = pipe.acquireForFill();
+
+ assertThat(bb).isNull();
+ }
+
+ @Test
+ void constructor_invalidDepth_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(0, 8))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("depth");
+ }
+
+ @Test
+ void constructor_invalidChunkSize_throws() {
+ assertThatThrownBy(() -> new BodyChunkPipe(2, 0))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("chunkSize");
+ }
+
+ /**
+ * Multi-threaded ordering test: producer races to call {@link BodyChunkPipe#signalError(Throwable)}
+ * while a consumer is concurrently spinning on {@link BodyChunkPipe#pollDrain(java.nio.ByteBuffer)}.
+ * The contract is that whenever the consumer observes the ERROR state, the cause must already
+ * be visible (no {@code RuntimeException("Producer failed", null)}). RepeatedTest amplifies the
+ * race window. With the cause published before the CAS, this should pass on every iteration.
+ */
+ @RepeatedTest(50)
+ void signalError_concurrentPollDrain_consumerNeverSeesNullCause() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ IllegalStateException expected = new IllegalStateException("boom");
+ CountDownLatch start = new CountDownLatch(1);
+ AtomicReference consumerError = new AtomicReference<>();
+ AtomicReference nullCauseSighting = new AtomicReference<>();
+
+ Thread consumer = new Thread(() -> {
+ try {
+ start.await();
+ ByteBuffer dst = ByteBuffer.allocate(16);
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ try {
+ int n = pipe.pollDrain(dst);
+ if (n < 0) {
+ return;
+ }
+ dst.clear();
+ } catch (RuntimeException re) {
+ if (re.getCause() == null) {
+ nullCauseSighting.set(re);
+ }
+ return;
+ }
+ }
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-consumer");
+
+ Thread producer = new Thread(() -> {
+ try {
+ start.await();
+ pipe.signalError(expected);
+ } catch (Throwable t) {
+ consumerError.set(t);
+ }
+ }, "pipe-producer");
+
+ consumer.start();
+ producer.start();
+ start.countDown();
+ producer.join(5_000);
+ consumer.join(5_000);
+
+ assertThat(consumer.isAlive()).isFalse();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumerError.get()).isNull();
+ assertThat(nullCauseSighting.get()).isNull();
+ }
+
+ /**
+ * Multi-threaded test for the recycle/notify path: producer is forced to block on
+ * {@link BodyChunkPipe#acquireForFill()} because all chunks are in flight, then the consumer
+ * drains a chunk which {@code recycle()}s and notifies the producer to wake. This exercises the
+ * full {@code freeLock.notifyAll()} hand-off rather than relying on the defensive 50ms wakeup.
+ */
+ @Test
+ void acquireForFill_blocksUntilConsumerRecycles_thenWakesAndCompletes() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ ByteBuffer first = pipe.acquireForFill();
+ first.put(new byte[]{1, 2, 3, 4});
+ first.flip();
+ pipe.publish(first);
+
+ CountDownLatch producerEntered = new CountDownLatch(1);
+ AtomicReference reused = new AtomicReference<>();
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ producerEntered.countDown();
+ ByteBuffer bb = pipe.acquireForFill();
+ reused.set(bb);
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pipe-producer");
+
+ producer.start();
+ producerEntered.await();
+ // Drain so the chunk is recycled and the producer is woken via notifyAll.
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (reused.get() == null && System.nanoTime() < deadline) {
+ pipe.pollDrain(ByteBuffer.allocate(8));
+ producer.join(50);
+ }
+
+ assertThat(producerError.get()).isNull();
+ assertThat(reused.get()).isSameAs(first);
+ assertThat(pipe.allocatedForTest()).isEqualTo(1);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
new file mode 100644
index 000000000000..6122adbae2d8
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/PipeBackedRequestBodyStreamTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Test;
+
+class PipeBackedRequestBodyStreamTest {
+
+ @Test
+ void sendRequestBody_emptyOpenPipe_returnsFalseAndCopiesNothing() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+ ByteBuffer dst = ByteBuffer.allocate(8);
+
+ boolean done = stream.sendRequestBody(dst);
+
+ assertThat(done).isFalse();
+ assertThat(dst.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_afterEofAndDrained_returnsTrue() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(8);
+ boolean firstDone = stream.sendRequestBody(first);
+ ByteBuffer second = ByteBuffer.allocate(8);
+ boolean secondDone = stream.sendRequestBody(second);
+
+ assertThat(firstDone).isFalse();
+ assertThat(first.position()).isEqualTo(payload.length);
+ assertThat(secondDone).isTrue();
+ assertThat(second.position()).isZero();
+ }
+
+ @Test
+ void sendRequestBody_pipeInError_throwsRuntimeExceptionWithCause() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ IllegalStateException cause = new IllegalStateException("upstream broke");
+ pipe.signalError(cause);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("upstream broke");
+ }
+
+ @Test
+ void sendRequestBody_pipeAborted_throwsRuntimeException() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ pipe.abort();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThatThrownBy(() -> stream.sendRequestBody(ByteBuffer.allocate(8)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("aborted");
+ }
+
+ @Test
+ void resetPosition_returnsFalse() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ assertThat(stream.resetPosition()).isFalse();
+ }
+
+ /**
+ * When CRT's destination buffer is smaller than the chunk size, draining a single chunk
+ * requires multiple {@code sendRequestBody} calls. This exercises {@link BodyChunkPipe#pollDrain}'s
+ * {@code pendingDrain} state being carried across consumer invocations.
+ */
+ @Test
+ void sendRequestBody_destinationSmallerThanChunk_drainsAcrossMultipleCalls() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ ByteBuffer bb = pipe.acquireForFill();
+ byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ bb.put(payload);
+ bb.flip();
+ pipe.publish(bb);
+ pipe.signalEof();
+ PipeBackedRequestBodyStream stream = new PipeBackedRequestBodyStream(pipe);
+
+ ByteBuffer first = ByteBuffer.allocate(3);
+ ByteBuffer second = ByteBuffer.allocate(3);
+ ByteBuffer third = ByteBuffer.allocate(3);
+ ByteBuffer fourth = ByteBuffer.allocate(3);
+ ByteBuffer fifth = ByteBuffer.allocate(3);
+ boolean firstDone = stream.sendRequestBody(first);
+ boolean secondDone = stream.sendRequestBody(second);
+ boolean thirdDone = stream.sendRequestBody(third);
+ boolean fourthDone = stream.sendRequestBody(fourth);
+ boolean fifthDone = stream.sendRequestBody(fifth);
+
+ assertThat(firstDone).isFalse();
+ assertThat(secondDone).isFalse();
+ assertThat(thirdDone).isFalse();
+ assertThat(fourthDone).isFalse();
+ assertThat(fifthDone).isTrue();
+ assertThat(first.position()).isEqualTo(3);
+ assertThat(second.position()).isEqualTo(3);
+ assertThat(third.position()).isEqualTo(3);
+ assertThat(fourth.position()).isEqualTo(1);
+ assertThat(fifth.position()).isZero();
+
+ byte[] reassembled = new byte[payload.length];
+ first.flip();
+ first.get(reassembled, 0, 3);
+ second.flip();
+ second.get(reassembled, 3, 3);
+ third.flip();
+ third.get(reassembled, 6, 3);
+ fourth.flip();
+ fourth.get(reassembled, 9, 1);
+ assertThat(reassembled).containsExactly(payload);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
new file mode 100644
index 000000000000..21d0d4fa1ec6
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/request/SyncRequestBodyPumpTest.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal.request;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.http.SdkHttpFullResponse;
+
+class SyncRequestBodyPumpTest {
+
+ @Test
+ void pump_happyPath_consumerSeesAllProducerBytes() throws Exception {
+ byte[] payload = new byte[200];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) (i & 0xFF);
+ }
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 32);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ byte[] consumed = drainAll(pipe, payload.length);
+ producer.join(5_000);
+
+ assertThat(producerError.get()).isNull();
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(consumed).containsExactly(payload);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ }
+
+ @Test
+ void pump_emptyStream_signalsEofWithoutPublish() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.pump();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.EOF);
+ assertThat(pipe.pollDrain(ByteBuffer.allocate(8))).isEqualTo(-1);
+ }
+
+ @Test
+ void pump_inputStreamThrowsIoException_pumpSignalsErrorAndRethrows() {
+ IOException ioe = new IOException("disk gone");
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 16);
+ ContentStreamProvider provider = () -> new InputStream() {
+ @Override
+ public int read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw ioe;
+ }
+ };
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(provider, pipe);
+
+ assertThatThrownBy(pump::pump).isSameAs(ioe);
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ERROR);
+ assertThatThrownBy(() -> pipe.pollDrain(ByteBuffer.allocate(8)))
+ .hasMessageContaining("Producer failed")
+ .hasRootCauseMessage("disk gone");
+ }
+
+ @Test
+ void pump_abortedWhilePumping_returnsWithoutSignalingEof() throws Exception {
+ // pipe depth 1 + payload larger than chunk forces producer to block on second acquireForFill,
+ // giving the test thread a deterministic point to call abort().
+ BodyChunkPipe pipe = new BodyChunkPipe(1, 8);
+ byte[] payload = new byte[64];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilStateIsOpenWithChunkInFlight(pipe);
+ pump.abort();
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ /**
+ * Regression test for the producer-livelock-on-CRT-failure path.
+ *
+ * If CRT signals request failure (network error, idle/health timeout, etc.) while the
+ * producer is parked in {@link BodyChunkPipe#acquireForFill()}, nothing in the pipe's normal
+ * contract wakes it without a recycle/abort. The fix in {@code AwsCrtHttpClient.CrtHttpRequest.call()}
+ * registers a {@code responseFuture.whenComplete(...)} hook that calls {@code pump.abort()}
+ * when the response future completes exceptionally. This test reproduces that wiring
+ * at the unit level: a pump runs against a pipe with no consumer, the producer parks once the
+ * pipe is full, and we then complete a separate response future exceptionally with the same
+ * hook to verify the producer unblocks and {@code pump()} returns.
+ *
+ *
Without the hook (or equivalent abort path), {@code producer.join(5_000)} would time out
+ * and the test would fail.
+ */
+ @Test
+ void pump_responseFutureFailsExceptionally_whileProducerParked_unblocksProducerViaAbortHook() throws Exception {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ // Payload larger than depth*chunkSize forces the producer to park on acquireForFill once
+ // both chunks are sitting in the ready queue with no consumer draining.
+ byte[] payload = new byte[128];
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(ContentStreamProvider.fromByteArray(payload), pipe);
+ CompletableFuture responseFuture = new CompletableFuture<>();
+ responseFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ pump.abort();
+ }
+ });
+
+ AtomicReference producerError = new AtomicReference<>();
+ Thread producer = new Thread(() -> {
+ try {
+ pump.pump();
+ } catch (Throwable t) {
+ producerError.set(t);
+ }
+ }, "pump-producer");
+
+ producer.start();
+ waitUntilProducerIsParked(pipe);
+ responseFuture.completeExceptionally(new IOException("simulated CRT failure"));
+ producer.join(5_000);
+
+ assertThat(producer.isAlive()).isFalse();
+ assertThat(producerError.get()).isNull();
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ @Test
+ void abort_propagatesToPipe() {
+ BodyChunkPipe pipe = new BodyChunkPipe(2, 8);
+ SyncRequestBodyPump pump = new SyncRequestBodyPump(
+ ContentStreamProvider.fromByteArray(new byte[0]), pipe);
+
+ pump.abort();
+
+ assertThat(pipe.state()).isEqualTo(BodyChunkPipe.State.ABORTED);
+ }
+
+ private static byte[] drainAll(BodyChunkPipe pipe, int expected) {
+ byte[] out = new byte[expected];
+ int written = 0;
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (written < expected && System.nanoTime() < deadline) {
+ ByteBuffer scratch = ByteBuffer.allocate(Math.min(64, expected - written));
+ int n = pipe.pollDrain(scratch);
+ if (n < 0) {
+ break;
+ }
+ if (n == 0) {
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+ continue;
+ }
+ scratch.flip();
+ scratch.get(out, written, n);
+ written += n;
+ }
+ if (written < expected) {
+ throw new AssertionError("Drained only " + written + " of " + expected + " bytes");
+ }
+ return out;
+ }
+
+ private static void waitUntilStateIsOpenWithChunkInFlight(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ if (pipe.allocatedForTest() >= 1) {
+ return;
+ }
+ Thread.sleep(1);
+ }
+ throw new AssertionError("Producer did not allocate a chunk within timeout");
+ }
+
+ /**
+ * Wait for the producer to park on {@code acquireForFill}. Detected by the pipe reaching its
+ * configured depth in allocations and then staying there for a couple of consecutive observations
+ * (the producer can't make further progress without a recycle).
+ */
+ private static void waitUntilProducerIsParked(BodyChunkPipe pipe) throws InterruptedException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ int stableObservations = 0;
+ int lastAllocated = -1;
+ while (System.nanoTime() < deadline) {
+ int allocated = pipe.allocatedForTest();
+ if (allocated == lastAllocated && allocated > 0) {
+ if (++stableObservations >= 3) {
+ return;
+ }
+ } else {
+ stableObservations = 0;
+ lastAllocated = allocated;
+ }
+ Thread.sleep(20);
+ }
+ throw new AssertionError("Producer did not park within timeout");
+ }
+}
diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
index f1eb1c6e4f23..943635dd41f0 100644
--- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
+++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crthttpclient/S3WithCrtHttpClientsIntegrationTests.java
@@ -16,12 +16,13 @@
package software.amazon.awssdk.services.s3.crthttpclient;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import org.assertj.core.api.Assertions;
+import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.Md5Utils;
@@ -80,4 +82,22 @@ void getObject_toFile_objectSentCorrectly() throws Exception {
assertThat(Md5Utils.md5AsBase64(destination.toFile())).isEqualTo(Md5Utils.md5AsBase64(testFile));
}
+
+ @Test
+ void getObject_responseStreamPipedIntoPutObject_completesWithoutDeadlock() throws Exception {
+ String destinationKey = "piped-" + TEST_KEY;
+ try (ResponseInputStream sourceStream =
+ s3WithCrtHttpClient.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
+ ResponseTransformer.toInputStream())) {
+ long contentLength = sourceStream.response().contentLength();
+
+ PutObjectResponse putResponse = assertTimeoutPreemptively(
+ Duration.ofSeconds(120),
+ () -> s3WithCrtHttpClient.putObject(
+ r -> r.bucket(TEST_BUCKET).key(destinationKey).contentLength(contentLength),
+ RequestBody.fromInputStream(sourceStream, contentLength)));
+
+ assertThat(putResponse.eTag()).isNotBlank();
+ }
+ }
}