Conversation
| subscriber.onNext("item"); | ||
|
|
||
| verify(mockSubscription, times(1)).cancel(); | ||
| verify(mockSubscription, times(2)).cancel(); |
There was a problem hiding this comment.
subscription.cancel() now exists in two codepaths:
- The existing codepath within
onNext()catch block. - The new addition in
future.whenComplete()
Flow:
- onNext()
-- catch
-- subscription.cancel()
-- onError()
- onError()
-- completes exceptionally
-- future.whenCompleted()
-- subscription.cancel() // <-- new addition
I believe this is safe to call twice because:
5
Subscription.cancelMUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.
Note: the codepath which the new addition solves is when the cancellation signal comes from outside of the pub/sub pipeline. If a user aborts the operation by cancelling the DirectoryDownload completion future directly, neither onNext nor onError are involved. The whenComplete handler is the only place that reacts to this external cancellation.
| subscriber.onNext("item"); | ||
|
|
||
| verify(mockSubscription, times(1)).cancel(); | ||
| verify(mockSubscription, times(2)).cancel(); |
There was a problem hiding this comment.
Probably worth adding a quick comment here?
|



When a
DirectoryDownload(orDirectoryUpload) future is cancelled or completed exceptionally,AsyncBufferingSubscribercancels in flight file transfer futures but does not cancel the upstream Subscription.This means the listObjectsV2 paginator continues delivering objects, and new file downloads continue to be started after the user has cancelled the operation.
This change adds
subscription.cancel()to thewhenCompletehandler so that cancellation stops the entire pipeline of both inflight transfers and new work from the upstream publisher.Conforms to Reactive Streams spec: