diff --git a/kafka/future.py b/kafka/future.py index ecaee6985..629845152 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -8,16 +8,31 @@ class Future: - __slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock') - error_on_callbacks = False # and errbacks + __slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock', '_error_on_callbacks') - def __init__(self): + # Class-level default for error_on_callbacks. Per-instance values passed to + # __init__ take precedence; this remains overridable globally (e.g. test + # suites set it to surface callback exceptions). + _default_error_on_callbacks = False + + def __init__(self, error_on_callbacks=None): self.is_done = False self.value = None self.exception = None self._callbacks = [] self._errbacks = [] self._lock = threading.Lock() + # None means "inherit the class-level default"; an explicit bool + # overrides it for this instance only. + self._error_on_callbacks = error_on_callbacks + + @property + def error_on_callbacks(self): + """When True, exceptions raised inside callbacks/errbacks are re-raised + to the caller instead of only being logged.""" + if self._error_on_callbacks is None: + return self._default_error_on_callbacks + return self._error_on_callbacks def succeeded(self): return self.is_done and self.exception is None diff --git a/kafka/producer/future.py b/kafka/producer/future.py index f04fa9632..7f0a69a3f 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -44,8 +44,8 @@ class FutureRecordMetadata(Future): :class:`~kafka.errors.KafkaTimeoutError`). """ __slots__ = ('_produce_future', 'args') - def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): - super().__init__() + def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size, error_on_callbacks=None): + super().__init__(error_on_callbacks=error_on_callbacks) self._produce_future = produce_future # packing args as a tuple is a minor speed optimization self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index a1dada2ae..cd9d6d426 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -220,6 +220,10 @@ class KafkaProducer: acknowledgement. Users should generally prefer to leave this config unset and instead use delivery_timeout_ms to control retry behavior. Default: float('inf') (infinite) + error_on_callbacks (bool): If True, exceptions raised inside callbacks + and errbacks registered on the future returned by send() are + re-raised to the caller instead of only being logged. If None, the + Future class-level default is used (False). Default: None. batch_size (int): Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce @@ -414,6 +418,7 @@ class KafkaProducer: 'acks': -1, 'compression_type': None, 'retries': float('inf'), + 'error_on_callbacks': None, 'batch_size': 16384, 'linger_ms': 0, 'partitioner': DefaultPartitioner(), @@ -936,6 +941,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest len(key_bytes) if key_bytes is not None else -1, len(value_bytes) if value_bytes is not None else -1, sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1, + error_on_callbacks=self.config['error_on_callbacks'], ).failure(e) # Track if the user passed an explicit partition b/c sticky logic does not apply diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py index e32d5c961..104de6934 100644 --- a/kafka/producer/producer_batch.py +++ b/kafka/producer/producer_batch.py @@ -16,7 +16,7 @@ class FinalState(IntEnum): class ProducerBatch: - def __init__(self, tp, records, now=None): + def __init__(self, tp, records, now=None, error_on_callbacks=None): now = time.monotonic() if now is None else now self.max_record_size = 0 self.created = now @@ -29,6 +29,10 @@ def __init__(self, tp, records, now=None): self.produce_future = FutureProduceResult(tp) self._record_futures = [] self._retry = False + # Propagated to each per-record FutureRecordMetadata so user callbacks + # registered via send().add_callback()/add_errback() can opt in to + # re-raising exceptions instead of only logging them. + self._error_on_callbacks = error_on_callbacks self._final_state = None @property @@ -71,7 +75,8 @@ def try_append(self, timestamp_ms, key, value, headers, now=None): metadata.crc, len(key) if key is not None else -1, len(value) if value is not None else -1, - sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1) + sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1, + error_on_callbacks=self._error_on_callbacks) self._record_futures.append(future) return future diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 35572615b..94a6c02e7 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -71,6 +71,7 @@ class RecordAccumulator: 'retry_backoff_ms': 100, 'transaction_manager': None, 'message_version': 2, + 'error_on_callbacks': None, } def __init__(self, **configs): @@ -182,7 +183,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None, self.config['batch_size'] ) - batch = ProducerBatch(tp, records, now=now) + batch = ProducerBatch(tp, records, now=now, error_on_callbacks=self.config['error_on_callbacks']) future = batch.try_append(timestamp_ms, key, value, headers, now=now) if not future: raise Exception() @@ -271,7 +272,7 @@ def split_and_reenqueue(self, batch, now=None): self.config['compression_attrs'], self.config['batch_size'], ) - current_batch = ProducerBatch(tp, builder, now=now) + current_batch = ProducerBatch(tp, builder, now=now, error_on_callbacks=self.config['error_on_callbacks']) current_batch.created = batch.created for record in group: @@ -285,7 +286,7 @@ def split_and_reenqueue(self, batch, now=None): self.config['compression_attrs'], self.config['batch_size'], ) - current_batch = ProducerBatch(tp, builder, now=now) + current_batch = ProducerBatch(tp, builder, now=now, error_on_callbacks=self.config['error_on_callbacks']) current_batch.created = batch.created metadata = builder.append(record.timestamp, record.key, record.value, record.headers) diff --git a/test/__init__.py b/test/__init__.py index 1dba54863..1b86a3472 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -25,4 +25,4 @@ from kafka.future import Future -Future.error_on_callbacks = True # always fail during testing +Future._default_error_on_callbacks = True # always fail during testing diff --git a/test/producer/test_producer_batch.py b/test/producer/test_producer_batch.py index e49aba437..8dc530a55 100644 --- a/test/producer/test_producer_batch.py +++ b/test/producer/test_producer_batch.py @@ -3,6 +3,7 @@ import pytest from kafka.errors import IllegalStateError, KafkaError +from kafka.future import Future from kafka.producer.future import FutureRecordMetadata, RecordMetadata from kafka.producer.producer_batch import ProducerBatch from kafka.record.memory_records import MemoryRecordsBuilder @@ -24,6 +25,23 @@ def batch(tp, memory_records_builder): return ProducerBatch(tp, memory_records_builder) +def test_producer_batch_error_on_callbacks_default(tp, memory_records_builder): + # Unset -> record futures inherit the Future class-level default (#2366). + batch = ProducerBatch(tp, memory_records_builder) + future = batch.try_append(0, b'key', b'value', []) + assert future.error_on_callbacks is Future._default_error_on_callbacks + + +def test_producer_batch_error_on_callbacks_propagates(tp, memory_records_builder): + # ProducerBatch threads an explicit error_on_callbacks down to each record + # future, overriding the class-level default in both directions (#2366). + batch = ProducerBatch(tp, memory_records_builder, error_on_callbacks=True) + assert batch.try_append(0, b'k', b'v', []).error_on_callbacks is True + + batch_off = ProducerBatch(tp, memory_records_builder, error_on_callbacks=False) + assert batch_off.try_append(0, b'k', b'v', []).error_on_callbacks is False + + def test_producer_batch_producer_id(tp, memory_records_builder): batch = ProducerBatch(tp, memory_records_builder) assert batch.producer_id == -1 diff --git a/test/test_future.py b/test/test_future.py index 5b0b5b9e2..087c7d76a 100644 --- a/test/test_future.py +++ b/test/test_future.py @@ -55,3 +55,45 @@ def test_chain_failure(self): f1.failure(ValueError('err')) assert f2.failed() assert isinstance(f2.exception, ValueError) + + +def _raise(exc): + raise exc + + +class TestFutureErrorOnCallbacks: + """error_on_callbacks is now a per-instance option (see #2366). + + An explicit value passed to ``Future(error_on_callbacks=...)`` takes + precedence over the class-level default. (Note: the test suite sets the + class-level default to True via ``test/__init__.py``, so these tests pass + explicit values to assert override behavior independent of that default.) + """ + + def test_none_inherits_class_default(self): + assert Future().error_on_callbacks is Future._default_error_on_callbacks + + def test_explicit_false_overrides_class_default(self): + f = Future(error_on_callbacks=False) + assert f.error_on_callbacks is False + f.add_callback(lambda v: _raise(ValueError('boom'))) + f.success(1) # suppressed despite class default True + assert f.succeeded() + + def test_callback_exception_raised_when_enabled(self): + f = Future(error_on_callbacks=True) + f.add_callback(lambda v: _raise(ValueError('boom'))) + with pytest.raises(ValueError, match='boom'): + f.success(1) + + def test_errback_exception_raised_when_enabled(self): + f = Future(error_on_callbacks=True) + f.add_errback(lambda e: _raise(RuntimeError('boom'))) + with pytest.raises(RuntimeError, match='boom'): + f.failure(ValueError('orig')) + + def test_already_done_callback_raises_when_enabled(self): + f = Future(error_on_callbacks=True) + f.success(1) + with pytest.raises(ValueError, match='boom'): + f.add_callback(lambda v: _raise(ValueError('boom')))