diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 7be829d80bd..f900022e2f6 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -48,6 +48,10 @@ jobs: name: Compression build_args: "--features lance" formats: "parquet,lance,vortex" + - id: vector-search-bench + name: Vector Similarity Search + build_args: "" + formats: "handrolled,vortex-uncompressed,vortex-default,vortex-turboquant" steps: - uses: runs-on/action@v2 if: github.repository == 'vortex-data/vortex' diff --git a/Cargo.lock b/Cargo.lock index 5ce1bc853cc..aa4519e13ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10052,6 +10052,26 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vector-search-bench" +version = "0.1.0" +dependencies = [ + "anyhow", + "arrow-array 58.0.0", + "arrow-buffer 58.0.0", + "arrow-schema 58.0.0", + "clap", + "indicatif", + "parquet 58.0.0", + "tempfile", + "tokio", + "tracing", + "vortex", + "vortex-bench", + "vortex-btrblocks", + "vortex-tensor", +] + [[package]] name = "version_check" version = "0.9.5" @@ -10241,6 +10261,7 @@ dependencies = [ "url", "uuid", "vortex", + "vortex-tensor", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9853cf94ed9..0c904489f58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ members = [ "benchmarks/datafusion-bench", "benchmarks/duckdb-bench", "benchmarks/random-access-bench", + "benchmarks/vector-search-bench", ] exclude = ["java/testfiles", "wasm-test"] resolver = "2" diff --git a/benchmarks/vector-search-bench/Cargo.toml b/benchmarks/vector-search-bench/Cargo.toml new file mode 100644 index 00000000000..bbc620572b5 --- /dev/null +++ b/benchmarks/vector-search-bench/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "vector-search-bench" +description = "Vector similarity search benchmarks for Vortex on public embedding datasets" +authors.workspace = true +categories.workspace = true +edition.workspace = true +homepage.workspace = true +include.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +publish = false + +[dependencies] +anyhow = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } +clap = { workspace = true, features = ["derive"] } +indicatif = { workspace = true } +parquet = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +vortex = { workspace = true } +vortex-bench = { workspace = true } +vortex-btrblocks = { workspace = true } +vortex-tensor = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } + +[lints] +workspace = true diff --git a/benchmarks/vector-search-bench/README.md b/benchmarks/vector-search-bench/README.md new file mode 100644 index 00000000000..50af601ec2a --- /dev/null +++ b/benchmarks/vector-search-bench/README.md @@ -0,0 +1,131 @@ +# vector-search-bench + +Brute-force cosine-similarity benchmark for Vortex on public VectorDBBench +embedding corpora. + +## What it measures + +For each `(dataset, format)` pair, the benchmark records: + +1. **`nbytes`** — in-memory footprint of the variant's array tree, in bytes. + Reporting the in-memory `.nbytes()` instead of an on-disk file size is + deliberate: the Vortex default write path runs BtrBlocks on every tree + regardless of whether it's already compressed, so "on-disk size" would + collapse `vortex-uncompressed` and `vortex-default` to the same bytes + even though their in-memory trees are different. The `nbytes()` + number is consistent with what the *compute* measurements actually + operate on. + - The `handrolled` baseline reports the canonical parquet file size + on disk — that's the only encoded representation it has. +2. **Compress time** — wall time to build the variant tree from the + materialized uncompressed source. ~0 for `vortex-uncompressed` (identity), + meaningful for the two compressed variants. +3. **Decompress time** — wall time to execute the variant tree all the way + back into a canonical `FixedSizeListArray` with a materialized f32 + element buffer. For `vortex-uncompressed` this is a no-op; for + `vortex-default` it includes ALP-RD bit-unpacking; for + `vortex-turboquant` it includes the inverse SORF rotation and + dictionary lookup. +4. **Cosine-similarity time** — `CosineSimilarity(data, const_query)` + executed to a materialized f32 array. +5. **Cosine-filter time** — `Binary(Gt, [CosineSimilarity, threshold])` + executed to a `BoolArray`. +6. **Recall@10** (TurboQuant only) — the fraction of the exact top-10 + nearest neighbours that TurboQuant recovers, using the uncompressed + Vortex scan as local ground truth. + +Before any timing starts, the benchmark runs a **correctness verification +pass**: cosine scores for a single query are computed against every +variant and compared to the uncompressed baseline. Lossless variants must +match within `1e-4` max-abs-diff; TurboQuant must stay within `0.2`. A +mismatch bails the run — you cannot publish throughput numbers for a +variant that returns wrong answers. + +## Formats + +- `handrolled` — Hand-rolled Rust scalar cosine loop over a flat + `Vec` that was decoded from the canonical parquet file via + `parquet-rs` / `arrow-rs`. The **decompress** phase does the parquet + read, downcasts to `Float32Array`, and memcpies into a plain `Vec`. + The **compute** phase is a plain scalar loop over `&[f32]` — no Arrow + compute kernels, no scalar-function dispatch, no SIMD annotations. + + This is a **compute-cost floor**, not a realistic parquet-on-DBMS + baseline. It answers the question "what's the minimum cost you could + get away with if you wrote a vector-search scan by hand with no query + engine?" Real parquet users would pay substantially more (DuckDB + `list_cosine_similarity`, DataFusion with a vector UDF, etc.) — + adding those as additional baselines is a natural v2 direction. +- `vortex-uncompressed` — Raw `Vector` extension array, no + encoding-level compression applied. +- `vortex-default` — `BtrBlocksCompressor::default()` applied to the FSL + storage child. On float vectors this typically finds ~15% lossless + savings via ALP-RD (mantissa/exponent split + bitpacking). +- `vortex-turboquant` — The full + `L2Denorm(SorfTransform(FSL(Dict(codes, centroids))), norms)` pipeline. + Lossy; recall@10 is reported alongside throughput. At the default 8-bit + config this typically gives ~3× storage reduction at >90% top-10 + recall. + +## Datasets + +The smallest built-in dataset is **Cohere-100K** (`cohere-small`): 100K +rows × 768 dims, cosine metric, ~150 MB zstd-parquet. It's the smallest +VectorDBBench-supplied corpus that still exercises every encoding path. +Larger variants (`cohere-medium`, `openai-small`, `openai-medium`, +`bioasq-medium`, `glove-medium`) are wired up for local / on-demand +experiments; see `vortex-bench/src/vector_dataset.rs` for the full list. + +The upstream URL for Cohere-100K is +`https://assets.zilliz.com/benchmark/cohere_small_100k/train.parquet`. +The public Zilliz bucket is anonymous-readable so the code can hit it +directly. + +## Running locally + +```bash +cargo run -p vector-search-bench --release -- \ + --datasets cohere-small \ + --formats handrolled,vortex-uncompressed,vortex-default,vortex-turboquant \ + --iterations 5 \ + -d table +``` + +The first run downloads the parquet file into +`vortex-bench/data/cohere-small/cohere-small.parquet` and caches it +idempotently for subsequent runs. + +### Running without network access + +The `gen_synthetic_dataset` helper writes a VectorDBBench-shape parquet +file (`id: int64` + `emb: list`, zstd-compressed) at any path. +Use it to populate the dataset cache so the benchmark's idempotent +download step skips the HTTP fetch: + +```bash +cargo run -p vector-search-bench --release --bin gen_synthetic_dataset -- \ + --num-rows 5000 \ + --dim 768 \ + --out vortex-bench/data/cohere-small/cohere-small.parquet +``` + +(Cargo's default bin name is the filename minus extension, so underscores, +not hyphens.) + +## CI note: dataset mirror + +CI runs after every develop-branch merge. Hitting `assets.zilliz.com` +from every merge would create recurring egress traffic on a third-party +bucket — the same courtesy reason `RPlace` / `AirQuality` are excluded +from CI in `compress-bench`. + +Before enabling the `vector-search-bench` entry in `.github/workflows/bench.yml` +on a fork, either: + +1. **Mirror the file into an internal bucket** and swap the URL in + `vortex-bench/src/vector_dataset.rs::VectorDataset::parquet_url`, or +2. **Accept the upstream egress cost** and leave the URL as-is. + +The mirror step is a one-off `aws s3 cp` and is documented here rather +than automated in the build because the destination bucket is +organization-specific. diff --git a/benchmarks/vector-search-bench/src/bin/gen_synthetic_dataset.rs b/benchmarks/vector-search-bench/src/bin/gen_synthetic_dataset.rs new file mode 100644 index 00000000000..d8a26578bc9 --- /dev/null +++ b/benchmarks/vector-search-bench/src/bin/gen_synthetic_dataset.rs @@ -0,0 +1,156 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Write a synthetic parquet file matching the VectorDBBench `emb: list` + `id: int64` +//! schema. Useful for local dev runs of `vector-search-bench` without needing network +//! access to `assets.zilliz.com`, and for sandbox / CI environments that block outbound +//! HTTPS. +//! +//! The generated file is deterministic for a given `(num_rows, dim, seed)` triple, so +//! downstream benchmark output is reproducible across runs on the same machine. Exact +//! bit-for-bit equality across different machines is not guaranteed because the PRNG +//! mixes in `f32::sin()`, whose last few ULPs are libm/CPU-dependent. +//! +//! Example: +//! +//! ```bash +//! cargo run -p vector-search-bench --bin gen_synthetic_dataset --release -- \ +//! --num-rows 5000 \ +//! --dim 768 \ +//! --out vortex-bench/data/cohere-small/cohere-small.parquet +//! ``` +//! +//! After running this, `vector-search-bench --datasets cohere-small` will find the +//! cached parquet file and skip the HTTP download via `idempotent_async`. (Cargo's +//! default bin name is the filename minus extension — underscores, not hyphens.) + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Result; +use arrow_array::Int64Array; +use arrow_array::ListArray; +use arrow_array::RecordBatch; +use arrow_array::builder::Float32Builder; +use arrow_array::builder::Int32BufferBuilder; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use clap::Parser; +use parquet::arrow::ArrowWriter; +use parquet::basic::Compression; +use parquet::file::properties::WriterProperties; + +#[derive(Parser, Debug)] +#[command( + version, + about = "Generate a synthetic VectorDBBench-style parquet file" +)] +struct Args { + /// Number of rows to generate. + #[arg(long, default_value_t = 5000)] + num_rows: usize, + + /// Vector dimensionality. Must be ≥ 128 to exercise TurboQuant. + #[arg(long, default_value_t = 768)] + dim: u32, + + /// Deterministic PRNG seed — changing this changes the generated vectors. + #[arg(long, default_value_t = 0xC0FFEE)] + seed: u64, + + /// Output parquet file path. Parent directory is created if missing. + #[arg(long)] + out: PathBuf, +} + +fn main() -> Result<()> { + let args = Args::parse(); + + if let Some(parent) = args.out.parent() { + std::fs::create_dir_all(parent)?; + } + + // Build an Arrow `ListArray` so the schema matches VectorDBBench's `emb: + // list` (note: NOT fixed_size_list — parquet has no FSL logical type so + // arrow-rs writes lists). Every list has exactly `dim` elements. + let dim_usize = args.dim as usize; + let total_elements = args.num_rows * dim_usize; + + let mut float_values = Float32Builder::with_capacity(total_elements); + let mut offsets = Int32BufferBuilder::new(args.num_rows + 1); + offsets.append(0i32); + + // Generate per-element values as (random noise in `[-0.5, 0.5)`) + (position-based + // sinusoid of amplitude 0.25). The xorshift gives the random component; the sine + // mixes the row and column index so that vectors at different positions are distinct + // even at low bit widths after quantization. + // + // The sinusoid frequency constants below are deliberately small and coprime so that + // (row, col) → sine values don't repeat across the 100K-row × 1536-col domain any + // faster than ~100K rows. They don't have any particular mathematical meaning — they + // just need to be "slow enough to avoid short-period aliasing, fast enough that + // different rows look different". + const POS_FREQ_ROW: f32 = 0.00013; + const POS_FREQ_COL: f32 = 0.00007; + const POS_AMPLITUDE: f32 = 0.25; + const RAND_SCALE: f32 = 1.0 / 32768.0; + + let mut state = args.seed.wrapping_add(1); + for row in 0..args.num_rows { + for i in 0..dim_usize { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + let rand_component = (state & 0xFFFF) as f32 * RAND_SCALE - 0.5; + let pos_component = + ((row as f32 * POS_FREQ_ROW) + (i as f32 * POS_FREQ_COL)).sin() * POS_AMPLITUDE; + float_values.append_value(rand_component + pos_component); + } + let written = i32::try_from((row + 1) * dim_usize) + .context("offset overflows i32 — reduce num_rows or dim")?; + offsets.append(written); + } + + let values_array = float_values.finish(); + let offsets_buffer = offsets.finish(); + + let field = Arc::new(Field::new("item", DataType::Float32, false)); + let list_dtype = DataType::List(Arc::clone(&field)); + let list_array = ListArray::try_new( + Arc::clone(&field), + arrow_buffer::OffsetBuffer::new(offsets_buffer.into()), + Arc::new(values_array), + None, + )?; + + let ids: Int64Array = (0..args.num_rows as i64).collect(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("emb", list_dtype, false), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(ids), Arc::new(list_array)], + )?; + + let writer_props = WriterProperties::builder() + .set_compression(Compression::ZSTD(Default::default())) + .build(); + let file = File::create(&args.out)?; + let mut writer = ArrowWriter::try_new(file, schema, Some(writer_props))?; + writer.write(&batch)?; + writer.close()?; + + println!( + "wrote {} rows × {} dims to {}", + args.num_rows, + args.dim, + args.out.display() + ); + Ok(()) +} diff --git a/benchmarks/vector-search-bench/src/handrolled_baseline.rs b/benchmarks/vector-search-bench/src/handrolled_baseline.rs new file mode 100644 index 00000000000..f62c5198fb6 --- /dev/null +++ b/benchmarks/vector-search-bench/src/handrolled_baseline.rs @@ -0,0 +1,401 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Hand-rolled Rust cosine similarity baseline. +//! +//! This module provides the *compute-cost floor* the other Vortex variants are measured +//! against. It is **not** a realistic "parquet in a DBMS" baseline — it's the minimum +//! amount of work a Rust programmer could get away with if they wrote a vector-search +//! scan by hand with no query engine, no scalar-function dispatch, and no Arrow compute +//! kernels. +//! +//! Two distinct phases run per iteration, and the benchmark times them separately so the +//! dashboard can separate storage-read cost from compute cost: +//! +//! 1. **Decompress** ([`read_parquet_embedding_column`]) — reads the canonical parquet +//! file via `parquet-rs`, downcasts the `emb` column to an Arrow `Float32Array`, and +//! copies every value into a flat `Vec`. This phase is the only place Arrow is +//! actually used — only for the decode. The `memcpy` at the end is incidental: we +//! could operate directly on `Float32Array::values()` with identical performance, +//! but taking ownership of a `Vec` frees the Arrow `RecordBatch` lifetimes. +//! 2. **Compute** ([`cosine_loop`] and [`filter_loop`]) — runs a plain scalar Rust loop +//! over `&[f32]`. Arrow is no longer involved. There's no SIMD, no unrolling +//! annotations, no dispatch overhead, no output-array allocation beyond a single +//! `Vec`. This is deliberately "the fastest you could possibly make it go +//! without writing SIMD intrinsics". +//! +//! Calling this "the parquet baseline" would be misleading, because: +//! +//! - The compute layer has nothing to do with parquet — parquet is only the input +//! encoding, not the execution substrate. +//! - Real parquet-on-DBMS engines (DuckDB's `list_cosine_similarity`, DataFusion with a +//! vector UDF, etc.) would pay substantial dispatch / planner / row-iterator cost +//! that this loop skips entirely. +//! +//! Think of it as: "If you didn't have Vortex and didn't feel like reaching for a query +//! engine, what's the minimum scan cost you could get away with on this data?" That's +//! the question this module answers, and it's intentionally a lower bound rather than a +//! fair DBMS comparison. Future work could add DuckDB / DataFusion baselines alongside +//! this one for the DBMS-level comparison. + +use std::fs::File; +use std::path::Path; +use std::time::Duration; +use std::time::Instant; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use arrow_array::Array; +use arrow_array::FixedSizeListArray; +use arrow_array::Float32Array; +use arrow_array::ListArray; +use arrow_array::RecordBatch; +use arrow_array::cast::AsArray; +use arrow_schema::DataType; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + +use crate::VariantTimings; + +/// Read the entire `emb` column of a parquet file into a single flat `Vec`, along +/// with the dimension and row count. This is the *decompress* phase of the hand-rolled +/// baseline — it's the only place Arrow is actually used. `parquet-rs` does the file +/// decode, we downcast to `Float32Array`, and then memcpy into a plain `Vec` so +/// the compute loop can operate over a raw slice without holding any Arrow +/// `RecordBatch` references. +/// +/// Kept under its `parquet` name because this function *actually reads parquet*; only +/// the compute-side wrappers take the `handrolled` label. +pub fn read_parquet_embedding_column(parquet_path: &Path) -> Result { + let file = File::open(parquet_path) + .with_context(|| format!("open parquet file {}", parquet_path.display()))?; + let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; + + // Locate the `emb` column and sanity-check its type. + let (_, emb_field) = builder + .schema() + .column_with_name("emb") + .context("parquet schema missing `emb` column")?; + + // VectorDBBench parquet files use `list`; some others use `fixed_size_list`. + // Both need to be supported — the canonical parquet emit from arrow-rs is `list` + // since parquet has no fixed-size-list logical type. + let element_dtype = match emb_field.data_type() { + DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) => { + field.data_type().clone() + } + other => bail!("emb column must be a list of float, got {other:?}"), + }; + if !matches!(element_dtype, DataType::Float32) { + bail!( + "emb column element type must be Float32, got {:?}", + element_dtype + ); + } + + let reader = builder.build()?; + let batches: Vec = reader.collect::, _>>()?; + + let mut data = Vec::::new(); + let mut num_rows = 0usize; + let mut inferred_dim: Option = None; + + for batch in batches.iter() { + let column = batch + .column_by_name("emb") + .context("emb column missing from record batch")?; + append_batch(column, &mut data, &mut inferred_dim, &mut num_rows)?; + } + + let dim = inferred_dim.context("parquet file has zero rows — cannot infer dimension")?; + Ok(HandrolledBaselineData { + elements: data, + dim, + num_rows, + }) +} + +fn append_batch( + column: &dyn Array, + data: &mut Vec, + inferred_dim: &mut Option, + num_rows: &mut usize, +) -> Result<()> { + if let Some(fsl) = column.as_any().downcast_ref::() { + let dim = fsl.value_length() as usize; + maybe_set_dim(inferred_dim, dim)?; + let values = fsl + .values() + .as_any() + .downcast_ref::() + .context("FSL emb column must have Float32 values")?; + data.extend_from_slice(values.values()); + *num_rows += fsl.len(); + return Ok(()); + } + + if let Some(list) = column.as_any().downcast_ref::() { + let values: &Float32Array = list + .values() + .as_primitive_opt::() + .context("List emb column must have Float32 values")?; + let offsets = list.value_offsets(); + for i in 0..list.len() { + let start = offsets[i] as usize; + let end = offsets[i + 1] as usize; + let row_len = end - start; + maybe_set_dim(inferred_dim, row_len)?; + data.extend_from_slice(&values.values()[start..end]); + *num_rows += 1; + } + return Ok(()); + } + + bail!( + "emb column has unsupported arrow type {:?}", + column.data_type() + ); +} + +fn maybe_set_dim(inferred_dim: &mut Option, new_dim: usize) -> Result<()> { + match inferred_dim { + Some(d) if *d == new_dim => Ok(()), + Some(d) => bail!("inconsistent emb dimensions: saw {d} then {new_dim}"), + None if new_dim == 0 => bail!("emb row has zero elements"), + None => { + *inferred_dim = Some(new_dim); + Ok(()) + } + } +} + +/// The flattened representation of an embedding column, suitable for a hand-rolled +/// distance loop. Intentionally decoupled from any format — the compute side doesn't +/// care how the data got into this `Vec`. +/// +/// The benchmark's "size" measurement for the handrolled baseline comes from +/// [`crate::PreparedDataset::parquet_bytes`] (which is populated once in +/// [`crate::prepare_dataset`]), not from this struct. We deliberately don't carry +/// the file size in here — doing so would duplicate state between two places that +/// can go out of sync. +pub struct HandrolledBaselineData { + /// All rows concatenated: `elements.len() == num_rows * dim`. + pub elements: Vec, + /// Vector dimensionality. + pub dim: usize, + /// Number of rows. + pub num_rows: usize, +} + +/// Result of running the hand-rolled baseline timing loop. +/// +/// Carries both the best-of-N timing numbers **and** the cosine scores from the final +/// iteration. The scores are exposed so the caller can feed them into +/// [`crate::verify::verify_and_report_scores`] for the correctness check without +/// re-reading the parquet file. Because `cosine_loop` is deterministic, the scores +/// from any iteration equal the scores from every other iteration; using the last +/// one is simply the most convenient snapshot. +pub struct HandrolledBaselineResult { + /// Best-of-N wall times for decompress / cosine / filter. + pub timings: VariantTimings, + /// Cosine-similarity scores from the final iteration. Length equals the dataset + /// row count. + pub last_scores: Vec, +} + +/// Run the decompress / cosine / filter microbenchmarks for the hand-rolled baseline +/// and return the best-of-N wall times along with the last iteration's cosine scores. +/// +/// The decompress phase re-reads the parquet file from disk on each iteration (matches +/// how the Vortex variants re-execute their tree from scratch each iteration), and the +/// compute phase runs [`cosine_loop`] and [`filter_loop`] over the flat `Vec` the +/// decompress phase produced. Returning the last iteration's scores lets the caller +/// perform correctness verification against the Vortex baseline without a redundant +/// parquet read. +/// +/// # Panics +/// +/// Panics if `iterations == 0`. The benchmark CLI defaults to 5 and the lowest +/// meaningful value is 1 (single-shot best-of-1). +pub fn run_handrolled_baseline_timings( + parquet_path: &Path, + query: &[f32], + threshold: f32, + iterations: usize, +) -> Result { + assert!( + iterations > 0, + "run_handrolled_baseline_timings requires iterations >= 1" + ); + + let mut decompress = Duration::MAX; + let mut cosine = Duration::MAX; + let mut filter = Duration::MAX; + let mut last_scores: Vec = Vec::new(); + + for _ in 0..iterations { + let start = Instant::now(); + let data = read_parquet_embedding_column(parquet_path)?; + decompress = decompress.min(start.elapsed()); + + let start = Instant::now(); + let scores = cosine_loop(&data.elements, data.num_rows, data.dim, query); + cosine = cosine.min(start.elapsed()); + debug_assert_eq!(scores.len(), data.num_rows); + + let start = Instant::now(); + let matches = filter_loop(&scores, threshold); + filter = filter.min(start.elapsed()); + debug_assert_eq!(matches.len(), data.num_rows); + + last_scores = scores; + } + + Ok(HandrolledBaselineResult { + timings: VariantTimings { + decompress, + cosine, + filter, + }, + last_scores, + }) +} + +/// Compute cosine similarity for every row against `query`. The query is assumed to match +/// the database vectors' dimension. Returns one f32 score per row; scores for zero-norm +/// rows or a zero-norm query are 0.0 by convention. +pub fn cosine_loop(elements: &[f32], num_rows: usize, dim: usize, query: &[f32]) -> Vec { + assert_eq!(query.len(), dim); + assert_eq!(elements.len(), num_rows * dim); + + let query_norm = query.iter().map(|&q| q * q).sum::().sqrt(); + let mut out = Vec::with_capacity(num_rows); + if query_norm == 0.0 { + out.resize(num_rows, 0.0); + return out; + } + + for row in 0..num_rows { + let base = row * dim; + let slice = &elements[base..base + dim]; + let mut dot = 0.0f32; + let mut sq = 0.0f32; + for i in 0..dim { + dot += slice[i] * query[i]; + sq += slice[i] * slice[i]; + } + let norm = sq.sqrt(); + if norm == 0.0 { + out.push(0.0); + } else { + out.push(dot / (norm * query_norm)); + } + } + out +} + +/// Build the `cosine > threshold` boolean mask — **strict greater-than**, matching the +/// Vortex-side path which uses `Operator::Gt` in +/// [`vortex_tensor::vector_search::build_similarity_search_tree`]. Keep these two in +/// sync: if one changes the comparison semantics, the correctness-verification pass will +/// start reporting a mismatch for the lossless variants. +pub fn filter_loop(scores: &[f32], threshold: f32) -> Vec { + scores.iter().map(|&s| s > threshold).collect() +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::sync::Arc; + + use arrow_array::RecordBatch; + use arrow_array::builder::FixedSizeListBuilder; + use arrow_array::builder::Float32Builder; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Schema; + use parquet::arrow::ArrowWriter; + use tempfile::NamedTempFile; + + use super::*; + + /// Build a minimal parquet file with an `emb: FixedSizeList` column and + /// verify the baseline pipeline produces the expected scores. + fn write_tiny_fsl_parquet(dim: i32, rows: &[&[f32]]) -> Result { + let schema = Arc::new(Schema::new(vec![Field::new( + "emb", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), dim), + false, + )])); + + let file = NamedTempFile::new()?; + let mut writer = + ArrowWriter::try_new(File::create(file.path())?, Arc::clone(&schema), None)?; + + let dim_usize = usize::try_from(dim).unwrap(); + let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), dim); + for row in rows { + assert_eq!(row.len(), dim_usize); + for &v in row.iter() { + builder.values().append_value(v); + } + builder.append(true); + } + let array = builder.finish(); + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)])?; + writer.write(&batch)?; + writer.close()?; + Ok(file) + } + + #[test] + fn handrolled_baseline_reads_fsl_column() { + let file = + write_tiny_fsl_parquet(3, &[&[1.0, 0.0, 0.0], &[0.0, 1.0, 0.0], &[1.0, 0.0, 0.0]]) + .unwrap(); + + let data = read_parquet_embedding_column(file.path()).unwrap(); + assert_eq!(data.dim, 3); + assert_eq!(data.num_rows, 3); + assert_eq!(data.elements.len(), 9); + + let query = [1.0f32, 0.0, 0.0]; + let scores = cosine_loop(&data.elements, data.num_rows, data.dim, &query); + assert_eq!(scores, vec![1.0, 0.0, 1.0]); + + let mask = filter_loop(&scores, 0.5); + assert_eq!(mask, vec![true, false, true]); + } + + #[test] + fn run_handrolled_baseline_timings_returns_last_iteration_scores() { + // Verifies the new `last_scores` contract: the timing loop returns the + // cosine scores from the final iteration, and those scores match what we'd + // get from a one-shot `cosine_loop` on the same data. Callers of + // `run_handrolled_baseline_timings` rely on this for verification (so they + // don't need a second parquet read to compute ground-truth scores). + let file = + write_tiny_fsl_parquet(3, &[&[1.0, 0.0, 0.0], &[0.0, 1.0, 0.0], &[1.0, 0.0, 0.0]]) + .unwrap(); + let query = [1.0f32, 0.0, 0.0]; + + let result = run_handrolled_baseline_timings(file.path(), &query, 0.5, 3).unwrap(); + + // Deterministic expected scores: rows 0 and 2 match the query exactly, + // row 1 is orthogonal. + assert_eq!(result.last_scores, vec![1.0, 0.0, 1.0]); + assert!(result.timings.decompress > Duration::ZERO); + assert!(result.timings.cosine > Duration::ZERO); + assert!(result.timings.filter > Duration::ZERO); + } + + #[test] + #[should_panic(expected = "iterations >= 1")] + fn run_handrolled_baseline_timings_panics_on_zero_iterations() { + let file = + write_tiny_fsl_parquet(3, &[&[1.0, 0.0, 0.0], &[0.0, 1.0, 0.0], &[1.0, 0.0, 0.0]]) + .unwrap(); + let query = [1.0f32, 0.0, 0.0]; + let _result = run_handrolled_baseline_timings(file.path(), &query, 0.5, 0); + } +} diff --git a/benchmarks/vector-search-bench/src/lib.rs b/benchmarks/vector-search-bench/src/lib.rs new file mode 100644 index 00000000000..0a70fdfed1e --- /dev/null +++ b/benchmarks/vector-search-bench/src/lib.rs @@ -0,0 +1,651 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Vector similarity-search benchmark core. +//! +//! For each `(dataset, variant)` pair we report: +//! +//! - **In-memory size** — `ArrayRef::nbytes()` of the prepared variant tree. This is the +//! memory footprint you'd pay to keep that encoding resident. +//! - **Compress time** — the wall time to build the variant tree from the materialized +//! uncompressed source (0 for the uncompressed variant itself, the BtrBlocks pass for +//! `vortex-default`, the full L2Denorm+SORF+quantize pipeline for `vortex-turboquant`). +//! - **Decompress time** — the wall time to execute the variant tree back into a +//! canonical `FixedSizeListArray` (≈0 for the already-canonical uncompressed variant, +//! meaningful for the compressed variants). +//! - **Cosine time** — executing `CosineSimilarity(data, const_query)` to a materialized +//! f32 primitive array. +//! - **Filter time** — executing `Binary(Gt, [cosine, threshold])` to a `BoolArray`. +//! - **Recall@10** (for the lossy TurboQuant variant only) against exact top-10 from the +//! uncompressed variant. +//! +//! Before any timing begins, the benchmark also runs a **correctness verification** pass +//! via [`verify`]: for every variant it computes cosine scores for a single query and +//! compares them to the ground-truth scores from the uncompressed variant. Lossless +//! variants must match within [`verify::LOSSLESS_TOLERANCE`]; lossy variants must match +//! within [`verify::LOSSY_TOLERANCE`]. A correctness failure bails the run. +//! +//! Measurements are emitted via the existing `vortex_bench::measurements` types so +//! results flow through the standard `gh-json` pipeline and show up on the CI dashboard +//! alongside compress-bench / random-access-bench. + +use std::time::Duration; +use std::time::Instant; + +pub mod handrolled_baseline; +pub mod recall; +pub mod verify; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use clap::ValueEnum; +use vortex::array::ArrayRef; +use vortex::array::ExecutionCtx; +use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::BoolArray; +use vortex::array::arrays::Chunked; +use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::Extension; +use vortex::array::arrays::ExtensionArray; +use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::Struct; +use vortex::array::arrays::chunked::ChunkedArrayExt; +use vortex::array::arrays::extension::ExtensionArrayExt; +use vortex::array::arrays::fixed_size_list::FixedSizeListArrayExt; +use vortex::array::arrays::struct_::StructArrayExt as _; +use vortex::dtype::DType; +use vortex::dtype::PType; +use vortex::error::vortex_panic; +use vortex_bench::Format; +use vortex_bench::SESSION; +use vortex_bench::conversions::list_to_vector_ext; +use vortex_bench::conversions::parquet_to_vortex_chunks; +use vortex_bench::datasets::Dataset; +use vortex_bench::vector_dataset::VectorDataset; +use vortex_btrblocks::BtrBlocksCompressor; +use vortex_tensor::scalar_fns::cosine_similarity::CosineSimilarity; +use vortex_tensor::vector_search::build_constant_query_vector; +use vortex_tensor::vector_search::build_similarity_search_tree; +use vortex_tensor::vector_search::compress_turboquant; + +/// The threshold used when wrapping the similarity expression in a +/// `Binary(Gt, [cosine, threshold])` filter. Set to a value high enough that random pairs +/// from a ~1.0-norm distribution reject but self-query pairs match. +pub const DEFAULT_THRESHOLD: f32 = 0.8; + +/// Row index used to pick a query vector from the dataset. Using a fixed row keeps queries +/// reproducible across runs and guarantees at least one match (since `cosine(x, x) == 1.0`). +pub const DEFAULT_QUERY_ROW: usize = 0; + +/// A single data-preparation strategy that the benchmark exercises. +/// +/// Each variant corresponds to one column on the "format" axis in downstream dashboards. The +/// `Format` mapping is what gets serialized into the `target.format` field of gh-json +/// output. +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum Variant { + /// Raw `Vector` with no encoding-level compression applied. + #[clap(name = "vortex-uncompressed")] + VortexUncompressed, + /// `BtrBlocksCompressor::default()` walks into the `Vector` extension and recursively + /// compresses the FSL storage child. This is the "generic lossless" Vortex story for + /// float vectors. + #[clap(name = "vortex-default")] + VortexDefault, + /// The full TurboQuant pipeline: `L2Denorm(SorfTransform(FSL(Dict)))`. Lossy; dramatic + /// size win; requires reporting recall alongside throughput for the comparison to be + /// honest. See [`vortex_tensor::vector_search::compress_turboquant`]. + #[clap(name = "vortex-turboquant")] + VortexTurboQuant, +} + +impl Variant { + /// The Format enum value this variant reports itself as in emitted measurements. + /// Uncompressed and BtrBlocks-default both surface as [`Format::OnDiskVortex`]; the + /// TurboQuant variant gets its own [`Format::VortexTurboQuant`] so dashboards can + /// distinguish them. + pub fn as_format(&self) -> Format { + match self { + Variant::VortexUncompressed => Format::OnDiskVortex, + Variant::VortexDefault => Format::OnDiskVortex, + Variant::VortexTurboQuant => Format::VortexTurboQuant, + } + } + + /// A stable, kebab-cased label used in metric names so dashboards can split apart + /// variants that map to the same Format. + pub fn label(&self) -> &'static str { + match self { + Variant::VortexUncompressed => "vortex-uncompressed", + Variant::VortexDefault => "vortex-default", + Variant::VortexTurboQuant => "vortex-turboquant", + } + } +} + +/// The ingested form of a dataset, ready to be fed to [`prepare_variant`] and the +/// timing/verification pipeline. +pub struct PreparedDataset { + /// Name used in metric strings — usually the dataset's `Dataset::name()`. + pub name: String, + /// Uncompressed `Vector` array (canonical form). Doubles as the + /// ground-truth basis for the correctness-verification pass and for TurboQuant's + /// Recall@K quality measurement. + pub uncompressed: ArrayRef, + /// The query vector to use (a single row pulled from the dataset). + pub query: Vec, + /// Parquet file size on disk in bytes — produced by the dataset download step + /// and reused as the "handrolled size" measurement in main.rs. + pub parquet_bytes: u64, +} + +impl PreparedDataset { + /// Dimension of the underlying vector column. + /// + /// # Panics + /// + /// Panics if `self.uncompressed` is not an `Extension>` — + /// which should be impossible because [`prepare_dataset`] is the only constructor + /// and it guarantees this shape. + pub fn dim(&self) -> u32 { + let fsl_dtype = match self.uncompressed.dtype() { + DType::Extension(ext) => ext.storage_dtype(), + other => vortex_panic!("expected Extension, got {other}"), + }; + match fsl_dtype { + DType::FixedSizeList(_, dim, _) => *dim, + other => vortex_panic!("expected FixedSizeList storage, got {other}"), + } + } + + /// Number of rows in the uncompressed dataset. + pub fn num_rows(&self) -> usize { + self.uncompressed.len() + } +} + +/// Prepare a dataset by downloading its parquet file, converting the `emb` column to a +/// `Vector` extension array, and extracting a single-row query vector. +pub async fn prepare_dataset(dataset: &VectorDataset) -> Result { + let parquet_path = dataset + .to_parquet_path() + .await + .context("download vector dataset parquet")?; + let parquet_bytes = std::fs::metadata(&parquet_path) + .with_context(|| format!("stat parquet file {:?}", parquet_path))? + .len(); + + tracing::info!( + "ingesting {} parquet from {:?} ({} bytes)", + dataset.name(), + parquet_path, + parquet_bytes + ); + + let chunked = parquet_to_vortex_chunks(parquet_path).await?; + + let struct_array = chunked.into_array(); + let emb_column = extract_emb_column(&struct_array)?; + let wrapped = list_to_vector_ext(emb_column)?; + + // `list_to_vector_ext` may return a chunked `Extension` when the source was + // a `ChunkedArray` of list columns (the usual shape after `parquet_to_vortex_chunks`). + // Materialize it into a single non-chunked `ExtensionArray` so downstream code can + // treat it uniformly. + let mut ctx = SESSION.create_execution_ctx(); + let uncompressed = wrapped.execute::(&mut ctx)?.into_array(); + + let query = extract_query_row(&uncompressed, DEFAULT_QUERY_ROW)?; + + Ok(PreparedDataset { + name: dataset.name().to_string(), + uncompressed, + query, + parquet_bytes, + }) +} + +/// Project the `emb` column out of a chunked struct array. This rebuilds a chunked list +/// array with just that one column. +fn extract_emb_column(struct_array: &ArrayRef) -> Result { + if let Some(chunked) = struct_array.as_opt::() { + let mut emb_chunks: Vec = Vec::with_capacity(chunked.nchunks()); + for chunk in chunked.iter_chunks() { + emb_chunks.push(extract_emb_column(chunk)?); + } + if emb_chunks.is_empty() { + bail!("dataset has no chunks"); + } + return Ok(ChunkedArray::from_iter(emb_chunks).into_array()); + } + + let Some(struct_view) = struct_array.as_opt::() else { + bail!( + "expected dataset chunks to be Struct arrays, got {}", + struct_array.dtype() + ); + }; + + let field = struct_view + .unmasked_field_by_name("emb") + .context("dataset parquet must have an `emb` column")?; + Ok(field.clone()) +} + +/// Pull a single row out of a `Vector` extension array as a plain `Vec`. +/// +/// Only `f32`-typed `Vector` arrays are supported today — the benchmark deliberately +/// restricts itself to `f32` vectors, so we assert the element type rather than +/// quietly returning a mis-cast slice. +pub(crate) fn extract_query_row(vector_ext: &ArrayRef, row: usize) -> Result> { + if row >= vector_ext.len() { + bail!( + "query row {row} out of bounds for dataset of length {}", + vector_ext.len() + ); + } + + let ext_view = vector_ext + .as_opt::() + .context("prepared dataset must be a Vector extension array")?; + + let mut ctx = SESSION.create_execution_ctx(); + + // Execute storage array to its canonical FSL form. + let fsl: FixedSizeListArray = ext_view.storage_array().clone().execute(&mut ctx)?; + + let dim_usize = match fsl.dtype() { + DType::FixedSizeList(_, d, _) => *d as usize, + other => bail!("storage dtype must be FixedSizeList, got {other}"), + }; + + let elements: PrimitiveArray = fsl.elements().clone().execute(&mut ctx)?; + if elements.ptype() != PType::F32 { + bail!( + "extract_query_row currently only supports f32 Vector columns, got {:?}", + elements.ptype() + ); + } + let slice = elements.as_slice::(); + let start = row * dim_usize; + Ok(slice[start..start + dim_usize].to_vec()) +} + +/// A prepared variant: the in-memory array tree plus the metadata we want to report +/// alongside it (size and construction cost). +#[derive(Debug, Clone)] +pub struct PreparedVariant { + /// The variant's in-memory array tree. For the uncompressed variant this is the same + /// canonical `Extension` pulled out of `prepare_dataset`; for the others it's + /// the output of the respective compression pipeline. + pub array: ArrayRef, + /// Summed byte footprint of the variant tree — `ArrayRef::nbytes()`. This is the + /// in-memory cost of keeping the variant resident, not a disk size. + pub nbytes: u64, + /// Wall time spent constructing the variant tree from the already-materialized + /// uncompressed source. 0 for [`Variant::VortexUncompressed`]; meaningful for the + /// compressed variants. + pub compress_duration: Duration, +} + +/// Apply a `Variant`'s preparation strategy to the materialized uncompressed source and +/// return the resulting tree together with its reported in-memory size and construction +/// time. Uses the global [`vortex_bench::SESSION`] for any execution-context work; the +/// benchmark has no reason to support multiple concurrent sessions. +/// +/// **Why nbytes instead of on-disk size?** The Vortex file writer applies BtrBlocks +/// compression as part of its default write strategy regardless of the in-memory tree +/// shape, so serializing an "uncompressed" tree and measuring the resulting `.vortex` +/// file produces the same bytes as serializing a `BtrBlocksCompressor::default()`- +/// compressed tree — the disk-size comparison collapses two conceptually different +/// things into one number. Reporting `nbytes()` of the in-memory tree keeps the size +/// measurement consistent with what the *compute* measurements operate on. +pub fn prepare_variant(prepared: &PreparedDataset, variant: Variant) -> Result { + match variant { + Variant::VortexUncompressed => { + // Identity: the uncompressed Extension is already materialized. Still + // record a dummy Instant so the timing point has a well-defined value even + // if it's effectively zero. + let start = Instant::now(); + let array = prepared.uncompressed.clone(); + let compress_duration = start.elapsed(); + let nbytes = array.nbytes(); + Ok(PreparedVariant { + array, + nbytes, + compress_duration, + }) + } + Variant::VortexDefault => { + let start = Instant::now(); + let array = BtrBlocksCompressor::default().compress(&prepared.uncompressed)?; + let compress_duration = start.elapsed(); + let nbytes = array.nbytes(); + Ok(PreparedVariant { + array, + nbytes, + compress_duration, + }) + } + Variant::VortexTurboQuant => { + let mut ctx = SESSION.create_execution_ctx(); + let start = Instant::now(); + let array = compress_turboquant(prepared.uncompressed.clone(), &mut ctx)?; + let compress_duration = start.elapsed(); + let nbytes = array.nbytes(); + Ok(PreparedVariant { + array, + nbytes, + compress_duration, + }) + } + } +} + +/// Run the decompress / cosine / filter microbenchmarks against a prepared variant +/// array and return the best-of-`iterations` wall times for each measurement. +/// +/// The three stages are **interleaved** inside a single outer loop rather than run +/// as three separate back-to-back loops. Interleaving keeps each stage's cache / +/// branch-predictor / allocator state symmetric across iterations — a pathology of +/// the back-to-back shape is that iteration `N+1` of the cosine stage runs on +/// warmed caches left behind by iteration `N` of the cosine stage, while iteration +/// `N+1` of the filter stage runs on caches left behind by the *cosine* stage. The +/// interleaved form makes each stage see roughly the same cache state every +/// iteration. +/// +/// Each stage still gets a fresh `ExecutionCtx` (from the global +/// [`vortex_bench::SESSION`]), so no cached scalar-fn state leaks between stages +/// within a single iteration. +pub fn run_timings( + variant_array: &ArrayRef, + query: &[f32], + iterations: usize, +) -> Result { + let mut decompress = Duration::MAX; + let mut cosine = Duration::MAX; + let mut filter = Duration::MAX; + + for _ in 0..iterations { + { + let mut ctx = SESSION.create_execution_ctx(); + let start = Instant::now(); + let decoded: FixedSizeListArray = decompress_full_scan(variant_array, &mut ctx)?; + decompress = decompress.min(start.elapsed()); + drop(decoded); + } + { + let mut ctx = SESSION.create_execution_ctx(); + let start = Instant::now(); + let scores: PrimitiveArray = execute_cosine(variant_array, query, &mut ctx)?; + cosine = cosine.min(start.elapsed()); + drop(scores); + } + { + let mut ctx = SESSION.create_execution_ctx(); + let start = Instant::now(); + let matches: BoolArray = + execute_filter(variant_array, query, DEFAULT_THRESHOLD, &mut ctx)?; + filter = filter.min(start.elapsed()); + drop(matches); + } + } + + Ok(VariantTimings { + decompress, + cosine, + filter, + }) +} + +/// Timing summary for one `(dataset, variant)` pair. +#[derive(Debug, Clone, Copy)] +pub struct VariantTimings { + /// Wall time to execute the variant's array tree back into a canonical + /// `FixedSizeListArray`. ~0 for [`Variant::VortexUncompressed`] (the tree is already + /// canonical), meaningful for the two compressed variants. + pub decompress: Duration, + /// Wall time for the cosine_similarity scalar-function execution over the whole + /// column (materialized into an `f32` [`PrimitiveArray`]). + pub cosine: Duration, + /// Wall time for the full `Binary(Gt, [cosine, threshold])` expression executed + /// into a [`BoolArray`]. + pub filter: Duration, +} + +/// Fully materialize the input column so the measurement captures *all* decompression +/// work — the extension shell, the FSL storage, **and the inner f32 element buffer**. +/// +/// Forcing the element buffer to materialize as a canonical `PrimitiveArray` is +/// what distinguishes this from a no-op cache hit. Executing the `ExtensionArray` or +/// `FixedSizeListArray` alone only unwraps the container shells — if the FSL's +/// `elements` child is (e.g.) an `alprd` tree, the bit-unpacking is lazy and only +/// happens when something reads the values. The `execute::` call below +/// forces that read. +/// +/// For the Vortex-uncompressed variant this is cheap (bitwise copy / no-op). For +/// BtrBlocks-default it includes the ALP-RD decoding pass. For TurboQuant it includes +/// running the inverse SORF rotation + dictionary lookup through the scalar-fn +/// pipeline. +pub fn decompress_full_scan( + array: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> Result { + let ext: ExtensionArray = array.clone().execute(ctx)?; + let fsl: FixedSizeListArray = ext.storage_array().clone().execute(ctx)?; + // Force the element buffer all the way down to a canonical PrimitiveArray so the + // timing captures any lazy decode work (ALP-RD bit unpacking, dict lookups, SORF + // inverse rotation, etc.). + let elements: PrimitiveArray = fsl.elements().clone().execute(ctx)?; + drop(elements); + Ok(fsl) +} + +/// Execute `CosineSimilarity(data, broadcast(query))` to a materialized `f32` +/// [`PrimitiveArray`]. Shared between the timing loop and the correctness-verification +/// path so both exercise the exact same expression tree. +/// +/// # Errors +/// +/// Returns an error if `data` is not a [`vortex_tensor::vector::Vector`] extension array, +/// if `query`'s length doesn't match the database vector dimension, or if the execution +/// context rejects the expression. +pub fn execute_cosine( + data: &ArrayRef, + query: &[f32], + ctx: &mut ExecutionCtx, +) -> Result { + let num_rows = data.len(); + let query_vec = build_constant_query_vector(query, num_rows)?; + let cosine = CosineSimilarity::try_new_array(data.clone(), query_vec, num_rows)?.into_array(); + Ok(cosine.execute(ctx)?) +} + +fn execute_filter( + data: &ArrayRef, + query: &[f32], + threshold: f32, + ctx: &mut ExecutionCtx, +) -> Result { + let tree = build_similarity_search_tree(data.clone(), query, threshold)?; + Ok(tree.execute(ctx)?) +} + +/// Test-only helpers shared between the unit tests in this crate's submodules. +#[cfg(test)] +pub(crate) mod test_utils { + use vortex::array::ArrayRef; + use vortex::array::IntoArray; + use vortex::array::arrays::ExtensionArray; + use vortex::array::arrays::FixedSizeListArray; + use vortex::array::arrays::PrimitiveArray; + use vortex::array::extension::EmptyMetadata; + use vortex::array::validity::Validity; + use vortex::buffer::BufferMut; + use vortex::dtype::extension::ExtDType; + use vortex_tensor::vector::Vector; + + /// Build a deterministic `Vector` extension array of `num_rows` rows for + /// tests. The PRNG is a trivial xorshift keyed by `seed`; we don't care about the + /// distribution beyond "not all zeros". + pub fn synthetic_vector(dim: u32, num_rows: usize, seed: u64) -> ArrayRef { + let mut buf = BufferMut::::with_capacity(num_rows * dim as usize); + let mut state = seed; + for _ in 0..(num_rows * dim as usize) { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + let v = ((state & 0xFFFF) as f32 / 32768.0) - 1.0; + buf.push(v); + } + let elements = PrimitiveArray::new::(buf.freeze(), Validity::NonNullable).into_array(); + let fsl = + FixedSizeListArray::try_new(elements, dim, Validity::NonNullable, num_rows).unwrap(); + let ext_dtype = ExtDType::::try_new(EmptyMetadata, fsl.dtype().clone()) + .unwrap() + .erased(); + ExtensionArray::new(ext_dtype, fsl.into_array()).into_array() + } +} + +#[cfg(test)] +mod tests { + use super::test_utils::synthetic_vector; + use super::*; + + /// Build a test `PreparedDataset` from synthetic data, pulling the query from + /// row 0 via the shared `extract_query_row` helper so all tests exercise the + /// ptype-assertion path the benchmark hot path uses. + fn test_prepared(dim: u32, num_rows: usize, seed: u64) -> PreparedDataset { + let uncompressed = synthetic_vector(dim, num_rows, seed); + let query = extract_query_row(&uncompressed, 0).unwrap(); + PreparedDataset { + name: "synthetic".to_string(), + uncompressed, + query, + parquet_bytes: 0, + } + } + + #[test] + fn extract_query_row_returns_the_right_slice() { + let dim = 8u32; + let num_rows = 4usize; + let prepared = test_prepared(dim, num_rows, 0xDEADBEEF); + + // Row 0 extraction was already used to populate `prepared.query`; check it + // agrees with a second extraction for row 0, and that row 3 (last) is + // different (as it should be for distinct synthetic vectors). + let row0 = extract_query_row(&prepared.uncompressed, 0).unwrap(); + let row3 = extract_query_row(&prepared.uncompressed, 3).unwrap(); + assert_eq!(row0, prepared.query); + assert_eq!(row0.len(), dim as usize); + assert_eq!(row3.len(), dim as usize); + assert_ne!(row0, row3, "different rows must differ for this seed"); + } + + #[test] + fn extract_query_row_rejects_out_of_bounds_row() { + let dim = 8u32; + let num_rows = 4usize; + let prepared = test_prepared(dim, num_rows, 0xC0FFEE); + + let err = extract_query_row(&prepared.uncompressed, 4) + .unwrap_err() + .to_string(); + assert!( + err.contains("query row 4 out of bounds"), + "unexpected error: {err}" + ); + } + + #[test] + fn prepare_variant_produces_non_empty_array_for_all_variants() { + let dim = 128u32; + let num_rows = 64usize; + let prepared = test_prepared(dim, num_rows, 0xC0FFEE); + + for variant in [ + Variant::VortexUncompressed, + Variant::VortexDefault, + Variant::VortexTurboQuant, + ] { + let prep = prepare_variant(&prepared, variant).unwrap(); + assert_eq!( + prep.array.len(), + num_rows, + "variant {variant:?} changed row count" + ); + assert!(prep.nbytes > 0, "variant {variant:?} reported zero size"); + + let timings = run_timings(&prep.array, &prepared.query, 2).unwrap(); + // TurboQuant + default must do real work; uncompressed's decompress is a + // no-op and can plausibly time as zero. + assert!(timings.cosine > Duration::ZERO); + assert!(timings.filter > Duration::ZERO); + } + } + + /// The **uncompressed** variant's decompress pass must be a no-op (the tree is + /// already canonical), while TurboQuant must do real work. This is a regression + /// guard for a future change accidentally making the uncompressed variant take the + /// slow path. + #[test] + fn uncompressed_decompress_is_fast() { + let dim = 128u32; + let num_rows = 256usize; + let prepared = test_prepared(dim, num_rows, 0xDEADBEEF); + + let uncompressed_prep = prepare_variant(&prepared, Variant::VortexUncompressed).unwrap(); + let turboquant_prep = prepare_variant(&prepared, Variant::VortexTurboQuant).unwrap(); + + let unc_timings = run_timings(&uncompressed_prep.array, &prepared.query, 3).unwrap(); + let tq_timings = run_timings(&turboquant_prep.array, &prepared.query, 3).unwrap(); + + // The uncompressed decompress should be at least an order of magnitude faster + // than TurboQuant's (usually many orders of magnitude). 5x is a loose lower + // bound that won't flake on a noisy CI runner. + assert!( + tq_timings.decompress > unc_timings.decompress * 5, + "expected TurboQuant decompress ({:?}) to be >5x uncompressed ({:?})", + tq_timings.decompress, + unc_timings.decompress + ); + } + + /// Diagnostic: print the in-memory tree shape for each variant so we can see + /// exactly what BtrBlocks and TurboQuant do to the FSL storage. + /// + /// Run with: + /// ```bash + /// cargo test -p vector-search-bench --release -- \ + /// --ignored --nocapture print_variant_trees + /// ``` + #[test] + #[ignore] + #[expect(clippy::use_debug, reason = "human-readable diagnostic output")] + fn print_variant_trees() { + let dim = 768u32; + let num_rows = 500usize; + let prepared = test_prepared(dim, num_rows, 0xC0FFEE); + + for variant in [ + Variant::VortexUncompressed, + Variant::VortexDefault, + Variant::VortexTurboQuant, + ] { + let prep = prepare_variant(&prepared, variant).unwrap(); + println!("=== {variant:?} ==="); + println!(" len : {}", prep.array.len()); + println!(" nbytes : {}", prep.nbytes); + println!(" compress_duration: {:?}", prep.compress_duration); + println!( + " encoding tree : {}", + prep.array.display_tree_encodings_only() + ); + } + } +} diff --git a/benchmarks/vector-search-bench/src/main.rs b/benchmarks/vector-search-bench/src/main.rs new file mode 100644 index 00000000000..5e78563ef22 --- /dev/null +++ b/benchmarks/vector-search-bench/src/main.rs @@ -0,0 +1,415 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! `vector-search-bench` — brute-force cosine-similarity benchmark over public VectorDBBench +//! embedding corpora. +//! +//! Usage: +//! +//! ```bash +//! cargo run -p vector-search-bench --release -- \ +//! --datasets cohere-small \ +//! --formats handrolled,vortex-uncompressed,vortex-default,vortex-turboquant \ +//! --iterations 5 \ +//! -d table +//! ``` +//! +//! The `handrolled` variant is a hand-rolled Rust scalar cosine loop over a flat +//! `Vec` decoded from the dataset's canonical parquet file; it is a compute-cost +//! floor, not a realistic parquet-on-DBMS baseline. See +//! [`handrolled_baseline`](vector_search_bench::handrolled_baseline) for details. + +use std::borrow::Cow; +use std::io::Write; +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use indicatif::ProgressBar; +use vector_search_bench::DEFAULT_THRESHOLD; +use vector_search_bench::Variant; +use vector_search_bench::handrolled_baseline::run_handrolled_baseline_timings; +use vector_search_bench::prepare_dataset; +use vector_search_bench::prepare_variant; +use vector_search_bench::recall::DEFAULT_TOP_K; +use vector_search_bench::recall::measure_recall_at_k; +use vector_search_bench::run_timings; +use vector_search_bench::verify::VerificationKind; +use vector_search_bench::verify::compute_cosine_scores; +use vector_search_bench::verify::verify_and_report_scores; +use vector_search_bench::verify::verify_variant; +use vortex_bench::Format; +use vortex_bench::create_output_writer; +use vortex_bench::datasets::Dataset; +use vortex_bench::display::DisplayFormat; +use vortex_bench::display::print_measurements_json; +use vortex_bench::measurements::CompressionTimingMeasurement; +use vortex_bench::measurements::CustomUnitMeasurement; +use vortex_bench::setup_logging_and_tracing; +use vortex_bench::vector_dataset::VectorDataset; + +const BENCHMARK_ID: &str = "vector-search"; + +/// Command-line arguments for `vector-search-bench`. +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Number of timed iterations per measurement. The reported time is the minimum across + /// iterations (matches compress-bench convention). + #[arg(short, long, default_value_t = 5)] + iterations: usize, + + /// Subset of datasets to run. Defaults to Cohere-small. + #[arg(long, value_delimiter = ',', value_enum, default_values_t = vec![SelectableDataset::CohereSmall])] + datasets: Vec, + + /// Which benchmark variants to run, using kebab-cased labels. The `--formats` name is + /// used (instead of `--variants`) so this benchmark matches the CI invocation + /// convention shared across random-access-bench / compress-bench. Accepted values: + /// `handrolled`, `vortex-uncompressed`, `vortex-default`, `vortex-turboquant`. + /// Defaults to running all four. + #[arg(long, value_delimiter = ',', value_enum, default_values_t = vec![SelectableFormat::Handrolled, SelectableFormat::VortexUncompressed, SelectableFormat::VortexDefault, SelectableFormat::VortexTurboQuant])] + formats: Vec, + + /// Number of query rows sampled when computing Recall@K for TurboQuant. 0 disables + /// the quality measurement entirely (useful for smoke tests). + #[arg(long, default_value_t = 100)] + recall_queries: usize, + + /// K in Recall@K. Defaults to 10, matching VectorDBBench conventions. + #[arg(long, default_value_t = DEFAULT_TOP_K)] + recall_k: usize, + + /// Output display format (`table` for humans, `gh-json` for CI ingestion). + #[arg(short, long, default_value_t, value_enum)] + display_format: DisplayFormat, + + /// If set, write output to this file instead of stdout. + #[arg(short, long)] + output_path: Option, + + /// Verbose logging. + #[arg(short, long)] + verbose: bool, + + /// Enable perfetto tracing output. + #[arg(long)] + tracing: bool, +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +enum SelectableDataset { + #[clap(name = "cohere-small")] + CohereSmall, + #[clap(name = "cohere-medium")] + CohereMedium, + #[clap(name = "openai-small")] + OpenAiSmall, + #[clap(name = "openai-medium")] + OpenAiMedium, + #[clap(name = "bioasq-medium")] + BioasqMedium, + #[clap(name = "glove-medium")] + GloveMedium, +} + +impl SelectableDataset { + fn into_dataset(self) -> VectorDataset { + match self { + SelectableDataset::CohereSmall => VectorDataset::CohereSmall, + SelectableDataset::CohereMedium => VectorDataset::CohereMedium, + SelectableDataset::OpenAiSmall => VectorDataset::OpenAiSmall, + SelectableDataset::OpenAiMedium => VectorDataset::OpenAiMedium, + SelectableDataset::BioasqMedium => VectorDataset::BioasqMedium, + SelectableDataset::GloveMedium => VectorDataset::GloveMedium, + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +enum SelectableFormat { + /// Hand-rolled Rust scalar cosine loop over a flat `Vec` decoded from the + /// canonical parquet file via `parquet-rs` / `arrow-rs`. Compute-cost floor — + /// not a realistic parquet-on-DBMS baseline. See + /// [`vector_search_bench::handrolled_baseline`]. + #[clap(name = "handrolled")] + Handrolled, + /// Raw `Vector` with no encoding compression. + #[clap(name = "vortex-uncompressed")] + VortexUncompressed, + /// BtrBlocks default-compression applied to the FSL storage child. + #[clap(name = "vortex-default")] + VortexDefault, + /// Full TurboQuant pipeline (lossy). + #[clap(name = "vortex-turboquant")] + VortexTurboQuant, +} + +impl SelectableFormat { + fn into_variant(self) -> Option { + match self { + SelectableFormat::Handrolled => None, + SelectableFormat::VortexUncompressed => Some(Variant::VortexUncompressed), + SelectableFormat::VortexDefault => Some(Variant::VortexDefault), + SelectableFormat::VortexTurboQuant => Some(Variant::VortexTurboQuant), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + setup_logging_and_tracing(args.verbose, args.tracing)?; + + let datasets: Vec = args + .datasets + .iter() + .copied() + .map(SelectableDataset::into_dataset) + .collect(); + + let run_handrolled_baseline = args.formats.contains(&SelectableFormat::Handrolled); + let variants: Vec = args + .formats + .iter() + .filter_map(|f| f.into_variant()) + .collect(); + + // One progress unit per inner-loop body: each Vortex variant plus the handrolled + // path (when it's enabled) gets exactly one `progress.inc(1)` below. Keep this + // count in sync with the number of `progress.inc` sites. + let total_work = datasets.len() * (variants.len() + usize::from(run_handrolled_baseline)); + let progress = ProgressBar::new(total_work as u64); + + let mut timings: Vec = Vec::new(); + let mut sizes: Vec = Vec::new(); + let mut recalls: Vec = Vec::new(); + let mut verification: Vec = Vec::new(); + + for dataset in &datasets { + let prepared = prepare_dataset(dataset).await?; + tracing::info!( + "prepared {}: dim={}, num_rows={}", + prepared.name, + prepared.dim(), + prepared.num_rows() + ); + + // Ground-truth cosine scores for the verification query — the scores produced by + // the uncompressed Vortex scan. Every other variant (including the hand-rolled + // baseline) will be compared against this. + let baseline_scores = compute_cosine_scores(&prepared.uncompressed, &prepared.query) + .context("compute ground-truth cosine scores for verification")?; + tracing::info!( + "computed {} ground-truth cosine scores for {}", + baseline_scores.len(), + prepared.name + ); + + // Hand-rolled baseline. Emitted as a separate pseudo-variant with label + // `handrolled` so it shows up in dashboards next to the Vortex variants. This + // is a hand-rolled Rust scalar cosine loop over a flat `Vec` decoded from + // parquet via `parquet-rs`; it must match the Vortex cosine scores within the + // lossless tolerance (f32 ULPs) because it's computing the same math on the + // same underlying f32 values. + // + // `target.format` stays `Format::Parquet` because the *storage* side is still + // parquet on disk — only the *compute* is hand-rolled. The metric `name` field + // carries the `handrolled` label so human readers can tell the compute apart + // from, say, a DuckDB `list_cosine_similarity` baseline on the same parquet. + // + // Timing runs first and returns the cosine scores from its final iteration; + // verification then reuses those scores rather than re-reading the parquet + // file. `cosine_loop` is deterministic, so the last-iteration scores equal + // what a separate pre-timing verification pass would produce — we just save + // one parquet read per dataset. If the scores drift from the Vortex baseline, + // `verify_and_report_scores` bails here (after the timing already ran, which + // is acceptable because the handrolled loop is cheap and we'd rather run it + // twice than skip correctness). + if run_handrolled_baseline { + let parquet_path = dataset.to_parquet_path().await?; + let label = "handrolled"; + let bench_name = format!("{label}/{}", prepared.name); + + let baseline_result = run_handrolled_baseline_timings( + &parquet_path, + &prepared.query, + DEFAULT_THRESHOLD, + args.iterations, + )?; + + let handrolled_report = verify_and_report_scores( + &bench_name, + &baseline_result.last_scores, + &baseline_scores, + VerificationKind::Lossless, + )?; + tracing::info!( + "{} verification (Lossless): max_abs_diff={:.2e}, mean_abs_diff={:.2e}", + bench_name, + handrolled_report.max_abs_diff, + handrolled_report.mean_abs_diff, + ); + verification.push(CustomUnitMeasurement { + name: format!("correctness-max-diff/{bench_name}"), + format: Format::Parquet, + unit: Cow::from("abs-diff"), + value: handrolled_report.max_abs_diff, + }); + + sizes.push(CustomUnitMeasurement { + name: format!("{label} size/{}", prepared.name), + format: Format::Parquet, + unit: Cow::from("bytes"), + value: prepared.parquet_bytes as f64, + }); + timings.push(CompressionTimingMeasurement { + name: format!("decompress time/{bench_name}"), + format: Format::Parquet, + time: baseline_result.timings.decompress, + }); + timings.push(CompressionTimingMeasurement { + name: format!("cosine-similarity time/{bench_name}"), + format: Format::Parquet, + time: baseline_result.timings.cosine, + }); + timings.push(CompressionTimingMeasurement { + name: format!("cosine-filter time/{bench_name}"), + format: Format::Parquet, + time: baseline_result.timings.filter, + }); + + progress.inc(1); + } + + for &variant in &variants { + let prep = prepare_variant(&prepared, variant)?; + + let variant_label = variant.label(); + let bench_name = format!("{variant_label}/{}", prepared.name); + + // Correctness verification BEFORE timing. Lossless variants must match + // the uncompressed baseline within f32 noise; TurboQuant must stay within + // its lossy tolerance. A failure bails the whole run — you cannot publish + // throughput numbers for an encoding that returns wrong answers. + let kind = if variant == Variant::VortexTurboQuant { + VerificationKind::Lossy + } else { + VerificationKind::Lossless + }; + let report = verify_variant( + &bench_name, + &prep.array, + &prepared.query, + &baseline_scores, + kind, + )?; + tracing::info!( + "{} verification ({:?}): max_abs_diff={:.2e}, mean_abs_diff={:.2e}", + bench_name, + kind, + report.max_abs_diff, + report.mean_abs_diff, + ); + verification.push(CustomUnitMeasurement { + name: format!("correctness-max-diff/{bench_name}"), + format: variant.as_format(), + unit: Cow::from("abs-diff"), + value: report.max_abs_diff, + }); + + // In-memory nbytes — the honest size of the variant tree we're executing. + sizes.push(CustomUnitMeasurement { + name: format!("{variant_label} nbytes/{}", prepared.name), + format: variant.as_format(), + unit: Cow::from("bytes"), + value: prep.nbytes as f64, + }); + + // Compress time — the wall time it takes to build the variant tree from + // the materialized uncompressed source. For the uncompressed variant + // itself this is ~0 (identity), so we still emit it as a measurement for + // dashboard consistency. + timings.push(CompressionTimingMeasurement { + name: format!("compress time/{bench_name}"), + format: variant.as_format(), + time: prep.compress_duration, + }); + + let variant_timings = run_timings(&prep.array, &prepared.query, args.iterations)?; + + timings.push(CompressionTimingMeasurement { + name: format!("decompress time/{bench_name}"), + format: variant.as_format(), + time: variant_timings.decompress, + }); + timings.push(CompressionTimingMeasurement { + name: format!("cosine-similarity time/{bench_name}"), + format: variant.as_format(), + time: variant_timings.cosine, + }); + timings.push(CompressionTimingMeasurement { + name: format!("cosine-filter time/{bench_name}"), + format: variant.as_format(), + time: variant_timings.filter, + }); + + // Recall@K quality measurement for lossy variants only. The lossless + // variants are trivially 1.0 by construction (since they agree with the + // uncompressed baseline within 1e-4) so we skip them to keep noise down. + if args.recall_queries > 0 && variant == Variant::VortexTurboQuant { + let recall = measure_recall_at_k( + &prepared.uncompressed, + &prep.array, + args.recall_queries, + args.recall_k, + )?; + tracing::info!("Recall@{} for {}: {:.4}", args.recall_k, bench_name, recall); + recalls.push(CustomUnitMeasurement { + name: format!("recall@{}/{bench_name}", args.recall_k), + format: variant.as_format(), + unit: Cow::from("recall"), + value: recall, + }); + } + + progress.inc(1); + } + } + progress.finish(); + + let mut writer = create_output_writer(&args.display_format, args.output_path, BENCHMARK_ID)?; + match args.display_format { + DisplayFormat::Table => { + // Our variants span multiple `Format` values *and* multiple labels that share a + // single `Format`, so the existing `render_table` helper (which groups by + // `Target`) would collapse them. Emit one line per measurement instead; this is + // only used for developer inspection — CI consumes `gh-json` via the arm below. + for timing in &timings { + writeln!(writer, "{} {} ns", timing.name, timing.time.as_nanos())?; + } + for size in &sizes { + writeln!(writer, "{} {} {}", size.name, size.value, size.unit)?; + } + for recall in &recalls { + writeln!( + writer, + "{} {:.4} {}", + recall.name, recall.value, recall.unit + )?; + } + for check in &verification { + writeln!(writer, "{} {:.6e} {}", check.name, check.value, check.unit)?; + } + } + DisplayFormat::GhJson => { + print_measurements_json(&mut writer, timings)?; + print_measurements_json(&mut writer, sizes)?; + print_measurements_json(&mut writer, recalls)?; + print_measurements_json(&mut writer, verification)?; + } + } + + Ok(()) +} diff --git a/benchmarks/vector-search-bench/src/recall.rs b/benchmarks/vector-search-bench/src/recall.rs new file mode 100644 index 00000000000..fb1dff4dc8c --- /dev/null +++ b/benchmarks/vector-search-bench/src/recall.rs @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Recall@K quality measurement for lossy vector-search variants. +//! +//! This module computes the fraction of the true top-K nearest neighbours that a +//! lossy encoding (today just TurboQuant) recovers, using the uncompressed Vortex +//! scan as the local ground truth. Recall is averaged over a small number of sampled +//! query rows. +//! +//! This is explicitly a *relative* recall — we compare TurboQuant-retrieved neighbours +//! against the neighbours that the *same* cosine-similarity expression finds in the +//! uncompressed scan, not against VectorDBBench's shipped `neighbors.parquet`. Comparing +//! against external ground truth would require an index (which Vortex doesn't have) and +//! is structurally out of scope for a file-format benchmark. + +use anyhow::Result; +use vortex::array::ArrayRef; +use vortex::utils::aliases::hash_set::HashSet; + +use crate::extract_query_row; +use crate::verify::compute_cosine_scores; + +/// Size of the neighbour set we compare. 10 is the standard VectorDBBench default. +pub const DEFAULT_TOP_K: usize = 10; + +/// Compute recall@K for the lossy `compressed` variant against the `uncompressed` +/// ground-truth variant, averaged over `num_queries` sampled query rows. Uses the +/// global [`vortex_bench::SESSION`] for all executions. +/// +/// Query selection is deterministic: rows are picked uniformly across the dataset at +/// `step = uncompressed.len() / num_queries` intervals. This keeps the result stable +/// across runs and avoids needing to thread a PRNG seed into the benchmark CLI. +pub fn measure_recall_at_k( + uncompressed: &ArrayRef, + compressed: &ArrayRef, + num_queries: usize, + top_k: usize, +) -> Result { + assert!( + num_queries > 0, + "measure_recall_at_k requires num_queries > 0" + ); + assert!(top_k > 0, "measure_recall_at_k requires top_k > 0"); + let num_rows = uncompressed.len(); + assert_eq!( + compressed.len(), + num_rows, + "uncompressed and compressed arrays must have the same row count" + ); + assert!(num_rows >= top_k, "dataset must have at least top_k rows"); + + let step = (num_rows / num_queries).max(1); + + let mut total_hits: usize = 0; + let mut total_checked: usize = 0; + + for q in 0..num_queries { + let row = (q * step).min(num_rows - 1); + let query = extract_query_row(uncompressed, row)?; + + let gt_scores = compute_cosine_scores(uncompressed, &query)?; + let truth = top_k_indices(>_scores, top_k); + + let lossy_scores = compute_cosine_scores(compressed, &query)?; + let lossy = top_k_indices(&lossy_scores, top_k); + + let truth_set: HashSet = truth.iter().copied().collect(); + total_hits += lossy.iter().filter(|idx| truth_set.contains(*idx)).count(); + total_checked += top_k; + } + + Ok(total_hits as f64 / total_checked as f64) +} + +/// Return the indices of the top-K highest scores, stable-sorted descending. +/// +/// Uses `f32::total_cmp` for a NaN-safe total order — `partial_cmp` would panic on +/// NaN, and `partial_cmp(...).unwrap_or(Ordering::Equal)` would put NaNs at +/// arbitrary positions. `total_cmp` gives NaNs a well-defined (but meaningless) sort +/// slot, which lets the function be robust against accidental NaN inputs without +/// silently hiding them. +fn top_k_indices(scores: &[f32], top_k: usize) -> Vec { + let mut idx: Vec = (0..scores.len()).collect(); + idx.sort_by(|&a, &b| scores[b].total_cmp(&scores[a])); + idx.truncate(top_k); + idx +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Variant; + use crate::extract_query_row; + use crate::prepare_variant; + use crate::test_utils::synthetic_vector; + + #[test] + fn top_k_indices_handles_nan_without_panicking() { + // `partial_cmp` panics on NaN (well, returns None, which was silently swallowed + // before). `total_cmp` gives NaN a well-defined slot, so the sort doesn't + // panic and doesn't produce arbitrary orderings for non-NaN elements. + let scores = [0.9f32, f32::NAN, 0.7, 0.5, f32::NAN]; + let top = top_k_indices(&scores, 3); + assert_eq!(top.len(), 3); + // The finite values 0.9, 0.7, 0.5 should still rank in the right order + // relative to each other — NaNs sort somewhere, but the finite ordering is + // preserved because `total_cmp` is a total order. + let finite_positions: Vec = top + .iter() + .copied() + .filter(|&i| !scores[i].is_nan()) + .collect(); + assert!( + finite_positions + .windows(2) + .all(|w| scores[w[0]] >= scores[w[1]]), + "finite scores should still be in descending order" + ); + } + + #[test] + fn uncompressed_has_perfect_self_recall() { + let dim = 128u32; + let num_rows = 64usize; + let uncompressed = synthetic_vector(dim, num_rows, 0xC0FFEE); + + let recall = measure_recall_at_k(&uncompressed, &uncompressed, 4, 10).unwrap(); + assert!( + (recall - 1.0).abs() < 1e-9, + "self-recall must be 1.0, got {recall}" + ); + } + + #[test] + fn turboquant_recall_is_reasonable_for_synthetic_data() { + let dim = 128u32; + let num_rows = 64usize; + let uncompressed = synthetic_vector(dim, num_rows, 0xC0FFEE); + + // `measure_recall_at_k` doesn't need the PreparedDataset's `query` field — + // it derives queries internally via `extract_query_row` on `uncompressed`. + // Construct just enough of a `PreparedDataset` to pass to `prepare_variant`. + let prepared = crate::PreparedDataset { + name: "synthetic".to_string(), + uncompressed: uncompressed.clone(), + query: extract_query_row(&uncompressed, 0).unwrap(), + parquet_bytes: 0, + }; + + let tq_prep = prepare_variant(&prepared, Variant::VortexTurboQuant).unwrap(); + + // With only 64 random rows, recall@10 won't be 1.0 but it should be well + // above chance (10/64 ≈ 0.156). The test asserts a loose lower bound to catch + // total regressions without being flaky on distribution noise. + let recall = measure_recall_at_k(&uncompressed, &tq_prep.array, 4, 10).unwrap(); + assert!( + recall >= 0.3, + "TurboQuant recall@10 on 64×128 synthetic data should be ≥0.3, got {recall}", + ); + } +} diff --git a/benchmarks/vector-search-bench/src/verify.rs b/benchmarks/vector-search-bench/src/verify.rs new file mode 100644 index 00000000000..da2a73f1ea2 --- /dev/null +++ b/benchmarks/vector-search-bench/src/verify.rs @@ -0,0 +1,402 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Correctness verification for vector-search variants. +//! +//! Before the timing loop runs, we compute cosine-similarity scores for a single query +//! row against the uncompressed baseline and against each prepared variant, then compare +//! the two score vectors element-by-element. This catches two distinct classes of bug: +//! +//! - A **lossless variant** that disagrees with the uncompressed scan (bug in the +//! compression pipeline, or in how we're routing through the scalar-fn dispatch, or in +//! the variant-specific decompress path). +//! - A **lossy variant** (TurboQuant) that drifts further from ground truth than we +//! expect from the bit-width and SORF rotation settings (regression in the encoder). +//! +//! The same `execute_cosine` function the timing loop uses is also what verification +//! uses, so the correctness check is validating the *exact* expression tree we're about +//! to benchmark. Lossless variants must match within [`LOSSLESS_TOLERANCE`]; lossy +//! variants must match within [`LOSSY_TOLERANCE`]. A hard-stop `Err` return on any +//! mismatch keeps the benchmark honest — you cannot publish throughput numbers for a +//! variant that's returning garbage. + +use anyhow::Result; +use anyhow::bail; +use vortex::array::ArrayRef; +use vortex::array::VortexSessionExecute; +use vortex::dtype::PType; +use vortex_bench::SESSION; + +use crate::execute_cosine; + +/// Maximum acceptable absolute difference in cosine scores for a *lossless* variant +/// (uncompressed, BtrBlocks-default). `cosine_similarity` traverses the FSL storage and +/// reduces with f32 accumulators, so a pure algebraic change of encoding can shift a +/// score by a few ULPs of f32 precision. `1e-4` is well above that noise floor while +/// still catching real regressions. +pub const LOSSLESS_TOLERANCE: f32 = 1e-4; + +/// Maximum acceptable absolute difference in cosine scores for the *lossy* TurboQuant +/// variant. At the default 8-bit configuration the reconstructed dot product typically +/// drifts by well under 0.05 for unit-normalized vectors. `0.2` is a loose upper bound +/// that catches regressions without flaking on distribution-specific noise. +pub const LOSSY_TOLERANCE: f32 = 0.2; + +/// How lossy a variant is allowed to be when its scores are compared to the +/// uncompressed baseline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VerificationKind { + /// Lossless variants must match within [`LOSSLESS_TOLERANCE`]. + Lossless, + /// Lossy variants must match within [`LOSSY_TOLERANCE`]. + Lossy, +} + +/// Per-variant correctness report. Captured for both pass and fail outcomes so the +/// caller can emit the numbers as dashboard measurements regardless. +#[derive(Debug, Clone, Copy)] +pub struct VerificationReport { + /// Number of rows compared (== dataset row count). + pub num_scores: usize, + /// Mean absolute difference between baseline and variant cosine scores. + pub mean_abs_diff: f64, + /// Max absolute difference between baseline and variant cosine scores. + pub max_abs_diff: f64, + /// Which tolerance band applied. + pub kind: VerificationKind, + /// Whether the variant's max-abs-diff stayed within its tolerance. + pub passed: bool, +} + +impl VerificationReport { + /// The tolerance that was applied to produce [`Self::passed`]. + pub fn tolerance(&self) -> f32 { + match self.kind { + VerificationKind::Lossless => LOSSLESS_TOLERANCE, + VerificationKind::Lossy => LOSSY_TOLERANCE, + } + } +} + +/// Compute cosine-similarity scores for a single query row on `data` and return them +/// as a plain `Vec`. This is a convenience wrapper around +/// [`crate::execute_cosine`] that pulls the f32 slice out of the resulting +/// `PrimitiveArray`. Uses the global [`vortex_bench::SESSION`]. +/// +/// # Errors +/// +/// Returns an error if [`execute_cosine`] fails (bad input shape or dispatch error), +/// or if the cosine expression produces a non-`f32` primitive array. The latter can't +/// happen today because the benchmark only wires `f32` `Vector` columns, but the +/// explicit ptype check keeps the function sound if the scalar-fn output type ever +/// widens (e.g. to `f64`) without the caller noticing. +pub fn compute_cosine_scores(data: &ArrayRef, query: &[f32]) -> Result> { + let mut ctx = SESSION.create_execution_ctx(); + let scores = execute_cosine(data, query, &mut ctx)?; + if scores.ptype() != PType::F32 { + bail!( + "compute_cosine_scores: cosine output must be f32, got {:?}", + scores.ptype() + ); + } + Ok(scores.as_slice::().to_vec()) +} + +/// Compare two equal-length score vectors and return their mean absolute difference +/// and max absolute difference, without evaluating a pass/fail threshold. +pub fn compare_scores(baseline: &[f32], other: &[f32]) -> (f64, f64) { + assert_eq!( + baseline.len(), + other.len(), + "compare_scores: length mismatch baseline={} other={}", + baseline.len(), + other.len(), + ); + + if baseline.is_empty() { + return (0.0, 0.0); + } + + let mut sum = 0.0f64; + let mut max: f64 = 0.0; + for (&b, &o) in baseline.iter().zip(other.iter()) { + // Treat (+0, -0) pairs as equal and propagate NaN as inf so it always fails + // the tolerance check below. + let diff = if b.is_nan() || o.is_nan() { + f64::INFINITY + } else { + (f64::from(b) - f64::from(o)).abs() + }; + sum += diff; + if diff > max { + max = diff; + } + } + (sum / baseline.len() as f64, max) +} + +/// Verify one variant's scores against a baseline and produce a full +/// [`VerificationReport`]. Whether `passed` is true depends on `kind`'s tolerance. +pub fn verify_scores( + baseline: &[f32], + variant_scores: &[f32], + kind: VerificationKind, +) -> VerificationReport { + let (mean_abs_diff, max_abs_diff) = compare_scores(baseline, variant_scores); + let tolerance = match kind { + VerificationKind::Lossless => f64::from(LOSSLESS_TOLERANCE), + VerificationKind::Lossy => f64::from(LOSSY_TOLERANCE), + }; + let passed = max_abs_diff <= tolerance; + VerificationReport { + num_scores: baseline.len(), + mean_abs_diff, + max_abs_diff, + kind, + passed, + } +} + +/// Verify pre-computed scores against a baseline and enforce the tolerance band. +/// +/// Takes already-materialized `variant_scores` (as a `&[f32]`) rather than an +/// `ArrayRef`, so both the Vortex-variant path (which computes scores via +/// [`execute_cosine`](crate::execute_cosine)) and the hand-rolled baseline path (which +/// runs a plain Rust loop over a flat `Vec`) share the same error-handling, +/// logging, and hard-fail logic without duplicating it in `main.rs`. +/// +/// Lossless mismatches bail the run with an error; lossy mismatches log a warning +/// but let the run continue so the recall measurement is still reported. +pub fn verify_and_report_scores( + variant_name: &str, + variant_scores: &[f32], + baseline_scores: &[f32], + kind: VerificationKind, +) -> Result { + let report = verify_scores(baseline_scores, variant_scores, kind); + + if !report.passed { + let message = format!( + "{variant_name} correctness check failed: max_abs_diff={:.6}, \ + mean_abs_diff={:.6}, tolerance={:.6} ({:?})", + report.max_abs_diff, + report.mean_abs_diff, + report.tolerance(), + report.kind, + ); + match kind { + VerificationKind::Lossless => bail!("{message}"), + VerificationKind::Lossy => { + tracing::warn!("{message}"); + } + } + } + + Ok(report) +} + +/// End-to-end variant verification: executes cosine on `variant_array` against the +/// same query used for the baseline and returns a [`VerificationReport`]. Returns +/// `Err` if `kind` is [`VerificationKind::Lossless`] and the scores disagree beyond +/// [`LOSSLESS_TOLERANCE`] — that indicates a real correctness bug, not a quality +/// tradeoff. Uses the global [`vortex_bench::SESSION`]. +pub fn verify_variant( + variant_name: &str, + variant_array: &ArrayRef, + query: &[f32], + baseline_scores: &[f32], + kind: VerificationKind, +) -> Result { + let scores = compute_cosine_scores(variant_array, query)?; + verify_and_report_scores(variant_name, &scores, baseline_scores, kind) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Variant; + use crate::extract_query_row; + use crate::prepare_variant; + use crate::test_utils::synthetic_vector; + + /// Build a `PreparedDataset` whose `query` is row 0 of the dataset. Using + /// `extract_query_row` here (rather than a test-local f32 extraction helper) also + /// keeps the test surface covered by the same ptype-assertion path the benchmark + /// hot path uses. + fn make_prepared(dim: u32, num_rows: usize, seed: u64) -> crate::PreparedDataset { + let uncompressed = synthetic_vector(dim, num_rows, seed); + let query = extract_query_row(&uncompressed, 0).unwrap(); + assert_eq!(query.len(), dim as usize); + crate::PreparedDataset { + name: "synthetic".to_string(), + uncompressed, + query, + parquet_bytes: 0, + } + } + + #[test] + fn compare_scores_handles_empty() { + let (mean, max) = compare_scores(&[], &[]); + assert_eq!(mean, 0.0); + assert_eq!(max, 0.0); + } + + #[test] + fn compare_scores_computes_mae_and_max() { + let base = [0.0f32, 1.0, 2.0, 3.0]; + let other = [0.0f32, 1.0, 2.5, 3.0]; + let (mean, max) = compare_scores(&base, &other); + assert!((max - 0.5).abs() < 1e-9); + assert!((mean - 0.125).abs() < 1e-9); + } + + #[test] + fn verify_scores_passes_for_identical_inputs() { + let base = [0.5f32; 10]; + let report = verify_scores(&base, &base, VerificationKind::Lossless); + assert!(report.passed); + assert_eq!(report.max_abs_diff, 0.0); + assert_eq!(report.mean_abs_diff, 0.0); + assert_eq!(report.num_scores, 10); + } + + #[test] + fn verify_scores_fails_for_lossless_beyond_tolerance() { + let base = [0.5f32; 10]; + let mut other = [0.5f32; 10]; + other[3] = 0.50001; // diff ≈ 1e-5, comfortably below the 1e-4 lossless bound + let report_ok = verify_scores(&base, &other, VerificationKind::Lossless); + assert!( + report_ok.passed, + "1e-5 drift should pass, got max={:.2e}", + report_ok.max_abs_diff + ); + + other[3] = 0.51; // diff of 0.01, well above 1e-4 + let report_bad = verify_scores(&base, &other, VerificationKind::Lossless); + assert!( + !report_bad.passed, + "1e-2 drift should fail, got max={:.2e}", + report_bad.max_abs_diff + ); + } + + #[test] + fn verify_scores_lossy_tolerates_small_drift() { + let base = [0.9f32; 10]; + let mut other = [0.9f32; 10]; + other[0] = 1.0; // diff of 0.1 + let report = verify_scores(&base, &other, VerificationKind::Lossy); + assert!( + report.passed, + "0.1 drift should pass lossy tolerance, got max={}", + report.max_abs_diff + ); + } + + #[test] + fn verify_scores_fails_on_nan() { + let base = [0.5f32, 0.5]; + let other = [0.5f32, f32::NAN]; + let report = verify_scores(&base, &other, VerificationKind::Lossless); + assert!(!report.passed); + assert!(report.max_abs_diff.is_infinite()); + } + + #[test] + fn verify_scores_fails_on_nan_in_baseline() { + // Symmetric case: NaN on the baseline side should also fail, not just variant. + let base = [0.5f32, f32::NAN]; + let other = [0.5f32, 0.5]; + let report = verify_scores(&base, &other, VerificationKind::Lossless); + assert!(!report.passed); + assert!(report.max_abs_diff.is_infinite()); + } + + #[test] + fn verify_and_report_scores_is_ok_for_identical_inputs() { + let base = [0.5f32; 10]; + let report = + verify_and_report_scores("self", &base, &base, VerificationKind::Lossless).unwrap(); + assert!(report.passed); + assert_eq!(report.max_abs_diff, 0.0); + } + + #[test] + fn verify_and_report_scores_bails_for_lossless_mismatch() { + let base = [0.5f32; 10]; + let mut other = [0.5f32; 10]; + other[3] = 0.6; + let err = + verify_and_report_scores("broken-variant", &other, &base, VerificationKind::Lossless) + .unwrap_err() + .to_string(); + assert!( + err.contains("broken-variant correctness check failed"), + "unexpected error: {err}" + ); + } + + #[test] + fn verify_and_report_scores_warns_for_lossy_mismatch_without_bailing() { + // A lossy variant outside its tolerance should NOT bail — it logs a warning + // and returns the failing report so the caller can still emit the + // measurement and show recall alongside it. + let base = [0.9f32; 10]; + let mut other = [0.9f32; 10]; + other[0] = 1.5; // diff of 0.6, above the 0.2 lossy tolerance + let report = + verify_and_report_scores("too-lossy-variant", &other, &base, VerificationKind::Lossy) + .expect("lossy failures should not bail"); + assert!(!report.passed); + assert!(report.max_abs_diff > f64::from(LOSSY_TOLERANCE)); + } + + #[test] + fn vortex_default_matches_uncompressed_end_to_end() { + let dim = 128u32; + let num_rows = 64usize; + let prepared = make_prepared(dim, num_rows, 0xC0FFEE); + + let baseline_scores = + compute_cosine_scores(&prepared.uncompressed, &prepared.query).unwrap(); + + let default_prep = prepare_variant(&prepared, Variant::VortexDefault).unwrap(); + let report = verify_variant( + "vortex-default", + &default_prep.array, + &prepared.query, + &baseline_scores, + VerificationKind::Lossless, + ) + .expect("vortex-default must be lossless against the uncompressed baseline"); + assert!(report.passed); + } + + #[test] + fn vortex_turboquant_stays_within_lossy_tolerance() { + let dim = 128u32; + let num_rows = 64usize; + let prepared = make_prepared(dim, num_rows, 0xDEADBEEF); + + let baseline_scores = + compute_cosine_scores(&prepared.uncompressed, &prepared.query).unwrap(); + + let tq_prep = prepare_variant(&prepared, Variant::VortexTurboQuant).unwrap(); + let report = verify_variant( + "vortex-turboquant", + &tq_prep.array, + &prepared.query, + &baseline_scores, + VerificationKind::Lossy, + ) + .expect("TurboQuant verification should not error"); + assert!( + report.passed, + "TurboQuant drift {:.4} exceeds lossy tolerance {:.4}", + report.max_abs_diff, + report.tolerance() + ); + } +} diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index 62d302f12e1..1859a64a16e 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -63,3 +63,4 @@ vortex = { workspace = true, features = [ "tokio", "zstd", ] } +vortex-tensor = { workspace = true } diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 3f21ab30ba0..9811abc903d 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -21,17 +21,30 @@ use vortex::VortexSessionDefault; use vortex::array::ArrayRef; use vortex::array::IntoArray; use vortex::array::VortexSessionExecute; +use vortex::array::arrays::Chunked; use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::ExtensionArray; +use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::List; +use vortex::array::arrays::ListView; +use vortex::array::arrays::chunked::ChunkedArrayExt; +use vortex::array::arrays::list::ListArrayExt; +use vortex::array::arrays::listview::recursive_list_from_list_view; use vortex::array::arrow::FromArrowArray; use vortex::array::builders::builder_with_capacity; +use vortex::array::extension::EmptyMetadata; use vortex::array::stream::ArrayStreamAdapter; use vortex::array::stream::ArrayStreamExt; +use vortex::array::validity::Validity; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; +use vortex::dtype::extension::ExtDType; use vortex::error::VortexResult; +use vortex::error::vortex_bail; use vortex::error::vortex_err; use vortex::file::WriteOptionsSessionExt; use vortex::session::VortexSession; +use vortex_tensor::vector::Vector; use crate::CompactionStrategy; use crate::Format; @@ -222,3 +235,347 @@ pub async fn write_parquet_as_vortex( }) .await } + +/// Rewrap a list-of-float column as a [`vortex_tensor::vector::Vector`] extension array. +/// +/// Parquet has no fixed-size list logical type, so an embedding column ingested via +/// [`parquet_to_vortex_chunks`] arrives as `List` (or `List` / `List`) even +/// when every row has the same length. This helper validates that every list in `input` +/// has the same length `D` and reconstructs the column as +/// `Extension(FixedSizeList)` — the shape expected by the vector-search +/// scalar functions in `vortex-tensor`. +/// +/// The input may be either a single [`List`] array or a [`Chunked`] array of lists (the +/// common case after `parquet_to_vortex_chunks`). Chunked inputs are converted chunk-by-chunk +/// and reassembled as a [`ChunkedArray`] of `Extension`. +/// +/// # Errors +/// +/// Returns an error if: +/// - `input` is not a `List` or `Chunked` array. +/// - The element type is not a non-nullable float primitive (`f16`, `f32`, or `f64`). +/// - Any row has a different length than the first row. +/// - The list validity is nullable (vector elements cannot be null at the row level). +/// - The input has zero rows (the dimension cannot be inferred from empty input). +pub fn list_to_vector_ext(input: ArrayRef) -> VortexResult { + if let Some(chunked) = input.as_opt::() { + let converted: Vec = chunked + .iter_chunks() + .map(|chunk| list_to_vector_ext(chunk.clone())) + .collect::>()?; + if converted.is_empty() { + vortex_bail!("list_to_vector_ext: chunked input has no chunks"); + } + return Ok(ChunkedArray::from_iter(converted).into_array()); + } + + // `parquet_to_vortex_chunks` produces `ListView` arrays for list columns by default; + // materialize them into a flat `List` representation before we validate offsets. + if input.as_opt::().is_some() { + let flat = recursive_list_from_list_view(input)?; + return list_to_vector_ext(flat); + } + + let Some(list) = input.as_opt::() else { + vortex_bail!( + "list_to_vector_ext: expected a List array, got dtype {}", + input.dtype() + ); + }; + + if !matches!( + list.list_validity(), + Validity::NonNullable | Validity::AllValid + ) { + vortex_bail!( + "list_to_vector_ext: list rows must be non-nullable for Vector extension wrapping" + ); + } + + let element_dtype = list.element_dtype().clone(); + let DType::Primitive(ptype, elem_nullability) = &element_dtype else { + vortex_bail!( + "list_to_vector_ext: element dtype must be a primitive float, got {}", + element_dtype + ); + }; + if !ptype.is_float() { + vortex_bail!( + "list_to_vector_ext: element type must be float (f16/f32/f64), got {}", + ptype + ); + } + if elem_nullability.is_nullable() { + vortex_bail!( + "list_to_vector_ext: element type must be non-nullable, got nullable {}", + ptype + ); + } + + let num_rows = input.len(); + if num_rows == 0 { + vortex_bail!("list_to_vector_ext: cannot infer vector dimension from empty input"); + } + + // Walk the offsets array once, reusing the previous iteration's `end` as the + // next iteration's `start`. Each `offset_at` call goes through + // `ListArrayExt::offset_at`, which has a fast path when the offsets child is a + // `Primitive` array (direct slice index). That's the common case after + // `parquet_to_vortex_chunks`, so for a 100K-row column we do ~100K primitive + // slice indexes rather than 200K. The loop body is O(1) either way. + let mut prev_end = list.offset_at(0)?; + let first_end = list.offset_at(1)?; + let dim = first_end.checked_sub(prev_end).ok_or_else(|| { + vortex_err!("list_to_vector_ext: offsets are not monotonically increasing") + })?; + if dim == 0 { + vortex_bail!("list_to_vector_ext: first row has zero elements"); + } + prev_end = first_end; + + for i in 1..num_rows { + let end = list.offset_at(i + 1)?; + let row_len = end.checked_sub(prev_end).ok_or_else(|| { + vortex_err!("list_to_vector_ext: offsets are not monotonically increasing") + })?; + if row_len != dim { + vortex_bail!( + "list_to_vector_ext: row {} has length {} but expected {}", + i, + row_len, + dim + ); + } + prev_end = end; + } + + let elements = list.sliced_elements()?; + let expected_elements = num_rows + .checked_mul(dim) + .ok_or_else(|| vortex_err!("list_to_vector_ext: num_rows * dim overflows usize"))?; + if elements.len() != expected_elements { + vortex_bail!( + "list_to_vector_ext: elements buffer has length {} but expected {}", + elements.len(), + expected_elements + ); + } + + let dim_u32 = u32::try_from(dim) + .map_err(|_| vortex_err!("list_to_vector_ext: dimension {dim} does not fit in u32"))?; + + let fsl = FixedSizeListArray::try_new(elements, dim_u32, Validity::NonNullable, num_rows)?; + let ext_dtype = ExtDType::::try_new(EmptyMetadata, fsl.dtype().clone())?.erased(); + Ok(ExtensionArray::new(ext_dtype, fsl.into_array()).into_array()) +} + +#[cfg(test)] +mod tests { + use vortex::array::IntoArray; + use vortex::array::arrays::Extension; + use vortex::array::arrays::List; + use vortex::array::arrays::ListViewArray; + use vortex::array::arrays::PrimitiveArray; + use vortex::array::arrays::extension::ExtensionArrayExt; + use vortex::array::validity::Validity; + use vortex::buffer::BufferMut; + use vortex::dtype::DType; + + use super::list_to_vector_ext; + + fn list_f32(rows: &[&[f32]]) -> vortex::array::ArrayRef { + let mut elements = BufferMut::::with_capacity(rows.iter().map(|r| r.len()).sum()); + let mut offsets = BufferMut::::with_capacity(rows.len() + 1); + offsets.push(0); + for row in rows { + for &v in row.iter() { + elements.push(v); + } + offsets.push(i32::try_from(elements.len()).unwrap()); + } + + let elements_array = + PrimitiveArray::new::(elements.freeze(), Validity::NonNullable).into_array(); + let offsets_array = + PrimitiveArray::new::(offsets.freeze(), Validity::NonNullable).into_array(); + vortex::array::Array::::new(elements_array, offsets_array, Validity::NonNullable) + .into_array() + } + + #[test] + fn uniform_list_becomes_vector_extension() { + let list = list_f32(&[&[1.0, 2.0, 3.0], &[4.0, 5.0, 6.0], &[7.0, 8.0, 9.0]]); + let wrapped = list_to_vector_ext(list).unwrap(); + assert_eq!(wrapped.len(), 3); + let ext = wrapped.as_opt::().expect("returns Extension"); + assert!(matches!( + ext.storage_array().dtype(), + DType::FixedSizeList(_, 3, _) + )); + } + + #[test] + fn mismatched_row_length_is_rejected() { + let list = list_f32(&[&[1.0, 2.0, 3.0], &[4.0, 5.0]]); + let err = list_to_vector_ext(list).unwrap_err().to_string(); + assert!( + err.contains("row 1 has length 2 but expected 3"), + "unexpected error: {err}", + ); + } + + #[test] + fn non_list_input_is_rejected() { + let primitive = PrimitiveArray::new::( + BufferMut::::from_iter([1.0f32, 2.0, 3.0]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let err = list_to_vector_ext(primitive).unwrap_err().to_string(); + assert!( + err.contains("expected a List array"), + "unexpected error: {err}" + ); + } + + #[test] + fn empty_input_is_rejected() { + let list = list_f32(&[]); + let err = list_to_vector_ext(list).unwrap_err().to_string(); + assert!( + err.contains("cannot infer vector dimension from empty input"), + "unexpected error: {err}", + ); + } + + /// Build a `ListView` whose every row is a length-`dim` slice of the flattened + /// `values` buffer. This shape matches what `parquet_to_vortex_chunks` produces for + /// embedding columns after arrow-rs' canonicalization, and exercises the + /// `list_to_vector_ext` fast-path that collapses `ListView` → `List` before + /// validating offsets. + fn list_view_f32(dim: usize, rows: &[&[f32]]) -> vortex::array::ArrayRef { + let mut values = BufferMut::::with_capacity(rows.len() * dim); + for row in rows { + assert_eq!(row.len(), dim); + for &v in row.iter() { + values.push(v); + } + } + let elements = + PrimitiveArray::new::(values.freeze(), Validity::NonNullable).into_array(); + + let dim_i32 = i32::try_from(dim).unwrap(); + let num_rows = rows.len(); + + let mut offsets_buf = BufferMut::::with_capacity(num_rows); + for i in 0..num_rows { + offsets_buf.push(i32::try_from(i).unwrap() * dim_i32); + } + let offsets = + PrimitiveArray::new::(offsets_buf.freeze(), Validity::NonNullable).into_array(); + + let mut sizes_buf = BufferMut::::with_capacity(num_rows); + for _ in 0..num_rows { + sizes_buf.push(dim_i32); + } + let sizes = + PrimitiveArray::new::(sizes_buf.freeze(), Validity::NonNullable).into_array(); + + ListViewArray::try_new(elements, offsets, sizes, Validity::NonNullable) + .unwrap() + .into_array() + } + + #[test] + fn list_view_input_is_rewrapped_as_vector_extension() { + // Simulates the post-parquet-ingest shape: the `emb` column arrives as a + // ListView, not a List. `list_to_vector_ext` must materialize it via + // `recursive_list_from_list_view` and then validate offsets on the flattened + // `List` form. + let list_view = list_view_f32(3, &[&[1.0, 2.0, 3.0], &[4.0, 5.0, 6.0]]); + let wrapped = list_to_vector_ext(list_view).unwrap(); + assert_eq!(wrapped.len(), 2); + let ext = wrapped.as_opt::().expect("returns Extension"); + assert!(matches!( + ext.storage_array().dtype(), + DType::FixedSizeList(_, 3, _) + )); + } + + #[test] + fn all_invalid_list_validity_is_rejected() { + // A list with `Validity::AllInvalid` means every row is null. The Vector + // extension type requires non-nullable elements at the FSL level, so we + // must reject this input rather than silently dropping the validity mask. + let elements = PrimitiveArray::new::( + BufferMut::::from_iter([1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let offsets = PrimitiveArray::new::( + BufferMut::::from_iter([0i32, 3, 6]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let list = + vortex::array::Array::::new(elements, offsets, Validity::AllInvalid).into_array(); + + let err = list_to_vector_ext(list).unwrap_err().to_string(); + assert!( + err.contains("list rows must be non-nullable"), + "unexpected error: {err}" + ); + } + + #[test] + fn non_float_element_type_is_rejected() { + // Build a List. + let elements = PrimitiveArray::new::( + BufferMut::::from_iter([1i32, 2, 3, 4]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let offsets = PrimitiveArray::new::( + BufferMut::::from_iter([0i32, 2, 4]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let list = vortex::array::Array::::new(elements, offsets, Validity::NonNullable) + .into_array(); + + let err = list_to_vector_ext(list).unwrap_err().to_string(); + assert!( + err.contains("element type must be float"), + "unexpected error: {err}", + ); + } + + #[test] + fn nullable_element_dtype_is_rejected() { + // Build a `List` — a list whose elements have nullable dtype (even + // if every value happens to be present). The `Vector` extension type at the + // FSL level requires non-nullable elements, so this must be rejected. + // + // Passing `Validity::AllValid` to `PrimitiveArray::new` sets the ptype's + // nullability to `Nullable`, which is what triggers the rejection path even + // though every value is technically valid. + let elements = PrimitiveArray::new::( + BufferMut::::from_iter([1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).freeze(), + Validity::AllValid, + ) + .into_array(); + let offsets = PrimitiveArray::new::( + BufferMut::::from_iter([0i32, 3, 6]).freeze(), + Validity::NonNullable, + ) + .into_array(); + let list = vortex::array::Array::::new(elements, offsets, Validity::NonNullable) + .into_array(); + + let err = list_to_vector_ext(list).unwrap_err().to_string(); + assert!( + err.contains("element type must be non-nullable"), + "unexpected error: {err}" + ); + } +} diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index af0d3fdef30..e2829246872 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -53,6 +53,7 @@ pub mod statpopgen; pub mod tpcds; pub mod tpch; pub mod utils; +pub mod vector_dataset; pub use benchmark::Benchmark; pub use benchmark::TableSpec; @@ -138,6 +139,13 @@ pub enum Format { #[clap(name = "vortex-compact")] #[serde(rename = "vortex-compact")] VortexCompact, + /// Vortex file with the TurboQuant lossy vector-quantization encoding applied to + /// [`vortex_tensor::vector::Vector`] columns. Used by the vector-search benchmark to + /// distinguish TurboQuant-encoded results from the generic BtrBlocks-compressed + /// [`Format::OnDiskVortex`] layout in downstream dashboards. + #[clap(name = "vortex-turboquant")] + #[serde(rename = "vortex-turboquant")] + VortexTurboQuant, #[clap(name = "duckdb")] #[serde(rename = "duckdb")] OnDiskDuckDB, @@ -177,6 +185,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex-file-compressed", Format::VortexCompact => "vortex-compact", + Format::VortexTurboQuant => "vortex-turboquant", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -189,6 +198,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex", Format::VortexCompact => "vortex", + Format::VortexTurboQuant => "vortex", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } diff --git a/vortex-bench/src/measurements.rs b/vortex-bench/src/measurements.rs index f49349cd95e..8629d38d598 100644 --- a/vortex-bench/src/measurements.rs +++ b/vortex-bench/src/measurements.rs @@ -348,10 +348,13 @@ impl ToJson for CompressionTimingMeasurement { fn to_json(&self) -> serde_json::Value { let (name, engine) = match self.format { Format::OnDiskVortex => (self.name.to_string(), Engine::Vortex), + Format::VortexTurboQuant => { + (format!("vortex-turboquant {}", self.name), Engine::Vortex) + } Format::Parquet => (format!("parquet_rs-zstd {}", self.name), Engine::Arrow), Format::Lance => (format!("lance {}", self.name), Engine::Arrow), _ => vortex_panic!( - "CompressionTimingMeasurement only supports vortex, lance, and parquet formats" + "CompressionTimingMeasurement only supports vortex, vortex-turboquant, lance, and parquet formats" ), }; @@ -392,7 +395,9 @@ pub struct CustomUnitMeasurement { impl ToJson for CustomUnitMeasurement { fn to_json(&self) -> serde_json::Value { let engine = match self.format { - Format::OnDiskVortex | Format::VortexCompact => Engine::Vortex, + Format::OnDiskVortex | Format::VortexCompact | Format::VortexTurboQuant => { + Engine::Vortex + } Format::Parquet => Engine::Arrow, Format::Lance => Engine::Arrow, _ => Engine::Vortex, // Default to Vortex for other formats. diff --git a/vortex-bench/src/vector_dataset.rs b/vortex-bench/src/vector_dataset.rs new file mode 100644 index 00000000000..e6b049fe614 --- /dev/null +++ b/vortex-bench/src/vector_dataset.rs @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Datasets used by the vector-search benchmark. +//! +//! These are a subset of the public VectorDBBench +//! () datasets — MIT-licensed canonical +//! embedding corpora published by Zilliz under +//! `https://assets.zilliz.com/benchmark//`. Each dataset is distributed as one or more +//! parquet files with a `emb: list` column (the raw embedding vectors) and an +//! `id: int64` column. +//! +//! The URL constants below point at the upstream Zilliz bucket. For CI runs we recommend +//! mirroring these files into an internal bucket first to avoid repeated egress charges on +//! a third-party bucket — mirror setup is a one-off manual operation and documented in the +//! vector-search-bench crate README. + +use std::path::PathBuf; + +use anyhow::Result; +use anyhow::bail; +use async_trait::async_trait; +use vortex::array::ArrayRef; + +use crate::IdempotentPath; +use crate::datasets::Dataset; +use crate::datasets::data_downloads::download_data; + +/// A public embedding-vector dataset used by the vector-search benchmark. +/// +/// Each variant is one of the canonical VectorDBBench corpora, distributed as parquet under +/// the Zilliz public benchmark bucket. The smaller `*Small` sizes are appropriate for CI +/// runs; the larger sizes are intended for local / on-demand experiments. Only +/// cosine-metric datasets are wired today — SIFT / GIST / LAION (L2) will follow when an +/// L2-distance scalar function lands in `vortex-tensor`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VectorDataset { + /// Cohere wiki-22-12, 100K rows × 768 dims, cosine metric. ~307 MB raw / ~150 MB + /// zstd-parquet — the default CI-friendly size. + CohereSmall, + /// Cohere wiki-22-12, 1M rows × 768 dims, cosine metric. ~3 GB raw / ~1.5 GB + /// zstd-parquet. Local-only by default. + CohereMedium, + /// OpenAI embeddings on C4, 50K rows × 1536 dims, cosine metric. ~307 MB raw — + /// the smallest OpenAI variant and comparable in size to Cohere-small, but with + /// double the dimensionality. + OpenAiSmall, + /// OpenAI embeddings on C4, 500K rows × 1536 dims, cosine metric. ~3 GB raw. + /// Local-only by default. + OpenAiMedium, + /// Bioasq biomedical embeddings, 1M rows × 1024 dims, cosine metric. ~4 GB raw. + /// Local-only by default. + BioasqMedium, + /// Glove word embeddings, 1M rows × 200 dims, cosine metric. ~800 MB raw. + GloveMedium, +} + +/// All built-in [`VectorDataset`] variants in a fixed order. Convenient for iterating or +/// for listing choices in CLI help. +pub const ALL_VECTOR_DATASETS: &[VectorDataset] = &[ + VectorDataset::CohereSmall, + VectorDataset::CohereMedium, + VectorDataset::OpenAiSmall, + VectorDataset::OpenAiMedium, + VectorDataset::BioasqMedium, + VectorDataset::GloveMedium, +]; + +impl VectorDataset { + /// The upstream URL for this dataset's canonical train-split parquet file. + /// + /// **CI note**: point at an internal mirror before enabling this benchmark in CI — + /// see `benchmarks/vector-search-bench/README.md` for the procedure. + pub fn parquet_url(&self) -> &'static str { + match self { + VectorDataset::CohereSmall => { + "https://assets.zilliz.com/benchmark/cohere_small_100k/train.parquet" + } + VectorDataset::CohereMedium => { + "https://assets.zilliz.com/benchmark/cohere_medium_1m/train.parquet" + } + VectorDataset::OpenAiSmall => { + "https://assets.zilliz.com/benchmark/openai_small_50k/train.parquet" + } + VectorDataset::OpenAiMedium => { + "https://assets.zilliz.com/benchmark/openai_medium_500k/train.parquet" + } + VectorDataset::BioasqMedium => { + "https://assets.zilliz.com/benchmark/bioasq_medium_1m/train.parquet" + } + VectorDataset::GloveMedium => { + "https://assets.zilliz.com/benchmark/glove_medium_1m/train.parquet" + } + } + } + + /// Fixed vector dimensionality for this dataset. + pub fn dim(&self) -> u32 { + match self { + VectorDataset::CohereSmall | VectorDataset::CohereMedium => 768, + VectorDataset::OpenAiSmall | VectorDataset::OpenAiMedium => 1536, + VectorDataset::BioasqMedium => 1024, + VectorDataset::GloveMedium => 200, + } + } + + /// Expected number of rows in the train split. + pub fn num_rows(&self) -> usize { + match self { + VectorDataset::CohereSmall => 100_000, + VectorDataset::CohereMedium => 1_000_000, + VectorDataset::OpenAiSmall => 50_000, + VectorDataset::OpenAiMedium => 500_000, + VectorDataset::BioasqMedium => 1_000_000, + VectorDataset::GloveMedium => 1_000_000, + } + } + + /// The distance metric the upstream dataset was curated for. v1 only wires cosine, so + /// every built-in dataset returns [`VectorMetric::Cosine`]. The enum variant exists so + /// that L2 / inner-product datasets can be added later without a breaking change. + pub fn metric(&self) -> VectorMetric { + match self { + VectorDataset::CohereSmall + | VectorDataset::CohereMedium + | VectorDataset::OpenAiSmall + | VectorDataset::OpenAiMedium + | VectorDataset::BioasqMedium + | VectorDataset::GloveMedium => VectorMetric::Cosine, + } + } +} + +/// Distance metric a dataset was curated for. The vector-search benchmark only wires cosine +/// today, but having this explicit makes it obvious when a future dataset should be paired +/// with L2 or inner-product instead. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VectorMetric { + /// Cosine similarity: `dot(a, b) / (||a|| * ||b||)`. + Cosine, + /// Squared L2 distance: `sum((a - b)^2)`. + L2, + /// Inner product: `dot(a, b)`. + InnerProduct, +} + +#[async_trait] +impl Dataset for VectorDataset { + fn name(&self) -> &str { + match self { + VectorDataset::CohereSmall => "cohere-small", + VectorDataset::CohereMedium => "cohere-medium", + VectorDataset::OpenAiSmall => "openai-small", + VectorDataset::OpenAiMedium => "openai-medium", + VectorDataset::BioasqMedium => "bioasq-medium", + VectorDataset::GloveMedium => "glove-medium", + } + } + + async fn to_parquet_path(&self) -> Result { + let dir = format!("{}/", self.name()).to_data_path(); + let parquet = dir.join(format!("{}.parquet", self.name())); + download_data(parquet.clone(), self.parquet_url()).await?; + Ok(parquet) + } + + /// **Not supported.** `VectorDataset` can't return a straight Vortex array via + /// [`Dataset::to_vortex_array`] because: + /// + /// - The struct-shaped array the other datasets return would arrive as + /// `{ id: int64, emb: list }` — with `emb` as a *list*, not the + /// `Extension(FixedSizeList<...>)` shape the vector-search benchmark + /// actually operates on. + /// - The benchmark therefore bypasses this method entirely: it calls + /// [`Dataset::to_parquet_path`] and then runs + /// [`crate::conversions::parquet_to_vortex_chunks`] + + /// [`crate::conversions::list_to_vector_ext`] itself, which produces the + /// correct `Extension` shape. + /// + /// Returning the raw struct here would be a trap for future callers who expect + /// the same semantic shape the benchmark measures. Bailing explicitly makes the + /// contract unambiguous. + async fn to_vortex_array(&self) -> Result { + bail!( + "VectorDataset::to_vortex_array is not supported; use `to_parquet_path` + \ + `parquet_to_vortex_chunks` + `list_to_vector_ext` to build the \ + Extension shape the benchmark needs" + ); + } +} + +#[cfg(test)] +mod tests { + use vortex::utils::aliases::hash_set::HashSet; + + use super::ALL_VECTOR_DATASETS; + use super::VectorDataset; + use super::VectorMetric; + use crate::datasets::Dataset; + + #[test] + fn cohere_small_metadata() { + let ds = VectorDataset::CohereSmall; + assert_eq!(ds.name(), "cohere-small"); + assert_eq!(ds.dim(), 768); + assert_eq!(ds.num_rows(), 100_000); + assert_eq!(ds.metric(), VectorMetric::Cosine); + assert!(ds.parquet_url().ends_with("/train.parquet")); + assert!(ds.parquet_url().contains("cohere_small_100k")); + } + + #[test] + fn all_datasets_have_consistent_metadata() { + // Every built-in dataset must have a unique kebab-cased name, point at a + // `train.parquet` file under `assets.zilliz.com/benchmark/`, declare a + // dimension ≥ `MIN_DIMENSION` for TurboQuant, a non-zero row count, and + // (for v1) cosine metric. + let mut seen_names: HashSet = HashSet::default(); + for &ds in ALL_VECTOR_DATASETS { + let name = ds.name(); + assert!( + seen_names.insert(name.to_string()), + "duplicate dataset name {name}", + ); + assert!( + ds.dim() >= 128, + "{name} dim {} below TurboQuant minimum", + ds.dim() + ); + assert!(ds.num_rows() > 0, "{name} has zero rows"); + assert_eq!( + ds.metric(), + VectorMetric::Cosine, + "{name} must be cosine for v1" + ); + let url = ds.parquet_url(); + assert!(url.starts_with("https://assets.zilliz.com/benchmark/")); + assert!(url.ends_with("/train.parquet")); + } + } +} diff --git a/vortex-tensor/benches/similarity_search_common/mod.rs b/vortex-tensor/benches/similarity_search_common/mod.rs index c22cb5a9f08..b95867b26b7 100644 --- a/vortex-tensor/benches/similarity_search_common/mod.rs +++ b/vortex-tensor/benches/similarity_search_common/mod.rs @@ -30,22 +30,14 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::arrays::ConstantArray; use vortex_array::arrays::Extension; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::FixedSizeListArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt; -use vortex_array::arrays::scalar_fn::ScalarFnArrayExt; -use vortex_array::builtins::ArrayBuiltins; -use vortex_array::dtype::DType; -use vortex_array::dtype::Nullability; -use vortex_array::dtype::PType; use vortex_array::dtype::extension::ExtDType; use vortex_array::extension::EmptyMetadata; -use vortex_array::scalar::Scalar; -use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_btrblocks::BtrBlocksCompressor; @@ -54,12 +46,9 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; use vortex_session::VortexSession; -use vortex_tensor::encodings::turboquant::TurboQuantConfig; -use vortex_tensor::encodings::turboquant::turboquant_encode_unchecked; -use vortex_tensor::scalar_fns::cosine_similarity::CosineSimilarity; -use vortex_tensor::scalar_fns::l2_denorm::L2Denorm; -use vortex_tensor::scalar_fns::l2_denorm::normalize_as_l2_denorm; use vortex_tensor::vector::Vector; +pub use vortex_tensor::vector_search::build_similarity_search_tree; +pub use vortex_tensor::vector_search::compress_turboquant; /// A shared [`VortexSession`] pre-loaded with the builtin [`ArraySession`] so both bench and /// example can create execution contexts cheaply. @@ -146,67 +135,20 @@ pub fn extract_row_as_query(vectors: &ArrayRef, row: usize, dim: u32) -> Vec` extension array whose storage is a [`ConstantArray`] broadcasting a -/// single query vector across `num_rows` rows. This is how we hand a single query vector to -/// `CosineSimilarity` on the `rhs` side -- `ScalarFnArray` requires both children to have the -/// same length, so we broadcast the query instead of hand-rolling a 1-row input. -fn build_constant_query_vector(query: &[f32], num_rows: usize) -> VortexResult { - let element_dtype = DType::Primitive(PType::F32, Nullability::NonNullable); - - let children: Vec = query - .iter() - .map(|&v| Scalar::primitive(v, Nullability::NonNullable)) - .collect(); - let storage_scalar = Scalar::fixed_size_list(element_dtype, children, Nullability::NonNullable); - - let storage = ConstantArray::new(storage_scalar, num_rows).into_array(); - - let ext_dtype = ExtDType::::try_new(EmptyMetadata, storage.dtype().clone())?.erased(); - Ok(ExtensionArray::new(ext_dtype, storage).into_array()) -} - /// Compresses a raw `Vector` array with the default BtrBlocks pipeline. /// /// [`BtrBlocksCompressor`] walks into the extension array and recursively compresses the /// underlying FSL storage child. TurboQuant is *not* exercised by this path -- it is not /// registered in the default scheme set -- so this measures "generic" lossless compression /// applied to float vectors. +/// +/// Stays in this bench-only module because `BtrBlocksCompressor` is a dev-dependency of +/// `vortex-tensor`, so promoting it to the public `vector_search` module would drag the +/// `vortex-btrblocks` dep into `vortex-tensor`'s main dependency list. pub fn compress_default(data: ArrayRef) -> VortexResult { BtrBlocksCompressor::default().compress(&data) } -/// Compresses a raw `Vector` array with the TurboQuant pipeline by hand, producing the -/// same tree shape that -/// [`vortex_tensor::encodings::turboquant::TurboQuantScheme`] would: -/// -/// ```text -/// L2Denorm(SorfTransform(FSL(Dict(codes, centroids))), norms) -/// ``` -/// -/// Calling the encode helpers directly (instead of going through -/// `BtrBlocksCompressorBuilder::with_turboquant()`) lets this example avoid depending on the -/// `unstable_encodings` feature flag. -/// -/// See `vortex-tensor/src/encodings/turboquant/tests/mod.rs::normalize_and_encode` for the same -/// canonical recipe. -pub fn compress_turboquant(data: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - let l2_denorm = normalize_as_l2_denorm(data, ctx)?; - let normalized = l2_denorm.child_at(0).clone(); - let norms = l2_denorm.child_at(1).clone(); - let num_rows = l2_denorm.len(); - - let normalized_ext = normalized - .as_opt::() - .vortex_expect("normalized child should be an Extension array"); - - let config = TurboQuantConfig::default(); - // SAFETY: `normalize_as_l2_denorm` guarantees every row is unit-norm (or zero), which is the - // invariant `turboquant_encode_unchecked` expects. - let tq = unsafe { turboquant_encode_unchecked(normalized_ext, &config, ctx) }?; - - Ok(unsafe { L2Denorm::new_array_unchecked(tq, norms, num_rows) }?.into_array()) -} - /// Dispatch helper that builds the data array for the requested [`Variant`], starting from a /// single random-vector generation. Always returns an `ArrayRef` whose logical dtype is /// `Vector`. @@ -224,33 +166,3 @@ pub fn build_variant( Variant::TurboQuant => compress_turboquant(raw, ctx), } } - -/// Build the lazy similarity-search array tree for a prepared data array and a single query -/// vector. The returned tree is a boolean array of length `data.len()` where position `i` is -/// `true` iff `cosine_similarity(data[i], query) > threshold`. -/// -/// The tree shape is: -/// -/// ```text -/// Binary(Gt, [ -/// CosineSimilarity([data, ConstantArray(query_vec, n)]), -/// ConstantArray(threshold, n), -/// ]) -/// ``` -/// -/// This function does no execution; it is safe to call inside a benchmark setup closure. -pub fn build_similarity_search_tree( - data: ArrayRef, - query: &[f32], - threshold: f32, -) -> VortexResult { - let num_rows = data.len(); - let query_vec = build_constant_query_vector(query, num_rows)?; - - let cosine = CosineSimilarity::try_new_array(data, query_vec, num_rows)?.into_array(); - - let threshold_scalar = Scalar::primitive(threshold, Nullability::NonNullable); - let threshold_array = ConstantArray::new(threshold_scalar, num_rows).into_array(); - - cosine.binary(threshold_array, Operator::Gt) -} diff --git a/vortex-tensor/public-api.lock b/vortex-tensor/public-api.lock index bec8df1cb29..90aea23a194 100644 --- a/vortex-tensor/public-api.lock +++ b/vortex-tensor/public-api.lock @@ -550,4 +550,12 @@ impl core::marker::Copy for vortex_tensor::vector::VectorMatcherMetadata impl core::marker::StructuralPartialEq for vortex_tensor::vector::VectorMatcherMetadata +pub mod vortex_tensor::vector_search + +pub fn vortex_tensor::vector_search::build_constant_query_vector(query: &[f32], num_rows: usize) -> vortex_error::VortexResult + +pub fn vortex_tensor::vector_search::build_similarity_search_tree(data: vortex_array::array::erased::ArrayRef, query: &[f32], threshold: f32) -> vortex_error::VortexResult + +pub fn vortex_tensor::vector_search::compress_turboquant(data: vortex_array::array::erased::ArrayRef, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + pub fn vortex_tensor::initialize(session: &vortex_session::VortexSession) diff --git a/vortex-tensor/src/lib.rs b/vortex-tensor/src/lib.rs index 3d3563aa8e4..b3cf6c21695 100644 --- a/vortex-tensor/src/lib.rs +++ b/vortex-tensor/src/lib.rs @@ -25,6 +25,8 @@ pub mod vector; pub mod encodings; +pub mod vector_search; + mod utils; /// Initialize the Vortex tensor library with a Vortex session. diff --git a/vortex-tensor/src/vector_search.rs b/vortex-tensor/src/vector_search.rs new file mode 100644 index 00000000000..6934fa52a71 --- /dev/null +++ b/vortex-tensor/src/vector_search.rs @@ -0,0 +1,298 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Reusable helpers for building brute-force vector similarity search expressions over +//! [`Vector`] extension arrays. +//! +//! This module exposes three small building blocks that together make it straightforward to +//! stand up a cosine-similarity-plus-threshold scan on top of a prepared data array: +//! +//! - [`compress_turboquant`] applies the canonical TurboQuant encoding pipeline +//! (`L2Denorm(SorfTransform(FSL(Dict(codes, centroids))), norms)`) to a raw +//! `Vector` array without requiring the caller to plumb the +//! `unstable_encodings` feature flag on the `vortex` facade. +//! - [`build_constant_query_vector`] wraps a single query vector into a +//! [`Vector`] extension array whose storage is a [`ConstantArray`] broadcast +//! across `num_rows` rows. This is the shape expected by +//! [`CosineSimilarity::try_new_array`] for the RHS of a database-vs-query scan. +//! - [`build_similarity_search_tree`] wires everything together into a lazy +//! `Binary(Gt, [CosineSimilarity(data, query), threshold])` expression. +//! +//! Executing the tree from [`build_similarity_search_tree`] into a +//! [`BoolArray`](vortex_array::arrays::BoolArray) yields one boolean per row indicating whether +//! that row's cosine similarity to the query exceeds `threshold`. +//! +//! # Example +//! +//! ```ignore +//! use vortex_array::{ArrayRef, VortexSessionExecute}; +//! use vortex_array::arrays::BoolArray; +//! use vortex_session::VortexSession; +//! use vortex_tensor::vector_search::{build_similarity_search_tree, compress_turboquant}; +//! +//! fn run(session: &VortexSession, data: ArrayRef, query: &[f32]) -> anyhow::Result<()> { +//! let mut ctx = session.create_execution_ctx(); +//! let data = compress_turboquant(data, &mut ctx)?; +//! let tree = build_similarity_search_tree(data, query, 0.8)?; +//! let _matches: BoolArray = tree.execute(&mut ctx)?; +//! Ok(()) +//! } +//! ``` +//! +//! [`Vector`]: crate::vector::Vector +//! [`CosineSimilarity::try_new_array`]: crate::scalar_fns::cosine_similarity::CosineSimilarity::try_new_array + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::Extension; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::scalar_fn::ScalarFnArrayExt; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::extension::EmptyMetadata; +use vortex_array::scalar::Scalar; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; + +use crate::encodings::turboquant::TurboQuantConfig; +use crate::encodings::turboquant::turboquant_encode_unchecked; +use crate::scalar_fns::cosine_similarity::CosineSimilarity; +use crate::scalar_fns::l2_denorm::L2Denorm; +use crate::scalar_fns::l2_denorm::normalize_as_l2_denorm; +use crate::vector::Vector; + +/// Apply the canonical TurboQuant encoding pipeline to a `Vector` array. +/// +/// The returned array has the shape +/// `L2Denorm(SorfTransform(FSL(Dict(codes, centroids))), norms)` — exactly what +/// [`vortex_tensor::encodings::turboquant::TurboQuantScheme`] produces when invoked through +/// `BtrBlocksCompressorBuilder::with_turboquant()`, but without requiring callers to enable +/// the `unstable_encodings` feature on the `vortex` facade. +/// +/// The input `data` must be a [`Vector`] extension array whose element type is `f32` and whose +/// dimensionality is at least +/// [`turboquant::MIN_DIMENSION`](crate::encodings::turboquant::MIN_DIMENSION). The TurboQuant +/// configuration used is [`TurboQuantConfig::default()`] (8-bit codes, 3 SORF rounds, seed 42). +/// +/// # Errors +/// +/// Returns an error if `data` is not a [`Vector`] extension array, if normalization fails, or +/// if the underlying TurboQuant encoder rejects the input shape. +pub fn compress_turboquant(data: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let l2_denorm = normalize_as_l2_denorm(data, ctx)?; + let normalized = l2_denorm.child_at(0).clone(); + let norms = l2_denorm.child_at(1).clone(); + let num_rows = l2_denorm.len(); + + let Some(normalized_ext) = normalized.as_opt::() else { + vortex_bail!("normalize_as_l2_denorm must produce an Extension array child"); + }; + + let config = TurboQuantConfig::default(); + // SAFETY: `normalize_as_l2_denorm` guarantees every row is unit-norm (or zero), which is + // the invariant `turboquant_encode_unchecked` expects. + let tq = unsafe { turboquant_encode_unchecked(normalized_ext, &config, ctx) }?; + + Ok(unsafe { L2Denorm::new_array_unchecked(tq, norms, num_rows) }?.into_array()) +} + +/// Build a `Vector` extension array whose storage is a [`ConstantArray`] broadcasting +/// a single query vector across `num_rows` rows. +/// +/// This is the shape expected for the RHS of a database-vs-query +/// [`CosineSimilarity`](crate::scalar_fns::cosine_similarity::CosineSimilarity) scan: the +/// `ScalarFnArray` contract requires both children to have the same length, so rather than +/// hand-rolling a 1-row input we broadcast the query across the whole database. +/// +/// # Errors +/// +/// Returns an error if the [`Vector`] extension dtype rejects the constructed storage dtype. +pub fn build_constant_query_vector(query: &[f32], num_rows: usize) -> VortexResult { + let element_dtype = DType::Primitive(PType::F32, Nullability::NonNullable); + + let children: Vec = query + .iter() + .map(|&v| Scalar::primitive(v, Nullability::NonNullable)) + .collect(); + let storage_scalar = Scalar::fixed_size_list(element_dtype, children, Nullability::NonNullable); + + let storage = ConstantArray::new(storage_scalar, num_rows).into_array(); + + let ext_dtype = ExtDType::::try_new(EmptyMetadata, storage.dtype().clone())?.erased(); + Ok(ExtensionArray::new(ext_dtype, storage).into_array()) +} + +/// Build the lazy similarity-search expression tree for a prepared database array and a +/// single query vector. +/// +/// The returned array is a lazy boolean expression of length `data.len()` whose position `i` +/// is `true` iff `cosine_similarity(data[i], query) > threshold`. Executing it into a +/// [`BoolArray`](vortex_array::arrays::BoolArray) runs the full scan. +/// +/// The tree shape is: +/// +/// ```text +/// Binary(Gt, [ +/// CosineSimilarity([data, ConstantArray(query_vec, n)]), +/// ConstantArray(threshold, n), +/// ]) +/// ``` +/// +/// This function performs no execution; it is safe to call inside a benchmark setup closure. +/// +/// # Errors +/// +/// Returns an error if `query` has a length incompatible with `data`'s vector dimension, or +/// if any of the intermediate array constructors fails. +pub fn build_similarity_search_tree( + data: ArrayRef, + query: &[f32], + threshold: f32, +) -> VortexResult { + let num_rows = data.len(); + let query_vec = build_constant_query_vector(query, num_rows)?; + + let cosine = CosineSimilarity::try_new_array(data, query_vec, num_rows)?.into_array(); + + let threshold_scalar = Scalar::primitive(threshold, Nullability::NonNullable); + let threshold_array = ConstantArray::new(threshold_scalar, num_rows).into_array(); + + cosine.binary(threshold_array, Operator::Gt) +} + +#[cfg(test)] +mod tests { + use vortex_array::ArrayRef; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::Extension; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::FixedSizeListArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::bool::BoolArrayExt; + use vortex_array::dtype::extension::ExtDType; + use vortex_array::extension::EmptyMetadata; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::BufferMut; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use super::build_constant_query_vector; + use super::build_similarity_search_tree; + use super::compress_turboquant; + use crate::vector::Vector; + + /// Build a `Vector` extension array from a flat f32 slice. Each contiguous + /// group of `DIM` values becomes one row. + fn vector_array(dim: u32, values: &[f32]) -> VortexResult { + let dim_usize = dim as usize; + assert_eq!(values.len() % dim_usize, 0); + let num_rows = values.len() / dim_usize; + + let mut buf = BufferMut::::with_capacity(values.len()); + for &v in values { + buf.push(v); + } + let elements = PrimitiveArray::new::(buf.freeze(), Validity::NonNullable); + let fsl = FixedSizeListArray::try_new( + elements.into_array(), + dim, + Validity::NonNullable, + num_rows, + )?; + + let ext_dtype = ExtDType::::try_new(EmptyMetadata, fsl.dtype().clone())?.erased(); + Ok(ExtensionArray::new(ext_dtype, fsl.into_array()).into_array()) + } + + fn test_session() -> VortexSession { + VortexSession::empty().with::() + } + + #[test] + fn constant_query_vector_has_vector_extension_dtype() -> VortexResult<()> { + let query = vec![1.0f32, 0.0, 0.0, 0.0]; + let rhs = build_constant_query_vector(&query, 5)?; + + assert_eq!(rhs.len(), 5); + assert!(rhs.as_opt::().is_some()); + Ok(()) + } + + #[test] + fn similarity_search_tree_executes_to_bool_array() -> VortexResult<()> { + // 4 rows of 3-dim vectors; the first and last match the query [1, 0, 0]. + let data = vector_array( + 3, + &[ + 1.0, 0.0, 0.0, // + 0.0, 1.0, 0.0, // + 0.0, 0.0, 1.0, // + 1.0, 0.0, 0.0, // + ], + )?; + let query = [1.0f32, 0.0, 0.0]; + + let tree = build_similarity_search_tree(data, &query, 0.5)?; + let mut ctx = test_session().create_execution_ctx(); + let result: BoolArray = tree.execute(&mut ctx)?; + + let bits = result.to_bit_buffer(); + assert_eq!(bits.len(), 4); + assert!(bits.value(0)); + assert!(!bits.value(1)); + assert!(!bits.value(2)); + assert!(bits.value(3)); + Ok(()) + } + + #[test] + fn turboquant_roundtrip_preserves_ranking() -> VortexResult<()> { + // Build 6 rows of 128-dim vectors where row 0 is highly correlated with the query. + // TurboQuant should preserve the "row 0 is the best match" ordering. + const DIM: u32 = 128; + const NUM_ROWS: usize = 6; + + let mut values = Vec::::with_capacity(NUM_ROWS * DIM as usize); + let query: Vec = (0..DIM as usize) + .map(|i| ((i as f32) * 0.017).sin()) + .collect(); + + // Row 0: identical to query (cosine=1.0) + values.extend_from_slice(&query); + // Row 1: query + noise + for (i, q) in query.iter().enumerate() { + values.push(q + 0.05 * ((i as f32) * 0.03).cos()); + } + // Rows 2..6: unrelated patterns + for row in 2..NUM_ROWS { + for i in 0..DIM as usize { + values.push(((row as f32 * 1.3 + i as f32) * 0.07).sin()); + } + } + + let data = vector_array(DIM, &values)?; + let mut ctx = test_session().create_execution_ctx(); + let compressed = compress_turboquant(data, &mut ctx)?; + assert_eq!(compressed.len(), NUM_ROWS); + + // Build a tree with a low threshold so row 0 (cosine=1.0 exact) matches. + let tree = build_similarity_search_tree(compressed, &query, 0.95)?; + let result: BoolArray = tree.execute(&mut ctx)?; + let bits = result.to_bit_buffer(); + assert_eq!(bits.len(), NUM_ROWS); + assert!( + bits.value(0), + "row 0 (identical to query) must match at threshold 0.95 even after TurboQuant" + ); + Ok(()) + } +}