Commits (34)
4a3e553  Initial implementation (neilconway, Feb 20, 2026)
6b4f5c0  cargo fmt (neilconway, Mar 29, 2026)
1412ab1  Properly wait for subquery exec to complete before exec'ing main input (neilconway, Mar 29, 2026)
cedfa5c  Better fix for async exec issue (neilconway, Mar 29, 2026)
d80569f  Fix doc lint error (neilconway, Mar 29, 2026)
9f606fb  Implement logical plan serialization/deserialization for subqueries (neilconway, Mar 30, 2026)
b07491b  cargo fmt (neilconway, Mar 30, 2026)
27a1ac2  Refactor logical plan deserialization (neilconway, Mar 30, 2026)
bce0a6d  Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery… (neilconway, Mar 30, 2026)
7071001  Increase large files size check (neilconway, Mar 30, 2026)
b9bce91  fix clippy (neilconway, Mar 30, 2026)
7c965aa  Update expected TPC-H plans (neilconway, Mar 30, 2026)
09f167a  Implement statistics (neilconway, Mar 30, 2026)
54a9f79  Tweak comments (neilconway, Mar 30, 2026)
b979e3d  Merge branch 'main' into neilc/scalar-subquery-expr (neilconway, Mar 30, 2026)
2c256e7  Ensure projection pushdown works inside uncorrelated subqueries (neilconway, Mar 30, 2026)
99d9bcf  Update expected plans (neilconway, Mar 30, 2026)
9a11d62  Fix overlooked cases for projection pushdown (neilconway, Mar 31, 2026)
9b217ca  Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery… (neilconway, Mar 31, 2026)
5aef67e  Fix line numbers in expected EXPLAIN (neilconway, Mar 31, 2026)
3d0b99f  Evaluate subqueries in parallel (neilconway, Mar 31, 2026)
f99ded5  Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery… (neilconway, Apr 2, 2026)
b02abf8  Don't try to use subquery filters for partition pruning (neilconway, Apr 2, 2026)
3971312  Raise an error if duplicate subquery eval is detected (neilconway, Apr 2, 2026)
64e9f34  cargo fmt (neilconway, Apr 2, 2026)
26d8acb  Update expected plan (neilconway, Apr 2, 2026)
d2af491  Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery… (neilconway, Apr 2, 2026)
f9c9d5d  Remove unnecessary IN/EXISTS serialization code (neilconway, Apr 3, 2026)
92e6054  Code cleanup (neilconway, Apr 3, 2026)
6857966  Code cleanup (neilconway, Apr 3, 2026)
6a4f524  Code cleanup and refactoring (neilconway, Apr 3, 2026)
7adb788  Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery… (neilconway, Apr 3, 2026)
670139c  Updates for plan API changes (neilconway, Apr 3, 2026)
1239e3a  Fix doc build (neilconway, Apr 3, 2026)
4 changes: 2 additions & 2 deletions .github/workflows/large_files.yml

@@ -34,9 +34,9 @@ jobs:
           fetch-depth: 0
       - name: Check size of new Git objects
         env:
-          # 1 MB ought to be enough for anybody.
+          # 2 MB ought to be enough for anybody.
           # TODO in case we may want to consciously commit a bigger file to the repo without using Git LFS we may disable the check e.g. with a label
-          MAX_FILE_SIZE_BYTES: 1048576
+          MAX_FILE_SIZE_BYTES: 2097152
         shell: bash
         run: |
           if [ "${{ github.event_name }}" = "merge_group" ]; then

Contributor, commenting on the `# 1 MB ought to be enough for anybody.` line:

> do we really need to up the limit? this repo gets checked out a lot
>
> What is so large that it required increasing to 2MB?
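For scale, the new cap of 2,097,152 bytes is 2 × 1024 × 1024 (2 MiB), exactly double the previous 1,048,576-byte (1 MiB) limit.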
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml

@@ -297,3 +297,7 @@ required-features = ["parquet"]
 [[bench]]
 harness = false
 name = "reset_plan_states"
+
+[[bench]]
+harness = false
+name = "scalar_subquery_sql"
118 changes: 118 additions & 0 deletions datafusion/core/benches/scalar_subquery_sql.rs
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for uncorrelated scalar subquery evaluation.
//!
//! Measures the overhead of subquery execution machinery by using simple
//! arithmetic and comparison operators that don't have specialized scalar
//! fast paths, keeping the comparison between the old (join-based) and new
//! (ScalarSubqueryExec-based) approaches apples-to-apples.

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use std::hint::black_box;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
    let df = rt.block_on(ctx.sql(sql)).unwrap();
    black_box(rt.block_on(df.collect()).unwrap());
}

fn create_context(num_rows: usize) -> Result<SessionContext> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));

    let batch_size = 4096;
    let batches = (0..num_rows / batch_size)
        .map(|i| {
            let values: Vec<i64> =
                ((i * batch_size) as i64..((i + 1) * batch_size) as i64).collect();
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(values))])
                .unwrap()
        })
        .collect::<Vec<_>>();

    // Small lookup table for the subquery to read from.
    let sq_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let sq_batch = RecordBatch::try_new(
        sq_schema.clone(),
        vec![Arc::new(Int64Array::from(vec![10, 20, 30]))],
    )?;

    let ctx = SessionContext::new();
    ctx.register_table(
        "main_t",
        Arc::new(MemTable::try_new(schema, vec![batches])?),
    )?;
    ctx.register_table(
        "lookup",
        Arc::new(MemTable::try_new(sq_schema, vec![vec![sq_batch]])?),
    )?;

    Ok(ctx)
}

fn criterion_benchmark(c: &mut Criterion) {
    let num_rows = 1_048_576; // 2^20
    let rt = Runtime::new().unwrap();

    // Scalar subquery in a filter (WHERE clause).
    c.bench_function("scalar_subquery_filter", |b| {
        let ctx = create_context(num_rows).unwrap();
        b.iter(|| {
            query(
                &ctx,
                &rt,
                "SELECT x FROM main_t WHERE x > (SELECT max(v) FROM lookup)",
            )
        })
    });

    // Scalar subquery in a projection (SELECT expression).
    c.bench_function("scalar_subquery_projection", |b| {
        let ctx = create_context(num_rows).unwrap();
        b.iter(|| {
            query(
                &ctx,
                &rt,
                "SELECT x + (SELECT max(v) FROM lookup) AS y FROM main_t",
            )
        })
    });

    // Two scalar subqueries in one query.
    c.bench_function("scalar_subquery_two_subqueries", |b| {
        let ctx = create_context(num_rows).unwrap();
        b.iter(|| {
            query(
                &ctx,
                &rt,
                "SELECT x FROM main_t \
                 WHERE x > (SELECT min(v) FROM lookup) \
                 AND x < (SELECT max(v) FROM lookup) + 1000000",
            )
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
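Two notes on the setup: each bench function builds a fresh `SessionContext`, so cached state from one case cannot leak into another, and `black_box` keeps the collected results from being optimized away. To time a single case, Criterion accepts a name filter after `--`, for example:

    cargo bench -p datafusion --bench scalar_subquery_sql -- scalar_subquery_filter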