Skip to content

[fix][broker] Fix replicator getting stuck under rate limiter throttling and honor readBatchSize/maxReadSizeBytes on the default read path#26005

Open
void-ptr974 wants to merge 21 commits into
apache:masterfrom
void-ptr974:fix/replicator-rate-limiter-inflight
Open

[fix][broker] Fix replicator getting stuck under rate limiter throttling and honor readBatchSize/maxReadSizeBytes on the default read path#26005
void-ptr974 wants to merge 21 commits into
apache:masterfrom
void-ptr974:fix/replicator-rate-limiter-inflight

Conversation

@void-ptr974

@void-ptr974 void-ptr974 commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Motivation

PersistentReplicator.readMoreEntries() created an InFlightTask before checking the replicator dispatch rate limiter.

When replicator dispatch throttling was enabled and the rate limiter had no message or byte permits, the method scheduled a retry and returned without issuing cursor.asyncReadEntriesOrWait(...). The newly-created task stayed in entries == null, so it looked like a pending cursor read even though no read request existed.

On the next retry, hasPendingRead() returned true and the replicator stopped scheduling further reads. This could leave geo-replication stuck with backlog when replicator dispatch throttling is enabled.

In addition, readBatchSize/maxReadSizeBytes logic wasn't applied on the default read path when the rate limiter isn't configured.

Modifications

Compute producer and rate-limiter permits before creating the in-flight read task.

Create the InFlightTask only after confirming that a real cursor read will be issued. This preserves the invariant that an entries == null in-flight task corresponds to an actual pending cursor read.

Store the computed byte read limit on the InFlightTask, so the task carries both message and byte read limits without an extra wrapper object.

Added a unit test covering the no-permit rate-limiter path for both message and byte throttling, verifying that no pending in-flight read task is created.

Honor readBatchSize/maxReadSizeBytes on the default read path. This was previously ignored as a result of some earlier changes.

Verifying this change

This change added tests and can be verified as follows:

  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testRateLimiterWithoutPermitsDoesNotCreateInFlightTask
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testCreateOrRecycleInFlightTaskIntoQueue
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testMaybeCreateInFlightReadTask
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterMessageReceivedAllMessages
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterByBytes

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@void-ptr974 void-ptr974 force-pushed the fix/replicator-rate-limiter-inflight branch from 7fed1bd to 16ad2d7 Compare June 12, 2026 01:59
@void-ptr974 void-ptr974 force-pushed the fix/replicator-rate-limiter-inflight branch from 16ad2d7 to 25ed1ac Compare June 12, 2026 02:29
@void-ptr974 void-ptr974 marked this pull request as ready for review June 12, 2026 02:38
@void-ptr974

Copy link
Copy Markdown
Contributor Author

Hi @poorbarcode, this one adjusts when PersistentReplicator creates in-flight read tasks under dispatch rate limiting. Could you help review the pending-read / permit invariant when you have time? Thanks!

@lhotari lhotari added the triage/lhotari/important lhotari's triaging label for important issues or PRs label Jun 15, 2026

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are small details to address. I'd like to get this included in 4.2.3 release which will start very soon. I might just make the changes to this PR and get this merged. I hope that's fine with you.

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I addressed the remaining issue about code duplication and the regression related to the isWritable handling. I refactored the original code since the "AvailablePermits" concept was misleading. It's "ReadLimits" now which conveys the meaning in a better way.

In addition, there was a bug in the existing code where the readBatchSize logic had been disabled unless there was a rate limiter configured. That is now fixed since the getReadLimits method handles all logic related to the limits.

@lhotari lhotari requested a review from poorbarcode June 24, 2026 23:46
@lhotari lhotari changed the title [fix][broker] Prevent replicator from getting stuck when dispatch rate limiter has no permits [fix][broker] Fix replicator getting stuck under rate limiter throttling and honor readBatchSize/maxReadSizeBytes on the default read path Jun 24, 2026
protected static class InFlightTask {
Position readPos;
int readingEntries;
long bytesToRead;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not add the field bytesToRead, which is meaningless, and it will mislead readers into thinking that this field will also be involved in the determination of isDone()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's currently a tradeoff to avoid adding another wrapper object. The original solution didn't have it in InFlightTask. I think it makes sense from conceptual perspective to simply add it to InFlightTask since having yet another concept for a similar thing is confusing. (#26005 (comment))

@lhotari lhotari Jun 25, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will mislead readers into thinking that this field will also be involved in the determination of isDone()

valid point. could renaming to maxBytesToRead address this?

@void-ptr974

Copy link
Copy Markdown
Contributor Author

Thanks @poorbarcode and @lhotari. I pushed the remaining follow-ups.

  • Renamed InFlightTask.bytesToRead to maxBytesToRead. This should make it clearer that the field only carries the max byte limit for cursor.asyncReadEntriesOrWait(...); it is not involved in InFlightTask.isDone(), which still only depends on the read result entries and completed entries.
  • Converted testRateLimiterWithoutPermitsDoesNotCreateInFlightTask to use a TestNG @DataProvider, so the message-permit and byte-permit cases run as separate cases.
  • Added an integration test in OneWayReplicatorTest to cover the real replication path when the replicator dispatch rate limiter temporarily has no permits, and verify replication continues afterwards.

I also ran the focused broker tests locally multiple times, including the new integration test.

Consumer<String> consumer = null;
boolean topicCreated = false;
try {
admin1.namespaces().setReplicatorDispatchRate(replicatedNamespace, dispatchRate);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the topic-level policy, which will not affect other topics in the same namespace.

protected static class InFlightTask {
Position readPos;
int readingEntries;
long maxBytesToRead;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still consider removing the field is better, InFlightTask does not care about this field except for logging

@void-ptr974

Copy link
Copy Markdown
Contributor Author

Thanks @poorbarcode and @lhotari. I pushed another follow-up to make the read-limit handling clearer.

What changed:

  • Changed the new integration test to use topic-level replicator dispatch rate policy, so the test does not affect other topics in the namespace.
  • Removed maxBytesToRead from InFlightTask.
  • Split the decision into two parts:
    • ReadLimits decides the next read size, including both message and byte limits.
    • InFlightTask only tracks the in-flight read/publish lifecycle.
  • readMoreEntries() now keeps the byte limit as a local value and passes it directly to cursor.asyncReadEntriesOrWait(...).
  • Updated the unit tests to validate read-limit decisions directly, including both message and byte limits.

The main idea is to avoid mixing a read request parameter into InFlightTask. maxBytesToRead only belongs to the next cursor read, while InFlightTask should only represent the pending/completed entries lifecycle. This should make the intent clearer and avoid an extra wrapper object/allocation on the read scheduling path.


protected InFlightTask acquirePermitsIfNotFetchingSchema() {
@VisibleForTesting
ReadLimits maybeGetReadLimitsForNextRead() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it a bit costly to abstract a method with only one line of code for use by test, and then distribute the lock to multiple locations for invocation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants