Skip to content

[client] Propagate write batch failures to flush#3468

Open
raoluSmile wants to merge 1 commit into
apache:mainfrom
raoluSmile:fix-propagate-write-failures
Open

[client] Propagate write batch failures to flush#3468
raoluSmile wants to merge 1 commit into
apache:mainfrom
raoluSmile:fix-propagate-write-failures

Conversation

@raoluSmile

Copy link
Copy Markdown

Purpose

When WriterClient.flush() waits for pending write batches, it relies on
RecordAccumulator.awaitFlushCompletion(), which waits on each
WriteBatch.RequestFuture.

Before this change, a write batch could be completed exceptionally through
WriteBatch.completeExceptionally(exception), but RequestFuture only counted
down its latch and did not keep or rethrow the exception. As a result,
flush() could wait silently without surfacing the underlying write failure,
and in retry/error paths this could look like a hang to the caller.

I hit this when TabletServer rejected writes with DISK_WRITE_LOCKED.
The writer kept retrying until request timeout, but the failure was not
propagated through the flush waiting path, so the test appeared to hang instead
of failing with the real write error.

This issue is independent of DISK_WRITE_LOCKED itself. Any write batch that
is completed exceptionally should be observable by callers waiting in flush().

This is related to, but different from, #1258. #1258 fixed a lost WriteBatch
case where awaitFlushCompletion() could wait forever. This PR fixes another
failure path: a WriteBatch is completed exceptionally, but the corresponding
RequestFuture.await() did not propagate the batch exception.

Brief change log

  • Store the batch exception in WriteBatch.RequestFuture when a batch completes.
  • Rethrow the stored exception from RequestFuture.await().
  • Let RecordAccumulator.awaitFlushCompletion() propagate batch failures.
  • Wrap propagated flush failures in WriterClient.flush().
  • Stop retrying retriable write errors after the request timeout has elapsed.
  • Add a unit test to verify that awaitFlushCompletion() observes batch failures.

Tests

  • ./mvnw -pl fluss-client -DfailIfNoTests=false -Dtest=RecordAccumulatorTest#testAwaitFlushCompletionPropagatesBatchFailure test
  • ./mvnw -pl fluss-client -am -DskipTests compile

API and Format

No public API or storage format changes.

Documentation

No documentation changes are required.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant