Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { randomUUID } from 'node:crypto'
import { pipeline } from 'node:stream/promises'
import { setTimeout } from 'node:timers/promises'
import {
InternalError,
Expand Down Expand Up @@ -190,19 +191,23 @@ export abstract class AbstractKafkaConsumer<
})

this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
this.consumerStream.on('error', (error) => this.handlerError(error))

if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
this.messageBatchStream = new KafkaMessageBatchStream<
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
>(
(batch) =>
this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
this.options.batchProcessingOptions,
>({
batchSize: this.options.batchProcessingOptions.batchSize,
timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
readableHighWaterMark: this.options.batchProcessingOptions.readableHighWaterMark,
})

// Use pipeline for better error handling and backpressure management.
// pipeline() internally listens for errors on all streams
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
this.handlerError(error),
)
this.consumerStream.pipe(this.messageBatchStream)
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
this.consumerStream.on('error', (error) => this.handlerError(error))
}
} catch (error) {
throw new InternalError({
Expand All @@ -211,6 +216,12 @@ export abstract class AbstractKafkaConsumer<
cause: error,
})
}

if (this.messageBatchStream) {
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
}
}

private async handleSyncStream(
Expand All @@ -223,6 +234,13 @@ export abstract class AbstractKafkaConsumer<
)
}
}
private async handleSyncStreamBatch(
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
): Promise<void> {
for await (const messageBatch of stream) {
await this.consume(messageBatch[0].topic, messageBatch)
}
}

async close(): Promise<void> {
if (!this.consumerStream && !this.messageBatchStream) {
Expand Down Expand Up @@ -371,6 +389,7 @@ export abstract class AbstractKafkaConsumer<
): Promise<MessageProcessingResult> {
try {
const isBatch = Array.isArray(messageOrBatch)
/* v8 ignore start */
if (this.options.batchProcessingEnabled && !isBatch) {
throw new Error(
'Batch processing is enabled, but a single message was passed to the handler',
Expand All @@ -381,6 +400,7 @@ export abstract class AbstractKafkaConsumer<
'Batch processing is disabled, but a batch of messages was passed to the handler',
)
}
/* v8 ignore stop */

await handler(
// We need casting to match message type with handler type - it is safe as we verify the type above
Expand All @@ -395,10 +415,7 @@ export abstract class AbstractKafkaConsumer<
const errorContext = Array.isArray(messageOrBatch)
? { batchSize: messageOrBatch.length }
: { message: stringValueSerializer(messageOrBatch.value) }
this.handlerError(error, {
topic,
...errorContext,
})
this.handlerError(error, { topic, ...errorContext })
}

return { status: 'error', errorReason: 'handlerError' }
Expand Down Expand Up @@ -443,7 +460,7 @@ export abstract class AbstractKafkaConsumer<
} catch (error) {
this.logger.debug(logDetails, 'Message commit failed')
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
throw error
this.handlerError(error)
}
}

Expand All @@ -455,7 +472,7 @@ export abstract class AbstractKafkaConsumer<
error.apiCode &&
commitErrorCodesToIgnore.has(error.apiCode)
) {
this.logger.error(
this.logger.warn(
{
apiCode: error.apiCode,
apiId: error.apiId,
Expand All @@ -466,8 +483,7 @@ export abstract class AbstractKafkaConsumer<
`Failed to commit message: ${error.message}`,
)
} else {
// If error is not recognized, rethrow it
throw responseError
this.handlerError(error)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion packages/kafka/lib/AbstractKafkaService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ export abstract class AbstractKafkaService<

protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context })
if (isError(error)) this.errorReporter.report({ error, context })
if (isError(error))
this.errorReporter.report({
error,
context: context,
})
}
}
Loading
Loading