feat: BufferExec waits for hash join dynamic filters before buffering #21350
Draft
LiaCastaneda wants to merge 6 commits into apache:main
Adds ProducerKind to DynamicFilterPhysicalExpr to distinguish between hash join (safe to wait), TopK, and Aggregate (would deadlock) producers. BufferExec now walks its child subtree at execution time, collects any HashJoin dynamic filters, and waits for them to complete before the background task begins polling the input stream. This allows scans below BufferExec to benefit from dynamic filters rather than being eagerly read before the filter is populated. A new session config option `hash_join_buffering_dynamic_filter_wait_ms` controls the wait behavior (0 = disabled, usize::MAX = wait indefinitely, any other value = timeout in ms). Defaults to usize::MAX. Closes apache#20778
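The three meanings of `hash_join_buffering_dynamic_filter_wait_ms` can be sketched as a small mapping from the raw `usize` to a wait policy. This is a minimal illustration, not the code in this PR: the `WaitPolicy` enum and the `wait_policy` function are hypothetical names introduced only for this example.

```rust
use std::time::Duration;

/// Hypothetical type (not part of DataFusion): how long BufferExec
/// should wait for hash join dynamic filters before buffering.
#[derive(Debug, PartialEq)]
enum WaitPolicy {
    /// 0: do not wait at all.
    Disabled,
    /// usize::MAX: wait until the filters are complete.
    Indefinite,
    /// Any other value: give up after this many milliseconds.
    Timeout(Duration),
}

/// Hypothetical helper: interpret the raw config value as described above.
fn wait_policy(wait_ms: usize) -> WaitPolicy {
    match wait_ms {
        0 => WaitPolicy::Disabled,
        usize::MAX => WaitPolicy::Indefinite,
        ms => WaitPolicy::Timeout(Duration::from_millis(ms as u64)),
    }
}
```

With the default of `usize::MAX`, this sketch would select `WaitPolicy::Indefinite`.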
LiaCastaneda commented Apr 3, 2026
@@ -684,6 +684,17 @@ config_namespace! {
    ///
    /// Disabled by default, set to a number greater than 0 for enabling it.
    pub hash_join_buffering_capacity: usize, default = 0
Should we enable this by default? I think now it should be safe
LiaCastaneda commented Apr 3, 2026
    if let Some(df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
        && df.producer_kind() == ProducerKind::HashJoin
    {
        filters.push(df.clone());
This clone should be cheap
LiaCastaneda commented Apr 3, 2026
    let mut filters = vec![];
    let _ = plan.apply_expressions(&mut |expr| {
        if let Some(df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
            && df.producer_kind() == ProducerKind::HashJoin
We need to know who the producer is, otherwise if we wait on a dynamic filter from a TopK (or AggregateExec) we can enter a deadlock.
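To illustrate the producer check described in this comment, here is a self-contained sketch of walking a plan tree and keeping only filters produced by a hash join. It does not use the DataFusion API: `PlanNode`, `DynamicFilter`, and `collect_hash_join_filters` are hypothetical stand-ins, and only the `ProducerKind` variant names come from the PR.

```rust
/// Variant names follow the PR; the rest of this sketch is hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerKind {
    HashJoin,
    TopK,
    Aggregate,
}

/// Hypothetical stand-in for a dynamic filter expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DynamicFilter {
    name: &'static str,
    producer: ProducerKind,
}

/// Hypothetical stand-in for a physical plan node.
struct PlanNode {
    filters: Vec<DynamicFilter>,
    children: Vec<PlanNode>,
}

/// Collect only the filters that are safe to wait on. TopK and Aggregate
/// filters are skipped because they are populated incrementally as rows
/// flow through the plan, so waiting on them before polling could deadlock.
fn collect_hash_join_filters(node: &PlanNode, out: &mut Vec<DynamicFilter>) {
    for filter in &node.filters {
        if filter.producer == ProducerKind::HashJoin {
            out.push(filter.clone());
        }
    }
    for child in &node.children {
        collect_hash_join_filters(child, out);
    }
}
```

The recursion visits every node below the starting point, so a hash join filter attached to a deeply nested scan is still found, while any TopK or Aggregate filter on the same path is ignored.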
496a4dd to 0c4469f
…ble parquet pushdown_filters in tests
BufferExec now walks its child subtree at execution time, collects any HashJoin dynamic filters, and waits for them to complete before the background task begins polling the input stream. This allows scans below `BufferExec` to benefit from dynamic filters rather than being eagerly read before the filter is populated.
Which issue does this PR close?
Closes #20778
Rationale for this change
BufferExec was introduced to eagerly buffer the probe side of hash joins instead of waiting for the build to fully complete. Because it starts buffering before the build side finishes, dynamic filters produced by the hash join are not yet populated when the probe-side scan executes. This PR makes BufferExec wait for those filters before it starts buffering, so scans can still benefit from dynamic filtering.
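The wait-before-buffering idea can be sketched with standard library channels. This is an assumption-laden illustration rather than the mechanism used in this PR: the build side is modeled as sending `()` on a channel when its filter is complete, and `wait_for_filter` is a hypothetical helper name.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

/// Hypothetical helper: block until the build side signals that its
/// dynamic filter is complete, or until the optional timeout elapses.
/// Returns true only if the completion signal arrived in time.
/// `None` means wait indefinitely.
fn wait_for_filter(done: &Receiver<()>, timeout: Option<Duration>) -> bool {
    match timeout {
        // Returns false if the sender was dropped without signalling.
        None => done.recv().is_ok(),
        // Returns false on timeout or if the sender was dropped.
        Some(limit) => done.recv_timeout(limit).is_ok(),
    }
}
```

In this sketch the caller would start polling the probe-side input once `wait_for_filter` returns, regardless of the result, so a timeout only costs the benefit of the filter and does not block the query.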
What changes are included in this PR?
Adds a ProducerKind enum to `DynamicFilterPhysicalExpr` to distinguish filter producers (HashJoin, TopK, Aggregate). BufferExec now collects HashJoin dynamic filters from its child subtree at execution time and waits for them to be populated before starting to buffer, with a configurable timeout via `hash_join_buffering_dynamic_filter_wait_ms`. `TopK` and `Aggregate` filters are intentionally skipped to avoid deadlocks, since those filters are populated incrementally as rows flow through the plan.
Are these changes tested?
Yes, tests verify correct results when waiting for hash join dynamic filters and confirm no deadlock occurs when a TopK node appears below a hash join on the probe side.
Are there any user-facing changes?
A new option was added: `hash_join_buffering_dynamic_filter_wait_ms`