diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario-error.mjs b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario-error.mjs new file mode 100644 index 000000000000..c995cd79da7f --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario-error.mjs @@ -0,0 +1,29 @@ +import { Kafka } from 'kafkajs'; + +async function run() { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'], + // The invalid-topic error is non-retriable, but disable retries anyway so the failing + // `send` rejects promptly and produces exactly one errored producer span. + retry: { retries: 0 }, + }); + + const producer = kafka.producer(); + await producer.connect(); + + // Topic names may not contain spaces, so the broker rejects this send. The producer span should + // be marked as errored. + await producer + .send({ + topic: 'invalid topic name', + messages: [{ value: 'TEST_MESSAGE' }], + }) + .catch(() => { + // swallow - we assert on the emitted span, not the thrown error + }); + + await producer.disconnect(); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts b/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts index 1c27c10becd4..0d87e5e69ded 100644 --- a/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts @@ -83,4 +83,34 @@ describe('kafkajs', () => { .completed(); }); }); + + createEsmAndCjsTests(__dirname, 'scenario-error.mjs', 'instrument.mjs', (createRunner, test) => { + test('marks the producer span as errored when a send fails', { timeout: 90_000 }, async () => { + await createRunner() + .withDockerCompose({ + workingDirectory: [__dirname], + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + expect(transaction.transaction).toBe('send invalid topic name'); + expect(transaction.contexts?.trace).toMatchObject( + expect.objectContaining({ + op: 'message', + status: 'internal_error', + data: expect.objectContaining({ + 'messaging.system': 'kafka', + 'messaging.destination.name': 'invalid topic name', + 'otel.kind': 'PRODUCER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.kafkajs.otel.producer', + 'error.type': 'KafkaJSNonRetriableError', + }), + }), + ); + }, + }) + .start() + .completed(); + }); + }); }); diff --git a/packages/node/src/integrations/tracing/kafka/index.ts b/packages/node/src/integrations/tracing/kafka/index.ts index 08677af4795d..d235f04f5eea 100644 --- a/packages/node/src/integrations/tracing/kafka/index.ts +++ b/packages/node/src/integrations/tracing/kafka/index.ts @@ -1,22 +1,11 @@ import { KafkaJsInstrumentation } from './vendored/instrumentation'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration } from '@sentry/core'; -import { addOriginToSpan, generateInstrumentOnce } from '@sentry/node-core'; +import { generateInstrumentOnce } from '@sentry/node-core'; const INTEGRATION_NAME = 'Kafka'; -export const instrumentKafka = generateInstrumentOnce( - INTEGRATION_NAME, - () => - new KafkaJsInstrumentation({ - consumerHook(span) { - addOriginToSpan(span, 'auto.kafkajs.otel.consumer'); - }, - producerHook(span) { - addOriginToSpan(span, 'auto.kafkajs.otel.producer'); - }, - }), -); +export const instrumentKafka = generateInstrumentOnce(INTEGRATION_NAME, () => new KafkaJsInstrumentation()); const _kafkaIntegration = (() => { return { diff --git a/packages/node/src/integrations/tracing/kafka/vendored/instrumentation.ts b/packages/node/src/integrations/tracing/kafka/vendored/instrumentation.ts index d951cfa561bc..0aed0d232654 100644 --- a/packages/node/src/integrations/tracing/kafka/vendored/instrumentation.ts +++ b/packages/node/src/integrations/tracing/kafka/vendored/instrumentation.ts @@ -6,163 +6,59 @@ * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 * - Some types vendored from kafkajs with simplifications - * - Minor TypeScript strictness adjustments for this repository's compiler settings + * - Refactored to use Sentry's span APIs instead of OpenTelemetry tracing APIs + * - Cross-broker trace propagation uses Sentry's `getTraceData`/`continueTrace` instead of the OTel + * propagator, so the vendored `bufferTextMapGetter` propagator is gone + * - Dropped the OTel metrics (no MeterProvider is wired up) and, with them, the `network.request` + * event listeners they relied on; origin is folded into span creation instead of `index.ts` hooks */ -/* eslint-disable */ +import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { InstrumentationBase, InstrumentationNodeModuleDefinition, isWrapped } from '@opentelemetry/instrumentation'; +import type { Span } from '@sentry/core'; import { - Attributes, - Context, - context, - Counter, - Histogram, - Link, - propagation, - ROOT_CONTEXT, - Span, - SpanKind, - SpanStatusCode, - trace, -} from '@opentelemetry/api'; -import { - InstrumentationBase, - InstrumentationNodeModuleDefinition, - isWrapped, - safeExecuteInTheMiddle, -} from '@opentelemetry/instrumentation'; -import { - ATTR_ERROR_TYPE, - ATTR_SERVER_ADDRESS, - ATTR_SERVER_PORT, - ERROR_TYPE_VALUE_OTHER, -} from '@opentelemetry/semantic-conventions'; -import type { Kafka, Transaction, Producer, ConsumerEvents, ProducerEvents, RequestEvent } from './kafkajs-types'; + continueTrace, + SDK_VERSION, + SPAN_STATUS_ERROR, + SPAN_STATUS_OK, + startInactiveSpan, + startNewTrace, + withActiveSpan, +} from '@sentry/core'; import type { Consumer, ConsumerRunConfig, EachBatchHandler, EachMessageHandler, + Kafka, KafkaMessage, - Message, + Producer, RecordMetadata, + Transaction, } from './kafkajs-types'; -import { EVENT_LISTENERS_SET } from './internal-types'; -import { bufferTextMapGetter } from './propagator'; import { ATTR_MESSAGING_BATCH_MESSAGE_COUNT, - ATTR_MESSAGING_DESTINATION_NAME, ATTR_MESSAGING_DESTINATION_PARTITION_ID, - ATTR_MESSAGING_KAFKA_MESSAGE_KEY, - ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE, - ATTR_MESSAGING_KAFKA_OFFSET, - ATTR_MESSAGING_OPERATION_NAME, - ATTR_MESSAGING_OPERATION_TYPE, - ATTR_MESSAGING_SYSTEM, MESSAGING_OPERATION_TYPE_VALUE_PROCESS, MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, - MESSAGING_OPERATION_TYPE_VALUE_SEND, - MESSAGING_SYSTEM_VALUE_KAFKA, - METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES, - METRIC_MESSAGING_CLIENT_OPERATION_DURATION, - METRIC_MESSAGING_CLIENT_SENT_MESSAGES, - METRIC_MESSAGING_PROCESS_DURATION, } from './semconv'; -import { KafkaJsInstrumentationConfig } from './types'; - -interface ConsumerSpanOptions { - topic: string; - message: KafkaMessage | undefined; - operationType: string; - attributes: Attributes; - ctx?: Context | undefined; - link?: Link; -} -import { SDK_VERSION } from '@sentry/core'; +import { + endSpansOnPromise, + getHeaderAsString, + getLinksFromHeaders, + startConsumerSpan, + startProducerSpan, +} from './utils'; const PACKAGE_NAME = '@sentry/instrumentation-kafkajs'; -// This interface acts as a strict subset of the KafkaJS Consumer and -// Producer interfaces (just for the event we're needing) -interface KafkaEventEmitter { - on(eventName: ConsumerEvents['REQUEST'] | ProducerEvents['REQUEST'], listener: (event: RequestEvent) => void): void; - events: { - REQUEST: ConsumerEvents['REQUEST'] | ProducerEvents['REQUEST']; - }; - [EVENT_LISTENERS_SET]?: boolean; -} - -interface StandardAttributes extends Attributes { - [ATTR_MESSAGING_SYSTEM]: string; - [ATTR_MESSAGING_OPERATION_NAME]: OP; - [ATTR_ERROR_TYPE]?: string; -} -interface TopicAttributes { - [ATTR_MESSAGING_DESTINATION_NAME]: string; - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]?: string; -} - -interface ClientDurationAttributes extends StandardAttributes, Partial { - [ATTR_SERVER_ADDRESS]: string; - [ATTR_SERVER_PORT]: number; - [ATTR_MESSAGING_OPERATION_TYPE]?: string; -} -interface SentMessagesAttributes extends StandardAttributes<'send'>, TopicAttributes { - [ATTR_ERROR_TYPE]?: string; -} -type ConsumedMessagesAttributes = StandardAttributes<'receive' | 'process'>; -interface MessageProcessDurationAttributes extends StandardAttributes<'process'>, TopicAttributes { - [ATTR_MESSAGING_SYSTEM]: string; - [ATTR_MESSAGING_OPERATION_NAME]: 'process'; - [ATTR_ERROR_TYPE]?: string; -} -type RecordPendingMetric = (errorType?: string | undefined) => void; - -function prepareCounter(meter: Counter, value: number, attributes: T): RecordPendingMetric { - return (errorType?: string | undefined) => { - meter.add(value, { - ...attributes, - ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), - }); - }; -} - -function prepareDurationHistogram( - meter: Histogram, - value: number, - attributes: T, -): RecordPendingMetric { - return (errorType?: string | undefined) => { - meter.record((Date.now() - value) / 1000, { - ...attributes, - ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), - }); - }; -} - -const HISTOGRAM_BUCKET_BOUNDARIES = [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]; -export class KafkaJsInstrumentation extends InstrumentationBase { - declare private _clientDuration: Histogram; - declare private _sentMessages: Counter; - declare private _consumedMessages: Counter; - declare private _processDuration: Histogram; - - constructor(config: KafkaJsInstrumentationConfig = {}) { +export class KafkaJsInstrumentation extends InstrumentationBase { + public constructor(config: InstrumentationConfig = {}) { super(PACKAGE_NAME, SDK_VERSION, config); } - override _updateMetricInstruments() { - this._clientDuration = this.meter.createHistogram(METRIC_MESSAGING_CLIENT_OPERATION_DURATION, { - advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES }, - }); - this._sentMessages = this.meter.createCounter(METRIC_MESSAGING_CLIENT_SENT_MESSAGES); - this._consumedMessages = this.meter.createCounter(METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES); - this._processDuration = this.meter.createHistogram(METRIC_MESSAGING_PROCESS_DURATION, { - advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES }, - }); - } - - protected init() { - const unpatch = (moduleExports: any) => { + protected init(): InstrumentationNodeModuleDefinition { + const unpatch = (moduleExports: any): void => { if (isWrapped(moduleExports?.Kafka?.prototype.producer)) { this._unwrap(moduleExports.Kafka.prototype, 'producer'); } @@ -192,63 +88,42 @@ export class KafkaJsInstrumentation extends InstrumentationBase) { const newConsumer: Consumer = original.apply(this, args); + // oxlint-disable-next-line typescript/unbound-method -- property check, the method is never called if (isWrapped(newConsumer.run)) { instrumentation._unwrap(newConsumer, 'run'); } instrumentation._wrap(newConsumer, 'run', instrumentation._getConsumerRunPatch()); - instrumentation._setKafkaEventListeners(newConsumer); - return newConsumer; }; }; } - private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) { - if (kafkaObj[EVENT_LISTENERS_SET]) return; - - // The REQUEST Consumer event was added in kafkajs@1.5.0. - if (kafkaObj.events?.REQUEST) { - kafkaObj.on(kafkaObj.events.REQUEST, this._recordClientDurationMetric.bind(this)); - } - - kafkaObj[EVENT_LISTENERS_SET] = true; - } - - private _recordClientDurationMetric(event: Pick) { - const [address = '', port = '0'] = event.payload.broker.split(':'); - this._clientDuration.record(event.payload.duration / 1000, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, - [ATTR_SERVER_ADDRESS]: address, - [ATTR_SERVER_PORT]: Number.parseInt(port, 10), - }); - } - private _getProducerPatch() { const instrumentation = this; return (original: Kafka['producer']) => { return function consumer(this: Kafka, ...args: Parameters) { const newProducer: Producer = original.apply(this, args); + // oxlint-disable-next-line typescript/unbound-method -- property check, the method is never called if (isWrapped(newProducer.sendBatch)) { instrumentation._unwrap(newProducer, 'sendBatch'); } instrumentation._wrap(newProducer, 'sendBatch', instrumentation._getSendBatchPatch()); + // oxlint-disable-next-line typescript/unbound-method -- property check, the method is never called if (isWrapped(newProducer.send)) { instrumentation._unwrap(newProducer, 'send'); } instrumentation._wrap(newProducer, 'send', instrumentation._getSendPatch()); + // oxlint-disable-next-line typescript/unbound-method -- property check, the method is never called if (isWrapped(newProducer.transaction)) { instrumentation._unwrap(newProducer, 'transaction'); } instrumentation._wrap(newProducer, 'transaction', instrumentation._getProducerTransactionPatch()); - instrumentation._setKafkaEventListeners(newProducer); - return newProducer; }; }; @@ -277,107 +152,68 @@ export class KafkaJsInstrumentation extends InstrumentationBase { return function eachMessage(this: unknown, ...args: Parameters): Promise { const payload = args[0]; - const propagatedContext: Context = propagation.extract( - ROOT_CONTEXT, - payload.message.headers, - bufferTextMapGetter, - ); - const span = instrumentation._startConsumerSpan({ - topic: payload.topic, - message: payload.message, - operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, - ctx: propagatedContext, - attributes: { - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.partition), - }, - }); - - const pendingMetrics: RecordPendingMetric[] = [ - prepareDurationHistogram(instrumentation._processDuration, Date.now(), { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'process', - [ATTR_MESSAGING_DESTINATION_NAME]: payload.topic, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.partition), - }), - prepareCounter(instrumentation._consumedMessages, 1, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'process', - [ATTR_MESSAGING_DESTINATION_NAME]: payload.topic, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.partition), - }), - ]; + const sentryTrace = getHeaderAsString(payload.message.headers, 'sentry-trace'); + const baggage = getHeaderAsString(payload.message.headers, 'baggage'); + + // Continue the producer's trace so the consumer span is parented to the message's producer. + return continueTrace({ sentryTrace, baggage }, () => { + const span = startConsumerSpan({ + topic: payload.topic, + message: payload.message, + operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + attributes: { + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.partition), + }, + }); - const eachMessagePromise = context.with(trace.setSpan(propagatedContext, span), () => { - return original!.apply(this, args); + const eachMessagePromise = withActiveSpan(span, () => { + return original!.apply(this, args); + }); + return endSpansOnPromise([span], eachMessagePromise); }); - return instrumentation._endSpansOnPromise([span], pendingMetrics, eachMessagePromise); }; }; } private _getConsumerEachBatchPatch() { return (original: ConsumerRunConfig['eachBatch']) => { - const instrumentation = this; return function eachBatch(this: unknown, ...args: Parameters): Promise { const payload = args[0]; // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers - const receivingSpan = instrumentation._startConsumerSpan({ - topic: payload.batch.topic, - message: undefined, - operationType: MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, - ctx: ROOT_CONTEXT, - attributes: { - [ATTR_MESSAGING_BATCH_MESSAGE_COUNT]: payload.batch.messages.length, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.batch.partition), - }, - }); - return context.with(trace.setSpan(context.active(), receivingSpan), () => { - const startTime = Date.now(); - const spans: Span[] = []; - const pendingMetrics: RecordPendingMetric[] = [ - prepareCounter(instrumentation._consumedMessages, payload.batch.messages.length, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'process', - [ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic, + // A batch pull aggregates messages from many producers, so the receiving span is a fresh root + // trace and each processed message links back to its own producer span. + const receivingSpan = startNewTrace(() => + startConsumerSpan({ + topic: payload.batch.topic, + message: undefined, + operationType: MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + attributes: { + [ATTR_MESSAGING_BATCH_MESSAGE_COUNT]: payload.batch.messages.length, [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.batch.partition), - }), - ]; - payload.batch.messages.forEach((message: any) => { - const propagatedContext: Context = propagation.extract(ROOT_CONTEXT, message.headers, bufferTextMapGetter); - const spanContext = trace.getSpan(propagatedContext)?.spanContext(); - let origSpanLink: Link | undefined; - if (spanContext) { - origSpanLink = { - context: spanContext, - }; - } + }, + }), + ); + + return withActiveSpan(receivingSpan, () => { + const spans: Span[] = [receivingSpan]; + payload.batch.messages.forEach((message: KafkaMessage) => { spans.push( - instrumentation._startConsumerSpan({ + startConsumerSpan({ topic: payload.batch.topic, message, operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, - link: origSpanLink, + links: getLinksFromHeaders(message.headers), attributes: { [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.batch.partition), }, }), ); - pendingMetrics.push( - prepareDurationHistogram(instrumentation._processDuration, startTime, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'process', - [ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(payload.batch.partition), - }), - ); }); const batchMessagePromise: Promise = original!.apply(this, args); - spans.unshift(receivingSpan); - return instrumentation._endSpansOnPromise(spans, pendingMetrics, batchMessagePromise); + return endSpansOnPromise(spans, batchMessagePromise); }); }; }; @@ -390,62 +226,63 @@ export class KafkaJsInstrumentation extends InstrumentationBase ): ReturnType { - const transactionSpan = instrumentation.tracer.startSpan('transaction'); + const transactionSpan = startInactiveSpan({ name: 'transaction' }); const transactionPromise = original.apply(this, args); transactionPromise .then((transaction: Transaction) => { + // oxlint-disable-next-line typescript/unbound-method -- re-bound below via `.apply(this, args)` const originalSend = transaction.send; transaction.send = function send(this: Transaction, ...args) { - return context.with(trace.setSpan(context.active(), transactionSpan), () => { + return withActiveSpan(transactionSpan, () => { const patched = instrumentation._getSendPatch()(originalSend); return patched.apply(this, args).catch((err: any) => { transactionSpan.setStatus({ - code: SpanStatusCode.ERROR, + code: SPAN_STATUS_ERROR, message: err?.message, }); - transactionSpan.recordException(err); throw err; }); }); }; + // oxlint-disable-next-line typescript/unbound-method -- re-bound below via `.apply(this, args)` const originalSendBatch = transaction.sendBatch; transaction.sendBatch = function sendBatch(this: Transaction, ...args) { - return context.with(trace.setSpan(context.active(), transactionSpan), () => { + return withActiveSpan(transactionSpan, () => { const patched = instrumentation._getSendBatchPatch()(originalSendBatch); return patched.apply(this, args).catch((err: any) => { transactionSpan.setStatus({ - code: SpanStatusCode.ERROR, + code: SPAN_STATUS_ERROR, message: err?.message, }); - transactionSpan.recordException(err); throw err; }); }); }; + // oxlint-disable-next-line typescript/unbound-method -- re-bound below via `.apply(this, args)` const originalCommit = transaction.commit; transaction.commit = function commit(this: Transaction, ...args) { const originCommitPromise = originalCommit.apply(this, args).then(() => { - transactionSpan.setStatus({ code: SpanStatusCode.OK }); + transactionSpan.setStatus({ code: SPAN_STATUS_OK }); }); - return instrumentation._endSpansOnPromise([transactionSpan], [], originCommitPromise); + return endSpansOnPromise([transactionSpan], originCommitPromise); }; + // oxlint-disable-next-line typescript/unbound-method -- re-bound below via `.apply(this, args)` const originalAbort = transaction.abort; transaction.abort = function abort(this: Transaction, ...args) { const originAbortPromise = originalAbort.apply(this, args); - return instrumentation._endSpansOnPromise([transactionSpan], [], originAbortPromise); + return endSpansOnPromise([transactionSpan], originAbortPromise); }; }) .catch((err: any) => { transactionSpan.setStatus({ - code: SpanStatusCode.ERROR, + code: SPAN_STATUS_ERROR, message: err?.message, }); - transactionSpan.recordException(err); transactionSpan.end(); }); @@ -455,7 +292,6 @@ export class KafkaJsInstrumentation extends InstrumentationBase { return function sendBatch( this: Producer | Transaction, @@ -465,33 +301,19 @@ export class KafkaJsInstrumentation extends InstrumentationBase { topicMessage.messages.forEach((message: any) => { - spans.push(instrumentation._startProducerSpan(topicMessage.topic, message)); - pendingMetrics.push( - prepareCounter(instrumentation._sentMessages, 1, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'send', - [ATTR_MESSAGING_DESTINATION_NAME]: topicMessage.topic, - ...(message.partition !== undefined - ? { - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(message.partition), - } - : {}), - }), - ); + spans.push(startProducerSpan(topicMessage.topic, message)); }); }); const origSendResult: Promise = original.apply(this, args); - return instrumentation._endSpansOnPromise(spans, pendingMetrics, origSendResult); + return endSpansOnPromise(spans, origSendResult); }; }; } private _getSendPatch() { - const instrumentation = this; return (original: Producer['send'] | Transaction['send']) => { return function send( this: Producer | Transaction, @@ -499,131 +321,12 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const record = args[0]; const spans: Span[] = record.messages.map((message: any) => { - return instrumentation._startProducerSpan(record.topic, message); + return startProducerSpan(record.topic, message); }); - const pendingMetrics: RecordPendingMetric[] = record.messages.map((m: any) => - prepareCounter(instrumentation._sentMessages, 1, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'send', - [ATTR_MESSAGING_DESTINATION_NAME]: record.topic, - ...(m.partition !== undefined - ? { - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(m.partition), - } - : {}), - }), - ); const origSendResult: Promise = original.apply(this, args); - return instrumentation._endSpansOnPromise(spans, pendingMetrics, origSendResult); + return endSpansOnPromise(spans, origSendResult); }; }; } - - private _endSpansOnPromise( - spans: Span[], - pendingMetrics: RecordPendingMetric[], - sendPromise: Promise, - ): Promise { - return Promise.resolve(sendPromise) - .then(result => { - pendingMetrics.forEach(m => m()); - return result; - }) - .catch(reason => { - let errorMessage: string | undefined; - let errorType: string = ERROR_TYPE_VALUE_OTHER; - if (typeof reason === 'string' || reason === undefined) { - errorMessage = reason; - } else if (typeof reason === 'object' && Object.prototype.hasOwnProperty.call(reason, 'message')) { - errorMessage = reason.message; - errorType = reason.constructor.name; - } - pendingMetrics.forEach(m => m(errorType)); - - spans.forEach(span => { - span.setAttribute(ATTR_ERROR_TYPE, errorType); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: errorMessage, - }); - }); - - throw reason; - }) - .finally(() => { - spans.forEach(span => span.end()); - }); - } - - private _startConsumerSpan({ topic, message, operationType, ctx, link, attributes }: ConsumerSpanOptions) { - const operationName = - operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE - ? 'poll' // for batch processing spans - : operationType; // for individual message processing spans - - const span = this.tracer.startSpan( - `${operationName} ${topic}`, - { - kind: operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE ? SpanKind.CLIENT : SpanKind.CONSUMER, - attributes: { - ...attributes, - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_DESTINATION_NAME]: topic, - [ATTR_MESSAGING_OPERATION_TYPE]: operationType, - [ATTR_MESSAGING_OPERATION_NAME]: operationName, - [ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message?.key ? String(message.key) : undefined, - [ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]: message?.key && message.value === null ? true : undefined, - [ATTR_MESSAGING_KAFKA_OFFSET]: message?.offset, - }, - links: link ? [link] : [], - }, - ctx, - ); - - const { consumerHook } = this.getConfig(); - if (consumerHook && message) { - safeExecuteInTheMiddle( - () => consumerHook(span, { topic, message }), - e => { - if (e) this._diag.error('consumerHook error', e); - }, - true, - ); - } - - return span; - } - - private _startProducerSpan(topic: string, message: Message) { - const span = this.tracer.startSpan(`send ${topic}`, { - kind: SpanKind.PRODUCER, - attributes: { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_DESTINATION_NAME]: topic, - [ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message.key ? String(message.key) : undefined, - [ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]: message.key && message.value === null ? true : undefined, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: - message.partition !== undefined ? String(message.partition) : undefined, - [ATTR_MESSAGING_OPERATION_NAME]: 'send', - [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, - }, - }); - - message.headers = message.headers ?? {}; - propagation.inject(trace.setSpan(context.active(), span), message.headers); - - const { producerHook } = this.getConfig(); - if (producerHook) { - safeExecuteInTheMiddle( - () => producerHook(span, { topic, message }), - e => { - if (e) this._diag.error('producerHook error', e); - }, - true, - ); - } - - return span; - } } diff --git a/packages/node/src/integrations/tracing/kafka/vendored/internal-types.ts b/packages/node/src/integrations/tracing/kafka/vendored/internal-types.ts deleted file mode 100644 index d44f26a65952..000000000000 --- a/packages/node/src/integrations/tracing/kafka/vendored/internal-types.ts +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors, Aspecto - * SPDX-License-Identifier: Apache-2.0 - * - * NOTICE from the Sentry authors: - * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs - * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 - * - Some types vendored from kafkajs with simplifications - */ -/* eslint-disable */ - -import type { Consumer, Producer } from './kafkajs-types'; - -export const EVENT_LISTENERS_SET = Symbol('opentelemetry.instrumentation.kafkajs.eventListenersSet'); - -export interface ConsumerExtended extends Consumer { - [EVENT_LISTENERS_SET]?: boolean; // flag to identify if the event listeners for instrumentation have been set -} - -export interface ProducerExtended extends Producer { - [EVENT_LISTENERS_SET]?: boolean; // flag to identify if the event listeners for instrumentation have been set -} diff --git a/packages/node/src/integrations/tracing/kafka/vendored/kafkajs-types.ts b/packages/node/src/integrations/tracing/kafka/vendored/kafkajs-types.ts index 1ff955322071..d6db2e7a486e 100644 --- a/packages/node/src/integrations/tracing/kafka/vendored/kafkajs-types.ts +++ b/packages/node/src/integrations/tracing/kafka/vendored/kafkajs-types.ts @@ -3,39 +3,6 @@ * Only includes members accessed by this instrumentation. */ -export interface InstrumentationEvent { - id: string; - type: string; - timestamp: number; - payload: T; -} - -export type RequestEvent = InstrumentationEvent<{ - apiKey: number; - apiName: string; - apiVersion: number; - broker: string; - clientId: string; - correlationId: number; - createdAt: number; - duration: number; - pendingDuration: number; - sentAt: number; - size: number; -}>; - -export type RemoveInstrumentationEventListener<_T> = () => void; - -export type ConsumerEvents = { - REQUEST: 'consumer.network.request'; - [key: string]: string; -}; - -export type ProducerEvents = { - REQUEST: 'producer.network.request'; - [key: string]: string; -}; - type Sender = { send(record: any): Promise; sendBatch(batch: any): Promise; @@ -45,8 +12,6 @@ export type Producer = Sender & { connect(): Promise; disconnect(): Promise; isIdempotent(): boolean; - readonly events: ProducerEvents; - on(eventName: string, listener: (...args: any[]) => void): RemoveInstrumentationEventListener; transaction(): Promise; [key: string]: any; }; @@ -63,8 +28,6 @@ export type Consumer = { disconnect(): Promise; subscribe(subscription: any): Promise; run(config?: any): Promise; - readonly events: ConsumerEvents; - on(eventName: string, listener: (...args: any[]) => void): RemoveInstrumentationEventListener; [key: string]: any; }; diff --git a/packages/node/src/integrations/tracing/kafka/vendored/propagator.ts b/packages/node/src/integrations/tracing/kafka/vendored/propagator.ts deleted file mode 100644 index 022699db041f..000000000000 --- a/packages/node/src/integrations/tracing/kafka/vendored/propagator.ts +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors, Aspecto - * SPDX-License-Identifier: Apache-2.0 - * - * NOTICE from the Sentry authors: - * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs - * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 - */ -/* eslint-disable */ - -import type { TextMapGetter } from '@opentelemetry/api'; - -/* -same as open telemetry's `defaultTextMapGetter`, -but also handle case where header is buffer, -adding toString() to make sure string is returned -*/ -export const bufferTextMapGetter: TextMapGetter = { - get(carrier, key) { - if (!carrier) { - return undefined; - } - - const keys = Object.keys(carrier); - - for (const carrierKey of keys) { - if (carrierKey === key || carrierKey.toLowerCase() === key) { - return carrier[carrierKey]?.toString(); - } - } - - return undefined; - }, - - keys(carrier) { - return carrier ? Object.keys(carrier) : []; - }, -}; diff --git a/packages/node/src/integrations/tracing/kafka/vendored/semconv.ts b/packages/node/src/integrations/tracing/kafka/vendored/semconv.ts index 69185b619b0c..f47029d82e44 100644 --- a/packages/node/src/integrations/tracing/kafka/vendored/semconv.ts +++ b/packages/node/src/integrations/tracing/kafka/vendored/semconv.ts @@ -5,8 +5,8 @@ * NOTICE from the Sentry authors: * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 + * - Metric semantic conventions dropped; `error.type` inlined from `@opentelemetry/semantic-conventions` */ -/* eslint-disable */ /* * This file contains a copy of unstable semantic convention definitions @@ -125,38 +125,17 @@ export const MESSAGING_OPERATION_TYPE_VALUE_SEND = 'send' as const; export const MESSAGING_SYSTEM_VALUE_KAFKA = 'kafka' as const; /** - * Number of messages that were delivered to the application. + * Describes a class of error the operation ended with. * - * @note Records the number of messages pulled from the broker or number of messages dispatched to the application in push-based scenarios. - * The metric **SHOULD** be reported once per message delivery. For example, if receiving and processing operations are both instrumented for a single message delivery, this counter is incremented when the message is received and not reported when it is processed. - * - * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + * @example timeout + * @example java.net.UnknownHostException + * @example server_certificate_invalid + * @example 500 */ -export const METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES = 'messaging.client.consumed.messages' as const; +export const ATTR_ERROR_TYPE = 'error.type' as const; /** - * Duration of messaging operation initiated by a producer or consumer client. - * - * @note This metric **SHOULD NOT** be used to report processing duration - processing duration is reported in `messaging.process.duration` metric. - * - * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - */ -export const METRIC_MESSAGING_CLIENT_OPERATION_DURATION = 'messaging.client.operation.duration' as const; - -/** - * Number of messages producer attempted to send to the broker. - * - * @note This metric **MUST NOT** count messages that were created but haven't yet been sent. - * - * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - */ -export const METRIC_MESSAGING_CLIENT_SENT_MESSAGES = 'messaging.client.sent.messages' as const; - -/** - * Duration of processing operation. - * - * @note This metric **MUST** be reported for operations with `messaging.operation.type` that matches `process`. - * - * @experimental This metric is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + * Enum value "_OTHER" for attribute {@link ATTR_ERROR_TYPE}. A fallback error value to be used when + * the instrumentation doesn't define a custom value. */ -export const METRIC_MESSAGING_PROCESS_DURATION = 'messaging.process.duration' as const; +export const ERROR_TYPE_VALUE_OTHER = '_OTHER' as const; diff --git a/packages/node/src/integrations/tracing/kafka/vendored/types.ts b/packages/node/src/integrations/tracing/kafka/vendored/types.ts deleted file mode 100644 index b57af67007a0..000000000000 --- a/packages/node/src/integrations/tracing/kafka/vendored/types.ts +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors, Aspecto - * SPDX-License-Identifier: Apache-2.0 - * - * NOTICE from the Sentry authors: - * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs - * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 - */ -/* eslint-disable */ - -import { Span } from '@opentelemetry/api'; -import { InstrumentationConfig } from '@opentelemetry/instrumentation'; - -export interface KafkajsMessage { - key?: Buffer | string | null; - value: Buffer | string | null; - partition?: number; - headers?: Record; - timestamp?: string; -} - -export interface MessageInfo { - topic: string; - message: T; -} - -export interface KafkaProducerCustomAttributeFunction { - (span: Span, info: MessageInfo): void; -} - -export interface KafkaConsumerCustomAttributeFunction { - (span: Span, info: MessageInfo): void; -} - -export interface KafkaJsInstrumentationConfig extends InstrumentationConfig { - /** hook for adding custom attributes before producer message is sent */ - producerHook?: KafkaProducerCustomAttributeFunction; - - /** hook for adding custom attributes before consumer message is processed */ - consumerHook?: KafkaConsumerCustomAttributeFunction; -} diff --git a/packages/node/src/integrations/tracing/kafka/vendored/utils.ts b/packages/node/src/integrations/tracing/kafka/vendored/utils.ts new file mode 100644 index 000000000000..9514856decb6 --- /dev/null +++ b/packages/node/src/integrations/tracing/kafka/vendored/utils.ts @@ -0,0 +1,177 @@ +/* + * Copyright The OpenTelemetry Authors, Aspecto + * SPDX-License-Identifier: Apache-2.0 + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-kafkajs + * - Upstream version: @opentelemetry/instrumentation-kafkajs@0.27.0 + * - Span creation extracted here and migrated to the @sentry/core API; origin folded into span creation + */ + +import { SpanKind, TraceFlags } from '@opentelemetry/api'; +import type { Span, SpanAttributes, SpanLink } from '@sentry/core'; +import { + getTraceData, + propagationContextFromHeaders, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + SPAN_STATUS_ERROR, + startInactiveSpan, +} from '@sentry/core'; +import type { KafkaMessage, Message } from './kafkajs-types'; +import { + ATTR_ERROR_TYPE, + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_DESTINATION_PARTITION_ID, + ATTR_MESSAGING_KAFKA_MESSAGE_KEY, + ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE, + ATTR_MESSAGING_KAFKA_OFFSET, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_OPERATION_TYPE, + ATTR_MESSAGING_SYSTEM, + ERROR_TYPE_VALUE_OTHER, + MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + MESSAGING_OPERATION_TYPE_VALUE_SEND, + MESSAGING_SYSTEM_VALUE_KAFKA, +} from './semconv'; + +const PRODUCER_ORIGIN = 'auto.kafkajs.otel.producer'; +const CONSUMER_ORIGIN = 'auto.kafkajs.otel.consumer'; + +export interface ConsumerSpanOptions { + topic: string; + message: KafkaMessage | undefined; + operationType: string; + attributes: SpanAttributes; + links?: SpanLink[]; +} + +/** + * Reads a header value off a kafkajs message as a string. kafkajs delivers headers as `Buffer`s (or + * arrays of them), so we normalize to a string before handing them to Sentry's trace helpers. + */ +export function getHeaderAsString(headers: KafkaMessage['headers'], key: string): string | undefined { + const value = headers?.[key]; + if (value == null) { + return undefined; + } + return Array.isArray(value) ? value[0]?.toString() : value.toString(); +} + +/** + * Builds a span link to the producer span carried in the message headers, mirroring the upstream + * behavior of linking each batch-processed message to its originating producer span. + */ +export function getLinksFromHeaders(headers: KafkaMessage['headers']): SpanLink[] | undefined { + const sentryTrace = getHeaderAsString(headers, 'sentry-trace'); + if (!sentryTrace) { + return undefined; + } + + const { traceId, parentSpanId, sampled } = propagationContextFromHeaders( + sentryTrace, + getHeaderAsString(headers, 'baggage'), + ); + if (!parentSpanId) { + return undefined; + } + + return [ + { + context: { + traceId, + spanId: parentSpanId, + isRemote: true, + traceFlags: sampled ? TraceFlags.SAMPLED : TraceFlags.NONE, + }, + }, + ]; +} + +/** Starts an inactive consumer (process/receive) span carrying the kafkajs messaging attributes. */ +export function startConsumerSpan({ topic, message, operationType, links, attributes }: ConsumerSpanOptions): Span { + const operationName = + operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE + ? 'poll' // for batch processing spans + : operationType; // for individual message processing spans + + return startInactiveSpan({ + name: `${operationName} ${topic}`, + kind: operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE ? SpanKind.CLIENT : SpanKind.CONSUMER, + links, + attributes: { + ...attributes, + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: topic, + [ATTR_MESSAGING_OPERATION_TYPE]: operationType, + [ATTR_MESSAGING_OPERATION_NAME]: operationName, + [ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message?.key ? String(message.key) : undefined, + [ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]: message?.key && message.value === null ? true : undefined, + [ATTR_MESSAGING_KAFKA_OFFSET]: message?.offset, + // Mirror the upstream behavior of only tagging per-message processing spans (not the batch + // receiving span, which carries no message) with the auto origin. + ...(message ? { [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: CONSUMER_ORIGIN } : {}), + }, + }); +} + +/** Starts an inactive producer span and propagates its trace into the message headers. */ +export function startProducerSpan(topic: string, message: Message): Span { + const span = startInactiveSpan({ + name: `send ${topic}`, + kind: SpanKind.PRODUCER, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: topic, + [ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message.key ? String(message.key) : undefined, + [ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]: message.key && message.value === null ? true : undefined, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: + message.partition !== undefined ? String(message.partition) : undefined, + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: PRODUCER_ORIGIN, + }, + }); + + // Propagate the producer span's trace to consumers via the message headers. + message.headers = message.headers ?? {}; + const traceData = getTraceData({ span }); + if (traceData['sentry-trace']) { + message.headers['sentry-trace'] = traceData['sentry-trace']; + } + if (traceData.baggage) { + message.headers['baggage'] = traceData.baggage; + } + + return span; +} + +/** + * Resolves once `sendPromise` settles, ending all `spans` and, on failure, marking them with the + * error status and `error.type` before re-throwing. + */ +export function endSpansOnPromise(spans: Span[], sendPromise: Promise): Promise { + return Promise.resolve(sendPromise) + .catch(reason => { + let errorMessage: string | undefined; + let errorType: string = ERROR_TYPE_VALUE_OTHER; + if (typeof reason === 'string' || reason === undefined) { + errorMessage = reason; + } else if (typeof reason === 'object' && Object.prototype.hasOwnProperty.call(reason, 'message')) { + errorMessage = reason.message; + errorType = reason.constructor.name; + } + + spans.forEach(span => { + span.setAttribute(ATTR_ERROR_TYPE, errorType); + span.setStatus({ + code: SPAN_STATUS_ERROR, + message: errorMessage, + }); + }); + + throw reason; + }) + .finally(() => { + spans.forEach(span => span.end()); + }); +}