From d2bd84ce515b93d578a9c71192d2f53024fad9eb Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 10:27:22 -0700 Subject: [PATCH 1/5] Cancel AsyncBufferingSubscriber upstream subscription when future is cancelled or fails --- .../s3/internal/AsyncBufferingSubscriber.java | 5 ++++- .../s3/internal/AsyncBufferingSubscriberTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 75231bd262d5..832163fc65e7 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -48,7 +48,7 @@ public AsyncBufferingSubscriber(Function> consumer, CompletableFuture returnFuture, int maxConcurrentExecutions) { this.returnFuture = returnFuture; - this.consumer = consumer; + this.consumer = consumer; this.maxConcurrentExecutions = maxConcurrentExecutions; this.numRequestsInFlight = new AtomicInteger(0); this.requestsInFlight = ConcurrentHashMap.newKeySet(); @@ -56,6 +56,9 @@ public AsyncBufferingSubscriber(Function> consumer, returnFuture.whenComplete((r, t) -> { if (t != null) { requestsInFlight.forEach(f -> f.cancel(true)); + if (subscription != null) { + subscription.cancel(); + } } }); } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 3e64783561ec..4d9da2ceaff2 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -131,4 +131,18 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce verify(mockSubscription, times(1)).cancel(); assertThatThrownBy(future::join).hasCause(exception); } + + @Test + void returnFutureCancelled_shouldCancelUpstreamSubscription() { + CompletableFuture future = new CompletableFuture<>(); + AsyncBufferingSubscriber subscriber = new AsyncBufferingSubscriber<>( + s -> new CompletableFuture<>(), future, 10); + + Subscription mockSubscription = mock(Subscription.class); + subscriber.onSubscribe(mockSubscription); + subscriber.onNext("item"); + + future.cancel(true); + verify(mockSubscription, times(1)).cancel(); + } } From 1387eb800200029b05ea533d5184d4078cac0022 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 10:40:15 -0700 Subject: [PATCH 2/5] Add changelog --- .changes/next-release/bugfix-S3TransferManager-e28a164.json | 6 ++++++ .../transfer/s3/internal/AsyncBufferingSubscriber.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 .changes/next-release/bugfix-S3TransferManager-e28a164.json diff --git a/.changes/next-release/bugfix-S3TransferManager-e28a164.json b/.changes/next-release/bugfix-S3TransferManager-e28a164.json new file mode 100644 index 000000000000..2faa7bc68b8d --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-e28a164.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fixed an issue where cancelling a directory transfer did not fully stop the operation." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 832163fc65e7..64352787372f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -48,7 +48,7 @@ public AsyncBufferingSubscriber(Function> consumer, CompletableFuture returnFuture, int maxConcurrentExecutions) { this.returnFuture = returnFuture; - this.consumer = consumer; + this.consumer = consumer; this.maxConcurrentExecutions = maxConcurrentExecutions; this.numRequestsInFlight = new AtomicInteger(0); this.requestsInFlight = ConcurrentHashMap.newKeySet(); From 0d7976dd5516f228255a00f4403137d69d0f5ec7 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:10:08 -0700 Subject: [PATCH 3/5] Fix test --- .../transfer/s3/internal/AsyncBufferingSubscriberTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 4d9da2ceaff2..9e471c0a777b 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -128,7 +128,7 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce subscriber.onSubscribe(mockSubscription); subscriber.onNext("item"); - verify(mockSubscription, times(1)).cancel(); + verify(mockSubscription, times(2)).cancel(); assertThatThrownBy(future::join).hasCause(exception); } From 90d081f83d253d2b265a9a81319e4f95ab619d28 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:33:05 -0700 Subject: [PATCH 4/5] Update whenComplete to include a synchronization lock --- .../transfer/s3/internal/AsyncBufferingSubscriber.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 64352787372f..dae8976e4bf5 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -56,8 +56,10 @@ public AsyncBufferingSubscriber(Function> consumer, returnFuture.whenComplete((r, t) -> { if (t != null) { requestsInFlight.forEach(f -> f.cancel(true)); - if (subscription != null) { - subscription.cancel(); + synchronized (this) { + if (subscription != null) { + subscription.cancel(); + } } } }); From 58d296f6e37049f1b9790f5171812deb2e532bc0 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:15:02 -0700 Subject: [PATCH 5/5] Add comment clarifying change in unit test --- .../transfer/s3/internal/AsyncBufferingSubscriberTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 9e471c0a777b..053fa3b3b691 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -128,6 +128,11 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce subscriber.onSubscribe(mockSubscription); subscriber.onNext("item"); + /* + subscription.cancel() now exists in two codepaths: + - in onNext() catch block. + - in future.whenComplete() + */ verify(mockSubscription, times(2)).cancel(); assertThatThrownBy(future::join).hasCause(exception); }