Use Scan API #6391
Merged
88 commits
470fbe9 Scan API (gatesn)
f965615 Scan API (gatesn)
ae39e1a Scan API (gatesn)
f41ccff Scan API (gatesn)
23510e4 Scan API (gatesn)
e925792 Scan API (gatesn)
f8a7543 Scan API (gatesn)
d05997f Scan API (gatesn)
5f6123a Scan API (gatesn)
c5e60a4 Scan API (gatesn)
2d6c816 DataFusion streaming (gatesn)
5a7e047 DataFusion streaming (gatesn)
07cb0d3 DataFusion streaming (gatesn)
b19d03b DataFusion streaming (gatesn)
899675e Merge branch 'develop' into ngates/scan-api (gatesn)
811acd5 DataFusion streaming (gatesn)
bbbaa62 DataFusion streaming (gatesn)
717ebab DataFusion streaming (gatesn)
0a9b542 DataFusion streaming (gatesn)
25cac35 DataFusion streaming (gatesn)
367baaa DataFusion streaming (gatesn)
3b7d0ae DataFusion streaming (gatesn)
aa46b99 Merge branch 'develop' into ngates/scan-api (gatesn)
ac06b32 DataFusion streaming (gatesn)
270b242 DataFusion streaming (gatesn)
6aa5a0b DataFusion streaming (gatesn)
7aaf8b6 DataFusion streaming (gatesn)
669cedb Scan API (gatesn)
392f263 Scan API (gatesn)
a45bf71 Scan API (gatesn)
4aff18a Scan API (gatesn)
fa7a431 Scan API (gatesn)
fe6fbec Scan API (gatesn)
ccc13c3 Scan API (gatesn)
07562d6 Scan API (gatesn)
7737131 Scan API (gatesn)
b6e1142 Scan API (gatesn)
35eeec7 Configure DuckDB threads (gatesn)
1da2a28 Configure DuckDB threads (gatesn)
c05641b Configure DuckDB threads (gatesn)
5d59027 Configure DuckDB threads (gatesn)
d2aa4fb Configure DuckDB threads (gatesn)
e1330dd Configure DuckDB threads (gatesn)
d396f02 Configure DuckDB threads (gatesn)
9e54416 Configure DuckDB threads (gatesn)
8b2ca17 Configure DuckDB threads (gatesn)
68ec238 Configure DuckDB threads (gatesn)
d7ea77b Configure DuckDB threads (gatesn)
6f030fe Configure DuckDB threads (gatesn)
dfb275c Configure DuckDB threads (gatesn)
f87b42a Configure DuckDB threads (gatesn)
34cff37 merge (gatesn)
978ec48 merge (gatesn)
f28bd1a merge (gatesn)
9f671fe merge (gatesn)
5db5de4 merge (gatesn)
92b5910 merge (gatesn)
ac962ac merge (gatesn)
a8384a5 merge (gatesn)
61d4b6a merge (gatesn)
1f849ce merge (gatesn)
9c82df8 merge (gatesn)
fa37371 merge (gatesn)
e658d6c merge (gatesn)
826216d Merge branch 'develop' into ngates/scan-api (gatesn)
2acb89f lifetimes (gatesn)
847075f lifetimes (gatesn)
0e639e2 lifetimes (gatesn)
d24bd64 lifetimes (gatesn)
04da834 lifetimes (gatesn)
d856d3d lifetimes (gatesn)
00ba860 lifetimes (gatesn)
e9dd877 Disable Scan API from benchmarks prior to merge (gatesn)
e3dcd2b Merge branch 'develop' into ngates/scan-api (gatesn)
04f88c0 Address comments (gatesn)
6e7390b Address comments (gatesn)
12834a8 Merge branch 'develop' into ngates/scan-api (gatesn)
42b9a80 Address comments (gatesn)
4597c58 merge (gatesn)
5900381 merge (gatesn)
eab7dbc merge (gatesn)
eb9cd44 merge (gatesn)
aa8dc05 merge (gatesn)
60a4770 Support including all trait default functions (gatesn)
d27386e Support including all trait default functions (gatesn)
c779e8b Support including all trait default functions (gatesn)
e8a4cc7 Support including all trait default functions (gatesn)
5bd3214 Support including all trait default functions (gatesn)
    @@ -26,13 +26,15 @@ use datafusion_physical_plan::collect;
    use futures::StreamExt;
    use parking_lot::Mutex;
    use tokio::fs::File;
    use vortex::scan::api::DataSourceRef;
    use vortex_bench::Benchmark;
    use vortex_bench::BenchmarkArg;
    use vortex_bench::CompactionStrategy;
    use vortex_bench::Engine;
    use vortex_bench::Format;
    use vortex_bench::Opt;
    use vortex_bench::Opts;
    use vortex_bench::SESSION;
    use vortex_bench::conversions::convert_parquet_directory_to_vortex;
    use vortex_bench::create_benchmark;
    use vortex_bench::create_output_writer;

    @@ -220,13 +222,20 @@ async fn main() -> anyhow::Result<()> {
        Ok(())
    }

    fn use_scan_api() -> bool {
        std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1")
    }

    async fn register_benchmark_tables<B: Benchmark + ?Sized>(
        session: &SessionContext,
        benchmark: &B,
        format: Format,
    ) -> anyhow::Result<()> {
        match format {
            Format::Arrow => register_arrow_tables(session, benchmark).await,
            _ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => {
                register_v2_tables(session, benchmark, format).await
            }
            _ => {
                let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;
                let file_format = format_to_df_format(format);

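The gate in this hunk routes a format to the V2 path only when `VORTEX_USE_SCAN_API` is exactly `"1"` and the format is `OnDiskVortex` or `VortexCompact`; `Arrow` is matched first and is never affected. A minimal sketch of that dispatch as a pure function follows, which makes the rules easy to test and gives one place to log the chosen path, as the reviewer on this PR asks. The `Format` enum here is a hypothetical stand-in for `vortex_bench::Format` (the `Parquet` variant is assumed for illustration), and `RegistrationPath`, `scan_api_enabled`, and `select_path` are illustrative names that do not appear in the PR.

```rust
/// Hypothetical stand-in for `vortex_bench::Format`; `Parquet` is assumed
/// here only to represent "any other format".
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Format {
    Arrow,
    Parquet,
    OnDiskVortex,
    VortexCompact,
}

/// Which registration routine a format is routed to (illustrative name).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RegistrationPath {
    ArrowInMemory,
    ScanApiV2,
    ListingTable,
}

/// Pure version of the gate: only the exact value "1" enables the Scan API.
/// An unset variable, "true", or "0" all leave it disabled.
fn scan_api_enabled(env_value: Option<&str>) -> bool {
    env_value == Some("1")
}

/// Same precedence as the `match` in the diff: Arrow first, then the
/// env-gated Vortex formats, then the fallback arm.
fn select_path(format: Format, env_value: Option<&str>) -> RegistrationPath {
    match format {
        Format::Arrow => RegistrationPath::ArrowInMemory,
        Format::OnDiskVortex | Format::VortexCompact if scan_api_enabled(env_value) => {
            RegistrationPath::ScanApiV2
        }
        _ => RegistrationPath::ListingTable,
    }
}

fn main() {
    // Read the environment once and pass the value down.
    let env_value = std::env::var("VORTEX_USE_SCAN_API").ok();
    for format in [
        Format::Arrow,
        Format::Parquet,
        Format::OnDiskVortex,
        Format::VortexCompact,
    ] {
        let path = select_path(format, env_value.as_deref());
        // Logging the decision makes it visible which API a run used.
        eprintln!("{format:?}: registering tables via {path:?}");
    }
}
```

Taking the environment value as a parameter keeps the decision deterministic under test; the real benchmark could keep reading the variable inside `use_scan_api()` and log at the call site instead.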
    @@ -265,6 +274,54 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
        }
    }

    /// Register tables using the V2 `VortexTable` + `MultiFileDataSource` path.
    async fn register_v2_tables<B: Benchmark + ?Sized>(
        session: &SessionContext,
        benchmark: &B,
        format: Format,
    ) -> anyhow::Result<()> {
        use vortex::file::multi::MultiFileDataSource;
        use vortex::io::object_store::ObjectStoreFileSystem;
        use vortex::io::session::RuntimeSessionExt;
        use vortex::scan::api::DataSource as _;
        use vortex_datafusion::v2::VortexTable;

Review comment on lines +283 to +287 (Contributor):
Move these imports to the top? Also, let's log here so it's visible which API is used.

        let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?;

        for table in benchmark.table_specs().iter() {
            let pattern = benchmark.pattern(table.name, format);
            let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern.clone())?;
            let store = session
                .state()
                .runtime_env()
                .object_store(table_url.object_store())?;

            let fs: vortex::io::filesystem::FileSystemRef =
                Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle()));
            let base_prefix = benchmark_base.path().trim_start_matches('/').to_string();
            let fs = fs.with_prefix(base_prefix);

            let glob_pattern = match &pattern {
                Some(p) => p.as_str().to_string(),
                None => format!("*.{}", format.ext()),
            };

            let multi_ds = MultiFileDataSource::new(SESSION.clone())
                .with_filesystem(fs)
                .with_glob(glob_pattern)
                .build()
                .await?;

            let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?);
            let data_source: DataSourceRef = Arc::new(multi_ds);

            let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema));
            session.register_table(table.name, table_provider)?;
        }

        Ok(())
    }

    /// Load Arrow IPC files into in-memory DataFusion tables.
    async fn register_arrow_tables<B: Benchmark + ?Sized>(
        session: &SessionContext,

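Two small string computations in `register_v2_tables` decide which files each table sees: the base URL path has its leading `/` stripped before being used as a filesystem prefix, and the glob falls back to `*.<ext>` when the benchmark supplies no per-table pattern. The sketch below isolates both steps using only the standard library. The function names and the example paths are hypothetical; the PR performs these steps inline and passes the results to `with_prefix` and `with_glob`.

```rust
/// Mirrors `benchmark_base.path().trim_start_matches('/')`: URL paths start
/// with '/', while the prefix handed to the filesystem is kept relative.
/// Note that `trim_start_matches` strips every leading '/', not just one.
fn base_prefix(url_path: &str) -> String {
    url_path.trim_start_matches('/').to_string()
}

/// Mirrors the `match &pattern { ... }` in the diff: use the table's own
/// pattern when one exists, otherwise match every file with the format's
/// extension.
fn glob_pattern(pattern: Option<&str>, ext: &str) -> String {
    match pattern {
        Some(p) => p.to_string(),
        None => format!("*.{ext}"),
    }
}

fn main() {
    // Hypothetical inputs, chosen only to exercise both branches.
    let prefix = base_prefix("/data/tpch/vortex/");
    let with_pattern = glob_pattern(Some("lineitem_*.vortex"), "vortex");
    let fallback = glob_pattern(None, "vortex");

    println!("prefix   = {prefix}");
    println!("pattern  = {with_pattern}");
    println!("fallback = {fallback}");
}
```

One consequence worth noting: with the fallback glob, every table in a benchmark that has no per-table pattern would match the same set of files under the prefix, so the fallback presumably suits layouts where patterns are always supplied or each table has its own directory.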