[FLINK-39050][Table SQL/Planner] Support configurable handling strategy for null rowtime field in watermark generation #27554
    }

    // Register metrics for null rowtime handling
    if (rowtimeNullHandling == RowtimeNullHandling.DROP) {
Minor suggestion/nit: alternatively, we could consider initializing both counters in open() regardless of the null-handling strategy. That would help decouple setup and usage and make this a bit more resilient to future changes.
Good suggestion. I have done it. Thanks.
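The suggestion above can be sketched roughly as follows. This is a minimal illustration only: `SimpleCounter`, `SimpleMetricGroup`, and `Operator` are hypothetical stand-ins, not Flink's actual metric or operator classes, and the real `open()` in the PR may look different. Only the enum values and the two metric names come from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: every type here is a stand-in, not one of Flink's classes. */
final class CounterSetupSketch {

    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    /** Hypothetical minimal counter, standing in for a metrics counter. */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Hypothetical registry, standing in for an operator's metric group. */
    static final class SimpleMetricGroup {
        final Map<String, SimpleCounter> counters = new HashMap<>();

        SimpleCounter counter(String name) {
            return counters.computeIfAbsent(name, n -> new SimpleCounter());
        }
    }

    /** Hypothetical operator that registers both counters regardless of strategy. */
    static final class Operator {
        // Kept only to show that registration below does not branch on it.
        private final RowtimeNullHandling strategy;
        private SimpleCounter dropped;
        private SimpleCounter skipped;

        Operator(RowtimeNullHandling strategy) {
            this.strategy = strategy;
        }

        /** Setup is unconditional, so later code never sees an unset counter. */
        void open(SimpleMetricGroup metrics) {
            dropped = metrics.counter("numNullRowtimeRecordsDropped");
            skipped = metrics.counter("numNullRowtimeRecordsSkipped");
        }

        RowtimeNullHandling getStrategy() { return strategy; }
        SimpleCounter getDropped() { return dropped; }
        SimpleCounter getSkipped() { return skipped; }
    }

    public static void main(String[] args) {
        SimpleMetricGroup metrics = new SimpleMetricGroup();
        Operator op = new Operator(RowtimeNullHandling.FAIL);
        op.open(metrics);
        // Both counters exist even though FAIL never increments them.
        System.out.println(metrics.counters.keySet());
    }
}
```

With this shape, a strategy added later only has to touch the per-record code path; the registration in `open()` stays the same.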
rionmonster left a comment
Changes look great overall — left one very minor comment. LGTM, approving!
    .linebreak()
    .text(
        "FAIL: Throw a runtime exception when encountering null rowtime (default). "
            + "This preserves the current behavior.")
It does not make much sense to say "This preserves the current behavior" in the message, as "the current behavior" could be read as something that is occurring in the flow, whereas you mean the previous behavior of the code.
I suggest marking this option as the recommended one: a null rowtime means we are getting data that does not meet what we expect for rowtime, which implies that the flow needs to be cleaned up.
I assume that this scenario occurs when a source gets null in a column that should not be null, or when there is an error in the operator (see here).
@alpinegizmo do you think this is a reasonable addition? If FAIL is not specified, we either lose data or pass on data that might cause issues downstream.
Thank you for the thoughtful review! Your suggestion is valid and I agree with your points.
Changes made:
- Removed the ambiguous phrase - Replaced "This preserves the current behavior" as it could be misread as "something occurring in the flow" rather than "the previous default behavior of the code."
- Marked FAIL as recommended - Added "(default, recommended)" to the FAIL option description, as you correctly pointed out that null rowtime typically indicates data quality issues that need attention.
- Added risk warnings - Enhanced descriptions for DROP and SKIP_WATERMARK to clarify their potential implications:
  - DROP: "Note that this may result in data loss."
  - SKIP_WATERMARK: "Note that this may cause issues in downstream operators if they expect valid rowtime values."
- Added context about null rowtime scenarios - The FAIL description now explains: "A null rowtime typically indicates that the source is receiving unexpected null values in a column that should not be null, which suggests the data flow needs to be cleaned up."
The updated description now reads:
FAIL (default, recommended): Throw a runtime exception when encountering null rowtime.
A null rowtime typically indicates that the source is receiving unexpected null values
in a column that should not be null, which suggests the data flow needs to be cleaned up.
DROP: Drop the record silently. The 'numNullRowtimeRecordsDropped' metric will be incremented.
Note that this may result in data loss.
SKIP_WATERMARK: Forward the record without advancing the watermark.
The 'numNullRowtimeRecordsSkipped' metric will be incremented.
Note that this may cause issues in downstream operators if they expect valid rowtime values.
I've also updated the HTML configuration documentation accordingly.
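To make the three documented behaviors concrete, here is a rough sketch of the per-record decision. It is not the PR's `WatermarkAssignerOperator`: `Row` and `Assigner` are hypothetical stand-ins, the counters are plain fields rather than metrics, and the watermark is simplified to the maximum rowtime seen so far, with no delay or watermark generator involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a simplified model of the three null-rowtime strategies. */
final class NullRowtimeHandlingSketch {

    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    /** Hypothetical record: a payload plus a nullable rowtime in epoch milliseconds. */
    static final class Row {
        final String payload;
        final Long rowtime;

        Row(String payload, Long rowtime) {
            this.payload = payload;
            this.rowtime = rowtime;
        }
    }

    /** Hypothetical assigner; fields are package-visible to keep the sketch short. */
    static final class Assigner {
        private final RowtimeNullHandling strategy;
        final List<Row> emitted = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        long numDropped;  // stands in for numNullRowtimeRecordsDropped
        long numSkipped;  // stands in for numNullRowtimeRecordsSkipped

        Assigner(RowtimeNullHandling strategy) {
            this.strategy = strategy;
        }

        void processElement(Row row) {
            if (row.rowtime == null) {
                switch (strategy) {
                    case FAIL:
                        // Surface the data quality problem immediately.
                        throw new RuntimeException("Rowtime field is null for record: " + row.payload);
                    case DROP:
                        // The record is lost; only the counter records that it existed.
                        numDropped++;
                        return;
                    case SKIP_WATERMARK:
                        // The record goes downstream, but the watermark is left untouched.
                        numSkipped++;
                        emitted.add(row);
                        return;
                    default:
                        throw new IllegalStateException("Unknown strategy: " + strategy);
                }
            }
            emitted.add(row);
            // Simplification: the watermark is the largest rowtime seen so far.
            currentWatermark = Math.max(currentWatermark, row.rowtime);
        }
    }

    public static void main(String[] args) {
        Assigner assigner = new Assigner(RowtimeNullHandling.SKIP_WATERMARK);
        assigner.processElement(new Row("a", 1000L));
        assigner.processElement(new Row("b", null));
        System.out.println("emitted=" + assigner.emitted.size()
                + " watermark=" + assigner.currentWatermark
                + " skipped=" + assigner.numSkipped);
    }
}
```

The sketch also shows why the risk notes matter: under SKIP_WATERMARK the downstream operator receives a row whose rowtime is still null, and under DROP the row never appears in the output at all.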
@davidradl Could you please take another look when you have a moment? Thanks!
…gy for null rowtime field in watermark generation
0f3aa0d to 76cd937
@flinkbot run azure
What is the purpose of the change
This pull request introduces a new configuration option table.exec.source.rowtime-null-handling to handle null rowtime fields during watermark generation in Flink Table SQL. Previously, when the rowtime field was null, the WatermarkAssignerOperator would throw a RuntimeException, which could cause job failures in scenarios where null rowtime values are expected (e.g., CDC sources with missing timestamps).
This change provides users with three configurable strategies:
- FAIL (default): Throw an exception (maintains backward compatibility)
- DROP: Silently drop the record and increment a metric counter
- SKIP_WATERMARK: Forward the record without advancing the watermark and increment a metric counter
Brief change log
- table.exec.source.rowtime-null-handling in ExecutionConfigOptions with enum RowtimeNullHandling
- WatermarkAssignerOperator to handle null rowtime fields based on the configured strategy
- numNullRowtimeRecordsDropped and numNullRowtimeRecordsSkipped to track null rowtime handling
- WatermarkAssignerOperatorFactory to pass the null handling configuration
- StreamExecWatermarkAssigner to read and apply the configuration
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
- testNullRowtimeWithFailStrategy to verify the FAIL strategy throws exception with helpful error message
- testNullRowtimeWithDropStrategy to verify records with null rowtime are dropped correctly
- testNullRowtimeDropMetricCounter to verify DROP strategy handles multiple null records
- testNullRowtimeSkipMetricCounter to verify SKIP_WATERMARK strategy forwards records without advancing watermark
- testNullRowtimeWithSkipWatermarkStrategy to verify watermark is not affected by null rowtime records when using SKIP_WATERMARK
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): yes (added new @PublicEvolving enum RowtimeNullHandling and configuration option)
- … WatermarkAssignerOperator.processElement(), but impact is minimal as null check is a single field access)
Documentation
- … time_attributes.md in both English and Chinese, added configuration documentation in execution_config_configuration.html, and added JavaDoc for the new configuration option and enum)