From 47b2d2f6c81be600eb4ed2150fc5a36af2619bc9 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 22 Apr 2026 10:26:54 +0200 Subject: [PATCH] fix(spring-jakarta): [Queue Instrumentation 27] Delegate Kafka record thread-state hooks SentryKafkaRecordInterceptor wraps an existing customer RecordInterceptor when one is present on the listener container factory, but it previously only delegated intercept, success, failure, and afterRecord. setupThreadState was not overridden, so the default no-op from ThreadStateProcessor shadowed any delegate implementation. clearThreadState performed Sentry cleanup but never forwarded to the delegate either. Customers relying on these hooks for MDC, security context, or other thread-local state on Kafka listener threads would silently lose that behavior once Sentry auto-wrapped their interceptor. Delegate setupThreadState to the wrapped interceptor, and in clearThreadState run Sentry cleanup inside try and delegate to the wrapped interceptor in finally so delegate cleanup still executes if Sentry cleanup throws. Co-Authored-By: Claude --- .../api/sentry-spring-jakarta.api | 1 + .../kafka/SentryKafkaRecordInterceptor.java | 15 +++++- .../kafka/SentryKafkaRecordInterceptorTest.kt | 46 +++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index edfa6399d7..24b9af7e14 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -263,6 +263,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 025fe9762b..b07d761a92 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -106,9 +106,22 @@ public void afterRecord( } } + @Override + public void setupThreadState(final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.setupThreadState(consumer); + } + } + @Override public void clearThreadState(final @NotNull Consumer consumer) { - finishStaleContext(); + try { + finishStaleContext(); + } finally { + if (delegate != null) { + delegate.clearThreadState(consumer); + } + } } private boolean isIgnored() { diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 9a8ad5343f..ac47d3654a 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -298,6 +298,52 @@ class SentryKafkaRecordInterceptorTest { interceptor.clearThreadState(consumer) } + @Test + fun `setupThreadState delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + + verify(delegate).setupThreadState(consumer) + } + + @Test + fun `setupThreadState is no-op without delegate`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.setupThreadState(consumer) + } + + @Test + fun `clearThreadState delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.clearThreadState(consumer) + + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() { + val delegate = mock>() + whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom")) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.intercept(record, consumer) + + try { + interceptor.clearThreadState(consumer) + } catch (ignored: RuntimeException) { + // expected + } + + verify(delegate).clearThreadState(consumer) + } + @Test fun `intercept cleans up stale context from previous record`() { val lifecycleToken2 = mock()