Add rerun policy for rollups when an upstream partition is re-run #68778

Open: FrankYang0529 wants to merge 1 commit into apache:main from FrankYang0529:airflow-65923
@FrankYang0529

When an upstream partition that a rollup's downstream window already consumed is cleared and re-run, the framework had no defined behavior. The de-facto outcome silently depended on the rollup's wait policy: WaitForAll left a provisional run stuck waiting for keys that never re-arrive, while MinimumCount re-fired the downstream run on partial data. Neither is something a Dag author can rely on.

Give RollupMapper an explicit rerun_policy so the author chooses what happens. The default preserves the historical behavior, so existing Dags are unchanged; re-firing with the corrected data is opt-in.
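
As a rough illustration of the opt-in described above, the sketch below uses stand-in classes rather than the real Airflow ones. The parameter names and the `HOLD`, `IGNORE`, and `REFRESH` member names follow the diff in this PR; the string values, the placeholder argument values, and the reading of `REFRESH` as the "re-fire with corrected data" option are assumptions.

```python
from __future__ import annotations

import enum
from dataclasses import dataclass


class RerunPolicy(enum.Enum):
    # Member names appear in this PR; the string values are assumptions.
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"


@dataclass(frozen=True)
class RollupMapper:
    """Stand-in for illustration only, not the Airflow class."""

    upstream_mapper: object
    window: object
    wait_policy: object | None = None
    # The default keeps the historical behavior, so existing Dags are unchanged.
    rerun_policy: RerunPolicy = RerunPolicy.HOLD


# An existing Dag that never mentions rerun_policy keeps the default.
existing = RollupMapper(upstream_mapper="daily", window="7d")

# An author who wants the downstream run re-fired opts in explicitly.
opted_in = RollupMapper(
    upstream_mapper="daily",
    window="7d",
    rerun_policy=RerunPolicy.REFRESH,
)
```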

closes: #65923

Was generative AI tooling used to co-author this PR?
  • Yes - Claude Code with Opus 4.8

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Signed-off-by: PoAn Yang <payang@apache.org>
Comment on lines +648 to +651
# rerun_policy only applies to rollups (it decides what happens when an
# upstream key re-arrives after the window already fired). Non-rollup
# mappers have no such field; ``None`` keeps their legacy behavior.
rerun_policy = mapper.rerun_policy if is_rollup(mapper) else None

I don’t like this. The mapper should instead decide on its own whether the rerun policy applies to it or not. So rerun_policy should simply always be None on any mapper that is not a rollup (and emit an error if the user tries to set it to something else).
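
One way to read this suggestion, as a minimal sketch with stand-in classes: the base mapper always reports `None` and rejects any other value, while the rollup mapper carries a real policy, so the call site can read `mapper.rerun_policy` without an `is_rollup` check. `IdentityMapper`, the constructor shapes, and the enum string values are hypothetical and not taken from the PR.

```python
from __future__ import annotations

import enum


class RerunPolicy(enum.Enum):
    # Member names appear in this PR; the string values are assumptions.
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"


class PartitionMapper:
    """Stand-in base mapper: rerun policies do not apply here."""

    def __init__(self, *, rerun_policy: RerunPolicy | None = None) -> None:
        # Reject the setting early instead of silently ignoring it.
        if rerun_policy is not None:
            raise ValueError(
                f"{type(self).__name__} does not support rerun_policy"
            )
        self.rerun_policy: RerunPolicy | None = None


class RollupMapper(PartitionMapper):
    """Stand-in rollup mapper: the only mapper that carries a policy."""

    def __init__(self, *, rerun_policy: RerunPolicy = RerunPolicy.HOLD) -> None:
        super().__init__()
        self.rerun_policy = rerun_policy


class IdentityMapper(PartitionMapper):
    """Hypothetical non-rollup mapper used only for this example."""


def policy_for(mapper: PartitionMapper) -> RerunPolicy | None:
    # The caller no longer needs to know which mappers are rollups.
    return mapper.rerun_policy
```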

rerun_policy=rerun_policy,
session=session,
)
# IGNORE dropped this late event for an already-fired window.

This comment should be expanded somewhat.

target_dag: DagModel,
rollup_fingerprint: dict,
asset_id: int,
rerun_policy: RerunPolicy | None = None,

Is a default needed here? This is a private function so we should control all its call sites and can change them directly.

Comment on lines +730 to +741
is_refresh = False
if latest_apdr is not None:
    # The latest APDR already fired, so this event re-arrives for an
    # already-materialized window. Apply the rollup's rerun policy.
    if rerun_policy is RerunPolicy.IGNORE:
        cls.logger().debug(
            "Dropping re-arrived event for fired window key %s dag_id %s (IGNORE)",
            target_key,
            target_dag.dag_id,
        )
        return None
    is_refresh = rerun_policy is RerunPolicy.REFRESH

It’s probably better to make this a function on RerunPolicy instead, like this

if rerun_policy is None:
    is_refresh = False
else:
    is_refresh = rerun_policy.is_refresh(target_dag, target_key)

Furthermore, it is worth considering whether instead of None, we can add a dedicated RerunPolicy.NONE instead. This avoids needing to special-case None every time the value is used.
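
A minimal sketch of both ideas together, assuming a dedicated `NONE` member and two small methods on the enum. The `NONE` member, `drops_rearrival`, the argument-free `is_refresh`, and `handle_rearrival` are hypothetical; the method proposed above takes `target_dag` and `target_key`, which are left out here to keep the example self-contained.

```python
from __future__ import annotations

import enum


class RerunPolicy(enum.Enum):
    # NONE is the hypothetical member for mappers without a rerun policy.
    # String values are assumptions.
    NONE = "none"
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"

    def drops_rearrival(self) -> bool:
        """True when a key re-arriving for a fired window is discarded."""
        return self is RerunPolicy.IGNORE

    def is_refresh(self) -> bool:
        """True when a key re-arriving for a fired window marks a refresh."""
        return self is RerunPolicy.REFRESH


def handle_rearrival(policy: RerunPolicy, window_already_fired: bool) -> bool | None:
    """Return None when the event is dropped, otherwise the is_refresh flag."""
    if not window_already_fired:
        # First arrival for this window: never a refresh.
        return False
    if policy.drops_rearrival():
        return None
    return policy.is_refresh()
```

With a `NONE` member the call site never compares against Python's `None`, and every branch on the policy lives next to the enum members it depends on.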

if not evaluator.run(timetable.asset_condition, statuses=statuses):
    continue
contributing_assets = asset_info_per_apdr[apdr.id]
if apdr.is_refresh:

Instead of storing a simple boolean, it may be a good idea to store the rerun policy directly on the model (as a string), and resolve the stored value to a full RerunPolicy object. That way, this block can be extracted into a function on RerunPolicy. Making the logic live on RerunPolicy is nicer because it is easier to keep track of possible enum values on the same class, and if we potentially add new values to the enum, it is much easier to remember to add a branch if the logic is close.
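
A rough sketch of storing the policy as a string and resolving it on read. `PendingRunRow` is a hypothetical stand-in for the APDR model, `refires_downstream` is an invented property name, and the enum string values are assumptions; only the standard `enum` lookup-by-value behavior is relied on.

```python
from __future__ import annotations

import enum
from dataclasses import dataclass


class RerunPolicy(enum.Enum):
    # Member names appear in this PR; the string values are assumptions.
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"

    @property
    def refires_downstream(self) -> bool:
        """True when a re-arrived key should trigger a fresh downstream run."""
        return self is RerunPolicy.REFRESH


@dataclass
class PendingRunRow:
    """Hypothetical stand-in for the APDR model row."""

    target_key: str
    rerun_policy: str  # persisted as a plain string column

    @property
    def policy(self) -> RerunPolicy:
        # Lookup by value; an unknown string raises ValueError.
        return RerunPolicy(self.rerun_policy)


row = PendingRunRow(target_key="2024-01", rerun_policy=RerunPolicy.REFRESH.value)
if row.policy.refires_downstream:
    print(f"refreshing window {row.target_key}")
```

Because the branch condition is a property on the enum, adding a new member later means touching one class rather than hunting for boolean checks across the scheduler.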

Comment on lines 226 to +229
wait_policy=decode_wait_policy(data["wait_policy"]),
# Default for serialized Dags written before rerun_policy existed —
# HOLD reproduces their pre-feature behavior (wait for the full window).
rerun_policy=data.get("rerun_policy", RerunPolicy.HOLD),

Comment is obvious. Remove it.

upstream_mapper: PartitionMapper,
window: Window,
wait_policy: WaitPolicy | None = None,
rerun_policy: RerunPolicy | str = RerunPolicy.HOLD,

Suggested change
- rerun_policy: RerunPolicy | str = RerunPolicy.HOLD,
+ rerun_policy: RerunPolicy = RerunPolicy.HOLD,

No need to consider the str case since this is a core class, not user-facing. (See wait_policy.)

A pre-commit check should be added to ensure the enum classes here and in sdk have the same cases.
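
Such a check could be sketched roughly as follows, using only the standard `ast` module to read the enum cases from both files without importing them. The function names, the file paths a hook would pass in, and the assumption that each case is a simple `NAME = <literal>` assignment are all hypothetical.

```python
from __future__ import annotations

import ast
from pathlib import Path


def enum_cases(source: str, class_name: str) -> dict[str, object]:
    """Collect NAME = <literal> assignments from the named class body."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ClassDef) and node.name == class_name:
            cases: dict[str, object] = {}
            for stmt in node.body:
                if (
                    isinstance(stmt, ast.Assign)
                    and len(stmt.targets) == 1
                    and isinstance(stmt.targets[0], ast.Name)
                    and isinstance(stmt.value, ast.Constant)
                ):
                    cases[stmt.targets[0].id] = stmt.value.value
            return cases
    raise LookupError(f"class {class_name!r} not found")


def diff_cases(core: dict[str, object], sdk: dict[str, object]) -> list[str]:
    """Describe every case that is missing or differs between the two enums."""
    problems: list[str] = []
    for name in sorted(core.keys() | sdk.keys()):
        if name not in sdk:
            problems.append(f"{name}: missing from sdk")
        elif name not in core:
            problems.append(f"{name}: missing from core")
        elif core[name] != sdk[name]:
            problems.append(f"{name}: value differs ({core[name]!r} != {sdk[name]!r})")
    return problems


def main(argv: list[str]) -> int:
    """Expects: <core file> <sdk file> <enum class name>. Returns an exit code."""
    core_path, sdk_path, class_name = argv
    problems = diff_cases(
        enum_cases(Path(core_path).read_text(), class_name),
        enum_cases(Path(sdk_path).read_text(), class_name),
    )
    for problem in problems:
        print(f"{class_name}.{problem}")
    return 1 if problems else 0
```

A hook entry would call `main(sys.argv[1:])` and pass the result to `sys.exit`, so any mismatch fails the commit.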

Development

Successfully merging this pull request may close these issues.

Idempotency: downstream rollup behavior when an upstream partition is cleared and re-run
