Skip to content

Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG#28025

Draft
raminqaf wants to merge 10 commits intoapache:masterfrom
raminqaf:FLINK-39537
Draft

Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG#28025
raminqaf wants to merge 10 commits intoapache:masterfrom
raminqaf:FLINK-39537

Conversation

@raminqaf
Copy link
Copy Markdown
Contributor

What is the purpose of the change

FROM_CHANGELOG is currently locked to row semantics — each row is processed independently, with no way to co-locate rows for the same key in the same parallel operator instance. This is fine for stateless downstreams but limits use cases where the resulting changelog feeds into a stateful operator keyed on the same column.

This PR uses the conditional-traits machinery introduced for TO_CHANGELOG in FLINK-39392 to switch FROM_CHANGELOG to set semantics when the call provides PARTITION BY. Behavior without PARTITION BY is unchanged.

-- Row semantics (unchanged)
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream);                                                                          
                                                                                                                                  
-- Set semantics: planner inserts Exchange(hash[id])                                                                              
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id);                                                          

Brief change log

  • BuiltInFunctionDefinitions.FROM_CHANGELOG: input table argument adds withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy())
  • Added a plan test (FromChangelogTest#testSetSemanticsWithPartitionBy) verifying the Exchange(hash[...]) propagation
  • Added a semantic test program (FromChangelogTestPrograms#SET_SEMANTICS_PARTITION_BY) verifying end-to-end output equivalence
  • Updated SQL reference docs (docs/.../changelog.md) and Table#fromChangelog JavaDoc

Verifying this change

This change added tests and can be verified as follows:

  • FromChangelogTest#testSetSemanticsWithPartitionBy — plan test asserting the optimized rel plan contains Exchange(distribution=[hash[id]]) when PARTITION BY id is specified, and the output changelogMode propagates correctly
  • FromChangelogSemanticTests (via SET_SEMANTICS_PARTITION_BY program) — end-to-end run verifying that adding PARTITION BY changes he parallel execution layout but does NOT alter row-level output (same +I, -U, +U, -D sequence as the row-semantics tests)
  • All existing FromChangelogTest and FromChangelogSemanticTests cases continue to pass without modification, confirming the conditional trait does not regress the row-semantics path

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (only FROM_CHANGELOG's declared traits change; the function signature and Table#fromChangelog API are unchanged)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (runtime function FromChangelogFunction is unchanged)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs (docs/content/docs/sql/reference/queries/changelog.md) and JavaDocs
    (Table#fromChangelog)

Was generative AI tooling used to co-author this PR?

  • Yes (Claude Code / Opus 4.7)

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 24, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants