Add rerun policy for rollups when an upstream partition is re-run #68778
FrankYang0529 wants to merge 1 commit into
When an upstream partition that a rollup's downstream window already consumed is cleared and re-run, the framework had no defined behavior. The de-facto outcome silently depended on the rollup's wait policy: WaitForAll left a provisional run stuck waiting for keys that never re-arrive, while MinimumCount re-fired the downstream run on partial data. Neither is something a Dag author can rely on.

Give RollupMapper an explicit rerun_policy so the author chooses what happens. The default preserves the historical behavior, so existing Dags are unchanged; re-firing with the corrected data is opt-in.

Signed-off-by: PoAn Yang <payang@apache.org>
    # rerun_policy only applies to rollups (it decides what happens when an
    # upstream key re-arrives after the window already fired). Non-rollup
    # mappers have no such field; ``None`` keeps their legacy behavior.
    rerun_policy = mapper.rerun_policy if is_rollup(mapper) else None
I don’t like this. The mapper should instead decide on its own whether the rerun policy applies to it or not. So rerun_policy should simply always be None on any mapper that is not a rollup (and emit an error if the user tries to set it to something else).
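A minimal sketch of one way to read this suggestion: the base mapper always reports `rerun_policy` as `None` and rejects any other value, while the rollup mapper is the only class that accepts a real policy. The class bodies below are hypothetical stand-ins and not the actual `PartitionMapper` or `RollupMapper` implementations; the enum string values are also assumed.

```python
from __future__ import annotations

from enum import Enum


class RerunPolicy(Enum):
    # Member names appear in the diff; the string values are assumed.
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"


class PartitionMapper:
    """Hypothetical base class: non-rollup mappers never carry a policy."""

    def __init__(self, rerun_policy: RerunPolicy | None = None) -> None:
        if rerun_policy is not None:
            # Fail loudly instead of silently ignoring the setting.
            raise ValueError(
                f"{type(self).__name__} does not support rerun_policy; "
                "it only applies to rollup mappers"
            )
        self.rerun_policy: RerunPolicy | None = None


class RollupMapper(PartitionMapper):
    """Hypothetical rollup mapper: the only mapper that accepts a policy."""

    def __init__(self, rerun_policy: RerunPolicy = RerunPolicy.HOLD) -> None:
        super().__init__()  # the base check sees None and passes
        self.rerun_policy = rerun_policy
```

Under this assumption the call site can read `mapper.rerun_policy` directly, with no `is_rollup` check.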
        rerun_policy=rerun_policy,
        session=session,
    )
    # IGNORE dropped this late event for an already-fired window.
This comment should be expanded somewhat.
    target_dag: DagModel,
    rollup_fingerprint: dict,
    asset_id: int,
    rerun_policy: RerunPolicy | None = None,
Is a default needed here? This is a private function so we should control all its call sites and can change them directly.
    is_refresh = False
    if latest_apdr is not None:
        # The latest APDR already fired, so this event re-arrives for an
        # already-materialized window. Apply the rollup's rerun policy.
        if rerun_policy is RerunPolicy.IGNORE:
            cls.logger().debug(
                "Dropping re-arrived event for fired window key %s dag_id %s (IGNORE)",
                target_key,
                target_dag.dag_id,
            )
            return None
        is_refresh = rerun_policy is RerunPolicy.REFRESH
It’s probably better to make this a function on RerunPolicy instead, like this:

    if rerun_policy is None:
        is_refresh = False
    else:
        is_refresh = rerun_policy.is_refresh(target_dag, target_key)

Furthermore, it is worth considering whether, instead of None, we can add a dedicated RerunPolicy.NONE. This avoids needing to special-case None every time the value is used.
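As a rough sketch of both ideas together, assuming a `NONE` member is added and the decision logic moves onto the enum: the `NONE` member, the `drops_rearrived_event` method, and the string values are hypothetical, and `is_refresh` is shown without the `target_dag` and `target_key` arguments from the snippet above to keep the example self-contained.

```python
from enum import Enum


class RerunPolicy(Enum):
    # NONE is the hypothetical "does not apply" member proposed in review;
    # the string values for every member are assumed.
    NONE = "none"
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"

    def drops_rearrived_event(self) -> bool:
        """True when an event for an already-fired window is discarded."""
        return self is RerunPolicy.IGNORE

    def is_refresh(self) -> bool:
        """True when a re-arrived event marks the new run as a refresh."""
        return self is RerunPolicy.REFRESH
```

With a dedicated `NONE` member, a call site can write `rerun_policy.is_refresh()` unconditionally, because every possible value is an enum member with the method defined.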
    if not evaluator.run(timetable.asset_condition, statuses=statuses):
        continue
    contributing_assets = asset_info_per_apdr[apdr.id]
    if apdr.is_refresh:
Instead of storing a simple boolean, it may be a good idea to store the rerun policy directly on the model (as a string), and resolve the stored value to a full RerunPolicy object. That way, this block can be extracted into a function on RerunPolicy. Making the logic live on RerunPolicy is nicer because it is easier to keep track of possible enum values on the same class, and if we potentially add new values to the enum, it is much easier to remember to add a branch if the logic is close.
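The storage idea could look roughly like the following, assuming the policy is persisted as its string value and resolved on read. `StoredRun` is a hypothetical stand-in for the APDR model, the attribute and property names are invented for illustration, and the enum string values are assumed.

```python
from dataclasses import dataclass
from enum import Enum


class RerunPolicy(Enum):
    # Member names appear in the diff; the string values are assumed.
    HOLD = "hold"
    IGNORE = "ignore"
    REFRESH = "refresh"

    def is_refresh(self) -> bool:
        # Per-policy logic lives next to the enum members.
        return self is RerunPolicy.REFRESH


@dataclass
class StoredRun:
    """Hypothetical stand-in for the model row holding the policy string."""

    rerun_policy: str = RerunPolicy.HOLD.value

    @property
    def resolved_rerun_policy(self) -> RerunPolicy:
        # Enum lookup by value raises ValueError for an unknown string.
        return RerunPolicy(self.rerun_policy)
```

A caller would then branch on `run.resolved_rerun_policy.is_refresh()` rather than on a stored boolean, so a new enum member only requires changes inside `RerunPolicy`.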
    wait_policy=decode_wait_policy(data["wait_policy"]),
    # Default for serialized Dags written before rerun_policy existed —
    # HOLD reproduces their pre-feature behavior (wait for the full window).
    rerun_policy=data.get("rerun_policy", RerunPolicy.HOLD),
Comment is obvious. Remove it.
    upstream_mapper: PartitionMapper,
    window: Window,
    wait_policy: WaitPolicy | None = None,
    rerun_policy: RerunPolicy | str = RerunPolicy.HOLD,
Suggested change:

    - rerun_policy: RerunPolicy | str = RerunPolicy.HOLD,
    + rerun_policy: RerunPolicy = RerunPolicy.HOLD,
No need to consider the str case since this is a core class, not user-facing. (See wait_policy.)
A pre-commit check should be added to ensure the enum classes here and in sdk have the same cases.
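Such a check could be sketched as below, assuming both enum classes share a name and define their cases as plain class-level assignments. The helper names are hypothetical, and the functions take source text rather than file paths because the real module locations are not shown here.

```python
import ast


def enum_members(source: str, class_name: str) -> set[str]:
    """Collect names assigned at class level in the named class."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef) and node.name == class_name:
            return {
                target.id
                for stmt in node.body
                if isinstance(stmt, ast.Assign)
                for target in stmt.targets
                if isinstance(target, ast.Name)
            }
    raise LookupError(f"class {class_name!r} not found")


def mismatched_members(core_source: str, sdk_source: str, class_name: str) -> list[str]:
    """Return member names present in only one of the two definitions."""
    core = enum_members(core_source, class_name)
    sdk = enum_members(sdk_source, class_name)
    return sorted(core ^ sdk)
```

A pre-commit hook would read the two files, call `mismatched_members`, and exit non-zero when the returned list is not empty. Parsing with `ast` avoids importing either package during the check.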
closes: #65923