
feat: Improve kafka backpreassure #410

Open
CarlosGamero wants to merge 23 commits into main from feat/improve_kafka_backpreassure

Conversation


@CarlosGamero CarlosGamero commented Feb 26, 2026

Summary by CodeRabbit

  • New Features & Improvements

    • More robust Kafka batch processing with centralized error handling, backpressure support, and configurable batch size, timeout, and read high-water mark.
    • Batches are now emitted as events (arrays of messages) for simpler consumption.
  • Chores

    • Bumped Kafka package version.
  • Tests

    • Updated tests to the event-based batch API and lowered function coverage threshold.

@CarlosGamero CarlosGamero self-assigned this Feb 26, 2026

coderabbitai bot commented Feb 26, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Refactors batching to a Duplex-based KafkaMessageBatchStream that emits TMessage[] via 'data' events, wires consumer stream through pipeline(...), adds handleSyncStreamBatch for async batch iteration, and centralizes error propagation through handlerError with adjusted commit error logging.
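The Duplex-based design described above can be sketched in a stripped-down form. This is an illustrative reconstruction, not the PR's actual class: it keeps only size-based flushing and the end-of-stream flush, and omits the timeout flushing and pendingCallback backpressure machinery the real KafkaMessageBatchStream adds.

```typescript
import { Duplex } from 'node:stream'

// Minimal sketch of a batching Duplex: writes are single messages,
// reads are whole batches (TMessage[]-style arrays).
class BatchStream<T> extends Duplex {
  private buffer: T[] = []

  constructor(private readonly batchSize: number) {
    super({ objectMode: true })
  }

  override _write(msg: T, _enc: BufferEncoding, cb: (e?: Error | null) => void) {
    this.buffer.push(msg)
    if (this.buffer.length >= this.batchSize) {
      this.push(this.buffer) // emit the full batch as one 'data' item
      this.buffer = []
    }
    cb()
  }

  override _read() {} // batches are pushed from the writable side

  override _final(cb: (e?: Error | null) => void) {
    if (this.buffer.length > 0) this.push(this.buffer) // flush the remainder
    this.push(null) // signal end-of-stream to the readable side
    cb()
  }
}

const s = new BatchStream<number>(3)
for (const n of [1, 2, 3, 4, 5, 6]) s.write(n)
console.log(s.read()) // [ 1, 2, 3 ]
console.log(s.read()) // [ 4, 5, 6 ]
```

In the real class, `push()` returning false (readable buffer above `readableHighWaterMark`) is what flips the back-pressured state the review discusses below.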

Changes

Cohort / File(s) — Summary

  • Batch stream implementation & tests — packages/kafka/lib/utils/KafkaMessageBatchStream.ts, packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
    Replaced Transform with a Duplex-based KafkaMessageBatchStream; the constructor now accepts options ({ batchSize, timeoutMilliseconds, readableHighWaterMark? }); batches are emitted as TMessage[] on 'data'; tests updated to consume .on('data', ...). Added _write/_read/_final, timeout-based batching, and backpressure handling.
  • Consumer orchestration & error flow — packages/kafka/lib/AbstractKafkaConsumer.ts
    Switched to pipeline(this.consumerStream, this.messageBatchStream); added handleSyncStreamBatch to iterate batches; unified error propagation through this.handlerError(error, { topic, ...errorContext }); replaced several throws with handlerError calls; downgraded known commit ProtocolErrors to warnings.
  • Minor config & formatting — packages/kafka/lib/AbstractKafkaService.ts, packages/kafka/package.json, packages/kafka/vitest.config.ts
    Formatting change in an error reporter call; bumped @platformatic/kafka ^1.26.0 → ^1.28.0; lowered the Vitest functions coverage threshold 91 → 88.
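The consumer-orchestration change above can be approximated with plain Node streams. This is a hypothetical stand-in, not the PR's code: PassThrough plays the role of KafkaMessageBatchStream, and the message shape is illustrative.

```typescript
import { Readable, PassThrough } from 'node:stream'
import { pipeline } from 'node:stream/promises'

type Msg = { topic: string; value: number }

// Mirrors the handleSyncStreamBatch pattern: each iterated item is a
// whole batch (TMessage[]), so batch[0].topic identifies the topic.
async function handleSyncStreamBatch(stream: AsyncIterable<Msg[]>): Promise<string[]> {
  const seen: string[] = []
  for await (const batch of stream) {
    seen.push(`${batch[0].topic}:${batch.length}`)
  }
  return seen
}

async function run(): Promise<string[]> {
  const consumerStream = Readable.from([
    [{ topic: 'orders', value: 1 }, { topic: 'orders', value: 2 }],
  ])
  const messageBatchStream = new PassThrough({ objectMode: true })

  // pipeline() propagates errors from either stream and manages backpressure
  const piping = pipeline(consumerStream, messageBatchStream)
  const result = await handleSyncStreamBatch(messageBatchStream)
  await piping
  return result
}

run().then((r) => console.log(r)) // [ 'orders:2' ]
```

The point of `pipeline()` here is that an error in either stream rejects one promise, giving a single place to route into an error handler.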

Sequence Diagram(s)

sequenceDiagram
  participant ConsumerStream as ConsumerStream
  participant BatchStream as KafkaMessageBatchStream
  participant Consumer as AbstractKafkaConsumer
  participant Committer as Commit/Offsets
  participant ErrorHandler as handlerError

  ConsumerStream->>BatchStream: stream messages
  BatchStream->>Consumer: emit 'data' (TMessage[] batch)
  Consumer->>Consumer: handleSyncStreamBatch -> consume(batch)
  alt consume resolves
    Consumer->>Committer: commit offsets
    Committer-->>Consumer: commit result / ProtocolError?
    alt known ProtocolError
      Consumer->>ErrorHandler: warn log (handled)
    else unknown error
      Consumer->>ErrorHandler: handlerError(error, { topic, ...ctx })
    end
  else consume/commit throws
    Consumer->>ErrorHandler: handlerError(error, { topic, ...ctx })
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • kjamrog
  • kibertoad

Poem

🐇 I hopped through streams both old and new,
Batches bundled snug in arrays of few,
Pipelines hum and errors find their berth,
Commits whisper softly, keeping worth,
🥕 hop, emit, consume — a rabbit's view.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)

  • Description Check — ✅ Passed — Check skipped: CodeRabbit’s high-level summary is enabled.
  • Title Check — ✅ Passed — The title 'feat: Improve kafka backpreassure' directly describes the main objective of the PR: improving Kafka backpressure handling through stream-based improvements.
  • Docstring Coverage — ✅ Passed — No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@CarlosGamero CarlosGamero marked this pull request as ready for review February 26, 2026 20:36

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)

56-60: Validate batching options at construction time.

Fail fast for non-positive batchSize and negative/invalid timeoutMilliseconds to avoid undefined stream behavior.

Proposed guard
 constructor(options: KafkaMessageBatchOptions) {
   super({ objectMode: true })
+  if (!Number.isInteger(options.batchSize) || options.batchSize < 1) {
+    throw new Error('batchSize must be an integer >= 1')
+  }
+  if (!Number.isFinite(options.timeoutMilliseconds) || options.timeoutMilliseconds < 0) {
+    throw new Error('timeoutMilliseconds must be a finite number >= 0')
+  }
   this.batchSize = options.batchSize
   this.timeout = options.timeoutMilliseconds
   this.messages = []
   this.isBackPreassured = false
 }
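The fail-fast guard proposed above can be exercised standalone. This sketch assumes the KafkaMessageBatchOptions shape shown in the diff; the function name is hypothetical.

```typescript
// Assumed options shape (matches the constructor diff above).
interface KafkaMessageBatchOptions {
  batchSize: number
  timeoutMilliseconds: number
  readableHighWaterMark?: number
}

// Validates batching options, throwing before any stream state is created.
function validateBatchOptions(options: KafkaMessageBatchOptions): void {
  if (!Number.isInteger(options.batchSize) || options.batchSize < 1) {
    throw new RangeError('batchSize must be an integer >= 1')
  }
  if (!Number.isFinite(options.timeoutMilliseconds) || options.timeoutMilliseconds < 0) {
    throw new RangeError('timeoutMilliseconds must be a finite number >= 0')
  }
}

validateBatchOptions({ batchSize: 3, timeoutMilliseconds: 100 }) // ok
try {
  validateBatchOptions({ batchSize: 0, timeoutMilliseconds: 100 })
} catch (e) {
  console.log((e as Error).message) // batchSize must be an integer >= 1
}
```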
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/utils/KafkaMessageBatchStream.ts` around lines 56 - 60, In
the KafkaMessageBatchStream constructor validate the incoming
KafkaMessageBatchOptions: ensure options.batchSize is a finite integer > 0 and
options.timeoutMilliseconds is a finite number >= 0 (or absent if allowed), and
throw a clear RangeError/TypeError from the constructor if these checks fail;
update the constructor around KafkaMessageBatchStream, batchSize, timeout and
messages to perform these guards before assigning fields so invalid options
fail-fast.
packages/kafka/vitest.config.ts (1)

17-17: Please document why the function coverage gate was reduced.

A short inline note (or PR reference) helps prevent this from becoming an untracked baseline drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/vitest.config.ts` at line 17, Add an inline comment above the
coverage threshold setting (the "functions: 88" entry in the Vitest coverage
config) explaining why the function coverage gate was lowered, e.g., noting the
deliberate temporary relaxation, the related PR or issue number, and the
acceptance criteria or owner who must raise it later; update the comment to
include a TODO or ticket/PR reference so future reviewers can trace and revert
the change when appropriate.
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)

324-376: Please add a shutdown-under-backpressure regression test.

Add a case that calls end() while backpressured and with pending buffered messages, then asserts all buffered messages are emitted before EOF.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts` around lines 324 -
376, Add a regression test in KafkaMessageBatchStream.spec that simulates
shutdown while backpressured: reuse the existing backpressure setup
(vi.useFakeTimers(), create new KafkaMessageBatchStream, spyOn batchStream.push
to return false when simulateBackpressure is true), write some messages to
create a buffered batch, advance timers until the first flush sets
isBackPreassured (readableLength === 1), then write additional messages so they
sit in this.messages, call batchStream.end() while simulateBackpressure remains
true, then clear backpressure (simulateBackpressure = false), consume the
buffered batches (batchStream.read()) and assert all messages written before
end() are emitted in order and that subsequent reads return null/EOF; ensure to
advance timers as needed and restore real timers at the end.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/kafka/lib/utils/KafkaMessageBatchStream.ts`:
- Around line 104-117: The _final() currently calls flushMessages() then
immediately pushes null, which can drop buffered this.messages if
flushMessages() returns false due to isBackPreassured; change the flow so EOF is
only signaled once buffered messages are actually flushed: have _final() call
flushMessages() and if it returns true then push(null) and invoke callback,
otherwise set an "ending" flag or store the final callback (e.g.,
this._finalCallback) and let flushMessages() trigger push(null) and call that
stored callback when it later finishes draining messages (ensure existingTimeout
logic still schedules retries with this.timeout and clear the stored callback
after invoking). This involves the methods/properties: _final, flushMessages,
isBackPreassured, existingTimeout, timeout, messages, and push(null).


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 87b1614 and 719d19f.

📒 Files selected for processing (6)
  • packages/kafka/lib/AbstractKafkaConsumer.ts
  • packages/kafka/lib/AbstractKafkaService.ts
  • packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
  • packages/kafka/lib/utils/KafkaMessageBatchStream.ts
  • packages/kafka/package.json
  • packages/kafka/vitest.config.ts

Comment on lines 104 to 117

 override _final(callback: CallbackFunction) {
   this.flushMessages()
   this.push(null) // Signal end-of-stream to the readable side
   callback()
 }

-private async flushAllBatches() {
-  for (const key of Object.keys(this.currentBatchPerTopicPartition)) {
-    const { topic, partition } = splitTopicPartitionKey(key)
-    await this.flushCurrentBatchMessages(topic, partition)
+private flushMessages(): boolean {
+  clearTimeout(this.existingTimeout)
+  this.existingTimeout = undefined
+
+  if (this.isBackPreassured) {
+    this.existingTimeout = setTimeout(() => this.flushMessages(), this.timeout)
+    return false
+  }

@coderabbitai coderabbitai bot Feb 26, 2026


⚠️ Potential issue | 🔴 Critical

Potential buffered-message loss when ending under backpressure.

_final() closes the readable side unconditionally after flushMessages(). If isBackPreassured is true, flushMessages() returns early and buffered this.messages can be left behind before EOF.

Proposed fix
 override _final(callback: CallbackFunction) {
-  this.flushMessages()
+  this.flushMessages(true)
   this.push(null) // Signal end-of-stream to the readable side
   callback()
 }

-private flushMessages(): boolean {
+private flushMessages(force = false): boolean {
   clearTimeout(this.existingTimeout)
   this.existingTimeout = undefined

-  if (this.isBackPreassured) {
+  if (this.isBackPreassured && !force) {
     this.existingTimeout = setTimeout(() => this.flushMessages(), this.timeout)
     return false
   }
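The force-flush idea in the proposed fix can be sketched as a self-contained stream. This is an illustrative reduction, not the PR's class: timeout retries and pendingCallback handling are omitted, and the demo does not drive the stream into a back-pressured state, it only shows the `_final` shape.

```typescript
import { Duplex } from 'node:stream'

class ForceFlushStream<T> extends Duplex {
  private messages: T[] = []
  private isBackPreassured = false

  constructor() {
    super({ objectMode: true, readableHighWaterMark: 1 })
  }

  override _write(m: T, _enc: BufferEncoding, cb: (e?: Error | null) => void) {
    this.messages.push(m) // buffer only; flushing is decided elsewhere
    cb()
  }

  override _read() {
    this.isBackPreassured = false // downstream is reading again
  }

  private flushMessages(force = false): boolean {
    if (this.isBackPreassured && !force) return false // normally retried via timer
    if (this.messages.length > 0) {
      this.isBackPreassured = !this.push(this.messages)
      this.messages = []
    }
    return true
  }

  override _final(cb: (e?: Error | null) => void) {
    this.flushMessages(true) // force: drain buffered messages before EOF
    this.push(null)
    cb()
  }
}

const s = new ForceFlushStream<number>()
s.write(1)
s.write(2)
s.on('data', (batch) => console.log(batch)) // [ 1, 2 ]
s.end()
```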


@CarlosGamero Technically the messages won't be committed if dropped, so data is not lost and the messages will be re-fetched.

Please confirm that this is intended, because _final() is meant to actually flush the buffer, not drop it (though I guess sometimes it is better to drop it) 😄


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Collaborator Author
I agree. After giving it some thought, in an end-of-stream scenario (e.g., a new release), I believe the safest approach is to ignore any remaining messages and allow them to be processed by the new service instead.

This way, we avoid adding extra load to the service while it is shutting down, reducing the risk of delays or instability during the transition.

If you agree, I will remove the flush from the final step.


Probably it should be configurable then. Not sure that discarding messages in all cases is a good idea, but honestly, I wonder if the tradeoff is worth it.

-// Flush all remaining batches when stream is closing
-override async _flush(callback: () => void) {
-  await this.flushAllBatches()
+override _final(callback: CallbackFunction) {

Is the stream end covered by tests? It would probably be a good idea to cover this, plus test whether the remaining messages are actually flushed after calling .end() on a stream.

Collaborator Author

Good catch! It depends on the outcome of #410 (comment)

If you agree with me, then tests are not needed for _final.

-  options: { batchSize: number; timeoutMilliseconds: number },
-) {
+constructor(options: KafkaMessageBatchOptions) {
+  super({ objectMode: true })

@DrParanoia DrParanoia Feb 27, 2026


🟠 The default readableHighWaterMark for objectMode is 16. This means that with a batchSize of 1000 we will potentially buffer ~16000 messages (16 batches) before we enter the "back-pressured" mode.

@CarlosGamero is this intended? Or maybe we can consider setting readableHighWaterMark to something lower?
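The default being discussed is observable directly on a plain Duplex. A quick check, using only `node:stream`:

```typescript
import { Duplex } from 'node:stream'

// In objectMode, the readable high-water mark counts objects — here, whole
// batches — and defaults to 16.
const defaults = new Duplex({
  objectMode: true,
  read() {},
  write(_chunk, _enc, cb) { cb() },
})
console.log(defaults.readableHighWaterMark) // 16

// With batchSize = 1000 that allows ~16 batches (~16000 messages) to queue
// before push() returns false; a lower mark makes backpressure kick in sooner.
const tight = new Duplex({
  objectMode: true,
  readableHighWaterMark: 2, // back-pressure after 2 buffered batches
  read() {},
  write(_chunk, _enc, cb) { cb() },
})
console.log(tight.readableHighWaterMark) // 2
```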

Collaborator Author

Good point! This highly depends on the config params (e.g., for a batch size of 50, the default is too low), so I feel the right approach is to let users set what makes more sense for them for readableHighWaterMark. Going to add it as a new config param for batch mode.


Not really sure if users need that low-level control though 😄 Won't it be confusing for people who don't dig deep enough?

But at least the ability to override it is better than having everything at default 😄

Collaborator Author

Yep :D


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)

23-41: ⚠️ Potential issue | 🟡 Minor

Close test streams to avoid dangling timers.

Multiple tests create KafkaMessageBatchStream instances with long timeouts but don't call .end() or .destroy(). Since these streams schedule flush timers that are never cancelled, they can outlive test completion and cause cross-test bleed and flakiness.

Wrap stream usage in try-finally and call .end() to clean up timers:

✅ Suggested pattern
-    const batchStream = new KafkaMessageBatchStream<any>({
+    const batchStream = new KafkaMessageBatchStream<any>({
       batchSize: 3,
       timeoutMilliseconds: 10000,
     }) // Setting big timeout to check batch size only
+
+    try {
       batchStream.on('data', (batch) => {
         receivedBatches.push(batch)
         // We expect 3 batches and the last message waiting in the stream
         if (receivedBatches.length >= 3) {
           resolvePromise()
         }
       })

       for (const message of messages) {
         batchStream.write(message)
       }

       await dataFetchingPromise

       // Then
       expect(receivedBatches).toEqual([
         [messages[0], messages[1], messages[2]],
         [messages[3], messages[4], messages[5]],
         [messages[6], messages[7], messages[8]],
       ])
+    } finally {
+      batchStream.end()
+    }

Affects tests at lines 23–48, 118–153, and others following the same pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts` around lines 23 -
41, The test creates KafkaMessageBatchStream instances (KafkaMessageBatchStream)
with long timeouts but never closes them, leaving flush timers running; update
the spec to wrap usage in a try/finally (or ensure cleanup) and call
stream.end() (or stream.destroy()) in the finally block after writing messages
and awaiting the data promise so the stream's internal timers are cancelled;
apply this pattern to the block that constructs KafkaMessageBatchStream, the
code that calls batchStream.write(...), and the on('data') handler to ensure no
dangling timers remain.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/kafka/lib/utils/KafkaMessageBatchStream.ts`:
- Around line 112-120: The _final override in KafkaMessageBatchStream currently
clears timeouts and drops buffered messages but never signals EOF on the
readable side; update the override of _final(callback: CallbackFunction) in
class KafkaMessageBatchStream so that after clearing this.existingTimeout and
resetting this.messages it calls this.push(null) to end the readable side (then
invoke callback()), ensuring downstream consumers using pipeline() or async
iteration receive EOF and do not hang.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 719d19f and 87e8a1c.

📒 Files selected for processing (2)
  • packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
  • packages/kafka/lib/utils/KafkaMessageBatchStream.ts


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
packages/kafka/lib/AbstractKafkaConsumer.ts (2)

236-242: Accessing messageBatch[0] without empty-array guard.

While current flushMessages() logic shouldn't emit empty arrays (the push loop only runs for non-empty topic-partition groups), this is an implicit contract. A defensive check would prevent runtime errors if the upstream behavior ever changes.

🛡️ Suggested defensive guard
   private async handleSyncStreamBatch(
     stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
   ): Promise<void> {
     for await (const messageBatch of stream) {
+      if (messageBatch.length === 0) continue
       await this.consume(messageBatch[0].topic, messageBatch)
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 236 - 242, The loop
in handleSyncStreamBatch assumes each KafkaMessageBatchStream item has at least
one message and directly accesses messageBatch[0], which can throw if an empty
array is ever emitted; update handleSyncStreamBatch to defensively check that
messageBatch.length > 0 before accessing messageBatch[0] (e.g., skip/continue
when empty) and then call this.consume(topic, messageBatch) only for non-empty
batches so consume, KafkaMessageBatchStream and messageBatch are protected from
empty-array cases.

203-223: Potential duplicate error reporting via pipeline and stream iteration handlers.

The pipeline .catch() and handleSyncStreamBatch/handleSyncStream .catch() handlers both listen to the same streams. When a stream error occurs, both may be triggered for the same error, causing handlerError to be called twice. The handlerError implementation (AbstractKafkaService.ts:128-133) has no deduplication logic—it will independently log and report each invocation. Consider using a flag or error tracking to prevent duplicate logging and error reporting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 203 - 223, The
pipeline and the stream-iteration catch handlers can both invoke handlerError
for the same stream error, causing duplicate reporting; fix by introducing a
simple deduplication flag on the consumer instance (e.g., this.errorHandled) and
use it to ensure handlerError is only acted-on once: set the flag immediately
before calling handlerError from any of the three call sites in
AbstractKafkaConsumer.ts (the pipeline .catch, the consumerStream 'error'
listener, and the .catch on handleSyncStreamBatch/handleSyncStream) or
alternatively implement the same guard inside handlerError in
AbstractKafkaService.ts (check and set this.errorHandled at the top), so
duplicate invocations are ignored. Ensure the flag is cleared/reinitialized
appropriately when restarting/reconnecting the consumer.
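The once-only guard suggested above can be demonstrated in isolation. This is a sketch of the pattern only; handlerError's real signature and reset points in AbstractKafkaService differ, and the class name here is hypothetical.

```typescript
// Deduplicates error handling when multiple stream handlers (pipeline catch,
// 'error' listener, iteration catch) can observe the same failure.
class StreamErrorGuard {
  private errorHandled = false

  handlerError(error: Error, report: (e: Error) => void): void {
    if (this.errorHandled) return // another handler already reported this failure
    this.errorHandled = true
    report(error)
  }

  // Clear the flag when the consumer restarts/reconnects.
  reset(): void {
    this.errorHandled = false
  }
}

let reports = 0
const guard = new StreamErrorGuard()
const err = new Error('stream failed')
guard.handlerError(err, () => reports++) // first caller reports
guard.handlerError(err, () => reports++) // duplicate is ignored
console.log(reports) // 1
```

Note this simple flag also suppresses a genuinely distinct second error until reset() runs, which is the tradeoff of flag-based deduplication.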
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)

91-110: Error propagation in _write may silently mask exceptions.

If flushMessages() throws, the finally block executes before the exception propagates, invoking callback() (since canContinue remains true). This signals success to the writable side while also throwing, causing inconsistent state.

Currently flushMessages() appears not to throw, but this pattern is fragile for future maintenance. Consider wrapping with explicit error handling:

💡 Suggested defensive fix
   override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) {
     let canContinue = true
+    let error: Error | undefined

     try {
       this.messages.push(message)

       if (this.messages.length >= this.batchSize) {
         canContinue = this.flushMessages()
       } else {
         this.existingTimeout ??= setTimeout(() => this.flushMessages(), this.timeout)
       }
+    } catch (e) {
+      error = e instanceof Error ? e : new Error(String(e))
     } finally {
-      if (!canContinue) this.pendingCallback = callback
-      else callback()
+      if (error) callback(error)
+      else if (!canContinue) this.pendingCallback = callback
+      else callback()
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/utils/KafkaMessageBatchStream.ts` around lines 91 - 110,
The _write method can mask exceptions from flushMessages because the finally
block always calls callback() on success; wrap the flushMessages()/setTimeout
logic in a try/catch so any thrown error is captured and passed to the writable
callback instead of signaling success. Specifically, in _write (method name
_write in KafkaMessageBatchStream) catch errors from flushMessages() (and any
push logic), store the caught error, and in the finally use callback(err) when
err is present; only set pendingCallback for backpressure when there is no
error. Also ensure existingTimeout, messages, batchSize and timeout behavior
remain unchanged when no error occurs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 195-210: The KafkaMessageBatchStream instantiation in
AbstractKafkaConsumer isn't passing through the optional readableHighWaterMark
from this.options.batchProcessingOptions; update the constructor call that
creates this.messageBatchStream (the new KafkaMessageBatchStream<...> block) to
include readableHighWaterMark (guarded if undefined) — either add
readableHighWaterMark: this.options.batchProcessingOptions.readableHighWaterMark
or spread the batchProcessingOptions into the ctor after selecting
batchSize/timeoutMilliseconds so the optional property is forwarded and types
remain satisfied.

---

Nitpick comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 236-242: The loop in handleSyncStreamBatch assumes each
KafkaMessageBatchStream item has at least one message and directly accesses
messageBatch[0], which can throw if an empty array is ever emitted; update
handleSyncStreamBatch to defensively check that messageBatch.length > 0 before
accessing messageBatch[0] (e.g., skip/continue when empty) and then call
this.consume(topic, messageBatch) only for non-empty batches so consume,
KafkaMessageBatchStream and messageBatch are protected from empty-array cases.
- Around line 203-223: The pipeline and the stream-iteration catch handlers can
both invoke handlerError for the same stream error, causing duplicate reporting;
fix by introducing a simple deduplication flag on the consumer instance (e.g.,
this.errorHandled) and use it to ensure handlerError is only acted-on once: set
the flag immediately before calling handlerError from any of the three call
sites in AbstractKafkaConsumer.ts (the pipeline .catch, the consumerStream
'error' listener, and the .catch on handleSyncStreamBatch/handleSyncStream) or
alternatively implement the same guard inside handlerError in
AbstractKafkaService.ts (check and set this.errorHandled at the top), so
duplicate invocations are ignored. Ensure the flag is cleared/reinitialized
appropriately when restarting/reconnecting the consumer.

In `@packages/kafka/lib/utils/KafkaMessageBatchStream.ts`:
- Around line 91-110: The _write method can mask exceptions from flushMessages
because the finally block always calls callback() on success; wrap the
flushMessages()/setTimeout logic in a try/catch so any thrown error is captured
and passed to the writable callback instead of signaling success. Specifically,
in _write (method name _write in KafkaMessageBatchStream) catch errors from
flushMessages() (and any push logic), store the caught error, and in the finally
use callback(err) when err is present; only set pendingCallback for backpressure
when there is no error. Also ensure existingTimeout, messages, batchSize and
timeout behavior remain unchanged when no error occurs.
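The error-capture shape suggested there looks roughly as follows; `flushNow` stands in for the real `flushMessages()` and the surrounding class/timeout machinery is omitted:

```typescript
import { Writable } from 'node:stream'

// Batches object-mode chunks and flushes every `batchSize` items. Any error
// thrown by the flush is captured and handed to the writable callback instead
// of being masked by an unconditional callback() in the finally block.
function makeBatchWriter<T>(batchSize: number, flushNow: (batch: T[]) => void): Writable {
  const messages: T[] = []
  return new Writable({
    objectMode: true,
    write(chunk: T, _encoding, callback) {
      let err: Error | null = null
      try {
        messages.push(chunk)
        if (messages.length >= batchSize) {
          flushNow(messages.splice(0)) // may throw; must not be masked
        }
      } catch (caught) {
        err = caught as Error
      } finally {
        // Signal success only when nothing threw; otherwise propagate the failure.
        callback(err)
      }
    },
  })
}
```

In the real stream, `callback` would be stored as `pendingCallback` for backpressure only on the success path; the sketch keeps that aside to isolate the try/catch/finally shape.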

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 87e8a1c and 876f49a.

📒 Files selected for processing (2)
  • packages/kafka/lib/AbstractKafkaConsumer.ts
  • packages/kafka/lib/utils/KafkaMessageBatchStream.ts


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 204-208: The inline comment above the pipeline call is truncated;
update the comment for clarity (near pipeline(this.consumerStream,
this.messageBatchStream).catch((error) => this.handlerError(error))) to finish
the sentence—e.g., state that pipeline() internally listens for errors on all
streams (including this.consumerStream and this.messageBatchStream) so no
separate error handler is required for the consumer stream, and that using
pipeline provides built-in error propagation and backpressure handling for the
AbstractKafkaConsumer's stream chain and handlerError invocation.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 876f49a and 983424f.

📒 Files selected for processing (1)
  • packages/kafka/lib/AbstractKafkaConsumer.ts

Comment on lines 204 to 208
// Use pipeline for better error handling and backpressure management.
// pipeline() internally listens for errors on all streams, so no separate
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
this.handlerError(error),
)


⚠️ Potential issue | 🟡 Minor

Incomplete comment.

The comment appears to be truncated mid-sentence: "so no separate" doesn't complete the thought. Consider completing it, e.g., "so no separate error handler is needed for consumerStream."

📝 Suggested fix
-        // Use pipeline for better error handling and backpressure management.
-        // pipeline() internally listens for errors on all streams, so no separate
+        // Use pipeline for better error handling and backpressure management.
+        // pipeline() internally listens for errors on all streams, so no separate error handler is needed.
         pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
           this.handlerError(error),
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 204 - 208, The
inline comment above the pipeline call is truncated; update the comment for
clarity (near pipeline(this.consumerStream,
this.messageBatchStream).catch((error) => this.handlerError(error))) to finish
the sentence—e.g., state that pipeline() internally listens for errors on all
streams (including this.consumerStream and this.messageBatchStream) so no
separate error handler is required for the consumer stream, and that using
pipeline provides built-in error propagation and backpressure handling for the
AbstractKafkaConsumer's stream chain and handlerError invocation.
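The claim that one catch covers the whole chain can be illustrated with a small stand-alone pipeline; the streams here are stand-ins, not the Kafka consumer streams:

```typescript
import { pipeline } from 'node:stream/promises'
import { PassThrough, Readable } from 'node:stream'

// pipeline() attaches error handling to every stream in the chain, so a single
// try/catch (or .catch) observes a failure from any of them.
async function runChain(failSink: boolean): Promise<'ok' | 'failed'> {
  const source = Readable.from(['a', 'b', 'c'])
  const sink = new PassThrough({ objectMode: true })
  sink.resume() // keep the sink flowing so the pipeline can complete
  if (failSink) {
    sink.once('data', () => sink.destroy(new Error('sink failure')))
  }
  try {
    await pipeline(source, sink)
    return 'ok'
  } catch {
    return 'failed'
  }
}
```

No separate `'error'` listener is registered on `source` or `sink`; destroying either stream still rejects the pipeline promise, which is the behavior the truncated comment was describing.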


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
packages/kafka/lib/AbstractKafkaConsumer.ts (1)

237-243: Consider adding a defensive guard for empty batches.

Line 241 accesses messageBatch[0].topic without checking if the batch is non-empty. While KafkaMessageBatchStream should only emit non-empty arrays by design (it groups messages by topic-partition before pushing), a defensive check would prevent potential runtime errors if the upstream behavior changes.

🛡️ Optional defensive guard
   private async handleSyncStreamBatch(
     stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
   ): Promise<void> {
     for await (const messageBatch of stream) {
+      if (messageBatch.length === 0) continue
       await this.consume(messageBatch[0].topic, messageBatch)
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 237 - 243, The loop
in handleSyncStreamBatch reads messageBatch[0].topic without ensuring
messageBatch is non-empty; add a defensive guard at the top of the for-await
loop to skip empty batches (e.g., if (!messageBatch || messageBatch.length ===
0) continue) so consume(topic, messageBatch) is only called with a non-empty
messageBatch; update the handleSyncStreamBatch implementation to perform this
check before deriving topic and invoking this.consume.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 237-243: The loop in handleSyncStreamBatch reads
messageBatch[0].topic without ensuring messageBatch is non-empty; add a
defensive guard at the top of the for-await loop to skip empty batches (e.g., if
(!messageBatch || messageBatch.length === 0) continue) so consume(topic,
messageBatch) is only called with a non-empty messageBatch; update the
handleSyncStreamBatch implementation to perform this check before deriving topic
and invoking this.consume.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 983424f and b68c633.

📒 Files selected for processing (1)
  • packages/kafka/lib/AbstractKafkaConsumer.ts

3 participants