[KIP-932] Implement KIP-932 based Share Consumer#2286
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR merges the KIP-932 “Share Consumer” feature into the Python client, introducing a new ShareConsumer C-extension type (plus a DeserializingShareConsumer wrapper), adding delivery-count support on Message, and expanding unit/integration coverage. It also adjusts CI/test tooling to run against Kafka 4.2.0 and (temporarily) build librdkafka from a branch until share-consumer support is available in a released librdkafka.
Changes:
- Add
ShareConsumerimplementation in the C extension, export it inconfluent_kafka, and add stubs/types for new APIs (includingAcknowledgeType). - Extend
Messagewithdelivery_count()and update pickling behavior/tests accordingly. - Add extensive unit + integration tests for share consumer behavior (polling, callbacks, deserialization, SASL, OAUTH), and update CI/test scripts for Kafka 4.2.0 + branch-built librdkafka.
Reviewed changes
Copilot reviewed 38 out of 40 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tools/wheels/build-librdkafka-branch.sh | New helper to build/install librdkafka from a git branch into the wheel/NuGet-style layout. |
| tools/source-package-verification.sh | Pin Kafka tarball cache to 4.2.0 and add LIBRDKAFKA_BRANCH fallback for tests. |
| tests/test_ShareConsumer.py | New unit tests covering ShareConsumer constructor/subscribe/poll/ack/commit/lifecycle/thread-conflict/dealloc behavior. |
| tests/test_ShareConsumer_callbacks.py | New unit tests for callback dispatch paths (error/log/oauth/throttle/stats rejection) for ShareConsumer. |
| tests/test_message.py | Extend Message pickling tests to include delivery_count and validate overflow behavior. |
| tests/test_docs.py | Avoid descending into enum members during doc-tree validation to prevent false failures. |
| tests/test_DeserializingShareConsumer.py | New unit tests for DeserializingShareConsumer offline construction and _deserialize() behavior. |
| tests/integration/share_consumer/test_share_consumer.py | New integration tests for share consumer semantics (no overlap, ordering, unsubscribe, max.poll.records, redelivery, etc.). |
| tests/integration/share_consumer/test_share_consumer_deserialization.py | New integration coverage for deserialization success/failure handling incl. Schema Registry formats. |
| tests/integration/share_consumer/test_share_consumer_callbacks.py | New integration test ensuring throttle_cb can actually fire under broker quota. |
| tests/integration/share_consumer/conftest.py | Autouse cleanup fixture deleting share-consumer test topics after module runs. |
| tests/integration/share_consumer/init.py | New package marker for share consumer integration tests. |
| tests/integration/share_consumer_sasl/test_share_consumer_sasl.py | New SASL-cluster integration tests for set_sasl_credentials behavior. |
| tests/integration/share_consumer_sasl/conftest.py | SASL-cluster topic cleanup fixture (mirrors plaintext share-consumer cleanup). |
| tests/integration/share_consumer_sasl/init.py | New package marker for share consumer SASL integration tests. |
| tests/integration/share_consumer_oauth/unsecured_token.py | Helper to mint unsigned JWTs for the trivup OAUTHBEARER unsecured validator tests. |
| tests/integration/share_consumer_oauth/test_oauth_integration.py | End-to-end OAUTHBEARER integration tests for ShareConsumer (init + refresh path). |
| tests/integration/share_consumer_oauth/conftest.py | Trivup OAUTHBEARER cluster fixture + stripped baseline config for oauth_cb path. |
| tests/integration/share_consumer_oauth/init.py | New package marker for share consumer OAUTH integration tests. |
| tests/integration/cluster_fixture.py | Add share-consumer factory helpers + per-group share.auto.offset.reset setup via incremental alter configs. |
| tests/integration/admin/test_incremental_alter_configs.py | Account for Kafka 4.1+ ELR side-effects in expected DescribeConfigs results (skip pre-4.1). |
| tests/common/init.py | Add ShareConsumer/DeserializingShareConsumer test wrappers + share-consumer drain helpers; pin broker version/conf for share groups. |
| src/confluent_kafka/src/ShareConsumer.c | New C extension implementation of ShareConsumer (subscribe/poll/ack/commit callbacks/oauth/log wiring/etc.). |
| src/confluent_kafka/src/Producer.c | Adjust oauth-token wait path: caller now owns destroy on wait timeout. |
| src/confluent_kafka/src/Consumer.c | Adjust oauth-token wait path: caller now owns destroy on wait timeout. |
| src/confluent_kafka/src/confluent_kafka.h | Add ShareConsumer state in Handle union, new error message constant, and Message.delivery_count field. |
| src/confluent_kafka/src/confluent_kafka.c | Add Message.delivery_count(), include it in pickling, add share-commit helpers, and fix yield/oauth to use callback rk. |
| src/confluent_kafka/src/AdminTypes.c | Export share acknowledge type constants to Python module. |
| src/confluent_kafka/src/Admin.c | Adjust oauth-token wait path: caller now owns destroy on wait timeout. |
| src/confluent_kafka/deserializing_share_consumer.py | New DeserializingShareConsumer implementation built on ShareConsumer with per-record error marking. |
| src/confluent_kafka/cimpl.pyi | Update stubs for ShareConsumer, AcknowledgeType, Message.delivery_count/set_error, and callback typing. |
| src/confluent_kafka/_types.py | Add AcknowledgementCommitCallback type alias. |
| src/confluent_kafka/_model/init.py | Add AcknowledgeType IntEnum mapping to exported cimpl constants. |
| src/confluent_kafka/init.py | Export ShareConsumer, DeserializingShareConsumer, and AcknowledgeType from top-level package. |
| setup.py | Add ShareConsumer.c to extension sources. |
| examples/share_consumer.py | New example showing basic ShareConsumer usage and implicit-ack semantics. |
| .semaphore/semaphore.yml | Add LIBRDKAFKA_BRANCH env var and branch fallback for source-package verification (wheel blocks still TODO). |
| .formatignore | Exclude cimpl.pyi from clang-format to avoid stub corruption. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| rd_kafka_topic_partition_list_add(c_topics, | ||
| cfl_PyUnistr_AsUTF8(uo, &uo8), | ||
| RD_KAFKA_PARTITION_UA); | ||
| Py_XDECREF(uo8); | ||
| Py_DECREF(uo); |
| ARCH=${ARCH:-x64} | ||
| SRC=/tmp/librdkafka-${BRANCH} | ||
| INSTALL=$SRC/install |
| sc = kafka_cluster.share_consumer() | ||
| # TODO KIP-932: move producer creation inside the try block (consistent | ||
| # with the other tests) so sc doesn't leak if cimpl_producer() raises. | ||
| producer = kafka_cluster.cimpl_producer() | ||
| try: | ||
| # Phase 1: prove the topic_a subscription actually works before we | ||
| # switch — otherwise we'd never know whether subscribe([topic_b]) |
| env_vars: | ||
| - name: LIBRDKAFKA_VERSION | ||
| value: v2.14.2 | ||
| # TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support | ||
| - name: LIBRDKAFKA_BRANCH | ||
| value: dev_kip-932_queues-for-kafka | ||
| prologue: |
|
Pranav Rathi (pranavrth)
left a comment
There was a problem hiding this comment.
Approving the PR as all the commits were already approved as individual PRs.
Kaushik Raina (@k-raina) - Please check the copilot comments in a separate PR.
LGTM!. Thanks Kaushik Raina (@k-raina)!


Summary
This PR is for merging Share Consumer feature branch to master. All the commits are PR reviewed in the feature branch. This is just the merge PR.
DO NOT SQUASH AND MERGE. ALL COMMITS SHOULD APPEAR IN THE FINAL MASTER BRANCH.
NOTE:
This branch points to librdkafka branch confluentinc/librdkafka#5506 . Once librdkafka PR is merged we will be raising another PR to point python master to librdkafka master.