Skip to content

feat: BufferExec waits for hash join dynamic filters before buffering#21350

Draft
LiaCastaneda wants to merge 6 commits intoapache:mainfrom
LiaCastaneda:lia/buffer-exec-dynamic-filter-20778
Draft

feat: BufferExec waits for hash join dynamic filters before buffering#21350
LiaCastaneda wants to merge 6 commits intoapache:mainfrom
LiaCastaneda:lia/buffer-exec-dynamic-filter-20778

Conversation

@LiaCastaneda
Copy link
Copy Markdown
Contributor

@LiaCastaneda LiaCastaneda commented Apr 3, 2026

BufferExec now walks its child subtree at execution time, collects any HashJoin dynamic filters, and waits for them to complete before the background task begins polling the input stream. This allows scans below BufferExec to benefit from dynamic filters rather than being eagerly read before the filter is populated.

Which issue does this PR close?

Closes #20778

Rationale for this change

BufferExec was introduced to eagerly buffer the probe side of hash joins (instead of waiting for the build to fully complete) but it starts buffering before the build side finishes, so dynamic filters produced by the hash join are not yet populated when the probe-side scan executes. This PR makes BufferExec wait for those filters before starting to buffer, allowing scans to still benefir from dynamic filtering.

What changes are included in this PR?

Adds a ProducerKind enum to DynamicFilterPhysicalExpr to distinguish filter producers (HashJoin, TopK, Aggregate). BufferExec now collects HashJoin dynamic filters from its child subtree at execution time and waits for them to be populated before starting to buffer, with a configurable timeout via hash_join_buffering_dynamic_filter_wait_ms. TopK and Aggregate filters are intentionally skipped to avoid deadlocks, since those filters are populated incrementally as rows flow through the plan.

Are these changes tested?

Yes, tests verify correct results when waiting for hash join dynamic filters and confirm no deadlock occurs when a TopK node appears below a hash join on the probe side.

Are there any user-facing changes?

A new option was added: hash_join_buffering_dynamic_filter_wait_ms

Adds ProducerKind to DynamicFilterPhysicalExpr to distinguish between
hash join (safe to wait), TopK, and Aggregate (would deadlock) producers.

BufferExec now walks its child subtree at execution time, collects any
HashJoin dynamic filters, and waits for them to complete before the
background task begins polling the input stream. This allows scans below
BufferExec to benefit from dynamic filters rather than being eagerly
read before the filter is populated.

A new session config option `hash_join_buffering_dynamic_filter_wait_ms`
controls the wait behavior (0 = disabled, usize::MAX = wait indefinitely,
any other value = timeout in ms). Defaults to usize::MAX.

Closes apache#20778
@github-actions github-actions bot added physical-expr Changes to the physical-expr crates core Core DataFusion crate common Related to common crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Apr 3, 2026
@@ -684,6 +684,17 @@ config_namespace! {
///
/// Disabled by default, set to a number greater than 0 for enabling it.
pub hash_join_buffering_capacity: usize, default = 0
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable this by default? I think now it should be safe

if let Some(df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
&& df.producer_kind() == ProducerKind::HashJoin
{
filters.push(df.clone());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clone should be cheap

let mut filters = vec![];
let _ = plan.apply_expressions(&mut |expr| {
if let Some(df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
&& df.producer_kind() == ProducerKind::HashJoin
Copy link
Copy Markdown
Contributor Author

@LiaCastaneda LiaCastaneda Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to know who the producer is, otherwise if we wait on a dynamic filter from a TopK (or AggregateExec) we can enter a deadlock.

@LiaCastaneda LiaCastaneda force-pushed the lia/buffer-exec-dynamic-filter-20778 branch from 496a4dd to 0c4469f Compare April 3, 2026 15:51
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Apr 3, 2026
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Apr 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Handle DynamicFilters crossing BufferExec nodes

1 participant