
[FLINK-39533][s3] Use abort() instead of drain on close/seek when remaining bytes exceed threshold in NativeS3InputStream #28012

Open
Samrat002 wants to merge 1 commit into apache:master from Samrat002:FLINK-39533

Conversation

@Samrat002
Contributor

What is the purpose of the change

NativeS3InputStream calls ResponseInputStream.close() when releasing streams during seek(), skip(), and close() operations. Apache HttpClient's close() implementation
drains all remaining bytes from the response body to enable HTTP connection reuse. For large S3 objects where only a small portion was read (e.g., checkpoint metadata from a
multi-GB state file), this drains potentially gigabytes of data over the network — causing severe latency during checkpoint restore and seek-heavy read patterns.

The AWS SDK v2 ResponseInputStream JavaDoc explicitly recommends
calling abort() when remaining data is not needed. This PR replaces close() with abort() in the stream release path.
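The trade-off can be modeled without the AWS SDK. The sketch below is a minimal, self-contained illustration (class and field names are hypothetical, not the Flink or SDK classes): `close()` drains whatever the reader skipped so the HTTP connection can be reused, while `abort()` discards the connection without reading further.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Toy model of the drain-on-close behavior described above.
class AbortableStream extends FilterInputStream {
    long drained = 0;        // bytes read purely to satisfy close()
    boolean aborted = false;

    AbortableStream(InputStream in) { super(in); }

    // close() mimics Apache HttpClient: drain the remainder of the
    // response body so the connection can go back to the pool.
    @Override
    public void close() throws IOException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            drained += n;
        }
        in.close();
    }

    // abort() mimics AWS SDK v2's Abortable: give up the connection
    // without reading the unread remainder at all.
    void abort() throws IOException {
        aborted = true;
        in.close();
    }
}

public class AbortVsClose {
    public static void main(String[] args) throws IOException {
        byte[] object = new byte[1_000_000]; // stand-in for a large S3 object

        AbortableStream closed = new AbortableStream(new ByteArrayInputStream(object));
        closed.read(new byte[1024]);  // read only a small prefix
        closed.close();               // drains the remaining ~999 KB

        AbortableStream aborted = new AbortableStream(new ByteArrayInputStream(object));
        aborted.read(new byte[1024]);
        aborted.abort();              // no drain at all

        System.out.println(closed.drained + " " + aborted.drained); // 998976 0
    }
}
```

On a real `ResponseInputStream`, `abort()` comes from the SDK's `Abortable` interface and forfeits connection reuse; that is the right trade when the unread remainder dwarfs the cost of establishing a new connection.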

Brief change log

  • Added releaseStream() method to NativeS3InputStream that calls abort() instead of close() on the underlying ResponseInputStream, and drops the BufferedInputStream
    wrapper without closing it (closing would delegate to the drain path)

  • openStreamAtCurrentPosition() and close() now use releaseStream() for stream cleanup

  • Added NativeS3InputStreamTest with 8 tests covering abort lifecycle, data correctness, position tracking, and error paths
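A plausible shape for the new release path, inferred from the change log above (all names here are illustrative, not the actual Flink source):

```java
import java.io.BufferedInputStream;

// Hypothetical holder mirroring the described releaseStream() contract.
class StreamHolder {
    // Stand-in for the AWS SDK's ResponseInputStream, which implements Abortable.
    interface AbortableInput {
        void abort();
    }

    private BufferedInputStream buffered; // wrapper around the response stream
    private AbortableInput response;      // underlying HTTP response stream

    StreamHolder(BufferedInputStream buffered, AbortableInput response) {
        this.buffered = buffered;
        this.response = response;
    }

    // Abort the underlying response instead of closing it, and simply
    // drop the BufferedInputStream reference: calling its close() would
    // delegate to ResponseInputStream.close() and trigger the drain.
    void releaseStream() {
        if (response != null) {
            response.abort();
            response = null;
        }
        buffered = null; // dropped without close()
    }
}
```

The key design point is that the buffered wrapper is abandoned rather than closed, since `BufferedInputStream.close()` would forward to the wrapped stream's `close()` and reintroduce the drain.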

Verifying this change

This change added tests and can be verified as follows:

  • Unit tests (NativeS3InputStreamTest)

  • Manually validated end-to-end on a local Flink 2.3-SNAPSHOT cluster: a stateful job wrote checkpoints (up to 199 MB) to S3, a savepoint was triggered and restored from, and checkpoints completed successfully after restore with zero S3/stream errors

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?

  • [ ] Yes (please specify the tool below)

@Samrat002
Contributor Author

cc: @gaborgsomogyi

@flinkbot
Collaborator

flinkbot commented Apr 23, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
