
[FLINK-39050][Table SQL/Planner] Support configurable handling strategy for null rowtime field in watermark generation #27554

Open
Myracle wants to merge 1 commit into apache:master from Myracle:FLINK-39050-null-rowtime-for-watermark


Myracle commented Feb 9, 2026

What is the purpose of the change

This pull request introduces a new configuration option table.exec.source.rowtime-null-handling to handle null rowtime fields during watermark generation in Flink Table SQL. Previously, when the rowtime field was null, the WatermarkAssignerOperator would throw a RuntimeException, which could cause job failures in scenarios where null rowtime values are expected (e.g., CDC sources with missing timestamps).

This change provides users with three configurable strategies:

  • FAIL (default): Throw an exception (maintains backward compatibility)
  • DROP: Silently drop the record and increment a metric counter
  • SKIP_WATERMARK: Forward the record without advancing the watermark and increment a metric counter
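
The three strategies above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the code from the PR: `NullRowtimeSketch` is a hypothetical stand-in for `WatermarkAssignerOperator`, the watermark is simplified to the maximum rowtime seen so far, and the two `long` fields merely model the metric counters named in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a watermark assigner; NOT the real WatermarkAssignerOperator.
final class NullRowtimeSketch {
    // Constant names taken from the PR description; nesting the enum here is an assumption.
    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    private final RowtimeNullHandling strategy;
    final List<String> emitted = new ArrayList<>(); // records forwarded downstream
    long currentWatermark = Long.MIN_VALUE;         // assumption: watermark = max rowtime seen
    long numDropped = 0;                            // models numNullRowtimeRecordsDropped
    long numSkipped = 0;                            // models numNullRowtimeRecordsSkipped

    NullRowtimeSketch(RowtimeNullHandling strategy) {
        this.strategy = strategy;
    }

    /** Processes one record whose rowtime (epoch millis) may be null. */
    void processElement(String payload, Long rowtime) {
        if (rowtime == null) {
            switch (strategy) {
                case DROP:
                    numDropped++;          // count the record, do not forward it
                    return;
                case SKIP_WATERMARK:
                    numSkipped++;
                    emitted.add(payload);  // forward, but leave the watermark untouched
                    return;
                case FAIL:
                default:
                    throw new RuntimeException("rowtime field is null for record: " + payload);
            }
        }
        emitted.add(payload);
        currentWatermark = Math.max(currentWatermark, rowtime);
    }

    public static void main(String[] args) {
        NullRowtimeSketch op = new NullRowtimeSketch(RowtimeNullHandling.SKIP_WATERMARK);
        op.processElement("a", 1000L);
        op.processElement("b", null);
        op.processElement("c", 2000L);
        System.out.println(op.emitted + " watermark=" + op.currentWatermark
                + " skipped=" + op.numSkipped);
    }
}
```

With DROP, record "b" would be absent from the output and the dropped counter would be 1; with FAIL, the second call would throw.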

Brief change log

  • Added new configuration option table.exec.source.rowtime-null-handling in ExecutionConfigOptions with enum RowtimeNullHandling
  • Modified WatermarkAssignerOperator to handle null rowtime fields based on the configured strategy
  • Added two new metrics: numNullRowtimeRecordsDropped and numNullRowtimeRecordsSkipped to track null rowtime handling
  • Extended WatermarkAssignerOperatorFactory to pass the null handling configuration
  • Updated StreamExecWatermarkAssigner to read and apply the configuration
  • Added comprehensive unit tests for all three strategies
  • Updated documentation (both English and Chinese) for the new configuration option
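
As a rough illustration of how a string-valued option could map onto the enum with FAIL as the fallback, here is a minimal sketch. It deliberately avoids Flink's own configuration classes: the plain `Map`, the `resolve` helper, and the case-insensitive parsing are all assumptions made for this example; only the option key and the enum constant names come from the description above.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper; the PR itself wires the option through ExecutionConfigOptions.
final class RowtimeNullHandlingConfigSketch {
    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    // Option key as named in the PR description.
    static final String KEY = "table.exec.source.rowtime-null-handling";

    /** Resolves the strategy from a key/value map, falling back to FAIL when unset. */
    static RowtimeNullHandling resolve(Map<String, String> conf) {
        String raw = conf.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return RowtimeNullHandling.FAIL; // default keeps the pre-existing behavior
        }
        // Assumption: values are matched case-insensitively; unknown values throw.
        return RowtimeNullHandling.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));
        System.out.println(resolve(Map.of(KEY, "skip_watermark")));
    }
}
```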

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • Added unit test testNullRowtimeWithFailStrategy to verify the FAIL strategy throws exception with helpful error message
  • Added unit test testNullRowtimeWithDropStrategy to verify records with null rowtime are dropped correctly
  • Added unit test testNullRowtimeDropMetricCounter to verify DROP strategy handles multiple null records
  • Added unit test testNullRowtimeSkipMetricCounter to verify SKIP_WATERMARK strategy forwards records without advancing watermark
  • Added unit test testNullRowtimeWithSkipWatermarkStrategy to verify watermark is not affected by null rowtime records when using SKIP_WATERMARK

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (added new @PublicEvolving enum RowtimeNullHandling and configuration option)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (added null check and switch statement in WatermarkAssignerOperator.processElement(), but impact is minimal as null check is a single field access)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs (Updated time_attributes.md in both English and Chinese, added configuration documentation in execution_config_configuration.html, and added JavaDoc for the new configuration option and enum)


flinkbot commented Feb 9, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

}

// Register metrics for null rowtime handling
if (rowtimeNullHandling == RowtimeNullHandling.DROP) {
Contributor

Minor suggestion/nit: alternatively, we could consider initializing both counters in open() regardless of the null-handling strategy. That would help decouple setup and usage and make this a bit more resilient to future changes.

Contributor Author

Good suggestion. I have done it. Thanks.
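
The suggestion in this thread, registering both counters in open() regardless of the configured strategy, can be sketched roughly as follows. This is a hypothetical model rather than the PR's code: the `Map` of `AtomicLong` values stands in for a metric group, and `CounterInitSketch` and `onNullRowtime` are names invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the operator's metric setup; not the real Flink metric API.
final class CounterInitSketch {
    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    // Stand-in for a metric group: metric name -> counter.
    final Map<String, AtomicLong> metrics = new HashMap<>();
    private final RowtimeNullHandling strategy;
    private AtomicLong dropped;
    private AtomicLong skipped;

    CounterInitSketch(RowtimeNullHandling strategy) {
        this.strategy = strategy;
    }

    /** Registers both counters unconditionally, so setup does not depend on the strategy. */
    void open() {
        dropped = metrics.computeIfAbsent("numNullRowtimeRecordsDropped", k -> new AtomicLong());
        skipped = metrics.computeIfAbsent("numNullRowtimeRecordsSkipped", k -> new AtomicLong());
    }

    /** Called when a record with a null rowtime arrives. */
    void onNullRowtime() {
        switch (strategy) {
            case DROP:
                dropped.incrementAndGet();
                break;
            case SKIP_WATERMARK:
                skipped.incrementAndGet();
                break;
            default:
                throw new RuntimeException("rowtime field is null");
        }
    }
}
```

Because neither field depends on the strategy, a later change to the handling logic cannot hit a counter that was never initialized.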

rionmonster (Contributor) left a comment

Changes look great overall — left one very minor comment. LGTM, approving!

github-actions bot added the community-reviewed label Feb 10, 2026

.linebreak()
.text(
        "FAIL: Throw a runtime exception when encountering null rowtime (default). "
                + "This preserves the current behavior.")
davidradl (Contributor), Feb 10, 2026

It does not make much sense to say "This preserves the current behavior" in the message, as "the current behavior" could be read as something that is occurring in the flow, whereas you mean the previous behavior of the code.

I suggest marking this option as the recommended one: a null rowtime means the data does not meet what we expect for the rowtime field, which implies that the flow needs to be cleaned up.

I assume that this scenario occurs when a source gets null in a column that should not be null, or when there is an error in the operator (see here).

@alpinegizmo do you think this is a reasonable addition? If FAIL is not specified, we either lose data or pass on data that might cause issues downstream.

Myracle (Contributor Author), Feb 12, 2026

Thank you for the thoughtful review! Your suggestion is valid and I agree with your points.
Changes made:

  1. Removed the ambiguous phrase - Replaced "This preserves the current behavior" as it could be misread as "something occurring in the flow" rather than "the previous default behavior of the code."
  2. Marked FAIL as recommended - Added "(default, recommended)" to the FAIL option description, as you correctly pointed out that null rowtime typically indicates data quality issues that need attention.
  3. Added risk warnings - Enhanced descriptions for DROP and SKIP_WATERMARK to clarify their potential implications:
       • DROP: "Note that this may result in data loss."
       • SKIP_WATERMARK: "Note that this may cause issues in downstream operators if they expect valid rowtime values."
  4. Added context about null rowtime scenarios - The FAIL description now explains: "A null rowtime typically indicates that the source is receiving unexpected null values in a column that should not be null, which suggests the data flow needs to be cleaned up."

The updated description now reads:

FAIL (default, recommended): Throw a runtime exception when encountering null rowtime. 
A null rowtime typically indicates that the source is receiving unexpected null values 
in a column that should not be null, which suggests the data flow needs to be cleaned up.

DROP: Drop the record silently. The 'numNullRowtimeRecordsDropped' metric will be incremented. 
Note that this may result in data loss.

SKIP_WATERMARK: Forward the record without advancing the watermark. 
The 'numNullRowtimeRecordsSkipped' metric will be incremented. 
Note that this may cause issues in downstream operators if they expect valid rowtime values.

I've also updated the HTML configuration documentation accordingly.

Contributor Author

@davidradl Could you please take another look when you have a moment? Thanks!

…gy for null rowtime field in watermark generation
Myracle force-pushed the FLINK-39050-null-rowtime-for-watermark branch from 0f3aa0d to 76cd937 on February 12, 2026 08:24

Myracle commented Mar 5, 2026

@flinkbot run azure

