From 4b067610eebab1d5b0ca77587fc951f704e54b6a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 13 Feb 2026 20:19:59 -0500 Subject: [PATCH] Configure DuckDB threads Signed-off-by: Nicholas Gates --- benchmarks/duckdb-bench/src/lib.rs | 14 ++---- vortex-duckdb/src/lib.rs | 73 ------------------------------ vortex-duckdb/src/scan.rs | 31 ++----------- 3 files changed, 9 insertions(+), 109 deletions(-) diff --git a/benchmarks/duckdb-bench/src/lib.rs b/benchmarks/duckdb-bench/src/lib.rs index ec719b432cc..f6da732aef4 100644 --- a/benchmarks/duckdb-bench/src/lib.rs +++ b/benchmarks/duckdb-bench/src/lib.rs @@ -17,7 +17,6 @@ use vortex_bench::generate_duckdb_registration_sql; use vortex_duckdb::duckdb::Config; use vortex_duckdb::duckdb::Connection; use vortex_duckdb::duckdb::Database; -use vortex_duckdb::register_extension_options; /// DuckDB context for benchmarks. pub struct DuckClient { @@ -67,10 +66,12 @@ impl DuckClient { path: Option, threads: Option, ) -> Result<(Database, Connection)> { - let config = Config::new().vortex_expect("failed to create duckdb config"); + let mut config = Config::new().vortex_expect("failed to create duckdb config"); - // Register Vortex extension options before creating connection - register_extension_options(&config); + // Set DuckDB thread count if specified + if let Some(thread_count) = threads { + config.set("threads", &format!("{}", thread_count))?; + } let db = match path { Some(path) => Database::open_with_config(path, config), @@ -91,11 +92,6 @@ impl DuckClient { // parquet_metadata_cache" when running DuckDB in debug mode. connection.query("SET parquet_metadata_cache = true")?; - // Set vortex_max_threads if specified - if let Some(thread_count) = threads { - connection.query(&format!("SET vortex_max_threads = {}", thread_count))?; - } - Ok((db, connection)) } diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 9f85fae772c..c5ccf235903 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -16,7 +16,6 @@ use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; use crate::copy::VortexCopyFunction; -use crate::duckdb::Config; pub use crate::duckdb::Connection; pub use crate::duckdb::Database; pub use crate::duckdb::LogicalType; @@ -44,36 +43,6 @@ static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRunt static SESSION: LazyLock = LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); -/// Register Vortex extension configuration options with DuckDB. -/// This must be called before `register_table_functions` to take effect. -pub fn register_extension_options(config: &Config) { - let logical_type = LogicalType::uint64(); - - let default_threads = std::thread::available_parallelism() - .map(|n| n.get() as u64) - .unwrap_or(1); - let default_value = Value::from(default_threads); - - // Register the vortex_max_threads extension option - // SAFETY: We're passing valid pointers for database, logical_type, and default_value - // The C++ code will copy the LogicalType and Value, so we can safely drop them after this call - let result = unsafe { - cpp::duckdb_vx_add_extension_option( - config.as_ptr(), - c"vortex_max_threads".as_ptr(), - c"Maximum number of threads for Vortex table scans".as_ptr(), - logical_type.as_ptr(), - default_value.as_ptr(), - ) - }; - - assert_eq!( - result, - cpp::duckdb_state::DuckDBSuccess, - "Failed to register vortex_max_threads extension option" - ); -} - /// Initialize the Vortex extension by registering the extension functions. /// Note: This also registers extension options. If you want to register options /// separately (e.g., before creating connections), call `register_extension_options` first. @@ -124,45 +93,3 @@ pub extern "C" fn vortex_extension_version_rust() -> *const c_char { } .as_ptr() } - -#[cfg(test)] -mod tests { - use std::ffi::CString; - - use super::*; - use crate::duckdb::Config; - use crate::duckdb::Database; - - #[test] - fn test_vortex_max_threads_option_registration() { - let config = Config::new().expect("Failed to create config"); - register_extension_options(&config); - let db = Database::open_in_memory_with_config(config).expect("Failed to open database"); - - let conn = db.connect().expect("Failed to connect"); - - let _result1 = conn - .query("SET vortex_max_threads = 4") - .expect("Failed to set vortex_max_threads - option may not be registered"); - - let max_threads_cstr = CString::new("vortex_max_threads").unwrap(); - let ctx = conn.client_context().vortex_expect("ctx exists"); - assert_eq!( - ctx.try_get_current_setting(&max_threads_cstr) - .unwrap() - .to_string(), - "4" - ); - - let _result2 = conn - .query("SET vortex_max_threads = 8") - .expect("Failed to set vortex_max_threads to 8"); - - assert_eq!( - ctx.try_get_current_setting(&max_threads_cstr) - .unwrap() - .to_string(), - "8" - ); - } -} diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index d0b14a5063e..6108fd5fcf2 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::cmp::max; -use std::ffi::CString; use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; @@ -62,7 +61,6 @@ use crate::duckdb::BindResult; use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; -use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; @@ -79,7 +77,6 @@ pub struct VortexBindData { file_urls: Vec, column_names: Vec, column_types: Vec, - max_threads: u64, } impl Clone for VortexBindData { @@ -92,7 +89,6 @@ impl Clone for VortexBindData { file_urls: self.file_urls.clone(), column_names: self.column_names.clone(), column_types: self.column_types.clone(), - max_threads: self.max_threads, } } } @@ -260,24 +256,6 @@ impl TableFunction for VortexTableFunction { .get_parameter(0) .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; - // Read the vortex_max_threads setting from DuckDB configuration - let max_threads_cstr = CString::new("vortex_max_threads") - .map_err(|e| vortex_err!("Invalid setting name: {}", e))?; - let max_threads = ctx - .try_get_current_setting(&max_threads_cstr) - .and_then(|v| match v.as_ref().extract() { - ExtractedValue::UBigInt(val) => usize::try_from(val).ok(), - ExtractedValue::BigInt(val) if val > 0 => usize::try_from(val as u64).ok(), - _ => None, - }) - .unwrap_or_else(|| { - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - }); - - tracing::trace!("running scan with max_threads {max_threads}"); - let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( file_glob_string.as_ref().as_string(), )))?; @@ -309,7 +287,6 @@ impl TableFunction for VortexTableFunction { filter_exprs: vec![], column_names, column_types, - max_threads: max_threads as u64, }) } @@ -389,13 +366,13 @@ impl TableFunction for VortexTableFunction { .map_or_else(|| "true".to_string(), |f| f.to_string()) ); - // Use the max_threads from bind_data (read from vortex_max_threads setting) - #[expect(clippy::cast_possible_truncation, reason = "max_threads fits in usize")] - let num_workers = bind_data.max_threads as usize; - let client_context = init_input.client_context()?; let object_cache = client_context.object_cache(); + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + let handle = RUNTIME.handle(); let first_file = bind_data.first_file.clone(); let scan_streams = stream::iter(bind_data.file_urls.clone())