Skip to content

[KIP-932] Implement KIP-932 based Share Consumer#2286

Merged
Kaushik Raina (k-raina) merged 10 commits into
masterfrom
dev_kip-932_queues-for-kafka
Jun 23, 2026
Merged

[KIP-932] Implement KIP-932 based Share Consumer#2286
Kaushik Raina (k-raina) merged 10 commits into
masterfrom
dev_kip-932_queues-for-kafka

Conversation

@k-raina

@k-raina Kaushik Raina (k-raina) commented Jun 19, 2026

Copy link
Copy Markdown
Member

Summary
This PR is for merging Share Consumer feature branch to master. All the commits are PR reviewed in the feature branch. This is just the merge PR.

DO NOT SQUASH AND MERGE. ALL COMMITS SHOULD APPEAR IN THE FINAL MASTER BRANCH.

NOTE:
This branch points to librdkafka branch confluentinc/librdkafka#5506 . Once librdkafka PR is merged we will be raising another PR to point python master to librdkafka master.

Copilot AI review requested due to automatic review settings June 19, 2026 04:27
@confluent-cla-assistant

Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR merges the KIP-932 “Share Consumer” feature into the Python client, introducing a new ShareConsumer C-extension type (plus a DeserializingShareConsumer wrapper), adding delivery-count support on Message, and expanding unit/integration coverage. It also adjusts CI/test tooling to run against Kafka 4.2.0 and (temporarily) build librdkafka from a branch until share-consumer support is available in a released librdkafka.

Changes:

  • Add ShareConsumer implementation in the C extension, export it in confluent_kafka, and add stubs/types for new APIs (including AcknowledgeType).
  • Extend Message with delivery_count() and update pickling behavior/tests accordingly.
  • Add extensive unit + integration tests for share consumer behavior (polling, callbacks, deserialization, SASL, OAUTH), and update CI/test scripts for Kafka 4.2.0 + branch-built librdkafka.

Reviewed changes

Copilot reviewed 38 out of 40 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tools/wheels/build-librdkafka-branch.sh New helper to build/install librdkafka from a git branch into the wheel/NuGet-style layout.
tools/source-package-verification.sh Pin Kafka tarball cache to 4.2.0 and add LIBRDKAFKA_BRANCH fallback for tests.
tests/test_ShareConsumer.py New unit tests covering ShareConsumer constructor/subscribe/poll/ack/commit/lifecycle/thread-conflict/dealloc behavior.
tests/test_ShareConsumer_callbacks.py New unit tests for callback dispatch paths (error/log/oauth/throttle/stats rejection) for ShareConsumer.
tests/test_message.py Extend Message pickling tests to include delivery_count and validate overflow behavior.
tests/test_docs.py Avoid descending into enum members during doc-tree validation to prevent false failures.
tests/test_DeserializingShareConsumer.py New unit tests for DeserializingShareConsumer offline construction and _deserialize() behavior.
tests/integration/share_consumer/test_share_consumer.py New integration tests for share consumer semantics (no overlap, ordering, unsubscribe, max.poll.records, redelivery, etc.).
tests/integration/share_consumer/test_share_consumer_deserialization.py New integration coverage for deserialization success/failure handling incl. Schema Registry formats.
tests/integration/share_consumer/test_share_consumer_callbacks.py New integration test ensuring throttle_cb can actually fire under broker quota.
tests/integration/share_consumer/conftest.py Autouse cleanup fixture deleting share-consumer test topics after module runs.
tests/integration/share_consumer/init.py New package marker for share consumer integration tests.
tests/integration/share_consumer_sasl/test_share_consumer_sasl.py New SASL-cluster integration tests for set_sasl_credentials behavior.
tests/integration/share_consumer_sasl/conftest.py SASL-cluster topic cleanup fixture (mirrors plaintext share-consumer cleanup).
tests/integration/share_consumer_sasl/init.py New package marker for share consumer SASL integration tests.
tests/integration/share_consumer_oauth/unsecured_token.py Helper to mint unsigned JWTs for the trivup OAUTHBEARER unsecured validator tests.
tests/integration/share_consumer_oauth/test_oauth_integration.py End-to-end OAUTHBEARER integration tests for ShareConsumer (init + refresh path).
tests/integration/share_consumer_oauth/conftest.py Trivup OAUTHBEARER cluster fixture + stripped baseline config for oauth_cb path.
tests/integration/share_consumer_oauth/init.py New package marker for share consumer OAUTH integration tests.
tests/integration/cluster_fixture.py Add share-consumer factory helpers + per-group share.auto.offset.reset setup via incremental alter configs.
tests/integration/admin/test_incremental_alter_configs.py Account for Kafka 4.1+ ELR side-effects in expected DescribeConfigs results (skip pre-4.1).
tests/common/init.py Add ShareConsumer/DeserializingShareConsumer test wrappers + share-consumer drain helpers; pin broker version/conf for share groups.
src/confluent_kafka/src/ShareConsumer.c New C extension implementation of ShareConsumer (subscribe/poll/ack/commit callbacks/oauth/log wiring/etc.).
src/confluent_kafka/src/Producer.c Adjust oauth-token wait path: caller now owns destroy on wait timeout.
src/confluent_kafka/src/Consumer.c Adjust oauth-token wait path: caller now owns destroy on wait timeout.
src/confluent_kafka/src/confluent_kafka.h Add ShareConsumer state in Handle union, new error message constant, and Message.delivery_count field.
src/confluent_kafka/src/confluent_kafka.c Add Message.delivery_count(), include it in pickling, add share-commit helpers, and fix yield/oauth to use callback rk.
src/confluent_kafka/src/AdminTypes.c Export share acknowledge type constants to Python module.
src/confluent_kafka/src/Admin.c Adjust oauth-token wait path: caller now owns destroy on wait timeout.
src/confluent_kafka/deserializing_share_consumer.py New DeserializingShareConsumer implementation built on ShareConsumer with per-record error marking.
src/confluent_kafka/cimpl.pyi Update stubs for ShareConsumer, AcknowledgeType, Message.delivery_count/set_error, and callback typing.
src/confluent_kafka/_types.py Add AcknowledgementCommitCallback type alias.
src/confluent_kafka/_model/init.py Add AcknowledgeType IntEnum mapping to exported cimpl constants.
src/confluent_kafka/init.py Export ShareConsumer, DeserializingShareConsumer, and AcknowledgeType from top-level package.
setup.py Add ShareConsumer.c to extension sources.
examples/share_consumer.py New example showing basic ShareConsumer usage and implicit-ack semantics.
.semaphore/semaphore.yml Add LIBRDKAFKA_BRANCH env var and branch fallback for source-package verification (wheel blocks still TODO).
.formatignore Exclude cimpl.pyi from clang-format to avoid stub corruption.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +161 to +165
rd_kafka_topic_partition_list_add(c_topics,
cfl_PyUnistr_AsUTF8(uo, &uo8),
RD_KAFKA_PARTITION_UA);
Py_XDECREF(uo8);
Py_DECREF(uo);
Comment on lines +35 to +37
ARCH=${ARCH:-x64}
SRC=/tmp/librdkafka-${BRANCH}
INSTALL=$SRC/install
Comment on lines +676 to +682
sc = kafka_cluster.share_consumer()
# TODO KIP-932: move producer creation inside the try block (consistent
# with the other tests) so sc doesn't leak if cimpl_producer() raises.
producer = kafka_cluster.cimpl_producer()
try:
# Phase 1: prove the topic_a subscription actually works before we
# switch — otherwise we'd never know whether subscribe([topic_b])
Comment thread .semaphore/semaphore.yml
Comment on lines 9 to 15
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.14.2
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
- name: LIBRDKAFKA_BRANCH
value: dev_kip-932_queues-for-kafka
prologue:
@sonarqube-confluent

Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
9.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

@k-raina Kaushik Raina (k-raina) changed the title KIP-932] Implement KIP-932 based Share Consumer [KIP-932] Implement KIP-932 based Share Consumer Jun 19, 2026

@pranavrth Pranav Rathi (pranavrth) left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving the PR as all the commits were already approved as individual PRs.

Kaushik Raina (@k-raina) - Please check the copilot comments in a separate PR.

LGTM!. Thanks Kaushik Raina (@k-raina)!

@pranavrth Pranav Rathi (pranavrth) left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!.

@k-raina Kaushik Raina (k-raina) merged commit 2c91557 into master Jun 23, 2026
1 of 2 checks passed
@k-raina Kaushik Raina (k-raina) deleted the dev_kip-932_queues-for-kafka branch June 23, 2026 08:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants