diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index 81c62cabdc..315ee0009c 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -13,10 +13,13 @@ import io.sentry.util.SpanUtils; import io.sentry.util.TracingUtils; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -97,8 +100,10 @@ public void close() {} public void configure(final @Nullable Map configs) {} private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { + final @Nullable List existingBaggageHeaders = + readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); final @Nullable TracingUtils.TracingHeaders tracingHeaders = - TracingUtils.trace(scopes, null, span); + TracingUtils.trace(scopes, existingBaggageHeaders, span); if (tracingHeaders != null) { final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); headers.remove(sentryTraceHeader.getName()); @@ -120,4 +125,19 @@ private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis())) .getBytes(StandardCharsets.UTF_8)); } + + private static @Nullable List readHeaderValues( + final @NotNull Headers headers, final @NotNull String name) { + @Nullable List values = null; + for (final @NotNull Header header : headers.headers(name)) { + final byte @Nullable [] value = header.value(); + if (value != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(value, StandardCharsets.UTF_8)); + } + } + return values; + } } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index b9787aba09..072af926a3 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -1,5 +1,6 @@ package io.sentry.kafka +import io.sentry.BaggageHeader import io.sentry.IScopes import io.sentry.ISentryLifecycleToken import io.sentry.Sentry @@ -79,6 +80,37 @@ class SentryKafkaProducerInterceptorTest { assertTrue(enqueuedTime > 0) } + @Test + fun `preserves pre-existing third-party baggage header entries`() { + val tx = createTransaction() + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + record + .headers() + .add( + BaggageHeader.BAGGAGE_HEADER, + "othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8), + ) + + interceptor.onSend(record) + + val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList() + assertEquals(1, baggageHeaders.size) + val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8) + assertTrue( + baggageValue.contains("othervendor=someValue"), + "expected third-party baggage entry preserved, got: $baggageValue", + ) + assertTrue( + baggageValue.contains("another=thing"), + "expected third-party baggage entry preserved, got: $baggageValue", + ) + assertTrue( + baggageValue.contains("sentry-"), + "expected Sentry baggage entries appended, got: $baggageValue", + ) + } + @Test fun `does not create span when queue tracing is disabled`() { val tx = createTransaction()