perf: Implement physical execution of uncorrelated scalar subqueries #21240

neilconway wants to merge 34 commits into apache:main from neilc/scalar-subquery-expr
Conversation
```diff
-pub struct DefaultPhysicalProtoConverter;
+#[derive(Default)]
+pub struct DefaultPhysicalProtoConverter {
+    scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>,
```
I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.
```rust
/// TODO: Consider overlapping computation of the subqueries with evaluating the
/// main query.
///
/// TODO: Subqueries are evaluated sequentially. Consider parallel evaluation in
/// the future.
```
Happy to address these TODOs now or in a followup PR, if folks have opinions on the best way to do this.
I implemented parallel evaluation, but I haven't done overlapping evaluation of subqueries with the main query yet.
```rust
// Create the shared results container and register it (along with
// the index map) in ExecutionProps so that `create_physical_expr`
// can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
// nodes. We clone the SessionState so these are available
// throughout physical planning without mutating the caller's state.
//
// Ideally, the subquery state would live in a dedicated planning
// context rather than on ExecutionProps (which is meant for
// session-level configuration). It's here because
// `create_physical_expr` only receives `&ExecutionProps`, and
// changing that signature would be a breaking public API change.
let results: Arc<Vec<OnceLock<ScalarValue>>> =
    Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
let session_state = if links.is_empty() {
    Cow::Borrowed(session_state)
} else {
    let mut owned = session_state.clone();
    owned.execution_props_mut().subquery_indexes = index_map;
    owned.execution_props_mut().subquery_results = Arc::clone(&results);
    Cow::Owned(owned)
};
```
This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
run benchmarks

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using clickbench_partitioned.

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpcds.

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch.

🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch (results collapsed).

run benchmark tpch10

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch10.

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch (results collapsed).
I am a bit surprised that it is even needed to do some speculative/overlapping execution to get performance parity. AFAIK NestedLoopJoinExec doesn't do this (it will execute the build side first, then probe). In an ideal scenario we wouldn't do that, as it might increase memory usage substantially by executing the pipelines concurrently and make execution less cache/NUMA friendly (we could instead improve parallelism within each individual pipeline, e.g. using a morsel-driven approach).
|
The most promising approach would perhaps be to check out the single join (it is also relatively easy to implement) and see what it does when switching to that type - I am wondering if it is able to do more optimization, like pushing down the scalar value as a dynamic filter in certain cases (which I imagine would be pretty effective). Also, thinking more about it, in some cases a join should be more efficient in execution as well, since it can avoid materializing the output if the scalar filter is in the join (I wonder if that's what we're seeing). In the scalar subquery case it will materialize the join first and then apply the filter.
```
09)----------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal]
10)------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
11)--------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
12)----------------------FilterExec: substr(c_phone@1, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17]) AND CAST(c_acctbal@2 AS Decimal128(19, 6)) > scalar_subquery(<pending>)
```
Does it push down for Parquet?
Not yet. I think we should be able to enable this, but it isn't trivial, so I think it's better to defer to another PR; I filed #21324 for this.
```rust
impl Eq for ScalarSubqueryExpr {}

impl PhysicalExpr for ScalarSubqueryExpr {
```
Shouldn't this implement placement (as literal) so it benefits from pushdown?
Ideally it would be possible to evaluate the subquery as eagerly as possible so it can already be used for pruning based on file/row-group statistics.
At the moment it seems it has already opened the Parquet file / processed some data before the filter can be evaluated?
In principle we could evaluate scalar subqueries first, then inline the result into the main query plan and use that to aggressively push down filters and prune partitions. That isn't the approach this PR takes; I looked briefly at doing it, but it seemed hard to arrange to run query evaluation as part of planning the main query. It also seemed a bit odd to me: if a query contains many scalar subqueries that are expensive to evaluate, "planning" could potentially take a very long time.
Curious what you think. @Dandandan
This was surprising to me as well, but on thinking about it further, it kinda makes sense.

```rust
let stream = self.right.execute(partition, Arc::clone(&context))?;
// ...
let left_fut = self.left_fut.try_once(|| {
    let left_stream = self.left.execute(0, context)?;
    Ok(load_left_input(
        left_stream,
        join_metrics.clone(),
        reservation,
    ))
})?;
Ok(Box::pin(CrossJoinStream {...}))
```

i.e., we basically start up both inputs and allow them to do some work if they want to. Whereas in the current ScalarSubqueryExec implementation, we don't do anything with the main plan until the evaluation of all subqueries has completed, which means we do lose some opportunity to overlap work that NLJ is able to take advantage of.
|
BTW, recording mostly for posterity -- I notice that TPC-DS query 6 is ~7x faster with the ScalarSubqueryExec approach. Here are the plans on `main` and on the subquery-expr branch (details collapsed). It looks like the difference is that we're able to push the subquery predicate on …
In principle the `CrossJoinStream` will drive the execution of the stream (and can only do so when the left side is loaded).
Is that true in practice? e.g. …
Both plan nodes are used by TPC-DS Q24, as one example of a place where we saw a slowdown without adding additional overlapping in ScalarSubqueryExec.
You are probably right that in those cases it is potentially doing many things in parallel. But I think this is not what we want ideally - we want to run as few independent pipelines as possible, and get (data) parallelism from the individual pipelines rather than executing everything at the same time.
I don't disagree 😊 But for the purposes of this PR, we will regress performance on some benchmark queries if we don't do some additional work to get the same degree of overlapping that the cross-join path gets today. Is that something we're okay with? I don't think the additional complexity to overlap subquery evaluation with main query evaluation is too bad (via …).
Yeah I think that's okay; as long as we don't regress on memory usage too much, I think we should be fine! @alamb we should consider reducing parallelism/implicit buffering from CoalescePartitionsExec / SortPreservingMergeExec once we land morsel-driven scanning.
|
Seems that mainly CoalescePartitions is helpful for TPC-DS SF=1 (and slightly for TPC-H). That benchmark has very limited parallelism (because of single-row-group tables). SortPreservingMergeExec doesn't seem to do much as far as I can see (which I think makes sense, as it will mostly be used at the root).
|
@Dandandan Based on the discussion, I don't plan to implement overlapping of main query and subquery evaluation in this PR. What do you think makes sense as a next step? We could wait to merge this PR until the morsel-driven parallelism work lands (so we can check that morsel-driven parallelism effectively recovers the parallelism we'll lose from the simple approach in this PR), or land them separately and just make sure we verify that overall performance hasn't regressed before we ship 54. wdyt?
|
Any chance we can break this PR into smaller pieces (e.g. move benchmarks, for example) to make it easier to review? |
Hmmm, that might be a bit tricky. The benchmarks are pretty trivial and could easily be omitted. Here's how Claude summarizes the PR:
If it is helpful, I could prepare two PRs that have a split like:
If you think that would be easier to review, lmk. |
Which issue does this PR close?
- `array_has` #18181.

Rationale for this change
Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has two shortcomings: the scalar result can't be used to enable fast paths in the main query, and subqueries that return more than one row were not detected, producing incorrect results.
This PR introduces physical execution of uncorrelated scalar subqueries:

- A `ScalarSubqueryExec` plan node is added to the top of any physical plan with uncorrelated subqueries: it has N+1 children (the N subqueries plus its "main" input, which is the rest of the query plan). The subquery expression in the parent plan is replaced with a `ScalarSubqueryExpr`.
- `ScalarSubqueryExec` manages the execution of the subqueries and stores each result in a shared "results container", an `Arc<Vec<OnceLock<ScalarValue>>>`. Subquery evaluation is done in parallel (for a given query level), but at present it is not overlapped with evaluation of the parent query.
- When a `ScalarSubqueryExpr` is evaluated, it fetches the result of the subquery from the results container.

This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181 we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.
What changes are included in this PR?
- `ScalarSubqueryExpr`
- A `PhysicalProtoConverterExtension` to wire up `ScalarSubqueryExpr` correctly

Are these changes tested?
Yes.
Are there any user-facing changes?
At the SQL-level, scalar subqueries that returned > 1 row will now be rejected instead of producing incorrect query results.
At the API-level, this PR adds several new public APIs (e.g., `ScalarSubqueryExpr`, `ScalarSubqueryExec`) and makes breaking changes to several public APIs (e.g., `parse_expr`). It also introduces a new physical plan node (and allows `Subquery` to remain in logical plans); third-party query optimization code will encounter these nodes when they wouldn't have before.