diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs index 465a0ee492d..f38bfca2d5f 100644 --- a/benchmarks/lance-bench/src/random_access.rs +++ b/benchmarks/lance-bench/src/random_access.rs @@ -18,6 +18,7 @@ use vortex_bench::datasets::nested_structs; use vortex_bench::datasets::taxi_data; use vortex_bench::idempotent_async; use vortex_bench::random_access::RandomAccessor; +use vortex_bench::random_access::RandomAccessorRet; use vortex_bench::random_access::data_path; /// Convert a parquet file to lance format. @@ -112,8 +113,9 @@ impl RandomAccessor for LanceRandomAccessor { &self.name } - async fn take(&self, indices: &[u64]) -> anyhow::Result { - let result = self.dataset.take(indices, self.projection.clone()).await?; - Ok(result.num_rows()) + async fn take(&self, indices: &[u64]) -> anyhow::Result { + Ok(RandomAccessorRet::RecordBatch( + self.dataset.take(indices, self.projection.clone()).await?, + )) } } diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs index 38fd2ee0548..fe4e545feff 100644 --- a/benchmarks/random-access-bench/src/main.rs +++ b/benchmarks/random-access-bench/src/main.rs @@ -247,7 +247,7 @@ async fn benchmark_random_access( loop { let start = Instant::now(); - let _row_count = accessor.take(indices).await?; + let _arr = accessor.take(indices).await?; runs.push(start.elapsed()); if overall_start.elapsed() >= time_limit { diff --git a/vortex-bench/src/random_access/mod.rs b/vortex-bench/src/random_access/mod.rs index 5421ebb01b5..3b9f95f5eaf 100644 --- a/vortex-bench/src/random_access/mod.rs +++ b/vortex-bench/src/random_access/mod.rs @@ -4,7 +4,9 @@ use std::path::PathBuf; use anyhow::Result; +use arrow_array::RecordBatch; use async_trait::async_trait; +use vortex::array::ArrayRef; use crate::Format; @@ -41,6 +43,11 @@ pub trait BenchDataset: Send + Sync { async fn path(&self, format: Format) -> Result; } +pub enum RandomAccessorRet { + RecordBatch(RecordBatch), + ArrayRef(ArrayRef), +} + /// Trait for format-specific random access (take) operations. /// /// Implementations handle reading specific rows by index from a data source. @@ -53,6 +60,6 @@ pub trait RandomAccessor: Send + Sync { /// The format this accessor handles. fn format(&self) -> Format; - /// Take rows at the given indices, returning the number of rows read. - async fn take(&self, indices: &[u64]) -> Result; + /// Take rows at the given indices, returning the handle. + async fn take(&self, indices: &[u64]) -> Result; } diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs index 13358fb9531..147a8cf347e 100644 --- a/vortex-bench/src/random_access/take.rs +++ b/vortex-bench/src/random_access/take.rs @@ -30,6 +30,7 @@ use vortex::utils::aliases::hash_map::HashMap; use crate::Format; use crate::SESSION; use crate::random_access::RandomAccessor; +use crate::random_access::RandomAccessorRet; /// Random accessor for Vortex format files. /// @@ -66,7 +67,7 @@ impl RandomAccessor for VortexRandomAccessor { &self.name } - async fn take(&self, indices: &[u64]) -> anyhow::Result { + async fn take(&self, indices: &[u64]) -> anyhow::Result { let indices_buf: Buffer = Buffer::from(indices.to_vec()); let array = self .file @@ -79,7 +80,7 @@ impl RandomAccessor for VortexRandomAccessor { // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es. let mut ctx = SESSION.create_execution_ctx(); let canonical = array.execute::(&mut ctx)?.into_array(); - Ok(canonical.len()) + Ok(RandomAccessorRet::ArrayRef(canonical)) } } @@ -137,7 +138,7 @@ impl RandomAccessor for ParquetRandomAccessor { &self.name } - async fn take(&self, indices: &[u64]) -> anyhow::Result { + async fn take(&self, indices: &[u64]) -> anyhow::Result { // Map indices to row groups. let mut row_groups = HashMap::new(); for &idx in indices { @@ -181,6 +182,6 @@ impl RandomAccessor for ParquetRandomAccessor { .await; let result = concat_batches(&schema, &batches)?; - Ok(result.num_rows()) + Ok(RandomAccessorRet::RecordBatch(result)) } }