Skip to content

[KIP-932] Add error translation in share consumer#2287

Merged
Kaushik Raina (k-raina) merged 9 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_queues-for-kafka_error_translation
Jun 23, 2026
Merged

[KIP-932] Add error translation in share consumer#2287
Kaushik Raina (k-raina) merged 9 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_queues-for-kafka_error_translation

Conversation

@k-raina

@k-raina Kaushik Raina (k-raina) commented Jun 19, 2026

Copy link
Copy Markdown
Member

Why

  • The KIP-932 share consumer wrapped every librdkafka error in KafkaException, blurring client misuse (illegal state, concurrent access, bad args) and real broker errors.
  • This PR translates each error code to the exception, which align with java share consumer, i.e _STATE→IllegalStateException, _CONFLICT→ConcurrentModificationException, _INVALID_ARG→ValueError, everything else stays KafkaException — with the two new types added to cimpl and exported from confluent_kafka.

Notes / caveats

  • The new types subclass RuntimeError, not KafkaException, so except KafkaException no longer catches these for the share consumer. args[0] is still a KafkaError in every translated case, so .code()/.str() keep working.
  • Scope is ShareConsumer only — Producer/Consumer/Admin are unchanged.
  • The ack-commit callback and commit_sync's per-partition map intentionally keep KafkaException (broker results, matching Java's callback contract).

@confluent-cla-assistant

Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_queues-for-kafka_error_translation branch from b2f55e4 to da10f75 Compare June 19, 2026 12:57
@pranavrth

Copy link
Copy Markdown
Member

set_sasl_credentials was missed by the translation — it's still raising a bare RuntimeError on a closed handle here, and plain KafkaException via cfl_PyErr_from_error_destroy. Please change that as well.

@pranavrth

Pranav Rathi (pranavrth) commented Jun 20, 2026

Copy link
Copy Markdown
Member

I think we can create Pure python level exception instead of C Binding level and use it in C Binding. We should check the feasibility of that. Same we should do with Messages class.

@k-raina

Copy link
Copy Markdown
Member Author

Thanks Pranav Rathi (@pranavrth) for reviews!

Regarding

I think we can create Pure python level exception instead of C Binding level and use it in C Binding. We should check the feasibility of that. Same we should do with Messages class.

Our C module cimpl loads first and error.py already does from confluent_kafka.cimpl import ....
If we move the exception classes into error.py and have cimpl import them while it's starting up, we get a cycle: cimpl needs error.py, but error.py needs cimpl. Since neither has finished loading, the import sees a half-built module and breaks.

The way around it is to not grab the Python classes at startup, but lazily at the first time C
actually needs to raise one and then cache the pointer so we only do it once. By then both modules are fully loaded, so there's no loop. It works fine, but note that: it adds a bit of C plumbing and a new edge case to get right.

@pranavrth

Copy link
Copy Markdown
Member

Our C module cimpl loads first and error.py already does from confluent_kafka.cimpl import ....
If we move the exception classes into error.py and have cimpl import them while it's starting up, we get a cycle: cimpl needs error.py, but error.py needs cimpl. Since neither has finished loading, the import sees a half-built module and breaks.

The way around it is to not grab the Python classes at startup, but lazily at the first time C
actually needs to raise one and then cache the pointer so we only do it once. By then both modules are fully loaded, so there's no loop. It works fine, but note that: it adds a bit of C plumbing and a new edge case to get right.

We are fine with the current approach. No need to complicate things.

"closed.\n"
"\n"
"Subclass of RuntimeError. ``exception.args[0]`` is a "
":py:class:`KafkaError`.\n"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not subtype of KafkaException so why do we keep KafkaError in these exceptions? We can keep this if its a big change right now but we don't need to keep KafkaError in these exceptions.

Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
PyErr_SetObject(KafkaException, eo);
Py_DECREF(eo);
} else {
PyErr_SetString(etype, buf);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PyErr_SetString(etype, buf);
PyErr_SetString(etype, fmt ? buf : rd_kafka_err2str(code));

Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Comment on lines +150 to +152
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
if(rmt) {
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
}

* a plain message string — the Python type already conveys the code, and these
* local errors have no retriable/fatal/abort flags worth preserving. */
static void
ShareConsumer_PyErr_Format(rd_kafka_resp_err_t code, const char *fmt, ...) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check KafkaError_new0

Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Comment on lines +156 to +160
PyObject *eo = KafkaError_new0(code, "%s", buf);
if (!eo)
return;
PyErr_SetObject(KafkaException, eo);
Py_DECREF(eo);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use cfl_PyErr_Format instead.

Comment thread src/confluent_kafka/src/ShareConsumer.c
Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Comment on lines +201 to +202
ShareConsumer_PyErr_Format(RD_KAFKA_RESP_ERR__STATE, "%s",
ERR_MSG_SHARE_CONSUMER_CLOSED);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ShareConsumer_PyErr_Format(RD_KAFKA_RESP_ERR__STATE, "%s",
ERR_MSG_SHARE_CONSUMER_CLOSED);
PyErr_SetString(IllegalStateException,
ERR_MSG_SHARE_CONSUMER_CLOSED);

Same at other places

Comment on lines +278 to +285
# These subclass RuntimeError; str(exc) is the error message (not a KafkaError).
class IllegalStateException(RuntimeError):
def __init__(self, *args: Any, **kwargs: Any) -> None: ...
args: Tuple[Any, ...]

class ConcurrentModificationException(RuntimeError):
def __init__(self, *args: Any, **kwargs: Any) -> None: ...
args: Tuple[Any, ...]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if anything changed after we changed these errors to not take KafkaError

@pranavrth Pranav Rathi (pranavrth) left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build is failing. Check that.
Provided minor comment to be resolved in next PR.

Apart from that, LGTM!. Thanks Kaushik Raina (@k-raina) !.

Comment on lines +161 to +162
if (!eo)
return;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this as it is not setting error and returning as it is.

Comment thread src/confluent_kafka/src/ShareConsumer.c
@sonarqube-confluent

Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
10.2% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

@pranavrth Pranav Rathi (pranavrth) left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!.

@k-raina Kaushik Raina (k-raina) merged commit c0ef4b5 into dev_kip-932_queues-for-kafka Jun 23, 2026
2 checks passed
@k-raina Kaushik Raina (k-raina) deleted the dev_kip-932_queues-for-kafka_error_translation branch June 23, 2026 08:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants