Skip to content

[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness#28326

Open
autophagy wants to merge 13 commits into
apache:masterfrom
autophagy:flink-39378-timers-2
Open

[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness#28326
autophagy wants to merge 13 commits into
apache:masterfrom
autophagy:flink-39378-timers-2

Conversation

@autophagy

@autophagy autophagy commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

What is the purpose of the change

This PR adds support for Context, OnTimerContext, TimeContext, timer registration and firing, watermark management, and rowtime support to the PTF Test Harness.

When a PTF registers a timer, the timer is stored in the TimerManager and keyed by the partition row. When the user advances the watermark using
either setWatermark or setWatermarkForTable, the manager then fires all pending timers below or equal to the watermark in a deterministic order.

I moved per-invocation state stuff into an InvocationContext to make separation of concerns a little easier to follow.

Rows emitted from test harness setups that configure an on_time column also contain the rowtime column, and the PTF rejects at the point of registration if the user tries to register a timer with PASS_COLUMNS_THROUGH
enabled (similar to live, per the PROCESS_INVALID_PASS_THROUGH_TIMERS test on live.)

Some open questions remain. One is that there's an edge case where if a user's onTimer registers a new timer at or before the current watermark, it then fires, and then registers,
then fires, etc. Should there be some sort of max depth check here?

The second is that when defining the on time parameter in SQL, you pass in DESCRIPTOR(ts), but in the harness you just pass in the string name of the column. Would it instead be better to support withOnTime("DESCRIPTOR(ts)") to better mirror SQL, rather than withOnTimeColumn("ts")?

Brief change log

  • Added support for Context, OnTimerContext, TimeContext in ProcessTableFunctionTestHarness
  • Added support for timer registration, watermark tracking and timer firing in ProcessTableFunctionTestHarness
  • Added an InvocationContext to capture per-invocation state to simplify the collector logic
  • Reworked derriving the output type from the system inference to account for on_time and uid
  • Fixed an issue in createStateConverter where the incorrect state converters were being used for Map/List state.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests to ProcessFunctionTestHarnessesTest to cover timer firing, watermark advancement and context.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

2.1.156 (Claude Code)

@flinkbot

flinkbot commented Jun 5, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@autophagy autophagy marked this pull request as ready for review June 5, 2026 11:28
@fhueske fhueske changed the title Timers [FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness Jun 5, 2026
@autophagy autophagy force-pushed the flink-39378-timers-2 branch from b05a727 to efe9d3a Compare June 8, 2026 09:59

@fhueske fhueske left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @autophagy!

I've reviewed the changes except for the test (will do that later this week).
Overall, the changes are good. Left a bunch of comments with suggestions and questions.

Cheers, Fabian

Comment thread docs/content.zh/docs/dev/table/functions/ptfs.md Outdated
Comment thread docs/content.zh/docs/dev/table/functions/ptfs.md
Comment thread docs/content.zh/docs/dev/table/functions/ptfs.md
Comment thread docs/content.zh/docs/dev/table/functions/ptfs.md Outdated
Comment thread docs/content.zh/docs/dev/table/functions/ptfs.md
Comment on lines +1966 to +1971
Optional<List<StaticArgument>> staticArgsOpt = systemTypeInference.getStaticArguments();
List<StaticArgument> staticArgs =
staticArgsOpt.orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
Optional<List<StaticArgument>> staticArgsOpt = systemTypeInference.getStaticArguments();
List<StaticArgument> staticArgs =
staticArgsOpt.orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));
List<StaticArgument> staticArgs =
systemTypeInference.getStaticArguments().orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));

Can be folded?

* eligible to fire.
*/
@Internal
class TestHarnessTimerManager {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to add a separate test for this class.

Comment on lines +102 to +104
if (name != null) {
timerSet.removeIf(t -> name.equals(t.name));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also remove unnamed timers.
There can only be one timer per timestamp and name (also if name = null).

@fhueske fhueske left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @autophagy, thanks for addressing most of my earlier comments!
I've had a look at the tests and they are pretty complete.
Left a few comments with suggestions to increase the coverage.

Cheers, Fabian

Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);

to check another codepath?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check with Long and java.sql.Timestamp?

Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
TableSemantics semantics = ctx.tableSemanticsFor("input");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to do the same check also for ctx.tableSemanticsFor() being called from the onTimer() method.

Comment on lines 321 to 326
@DataTypeHint("ROW<value INT>")
public static class PTFWithContext extends ProcessTableFunction<Row> {
public void eval(Context ctx, @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
collect(input);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this PTF (and the corresponding test) anymore.
Everything it covers is covered many times by all the new tests.

private final Class<TimeType> conversionClass;

TestTimeContext(Class<TimeType> conversionClass) {
this.conversionClass = conversionClass;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we eagerly check here if conversionClass is a supported class?


// Advance only left table past the timer — global watermark is still min(left, right)
// Since right has no watermark yet, global won't advance enough to fire
harness.setWatermarkForTable("rightTable", LocalDateTime.of(2025, 1, 1, 0, 0, 3));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also have tests that use the other supported WM data types?

LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

assertThat(harness.getPendingTimers()).isEmpty();
assertThat(harness.getFiredTimers()).hasSize(1);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also assert the specific fired timer (name + ts)?

.withOnTimeColumn("ts")
.build()) {

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 5));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check for setWatermarkForTable()?


harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

assertThrows(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if error message is meaningful for users?

.build()) {

assertThrows(
TableRuntimeException.class,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if error message is meaningful?

harness.clearOutput();

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput()).hasSize(1);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check that the right onTimer() method was called by checking the output row?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants