From 4a48e547b82049b1827b3d756818a1785ed3ab1a Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 22 Apr 2026 11:23:33 +0200 Subject: [PATCH] fix(spring-jakarta): [Queue Instrumentation 29] Set body_size on Spring Kafka consumer transaction The Spring Kafka consumer path (`SentryKafkaRecordInterceptor`) never set `messaging.message.body_size`, while the raw Kafka consumer helper (`SentryKafkaConsumerTracing`) already sets it from `ConsumerRecord#serializedValueSize()`. Both are first-party Kafka consumer integrations shipped in the same stack and should emit the same messaging schema so dashboards and queries remain consistent across Spring vs. raw Kafka setups. Mirror the raw helper: set `SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE` on the `queue.process` transaction when `serializedValueSize() >= 0`. Add regression tests for both the positive and the -1 (unknown) cases. #skip-changelog --- .../kafka/SentryKafkaRecordInterceptor.java | 5 +++ .../kafka/SentryKafkaRecordInterceptorTest.kt | 39 +++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index b07d761a92..d2302dca57 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -177,6 +177,11 @@ private boolean isIgnored() { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); } + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + final @Nullable Integer retryCount = retryCount(record); if (retryCount != null) { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index ac47d3654a..703f22fe3e 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor import io.sentry.test.initForTest import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import java.util.Optional import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test @@ -22,6 +23,7 @@ import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never @@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest { private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(), + serializedValueSize: Int = -1, ): ConsumerRecord { - val record = ConsumerRecord(topic, 0, 0L, "key", "value") - headers.forEach { record.headers().add(it) } - return record + return ConsumerRecord( + topic, + 0, + 0L, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + 3, + serializedValueSize, + "key", + "value", + headers, + Optional.empty(), + ) } private fun createRecordWithHeaders( @@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest { ) } + @Test + fun `sets body size from serializedValueSize`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = 42) + + interceptor.intercept(record, consumer) + + assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + + @Test + fun `does not set body size when serializedValueSize is negative`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = -1) + + interceptor.intercept(record, consumer) + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + @Test fun `sets retry count from delivery attempt header`() { val interceptor = SentryKafkaRecordInterceptor(scopes)