[fix][broker] Fix replicator getting stuck under rate limiter throttling and honor readBatchSize/maxReadSizeBytes on the default read path#26005
Conversation
7fed1bd to
16ad2d7
Compare
…e limiter has no permits
16ad2d7 to
25ed1ac
Compare
|
Hi @poorbarcode, this one adjusts when |
lhotari
left a comment
There was a problem hiding this comment.
There are small details to address. I'd like to get this included in 4.2.3 release which will start very soon. I might just make the changes to this PR and get this merged. I hope that's fine with you.
…-limiter-inflight
…rlapping with "permits"
There was a problem hiding this comment.
LGTM, I addressed the remaining issue about code duplication and the regression related to the isWritable handling. I refactored the original code since the "AvailablePermits" concept was misleading. It's "ReadLimits" now which conveys the meaning in a better way.
In addition, there was a bug in the existing code where the readBatchSize logic had been disabled unless there was a rate limiter configured. That is now fixed since the getReadLimits method handles all logic related to the limits.
| protected static class InFlightTask { | ||
| Position readPos; | ||
| int readingEntries; | ||
| long bytesToRead; |
There was a problem hiding this comment.
Please do not add the field bytesToRead, which is meaningless, and it will mislead readers into thinking that this field will also be involved in the determination of isDone()
There was a problem hiding this comment.
It's currently a tradeoff to avoid adding another wrapper object. The original solution didn't have it in InFlightTask. I think it makes sense from conceptual perspective to simply add it to InFlightTask since having yet another concept for a similar thing is confusing. (#26005 (comment))
There was a problem hiding this comment.
it will mislead readers into thinking that this field will also be involved in the determination of
isDone()
valid point. could renaming to maxBytesToRead address this?
|
Thanks @poorbarcode and @lhotari. I pushed the remaining follow-ups.
I also ran the focused broker tests locally multiple times, including the new integration test. |
| Consumer<String> consumer = null; | ||
| boolean topicCreated = false; | ||
| try { | ||
| admin1.namespaces().setReplicatorDispatchRate(replicatedNamespace, dispatchRate); |
There was a problem hiding this comment.
using the topic-level policy, which will not affect other topics in the same namespace.
| protected static class InFlightTask { | ||
| Position readPos; | ||
| int readingEntries; | ||
| long maxBytesToRead; |
There was a problem hiding this comment.
I still consider removing the field is better, InFlightTask does not care about this field except for logging
|
Thanks @poorbarcode and @lhotari. I pushed another follow-up to make the read-limit handling clearer. What changed:
The main idea is to avoid mixing a read request parameter into |
|
|
||
| protected InFlightTask acquirePermitsIfNotFetchingSchema() { | ||
| @VisibleForTesting | ||
| ReadLimits maybeGetReadLimitsForNextRead() { |
There was a problem hiding this comment.
Isn't it a bit costly to abstract a method with only one line of code for use by test, and then distribute the lock to multiple locations for invocation?
Motivation
PersistentReplicator.readMoreEntries()created anInFlightTaskbefore checking the replicator dispatch rate limiter.When replicator dispatch throttling was enabled and the rate limiter had no message or byte permits, the method scheduled a retry and returned without issuing
cursor.asyncReadEntriesOrWait(...). The newly-created task stayed inentries == null, so it looked like a pending cursor read even though no read request existed.On the next retry,
hasPendingRead()returned true and the replicator stopped scheduling further reads. This could leave geo-replication stuck with backlog when replicator dispatch throttling is enabled.In addition,
readBatchSize/maxReadSizeByteslogic wasn't applied on the default read path when the rate limiter isn't configured.Modifications
Compute producer and rate-limiter permits before creating the in-flight read task.
Create the
InFlightTaskonly after confirming that a real cursor read will be issued. This preserves the invariant that anentries == nullin-flight task corresponds to an actual pending cursor read.Store the computed byte read limit on the
InFlightTask, so the task carries both message and byte read limits without an extra wrapper object.Added a unit test covering the no-permit rate-limiter path for both message and byte throttling, verifying that no pending in-flight read task is created.
Honor
readBatchSize/maxReadSizeByteson the default read path. This was previously ignored as a result of some earlier changes.Verifying this change
This change added tests and can be verified as follows:
./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testRateLimiterWithoutPermitsDoesNotCreateInFlightTask./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testCreateOrRecycleInFlightTaskIntoQueue./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testMaybeCreateInFlightReadTask./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterMessageReceivedAllMessages./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterByBytesDoes this pull request potentially affect one of the following parts: