Skip to content

[GSoC 2026] Kafka Streams runner — WatermarkManager part 1: in-memory per-source-partition tracking#38957

Open
junaiddshaukat wants to merge 1 commit into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-watermark-manager-part1
Open

[GSoC 2026] Kafka Streams runner — WatermarkManager part 1: in-memory per-source-partition tracking#38957
junaiddshaukat wants to merge 1 commit into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-watermark-manager-part1

Conversation

@junaiddshaukat

@junaiddshaukat junaiddshaukat commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Summary

First slice of the WatermarkManager, the prerequisite for any stateful transform (GroupByKey etc.). Too large for one PR, so split; this part is the in-memory core, decoupled from the Kafka wiring so it can be unit-tested in isolation. Plan agreed with @je-ik on Slack.

Design (agreed with @je-ik)

A stage's input watermark is min() over its upstream source partitions' committed watermarks. Tracking is keyed on source partitions, not producer
instances:

  • the total source-partition count travels in-band with every report, so the
    reader always knows how many it's waiting for;
  • a partition is owned by exactly one live instance, and on failure its
    partitions are reassigned and the new owner keeps reporting — so a killed
    instance never leaves the reader stuck (no instance liveness tracking, no
    describeConsumerGroups, no generationId needed).
    Validated in a standalone Kafka Streams PoC before implementation.

Scope (this PR)

  • WatermarkManager: observe(sourcePartition, committedWatermarkMillis, totalSourcePartitions); holds at BoundedWindow.TIMESTAMP_MIN_VALUE until
    every source partition has reported; then emits min(); output is clamped
    non-decreasing; a change in totalSourcePartitions re-opens the hold (the
    "revert" case, without an explicit epoch).
  • 9 unit tests: hold/emit, per-partition monotonicity, no-regression clamp,
    partition-count increase/decrease, argument validation.

Out of scope (later parts)

  • Part 2: wire into ExecutableStageProcessor — flush
    (sourcePartition, committedWatermark, totalSourcePartitions) atomically
    with the offset commit (EOS), fan out to all downstream partitions, consume
    • feed the manager, replace the provisional "flush on every watermark"
      behavior; real-Kafka integration tests over the 5 scenarios (steady,
      scale-out, clean scale-in, SIGKILL, partition reassignment).
  • Part 3: persistence / watermark holds and downstream timer firing once
    state + timers land.

Testing

./gradlew :runners:kafka-streams:check green; 9 new unit tests.

Fixes #38955
Refs #18479

First slice of the Kafka Streams runner WatermarkManager, decoupled from the
Kafka wiring so it can be unit-tested in isolation.

A stage's input watermark is min() over its upstream source partitions'
committed watermarks. Tracking is keyed on source partitions rather than
producer instances: the total source-partition count travels in-band with
every report, and a partition is always owned by exactly one live instance
(reassigned to a new owner on failure), so a killed instance never leaves the
reader stuck and no instance-liveness tracking is needed. This was validated
in a standalone Kafka Streams PoC before implementation.

WatermarkManager holds at BoundedWindow.TIMESTAMP_MIN_VALUE until every source
partition has reported, then emits min(); the emitted watermark is clamped to
be non-decreasing, and a change in totalSourcePartitions re-opens the hold
(the "revert" case, without an explicit epoch). Wiring into
ExecutableStageProcessor (flush coupled to the offset commit, fan-out to all
downstream partitions, real-Kafka integration tests) is a follow-up.

Refs apache#18479
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the core in-memory logic for the WatermarkManager, a critical component for enabling stateful transforms in the Kafka Streams runner. By decoupling watermark tracking from Kafka-specific infrastructure, the implementation provides a robust, testable mechanism for calculating stage input watermarks based on source partition reports. This is the first of a multi-part effort to fully integrate watermark propagation into the runner.

Highlights

  • WatermarkManager Implementation: Introduced the WatermarkManager class to track input watermarks for fused stages in the Kafka Streams runner, decoupled from Kafka-specific wiring.
  • Robust Partition Tracking: Implemented tracking based on source partitions rather than producer instances to ensure stability during rebalances and instance failures.
  • Watermark Logic: Added logic to maintain monotonicity, handle partition count changes, and enforce non-regression of the stage watermark.
  • Comprehensive Testing: Added 9 unit tests covering hold/emit behavior, monotonicity, partition count adjustments, and input validation.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the WatermarkManager class and its corresponding unit tests to track a fused stage's input watermark based on upstream source partitions. The reviewer identified a critical issue where a change in the total source partition count does not clear existing partition reports, which could lead to using stale watermarks and premature advancement. To resolve this, the reviewer suggested clearing the watermark map whenever the partition count changes to ensure fresh reports are collected.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +101 to +106
if (totalSourcePartitions != expectedSourcePartitionCount) {
expectedSourcePartitionCount = totalSourcePartitions;
// On a partition-count decrease, drop reports for partitions that no longer exist so
// completeness and min() are computed over the current partition set only.
committedWatermarkByPartition.keySet().removeIf(p -> p >= totalSourcePartitions);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When the totalSourcePartitions count changes (e.g., during a repartitioning or scaling event), the mapping and distribution of partitions change.

Currently, if the partition count decreases (e.g., from 4 to 2), the code only prunes partitions with indices >= totalSourcePartitions (line 105). However, if partitions 0 and 1 had already reported watermarks in the previous epoch, isComplete() will immediately return true without waiting for fresh reports from the new epoch. This can lead to using stale/outdated watermarks from the old partition layout, potentially causing the stage watermark to over-advance incorrectly.

To ensure correctness and adhere to the design goal of re-opening the hold until the new full set has reported, we should clear the committedWatermarkByPartition map whenever totalSourcePartitions changes. Since lastEmittedMillis protects the stage watermark from regressing, clearing the map is perfectly safe and guarantees we wait for a fresh report from every partition in the new layout.

Note: If you apply this change, you will also need to update the corresponding unit tests in WatermarkManagerTest.java (such as partitionCountIncreaseReopensHold and partitionCountDecreasePrunesStalePartitions) to ensure all partitions report after a partition count change.

Suggested change
if (totalSourcePartitions != expectedSourcePartitionCount) {
expectedSourcePartitionCount = totalSourcePartitions;
// On a partition-count decrease, drop reports for partitions that no longer exist so
// completeness and min() are computed over the current partition set only.
committedWatermarkByPartition.keySet().removeIf(p -> p >= totalSourcePartitions);
}
if (totalSourcePartitions != expectedSourcePartitionCount) {
expectedSourcePartitionCount = totalSourcePartitions;
// On a partition-count change, clear previous reports to ensure we wait for a fresh
// report from every partition in the new layout, avoiding stale watermarks.
committedWatermarkByPartition.clear();
}

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @kennknowles added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant