diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index 315ee0009c..457ecd6b5f 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -59,11 +59,11 @@ public SentryKafkaProducerInterceptor( return record; } + @Nullable ISpan span = null; try { final @NotNull SpanOptions spanOptions = new SpanOptions(); spanOptions.setOrigin(traceOrigin); - final @NotNull ISpan span = - activeSpan.startChild("queue.publish", record.topic(), spanOptions); + span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); if (span.isNoOp()) { return record; } @@ -72,14 +72,20 @@ public SentryKafkaProducerInterceptor( span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); injectHeaders(record.headers(), span); - span.setStatus(SpanStatus.OK); - span.finish(); } catch (Throwable t) { + if (span != null) { + span.setThrowable(t); + span.setStatus(SpanStatus.INTERNAL_ERROR); + } scopes .getOptions() .getLogger() .log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t); + } finally { + if (span != null && !span.isFinished()) { + span.finish(); + } } return record; diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index 072af926a3..2c59f2a24c 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -3,10 +3,13 @@ package io.sentry.kafka import io.sentry.BaggageHeader import io.sentry.IScopes import io.sentry.ISentryLifecycleToken +import io.sentry.ISpan import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader import io.sentry.SentryTracer +import io.sentry.SpanOptions +import io.sentry.SpanStatus import io.sentry.TransactionContext import io.sentry.test.initForTest import java.nio.charset.StandardCharsets @@ -18,7 +21,12 @@ import kotlin.test.assertNotNull import kotlin.test.assertSame import kotlin.test.assertTrue import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers +import org.mockito.kotlin.any +import org.mockito.kotlin.eq import org.mockito.kotlin.mock +import org.mockito.kotlin.verify import org.mockito.kotlin.whenever class SentryKafkaProducerInterceptorTest { @@ -111,6 +119,35 @@ class SentryKafkaProducerInterceptorTest { ) } + @Test + fun `finishes span with error when header injection fails`() { + val activeSpan = mock() + val span = mock() + val headers = mock() + val record = mock>() + val exception = RuntimeException("boom") + whenever(scopes.span).thenReturn(activeSpan) + whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) + .thenReturn(span) + whenever(span.isNoOp).thenReturn(false) + whenever(span.isFinished).thenReturn(false) + whenever(span.toSentryTrace()) + .thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1")) + whenever(span.toBaggageHeader(null)).thenReturn(null) + whenever(record.topic()).thenReturn("my-topic") + whenever(record.headers()).thenReturn(headers) + whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList
()) + whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception) + + val interceptor = SentryKafkaProducerInterceptor(scopes) + + interceptor.onSend(record) + + verify(span).setStatus(SpanStatus.INTERNAL_ERROR) + verify(span).setThrowable(exception) + verify(span).finish() + } + @Test fun `does not create span when queue tracing is disabled`() { val tx = createTransaction()