
perf: Implement physical execution of uncorrelated scalar subqueries#21240

Open
neilconway wants to merge 34 commits into apache:main from neilconway:neilc/scalar-subquery-expr

Conversation

@neilconway
Contributor

@neilconway commented Mar 29, 2026

Which issue does this PR close?

Rationale for this change

Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has two shortcomings:

  1. Scalar subqueries that return > 1 row were silently accepted, producing incorrect query results. Such queries should instead fail with a runtime error.
  2. Performance. Evaluating a scalar subquery as a join requires going through the join machinery. More importantly, UDFs that have special cases for scalar inputs cannot use those code paths for scalar subqueries, which often results in significantly slower query execution.
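
The single-row rule in point 1 can be sketched as follows. This is a hedged illustration, not the PR's actual code: `i64` stands in for DataFusion's `ScalarValue`, and `scalar_from_rows` is a hypothetical helper name.

```rust
// Hypothetical sketch of the single-row rule for scalar subqueries:
// zero rows yield SQL NULL, one row yields that value, and more than
// one row is a runtime error rather than a silently wrong result.
fn scalar_from_rows(rows: &[i64]) -> Result<Option<i64>, String> {
    match rows {
        [] => Ok(None), // empty subquery result => SQL NULL
        [v] => Ok(Some(*v)),
        _ => Err("scalar subquery returned more than one row".to_string()),
    }
}

fn main() {
    assert_eq!(scalar_from_rows(&[42]), Ok(Some(42)));
    assert_eq!(scalar_from_rows(&[]), Ok(None));
    assert!(scalar_from_rows(&[1, 2]).is_err());
}
```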

This PR introduces physical execution of uncorrelated scalar subqueries:

  • Uncorrelated subqueries are left in the plan by the optimizer, not rewritten into joins
  • The physical planner collects uncorrelated scalar subqueries and plans them recursively (supporting nested subqueries). We add a ScalarSubqueryExec plan node at the top of any physical plan that contains uncorrelated subqueries; it has N+1 children: the N subqueries plus its "main" input, which is the rest of the query plan. Each subquery expression in the parent plan is replaced with a ScalarSubqueryExpr.
  • ScalarSubqueryExec manages the execution of the subqueries and stores the result in a shared "results container", which is an Arc<Vec<OnceLock<ScalarValue>>>. Subquery evaluation is done in parallel (for a given query level), but at present it is not overlapped with evaluation of the parent query.
  • When ScalarSubqueryExpr is evaluated, it fetches the result of the subquery from the result container.
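
The shared results container described above can be sketched with std types alone. This is only an illustration: `i64` stands in for DataFusion's `ScalarValue`, and the helper names (`new_results`, `store`, `fetch`) are invented.

```rust
use std::sync::{Arc, OnceLock};

// Sketch of the shared "results container": one write-once slot per
// subquery. ScalarSubqueryExec would fill a slot after running the
// corresponding subquery; ScalarSubqueryExpr later reads it.
type Results = Arc<Vec<OnceLock<i64>>>;

fn new_results(n: usize) -> Results {
    Arc::new((0..n).map(|_| OnceLock::new()).collect())
}

fn store(results: &Results, idx: usize, value: i64) {
    // OnceLock guarantees each slot is written at most once.
    results[idx].set(value).expect("subquery result already set");
}

fn fetch(results: &Results, idx: usize) -> Option<i64> {
    results[idx].get().copied()
}

fn main() {
    let results = new_results(2);
    store(&results, 0, 7);
    assert_eq!(fetch(&results, 0), Some(7));
    assert_eq!(fetch(&results, 1), None); // subquery 1 not yet evaluated
}
```

`OnceLock` makes the "write once, read many" contract explicit, which is why a plain `Mutex<Vec<Option<_>>>` is not needed here.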

This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181, we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.

What changes are included in this PR?

  • Benchmarks
  • Modify subquery rewriter to not transform subqueries -> joins
  • Collect and plan uncorrelated scalar subqueries in the physical planner, and wire up ScalarSubqueryExpr
  • Support for subqueries in physical plan serialization/deserialization using PhysicalProtoConverterExtension to wire up ScalarSubqueryExpr correctly
  • Support for subqueries in logical plan serialization/deserialization
  • Add various SLT tests and update expected plan shapes for some tests

Are these changes tested?

Yes.

Are there any user-facing changes?

At the SQL level, scalar subqueries that return > 1 row are now rejected with a runtime error instead of producing incorrect query results.

At the API level, this PR adds several new public APIs (e.g., ScalarSubqueryExpr, ScalarSubqueryExec) and makes breaking changes to several existing public APIs (e.g., parse_expr). It also introduces a new physical plan node (and allows Subquery to remain in logical plans); third-party query optimization code will encounter these nodes where it wouldn't have before.

@github-actions bot added the logical-expr, physical-expr, optimizer, core, sqllogictest, proto, and physical-plan labels Mar 29, 2026
-pub struct DefaultPhysicalProtoConverter;
+#[derive(Default)]
+pub struct DefaultPhysicalProtoConverter {
+    scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>,
Contributor Author


I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.

Comment on lines +73 to +77
/// TODO: Consider overlapping computation of the subqueries with evaluating the
/// main query.
///
/// TODO: Subqueries are evaluated sequentially. Consider parallel evaluation in
/// the future.
Contributor Author

@neilconway Mar 29, 2026


Happy to address these TODOs now or in a followup PR, if folks have opinions on the best way to do this.

Contributor Author

@neilconway Mar 31, 2026


I implemented parallel evaluation, but I haven't yet overlapped evaluation of the subqueries with the main query.
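
For reference, parallel evaluation of all subqueries at one query level can be sketched with scoped threads. This is an illustration under stand-in types, not the PR's actual implementation: `i64` replaces `ScalarValue` and closures replace subquery plans.

```rust
use std::sync::{Arc, OnceLock};
use std::thread;

// Sketch: run every subquery at this query level concurrently, each
// writing exactly one slot of the shared results container.
fn eval_parallel(
    subqueries: Vec<Box<dyn Fn() -> i64 + Send + Sync>>,
) -> Arc<Vec<OnceLock<i64>>> {
    let results: Arc<Vec<OnceLock<i64>>> =
        Arc::new((0..subqueries.len()).map(|_| OnceLock::new()).collect());
    thread::scope(|s| {
        for (idx, subquery) in subqueries.iter().enumerate() {
            let results = &results;
            s.spawn(move || {
                // "Execute" the subquery plan and publish its scalar result.
                results[idx].set(subquery()).expect("slot written twice");
            });
        }
    }); // scope joins all threads before returning
    results
}

fn main() {
    let subs: Vec<Box<dyn Fn() -> i64 + Send + Sync>> =
        vec![Box::new(|| 1 + 1), Box::new(|| 10 * 4)];
    let results = eval_parallel(subs);
    assert_eq!(results[0].get(), Some(&2));
    assert_eq!(results[1].get(), Some(&40));
}
```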

Comment on lines +443 to +463
// Create the shared results container and register it (along with
// the index map) in ExecutionProps so that `create_physical_expr`
// can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
// nodes. We clone the SessionState so these are available
// throughout physical planning without mutating the caller's state.
//
// Ideally, the subquery state would live in a dedicated planning
// context rather than on ExecutionProps (which is meant for
// session-level configuration). It's here because
// `create_physical_expr` only receives `&ExecutionProps`, and
// changing that signature would be a breaking public API change.
let results: Arc<Vec<OnceLock<ScalarValue>>> =
    Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
let session_state = if links.is_empty() {
    Cow::Borrowed(session_state)
} else {
    let mut owned = session_state.clone();
    owned.execution_props_mut().subquery_indexes = index_map;
    owned.execution_props_mut().subquery_results = Arc::clone(&results);
    Cow::Owned(owned)
};
Contributor Author


This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
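
The clone-on-write idea in the snippet above can be shown in isolation. Everything here is a stand-in for illustration: `State` replaces `SessionState` and the `subquery_count` field is invented.

```rust
use std::borrow::Cow;

// Stand-in for SessionState; the real type carries ExecutionProps etc.
#[derive(Clone, Debug, PartialEq)]
struct State {
    subquery_count: usize,
}

// Borrow the caller's state unchanged when there is nothing to record;
// clone and mutate a private copy only when subqueries are present.
fn with_subqueries(state: &State, n_subqueries: usize) -> Cow<'_, State> {
    if n_subqueries == 0 {
        Cow::Borrowed(state) // no clone, caller's state untouched
    } else {
        let mut owned = state.clone();
        owned.subquery_count = n_subqueries;
        Cow::Owned(owned)
    }
}

fn main() {
    let base = State { subquery_count: 0 };
    assert!(matches!(with_subqueries(&base, 0), Cow::Borrowed(_)));
    assert_eq!(with_subqueries(&base, 3).subquery_count, 3);
    assert_eq!(base.subquery_count, 0); // original never mutated
}
```

The appeal of `Cow` here is that the common no-subquery path pays no clone cost while the caller's state is never mutated.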

@github-actions bot added the development-process label Mar 30, 2026
@Dandandan
Contributor

run benchmarks

@adriangbot

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4156823048-606-pw9cn 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: clickbench_partitioned
Results will be posted here when complete



@adriangbot

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4156823048-607-zdt8z 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux


Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpcds
Results will be posted here when complete



@adriangbot

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4156823048-608-fgcr6 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux


Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpch
Results will be posted here when complete



@adriangbot

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)


Comparing HEAD and neilc_scalar-subquery-expr
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query     ┃                           HEAD ┃     neilc_scalar-subquery-expr ┃       Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1  │ 45.05 / 45.95 ±0.92 / 47.44 ms │ 45.56 / 45.98 ±0.75 / 47.47 ms │    no change │
│ QQuery 2  │ 21.19 / 21.38 ±0.21 / 21.66 ms │ 21.35 / 21.56 ±0.19 / 21.91 ms │    no change │
│ QQuery 3  │ 31.73 / 32.19 ±0.50 / 33.11 ms │ 31.92 / 32.43 ±0.31 / 32.80 ms │    no change │
│ QQuery 4  │ 20.46 / 21.29 ±0.60 / 22.11 ms │ 20.37 / 21.26 ±0.79 / 22.23 ms │    no change │
│ QQuery 5  │ 48.69 / 50.41 ±1.17 / 51.92 ms │ 48.36 / 49.77 ±1.68 / 52.96 ms │    no change │
│ QQuery 6  │ 17.02 / 17.19 ±0.14 / 17.45 ms │ 17.25 / 18.05 ±1.00 / 19.84 ms │    no change │
│ QQuery 7  │ 53.55 / 54.54 ±0.56 / 55.18 ms │ 54.03 / 54.80 ±0.71 / 55.93 ms │    no change │
│ QQuery 8  │ 47.88 / 48.53 ±0.50 / 49.43 ms │ 48.31 / 49.01 ±1.02 / 51.03 ms │    no change │
│ QQuery 9  │ 54.63 / 55.50 ±0.78 / 56.86 ms │ 54.33 / 55.42 ±0.91 / 56.60 ms │    no change │
│ QQuery 10 │ 71.18 / 71.61 ±0.39 / 72.33 ms │ 69.97 / 70.95 ±0.65 / 71.66 ms │    no change │
│ QQuery 11 │ 13.76 / 14.07 ±0.24 / 14.45 ms │ 34.60 / 35.26 ±0.51 / 36.02 ms │ 2.51x slower │
│ QQuery 12 │ 27.78 / 28.16 ±0.24 / 28.52 ms │ 28.04 / 28.71 ±1.10 / 30.90 ms │    no change │
│ QQuery 13 │ 38.02 / 38.83 ±0.59 / 39.63 ms │ 38.41 / 39.41 ±0.91 / 41.05 ms │    no change │
│ QQuery 14 │ 28.51 / 28.89 ±0.32 / 29.45 ms │ 28.51 / 28.71 ±0.15 / 28.96 ms │    no change │
│ QQuery 15 │ 33.38 / 33.64 ±0.23 / 34.01 ms │ 81.32 / 82.08 ±0.58 / 82.76 ms │ 2.44x slower │
│ QQuery 16 │ 15.85 / 16.08 ±0.20 / 16.44 ms │ 15.90 / 16.18 ±0.15 / 16.30 ms │    no change │
│ QQuery 17 │ 71.98 / 72.73 ±0.44 / 73.31 ms │ 73.16 / 73.69 ±0.33 / 74.06 ms │    no change │
│ QQuery 18 │ 76.62 / 78.05 ±1.00 / 79.49 ms │ 77.03 / 79.02 ±1.36 / 80.85 ms │    no change │
│ QQuery 19 │ 37.61 / 38.00 ±0.44 / 38.76 ms │ 37.96 / 38.14 ±0.20 / 38.43 ms │    no change │
│ QQuery 20 │ 40.10 / 40.87 ±0.74 / 42.16 ms │ 40.11 / 41.48 ±1.00 / 42.90 ms │    no change │
│ QQuery 21 │ 64.14 / 65.78 ±0.89 / 66.56 ms │ 64.51 / 65.90 ±0.71 / 66.44 ms │    no change │
│ QQuery 22 │ 17.71 / 18.20 ±0.33 / 18.70 ms │ 50.61 / 51.89 ±0.92 / 53.42 ms │ 2.85x slower │
└───────────┴────────────────────────────────┴────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                         ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (HEAD)                         │ 891.89ms │
│ Total Time (neilc_scalar-subquery-expr)   │ 999.71ms │
│ Average Time (HEAD)                       │  40.54ms │
│ Average Time (neilc_scalar-subquery-expr) │  45.44ms │
│ Queries Faster                            │        0 │
│ Queries Slower                            │        3 │
│ Queries with No Change                    │       19 │
│ Queries with Failure                      │        0 │
└───────────────────────────────────────────┴──────────┘

Resource Usage

tpch — base (merge-base)

Metric Value
Wall time 4.7s
Peak memory 4.0 GiB
Avg memory 3.6 GiB
CPU user 33.0s
CPU sys 3.1s
Disk read 0 B
Disk write 136.0 KiB

tpch — branch

Metric Value
Wall time 5.2s
Peak memory 4.0 GiB
Avg memory 3.6 GiB
CPU user 36.4s
CPU sys 3.2s
Disk read 0 B
Disk write 65.3 MiB


@Dandandan
Contributor

run benchmark tpch10

@adriangbot

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4156947198-609-ngld5 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux


Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpch10
Results will be posted here when complete



@adriangbot

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)


Comparing HEAD and neilc_scalar-subquery-expr
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃                                  HEAD ┃            neilc_scalar-subquery-expr ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │          1.32 / 4.53 ±6.33 / 17.19 ms │          1.31 / 4.51 ±6.30 / 17.12 ms │     no change │
│ QQuery 1  │        14.23 / 14.53 ±0.20 / 14.86 ms │        14.35 / 14.64 ±0.17 / 14.86 ms │     no change │
│ QQuery 2  │        44.31 / 44.58 ±0.28 / 45.02 ms │        44.40 / 44.75 ±0.31 / 45.19 ms │     no change │
│ QQuery 3  │        43.28 / 44.24 ±0.70 / 45.35 ms │        44.70 / 45.73 ±0.88 / 47.18 ms │     no change │
│ QQuery 4  │     286.08 / 290.96 ±3.34 / 294.94 ms │     290.52 / 300.63 ±6.22 / 307.69 ms │     no change │
│ QQuery 5  │     343.30 / 360.49 ±9.19 / 368.08 ms │     344.37 / 347.19 ±2.44 / 350.28 ms │     no change │
│ QQuery 6  │           5.46 / 5.92 ±0.40 / 6.44 ms │           5.40 / 5.97 ±0.29 / 6.20 ms │     no change │
│ QQuery 7  │        17.17 / 19.36 ±3.27 / 25.76 ms │        16.87 / 18.58 ±2.12 / 22.74 ms │     no change │
│ QQuery 8  │     432.14 / 441.42 ±9.03 / 452.58 ms │     433.86 / 443.34 ±8.18 / 453.05 ms │     no change │
│ QQuery 9  │     665.10 / 676.08 ±9.11 / 689.32 ms │     624.49 / 635.81 ±6.97 / 645.27 ms │ +1.06x faster │
│ QQuery 10 │        92.37 / 94.29 ±1.59 / 96.89 ms │        90.27 / 93.43 ±2.54 / 97.79 ms │     no change │
│ QQuery 11 │     104.22 / 105.64 ±1.45 / 107.54 ms │     103.32 / 105.88 ±1.55 / 107.78 ms │     no change │
│ QQuery 12 │     344.34 / 349.25 ±3.25 / 353.07 ms │     345.08 / 347.85 ±1.60 / 349.45 ms │     no change │
│ QQuery 13 │     463.79 / 472.59 ±7.95 / 485.80 ms │     457.64 / 463.97 ±6.15 / 472.80 ms │     no change │
│ QQuery 14 │     350.37 / 356.22 ±3.77 / 360.54 ms │     346.54 / 352.03 ±6.32 / 364.15 ms │     no change │
│ QQuery 15 │    360.40 / 374.90 ±17.68 / 406.65 ms │    375.51 / 394.04 ±32.95 / 459.82 ms │  1.05x slower │
│ QQuery 16 │    714.01 / 738.95 ±23.24 / 774.61 ms │    728.40 / 746.45 ±14.43 / 765.84 ms │     no change │
│ QQuery 17 │    714.60 / 731.23 ±12.85 / 746.56 ms │     715.64 / 721.12 ±5.66 / 731.77 ms │     no change │
│ QQuery 18 │ 1430.78 / 1488.33 ±40.84 / 1548.80 ms │ 1379.93 / 1479.71 ±51.92 / 1528.42 ms │     no change │
│ QQuery 19 │        35.90 / 37.02 ±1.18 / 39.14 ms │        35.40 / 37.33 ±1.81 / 40.76 ms │     no change │
│ QQuery 20 │    713.45 / 735.48 ±24.51 / 771.36 ms │    712.34 / 727.29 ±14.90 / 754.80 ms │     no change │
│ QQuery 21 │     754.02 / 765.34 ±6.85 / 774.44 ms │     761.37 / 764.62 ±2.67 / 768.81 ms │     no change │
│ QQuery 22 │  1123.65 / 1128.39 ±4.69 / 1137.31 ms │  1126.97 / 1131.73 ±7.10 / 1145.76 ms │     no change │
│ QQuery 23 │ 3041.09 / 3062.25 ±18.65 / 3096.08 ms │  3033.97 / 3043.12 ±7.01 / 3055.29 ms │     no change │
│ QQuery 24 │     101.54 / 103.59 ±1.75 / 106.55 ms │      98.71 / 100.39 ±1.13 / 101.92 ms │     no change │
│ QQuery 25 │     142.10 / 142.85 ±0.49 / 143.58 ms │     136.56 / 138.17 ±0.90 / 139.25 ms │     no change │
│ QQuery 26 │     100.19 / 102.93 ±2.31 / 107.10 ms │      98.00 / 100.86 ±2.33 / 103.12 ms │     no change │
│ QQuery 27 │     849.12 / 854.43 ±7.74 / 869.79 ms │     846.66 / 853.51 ±4.77 / 857.99 ms │     no change │
│ QQuery 28 │ 7705.51 / 7745.32 ±22.00 / 7770.71 ms │ 7697.89 / 7744.14 ±31.93 / 7780.46 ms │     no change │
│ QQuery 29 │        50.77 / 55.69 ±5.09 / 65.45 ms │        50.30 / 53.99 ±4.24 / 61.53 ms │     no change │
│ QQuery 30 │     363.99 / 370.45 ±4.29 / 377.11 ms │     356.81 / 365.83 ±6.34 / 375.05 ms │     no change │
│ QQuery 31 │    362.12 / 377.82 ±11.94 / 394.11 ms │     376.70 / 380.15 ±3.81 / 386.17 ms │     no change │
│ QQuery 32 │ 1200.38 / 1267.05 ±55.53 / 1326.36 ms │ 1265.70 / 1294.94 ±27.34 / 1344.67 ms │     no change │
│ QQuery 33 │ 1460.50 / 1499.33 ±45.94 / 1580.55 ms │ 1470.47 / 1563.53 ±46.86 / 1592.95 ms │     no change │
│ QQuery 34 │  1431.98 / 1445.24 ±8.97 / 1459.07 ms │  1442.78 / 1454.41 ±8.09 / 1463.45 ms │     no change │
│ QQuery 35 │     382.15 / 386.54 ±3.26 / 390.79 ms │     379.35 / 385.51 ±7.65 / 397.78 ms │     no change │
│ QQuery 36 │     120.63 / 123.11 ±2.38 / 127.06 ms │     112.43 / 120.57 ±5.93 / 129.56 ms │     no change │
│ QQuery 37 │        48.56 / 49.41 ±0.56 / 50.23 ms │        48.03 / 50.15 ±1.57 / 52.85 ms │     no change │
│ QQuery 38 │        76.82 / 77.82 ±1.61 / 81.02 ms │        73.92 / 76.14 ±1.74 / 78.88 ms │     no change │
│ QQuery 39 │     220.70 / 223.98 ±1.85 / 226.23 ms │     204.83 / 218.14 ±7.68 / 228.76 ms │     no change │
│ QQuery 40 │        20.76 / 23.38 ±1.78 / 25.13 ms │        23.89 / 25.58 ±1.20 / 27.12 ms │  1.09x slower │
│ QQuery 41 │        20.53 / 22.07 ±1.92 / 25.72 ms │        19.67 / 20.48 ±0.58 / 21.38 ms │ +1.08x faster │
│ QQuery 42 │        19.62 / 19.98 ±0.25 / 20.37 ms │        18.69 / 20.48 ±1.77 / 23.84 ms │     no change │
└───────────┴───────────────────────────────────────┴───────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                         ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                         │ 27232.98ms │
│ Total Time (neilc_scalar-subquery-expr)   │ 27236.70ms │
│ Average Time (HEAD)                       │   633.33ms │
│ Average Time (neilc_scalar-subquery-expr) │   633.41ms │
│ Queries Faster                            │          2 │
│ Queries Slower                            │          2 │
│ Queries with No Change                    │         39 │
│ Queries with Failure                      │          0 │
└───────────────────────────────────────────┴────────────┘

Resource Usage

clickbench_partitioned — base (merge-base)

Metric Value
Wall time 137.3s
Peak memory 43.4 GiB
Avg memory 30.6 GiB
CPU user 1280.1s
CPU sys 100.5s
Disk read 0 B
Disk write 3.7 GiB

clickbench_partitioned — branch

Metric Value
Wall time 137.3s
Peak memory 42.2 GiB
Avg memory 30.8 GiB
CPU user 1274.8s
CPU sys 105.0s
Disk read 0 B
Disk write 756.0 KiB


@Dandandan
Contributor

I am a bit surprised that it is even needed to do some speculative/overlapping work to get performance parity. AFAIK NestedLoopJoinExec doesn't do this (it will execute the build side first, then probe).

In an ideal scenario, we don't do that, as it might increase memory usage substantially by executing the pipelines concurrently and make execution less cache/NUMA friendly (we could instead improve parallelism within each individual pipeline, e.g. using a morsel-driven approach).

@Dandandan
Contributor

The most promising approach would perhaps be to check out the single join (it is also relatively easy to implement) and see what it does when switching to that type. I am wondering if it is able to do more optimization, like pushing down the scalar value as a dynamic filter in certain cases (which I imagine would be pretty effective).

Also, thinking about it more, in some cases a join should be more efficient in execution as well, as it avoids materializing the output if the scalar filter is in the join (I wonder if that's what we're seeing). In the scalar subquery case, will it materialize the join first and then apply the filter using FilterExec?

09)----------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal]
10)------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
11)--------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
12)----------------------FilterExec: substr(c_phone@1, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17]) AND CAST(c_acctbal@2 AS Decimal128(19, 6)) > scalar_subquery(<pending>)
Contributor

Does it push down for parquet?

Contributor Author

@neilconway neilconway Apr 3, 2026

Not yet. I think we should be able to enable this, but it isn't trivial, so I think it's better to defer it to another PR; I filed #21324 for this.


impl Eq for ScalarSubqueryExpr {}

impl PhysicalExpr for ScalarSubqueryExpr {
Contributor

Shouldn't this implement placement (as a literal) so it benefits from pushdown?

Contributor

Ideally it would be possible to evaluate the subquery as eagerly as possible, so it can already be used in pruning based on file/row-group statistics.
At the moment it seems the parquet file has already been opened / partially processed before the filter can be evaluated?

Contributor Author

In principle we could evaluate scalar subqueries first, then inline the result into the main query plan and use that to aggressively push down filters and prune partitions. That isn't the approach this PR takes; I looked briefly at doing that, but it seemed hard to arrange for query evaluation to happen as part of planning the main query. It also seemed a bit odd to me: if a query contains many scalar subqueries that are expensive to evaluate, "planning" could potentially take a very long time.

Curious what you think. @Dandandan

@neilconway
Contributor Author

neilconway commented Apr 1, 2026

I am a bit surprised that it is even needed to do some speculative/overlapping work to get performance parity. AFAIK NestedLoopJoinExec doesn't do this (it will execute the build side first, then probe).

This was surprising to me as well, but on thinking about it further, it kinda makes sense. CrossJoinExec::execute() does

          let stream = self.right.execute(partition, Arc::clone(&context))?;

          // ...

          let left_fut = self.left_fut.try_once(|| {
              let left_stream = self.left.execute(0, context)?;

              Ok(load_left_input(
                  left_stream,
                  join_metrics.clone(),
                  reservation,
              ))
          })?;

          Ok(Box::pin(CrossJoinStream {...}))

i.e., we basically start up both inputs and allow them to do some work if they want to (e.g., RepartitionExec::execute(), CoalescePartitionsExec::execute(), and SortPreservingMergeExec::execute() all kick off some background work on the first execute() call).

Whereas in the current ScalarSubqueryExec implementation, we don't do anything with the main plan until the evaluation of all subqueries has completed, which means we do lose some opportunity to overlap work that NLJ is able to take advantage of.
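To make the distinction concrete, here is a minimal standalone sketch (hypothetical, not DataFusion code: `EagerExec` and its channel-based "stream" are stand-ins) of an operator whose `execute()` spawns its producer immediately, so work is already overlapping before the caller consumes anything:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical operator: `execute()` returns a "stream" (a channel receiver),
// but also spawns the producer right away, mirroring how RepartitionExec /
// CoalescePartitionsExec kick off background tasks on the first execute() call.
struct EagerExec;

impl EagerExec {
    fn execute(&self) -> mpsc::Receiver<i64> {
        let (tx, rx) = mpsc::channel();
        // Work begins here, at execute() time, not when the receiver is polled.
        thread::spawn(move || {
            for v in 0..4 {
                tx.send(v).unwrap();
            }
        });
        rx
    }
}

fn main() {
    let rx = EagerExec.execute();
    // The consumer may start arbitrarily late; the producer has been
    // running in the background the whole time.
    let total: i64 = rx.iter().sum();
    assert_eq!(total, 6);
    println!("{total}");
}
```

A lazy operator would instead construct the receiver without spawning anything, deferring all work until the first poll; that is the behavior the current ScalarSubqueryExec implementation effectively has for the main plan.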

@neilconway
Contributor Author

neilconway commented Apr 1, 2026

BTW, recording mostly for posterity -- I notice that TPC-DS query 6 is ~7x faster with the ScalarSubqueryExec approach. Here are the plans on main:

=== Logical plan ===
Limit: skip=0, fetch=100
  Sort: cnt ASC NULLS LAST, state ASC NULLS LAST
    Projection: a.ca_state AS state, count(Int64(1)) AS count(*) AS cnt
      Filter: count(Int64(1)) >= Int64(10)
        Aggregate: groupBy=[[a.ca_state]], aggr=[[count(Int64(1))]]
          Filter: a.ca_address_sk = c.c_current_addr_sk AND c.c_customer_sk = s.ss_customer_sk AND s.ss_sold_date_sk = d.d_date_sk AND s.ss_item_sk = i.i_item_sk AND d.d_month_seq = (<subquery>) AND i.i_current_price > Float64(1.2) * (<subquery>)
            Subquery:
              Distinct:
                Projection: date_dim.d_month_seq
                  Filter: date_dim.d_year = Int64(2000) AND date_dim.d_moy = Int64(2)
                    TableScan: date_dim
            Subquery:
              Projection: avg(j.i_current_price)
                Aggregate: groupBy=[[]], aggr=[[avg(j.i_current_price)]]
                  Filter: j.i_category = outer_ref(i.i_category)
                    SubqueryAlias: j
                      TableScan: item
            Cross Join:
              Cross Join:
                Cross Join:
                  Cross Join:
                    SubqueryAlias: a
                      TableScan: customer_address
                    SubqueryAlias: c
                      TableScan: customer
                  SubqueryAlias: s
                    TableScan: store_sales
                SubqueryAlias: d
                  TableScan: date_dim
              SubqueryAlias: i
                TableScan: item

=== Optimized logical plan ===
Sort: cnt ASC NULLS LAST, state ASC NULLS LAST, fetch=100
  Projection: a.ca_state AS state, count(Int64(1)) AS count(*) AS cnt
    Filter: count(Int64(1)) >= Int64(10)
      Aggregate: groupBy=[[a.ca_state]], aggr=[[count(Int64(1))]]
        Projection: a.ca_state
          Filter: CAST(i.i_current_price AS Decimal128(30, 15)) > CAST(Float64(1.2) * __scalar_sq_2.avg(j.i_current_price) AS Decimal128(30, 15))
            Projection: a.ca_state, i.i_current_price, __scalar_sq_2.avg(j.i_current_price)
              Left Join: i.i_category = __scalar_sq_2.i_category
                Projection: a.ca_state, i.i_current_price, i.i_category
                  Inner Join: d.d_month_seq = __scalar_sq_1.d_month_seq
                    Projection: a.ca_state, d.d_month_seq, i.i_current_price, i.i_category
                      Inner Join: s.ss_item_sk = i.i_item_sk
                        Projection: a.ca_state, s.ss_item_sk, d.d_month_seq
                          Inner Join: s.ss_sold_date_sk = CAST(d.d_date_sk AS Float64)
                            Projection: a.ca_state, s.ss_sold_date_sk, s.ss_item_sk
                              Inner Join: CAST(c.c_customer_sk AS Float64) = s.ss_customer_sk
                                Projection: a.ca_state, c.c_customer_sk
                                  Inner Join: a.ca_address_sk = c.c_current_addr_sk
                                    SubqueryAlias: a
                                      TableScan: customer_address projection=[ca_address_sk, ca_state]
                                    SubqueryAlias: c
                                      TableScan: customer projection=[c_customer_sk, c_current_addr_sk]
                                SubqueryAlias: s
                                  TableScan: store_sales projection=[ss_sold_date_sk, ss_item_sk, ss_customer_sk]
                            SubqueryAlias: d
                              TableScan: date_dim projection=[d_date_sk, d_month_seq]
                        SubqueryAlias: i
                          TableScan: item projection=[i_item_sk, i_current_price, i_category]
                    SubqueryAlias: __scalar_sq_1
                      Aggregate: groupBy=[[date_dim.d_month_seq]], aggr=[[]]
                        Projection: date_dim.d_month_seq
                          Filter: date_dim.d_year = Int32(2000) AND date_dim.d_moy = Int32(2)
                            TableScan: date_dim projection=[d_month_seq, d_year, d_moy], partial_filters=[Boolean(true), date_dim.d_year = Int32(2000), date_dim.d_moy = Int32(2)]
                SubqueryAlias: __scalar_sq_2
                  Projection: CAST(avg(j.i_current_price) AS Float64), j.i_category
                    Aggregate: groupBy=[[j.i_category]], aggr=[[avg(j.i_current_price)]]
                      SubqueryAlias: j
                        TableScan: item projection=[i_current_price, i_category]

Here are the plans on the subquery-expr branch:

=== Logical plan ===
Limit: skip=0, fetch=100
  Sort: cnt ASC NULLS LAST, state ASC NULLS LAST
    Projection: a.ca_state AS state, count(Int64(1)) AS count(*) AS cnt
      Filter: count(Int64(1)) >= Int64(10)
        Aggregate: groupBy=[[a.ca_state]], aggr=[[count(Int64(1))]]
          Filter: a.ca_address_sk = c.c_current_addr_sk AND c.c_customer_sk = s.ss_customer_sk AND s.ss_sold_date_sk = d.d_date_sk AND s.ss_item_sk = i.i_item_sk AND d.d_month_seq = (<subquery>) AND i.i_current_price > Float64(1.2) * (<subquery>)
            Subquery:
              Distinct:
                Projection: date_dim.d_month_seq
                  Filter: date_dim.d_year = Int64(2000) AND date_dim.d_moy = Int64(2)
                    TableScan: date_dim
            Subquery:
              Projection: avg(j.i_current_price)
                Aggregate: groupBy=[[]], aggr=[[avg(j.i_current_price)]]
                  Filter: j.i_category = outer_ref(i.i_category)
                    SubqueryAlias: j
                      TableScan: item
            Cross Join:
              Cross Join:
                Cross Join:
                  Cross Join:
                    SubqueryAlias: a
                      TableScan: customer_address
                    SubqueryAlias: c
                      TableScan: customer
                  SubqueryAlias: s
                    TableScan: store_sales
                SubqueryAlias: d
                  TableScan: date_dim
              SubqueryAlias: i
                TableScan: item

=== Optimized logical plan ===
Sort: cnt ASC NULLS LAST, state ASC NULLS LAST, fetch=100
  Projection: a.ca_state AS state, count(Int64(1)) AS count(*) AS cnt
    Filter: count(Int64(1)) >= Int64(10)
      Aggregate: groupBy=[[a.ca_state]], aggr=[[count(Int64(1))]]
        Projection: a.ca_state
          Filter: CAST(i.i_current_price AS Decimal128(30, 15)) > CAST(Float64(1.2) * __scalar_sq_1.avg(j.i_current_price) AS Decimal128(30, 15))
            Projection: a.ca_state, i.i_current_price, __scalar_sq_1.avg(j.i_current_price)
              Left Join: i.i_category = __scalar_sq_1.i_category
                Projection: a.ca_state, i.i_current_price, i.i_category
                  Inner Join: s.ss_item_sk = i.i_item_sk
                    Projection: a.ca_state, s.ss_item_sk
                      Inner Join: s.ss_sold_date_sk = CAST(d.d_date_sk AS Float64)
                        Projection: a.ca_state, s.ss_sold_date_sk, s.ss_item_sk
                          Inner Join: CAST(c.c_customer_sk AS Float64) = s.ss_customer_sk
                            Projection: a.ca_state, c.c_customer_sk
                              Inner Join: a.ca_address_sk = c.c_current_addr_sk
                                SubqueryAlias: a
                                  TableScan: customer_address projection=[ca_address_sk, ca_state]
                                SubqueryAlias: c
                                  TableScan: customer projection=[c_customer_sk, c_current_addr_sk]
                            SubqueryAlias: s
                              TableScan: store_sales projection=[ss_sold_date_sk, ss_item_sk, ss_customer_sk]
                        SubqueryAlias: d
                          Projection: date_dim.d_date_sk
                            Filter: date_dim.d_month_seq = (<subquery>)
                              Subquery:
                                Aggregate: groupBy=[[date_dim.d_month_seq]], aggr=[[]]
                                  Projection: date_dim.d_month_seq
                                    Filter: date_dim.d_year = Int32(2000) AND date_dim.d_moy = Int32(2)
                                      TableScan: date_dim projection=[d_month_seq, d_year, d_moy], partial_filters=[date_dim.d_year = Int32(2000), date_dim.d_moy = Int32(2)]
                              TableScan: date_dim projection=[d_date_sk, d_month_seq], partial_filters=[date_dim.d_month_seq = (<subquery>)]
                                Subquery:
                                  Aggregate: groupBy=[[date_dim.d_month_seq]], aggr=[[]]
                                    Projection: date_dim.d_month_seq
                                      Filter: date_dim.d_year = Int32(2000) AND date_dim.d_moy = Int32(2)
                                        TableScan: date_dim projection=[d_month_seq, d_year, d_moy], partial_filters=[date_dim.d_year = Int32(2000), date_dim.d_moy = Int32(2)]
                    SubqueryAlias: i
                      TableScan: item projection=[i_item_sk, i_current_price, i_category]
                SubqueryAlias: __scalar_sq_1
                  Projection: CAST(avg(j.i_current_price) AS Float64), j.i_category
                    Aggregate: groupBy=[[j.i_category]], aggr=[[avg(j.i_current_price)]]
                      SubqueryAlias: j
                        TableScan: item projection=[i_current_price, i_category]

It looks like the difference is that we're able to push the subquery predicate on d_month_seq down and apply it directly to the scan on date_dim, which we aren't able to do when we evaluate the subquery as a cross-join. Would be nice to keep this optimization whatever approach we end up adopting for subquery evaluation -- without thinking about it too hard, I think we should be able to do the same optimization for single-join predicates as well.

@Dandandan
Contributor

Dandandan commented Apr 1, 2026

          let stream = self.right.execute(partition, Arc::clone(&context))?;

          // ...

          let left_fut = self.left_fut.try_once(|| {
              let left_stream = self.left.execute(0, context)?;

              Ok(load_left_input(
                  left_stream,
                  join_metrics.clone(),
                  reservation,
              ))
          })?;

          Ok(Box::pin(CrossJoinStream {...}))

In principle self.right.execute only builds the stream - it shouldn't do any "actual" work, only the setup - I would be surprised if this was making much of a difference.

The `CrossJoinStream` will drive the execution of the stream (and can only do so once the left side is loaded).

@neilconway
Contributor Author

In principle self.right.execute only builds the stream - it shouldn't do any "actual" work, only the setup

Is that true in practice? e.g.,

  • CoalescePartitionsExec::execute() -> builder.run_input() -> self.inner.spawn(...)
  • SortPreservingMergeExec::execute() -> spawn_buffered()

Both plan nodes are used by TPC-DS Q24, as one example of a place where we saw a slowdown w/o adding additional overlapping in ScalarSubqueryExec.

@Dandandan
Contributor

In principle self.right.execute only builds the stream - it shouldn't do any "actual" work, only the setup

Is that true in practice? e.g.,

  • CoalescePartitionsExec::execute() -> builder.run_input() -> self.inner.spawn(...)
  • SortPreservingMergeExec::execute() -> spawn_buffered()

Both plan nodes are used by TPC-DS Q24, as one example of a place where we saw a slowdown w/o adding additional overlapping in ScalarSubqueryExec.

You are probably right; in those cases it is potentially doing many things in parallel.

But I think this is not what we want ideally - we want to run as few independent pipelines as possible, and get (data) parallelism from within the individual pipelines rather than executing them all at the same time.

@neilconway
Contributor Author

But I think this is not what we want ideally - we want to run as few independent pipelines as possible, and get (data) parallelism from within the individual pipelines rather than executing them all at the same time.

I don't disagree 😊 But for the purposes of this PR, we will regress performance on some benchmark queries if we don't do some additional work to get the same degree of overlapping that the cross-join path gets today. Is that something we're okay with?

I don't think the additional complexity to overlap subquery evaluation with main query evaluation is too bad (via WaitForSubqueryExec), but if we're going to land morsel-driven parallelism soon-ish (🎉🎉🎉), maybe that will solve this problem in a cleaner / more general way and we can keep the subquery eval stuff simpler. Let me know what you think @Dandandan

@Dandandan
Contributor

Dandandan commented Apr 2, 2026

I don't disagree 😊 But for the purposes of this PR, we will regress performance on some benchmark queries if we don't do some additional work to get the same degree of overlapping that the cross-join path gets today. Is that something we're okay with?

Yeah I think that's okay; as long as we don't regress on memory usage too much, I think we should be ok!

@alamb we should consider (reducing) parallelism/implicit buffering from CoalescePartitionsExec / SortPreservingMergeExec once we land morsel-driven scanning.

@Dandandan
Contributor

Dandandan commented Apr 2, 2026

I am evaluating the impact of lazy CoalescePartitionsExec / SortPreservingMergeExec in:

#21326
#21328

@Dandandan
Contributor

Seems that mainly CoalescePartitions is helpful for TPC-DS SF=1 (and slightly for TPCH).
#21326 (comment)

That benchmark has very limited parallelism (because of single-rowgroup tables).

SortPreservingMergeExec doesn't seem to do much as far as I can see (which I think makes sense, as it will mostly be used at the root of the plan).

@neilconway
Contributor Author

@Dandandan Based on discussion, I won't plan to implement the work to overlap main query and subquery evaluation for this PR. What do you think makes sense as a next step?

We could wait to merge this PR until the morsel-driven parallelism work lands (so we can check that morsel-driven parallelism effectively recovers the parallelism that we'll lose from the simple approach in this PR), or land them separately and just make sure we verify that overall performance hasn't regressed before we ship 54. wdyt?

@alamb
Contributor

alamb commented Apr 3, 2026

Any chance we can break this PR into smaller pieces (e.g. move benchmarks, for example) to make it easier to review?

@neilconway
Contributor Author

Any chance we can break this PR into smaller pieces (e.g. move benchmarks, for example) to make it easier to review?

Hmmm, that might be a bit tricky. The benchmarks are pretty trivial and could easily be omitted. Here's how Claude summarizes the PR:

  1. ScalarSubqueryExec execution plan — A new physical plan node that wraps a main input plan and a set of subquery plans.
  2. ScalarSubqueryExpr physical expression — A PhysicalExpr that reads a scalar value from a shared OnceLock-based results container, populated by ScalarSubqueryExec.
  3. ExecutionProps as the bridge — Carries subquery_indexes (mapping logical Subquery → result slot) and subquery_results (the shared OnceLock container) so that create_physical_expr can convert Expr::ScalarSubquery into ScalarSubqueryExpr.
  4. Physical planner integration — create_initial_plan collects uncorrelated scalar subqueries at each plan level, plans them, allocates a shared results container, and wraps the main plan in ScalarSubqueryExec.
  5. scalar_subquery_to_join scoped to correlated subqueries only — Uncorrelated scalar subqueries are no longer rewritten to joins by the optimizer; they flow through to the physical planner instead.
  6. Protobuf serialization — Round-trip serde support for the new plan nodes and expressions.
  7. Tree traversal helpers — LogicalPlan::map_uncorrelated_subqueries and Expr::contains_scalar_subquery.
  8. Benchmarks (trivial, would be fine to omit)
  9. Tests and updates to expected query plans
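The OnceLock-based results container described in items 1–3 above can be sketched in isolation. This is a hypothetical standalone sketch, not the actual DataFusion types: `Results` and the use of `i64` as a stand-in for `ScalarValue` are illustration-only.

```rust
use std::sync::{Arc, OnceLock};

// Hypothetical shape of the shared results container: one write-once slot
// per uncorrelated scalar subquery. The ScalarSubqueryExec side populates
// a slot exactly once; the ScalarSubqueryExpr side reads it at evaluation.
type Results = Arc<Vec<OnceLock<i64>>>;

fn main() {
    // Allocate one slot per subquery (two subqueries here).
    let results: Results = Arc::new((0..2).map(|_| OnceLock::new()).collect());

    // Producer side: store subquery 0's result; a second set() would fail.
    results[0].set(42).expect("slot written only once");

    // Consumer side: read the result during expression evaluation.
    assert_eq!(results[0].get(), Some(&42));
    // A slot whose subquery has not finished yet is simply still empty.
    assert!(results[1].get().is_none());
    println!("{}", results[0].get().unwrap()); // prints 42
}
```

`OnceLock` gives the write-once / read-many semantics the design needs without a mutex on the read path, and the `Arc` lets the same container be shared between the exec node and the expressions embedded in the parent plan.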

If it is helpful, I could prepare two PRs that have a split like:

  1. ScalarSubqueryExec, ScalarSubqueryExpr, ExecutionProps change, protobuf serialization, benchmarks (or omit them)
  2. Planner and optimizer changes, tree traversal helpers, test updates

If you think that would be easier to review, lmk.


Labels

core Core DataFusion crate development-process Related to development process of DataFusion logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate sqllogictest SQL Logic Tests (.slt)


Development

Successfully merging this pull request may close these issues.

Improve performance of array_has
Implement physical execution of uncorrelated scalar subqueries

6 participants