[server] Optimize RemoteLogFetcher with async prefetch for recovery#3132
[server] Optimize RemoteLogFetcher with async prefetch for recovery#3132Kaixuan-Duan wants to merge 4 commits intoapache:mainfrom
Conversation
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Kaixuan-Duan Thanks for the contribution. I will help to review this PR
One process point: this issue was already assigned and I was actively working on it. In that situation, please coordinate on the issue before opening an overlapping PR. Assignment is not exclusive ownership, but it is an important coordination signal, and skipping it usually leads to duplicated effort and fragmented review.
We can evaluate this PR on its merits, but for future cases please check on the issue first.
fresh-borzoni
left a comment
There was a problem hiding this comment.
Ty, direction is right, I left some cooments, PTAL
|
|
||
| private void cancelPrefetch() { | ||
| if (nextDownloadedSegmentFuture != null) { | ||
| nextDownloadedSegmentFuture.cancel(true); |
There was a problem hiding this comment.
cancel(true) on an already-completed future is a no-op and drops the reference to the downloaded File, which then lives in tempDir until fetcher-level close()
There was a problem hiding this comment.
Thanks — I have updated the logic to handle this explicitly.
| activeIterator = null; | ||
| } | ||
| } finally { | ||
| downloadExecutor.shutdownNow(); |
There was a problem hiding this comment.
shutdownNow() doesn't wait - if a prefetch is mid-flush, it can write to tempDir after deleteDirectoryQuietly runs. Either downloadExecutor.awaitTermination() with a short timeout before deletion, or make downloadSegment interruption-aware (most S3 SDKs don't honor Thread.isInterrupted() during socket reads, so the interrupt from shutdownNow is effectively decorative)
There was a problem hiding this comment.
Thanks for pointing this out. I updated close() to call awaitTermination() after shutdownNow() before deleting the temp directory. If the download executor does not terminate within the timeout, the cleanup is skipped to avoid racing with an in-progress download that may still write into tempDir.
| } | ||
|
|
||
| @Override | ||
| public boolean hasNext() { |
There was a problem hiding this comment.
If fetch() is called twice, the first Iterable still wraps the now-closed iterator and iterating it re-enters advance() on a closed instance, downloading into the shared tempDir, racing with the new iterator
There was a problem hiding this comment.
I updated the closed iterator to stop immediately by marking it finished, clearing the pending batch, and making hasNext() return false after close.
| } | ||
|
|
||
| private File fetchSegmentFile(RemoteLogSegment segment) throws IOException { | ||
| if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) { |
There was a problem hiding this comment.
This depends on RemoteLogSegment having value-based equals(), or on both references coming from the same segments list (reference equality). Works today, but safer to compare by segment id tbh.
There was a problem hiding this comment.
I changed the prefetch match to compare remoteLogSegmentId() instead of relying on RemoteLogSegment.equals().
| if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) { | ||
| try { | ||
| return nextDownloadedSegmentFuture.get(); | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
Also catch CancellationException - it's unchecked (extends RuntimeException) and CompletableFuture.get() throws it on a cancelled future. Not a live bug in the current state machine (every cancelPrefetch nulls the field) but cheap defense-in-depth, especially given closed is volatile.
There was a problem hiding this comment.
I added this as a defensive guard: if the prefetched future is cancelled and get() throws CancellationException.
| @@ -28,10 +28,13 @@ | |||
| import org.junit.jupiter.api.Test; | |||
There was a problem hiding this comment.
non-blocking: Two of the three new tests inject state via reflection (setPrivateField) instead of exercising a real async prefetch - they cover the branches in fetchSegmentFile, but not close-during-real-in-flight-download or the orphan-file cleanup.
Consider one integration-style test with a real slow/failing download source.
There was a problem hiding this comment.
Ty, Addressed comments. PTAL
| "Prefetched segment {} failed, fallback to sync download.", | ||
| segment.remoteLogSegmentId(), | ||
| e.getCause()); | ||
| return downloadSegment(segment); |
There was a problem hiding this comment.
Non-blocking: No retry on transient S3 failure - one flaky segment fails the entire recovery. In fluss-rust we added exponential backoff (100ms -> 5s with jitter) for this.
There was a problem hiding this comment.
Thanks. The retry design in fluss-rust is solid, so I followed the same idea here and added retries around synchronous segment downloads with exponential backoff and jitter, so transient remote storage failures are retried before failing the recovery.
| return downloadSegment(segment); | ||
| } | ||
|
|
||
| private void prefetchNextSegment() { |
There was a problem hiding this comment.
Prefetch depth hardcoded to 1. If S3 p99 download time > consume time for a segment, the downloader sits idle and the optimization is half-realized. On the Rust side (fluss-rust #187) we landed on configurable depth with default 4 for exactly this reason. Since it's KV depth = 1 might be fine, but it's still better to configure and reason properly
There was a problem hiding this comment.
You're right, I optimized the prefetch depth, PTAL.
|
@fresh-borzoni Thanks for the review, and sorry for not coordinating on the issue beforehand. I didn’t realize it was already being actively worked on. |
|
@fresh-borzoni Thank you for the review. Addressed, PTAL. |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Kaixuan-Duan Ty for the changes, left some comments, PTAL
|
|
||
| final RemoteLogSegment target = segment; | ||
| CompletableFuture<File> future; | ||
| try { |
There was a problem hiding this comment.
The async download calls downloadSegment(target), while retry only exists in the sync fallback. So a transient remote-storage failure in the prefetch worker isn't retried in the background - instead the consumer later hits the failed future and runs the retry loop synchronously. This pushes retry latency back onto the recovery thread and largely defeats the point of prefetching. The async worker should call downloadSegmentWithRetry(target).
| // no local file to clean up | ||
| } | ||
| } else { | ||
| future.cancel(true); |
There was a problem hiding this comment.
CompletableFuture.cancel(true) flips the future state to CANCELLED but does not propagate an interrupt to the underlying supplyAsync task. The worker keeps running, finishes the download, and writes the file to disk after we thought we cancelled. Even when cancel(true) returns true, the task may still complete. The isDone() check added here only handles the "already-completed when drained" case.
Switch to executor.submit() and store Future in PrefetchEntry, then FutureTask.cancel(true) actually interrupts the worker
| // exceptionally (an interrupted Thread.sleep surfaces as InterruptedException). | ||
| for (CompletableFuture<File> f : inflight) { | ||
| assertThat(f.isDone()).isTrue(); | ||
| assertThat(f.isCancelled() || f.isCompletedExceptionally()).isTrue(); |
There was a problem hiding this comment.
f.isCancelled() is just API state and says nothing about whether the download stopped. After the submit/Future switch, rewrite using smth like PR #2786's BlockingFileDownloader + latch pattern to assert real interruption.
| * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used by a single thread | ||
| * only. | ||
| */ | ||
| @NotThreadSafe |
There was a problem hiding this comment.
@NotThreadSafe plus volatile plus async workers is a contradiction. KV recovery is single-threaded per bucket in practice, so we might wish to prefer single-threaded usage here. Also with this hybrid design I believe we have possible race condition in ArrayDeque
WDYT?
| * respectively. When a prefetched segment is consumed, its permit is released and the fetcher | ||
| * immediately tries to start the next download, giving the window the behavior of Rust's RAII | ||
| * {@code PrefetchPermit} + recycle-notify pattern in <a | ||
| * href="https://github.com/apache/fluss-rust/pull/187">fluss-rust PR#187</a>. |
There was a problem hiding this comment.
I wouldn't reference fluss-rust here - cross-repo Javadoc links go stale(especially bc fluss-rust is planned to be ported to this repo), and the description should stand on its own.
Also: this pattern shines in async Rust where awaiting tasks need explicit notification on permit release (and RAII gives you deterministic permit drop). Java's sync consumer makes the permit-recycle machinery redundant - a Future[] indexed by seq % depth is fine as well.
Not a blocker, just worth considering.
Purpose
Linked issue: close #3091
This PR improves KV recovery performance by reducing wait time between remote log segments in
RemoteLogFetcher.Brief change log
Prefetch limit (default 4) — caps the number of downloaded-but-not-consumed segments via a Semaphore, bounding local-disk usage.
Download-thread limit (default 3) — caps simultaneous downloads via a dedicated bounded ExecutorService.
5times with backoff100ms → 5sand0.25jitter (ExponentialBackoff), preventing thrash on transient remote-storage failures while still surfacing persistent failures to the recovery driver.fetchSegmentFile()transparently falls back to a synchronousdownloadSegmentWithRetry, so a single bad segment cannot stall recovery as long as it is recoverable; if it is not, the failure is propagated and the prefetch window is drained to release permits.Tests
./mvnw -pl fluss-server spotless:apply -Dtest='RemoteLogFetcherTest' -DfailIfNoTests=false testAPI and Format
kv.recover.remote-log.prefetch-num int(default 4) : Max remote log segments downloaded but not yet consumed during KV recovery. 1 preserves the legacy one-step-prefetch behavior.kv.recover.remote-log.download-threads(default 3) : Threads used to download remote log segments during KV recovery. Should be≤ prefetch-num.No breaking changes to user-facing public APIs.
Documentation
No user-facing feature. No documentation update required.