[feat] PIP-468: Add scalable topic protocol commands and connection handling#25564
Merged
merlimat merged 1 commit intoapache:masterfrom Apr 24, 2026
Merged
[feat] PIP-468: Add scalable topic protocol commands and connection handling#25564merlimat merged 1 commit intoapache:masterfrom
merlimat merged 1 commit intoapache:masterfrom
Conversation
This was referenced Apr 22, 2026
Define ScalableTopicLookup, ScalableTopicUpdate, and ScalableTopicClose protocol commands in PulsarApi.proto. Add Commands factory methods for building/parsing these messages, including error responses. Implement ServerCnx handlers for lookup (with DagWatchSession that resolves segment broker addresses and pushes DAG updates) and close commands. Add ClientCnx handler for ScalableTopicUpdate with DagWatchSession lifecycle management and connection-closed cleanup.
lhotari
approved these changes
Apr 24, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Second PR in the PIP-468 series. Introduces the wire protocol for the
scalable-topic client/broker interaction:
PulsarApi.proto:CommandScalableTopicLookup,CommandScalableTopicUpdate,CommandScalableTopicClose,CommandScalableTopicSubscribe,CommandScalableTopicSubscribeResponse,CommandScalableTopicAssignmentUpdate.Commandsfactory methods for building each message (including theerror variants for lookup/subscribe).
PulsarDecoderdispatch entries for all six new message types.DagWatchSession— resolves per-segment brokeraddresses, watches the metadata store for DAG changes, and pushes
ScalableTopicUpdateframes on the wire. Tied to the transportconnection — closed when the connection drops.
ServerCnxhandlers forScalableTopicLookupandScalableTopicClose.DagWatchSessioninterface (callback surface for the V5client that lands later).
ClientCnxhandler forScalableTopicUpdatethat dispatches to theregistered session.
ConsumerSession.sendAssignmentUpdatewired through to the newcommand sender (previously a placeholder no-op).
Tests
Three new unit-test classes:
CommandsScalableTopicTest(8 tests) — every new factory methodroundtrips through the full wire frame; uses
BaseCommand.materialize()so the buffer can be safely releasedbefore assertions.
ConsumerSessionTest(15 tests) — covers identity/equality,attach/markDisconnected, grace-timer lifecycle, all three
sendAssignmentUpdatebranches (connected, disconnected, nullcommand sender), and the
toProtostatic helper.DagWatchSessionTest(12 tests) — session identity, idempotentclose,
start()failure on missing topic, metadata-store listenerregistration, notification filtering (wrong path / Deleted /
post-close),
pushUpdateserializing the correct DAG to theconnection, and post-close/no-broker-map edge cases.
The heavy
NamespaceServicelookup path insidestart()/buildResponseis deferred to broker integration tests — mocking thefull namespace stack here is more noise than signal.
Test plan
./gradlew :pulsar-common:test --tests "org.apache.pulsar.common.protocol.CommandsScalableTopicTest"— 8/8 pass.
./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.service.scalable.*"— 61/61 pass (5 test classes, including the two existing ones
from the previous PR).
./gradlew :pulsar-broker:checkstyleMain :pulsar-broker:checkstyleTest :pulsar-common:checkstyleMain :pulsar-common:checkstyleTest— all clean.