From 6eff5503672506aa9e96627e40b577937bb32151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 2 Apr 2026 21:27:52 +0200 Subject: [PATCH 1/3] Defer task spawning in SortPreservingMergeExec to first poll Previously, SortPreservingMergeExec eagerly executed all input partitions and spawned buffered tasks in execute(). This meant that even if the output stream was never polled, all tasks would be spawned. This changes the multi-partition path to use a lazy stream that defers spawning and building the streaming merge until first poll_next(). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/sorts/sort_preserving_merge.rs | 146 ++++++++++++++---- 1 file changed, 117 insertions(+), 29 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b1ee5b4d5e8da..6f58eb6ea8dc6 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -18,7 +18,9 @@ //! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream. use std::any::Any; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::common::spawn_buffered; use crate::limit::LimitStream; @@ -27,16 +29,18 @@ use crate::projection::{ProjectionExec, make_with_child, update_ordering}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, check_if_same_properties, }; +use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use futures::{Stream, StreamExt}; use crate::execution_plan::{EvaluationType, SchedulingType}; use log::{debug, trace}; @@ -362,34 +366,19 @@ impl ExecutionPlan for SortPreservingMergeExec { } }, _ => { - let receivers = (0..input_partitions) - .map(|partition| { - let stream = - self.input.execute(partition, Arc::clone(&context))?; - Ok(spawn_buffered(stream, 1)) - }) - .collect::>()?; - - debug!( - "Done setting up sender-receiver for SortPreservingMergeExec::execute" - ); - - let result = StreamingMergeBuilder::new() - .with_streams(receivers) - .with_schema(schema) - .with_expressions(&self.expr) - .with_metrics(BaselineMetrics::new(&self.metrics, partition)) - .with_batch_size(context.session_config().batch_size()) - .with_fetch(self.fetch) - .with_reservation(reservation) - .with_round_robin_tie_breaker(self.enable_round_robin_repartition) - .build()?; - - debug!( - "Got stream result from SortPreservingMergeStream::new_from_receivers" - ); - - Ok(result) + let batch_size = context.session_config().batch_size(); + Ok(Box::pin(LazySortPreservingMergeStream { + schema, + input: Arc::clone(&self.input), + context, + expr: self.expr.clone(), + metrics: BaselineMetrics::new(&self.metrics, partition), + batch_size, + fetch: self.fetch, + reservation, + enable_round_robin_repartition: self.enable_round_robin_repartition, + state: LazySPMState::Pending, + })) } } } @@ -433,6 +422,105 @@ impl ExecutionPlan for SortPreservingMergeExec { } } +/// A stream that lazily spawns input partition tasks and builds the streaming +/// merge on first poll, rather than eagerly in `execute()`. +struct LazySortPreservingMergeStream { + schema: SchemaRef, + input: Arc, + context: Arc, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + fetch: Option, + reservation: datafusion_execution::memory_pool::MemoryReservation, + enable_round_robin_repartition: bool, + state: LazySPMState, +} + +enum LazySPMState { + /// Tasks have not been spawned yet. + Pending, + /// The streaming merge has been built and is running. + Running(SendableRecordBatchStream), + /// Initialization failed. + Failed, +} + +impl LazySortPreservingMergeStream { + fn start(&mut self) -> Result<&mut SendableRecordBatchStream> { + let input_partitions = self.input.output_partitioning().partition_count(); + + let receivers = (0..input_partitions) + .map(|partition| { + let stream = + self.input.execute(partition, Arc::clone(&self.context))?; + Ok(spawn_buffered(stream, 1)) + }) + .collect::>()?; + + debug!( + "Done setting up sender-receiver for SortPreservingMergeExec::execute" + ); + + // Take reservation out of self via mem::replace to pass ownership + let reservation = std::mem::replace( + &mut self.reservation, + MemoryConsumer::new("empty").register( + &self.context.runtime_env().memory_pool, + ), + ); + + let result = StreamingMergeBuilder::new() + .with_streams(receivers) + .with_schema(self.schema.clone()) + .with_expressions(&self.expr) + .with_metrics(self.metrics.clone()) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_reservation(reservation) + .with_round_robin_tie_breaker(self.enable_round_robin_repartition) + .build()?; + + debug!( + "Got stream result from SortPreservingMergeStream::new_from_receivers" + ); + + self.state = LazySPMState::Running(result); + match &mut self.state { + LazySPMState::Running(s) => Ok(s), + _ => unreachable!(), + } + } +} + +impl RecordBatchStream for LazySortPreservingMergeStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for LazySortPreservingMergeStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let stream = match &mut self.state { + LazySPMState::Running(s) => s, + LazySPMState::Failed => return Poll::Ready(None), + LazySPMState::Pending => match self.start() { + Ok(s) => s, + Err(e) => { + self.state = LazySPMState::Failed; + return Poll::Ready(Some(Err(e))); + } + }, + }; + stream.poll_next_unpin(cx) + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; From eef441bcc4ffeee9f80811f2b3dbbfcab08e0cf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 07:49:21 +0200 Subject: [PATCH 2/3] Fix clippy and fmt warnings Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/sorts/sort_preserving_merge.rs | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 6f58eb6ea8dc6..5362845abe41a 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -29,8 +29,8 @@ use crate::projection::{ProjectionExec, make_with_child, update_ordering}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, - check_if_same_properties, + Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + Statistics, check_if_same_properties, }; use arrow::datatypes::SchemaRef; @@ -452,27 +452,23 @@ impl LazySortPreservingMergeStream { let receivers = (0..input_partitions) .map(|partition| { - let stream = - self.input.execute(partition, Arc::clone(&self.context))?; + let stream = self.input.execute(partition, Arc::clone(&self.context))?; Ok(spawn_buffered(stream, 1)) }) .collect::>()?; - debug!( - "Done setting up sender-receiver for SortPreservingMergeExec::execute" - ); + debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); // Take reservation out of self via mem::replace to pass ownership let reservation = std::mem::replace( &mut self.reservation, - MemoryConsumer::new("empty").register( - &self.context.runtime_env().memory_pool, - ), + MemoryConsumer::new("empty") + .register(&self.context.runtime_env().memory_pool), ); let result = StreamingMergeBuilder::new() .with_streams(receivers) - .with_schema(self.schema.clone()) + .with_schema(Arc::clone(&self.schema)) .with_expressions(&self.expr) .with_metrics(self.metrics.clone()) .with_batch_size(self.batch_size) @@ -481,9 +477,7 @@ impl LazySortPreservingMergeStream { .with_round_robin_tie_breaker(self.enable_round_robin_repartition) .build()?; - debug!( - "Got stream result from SortPreservingMergeStream::new_from_receivers" - ); + debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); self.state = LazySPMState::Running(result); match &mut self.state { @@ -495,7 +489,7 @@ impl LazySortPreservingMergeStream { impl RecordBatchStream for LazySortPreservingMergeStream { fn schema(&self) -> SchemaRef { - self.schema.clone() + Arc::clone(&self.schema) } } From 26c9a0ae880c9878959821db98c1b7fe29aca329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 08:02:41 +0200 Subject: [PATCH 3/3] Rename LazySortPreservingMergeStream to SortPreservingMergeExecStream Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/sorts/sort_preserving_merge.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 46d2a5a523072..b24ff57e7bbcd 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -367,7 +367,7 @@ impl ExecutionPlan for SortPreservingMergeExec { }, _ => { let batch_size = context.session_config().batch_size(); - Ok(Box::pin(LazySortPreservingMergeStream { + Ok(Box::pin(SortPreservingMergeExecStream { schema, input: Arc::clone(&self.input), context, @@ -377,7 +377,7 @@ impl ExecutionPlan for SortPreservingMergeExec { fetch: self.fetch, reservation, enable_round_robin_repartition: self.enable_round_robin_repartition, - state: LazySPMState::Pending, + state: SPMStreamState::Pending, })) } } @@ -424,7 +424,7 @@ impl ExecutionPlan for SortPreservingMergeExec { /// A stream that lazily spawns input partition tasks and builds the streaming /// merge on first poll, rather than eagerly in `execute()`. -struct LazySortPreservingMergeStream { +struct SortPreservingMergeExecStream { schema: SchemaRef, input: Arc, context: Arc, @@ -434,10 +434,10 @@ struct LazySortPreservingMergeStream { fetch: Option, reservation: datafusion_execution::memory_pool::MemoryReservation, enable_round_robin_repartition: bool, - state: LazySPMState, + state: SPMStreamState, } -enum LazySPMState { +enum SPMStreamState { /// Tasks have not been spawned yet. Pending, /// The streaming merge has been built and is running. @@ -446,7 +446,7 @@ enum LazySPMState { Failed, } -impl LazySortPreservingMergeStream { +impl SortPreservingMergeExecStream { fn start(&mut self) -> Result<&mut SendableRecordBatchStream> { let input_partitions = self.input.output_partitioning().partition_count(); @@ -479,21 +479,21 @@ impl LazySortPreservingMergeStream { debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); - self.state = LazySPMState::Running(result); + self.state = SPMStreamState::Running(result); match &mut self.state { - LazySPMState::Running(s) => Ok(s), + SPMStreamState::Running(s) => Ok(s), _ => unreachable!(), } } } -impl RecordBatchStream for LazySortPreservingMergeStream { +impl RecordBatchStream for SortPreservingMergeExecStream { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } } -impl Stream for LazySortPreservingMergeStream { +impl Stream for SortPreservingMergeExecStream { type Item = Result; fn poll_next( @@ -501,12 +501,12 @@ impl Stream for LazySortPreservingMergeStream { cx: &mut Context<'_>, ) -> Poll> { let stream = match &mut self.state { - LazySPMState::Running(s) => s, - LazySPMState::Failed => return Poll::Ready(None), - LazySPMState::Pending => match self.start() { + SPMStreamState::Running(s) => s, + SPMStreamState::Failed => return Poll::Ready(None), + SPMStreamState::Pending => match self.start() { Ok(s) => s, Err(e) => { - self.state = LazySPMState::Failed; + self.state = SPMStreamState::Failed; return Poll::Ready(Some(Err(e))); } },