diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index c632669a..fa1948ef 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -1,4 +1,5 @@ import { randomUUID } from 'node:crypto' +import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { InternalError, @@ -190,19 +191,23 @@ export abstract class AbstractKafkaConsumer< }) this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics }) - this.consumerStream.on('error', (error) => this.handlerError(error)) if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { this.messageBatchStream = new KafkaMessageBatchStream< DeserializedMessage> - >( - (batch) => - this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)), - this.options.batchProcessingOptions, + >({ + batchSize: this.options.batchProcessingOptions.batchSize, + timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, + readableHighWaterMark: this.options.batchProcessingOptions.readableHighWaterMark, + }) + + // Use pipeline for better error handling and backpressure management. 
+ // pipeline() internally listens for errors on all streams + pipeline(this.consumerStream, this.messageBatchStream).catch((error) => + this.handlerError(error), ) - this.consumerStream.pipe(this.messageBatchStream) } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + this.consumerStream.on('error', (error) => this.handlerError(error)) } } catch (error) { throw new InternalError({ @@ -211,6 +216,12 @@ export abstract class AbstractKafkaConsumer< cause: error, }) } + + if (this.messageBatchStream) { + this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) + } else { + this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + } } private async handleSyncStream( @@ -223,6 +234,13 @@ export abstract class AbstractKafkaConsumer< ) } } + private async handleSyncStreamBatch( + stream: KafkaMessageBatchStream>>, + ): Promise { + for await (const messageBatch of stream) { + await this.consume(messageBatch[0].topic, messageBatch) + } + } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { @@ -371,6 +389,7 @@ export abstract class AbstractKafkaConsumer< ): Promise { try { const isBatch = Array.isArray(messageOrBatch) + /* v8 ignore start */ if (this.options.batchProcessingEnabled && !isBatch) { throw new Error( 'Batch processing is enabled, but a single message was passed to the handler', @@ -381,6 +400,7 @@ export abstract class AbstractKafkaConsumer< 'Batch processing is disabled, but a batch of messages was passed to the handler', ) } + /* v8 ignore stop */ await handler( // We need casting to match message type with handler type - it is safe as we verify the type above @@ -395,10 +415,7 @@ export abstract class AbstractKafkaConsumer< const errorContext = Array.isArray(messageOrBatch) ? 
{ batchSize: messageOrBatch.length } : { message: stringValueSerializer(messageOrBatch.value) } - this.handlerError(error, { - topic, - ...errorContext, - }) + this.handlerError(error, { topic, ...errorContext }) } return { status: 'error', errorReason: 'handlerError' } @@ -443,7 +460,7 @@ export abstract class AbstractKafkaConsumer< } catch (error) { this.logger.debug(logDetails, 'Message commit failed') if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) - throw error + this.handlerError(error) } } @@ -455,7 +472,7 @@ export abstract class AbstractKafkaConsumer< error.apiCode && commitErrorCodesToIgnore.has(error.apiCode) ) { - this.logger.error( + this.logger.warn( { apiCode: error.apiCode, apiId: error.apiId, @@ -466,8 +483,7 @@ export abstract class AbstractKafkaConsumer< `Failed to commit message: ${error.message}`, ) } else { - // If error is not recognized, rethrow it - throw responseError + this.handlerError(error) } } } diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 1528ee90..aa0d9275 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -127,6 +127,10 @@ export abstract class AbstractKafkaService< protected handlerError(error: unknown, context: Record = {}): void { this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context }) - if (isError(error)) this.errorReporter.report({ error, context }) + if (isError(error)) + this.errorReporter.report({ + error, + context: context, + }) } } diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index ccc88e03..f5dc220e 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,6 +1,5 @@ import { setTimeout } from 'node:timers/promises' -import { waitAndRetry } from '@lokalise/universal-ts-utils/node' -import { 
KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' +import { KafkaMessageBatchStream } from './KafkaMessageBatchStream.ts' describe('KafkaMessageBatchStream', () => { it('should batch messages based on batch size', async () => { @@ -14,27 +13,25 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[][] = [] let resolvePromise: () => void const dataFetchingPromise = new Promise((resolve) => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream( - (batch) => { - receivedBatches.push(batch) - // We expect 3 batches and the last message waiting in the stream - if (receivedBatches.length >= 3) { - resolvePromise() - } - return Promise.resolve() - }, - { - batchSize: 3, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream({ + batchSize: 3, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + // We expect 3 batches and the last message waiting in the stream + if (receivedBatches.length >= 3) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -44,9 +41,9 @@ describe('KafkaMessageBatchStream', () => { // Then expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1], messages[2]] }, - { topic, partition: 0, messages: [messages[3], messages[4], messages[5]] }, - { topic, partition: 0, messages: [messages[6], messages[7], messages[8]] }, + [messages[0], messages[1], messages[2]], + [messages[3], messages[4], messages[5]], + [messages[6], messages[7], messages[8]], ]) }) @@ -61,18 +58,16 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[][] = [] - const batchStream = new KafkaMessageBatchStream( - 
(batch) => { - receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 1000, - timeoutMilliseconds: 100, - }, - ) // Setting big batch size to check timeout only + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1000, + timeoutMilliseconds: 100, + }) // Setting big batch size to check timeout only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + }) for (const message of messages) { batchStream.write(message) @@ -82,7 +77,7 @@ describe('KafkaMessageBatchStream', () => { await setTimeout(150) // Then - expect(receivedBatches).toEqual([{ topic, partition: 0, messages }]) + expect(receivedBatches).toEqual([messages]) }) it('should support multiple topics and partitions', async () => { @@ -120,27 +115,24 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - const key = `${batch.topic}:${batch.partition}` - if (!receivedBatchesByTopicPartition[key]) { - receivedBatchesByTopicPartition[key] = [] - } - receivedBatchesByTopicPartition[key]!.push(batch.messages) - - // We expect 5 batches and last message waiting in the stream - receivedMessagesCounter++ - if (receivedMessagesCounter >= 5) { - resolvePromise() - } - - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + const key = `${batch[0]!.topic}:${batch[0]!.partition}` + if (!receivedBatchesByTopicPartition[key]) { + receivedBatchesByTopicPartition[key] = [] + } + receivedBatchesByTopicPartition[key]!.push(batch) + + // We expect 5 batches and last message waiting in the stream + receivedMessagesCounter++ + if 
(receivedMessagesCounter >= 5) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -199,23 +191,20 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - receivedBatches.push(batch) + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only - // We expect 4 batches (2 per partition) - receivedBatchesCounter++ - if (receivedBatchesCounter >= 4) { - resolvePromise() - } + batchStream.on('data', (batch) => { + receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + // We expect 6 batches due to cross-partition accumulation with batchSize=2 + receivedBatchesCounter++ + if (receivedBatchesCounter >= 6) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -224,76 +213,192 @@ describe('KafkaMessageBatchStream', () => { await dataFetchingPromise // Then + // With batchSize=2, messages accumulate across all partitions: + // - write(msg[0], msg[1]) → flush → [msg[0], msg[1]] (partition 0) + // - write(msg[2], msg[3]) → flush → [msg[2]] (partition 0) and [msg[3]] (partition 1) + // - write(msg[4], msg[5]) → flush → [msg[4], msg[5]] (partition 1) + // - write(msg[6], msg[7]) → flush → [msg[6]] (partition 0) and [msg[7]] (partition 1) expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1]] }, - { topic, partition: 1, messages: [messages[3], messages[4]] }, - { topic, partition: 0, messages: [messages[2], messages[6]] }, - { topic, partition: 1, messages: [messages[5], messages[7]] }, + [messages[0], messages[1]], // partition 0 + [messages[2]], // partition 0 + [messages[3]], // partition 1 + 
[messages[4], messages[5]], // partition 1 + [messages[6]], // partition 0 + [messages[7]], // partition 1 ]) + + expect(messages[0]!.partition).toBe(0) + expect(messages[1]!.partition).toBe(0) + expect(messages[2]!.partition).toBe(0) + expect(messages[3]!.partition).toBe(1) + expect(messages[4]!.partition).toBe(1) + expect(messages[5]!.partition).toBe(1) + expect(messages[6]!.partition).toBe(0) + expect(messages[7]!.partition).toBe(1) }) - it('should handle backpressure correctly when timeout flush is slow', async () => { - // Given - const topic = 'test-topic' - const messages = Array.from({ length: 6 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic, - partition: 0, - })) + describe('backpressure', () => { + it('should pause writes when the readable buffer is full', async () => { + // With batchSize=1, each message is immediately flushed to the readable buffer. + // The objectMode readableHighWaterMark defaults to 16: + // push() returns false on the 16th flush, causing the _write callback to be held + // until _read() signals the downstream is ready again. 
+ const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + }) + + const processedmessages = new Set() + // Write 15 messages: push() returns true for each (buffer 1→15, below HWM of 16) + for (let i = 0; i < 15; i++) { + await new Promise((resolve) => + batchStream.write({ id: i, topic: 'test', partition: 0 }, () => { + processedmessages.add(i) + resolve() + }), + ) + } + + // 16th write fills the buffer to HWM: push() returns false, callback is held + let sixteenthWriteCompleted = false + batchStream.write({ id: 15, topic: 'test', partition: 0 }, () => { + processedmessages.add(15) + sixteenthWriteCompleted = true + }) + + await setTimeout(10) + expect(processedmessages.size).toEqual(15) + expect(sixteenthWriteCompleted).toBe(false) // write is paused by backpressure + + // Consuming one item triggers _read(), releasing the held callback + batchStream.read() + + await setTimeout(10) + expect(processedmessages.size).toEqual(16) + expect(sixteenthWriteCompleted).toBe(true) // write resumes + }) - const batchStartTimes: number[] = [] // Track start times of batch processing - const batchEndTimes: number[] = [] // Track end times of batch processing - const batchMessageCounts: number[] = [] // Track number of messages per batch - let maxConcurrentBatches = 0 // Track max concurrent batches + it('should deliver all messages without loss when the consumer is slow', async () => { + // 20 messages with batchSize=1 generates 20 batches. + // HWM is 16, so batches 17-20 are held until the consumer drains the buffer, + // verifying that backpressure does not cause message loss. 
+ const topic = 'test-topic' + const totalMessages = 20 + const messages = Array.from({ length: totalMessages }, (_, i) => ({ + id: i, + topic, + partition: 0, + })) - let batchesProcessing = 0 - const batchStream = new KafkaMessageBatchStream( - async (batch) => { - batchStartTimes.push(Date.now()) - batchMessageCounts.push(batch.messages.length) + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + }) + + const receivedIds: number[] = [] + let resolveAll!: () => void + const allReceived = new Promise((resolve) => { + resolveAll = resolve + }) + + // Slow consumer: each batch takes longer than writes are produced + const consume = async () => { + for await (const batch of batchStream) { + await setTimeout(10) + for (const msg of batch) receivedIds.push(msg.id) + if (receivedIds.length >= totalMessages) { + resolveAll() + break + } + } + } + void consume() - batchesProcessing++ - maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing) + for (const msg of messages) batchStream.write(msg) - // Simulate batch processing (50ms per batch) - await setTimeout(50) + await allReceived + expect(receivedIds).toHaveLength(totalMessages) + expect(receivedIds).toEqual(messages.map((m) => m.id)) + }) - batchEndTimes.push(Date.now()) - batchesProcessing-- - }, - { - batchSize: 1000, // Large batch size to never trigger size-based flushing - timeoutMilliseconds: 10, // Short timeout to trigger flush after each message - }, - ) + it('should respect a custom readableHighWaterMark', async () => { + // With readableHighWaterMark=1 and batchSize=1, push() returns false after the + // very first item in the readable buffer, triggering backpressure immediately. 
+ const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + readableHighWaterMark: 1, + }) - // When: Write messages with 20ms delay between them - // Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation - for (const message of messages) { - batchStream.write(message) - await setTimeout(20) - } + // First write: flushed immediately, buffer reaches HWM=1, push() returns false + // → _write callback is held by backpressure + let firstWriteCompleted = false + batchStream.write({ id: 0, topic: 'test', partition: 0 }, () => { + firstWriteCompleted = true + }) - // Then - // Wait until all 3 batches have been processed - await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20) - - // Backpressure causes messages to accumulate while previous batch processes: - // - Batch 1: Message 1 (flushed at 10ms timeout) - // - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms) - // - Batch 3: Messages 5-6 (accumulated during Batch 2 processing) - expect(batchMessageCounts).toEqual([1, 3, 2]) - - // Verify that batches never processed in parallel (backpressure working) - expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time - - // Verify that batches were processed sequentially (each starts after previous ends) - for (let i = 1; i < batchStartTimes.length; i++) { - const previousEndTime = batchEndTimes[i - 1] - const currentStartTime = batchStartTimes[i] - // The current batch must start after the previous batch finished - expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 
0) - } + await setTimeout(10) + expect(firstWriteCompleted).toBe(false) // held by backpressure + + // Consuming one item triggers _read(), releasing the held callback + batchStream.read() + + await setTimeout(10) + expect(firstWriteCompleted).toBe(true) // write resumes + }) + + it('should defer timeout flush when backpressured and flush once consumer reads', () => { + vi.useFakeTimers() + + const topic = 'test-topic' + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1000, // large: only timeout-based flushes + timeoutMilliseconds: 100, + }) + + // Mock push() to simulate a full readable buffer on the first flush + let simulateBackpressure = true + const originalPush = batchStream.push.bind(batchStream) + vi.spyOn(batchStream, 'push').mockImplementation((chunk) => { + originalPush(chunk) // still push the data to the buffer + return !simulateBackpressure // return false to signal backpressure + }) + + // No 'data' listener: stream stays in paused mode so _read() is only + // triggered when batchStream.read() is called explicitly + + // Write 5 messages: they accumulate in this.messages, a timeout is scheduled + for (let i = 0; i < 5; i++) { + batchStream.write({ id: i, topic, partition: 0 }) + } + + // First timeout fires: flushMessages() → push() returns false → isBackPressured = true + vi.advanceTimersByTime(100) + expect(batchStream.readableLength).toBe(1) // 1 batch buffered (the 5 messages) + + // Write 3 more messages: they accumulate in this.messages while backpressured + for (let i = 5; i < 8; i++) { + batchStream.write({ id: i, topic, partition: 0 }) + } + + // Second timeout fires: isBackPressured is true → reschedules several times without flushing + vi.advanceTimersByTime(300) + expect(batchStream.readableLength).toBe(1) // unchanged: no new flush happened + + // Consumer reads one item → triggers _read() → isBackPressured = false + simulateBackpressure = false + const firstBatch = batchStream.read() + 
expect(firstBatch).toHaveLength(5) + expect(batchStream.readableLength).toBe(0) + + // Third timeout fires: isBackPressured is false → flushes the 3 accumulated messages + vi.advanceTimersByTime(100) + expect(batchStream.readableLength).toBe(1) // new batch pushed + + const secondBatch = batchStream.read() + expect(secondBatch).toHaveLength(3) // messages 5–7 + + vi.useRealTimers() + }) }) }) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index a6c43d0a..edde1ffa 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,122 +1,158 @@ -import { Transform } from 'node:stream' +import { Duplex } from 'node:stream' -// Topic and partition are required for the stream to work properly +type CallbackFunction = (error?: Error | null) => void type MessageWithTopicAndPartition = { topic: string; partition: number } +/** + * Options for configuring the KafkaMessageBatchStream behavior. + */ export type KafkaMessageBatchOptions = { + /** Maximum number of messages to accumulate across all partitions before flushing */ batchSize: number + /** Time in milliseconds to wait before flushing incomplete batches */ timeoutMilliseconds: number + /** + * Maximum number of topic-partition batches to buffer on the readable side before signaling backpressure. + * Each unit represents one array of messages belonging to the same topic-partition, produced per flush. + * A single flush may push multiple such arrays (one per distinct topic-partition in the accumulated batch). + * Defaults to Node.js object-mode default (16). Lower values trigger backpressure sooner, + * reducing downstream memory pressure at the cost of more frequent flow-control cycles. 
+ */ + readableHighWaterMark?: number } -export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } -export type OnMessageBatchCallback = (batch: MessageBatch) => Promise +/** + * Interface extending Duplex to provide strong typing for the 'data' event. + * The stream emits arrays of messages grouped by topic-partition. + */ +export interface KafkaMessageBatchStream + extends Duplex { + // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition + on(event: string | symbol, listener: (...args: any[]) => void): this + /** Listen for batches of messages from the same topic-partition */ + on(event: 'data', listener: (chunk: TMessage[]) => void): this + push(chunk: TMessage[] | null): boolean +} /** - * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. + * A Duplex stream that batches Kafka messages based on size and timeout constraints. * - * This implementation uses Transform stream which properly handles backpressure by design. - * When the downstream consumer is slow, the stream will automatically pause accepting new messages - * until the consumer catches up, preventing memory leaks and OOM errors. 
+ * Key features: + * - Accumulates messages across all partitions up to `batchSize` for true memory control + * - Groups messages by topic-partition when flushing + * - Implements backpressure: pauses input when downstream consumers are overwhelmed + * - Auto-flushes on timeout to prevent messages from waiting indefinitely + * + * @example + * ```typescript + * const batchStream = new KafkaMessageBatchStream({ batchSize: 100, timeoutMilliseconds: 1000 }) + * batchStream.on('data', (batch) => { + * console.log(`Received ${batch.length} messages from ${batch[0].topic}:${batch[0].partition}`) + * }) + * ``` */ -export class KafkaMessageBatchStream< - TMessage extends MessageWithTopicAndPartition, -> extends Transform { - private readonly onBatch: OnMessageBatchCallback +// biome-ignore lint/suspicious/noUnsafeDeclarationMerging: merging interface with class to add strong typing for 'data' event +export class KafkaMessageBatchStream extends Duplex { private readonly batchSize: number private readonly timeout: number - private readonly currentBatchPerTopicPartition: Record - private readonly batchTimeoutPerTopicPartition: Record - - private readonly timeoutProcessingPromises: Map> = new Map() + private messages: TMessage[] + private existingTimeout: NodeJS.Timeout | undefined + private pendingCallback: CallbackFunction | undefined + private isBackPressured: boolean - constructor( - onBatch: OnMessageBatchCallback, - options: { batchSize: number; timeoutMilliseconds: number }, - ) { - super({ objectMode: true }) - this.onBatch = onBatch + constructor(options: KafkaMessageBatchOptions) { + super({ objectMode: true, readableHighWaterMark: options.readableHighWaterMark }) this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds - this.currentBatchPerTopicPartition = {} - this.batchTimeoutPerTopicPartition = {} + this.messages = [] + this.isBackPressured = false } - override async _transform(message: TMessage, _encoding: BufferEncoding, callback: () => 
void) { - const key = getTopicPartitionKey(message.topic, message.partition) - - // Wait for all pending timeout flushes to complete to maintain backpressure - if (this.timeoutProcessingPromises.size > 0) { - // Capture a snapshot of current promises to avoid race conditions with new timeouts - const promiseEntries = Array.from(this.timeoutProcessingPromises.entries()) - // Wait for all to complete and then clean up from the map - await Promise.all( - promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k))), - ) - } - - // Accumulate the message - if (!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] - this.currentBatchPerTopicPartition[key].push(message) - - // Check if the batch is complete by size - if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { - await this.flushCurrentBatchMessages(message.topic, message.partition) - callback() - return - } + /** + * Called when the downstream consumer is ready to receive more data. + * This is the backpressure release mechanism: we resume the writable side + * by calling the pending callback that was held during backpressure. + */ + override _read() { + this.isBackPressured = false + if (!this.pendingCallback) return + + const cb = this.pendingCallback + this.pendingCallback = undefined + cb() // Resume the writable side + } - // Start timeout for this partition if not already started - if (!this.batchTimeoutPerTopicPartition[key]) { - this.batchTimeoutPerTopicPartition[key] = setTimeout( - () => - this.timeoutProcessingPromises.set( - key, - this.flushCurrentBatchMessages(message.topic, message.partition), - ), - this.timeout, - ) + /** + * Writes a message to the stream. + * Messages accumulate until batchSize is reached or timeout expires. + * Implements backpressure by holding the callback when downstream cannot consume. 
+ */ + override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { + let canContinue = true + + try { + this.messages.push(message) + + if (this.messages.length >= this.batchSize) { + // Batch is full, flush immediately + canContinue = this.flushMessages() + } else { + // Start/continue the timeout for partial batches + // Using ??= ensures we only set one timeout at a time + this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout) + } + } finally { + // Backpressure handling: hold the callback if push() returned false + if (!canContinue) this.pendingCallback = callback + else callback() } - - callback() } - // Flush all remaining batches when stream is closing - override async _flush(callback: () => void) { - await this.flushAllBatches() + override _final(callback: CallbackFunction) { + // Clean timeout + clearTimeout(this.existingTimeout) + this.existingTimeout = undefined + // If there are remaining messages -> skip them + // As they are not committed, the next consumer will process them + this.messages = [] + this.push(null) callback() } - private async flushAllBatches() { - for (const key of Object.keys(this.currentBatchPerTopicPartition)) { - const { topic, partition } = splitTopicPartitionKey(key) - await this.flushCurrentBatchMessages(topic, partition) + private flushMessages(): boolean { + clearTimeout(this.existingTimeout) + this.existingTimeout = undefined + + if (this.isBackPressured) { + this.existingTimeout = setTimeout(() => this.flushMessages(), this.timeout) + return false } - } - private async flushCurrentBatchMessages(topic: string, partition: number) { - const key = getTopicPartitionKey(topic, partition) + // Extract all accumulated messages and clear the array + const messageBatch = this.messages.splice(0, this.messages.length) + + // Group by topic-partition to maintain commit guarantees + const messagesByTopicPartition: Record = {} + for (const message of messageBatch) { + const key = 
getTopicPartitionKey(message.topic, message.partition) + if (!messagesByTopicPartition[key]) messagesByTopicPartition[key] = [] + messagesByTopicPartition[key].push(message) + } - // Clear timeout - if (this.batchTimeoutPerTopicPartition[key]) { - clearTimeout(this.batchTimeoutPerTopicPartition[key]) - this.batchTimeoutPerTopicPartition[key] = undefined + // Push each topic-partition batch and track backpressure. + // All batches must be pushed regardless: messages were already splice'd from the buffer, + // so breaking early would lose them. Once push() returns false, subsequent calls in the + // same tick also return false, so the last value correctly reflects backpressure. + let canContinue = true + for (const messagesForKey of Object.values(messagesByTopicPartition)) { + canContinue = this.push(messagesForKey) } - const messages = this.currentBatchPerTopicPartition[key] ?? [] + if (!canContinue) this.isBackPressured = true - // Push the batch downstream - await this.onBatch({ topic, partition, messages }) - this.currentBatchPerTopicPartition[key] = [] + return canContinue } } const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` -const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { - const [topic, partition] = key.split(':') - /* v8 ignore start */ - if (!topic || !partition) throw new Error('Invalid topic-partition key format') - /* v8 ignore stop */ - - return { topic, partition: Number.parseInt(partition, 10) } -} diff --git a/packages/kafka/package.json b/packages/kafka/package.json index eda1bc71..b311f8dc 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.26.0" + "@platformatic/kafka": "^1.28.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0", diff --git a/packages/kafka/vitest.config.ts 
b/packages/kafka/vitest.config.ts index 00c6ebbb..16786673 100644 --- a/packages/kafka/vitest.config.ts +++ b/packages/kafka/vitest.config.ts @@ -14,7 +14,7 @@ export default defineConfig({ exclude: ['vitest.config.ts', 'lib/**/index.ts'], thresholds: { lines: 93, - functions: 91, + functions: 88, branches: 85, statements: 92, },