Skip to content
11 changes: 11 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,17 @@ config_namespace! {
///
/// Disabled by default, set to a number greater than 0 for enabling it.
pub hash_join_buffering_capacity: usize, default = 0
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable this by default? I think it should be safe to do so now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion it should be opt-in, since it increases memory usage, and in the future we will probably want to make parallelism / pipelining scheduling more explicit.


/// How long (in milliseconds) the probe-side buffer should wait for hash join dynamic
/// filters to be fully populated before starting to buffer data.
///
/// When `hash_join_buffering_capacity` is enabled and dynamic filters are present below
/// the buffer, waiting for the filter to complete allows the input scan to benefit from
/// the filter and skip reading unnecessary data.
///
/// Set to 0 to disable waiting (buffer immediately, same as previous behavior).
/// Set to `usize::MAX` to wait indefinitely until the filter is complete.
pub hash_join_buffering_dynamic_filter_wait_ms: usize, default = usize::MAX
}
}

Expand Down
240 changes: 240 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5146,3 +5146,243 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
"
);
}

/// Helper: build and optimize a HashJoin plan with dynamic filter pushdown + buffering enabled.
/// Returns the optimized plan ready for execution.
fn build_buffered_hash_join_plan(
    build_scan: Arc<dyn ExecutionPlan>,
    probe_scan: Arc<dyn ExecutionPlan>,
    build_schema: &SchemaRef,
    probe_schema: &SchemaRef,
    join_type: JoinType,
    config: &ConfigOptions,
) -> Arc<dyn ExecutionPlan> {
    use datafusion_common::NullEquality;
    use datafusion_physical_optimizer::hash_join_buffering::HashJoinBuffering;

    // Equi-join both sides on column "a".
    let on = vec![(
        col("a", build_schema).unwrap(),
        col("a", probe_schema).unwrap(),
    )];
    let join: Arc<dyn ExecutionPlan> = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            probe_scan,
            on,
            None,
            &join_type,
            None,
            PartitionMode::CollectLeft,
            NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    );

    // First push the dynamic filter down so the probe scan receives it...
    let pushed = FilterPushdown::new_post_optimization()
        .optimize(join, config)
        .unwrap();

    // ...then apply buffering so BufferExec wraps the probe side.
    HashJoinBuffering::new().optimize(pushed, config).unwrap()
}

/// Test that BufferExec correctly waits for a HashJoin dynamic filter before buffering,
/// and that the query produces correct results.
#[tokio::test]
async fn test_buffer_exec_waits_for_hash_join_dynamic_filter() {
    // Build side: keys "aa" and "ab" only.
    let build_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    // Delay the build scan so the filter is definitely not populated yet when
    // the probe scan would start without waiting. With wait_ms=usize::MAX,
    // BufferExec waits for the slow build to finish, so output_rows=2 proves
    // that waiting actually occurred.
    let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Int32, [1, 2])).unwrap(),
        ])
        .with_open_delay_ms(20)
        .build();

    // Probe side: three keys, of which "ac" has no build-side match.
    let probe_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(
                ("a", Utf8, ["aa", "ab", "ac"]),
                ("c", Float64, [1.0, 2.0, 3.0])
            )
            .unwrap(),
        ])
        .build();
    // Keep a handle so the scan's metrics can be inspected after execution.
    let probe_scan_ref = Arc::clone(&probe_scan);

    // Buffering enabled, waiting indefinitely for the dynamic filter.
    let mut options = ConfigOptions::default();
    options.optimizer.enable_dynamic_filter_pushdown = true;
    options.execution.parquet.pushdown_filters = true;
    options.execution.hash_join_buffering_capacity = 1024 * 1024;
    options.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX;

    let plan = build_buffered_hash_join_plan(
        build_scan,
        probe_scan,
        &build_schema,
        &probe_schema,
        JoinType::Inner,
        &options,
    );

    let ctx = SessionContext::new_with_config(SessionConfig::from(options));
    ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    collect(plan, ctx.state().task_ctx()).await.unwrap();

    // The probe scan had 3 rows but the dynamic filter pruned "ac", so only 2 rows
    // were output. This verifies the filter was applied before the scan ran.
    let probe_metrics = probe_scan_ref.metrics().unwrap();
    assert_eq!(probe_metrics.output_rows().unwrap(), 2);
}

/// Test that when wait_ms=0, BufferExec does not wait for the dynamic filter,
/// so the probe scan outputs all rows (no pruning). The build scan has a delay
/// to ensure it hasn't finished by the time the probe scan runs.
#[tokio::test]
async fn test_buffer_exec_no_wait_probe_scan_reads_all_rows() {
    let build_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    // Slightly delay the build scan so the probe scan starts before the filter
    // is populated.
    let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Int32, [1, 2])).unwrap(),
        ])
        .with_open_delay_ms(20)
        .build();

    let probe_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(
                ("a", Utf8, ["aa", "ab", "ac"]),
                ("c", Float64, [1.0, 2.0, 3.0])
            )
            .unwrap(),
        ])
        .build();
    // Keep a handle so the scan's metrics can be inspected after execution.
    let probe_scan_ref = Arc::clone(&probe_scan);

    // Buffering enabled, but with a zero filter wait.
    let mut options = ConfigOptions::default();
    options.optimizer.enable_dynamic_filter_pushdown = true;
    options.execution.parquet.pushdown_filters = true;
    options.execution.hash_join_buffering_capacity = 1024 * 1024;
    options.execution.hash_join_buffering_dynamic_filter_wait_ms = 0;

    let plan = build_buffered_hash_join_plan(
        build_scan,
        probe_scan,
        &build_schema,
        &probe_schema,
        JoinType::Inner,
        &options,
    );

    let ctx = SessionContext::new_with_config(SessionConfig::from(options));
    ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    collect(plan, ctx.state().task_ctx()).await.unwrap();

    // Without waiting, the probe scan reads all 3 rows before the filter is populated.
    let probe_metrics = probe_scan_ref.metrics().unwrap();
    assert_eq!(probe_metrics.output_rows().unwrap(), 3);
}

/// Test that BufferExec + HashJoin with a TopK above does not deadlock.
/// The TopK filter is above BufferExec, so it is NOT in the child subtree
/// and will not be waited on.
#[tokio::test]
async fn test_buffer_exec_hash_join_with_topk_above_no_deadlock() {
    let build_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(("a", Utf8, ["aa", "ab", "ac"]), ("b", Int32, [1, 2, 3]))
                .unwrap(),
        ])
        .build();

    let probe_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
        .with_support(true)
        .with_batches(vec![
            record_batch!(
                ("a", Utf8, ["aa", "ab", "ac", "ad"]),
                ("c", Float64, [1.0, 2.0, 3.0, 4.0])
            )
            .unwrap(),
        ])
        .build();

    // Buffering enabled with an indefinite filter wait; only filters inside the
    // BufferExec subtree may be waited on, so the TopK filter above must not
    // cause a deadlock.
    let mut options = ConfigOptions::default();
    options.optimizer.enable_dynamic_filter_pushdown = true;
    options.execution.parquet.pushdown_filters = true;
    options.execution.hash_join_buffering_capacity = 1024 * 1024;
    options.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX;

    let plan = build_buffered_hash_join_plan(
        build_scan,
        probe_scan,
        &build_schema,
        &probe_schema,
        JoinType::Inner,
        &options,
    );

    // Wrap with TopK (SortExec with fetch) above the join
    let sort_expr =
        PhysicalSortExpr::new(col("a", &plan.schema()).unwrap(), SortOptions::default());
    let topk: Arc<dyn ExecutionPlan> = Arc::new(
        SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), plan)
            .with_fetch(Some(2)),
    );

    let ctx = SessionContext::new_with_config(SessionConfig::from(options));
    ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );

    let batches = collect(topk, ctx.state().task_ctx()).await.unwrap();

    // TopK fetch=2 limits the joined output (3 matching keys) to exactly 2 rows.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2);
}
29 changes: 23 additions & 6 deletions datafusion/core/tests/physical_optimizer/pushdown_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ pub struct TestOpener {
batch_size: Option<usize>,
projection: Option<ProjectionExprs>,
predicate: Option<Arc<dyn PhysicalExpr>>,
open_delay_ms: Option<u64>,
}

impl FileOpener for TestOpener {
fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let delay_ms = self.open_delay_ms;
let mut batches = self.batches.clone();
if self.batches.is_empty() {
return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
Expand Down Expand Up @@ -95,7 +97,13 @@ impl FileOpener for TestOpener {

let stream = TestStream::new(batches);

Ok((async { Ok(stream.boxed()) }).boxed())
Ok((async move {
if let Some(ms) = delay_ms {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
}
Ok(stream.boxed())
})
.boxed())
}
}

Expand All @@ -109,6 +117,7 @@ pub struct TestSource {
metrics: ExecutionPlanMetricsSet,
projection: Option<ProjectionExprs>,
table_schema: datafusion_datasource::TableSchema,
open_delay_ms: Option<u64>,
}

impl TestSource {
Expand All @@ -122,6 +131,7 @@ impl TestSource {
batch_size: None,
projection: None,
table_schema,
open_delay_ms: None,
}
}
}
Expand All @@ -138,6 +148,7 @@ impl FileSource for TestSource {
batch_size: self.batch_size,
projection: self.projection.clone(),
predicate: self.predicate.clone(),
open_delay_ms: self.open_delay_ms,
}))
}

Expand Down Expand Up @@ -266,6 +277,7 @@ pub struct TestScanBuilder {
support: bool,
batches: Vec<RecordBatch>,
schema: SchemaRef,
open_delay_ms: Option<u64>,
}

impl TestScanBuilder {
Expand All @@ -274,6 +286,7 @@ impl TestScanBuilder {
support: false,
batches: vec![],
schema,
open_delay_ms: None,
}
}

Expand All @@ -287,12 +300,16 @@ impl TestScanBuilder {
self
}

/// Delay the opener's stream by `delay_ms` milliseconds before yielding batches,
/// simulating a slow scan (e.g. to test whether a dynamic filter is populated
/// before the scan starts).
pub fn with_open_delay_ms(mut self, delay_ms: u64) -> Self {
self.open_delay_ms = Some(delay_ms);
self
}

pub fn build(self) -> Arc<dyn ExecutionPlan> {
let source = Arc::new(TestSource::new(
Arc::clone(&self.schema),
self.support,
self.batches,
));
let mut source =
TestSource::new(Arc::clone(&self.schema), self.support, self.batches);
source.open_delay_ms = self.open_delay_ms;
let source = Arc::new(source);
let base_config =
FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source)
.with_file(PartitionedFile::new("test.parquet", 123))
Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,7 @@ mod test {
use datafusion_expr::{col, lit};
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
expressions::{Column, DynamicFilterPhysicalExpr, Literal, ProducerKind},
planner::logical2physical,
projection::ProjectionExprs,
};
Expand Down Expand Up @@ -1922,6 +1922,7 @@ mod test {
Arc::new(DynamicFilterPhysicalExpr::new(
expr.children().into_iter().map(Arc::clone).collect(),
expr,
ProducerKind::HashJoin,
))
}

Expand Down
Loading
Loading