From 5e8fb2d9899e308619fa451f5ba2fb8fbc2ea9ab Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 15 Apr 2026 14:46:41 -0400 Subject: [PATCH] add vector-search-bench crate Signed-off-by: Connor Tsui --- Cargo.lock | 24 ++ Cargo.toml | 1 + benchmarks/vector-search-bench/Cargo.toml | 40 +++ .../vector-search-bench/src/compression.rs | 103 ++++++++ .../vector-search-bench/src/expression.rs | 97 ++++++++ benchmarks/vector-search-bench/src/ingest.rs | 218 ++++++++++++++++ benchmarks/vector-search-bench/src/lib.rs | 25 ++ benchmarks/vector-search-bench/src/prepare.rs | 233 ++++++++++++++++++ 8 files changed, 741 insertions(+) create mode 100644 benchmarks/vector-search-bench/Cargo.toml create mode 100644 benchmarks/vector-search-bench/src/compression.rs create mode 100644 benchmarks/vector-search-bench/src/expression.rs create mode 100644 benchmarks/vector-search-bench/src/ingest.rs create mode 100644 benchmarks/vector-search-bench/src/lib.rs create mode 100644 benchmarks/vector-search-bench/src/prepare.rs diff --git a/Cargo.lock b/Cargo.lock index b746598d3b1..deacfaf7977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10042,6 +10042,30 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vector-search-bench" +version = "0.1.0" +dependencies = [ + "anyhow", + "arrow-array 58.0.0", + "arrow-buffer 58.0.0", + "arrow-schema 58.0.0", + "clap", + "futures", + "indicatif", + "parquet 58.0.0", + "rand 0.10.1", + "serde", + "tabled", + "tempfile", + "tokio", + "tracing", + "vortex", + "vortex-bench", + "vortex-btrblocks", + "vortex-tensor", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index a6429bf04e6..1ed524d1c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ members = [ "benchmarks/datafusion-bench", "benchmarks/duckdb-bench", "benchmarks/random-access-bench", + 
"benchmarks/vector-search-bench", ] exclude = ["java/testfiles", "wasm-test"] resolver = "2" diff --git a/benchmarks/vector-search-bench/Cargo.toml b/benchmarks/vector-search-bench/Cargo.toml new file mode 100644 index 00000000000..126b62d5e2d --- /dev/null +++ b/benchmarks/vector-search-bench/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "vector-search-bench" +description = "Vector similarity search benchmarks for Vortex on public embedding datasets" +authors.workspace = true +categories.workspace = true +edition.workspace = true +homepage.workspace = true +include.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +publish = false + +[dependencies] +anyhow = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } +clap = { workspace = true, features = ["derive"] } +futures = { workspace = true } +indicatif = { workspace = true } +parquet = { workspace = true, features = ["async"] } +rand = { workspace = true } +serde = { workspace = true, features = ["derive"] } +tabled = { workspace = true, features = ["std"] } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +vortex = { workspace = true, features = ["files", "tokio", "unstable_encodings"] } +vortex-bench = { workspace = true, features = ["unstable_encodings"] } +vortex-btrblocks = { workspace = true, features = ["unstable_encodings"] } +vortex-tensor = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } + +[lints] +workspace = true diff --git a/benchmarks/vector-search-bench/src/compression.rs b/benchmarks/vector-search-bench/src/compression.rs new file mode 100644 index 00000000000..6e2733cc1cd --- /dev/null +++ b/benchmarks/vector-search-bench/src/compression.rs @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 
Copyright the Vortex contributors + +//! Vector compression flavors exercised by the benchmark. +//! +//! Each [`VectorFlavor`] variant maps to a [`vortex::file::WriteStrategyBuilder`] configuration +//! applied to the same input data. +//! +//! The benchmark writes one `.vortex` file per flavor per data file, then scans them all with the +//! same query so the comparison is apples-to-apples with the Parquet files. +//! +//! Note that the hand-rolled `&[f32]` parquet baseline is **not** a flavor here. + +use clap::ValueEnum; +use vortex::array::ArrayId; +use vortex::array::scalar_fn::ScalarFnVTable; +use vortex::file::ALLOWED_ENCODINGS; +use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; +use vortex::file::WriteStrategyBuilder; +use vortex::session::VortexSession; +use vortex::utils::aliases::hash_set::HashSet; +use vortex_bench::Format; +use vortex_btrblocks::BtrBlocksCompressorBuilder; +use vortex_tensor::scalar_fns::l2_denorm::L2Denorm; +use vortex_tensor::scalar_fns::sorf_transform::SorfTransform; + +/// Every [`VectorFlavor`] variant in CLI-help order. +pub const ALL_VECTOR_FLAVORS: &[VectorFlavor] = + &[VectorFlavor::Uncompressed, VectorFlavor::TurboQuant]; + +/// One write-side compression configuration we measure. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ValueEnum)] +pub enum VectorFlavor { + /// Written with `BtrBlocksCompressorBuilder::empty()`. + #[clap(name = "vortex-uncompressed")] + Uncompressed, + /// Written with `BtrBlocksCompressorBuilder::default().with_turboquant()`. + #[clap(name = "vortex-turboquant")] + TurboQuant, + // TODO(connor): We will want to add `Default` here which is just the default compressor. +} + +impl VectorFlavor { + /// Stable kebab-cased label used in CLI args and metric names. + /// + /// The returned strings repeat the `#[clap(name = ...)]` values declared on the variants. 
+ pub fn label(&self) -> &'static str { + match self { + VectorFlavor::Uncompressed => "vortex-uncompressed", + VectorFlavor::TurboQuant => "vortex-turboquant", + } + } + + /// The `target.format` value emitted on measurements for this flavor. Both flavors produce + /// `.vortex` files, so the compression label carries the flavor split. Every variant currently + /// maps to [`Format::OnDiskVortex`]. + pub fn as_format(&self) -> Format { + match self { + VectorFlavor::Uncompressed => Format::OnDiskVortex, + VectorFlavor::TurboQuant => Format::OnDiskVortex, + } + } + + /// Subdirectory name under the per-dataset cache root used to store this flavor's `.vortex` + /// files. Currently returns the same strings as [`Self::label`]. + pub fn dir_name(&self) -> &'static str { + match self { + VectorFlavor::Uncompressed => "vortex-uncompressed", + VectorFlavor::TurboQuant => "vortex-turboquant", + } + } + + /// Build the [`vortex::file::WriteStrategyBuilder`]-backed write options for this flavor. + /// + /// TurboQuant produces `L2Denorm(SorfTransform(...))` which the default file + /// `ALLOWED_ENCODINGS` set rejects on normalization — we extend the allow-list with the two + /// scalar-fn array IDs the scheme actually emits. 
+ pub fn create_write_options(&self, session: &VortexSession) -> VortexWriteOptions { + let strategy = match self { + VectorFlavor::Uncompressed => { + let compressor = BtrBlocksCompressorBuilder::empty().build(); + + WriteStrategyBuilder::default() + .with_compressor(compressor) + .build() + } + VectorFlavor::TurboQuant => { + let compressor = BtrBlocksCompressorBuilder::default() + .with_turboquant() + .build(); + + let mut allowed: HashSet = ALLOWED_ENCODINGS.clone(); + allowed.insert(L2Denorm.id()); + allowed.insert(SorfTransform.id()); + + WriteStrategyBuilder::default() + .with_compressor(compressor) + .with_allow_encodings(allowed) + .build() + } + }; + + session.write_options().with_strategy(strategy) + } +} diff --git a/benchmarks/vector-search-bench/src/expression.rs b/benchmarks/vector-search-bench/src/expression.rs new file mode 100644 index 00000000000..b1a5b1ef1c3 --- /dev/null +++ b/benchmarks/vector-search-bench/src/expression.rs @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Cosine-similarity filter [`Expression`]s used by the file-scan path. +//! +//! We can easily build a cosine similarity filter by hand: +//! +//! ```text +//! gt( +//! cosine_similarity(col("emb"), lit(query_scalar)), +//! lit(threshold), +//! ) +//! ``` +//! +//! The query is wrapped as a `Vector` extension scalar over `Scalar::fixed_size_list(F32, ...)` so +//! [`CosineSimilarity`] can treat it as a single-row `Vector` value during evaluation. +//! +//! At scan time the literal expands into a `ConstantArray` whose row count matches the chunk batch +//! size. 
+ +use anyhow::Result; +use vortex::array::expr::Expression; +use vortex::array::expr::col; +use vortex::array::expr::gt; +use vortex::array::expr::lit; +use vortex::array::extension::EmptyMetadata; +use vortex::array::scalar::Scalar; +use vortex::array::scalar_fn::EmptyOptions; +use vortex::array::scalar_fn::ScalarFnVTableExt; +use vortex::dtype::DType; +use vortex::dtype::Nullability; +use vortex::dtype::PType; +use vortex_tensor::scalar_fns::cosine_similarity::CosineSimilarity; +use vortex_tensor::vector::Vector; + +/// Build the filter `cosine_similarity(emb, query) > threshold`. +/// +/// An empty `query` yields the literal `false` instead of a cosine-similarity expression. +pub fn similarity_filter(query: &[f32], threshold: f32) -> Result { + // Empty queries short-circuit to a literal `false`, so scans return no rows instead of trying + // to evaluate cosine similarity on a zero-dimensional vector. + if query.is_empty() { + return Ok(lit(false)); + } + + let query_lit = lit(query_scalar(query)?); + let cosine = CosineSimilarity.new_expr(EmptyOptions, [col("emb"), query_lit]); + Ok(gt(cosine, lit(threshold))) +} + +/// Wrap a query vector as an extension scalar over `Scalar::fixed_size_list(F32, ...)`. +/// +/// Every element becomes a non-nullable `f32` scalar. The body has no error path of its own, so +/// this always returns `Ok`. +pub fn query_scalar(query: &[f32]) -> Result { + let children: Vec = query + .iter() + .map(|&v| Scalar::primitive(v, Nullability::NonNullable)) + .collect(); + + let element_dtype = DType::Primitive(PType::F32, Nullability::NonNullable); + let fsl = Scalar::fixed_size_list(element_dtype, children, Nullability::NonNullable); + + Ok(Scalar::extension::(EmptyMetadata, fsl)) +} + +/// Project just the `emb` column. Used by the throughput-only scan path. 
+pub fn emb_projection() -> Expression { + col("emb") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn query_scalar_accepts_empty_query() { + let scalar = query_scalar(&[]).unwrap(); + match scalar.dtype() { + DType::Extension(_) => {} + other => panic!("expected Extension, got {other}"), + } + } + + #[test] + fn query_scalar_builds_extension_dtype() { + let scalar = query_scalar(&[1.0, 0.0, 0.0]).unwrap(); + match scalar.dtype() { + DType::Extension(_) => {} + other => panic!("expected Extension, got {other}"), + } + } + + #[test] + fn similarity_filter_uses_gt_operator() { + let expr = similarity_filter(&[1.0, 0.0, 0.0], 0.5).unwrap(); + // Quick sanity check: the printed form contains the comparison operator so future + // refactors that change the structure get caught here. The threshold is not checked. + let printed = format!("{expr:?}"); + assert!(printed.contains("Gt") || printed.contains(">"), "{printed}"); + } +} diff --git a/benchmarks/vector-search-bench/src/ingest.rs b/benchmarks/vector-search-bench/src/ingest.rs new file mode 100644 index 00000000000..48904530170 --- /dev/null +++ b/benchmarks/vector-search-bench/src/ingest.rs @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Per-chunk ingest transform. +//! +//! Bridges the parquet record-batch stream and the Vortex file writer: +//! +//! 1. Project the `id` and `emb` columns out of each struct chunk. +//! 2. Rewrap the `emb` column as a `Vector` extension array (fixed-size-list storage) via +//! [`vortex_bench::vector_dataset::list_to_vector_ext`]. +//! 3. Cast the FSL element buffer from `f64` -> `f32` if the source is `f64`. After this point all +//! downstream code (compression, scan, recall) is f32-only. +//! 4. Not implemented yet: optionally project the `scalar_labels` column through unchanged so future +//! filtered-search benchmarks have it without re-ingest. +//! 5. Repackage as `Struct { id: i64, emb: Vector }`. 
+ +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use anyhow::ensure; +use vortex::array::ArrayRef; +use vortex::array::ExecutionCtx; +use vortex::array::IntoArray; +use vortex::array::arrays::ExtensionArray; +use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::Struct; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::extension::ExtensionArrayExt; +use vortex::array::arrays::fixed_size_list::FixedSizeListArrayExt; +use vortex::array::arrays::struct_::StructArrayExt; +use vortex::array::extension::EmptyMetadata; +use vortex::array::validity::Validity; +use vortex::buffer::Buffer; +use vortex::dtype::DType; +use vortex::dtype::PType; +use vortex::dtype::extension::ExtDType; +use vortex_bench::vector_dataset::list_to_vector_ext; +use vortex_tensor::vector::AnyVector; +use vortex_tensor::vector::Vector; + +/// Configuration passed alongside each chunk so the transform can stay stateless. +#[derive(Debug, Clone, Copy)] +pub struct ChunkTransform { + /// Source element ptype as declared by the dataset catalog. Used purely to decide whether the + /// f64 -> f32 cast is needed. + pub src_ptype: PType, + // /// Whether to project the `scalar_labels` column through the output struct. + // pub include_scalar_labels: bool, +} + +impl ChunkTransform { + /// Apply the transform to a single struct chunk and return the rebuilt chunk. + /// + /// `chunk` must be a non-chunked `Struct { id: i64, emb: List }`. NOTE(review): the lists + /// presumably all need the same length for `list_to_vector_ext` to succeed; confirm. + /// + /// The returned array is always a `Struct { id: i64, emb: Vector }`; other input columns are not + /// carried over. Fails if `chunk` is not a struct or has no `id` or `emb` column. + pub fn apply(&self, chunk: ArrayRef, ctx: &mut ExecutionCtx) -> Result { + let struct_view = chunk.as_opt::().with_context(|| { + format!("ingest: expected struct chunk, got dtype {}", chunk.dtype()) + })?; + + let id = struct_view + .unmasked_field_by_name("id") + .context("ingest: chunk missing `id` column")? 
+ .clone(); + let emb = struct_view + .unmasked_field_by_name("emb") + .context("ingest: chunk missing `emb` column")? + .clone(); + + let emb_ext: ExtensionArray = list_to_vector_ext(emb)?.execute(ctx)?; + + let f32_vector_array = if self.src_ptype == PType::F64 { + convert_f64_to_f32_vectors(&emb_ext, ctx)? + } else { + emb_ext.into_array() + }; + + let fields = [("id", id), ("emb", f32_vector_array)]; + Ok(StructArray::from_fields(&fields)?.into_array()) + } +} + +/// Convert a `Vector` extension array with `f64` elements down to one with `f32` elements. +/// +/// This conversion is lossy, but we are generally ok with this because most vector search +/// operations do not demand a high amount of precision. +/// +/// Fails if `ext` does not pass the vector extension-dtype check, if its elements are not `f64`, +/// or if its storage dtype is not a fixed-size list. The row validity of the storage array is +/// carried over to the result unchanged. +fn convert_f64_to_f32_vectors(ext: &ExtensionArray, ctx: &mut ExecutionCtx) -> Result { + ensure!(ext.ext_dtype().is::()); + + let fsl: FixedSizeListArray = ext.storage_array().clone().execute(ctx)?; + let validity = fsl.validity()?; + let elements: PrimitiveArray = fsl.elements().clone().execute(ctx)?; + ensure!(elements.ptype() == PType::F64); + + let dim = match fsl.dtype() { + DType::FixedSizeList(_, dim, _) => *dim, + other => bail!("ingest: expected FSL dtype, got {other}"), + }; + + let f64_slice = elements.as_slice::(); + + #[expect( + clippy::cast_possible_truncation, + reason = "this is intentionally lossy" + )] + let f32_buf: Buffer = f64_slice + .iter() + .copied() + .map(|double| double as f32) + .collect(); + + let f32_elements = PrimitiveArray::new::(f32_buf, Validity::NonNullable).into_array(); + let new_fsl = FixedSizeListArray::try_new(f32_elements, dim, validity, fsl.len())?; + let ext_dtype = ExtDType::::try_new(EmptyMetadata, new_fsl.dtype().clone())?.erased(); + + Ok(ExtensionArray::new(ext_dtype, new_fsl.into_array()).into_array()) +} + +#[cfg(test)] +mod tests { + use vortex::VortexSessionDefault; + use vortex::array::VortexSessionExecute; + use vortex::array::arrays::List; + use vortex::buffer::BufferMut; + use vortex::dtype::Nullability; + use 
vortex::session::VortexSession; + + use super::*; + + fn list_chunk_f64(rows: &[&[f64]]) -> ArrayRef { + let mut elements = BufferMut::::with_capacity(rows.iter().map(|r| r.len()).sum()); + let mut offsets = BufferMut::::with_capacity(rows.len() + 1); + offsets.push(0); + for row in rows { + for &v in row.iter() { + elements.push(v); + } + offsets.push(i32::try_from(elements.len()).unwrap()); + } + let elements_array = + PrimitiveArray::new::(elements.freeze(), Validity::NonNullable).into_array(); + let offsets_array = + PrimitiveArray::new::(offsets.freeze(), Validity::NonNullable).into_array(); + vortex::array::Array::::new(elements_array, offsets_array, Validity::NonNullable) + .into_array() + } + + fn id_array(ids: &[i64]) -> ArrayRef { + PrimitiveArray::new::( + BufferMut::from_iter(ids.iter().copied()).freeze(), + Validity::NonNullable, + ) + .into_array() + } + + #[test] + fn f64_chunk_is_cast_to_f32() -> Result<()> { + let session = VortexSession::default(); + let mut ctx = session.create_execution_ctx(); + + let emb = list_chunk_f64(&[&[1.0, 2.0, 3.0], &[4.0, 5.0, 6.0]]); + let chunk = + StructArray::from_fields(&[("id", id_array(&[0, 1])), ("emb", emb)])?.into_array(); + let transform = ChunkTransform { + src_ptype: PType::F64, + }; + let out = transform.apply(chunk, &mut ctx)?; + let out_struct = out.as_opt::().expect("returns Struct"); + let out_emb = out_struct.unmasked_field_by_name("emb").unwrap().clone(); + let DType::Extension(ext) = out_emb.dtype() else { + panic!("expected extension dtype, got {}", out_emb.dtype()); + }; + match ext.storage_dtype() { + DType::FixedSizeList(elem, 3, Nullability::NonNullable) => { + assert_eq!( + **elem, + DType::Primitive(PType::F32, Nullability::NonNullable) + ); + } + other => panic!("unexpected storage dtype {other}"), + } + Ok(()) + } + + #[test] + fn f32_chunk_passes_through() -> Result<()> { + let session = VortexSession::default(); + let mut ctx = session.create_execution_ctx(); + + let mut elements = 
BufferMut::::with_capacity(6); + for v in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] { + elements.push(v); + } + let mut offsets = BufferMut::::with_capacity(3); + offsets.push(0); + offsets.push(3); + offsets.push(6); + let emb = vortex::array::Array::::new( + PrimitiveArray::new::(elements.freeze(), Validity::NonNullable).into_array(), + PrimitiveArray::new::(offsets.freeze(), Validity::NonNullable).into_array(), + Validity::NonNullable, + ) + .into_array(); + let chunk = + StructArray::from_fields(&[("id", id_array(&[0, 1])), ("emb", emb)])?.into_array(); + + let transform = ChunkTransform { + src_ptype: PType::F32, + }; + let out = transform.apply(chunk, &mut ctx)?; + let out_struct = out.as_opt::().expect("returns Struct"); + assert_eq!(out_struct.len(), 2); + Ok(()) + } +} diff --git a/benchmarks/vector-search-bench/src/lib.rs b/benchmarks/vector-search-bench/src/lib.rs new file mode 100644 index 00000000000..ea41e773f47 --- /dev/null +++ b/benchmarks/vector-search-bench/src/lib.rs @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! `vector-search-bench`: a vector similarity-search benchmark over several datasets. + +pub mod compression; +pub mod expression; +pub mod ingest; +pub mod prepare; + +use std::sync::LazyLock; + +use vortex::VortexSessionDefault; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; + +/// Lazily-built [`VortexSession`] shared by the crate: created with `with_tokio()` and passed to +/// `vortex_tensor::initialize` after the tensor plugin environment variable has been set. +pub static SESSION: LazyLock = LazyLock::new(|| { + // SAFETY: `std::env::set_var` requires that no other thread reads or writes the process + // environment concurrently. This runs once, inside the `LazyLock` initializer, on the first + // dereference of `SESSION`. NOTE(review): that alone does not exclude other threads touching + // the environment; confirm `SESSION` is first dereferenced before any such threads start. 
+ unsafe { std::env::set_var(vortex_tensor::SCALAR_FN_ARRAY_TENSOR_PLUGIN_ENV, "1") }; + + let session = VortexSession::default().with_tokio(); + vortex_tensor::initialize(&session); + session +}); diff --git a/benchmarks/vector-search-bench/src/prepare.rs b/benchmarks/vector-search-bench/src/prepare.rs new file mode 100644 index 00000000000..3674b353eac --- /dev/null +++ b/benchmarks/vector-search-bench/src/prepare.rs @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Per-flavor on-disk ingest. +//! +//! For each `(dataset, layout, flavor)` triple, [`prepare_flavor`] streams every parquet shard +//! through the [`crate::ingest::ChunkTransform`] and writes one `.vortex` file per shard. The +//! pipeline is idempotent (existing `.vortex` files are skipped) and returns the written paths in a +//! [`CompressedVortexDataSet`]. It does not itself report wall-clock time or input/output byte totals. + +use std::path::Path; +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; +use futures::StreamExt; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tracing::info; +use tracing::warn; +use vortex::array::VortexSessionExecute; +use vortex::array::stream::ArrayStreamAdapter; +use vortex::array::stream::ArrayStreamExt; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex_bench::conversions::parquet_to_vortex_stream; +use vortex_bench::data_dir; +use vortex_bench::utils::file::idempotent_async; +use vortex_bench::vector_dataset::DatasetPaths; +use vortex_bench::vector_dataset::TrainLayout; +use vortex_bench::vector_dataset::VectorDataset; + +use crate::SESSION; +use crate::compression::VectorFlavor; +use crate::ingest::ChunkTransform; + +/// The paths of the vortex files that result from preparing one `(dataset, layout, flavor)` triple. 
+#[derive(Debug, Clone)] +pub struct CompressedVortexDataSet { + pub dataset: VectorDataset, + pub layout: TrainLayout, + pub flavor: VectorFlavor, + pub vortex_files: Vec, +} + +/// Drive [`prepare_flavor`] across a list of flavors, returning a [`CompressedVortexDataSet`] per flavor +/// in input order. Flavors are prepared one after another; the first failure aborts the run. +pub async fn prepare_all( + dataset: VectorDataset, + layout: TrainLayout, + paths_for_dataset: &DatasetPaths, + flavors: &[VectorFlavor], +) -> Result> { + let mut results = Vec::with_capacity(flavors.len()); + + for &flavor in flavors { + let r = prepare_flavor(dataset, layout, paths_for_dataset, flavor).await?; + results.push(r); + } + + Ok(results) +} + +// TODO(connor): This should probably download things in parallel? +/// Prepare one flavor of one dataset by writing one `.vortex` file per train shard. +/// +/// This function is sequential (for now). The returned `vortex_files` follow the order of +/// `paths_for_dataset.train_files`. +pub async fn prepare_flavor( + dataset: VectorDataset, + layout: TrainLayout, + paths_for_dataset: &DatasetPaths, + flavor: VectorFlavor, +) -> Result { + let transform = ChunkTransform { + src_ptype: dataset.element_ptype(), + }; + + let mut vortex_files = Vec::with_capacity(paths_for_dataset.train_files.len()); + + for parquet_path in &paths_for_dataset.train_files { + let parquet_path = parquet_path.clone(); + let vortex_path = parquet_to_vortex_path(&parquet_path, dataset, layout, flavor)?; + + // This probe only picks the log line; `idempotent_async` below is called either way. + let already_cached = vortex_path.exists(); + if already_cached { + warn!( + "skipping cached vortex shard {} ({} flavor)", + vortex_path.display(), + flavor.label() + ); + } else { + info!( + "ingesting {} -> {} ({} flavor)", + parquet_path.display(), + vortex_path.display(), + flavor.label(), + ); + } + + let written_path = idempotent_async(vortex_path.as_path(), |tmp| async move { + write_shard_streaming(&parquet_path, &tmp, flavor, transform).await + }) + .await?; + + vortex_files.push(written_path); + } + + Ok(CompressedVortexDataSet { + dataset, + layout, + flavor, + vortex_files, + }) +} + +/// Stream one 
parquet shard through the chunk transform into a Vortex file. +/// +/// The output dtype is derived once from the first transformed chunk so the [`ArrayStreamAdapter`] +/// can declare it ahead of time. A shard that yields no chunks is reported as an error. +async fn write_shard_streaming( + parquet_path: &Path, + vortex_path: &Path, + flavor: VectorFlavor, + transform: ChunkTransform, +) -> Result<()> { + let file = File::open(parquet_path).await?; + let builder = ParquetRecordBatchStreamBuilder::new(file).await?; + let mut array_stream = parquet_to_vortex_stream(builder.build()?); + + let mut ctx = SESSION.create_execution_ctx(); + + // We need to get the first chunk so that we know what the dtype of the file is. + let first = match array_stream.next().await { + Some(chunk) => transform_chunk(transform, chunk, &mut ctx, parquet_path, 1)?, + None => { + return Err(vortex_err!( + "ingest: parquet shard {} produced no chunks", + parquet_path.display(), + ) + .into()); + } + }; + let dtype = first.dtype().clone(); + let shard_path = parquet_path.to_path_buf(); + + // Chunk numbers in error messages are 1-based: the first chunk above is 1, so the remaining + // chunks are numbered from 2. + let transformed = + futures::stream::iter(std::iter::once(Ok(first))).chain(array_stream.enumerate().map( + move |(chunk_offset, chunk_or_err)| { + let mut local_ctx = SESSION.create_execution_ctx(); + transform_chunk( + transform, + chunk_or_err, + &mut local_ctx, + &shard_path, + chunk_offset + 2, + ) + }, + )); + + let stream = ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, transformed)); + + let mut output = tokio::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(vortex_path) + .await?; + + flavor + .create_write_options(&SESSION) + .write(&mut output, stream) + .await?; + output.flush().await?; + + Ok(()) +} + +/// Unwrap one chunk read result and apply `transform` to it, wrapping either failure in an error +/// that names the chunk number and the parquet shard path. +fn transform_chunk( + transform: ChunkTransform, + chunk_or_err: VortexResult, + ctx: &mut vortex::array::ExecutionCtx, + parquet_path: &Path, + chunk_idx: usize, +) -> VortexResult { + let chunk = chunk_or_err.map_err(|err| { + vortex_err!( + "ingest: failed to read chunk {} from {}: {err:#}", + chunk_idx, + 
parquet_path.display(), + ) + })?; + + transform.apply(chunk, ctx).map_err(|err| { + vortex_err!( + "ingest: failed to transform chunk {} from {}: {err:#}", + chunk_idx, + parquet_path.display(), + ) + }) +} + +/// Translate a parquet shard path to its `.vortex` companion under the flavor directory. +/// +/// Just swaps the file extension and rebases the file name into the per-[`VectorFlavor`] flavor +/// directory. The shard stem is preserved so a directory listing pairs `00-of-10.parquet` with +/// `00-of-10.vortex`. +/// +/// Fails if `parquet` has no file stem. +pub fn parquet_to_vortex_path( + parquet: &Path, + dataset: VectorDataset, + layout: TrainLayout, + flavor: VectorFlavor, +) -> Result { + let stem = parquet + .file_stem() + .with_context(|| format!("parquet path {} has no file stem", parquet.display()))? + .to_owned(); + + // TODO(connor): Is there a better way to do this? + // Appends `.vortex` to the owned stem, so any dots inside the stem are kept as-is. + let mut name = stem; + name.push(".vortex"); + + Ok(flavor_dir(dataset, layout, flavor).join(name)) +} + +/// Directory for one flavor's files: `<data_dir()>/vector-search/<dataset name>/<layout label>/<flavor dir_name>/`. +fn flavor_dir(ds: VectorDataset, layout: TrainLayout, flavor: VectorFlavor) -> PathBuf { + data_dir() + .join("vector-search") + .join(ds.name()) + .join(layout.label()) + .join(flavor.dir_name()) +}