diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0551cbbb15ae1..f090e657d0f2d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -684,6 +684,17 @@ config_namespace! { /// /// Disabled by default, set to a number greater than 0 for enabling it. pub hash_join_buffering_capacity: usize, default = 0 + + /// How long (in milliseconds) the probe-side buffer should wait for hash join dynamic + /// filters to be fully populated before starting to buffer data. + /// + /// When `hash_join_buffering_capacity` is enabled and dynamic filters are present below + /// the buffer, waiting for the filter to complete allows the input scan to benefit from + /// the filter and skip reading unnecessary data. + /// + /// Set to 0 to disable waiting (buffer immediately, same as previous behavior). + /// Set to `usize::MAX` to wait indefinitely until the filter is complete. + pub hash_join_buffering_dynamic_filter_wait_ms: usize, default = usize::MAX } } diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index a255c07545829..ce3fb15098545 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -5146,3 +5146,243 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() { " ); } + +/// Helper: build and optimize a HashJoin plan with dynamic filter pushdown + buffering enabled. +/// Returns the optimized plan ready for execution. 
+fn build_buffered_hash_join_plan( + build_scan: Arc, + probe_scan: Arc, + build_schema: &SchemaRef, + probe_schema: &SchemaRef, + join_type: JoinType, + config: &ConfigOptions, +) -> Arc { + use datafusion_common::NullEquality; + use datafusion_physical_optimizer::hash_join_buffering::HashJoinBuffering; + + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + vec![( + col("a", build_schema).unwrap(), + col("a", probe_schema).unwrap(), + )], + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ) as Arc; + + // Apply dynamic filter pushdown so the probe scan receives the dynamic filter + let plan = FilterPushdown::new_post_optimization() + .optimize(join, config) + .unwrap(); + + // Apply buffering so BufferExec wraps the probe side + HashJoinBuffering::new().optimize(plan, config).unwrap() +} + +/// Test that BufferExec correctly waits for a HashJoin dynamic filter before buffering, +/// and that the query produces correct results. +#[tokio::test] +async fn test_buffer_exec_waits_for_hash_join_dynamic_filter() { + let build_batches = + vec![record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Int32, [1, 2])).unwrap()]; + let build_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + // Delay the build scan so the filter is definitely not populated yet when + // the probe scan would start without waiting. With wait_ms=usize::MAX, + // BufferExec waits for the slow build to finish, so output_rows=2 proves + // that waiting actually occurred. 
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_schema)) + .with_support(true) + .with_batches(build_batches) + .with_open_delay_ms(20) + .build(); + + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac"]), + ("c", Float64, [1.0, 2.0, 3.0]) + ) + .unwrap(), + ]; + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + let probe_scan_ref = Arc::clone(&probe_scan); + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + config.execution.parquet.pushdown_filters = true; + config.execution.hash_join_buffering_capacity = 1024 * 1024; + config.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX; + + let plan = build_buffered_hash_join_plan( + build_scan, + probe_scan, + &build_schema, + &probe_schema, + JoinType::Inner, + &config, + ); + + let session_config = SessionConfig::from(config); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let task_ctx = session_ctx.state().task_ctx(); + let _batches = collect(Arc::clone(&plan), task_ctx).await.unwrap(); + + // The probe scan had 3 rows but the dynamic filter pruned "ac", so only 2 rows + // were output. This verifies the filter was applied before the scan ran. + let probe_metrics = probe_scan_ref.metrics().unwrap(); + assert_eq!(probe_metrics.output_rows().unwrap(), 2); +} + +/// Test that when wait_ms=0, BufferExec does not wait for the dynamic filter, +/// so the probe scan outputs all rows (no pruning). The build scan has a delay +/// to ensure it hasn't finished by the time the probe scan runs. 
+#[tokio::test] +async fn test_buffer_exec_no_wait_probe_scan_reads_all_rows() { + let build_batches = + vec![record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Int32, [1, 2])).unwrap()]; + let build_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + // Delay a bit the build scan so the probe scan starts before the filter is populated. + let build_scan = TestScanBuilder::new(Arc::clone(&build_schema)) + .with_support(true) + .with_batches(build_batches) + .with_open_delay_ms(20) + .build(); + + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac"]), + ("c", Float64, [1.0, 2.0, 3.0]) + ) + .unwrap(), + ]; + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + let probe_scan_ref = Arc::clone(&probe_scan); + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + config.execution.parquet.pushdown_filters = true; + config.execution.hash_join_buffering_capacity = 1024 * 1024; + config.execution.hash_join_buffering_dynamic_filter_wait_ms = 0; + + let plan = build_buffered_hash_join_plan( + build_scan, + probe_scan, + &build_schema, + &probe_schema, + JoinType::Inner, + &config, + ); + + let session_config = SessionConfig::from(config); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let task_ctx = session_ctx.state().task_ctx(); + collect(plan, task_ctx).await.unwrap(); + + // Without waiting, the probe scan reads all 3 rows before the filter is populated. 
+ let probe_metrics = probe_scan_ref.metrics().unwrap(); + assert_eq!(probe_metrics.output_rows().unwrap(), 3); +} + +/// Test that BufferExec + HashJoin with a TopK above does not deadlock. +/// The TopK filter is above BufferExec, so it is NOT in the child subtree +/// and will not be waited on. +#[tokio::test] +async fn test_buffer_exec_hash_join_with_topk_above_no_deadlock() { + let build_batches = vec![ + record_batch!(("a", Utf8, ["aa", "ab", "ac"]), ("b", Int32, [1, 2, 3])).unwrap(), + ]; + let build_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("c", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap(), + ]; + let probe_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + config.execution.parquet.pushdown_filters = true; + config.execution.hash_join_buffering_capacity = 1024 * 1024; + config.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX; + + let plan = build_buffered_hash_join_plan( + build_scan, + probe_scan, + &build_schema, + &probe_schema, + JoinType::Inner, + &config, + ); + + // Wrap with TopK (SortExec with fetch) above the join + let sort_expr = + PhysicalSortExpr::new(col("a", &plan.schema()).unwrap(), SortOptions::default()); + let topk = Arc::new( + SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), plan) + .with_fetch(Some(2)), + ) as Arc; + + let session_config = SessionConfig::from(config); + let session_ctx = 
SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let task_ctx = session_ctx.state().task_ctx(); + + let batches = collect(topk, task_ctx).await.unwrap(); + + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); +} diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index e8c02d4efcfd4..7352ebf5fb462 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -54,10 +54,12 @@ pub struct TestOpener { batch_size: Option, projection: Option, predicate: Option>, + open_delay_ms: Option, } impl FileOpener for TestOpener { fn open(&self, _partitioned_file: PartitionedFile) -> Result { + let delay_ms = self.open_delay_ms; let mut batches = self.batches.clone(); if self.batches.is_empty() { return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed()); @@ -95,7 +97,13 @@ impl FileOpener for TestOpener { let stream = TestStream::new(batches); - Ok((async { Ok(stream.boxed()) }).boxed()) + Ok((async move { + if let Some(ms) = delay_ms { + tokio::time::sleep(std::time::Duration::from_millis(ms)).await; + } + Ok(stream.boxed()) + }) + .boxed()) } } @@ -109,6 +117,7 @@ pub struct TestSource { metrics: ExecutionPlanMetricsSet, projection: Option, table_schema: datafusion_datasource::TableSchema, + open_delay_ms: Option, } impl TestSource { @@ -122,6 +131,7 @@ impl TestSource { batch_size: None, projection: None, table_schema, + open_delay_ms: None, } } } @@ -138,6 +148,7 @@ impl FileSource for TestSource { batch_size: self.batch_size, projection: self.projection.clone(), predicate: self.predicate.clone(), + open_delay_ms: self.open_delay_ms, })) } @@ -266,6 +277,7 @@ pub struct TestScanBuilder { support: bool, batches: Vec, schema: SchemaRef, + open_delay_ms: Option, } impl 
TestScanBuilder { @@ -274,6 +286,7 @@ impl TestScanBuilder { support: false, batches: vec![], schema, + open_delay_ms: None, } } @@ -287,12 +300,16 @@ impl TestScanBuilder { self } + pub fn with_open_delay_ms(mut self, delay_ms: u64) -> Self { + self.open_delay_ms = Some(delay_ms); + self + } + pub fn build(self) -> Arc { - let source = Arc::new(TestSource::new( - Arc::clone(&self.schema), - self.support, - self.batches, - )); + let mut source = + TestSource::new(Arc::clone(&self.schema), self.support, self.batches); + source.open_delay_ms = self.open_delay_ms; + let source = Arc::new(source); let base_config = FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source) .with_file(PartitionedFile::new("test.parquet", 123)) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6621706c35c81..6943c777b44f0 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1588,7 +1588,7 @@ mod test { use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, - expressions::{Column, DynamicFilterPhysicalExpr, Literal}, + expressions::{Column, DynamicFilterPhysicalExpr, Literal, ProducerKind}, planner::logical2physical, projection::ProjectionExprs, }; @@ -1922,6 +1922,7 @@ mod test { Arc::new(DynamicFilterPhysicalExpr::new( expr.children().into_iter().map(Arc::clone).collect(), expr, + ProducerKind::HashJoin, )) } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d285f8b377eca..1a0bd3d0b9131 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -28,6 +28,26 @@ use datafusion_common::{ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::DynHash; +/// Identifies which kind of operator produced a [`DynamicFilterPhysicalExpr`]. 
+/// +/// This is used by operators like `BufferExec` to decide whether it is safe +/// to wait for a dynamic filter to complete before starting execution. +/// Only [`ProducerKind::HashJoin`] filters are guaranteed to be fully populated +/// before the probe side starts, making them safe to wait on. +/// [`ProducerKind::TopK`] and [`ProducerKind::Aggregate`] filters are populated +/// incrementally during execution and waiting on them would cause a deadlock. +/// + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProducerKind { + /// Produced by a hash join build side. Safe to wait on — fully populated before probe starts. + HashJoin, + /// Produced by a TopK operator. Not safe to wait on — populated incrementally. + TopK, + /// Produced by an aggregate operator. Not safe to wait on — may never complete. + Aggregate, +} + /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum FilterState { @@ -55,7 +75,7 @@ impl FilterState { /// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog] /// /// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) @@ -74,6 +94,8 @@ pub struct DynamicFilterPhysicalExpr { /// But this can have overhead in production, so it's only included in our tests. data_type: Arc>>, nullable: Arc>>, + /// The kind of operator that produced this dynamic filter. 
+ producer_kind: ProducerKind, } #[derive(Debug)] @@ -168,6 +190,7 @@ impl DynamicFilterPhysicalExpr { pub fn new( children: Vec>, inner: Arc, + producer_kind: ProducerKind, ) -> Self { let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 }); Self { @@ -177,6 +200,7 @@ impl DynamicFilterPhysicalExpr { state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), + producer_kind, } } @@ -289,6 +313,11 @@ impl DynamicFilterPhysicalExpr { let _ = rx.wait_for(|state| state.generation() > current_gen).await; } + /// Returns the [`ProducerKind`] of this dynamic filter. + pub fn producer_kind(&self) -> ProducerKind { + self.producer_kind + } + /// Wait asynchronously until this dynamic filter is marked as complete. /// /// This method returns immediately if the filter is already complete. @@ -372,6 +401,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), + producer_kind: self.producer_kind, })) } @@ -478,6 +508,7 @@ mod test { let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![col("a", &table_schema).unwrap()], expr as Arc, + ProducerKind::HashJoin, )); // Simulate two `ParquetSource` files with different filter schemas // Both of these should hit the same inner `PhysicalExpr` even after `update()` is called @@ -572,7 +603,11 @@ mod test { #[test] fn test_snapshot() { let expr = lit(42) as Arc; - let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + let dynamic_filter = DynamicFilterPhysicalExpr::new( + vec![], + Arc::clone(&expr), + ProducerKind::HashJoin, + ); // Take a snapshot of the current expression let snapshot = dynamic_filter.snapshot().unwrap(); @@ -588,8 +623,11 @@ mod test { #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { - let dynamic_filter = - DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc); + let 
dynamic_filter = DynamicFilterPhysicalExpr::new( + vec![], + lit(42) as Arc, + ProducerKind::HashJoin, + ); // First call to data_type and nullable should set the initial values. let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); @@ -632,6 +670,7 @@ mod test { let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![], lit(42) as Arc, + ProducerKind::HashJoin, )); // Mark as complete immediately @@ -667,6 +706,7 @@ mod test { let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![Arc::clone(&col_a), Arc::clone(&col_b)], expr as Arc, + ProducerKind::HashJoin, )); // Clone the Arc (two references to the same DynamicFilterPhysicalExpr) @@ -730,6 +770,7 @@ mod test { let filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![], lit(true) as Arc, + ProducerKind::HashJoin, )); // Initially, only one reference to the inner Arc exists @@ -776,8 +817,11 @@ mod test { use std::hash::{Hash, Hasher}; // Create filter with initial value - let filter = - DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc); + let filter = DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + ProducerKind::HashJoin, + ); // Compute hash BEFORE update let mut hasher_before = DefaultHasher::new(); @@ -810,10 +854,16 @@ mod test { #[test] fn test_identity_based_equality() { // Create two separate filters with identical initial expressions - let filter1 = - DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc); - let filter2 = - DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc); + let filter1 = DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + ProducerKind::HashJoin, + ); + let filter2 = DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + ProducerKind::HashJoin, + ); // Different instances should NOT be equal even with same expression // because they have independent inner Arcs (different update lifecycles) @@ -833,8 +883,11 @@ mod test { use std::collections::hash_map::DefaultHasher; use 
std::hash::{Hash, Hasher}; - let filter = - DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc); + let filter = DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + ProducerKind::HashJoin, + ); // Compute hash twice for the same instance let hash1 = { diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..b88d52c2e617c 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, ProducerKind}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 24bf2265ff054..e8b0acaf4d1a4 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -52,7 +52,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{ + Column, DynamicFilterPhysicalExpr, ProducerKind, lit, +}; use datafusion_physical_expr::{ ConstExpr, EquivalenceProperties, physical_exprs_contains, }; @@ -1167,7 +1169,11 @@ impl AggregateExec { if !aggr_dyn_filters.is_empty() { self.dynamic_filter = Some(Arc::new(AggrDynFilter { - filter: Arc::new(DynamicFilterPhysicalExpr::new(all_cols, lit(true))), + filter: Arc::new(DynamicFilterPhysicalExpr::new( + 
all_cols, + lit(true), + ProducerKind::Aggregate, + )), supported_accumulators_info: aggr_dyn_filters, })) } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 0cc4a1d71814e..af4381be509b5 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -36,6 +36,7 @@ use datafusion_common::{Result, Statistics, internal_err, plan_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, ProducerKind}; use datafusion_physical_expr_common::metrics::{ ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet, }; @@ -191,10 +192,28 @@ impl ExecutionPlan for BufferExec { partition: usize, context: Arc, ) -> Result { + let wait_ms = context + .session_config() + .options() + .execution + .hash_join_buffering_dynamic_filter_wait_ms; let mem_reservation = MemoryConsumer::new(format!("BufferExec[{partition}]")) .register(context.memory_pool()); let in_stream = self.input.execute(partition, context)?; + // Collect hash join dynamic filters from the child subtree. + // We wait on these before starting to buffer so that the filter is fully populated + // before the background task begins polling the input, allowing the scan to benefit + // from the dynamic filter and skip reading unnecessary data. + // We only wait on HashJoin filters because they are guaranteed to be fully populated + // before the probe side starts. TopK and Aggregate filters are populated incrementally + // during execution and waiting on them would cause a deadlock. + let hash_join_filters = if wait_ms > 0 { + collect_hash_join_dynamic_filters(self.input.as_ref()) + } else { + vec![] + }; + // Set up the metrics for the stream. 
let curr_mem_in = Arc::new(AtomicUsize::new(0)); let curr_mem_out = Arc::clone(&curr_mem_in); @@ -226,8 +245,13 @@ impl ExecutionPlan for BufferExec { } }); // Buffer the input. - let out_stream = - MemoryBufferedStream::new(in_stream, self.capacity, mem_reservation); + let out_stream = MemoryBufferedStream::new( + in_stream, + self.capacity, + mem_reservation, + hash_join_filters, + wait_ms, + ); // Update in the metrics that when an element gets out, some memory gets freed. let out_stream = out_stream.inspect_ok(move |v| { curr_mem_out.fetch_sub(v.get_array_memory_size(), Ordering::Relaxed); @@ -298,6 +322,26 @@ impl ExecutionPlan for BufferExec { } } +/// Walks the plan tree rooted at `plan` and collects all [`DynamicFilterPhysicalExpr`]s +/// with [`ProducerKind::HashJoin`] found in expressions of any node in the subtree. +fn collect_hash_join_dynamic_filters( + plan: &dyn ExecutionPlan, +) -> Vec { + let mut filters = vec![]; + let _ = plan.apply_expressions(&mut |expr| { + if let Some(df) = expr.as_any().downcast_ref::() + && df.producer_kind() == ProducerKind::HashJoin + { + filters.push(df.clone()); + } + Ok(TreeNodeRecursion::Continue) + }); + for child in plan.children() { + filters.extend(collect_hash_join_dynamic_filters(child.as_ref())); + } + filters +} + /// Represents anything that occupies a capacity in a [MemoryBufferedStream]. pub trait SizedMessage { fn size(&self) -> usize; @@ -325,10 +369,14 @@ impl MemoryBufferedStream { /// Builds a new [MemoryBufferedStream] with the provided capacity and event handler. /// /// This immediately spawns a Tokio task that will start consumption of the input stream. + /// If `dynamic_filters` is non-empty, the task will wait for all of them to complete + /// before beginning to poll the input stream. 
pub fn new( mut input: impl Stream> + Unpin + Send + 'static, capacity: usize, memory_reservation: MemoryReservation, + dynamic_filters: Vec, + dynamic_filter_wait_ms: usize, ) -> Self { let semaphore = Arc::new(Semaphore::new(capacity)); let (batch_tx, batch_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -336,6 +384,19 @@ impl MemoryBufferedStream { let memory_reservation = Arc::new(memory_reservation); let memory_reservation_clone = Arc::clone(&memory_reservation); let task = SpawnedTask::spawn(async move { + // Wait for all hash join dynamic filters to be fully populated before + // starting to buffer, so the input scan can benefit from the filters. + for filter in &dynamic_filters { + if dynamic_filter_wait_ms == usize::MAX { + filter.wait_complete().await; + } else { + let _ = tokio::time::timeout( + std::time::Duration::from_millis(dynamic_filter_wait_ms as u64), + filter.wait_complete(), + ) + .await; + } + } loop { // Select on both the input stream and the channel being closed. 
// By down this, we abort polling the input as soon as the consumer channel is @@ -439,7 +500,7 @@ mod tests { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok); let (_, res) = memory_pool_and_reservation(); - let buffered = MemoryBufferedStream::new(input, 4, res); + let buffered = MemoryBufferedStream::new(input, 4, res, vec![], usize::MAX); wait_for_buffering().await; assert_eq!(buffered.messages_queued(), 2); Ok(()) @@ -450,7 +511,7 @@ mod tests { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok); let (_, res) = memory_pool_and_reservation(); - let mut buffered = MemoryBufferedStream::new(input, 10, res); + let mut buffered = MemoryBufferedStream::new(input, 10, res, vec![], usize::MAX); wait_for_buffering().await; assert_eq!(buffered.messages_queued(), 4); @@ -467,7 +528,7 @@ mod tests { let input = futures::stream::iter([25, 1, 2, 3]).map(Ok); let (_, res) = memory_pool_and_reservation(); - let mut buffered = MemoryBufferedStream::new(input, 10, res); + let mut buffered = MemoryBufferedStream::new(input, 10, res, vec![], usize::MAX); wait_for_buffering().await; assert_eq!(buffered.messages_queued(), 1); pull_ok_msg(&mut buffered).await?; @@ -479,7 +540,7 @@ mod tests { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok); let (_, res) = bounded_memory_pool_and_reservation(7); - let mut buffered = MemoryBufferedStream::new(input, 10, res); + let mut buffered = MemoryBufferedStream::new(input, 10, res, vec![], usize::MAX); wait_for_buffering().await; pull_ok_msg(&mut buffered).await?; @@ -496,7 +557,7 @@ mod tests { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok); let (_, res) = bounded_memory_pool_and_reservation(7); - let mut buffered = MemoryBufferedStream::new(input, 3, res); + let mut buffered = MemoryBufferedStream::new(input, 3, res, vec![], usize::MAX); wait_for_buffering().await; pull_ok_msg(&mut buffered).await?; @@ -519,7 +580,7 @@ mod tests { let input = futures::stream::iter([3, 3, 3, 3]).map(Ok); let (_, res) = 
memory_pool_and_reservation(); - let mut buffered = MemoryBufferedStream::new(input, 2, res); + let mut buffered = MemoryBufferedStream::new(input, 2, res, vec![], usize::MAX); wait_for_buffering().await; assert_eq!(buffered.messages_queued(), 1); pull_ok_msg(&mut buffered).await?; @@ -551,7 +612,7 @@ mod tests { }); let (_, res) = memory_pool_and_reservation(); - let mut buffered = MemoryBufferedStream::new(input, 10, res); + let mut buffered = MemoryBufferedStream::new(input, 10, res, vec![], usize::MAX); wait_for_buffering().await; pull_ok_msg(&mut buffered).await?; @@ -566,7 +627,7 @@ mod tests { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok); let (pool, res) = memory_pool_and_reservation(); - let mut buffered = MemoryBufferedStream::new(input, 10, res); + let mut buffered = MemoryBufferedStream::new(input, 10, res, vec![], usize::MAX); wait_for_buffering().await; assert_eq!(buffered.messages_queued(), 4); assert_eq!(pool.reserved(), 10); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index d064f5ce6c3b7..ad00062ba6e4c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -85,7 +85,9 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumula use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; -use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{ + Column, DynamicFilterPhysicalExpr, ProducerKind, lit, +}; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -837,7 +839,11 @@ impl HashJoinExec { // Dynamic filter will be created from build side values (left side) and applied to probe side (right side) let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); 
// Initialize with a placeholder expression (true) that will be updated when the hash table is built - Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + Arc::new(DynamicFilterPhysicalExpr::new( + right_keys, + lit(true), + ProducerKind::HashJoin, + )) } fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 9cb4382e21020..7853b2296cde3 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -1200,7 +1200,8 @@ mod tests { use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{ - BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit, + BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, ProducerKind, binary, + col, lit, }; #[test] @@ -1780,6 +1781,7 @@ mod tests { let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( vec![Arc::clone(&col_a)], lit(true), + ProducerKind::HashJoin, )); // Initial state should be lit(true) let current = dynamic_filter.current()?; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 583bfa29b04ad..7bd7f721ffd21 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -67,7 +67,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, ProducerKind, lit, +}; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -982,7 +984,7 @@ impl SortExec { .map(|sort_expr| Arc::clone(&sort_expr.expr)) .collect::<Vec<_>>(); Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
- 
DynamicFilterPhysicalExpr::new(children, lit(true)), + DynamicFilterPhysicalExpr::new(children, lit(true), ProducerKind::TopK), )))) } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index ab9249985b863..fb5d1f72fdaaf 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -1065,7 +1065,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; use datafusion_common::assert_batches_eq; - use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr::expressions::{ProducerKind, col}; use futures::TryStreamExt; /// This test ensures the size calculation is correct for RecordBatches with multiple columns. @@ -1142,7 +1142,7 @@ mod tests { runtime, &metrics, Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![], lit(true)), + DynamicFilterPhysicalExpr::new(vec![], lit(true), ProducerKind::TopK), )))), )?; @@ -1215,7 +1215,11 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); // Create a dynamic filter that we'll check for completion - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(true), + ProducerKind::TopK, + )); let dynamic_filter_clone = Arc::clone(&dynamic_filter); // Create a TopK instance diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 978d79b1f2fb0..d54962b49d07c 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -1971,7 +1971,7 @@ mod tests { use datafusion_expr::{Expr, cast, is_null, try_cast}; use datafusion_functions_nested::expr_fn::{array_has, make_array}; use datafusion_physical_expr::expressions::{ - self as phys_expr, DynamicFilterPhysicalExpr, + self as phys_expr, DynamicFilterPhysicalExpr, ProducerKind, }; use 
datafusion_physical_expr::planner::logical2physical; use itertools::Itertools; @@ -2980,9 +2980,11 @@ mod tests { .iter() .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>) .collect_vec(); - let dynamic_phys_expr = - Arc::new(DynamicFilterPhysicalExpr::new(children, phys_expr)) - as Arc<dyn PhysicalExpr>; + let dynamic_phys_expr = Arc::new(DynamicFilterPhysicalExpr::new( + children, + phys_expr, + ProducerKind::HashJoin, + )) as Arc<dyn PhysicalExpr>; // Simulate the partition value substitution that would happen in ParquetOpener let remapped_expr = dynamic_phys_expr .children() diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a334bd25b0ce3..bb89be275725e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -221,6 +221,7 @@ datafusion.execution.enable_ansi_mode false datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false datafusion.execution.hash_join_buffering_capacity 0 +datafusion.execution.hash_join_buffering_dynamic_filter_wait_ms 18446744073709551615 datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true @@ -363,6 +364,7 @@ datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. 
Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. +datafusion.execution.hash_join_buffering_dynamic_filter_wait_ms 18446744073709551615 How long (in milliseconds) the probe-side buffer should wait for hash join dynamic filters to be fully populated before starting to buffer data. When `hash_join_buffering_capacity` is enabled and dynamic filters are present below the buffer, waiting for the filter to complete allows the input scan to benefit from the filter and skip reading unnecessary data. Set to 0 to disable waiting (buffer immediately, same as previous behavior). Set to `usize::MAX` to wait indefinitely until the filter is complete. datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69627e3cb9148..659408932b542 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -134,6 +134,7 @@ The following configuration settings are available: | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. 
| | datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. | +| datafusion.execution.hash_join_buffering_dynamic_filter_wait_ms | 18446744073709551615 | How long (in milliseconds) the probe-side buffer should wait for hash join dynamic filters to be fully populated before starting to buffer data. When `hash_join_buffering_capacity` is enabled and dynamic filters are present below the buffer, waiting for the filter to complete allows the input scan to benefit from the filter and skip reading unnecessary data. Set to 0 to disable waiting (buffer immediately, same as previous behavior). Set to `usize::MAX` to wait indefinitely until the filter is complete. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
| | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |