From ea6bb3a23641dce94206be98b5b9fd5c7f6bab38 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 18:05:46 +0100 Subject: [PATCH 01/21] Back to duplex --- .../lib/utils/KafkaMessageBatchStream.ts | 112 ++++++++---------- 1 file changed, 51 insertions(+), 61 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index a6c43d0a..aadfdaaf 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,4 +1,6 @@ -import { Transform } from 'node:stream' +import { Duplex } from 'node:stream' + +type CallbackFunction = (error?: Error | null) => void // Topic and partition are required for the stream to work properly type MessageWithTopicAndPartition = { topic: string; partition: number } @@ -9,114 +11,102 @@ export type KafkaMessageBatchOptions = { } export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } -export type OnMessageBatchCallback = (batch: MessageBatch) => Promise + +export interface KafkaMessageBatchStream + extends Duplex { + // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition + on(event: string | symbol, listener: (...args: any[]) => void): this + on(event: 'data', listener: (chunk: MessageBatch) => void): this + + push(chunk: MessageBatch | null): boolean +} /** * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. - * - * This implementation uses Transform stream which properly handles backpressure by design. - * When the downstream consumer is slow, the stream will automatically pause accepting new messages - * until the consumer catches up, preventing memory leaks and OOM errors. */ -export class KafkaMessageBatchStream< - TMessage extends MessageWithTopicAndPartition, -> extends Transform { - private readonly onBatch: OnMessageBatchCallback +// biome-ignore lint/suspicious/noUnsafeDeclarationMerging: merging interface with class to add strong typing for 'data' event +export class KafkaMessageBatchStream extends Duplex { private readonly batchSize: number private readonly timeout: number private readonly currentBatchPerTopicPartition: Record private readonly batchTimeoutPerTopicPartition: Record - private readonly timeoutProcessingPromises: Map> = new Map() - - constructor( - onBatch: OnMessageBatchCallback, - options: { batchSize: number; timeoutMilliseconds: number }, - ) { + constructor(options: { batchSize: number; timeoutMilliseconds: number }) { super({ objectMode: true }) - this.onBatch = onBatch this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds this.currentBatchPerTopicPartition = {} this.batchTimeoutPerTopicPartition = {} } - override async _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { + override _read() { + // No-op, as we push data when we have a full batch or timeout + } + + override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { const key = getTopicPartitionKey(message.topic, message.partition) - // Wait for all pending timeout flushes to complete to maintain backpressure - if (this.timeoutProcessingPromises.size > 0) { - // Capture a snapshot of current promises to avoid race conditions with new timeouts - const promiseEntries = Array.from(this.timeoutProcessingPromises.entries()) - // Wait for all to complete and then clean up from the map - await Promise.all( - promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k))), - ) + if (!this.currentBatchPerTopicPartition[key]) { + this.currentBatchPerTopicPartition[key] = [message] + } else { + // biome-ignore lint/style/noNonNullAssertion: non-existing entry is handled above + this.currentBatchPerTopicPartition[key]!.push(message) } - // Accumulate the message - if (!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] - this.currentBatchPerTopicPartition[key].push(message) - - // Check if the batch is complete by size - if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { - await this.flushCurrentBatchMessages(message.topic, message.partition) - callback() - return + // biome-ignore lint/style/noNonNullAssertion: we ensure above that the array is defined + if (this.currentBatchPerTopicPartition[key]!.length >= this.batchSize) { + this.flushCurrentBatchMessages(message.topic, message.partition) + return callback(null) } - // Start timeout for this partition if not already started if (!this.batchTimeoutPerTopicPartition[key]) { - this.batchTimeoutPerTopicPartition[key] = setTimeout( - () => - this.timeoutProcessingPromises.set( - key, - this.flushCurrentBatchMessages(message.topic, message.partition), - ), - this.timeout, - ) + this.batchTimeoutPerTopicPartition[key] = setTimeout(() => { + this.flushCurrentBatchMessages(message.topic, message.partition) + }, this.timeout) } - callback() + callback(null) } - // Flush all remaining batches when stream is closing - override async _flush(callback: () => void) { - await this.flushAllBatches() + // Write side is closed, flush the remaining messages + override _final(callback: CallbackFunction) { + this.flushAllBatches() + this.push(null) // End readable side callback() } - private async flushAllBatches() { + private flushAllBatches() { for (const key of Object.keys(this.currentBatchPerTopicPartition)) { const { topic, partition } = splitTopicPartitionKey(key) - await this.flushCurrentBatchMessages(topic, partition) + this.flushCurrentBatchMessages(topic, partition) } } - private async flushCurrentBatchMessages(topic: string, partition: number) { + private flushCurrentBatchMessages(topic: string, partition: number) { const key = getTopicPartitionKey(topic, partition) - // Clear timeout if (this.batchTimeoutPerTopicPartition[key]) { clearTimeout(this.batchTimeoutPerTopicPartition[key]) this.batchTimeoutPerTopicPartition[key] = undefined } - const messages = this.currentBatchPerTopicPartition[key] ?? [] + if (!this.currentBatchPerTopicPartition[key]?.length) { + return + } - // Push the batch downstream - await this.onBatch({ topic, partition, messages }) + this.push({ topic, partition, messages: this.currentBatchPerTopicPartition[key] }) this.currentBatchPerTopicPartition[key] = [] } } -const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` + +const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` + const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { const [topic, partition] = key.split(':') - /* v8 ignore start */ - if (!topic || !partition) throw new Error('Invalid topic-partition key format') - /* v8 ignore stop */ - + if (!topic || !partition) { + throw new Error('Invalid topic-partition key format') + } return { topic, partition: Number.parseInt(partition, 10) } -} +} \ No newline at end of file From a6330090de12e8f48f7ec0aae2f47a53be3d1a20 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 18:32:19 +0100 Subject: [PATCH 02/21] Proper backpreassure handling on batch stream --- .../lib/utils/KafkaMessageBatchStream.ts | 147 ++++++++++-------- 1 file changed, 85 insertions(+), 62 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index aadfdaaf..0757082a 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,112 +1,135 @@ import { Duplex } from 'node:stream' type CallbackFunction = (error?: Error | null) => void - -// Topic and partition are required for the stream to work properly type MessageWithTopicAndPartition = { topic: string; partition: number } +/** + * Options for configuring the KafkaMessageBatchStream behavior. + */ export type KafkaMessageBatchOptions = { + /** Maximum number of messages to accumulate across all partitions before flushing */ batchSize: number + /** Time in milliseconds to wait before flushing incomplete batches */ timeoutMilliseconds: number } -export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } +/** + * Interface extending Duplex to provide strong typing for the 'data' event. + * The stream emits arrays of messages grouped by topic-partition. + */ export interface KafkaMessageBatchStream extends Duplex { - // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition + // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition on(event: string | symbol, listener: (...args: any[]) => void): this - on(event: 'data', listener: (chunk: MessageBatch) => void): this - - push(chunk: MessageBatch | null): boolean + /** Listen for batches of messages from the same topic-partition */ + on(event: 'data', listener: (chunk: TMessage[]) => void): this + push(chunk: TMessage[] | null): boolean } /** - * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. + * A Duplex stream that batches Kafka messages based on size and timeout constraints. + * + * Key features: + * - Accumulates messages across all partitions up to `batchSize` for true memory control + * - Groups messages by topic-partition when flushing + * - Implements backpressure: pauses input when downstream consumers are overwhelmed + * - Auto-flushes on timeout to prevent messages from waiting indefinitely + * + * @example + * ```typescript + * const batchStream = new KafkaMessageBatchStream({ batchSize: 100, timeoutMilliseconds: 1000 }) + * batchStream.on('data', (batch) => { + * console.log(`Received ${batch.length} messages from ${batch[0].topic}:${batch[0].partition}`) + * }) + * ``` */ // biome-ignore lint/suspicious/noUnsafeDeclarationMerging: merging interface with class to add strong typing for 'data' event export class KafkaMessageBatchStream extends Duplex { private readonly batchSize: number private readonly timeout: number - private readonly currentBatchPerTopicPartition: Record - private readonly batchTimeoutPerTopicPartition: Record + private readonly messages: TMessage[] + private existingTimeout: NodeJS.Timeout | undefined + private pendingCallback: CallbackFunction | undefined + - constructor(options: { batchSize: number; timeoutMilliseconds: number }) { + constructor(options: KafkaMessageBatchOptions) { super({ objectMode: true }) this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds - this.currentBatchPerTopicPartition = {} - this.batchTimeoutPerTopicPartition = {} + this.messages = [] } + /** + * Called when the downstream consumer is ready to receive more data. + * This is the backpressure release mechanism: we resume the writable side + * by calling the pending callback that was held during backpressure. + */ override _read() { - // No-op, as we push data when we have a full batch or timeout + if (!this.pendingCallback) return + + const cb = this.pendingCallback + this.pendingCallback = undefined + cb() // Resume the writable side } + /** + * Writes a message to the stream. + * Messages accumulate until batchSize is reached or timeout expires. + * Implements backpressure by holding the callback when downstream cannot consume. + */ override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { - const key = getTopicPartitionKey(message.topic, message.partition) - - if (!this.currentBatchPerTopicPartition[key]) { - this.currentBatchPerTopicPartition[key] = [message] - } else { - // biome-ignore lint/style/noNonNullAssertion: non-existing entry is handled above - this.currentBatchPerTopicPartition[key]!.push(message) + let canContinue = true + + try { + this.messages.push(message) + + if (this.messages.length >= this.batchSize) { + // Batch is full, flush immediately + canContinue = this.flushMessages() + } else { + // Start/continue the timeout for partial batches + // Using ??= ensures we only set one timeout at a time + this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout) + } + } finally { + // Backpressure handling: hold the callback if push() returned false + if (!canContinue) this.pendingCallback = callback + else callback() } - - // biome-ignore lint/style/noNonNullAssertion: we ensure above that the array is defined - if (this.currentBatchPerTopicPartition[key]!.length >= this.batchSize) { - this.flushCurrentBatchMessages(message.topic, message.partition) - return callback(null) - } - - if (!this.batchTimeoutPerTopicPartition[key]) { - this.batchTimeoutPerTopicPartition[key] = setTimeout(() => { - this.flushCurrentBatchMessages(message.topic, message.partition) - }, this.timeout) - } - - callback(null) } - // Write side is closed, flush the remaining messages override _final(callback: CallbackFunction) { - this.flushAllBatches() - this.push(null) // End readable side + this.flushMessages() + this.push(null) // Signal end-of-stream to the readable side callback() } - private flushAllBatches() { - for (const key of Object.keys(this.currentBatchPerTopicPartition)) { - const { topic, partition } = splitTopicPartitionKey(key) - this.flushCurrentBatchMessages(topic, partition) - } - } + private flushMessages() { + clearTimeout(this.existingTimeout) + this.existingTimeout = undefined - private flushCurrentBatchMessages(topic: string, partition: number) { - const key = getTopicPartitionKey(topic, partition) + // Extract all accumulated messages and clear the array + const messageBatch = this.messages.splice(0, this.messages.length) - if (this.batchTimeoutPerTopicPartition[key]) { - clearTimeout(this.batchTimeoutPerTopicPartition[key]) - this.batchTimeoutPerTopicPartition[key] = undefined + // Group by topic-partition to maintain commit guarantees + const messagesByTopicPartition: Record = {} + for (const message of messageBatch) { + const key = getTopicPartitionKey(message.topic, message.partition) + if (!messagesByTopicPartition[key]) messagesByTopicPartition[key] = [] + messagesByTopicPartition[key].push(message) } - if (!this.currentBatchPerTopicPartition[key]?.length) { - return + // Push each topic-partition batch and track backpressure + let canContinue = true + for (const messagesForKey of Object.values(messagesByTopicPartition)) { + canContinue = this.push(messagesForKey) } - this.push({ topic, partition, messages: this.currentBatchPerTopicPartition[key] }) - this.currentBatchPerTopicPartition[key] = [] + return canContinue } } -const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` - -const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { - const [topic, partition] = key.split(':') - if (!topic || !partition) { - throw new Error('Invalid topic-partition key format') - } - return { topic, partition: Number.parseInt(partition, 10) } -} \ No newline at end of file +const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` \ No newline at end of file From 1b967fa78739d15a976dbba91ae0f5558089c1ec Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 19:07:10 +0100 Subject: [PATCH 03/21] Improving tests --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 212 +++++++----------- 1 file changed, 76 insertions(+), 136 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index ccc88e03..8de491e6 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,6 +1,5 @@ import { setTimeout } from 'node:timers/promises' -import { waitAndRetry } from '@lokalise/universal-ts-utils/node' -import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' +import { KafkaMessageBatchStream } from './KafkaMessageBatchStream.ts' describe('KafkaMessageBatchStream', () => { it('should batch messages based on batch size', async () => { @@ -14,27 +13,25 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[][] = [] let resolvePromise: () => void const dataFetchingPromise = new Promise((resolve) => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream( - (batch) => { - receivedBatches.push(batch) - // We expect 3 batches and the last message waiting in the stream - if (receivedBatches.length >= 3) { - resolvePromise() - } - return Promise.resolve() - }, - { - batchSize: 3, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream({ + batchSize: 3, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + // We expect 3 batches and the last message waiting in the stream + if (receivedBatches.length >= 3) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -44,9 +41,9 @@ describe('KafkaMessageBatchStream', () => { // Then expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1], messages[2]] }, - { topic, partition: 0, messages: [messages[3], messages[4], messages[5]] }, - { topic, partition: 0, messages: [messages[6], messages[7], messages[8]] }, + [messages[0], messages[1], messages[2]], + [messages[3], messages[4], messages[5]], + [messages[6], messages[7], messages[8]], ]) }) @@ -61,18 +58,16 @@ describe('KafkaMessageBatchStream', () => { })) // When - const receivedBatches: MessageBatch[] = [] + const receivedBatches: any[][] = [] - const batchStream = new KafkaMessageBatchStream( - (batch) => { - receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 1000, - timeoutMilliseconds: 100, - }, - ) // Setting big batch size to check timeout only + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1000, + timeoutMilliseconds: 100, + }) // Setting big batch size to check timeout only + + batchStream.on('data', (batch) => { + receivedBatches.push(batch) + }) for (const message of messages) { batchStream.write(message) @@ -82,7 +77,7 @@ describe('KafkaMessageBatchStream', () => { await setTimeout(150) // Then - expect(receivedBatches).toEqual([{ topic, partition: 0, messages }]) + expect(receivedBatches).toEqual([messages]) }) it('should support multiple topics and partitions', async () => { @@ -120,27 +115,24 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - const key = `${batch.topic}:${batch.partition}` - if (!receivedBatchesByTopicPartition[key]) { - receivedBatchesByTopicPartition[key] = [] - } - receivedBatchesByTopicPartition[key]!.push(batch.messages) - - // We expect 5 batches and last message waiting in the stream - receivedMessagesCounter++ - if (receivedMessagesCounter >= 5) { - resolvePromise() - } - - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only + + batchStream.on('data', (batch) => { + const key = `${batch[0]!.topic}:${batch[0]!.partition}` + if (!receivedBatchesByTopicPartition[key]) { + receivedBatchesByTopicPartition[key] = [] + } + receivedBatchesByTopicPartition[key]!.push(batch) + + // We expect 5 batches and last message waiting in the stream + receivedMessagesCounter++ + if (receivedMessagesCounter >= 5) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -199,23 +191,20 @@ describe('KafkaMessageBatchStream', () => { resolvePromise = resolve }) - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( - (batch) => { - receivedBatches.push(batch) + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ + batchSize: 2, + timeoutMilliseconds: 10000, + }) // Setting big timeout to check batch size only - // We expect 4 batches (2 per partition) - receivedBatchesCounter++ - if (receivedBatchesCounter >= 4) { - resolvePromise() - } + batchStream.on('data', (batch) => { + receivedBatches.push(batch) - return Promise.resolve() - }, - { - batchSize: 2, - timeoutMilliseconds: 10000, - }, - ) // Setting big timeout to check batch size only + // We expect 6 batches due to cross-partition accumulation with batchSize=2 + receivedBatchesCounter++ + if (receivedBatchesCounter >= 6) { + resolvePromise() + } + }) for (const message of messages) { batchStream.write(message) @@ -224,76 +213,27 @@ describe('KafkaMessageBatchStream', () => { await dataFetchingPromise // Then + // With batchSize=2, messages accumulate across all partitions: + // - write(msg[0], msg[1]) → flush → [msg[0], msg[1]] (partition 0) + // - write(msg[2], msg[3]) → flush → [msg[2]] (partition 0) and [msg[3]] (partition 1) + // - write(msg[4], msg[5]) → flush → [msg[4], msg[5]] (partition 1) + // - write(msg[6], msg[7]) → flush → [msg[6]] (partition 0) and [msg[7]] (partition 1) expect(receivedBatches).toEqual([ - { topic, partition: 0, messages: [messages[0], messages[1]] }, - { topic, partition: 1, messages: [messages[3], messages[4]] }, - { topic, partition: 0, messages: [messages[2], messages[6]] }, - { topic, partition: 1, messages: [messages[5], messages[7]] }, + [messages[0], messages[1]], // partition 0 + [messages[2]], // partition 0 + [messages[3]], // partition 1 + [messages[4], messages[5]], // partition 1 + [messages[6]], // partition 0 + [messages[7]], // partition 1 ]) - }) - - it('should handle backpressure correctly when timeout flush is slow', async () => { - // Given - const topic = 'test-topic' - const messages = Array.from({ length: 6 }, (_, i) => ({ - id: i + 1, - content: `Message ${i + 1}`, - topic, - partition: 0, - })) - - const batchStartTimes: number[] = [] // Track start times of batch processing - const batchEndTimes: number[] = [] // Track end times of batch processing - const batchMessageCounts: number[] = [] // Track number of messages per batch - let maxConcurrentBatches = 0 // Track max concurrent batches - - let batchesProcessing = 0 - const batchStream = new KafkaMessageBatchStream( - async (batch) => { - batchStartTimes.push(Date.now()) - batchMessageCounts.push(batch.messages.length) - - batchesProcessing++ - maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing) - - // Simulate batch processing (50ms per batch) - await setTimeout(50) - batchEndTimes.push(Date.now()) - batchesProcessing-- - }, - { - batchSize: 1000, // Large batch size to never trigger size-based flushing - timeoutMilliseconds: 10, // Short timeout to trigger flush after each message - }, - ) - - // When: Write messages with 20ms delay between them - // Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation - for (const message of messages) { - batchStream.write(message) - await setTimeout(20) - } - - // Then - // Wait until all 3 batches have been processed - await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20) - - // Backpressure causes messages to accumulate while previous batch processes: - // - Batch 1: Message 1 (flushed at 10ms timeout) - // - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms) - // - Batch 3: Messages 5-6 (accumulated during Batch 2 processing) - expect(batchMessageCounts).toEqual([1, 3, 2]) - - // Verify that batches never processed in parallel (backpressure working) - expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time - - // Verify that batches were processed sequentially (each starts after previous ends) - for (let i = 1; i < batchStartTimes.length; i++) { - const previousEndTime = batchEndTimes[i - 1] - const currentStartTime = batchStartTimes[i] - // The current batch must start after the previous batch finished - expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 0) - } + expect(messages[0]!.partition).toBe(0) + expect(messages[1]!.partition).toBe(0) + expect(messages[2]!.partition).toBe(0) + expect(messages[3]!.partition).toBe(1) + expect(messages[4]!.partition).toBe(1) + expect(messages[5]!.partition).toBe(1) + expect(messages[6]!.partition).toBe(0) + expect(messages[7]!.partition).toBe(1) }) }) From 8fed538b04dd73f0005855317a16b9b39ee24362 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 19:08:47 +0100 Subject: [PATCH 04/21] lint --- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts | 8 ++++---- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 7 ++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 8de491e6..da1c157a 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -220,11 +220,11 @@ describe('KafkaMessageBatchStream', () => { // - write(msg[6], msg[7]) → flush → [msg[6]] (partition 0) and [msg[7]] (partition 1) expect(receivedBatches).toEqual([ [messages[0], messages[1]], // partition 0 - [messages[2]], // partition 0 - [messages[3]], // partition 1 + [messages[2]], // partition 0 + [messages[3]], // partition 1 [messages[4], messages[5]], // partition 1 - [messages[6]], // partition 0 - [messages[7]], // partition 1 + [messages[6]], // partition 0 + [messages[7]], // partition 1 ]) expect(messages[0]!.partition).toBe(0) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 0757082a..538d6318 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -13,13 +13,12 @@ export type KafkaMessageBatchOptions = { timeoutMilliseconds: number } - /** * Interface extending Duplex to provide strong typing for the 'data' event. * The stream emits arrays of messages grouped by topic-partition. */ export interface KafkaMessageBatchStream - extends Duplex { + extends Duplex { // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition on(event: string | symbol, listener: (...args: any[]) => void): this /** Listen for batches of messages from the same topic-partition */ @@ -53,7 +52,6 @@ export class KafkaMessageBatchStream `${topic}:${partition}` \ No newline at end of file +const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` From 6b516d423d47fe5174b620fc4e9ad9dcb51106dc Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 21 Jan 2026 19:20:30 +0100 Subject: [PATCH 05/21] Ajusting consumer --- packages/kafka/lib/AbstractKafkaConsumer.ts | 44 +++++++++++++-------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index cc69a009..076d4c8a 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -1,4 +1,5 @@ import { randomUUID } from 'node:crypto' +import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { InternalError, @@ -195,14 +196,16 @@ export abstract class AbstractKafkaConsumer< if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { this.messageBatchStream = new KafkaMessageBatchStream< DeserializedMessage> - >( - (batch) => - this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)), - this.options.batchProcessingOptions, - ) - this.consumerStream.pipe(this.messageBatchStream) - } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + >({ + batchSize: this.options.batchProcessingOptions.batchSize, + timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, + }) + + // Use pipeline for better error handling and backpressure management + pipeline(this.consumerStream, this.messageBatchStream).catch((error) => { + this.logger.error('Stream pipeline failed') + this.handlerError(error) + }) } } catch (error) { throw new InternalError({ @@ -211,6 +214,12 @@ export abstract class AbstractKafkaConsumer< cause: error, }) } + + if (this.messageBatchStream) { + this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) + } else { + this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) + } } private async handleSyncStream( @@ -223,6 +232,13 @@ export abstract class AbstractKafkaConsumer< ) } } + private async handleSyncStreamBatch( + stream: KafkaMessageBatchStream>>, + ): Promise { + for await (const messageBatch of stream) { + await this.consume(messageBatch[0].topic, messageBatch) + } + } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { @@ -395,10 +411,7 @@ export abstract class AbstractKafkaConsumer< const errorContext = Array.isArray(messageOrBatch) ? { batchSize: messageOrBatch.length } : { message: stringValueSerializer(messageOrBatch.value) } - this.handlerError(error, { - topic, - ...errorContext, - }) + this.handlerError(error, { topic, ...errorContext }) } return { status: 'error', errorReason: 'handlerError' } @@ -443,7 +456,7 @@ export abstract class AbstractKafkaConsumer< } catch (error) { this.logger.debug(logDetails, 'Message commit failed') if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) - throw error + this.handlerError(error) } } @@ -455,7 +468,7 @@ export abstract class AbstractKafkaConsumer< error.apiCode && commitErrorCodesToIgnore.has(error.apiCode) ) { - this.logger.error( + this.logger.warn( { apiCode: error.apiCode, apiId: error.apiId, @@ -466,8 +479,7 @@ export abstract class AbstractKafkaConsumer< `Failed to commit message: ${error.message}`, ) } else { - // If error is not recognized, rethrow it - throw responseError + this.handlerError(error) } } } From 13aaacb2ddc3f6e77a9e7f5d299f1b077bdd7718 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Feb 2026 13:27:54 +0100 Subject: [PATCH 06/21] Improving logging --- packages/kafka/lib/AbstractKafkaService.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 1528ee90..93a59d31 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -20,6 +20,7 @@ import type { DeserializedMessage, KafkaConfig, KafkaDependencies, + RequestContext, SupportedMessageValues, TopicConfig, } from './types.ts' @@ -125,8 +126,17 @@ export abstract class AbstractKafkaService< } } - protected handlerError(error: unknown, context: Record = {}): void { - this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context }) - if (isError(error)) this.errorReporter.report({ error, context }) + protected handlerError( + error: unknown, + requestContext: RequestContext, + context: Record = {}, + ): void { + const logger = requestContext.logger ?? this.logger + logger.error({ ...resolveGlobalErrorLogObject(error), ...context }) + if (isError(error)) + this.errorReporter.report({ + error, + context: { ...context, 'x-request-id': requestContext.reqId }, + }) } } From 37b95749b7fe4980b315e5853dc13613569a7a52 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 18:59:05 +0100 Subject: [PATCH 07/21] erro handling --- packages/kafka/lib/AbstractKafkaService.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 93a59d31..78464300 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -126,17 +126,12 @@ export abstract class AbstractKafkaService< } } - protected handlerError( - error: unknown, - requestContext: RequestContext, - context: Record = {}, - ): void { - const logger = requestContext.logger ?? this.logger - logger.error({ ...resolveGlobalErrorLogObject(error), ...context }) + protected handlerError(error: unknown, context: Record = {}): void { + this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context }) if (isError(error)) this.errorReporter.report({ error, - context: { ...context, 'x-request-id': requestContext.reqId }, + context: context, }) } } From f3b2848c35556c890187cdca1d00abf309d4e7a1 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 18:59:29 +0100 Subject: [PATCH 08/21] Update kafka lib --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index eda1bc71..b311f8dc 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.26.0" + "@platformatic/kafka": "^1.28.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0", From 094ad19a6541d55d6df0817ab762c26bdc4be01d Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 18:59:53 +0100 Subject: [PATCH 09/21] Remove maxWait from test --- packages/kafka/test/consumer/PermissionConsumer.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/kafka/test/consumer/PermissionConsumer.ts b/packages/kafka/test/consumer/PermissionConsumer.ts index b5ef9f4f..aa040e54 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.ts @@ -75,7 +75,6 @@ export class PermissionConsumer extends AbstractKafkaConsumer< handlerSpy: options.handlerSpy ?? true, headerRequestIdField: options.headerRequestIdField, messageIdField: options.messageIdField, - maxWaitTime: 5, }, { incrementAmount: 0, From 1814df8865343fdfac7e3f2e0fc12362471bbe3c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 19:08:31 +0100 Subject: [PATCH 10/21] Lint fix --- packages/kafka/lib/AbstractKafkaService.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 78464300..aa0d9275 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -20,7 +20,6 @@ import type { DeserializedMessage, KafkaConfig, KafkaDependencies, - RequestContext, SupportedMessageValues, TopicConfig, } from './types.ts' From f2adcea909e828072f379a9968b325513905b1c7 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 19:59:45 +0100 Subject: [PATCH 11/21] Revert "Remove maxWait from test" This reverts commit 094ad19a6541d55d6df0817ab762c26bdc4be01d. --- packages/kafka/test/consumer/PermissionConsumer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kafka/test/consumer/PermissionConsumer.ts b/packages/kafka/test/consumer/PermissionConsumer.ts index aa040e54..b5ef9f4f 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.ts @@ -75,6 +75,7 @@ export class PermissionConsumer extends AbstractKafkaConsumer< handlerSpy: options.handlerSpy ?? true, headerRequestIdField: options.headerRequestIdField, messageIdField: options.messageIdField, + maxWaitTime: 5, }, { incrementAmount: 0, From 36dbdb595ba5a88e4db276cd939dcf479279ecd7 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 21:36:10 +0100 Subject: [PATCH 12/21] Coverage fix --- packages/kafka/lib/AbstractKafkaConsumer.ts | 9 +++++---- packages/kafka/vitest.config.ts | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index b354e9b4..20362f7b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -202,10 +202,9 @@ export abstract class AbstractKafkaConsumer< }) // Use pipeline for better error handling and backpressure management - pipeline(this.consumerStream, this.messageBatchStream).catch((error) => { - this.logger.error('Stream pipeline failed') - this.handlerError(error) - }) + pipeline(this.consumerStream, this.messageBatchStream).catch((error) => + this.handlerError(error), + ) } } catch (error) { throw new InternalError({ @@ -387,6 +386,7 @@ export abstract class AbstractKafkaConsumer< ): Promise { try { const isBatch = Array.isArray(messageOrBatch) + /* v8 ignore start */ if (this.options.batchProcessingEnabled && !isBatch) { throw new Error( 'Batch processing is enabled, but a single message was passed to the handler', @@ -397,6 +397,7 @@ export abstract class AbstractKafkaConsumer< 'Batch processing is disabled, but a batch of messages was passed to the handler', ) } + /* v8 ignore stop */ await handler( // We need casting to match message type with handler type - it is safe as we verify the type above diff --git a/packages/kafka/vitest.config.ts b/packages/kafka/vitest.config.ts index 00c6ebbb..16786673 100644 --- a/packages/kafka/vitest.config.ts +++ b/packages/kafka/vitest.config.ts @@ -14,7 +14,7 @@ export default defineConfig({ exclude: ['vitest.config.ts', 'lib/**/index.ts'], thresholds: { lines: 93, - functions: 91, + functions: 88, branches: 85, statements: 92, }, From 719d19f1b6846564d634c81f2fd1301cb910a7cb Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 26 Feb 2026 21:36:25 +0100 Subject: [PATCH 13/21] Handling backpreassure on timeout + tests --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 139 ++++++++++++++++++ .../lib/utils/KafkaMessageBatchStream.ts | 17 ++- 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index da1c157a..20439242 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -236,4 +236,143 @@ describe('KafkaMessageBatchStream', () => { expect(messages[6]!.partition).toBe(0) expect(messages[7]!.partition).toBe(1) }) + + describe('backpressure', () => { + it('should pause writes when the readable buffer is full', async () => { + // With batchSize=1, each message is immediately flushed to the readable buffer. + // The objectMode readableHighWaterMark defaults to 16: + // push() returns false on the 16th flush, causing the _write callback to be held + // until _read() signals the downstream is ready again. + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + }) + + const processedmessages = new Set() + // Write 15 messages: push() returns true for each (buffer 1→15, below HWM of 16) + for (let i = 0; i < 15; i++) { + await new Promise((resolve) => + batchStream.write({ id: i, topic: 'test', partition: 0 }, () => { + processedmessages.add(i) + resolve() + }), + ) + } + + // 16th write fills the buffer to HWM: push() returns false, callback is held + let sixteenthWriteCompleted = false + batchStream.write({ id: 15, topic: 'test', partition: 0 }, () => { + processedmessages.add(15) + sixteenthWriteCompleted = true + }) + + await setTimeout(10) + expect(processedmessages.size).toEqual(15) + expect(sixteenthWriteCompleted).toBe(false) // write is paused by backpressure + + // Consuming one item triggers _read(), releasing the held callback + batchStream.read() + + await setTimeout(10) + expect(processedmessages.size).toEqual(16) + expect(sixteenthWriteCompleted).toBe(true) // write resumes + }) + + it('should deliver all messages without loss when the consumer is slow', async () => { + // 20 messages with batchSize=1 generates 20 batches. + // HWM is 16, so batches 17-20 are held until the consumer drains the buffer, + // verifying that backpressure does not cause message loss. + const topic = 'test-topic' + const totalMessages = 20 + const messages = Array.from({ length: totalMessages }, (_, i) => ({ + id: i, + topic, + partition: 0, + })) + + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + }) + + const receivedIds: number[] = [] + let resolveAll!: () => void + const allReceived = new Promise((resolve) => { + resolveAll = resolve + }) + + // Slow consumer: each batch takes longer than writes are produced + const consume = async () => { + for await (const batch of batchStream) { + await setTimeout(10) + for (const msg of batch) receivedIds.push(msg.id) + if (receivedIds.length >= totalMessages) { + resolveAll() + break + } + } + } + void consume() + + for (const msg of messages) batchStream.write(msg) + + await allReceived + expect(receivedIds).toHaveLength(totalMessages) + expect(receivedIds).toEqual(messages.map((m) => m.id)) + }) + + it('should defer timeout flush when backpressured and flush once consumer reads', () => { + vi.useFakeTimers() + + const topic = 'test-topic' + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1000, // large: only timeout-based flushes + timeoutMilliseconds: 100, + }) + + // Mock push() to simulate a full readable buffer on the first flush + let simulateBackpressure = true + const originalPush = batchStream.push.bind(batchStream) + vi.spyOn(batchStream, 'push').mockImplementation((chunk) => { + originalPush(chunk) // still push the data to the buffer + return !simulateBackpressure // return false to signal backpressure + }) + + // No 'data' listener: stream stays in paused mode so _read() is only + // triggered when batchStream.read() is called explicitly + + // Write 5 messages: they accumulate in this.messages, a timeout is scheduled + for (let i = 0; i < 5; i++) { + batchStream.write({ id: i, topic, partition: 0 }) + } + + // First timeout fires: flushMessages() → push() returns false → isBackPreassured = true + vi.advanceTimersByTime(100) + expect(batchStream.readableLength).toBe(1) // 1 batch buffered (the 5 messages) + + // Write 3 more messages: they accumulate in this.messages while backpressured + for (let i = 5; i < 8; i++) { + batchStream.write({ id: i, topic, partition: 0 }) + } + + // Second timeout fires: isBackPreassured is true → reschedules several times without flushing + vi.advanceTimersByTime(300) + expect(batchStream.readableLength).toBe(1) // unchanged: no new flush happened + + // Consumer reads one item → triggers _read() → isBackPreassured = false + simulateBackpressure = false + const firstBatch = batchStream.read() + expect(firstBatch).toHaveLength(5) + expect(batchStream.readableLength).toBe(0) + + // Third timeout fires: isBackPreassured is false → flushes the 3 accumulated messages + vi.advanceTimersByTime(100) + expect(batchStream.readableLength).toBe(1) // new batch pushed + + const secondBatch = batchStream.read() + expect(secondBatch).toHaveLength(3) // messages 5–7 + + vi.useRealTimers() + }) + }) }) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 538d6318..b84a9bad 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -51,12 +51,14 @@ export class KafkaMessageBatchStream this.flushMessages(), this.timeout) + return false + } + // Extract all accumulated messages and clear the array const messageBatch = this.messages.splice(0, this.messages.length) @@ -119,12 +127,17 @@ export class KafkaMessageBatchStream Date: Fri, 27 Feb 2026 11:38:47 +0100 Subject: [PATCH 14/21] Typo fix --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index b84a9bad..8ff897d7 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -51,14 +51,14 @@ export class KafkaMessageBatchStream this.flushMessages(), this.timeout) return false } @@ -136,7 +136,7 @@ export class KafkaMessageBatchStream Date: Fri, 27 Feb 2026 12:08:26 +0100 Subject: [PATCH 15/21] New readableHighWaterMark param --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 8ff897d7..58ba0a77 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -11,6 +11,14 @@ export type KafkaMessageBatchOptions = { batchSize: number /** Time in milliseconds to wait before flushing incomplete batches */ timeoutMilliseconds: number + /** + * Maximum number of topic-partition batches to buffer on the readable side before signaling backpressure. + * Each unit represents one array of messages belonging to the same topic-partition, produced per flush. + * A single flush may push multiple such arrays (one per distinct topic-partition in the accumulated batch). + * Defaults to Node.js object-mode default (16). Lower values trigger backpressure sooner, + * reducing downstream memory pressure at the cost of more frequent flow-control cycles. + */ + readableHighWaterMark?: number } /** @@ -48,13 +56,13 @@ export class KafkaMessageBatchStream Date: Fri, 27 Feb 2026 12:15:56 +0100 Subject: [PATCH 16/21] Adding test --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 20439242..f5dc220e 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -321,6 +321,32 @@ describe('KafkaMessageBatchStream', () => { expect(receivedIds).toEqual(messages.map((m) => m.id)) }) + it('should respect a custom readableHighWaterMark', async () => { + // With readableHighWaterMark=1 and batchSize=1, push() returns false after the + // very first item in the readable buffer, triggering backpressure immediately. + const batchStream = new KafkaMessageBatchStream({ + batchSize: 1, + timeoutMilliseconds: 10000, + readableHighWaterMark: 1, + }) + + // First write: flushed immediately, buffer reaches HWM=1, push() returns false + // → _write callback is held by backpressure + let firstWriteCompleted = false + batchStream.write({ id: 0, topic: 'test', partition: 0 }, () => { + firstWriteCompleted = true + }) + + await setTimeout(10) + expect(firstWriteCompleted).toBe(false) // held by backpressure + + // Consuming one item triggers _read(), releasing the held callback + batchStream.read() + + await setTimeout(10) + expect(firstWriteCompleted).toBe(true) // write resumes + }) + it('should defer timeout flush when backpressured and flush once consumer reads', () => { vi.useFakeTimers() From 87e8a1c888be5b0bdfb3c523cbf96ee406c0775c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 27 Feb 2026 12:16:59 +0100 Subject: [PATCH 17/21] Addressing final comment --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 58ba0a77..5d7602d8 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -110,8 +110,12 @@ export class KafkaMessageBatchStream skip them + // As they are not committed, the next consumer will process them + this.messages = [] callback() } From fb7e3558566fba8b413ec2937030b461907d995a Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 27 Feb 2026 12:34:55 +0100 Subject: [PATCH 18/21] AI comments --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 5d7602d8..edde1ffa 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -116,6 +116,7 @@ export class KafkaMessageBatchStream skip them // As they are not committed, the next consumer will process them this.messages = [] + this.push(null) callback() } From 876f49a7af921248513d03611de20d62d37db10d Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 27 Feb 2026 12:52:29 +0100 Subject: [PATCH 19/21] Error handling --- packages/kafka/lib/AbstractKafkaConsumer.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 20362f7b..61b17adc 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -191,7 +191,6 @@ export abstract class AbstractKafkaConsumer< }) this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics }) - this.consumerStream.on('error', (error) => this.handlerError(error)) if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { this.messageBatchStream = new KafkaMessageBatchStream< @@ -201,10 +200,13 @@ export abstract class AbstractKafkaConsumer< timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, }) - // Use pipeline for better error handling and backpressure management + // Use pipeline for better error handling and backpressure management. + // pipeline() internally listens for errors on all streams, so no separate pipeline(this.consumerStream, this.messageBatchStream).catch((error) => this.handlerError(error), ) + } else { + this.consumerStream.on('error', (error) => this.handlerError(error)) } } catch (error) { throw new InternalError({ From 983424f53292832915268f25c8d9b5bee16b024d Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 27 Feb 2026 13:06:43 +0100 Subject: [PATCH 20/21] Minor change --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 61b17adc..351f816c 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -198,6 +198,7 @@ export abstract class AbstractKafkaConsumer< >({ batchSize: this.options.batchProcessingOptions.batchSize, timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, + readableHighWaterMark: this.options.batchProcessingOptions.readableHighWaterMark, }) // Use pipeline for better error handling and backpressure management. From b68c633303719954d6c9e03c02805f1b07d4375c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 27 Feb 2026 14:40:58 +0100 Subject: [PATCH 21/21] typo --- packages/kafka/lib/AbstractKafkaConsumer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 351f816c..fa1948ef 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -202,7 +202,7 @@ export abstract class AbstractKafkaConsumer< }) // Use pipeline for better error handling and backpressure management. - // pipeline() internally listens for errors on all streams, so no separate + // pipeline() internally listens for errors on all streams pipeline(this.consumerStream, this.messageBatchStream).catch((error) => this.handlerError(error), )