diff --git a/Cargo.lock b/Cargo.lock index 045c72176fd..374306195f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -799,7 +799,7 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1317,7 +1317,7 @@ checksum = "af491d569909a7e4dee0ad7db7f5341fef5c614d5b8ec8cf765732aba3cff681" dependencies = [ "serde", "termcolor", - "unicode-width 0.1.14", + "unicode-width 0.2.2", ] [[package]] @@ -5723,6 +5723,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe21416a02c693fb9f980befcb230ecc70b0b3d1cc4abf88b9675c4c1457f0c" +[[package]] +name = "onpair" +version = "0.0.3" +source = "git+https://github.com/spiraldb/onpair.git?rev=53e8ca6081d377e9933d999cef286e26bf52e2c7#53e8ca6081d377e9933d999cef286e26bf52e2c7" +dependencies = [ + "hashbrown 0.16.1", + "rand 0.9.4", +] + [[package]] name = "oorandom" version = "11.1.5" @@ -6137,9 +6146,9 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "044b1fa4f259f4df9ad5078e587b208f5d288a25407575fcddb9face30c7c692" dependencies = [ - "rand 0.8.6", + "rand 0.9.4", "socket2", - "thiserror 1.0.69", + "thiserror 2.0.18", ] [[package]] @@ -6352,7 +6361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -6384,7 +6393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -9340,6 +9349,7 @@ dependencies = [ "vortex-fastlanes", "vortex-fsst", "vortex-mask", + "vortex-onpair", "vortex-pco", "vortex-runend", "vortex-sequence", @@ -9682,6 +9692,7 @@ dependencies = [ 
"vortex-layout", "vortex-mask", "vortex-metrics", + "vortex-onpair", "vortex-pco", "vortex-runend", "vortex-scan", @@ -9889,6 +9900,23 @@ dependencies = [ "vortex-cuda-macros", ] +[[package]] +name = "vortex-onpair" +version = "0.1.0" +dependencies = [ + "codspeed-divan-compat", + "memchr", + "num-traits", + "onpair", + "prost 0.14.3", + "rstest", + "vortex-array", + "vortex-buffer", + "vortex-error", + "vortex-mask", + "vortex-session", +] + [[package]] name = "vortex-parquet-variant" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index af46038fd58..fe69b2370e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,8 @@ members = [ "encodings/zstd", "encodings/bytebool", "encodings/parquet-variant", + # Experimental encodings + "encodings/experimental/onpair", # Benchmarks "benchmarks/lance-bench", "benchmarks/compress-bench", @@ -188,6 +190,7 @@ num_enum = { version = "0.7.3", default-features = false } object_store = { version = "0.13.1", default-features = false } once_cell = "1.21" oneshot = { version = "0.2.0", features = ["async"] } +onpair = { version = "0.0.3", git = "https://github.com/spiraldb/onpair.git", rev = "53e8ca6081d377e9933d999cef286e26bf52e2c7" } opentelemetry = "0.32.0" opentelemetry-otlp = "0.32.0" opentelemetry_sdk = "0.32.0" @@ -288,6 +291,7 @@ vortex-ipc = { version = "0.1.0", path = "./vortex-ipc", default-features = fals vortex-layout = { version = "0.1.0", path = "./vortex-layout", default-features = false } vortex-mask = { version = "0.1.0", path = "./vortex-mask", default-features = false } vortex-metrics = { version = "0.1.0", path = "./vortex-metrics", default-features = false } +vortex-onpair = { version = "0.1.0", path = "./encodings/experimental/onpair", default-features = false } vortex-parquet-variant = { version = "0.1.0", path = "./encodings/parquet-variant" } vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = false } vortex-proto = { version = "0.1.0", path = "./vortex-proto", 
default-features = false } diff --git a/encodings/experimental/onpair/Cargo.toml b/encodings/experimental/onpair/Cargo.toml new file mode 100644 index 00000000000..258568cc75a --- /dev/null +++ b/encodings/experimental/onpair/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "vortex-onpair" +authors = { workspace = true } +categories = { workspace = true } +description = "Vortex OnPair string array encoding (dict-12, pushdown predicates)" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = "README.md" +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +memchr = { workspace = true } +num-traits = { workspace = true } +onpair = { workspace = true } +prost = { workspace = true } +vortex-array = { workspace = true } +vortex-buffer = { workspace = true } +vortex-error = { workspace = true } +vortex-mask = { workspace = true } +vortex-session = { workspace = true } + +[features] +_test-harness = ["vortex-array/_test-harness"] + +[dev-dependencies] +divan = { workspace = true } +rstest = { workspace = true } +vortex-array = { workspace = true, features = ["_test-harness"] } + +[[bench]] +name = "decode" +harness = false diff --git a/encodings/experimental/onpair/README.md b/encodings/experimental/onpair/README.md new file mode 100644 index 00000000000..9628c006201 --- /dev/null +++ b/encodings/experimental/onpair/README.md @@ -0,0 +1,39 @@ +# Vortex OnPair + +A Vortex Encoding for Binary and Utf8 data that uses the +[OnPair][onpair] short-string compression algorithm. OnPair is a +dictionary-based encoder with fast per-row random access. + +The trainer / encoder lives in the standalone [`onpair`][onpair-crate] +crate; this crate wraps the resulting column as a Vortex array with +cascading-compressor support on every integer child. 
+ +## Compute + +Like the FSST encoding, this crate provides `cast` and `filter` +pushdown. Other operators fall back to ordinary decompression. + +## Default Configuration + +The default training preset is **dict-12**: 12 bits per token, +dictionary capped at 4 096 entries. Token codes are stored as a +`PrimitiveArray`; downstream `FastLanes::BitPacking` losslessly +narrows the child to exactly `bits`-bit codes on disk. + +## Layout + +- Buffer 0 — `dict_bytes`: dictionary blob built by the OnPair trainer, + padded with `MAX_TOKEN_SIZE` trailing zero bytes so the over-copy + decoder can read 16 bytes past the last token. +- Slot 0 — `dict_offsets`: `PrimitiveArray`, len `dict_size + 1`. +- Slot 1 — `codes`: `PrimitiveArray`, length `total_tokens`. +- Slot 2 — `codes_offsets`: `PrimitiveArray`, length `num_rows + 1`. +- Slot 3 — `uncompressed_lengths`: integer `PrimitiveArray`, length + `num_rows`. +- Slot 4 — optional validity child. + +All four integer slot children flow through the standard cascading +compressor pipeline (FoR / BitPacking / RunEnd / etc.). + +[onpair]: https://arxiv.org/abs/2508.02280 +[onpair-crate]: https://github.com/spiraldb/onpair diff --git a/encodings/experimental/onpair/benches/decode.rs b/encodings/experimental/onpair/benches/decode.rs new file mode 100644 index 00000000000..2f3fce9db23 --- /dev/null +++ b/encodings/experimental/onpair/benches/decode.rs @@ -0,0 +1,248 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors +// +//! Decode-path microbenchmarks for the OnPair Vortex array. +//! +//! * `decompress_into` — the upstream `onpair::decompress_into` decoder hot +//! loop, fed by pre-materialised [`DecodeInputs`]. Times the decode loop +//! plus the per-iteration output `Vec` allocation (no child `execute`). +//! * `canonicalize_to_varbinview` — the full Vortex +//! `OnPair → VarBinViewArray` path callers actually hit. Includes child +//! `execute`, the build_views step, allocation, etc. +//! +//! 
The decode benches run four corpus shapes at 100 000 rows, plus the URL-log +//! shape at 1 000 000 rows, to surface cache-pressure cliffs and per-row decode cost. + +#![allow( + clippy::cast_possible_truncation, + clippy::cast_lossless, + clippy::panic, + clippy::tests_outside_test_module, + clippy::redundant_clone, + clippy::missing_safety_doc, + clippy::unwrap_used, + clippy::expect_used +)] + +use std::mem::MaybeUninit; +use std::sync::LazyLock; + +use divan::Bencher; +use onpair::Parts; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::filter::FilterKernel; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::Nullability; +use vortex_array::session::ArraySession; +use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; +use vortex_mask::Mask; +use vortex_onpair::DEFAULT_DICT12_CONFIG; +use vortex_onpair::OnPair; +use vortex_onpair::OnPairArray; +use vortex_onpair::OnPairArraySlotsExt; + +/// Host-resident decode inputs, materialised once so the decode-loop benchmark +/// measures only `onpair::decompress_into` (not child `execute`/allocation). 
+struct DecodeInputs { + dict_bytes: ByteBuffer, + dict_offsets: Buffer, + codes: Buffer, + bits: u32, +} + +impl DecodeInputs { + fn as_parts(&self) -> Parts<'_> { + Parts { + dict_bytes: self.dict_bytes.as_slice(), + dict_offsets: self.dict_offsets.as_slice(), + bits: self.bits, + codes: self.codes.as_slice(), + } + } + + fn decompressed_len(&self) -> usize { + onpair::decompressed_len(self.as_parts()) + } + + fn decompress_into(&self, out: &mut [MaybeUninit]) -> usize { + onpair::decompress_into(self.as_parts(), out) + } +} +use vortex_onpair::onpair_compress; +use vortex_session::VortexSession; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +#[derive(Copy, Clone, Debug)] +enum Shape { + /// URL / HTTP-log shaped — high lexical overlap, ~35–45 bytes per row. + UrlLog, + /// Short uniform strings — 4–8 bytes per row, very low cardinality. + Short, + /// Long log-line shaped — ~120 bytes per row, more tokens per row. + Long, + /// High cardinality — every row unique. 
+ HighCard, +} + +fn corpus(n: usize, shape: Shape) -> Vec { + let mut state = 0x9e37_79b9_7f4a_7c15_u64; + let mut next = || { + state = state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + state + }; + let mut out = Vec::with_capacity(n); + match shape { + Shape::UrlLog => { + let templates: &[&str] = &[ + "https://www.example.com/products/{id}", + "https://cdn.example.com/img/{id}.webp", + "https://api.example.com/v2/orders/{id}", + "https://www.example.com/users/{id}/profile", + "INFO request_id={id} status=200 method=GET", + "WARN request_id={id} status=429 method=POST", + "ERROR request_id={id} status=500 method=PUT", + ]; + for _ in 0..n { + let s = next(); + let pick = (s as usize) % templates.len(); + let id = s as u32; + out.push(templates[pick].replace("{id}", &format!("{id:08x}"))); + } + } + Shape::Short => { + let templates: &[&str] = &["alpha", "beta", "gamma", "delta", "eps", "zeta", "eta"]; + for _ in 0..n { + let s = next(); + out.push(templates[(s as usize) % templates.len()].to_string()); + } + } + Shape::Long => { + let templates: &[&str] = &[ + "2026-05-14T12:34:56.789012Z INFO request_id={id} method=GET path=/api/v1/users/{id}/profile status=200", + "2026-05-14T12:34:56.789012Z WARN request_id={id} method=POST path=/api/v1/users/{id}/sessions status=429", + "2026-05-14T12:34:56.789012Z ERROR request_id={id} method=PUT path=/api/v1/users/{id}/settings status=500", + ]; + for _ in 0..n { + let s = next(); + let pick = (s as usize) % templates.len(); + let id = s as u32; + out.push(templates[pick].replace("{id}", &format!("{id:08x}"))); + } + } + Shape::HighCard => { + for i in 0..n { + out.push(format!("row-{i:010x}-{rand:016x}", rand = next())); + } + } + } + out +} + +fn compress(n: usize, shape: Shape) -> OnPairArray { + let strings = corpus(n, shape); + let varbin = VarBinArray::from_iter( + strings.iter().map(|s| Some(s.as_bytes())), + DType::Utf8(Nullability::NonNullable), + ); + onpair_compress(&varbin, 
varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG) + .unwrap_or_else(|e| panic!("onpair_compress failed: {e}")) +} + +/// Canonicalise a slot child to the decoder's native primitive width. +fn widen(arr: &ArrayRef, ctx: &mut ExecutionCtx) -> Buffer { + arr.cast(DType::Primitive(T::PTYPE, arr.dtype().nullability())) + .expect("cast") + .execute::(ctx) + .expect("execute") + .into_buffer::() +} + +fn materialise(arr: &OnPairArray) -> (DecodeInputs, usize) { + let mut ctx = SESSION.create_execution_ctx(); + let view = arr.as_view(); + let inputs = DecodeInputs { + dict_bytes: view.dict_bytes().clone(), + dict_offsets: widen::(view.dict_offsets(), &mut ctx), + codes: widen::(view.codes(), &mut ctx), + bits: view.bits(), + }; + let total = inputs.decompressed_len(); + (inputs, total) +} + +const CASES: &[(Shape, usize)] = &[ + (Shape::UrlLog, 100_000), + (Shape::UrlLog, 1_000_000), + (Shape::Short, 100_000), + (Shape::Long, 100_000), + (Shape::HighCard, 100_000), +]; + +/// Raw decode loop time, excluding child `execute`; the output `Vec` is +/// allocated inside the timed closure. Hits `onpair::decompress_into` directly. +#[divan::bench(args = CASES)] +fn decompress_into_bench(bencher: Bencher, case: (Shape, usize)) { + let (shape, n) = case; + let arr = compress(n, shape); + let (inputs, total) = materialise(&arr); + bencher.bench_local(|| { + let mut out: Vec = Vec::with_capacity(total); + let written = inputs.decompress_into(out.spare_capacity_mut()); + unsafe { out.set_len(written) }; + divan::black_box(out); + }); +} + +/// Full Vortex canonicalisation, including `execute<>` on every child, +/// building the view buffer + `BinaryView` list, etc. 
+#[divan::bench(args = CASES)] +fn canonicalize_to_varbinview(bencher: Bencher, case: (Shape, usize)) { + let (shape, n) = case; + let arr = compress(n, shape); + bencher + .with_inputs(|| arr.clone().into_array()) + .bench_local_values(|arr| { + let mut ctx = SESSION.create_execution_ctx(); + divan::black_box( + arr.execute::(&mut ctx) + .unwrap_or_else(|e| panic!("canonicalize failed: {e}")), + ) + }); +} + +// ─── Compute kernels ───────────────────────────────────────────────────── + +const COMPUTE_CASES: &[(Shape, usize)] = &[(Shape::UrlLog, 100_000), (Shape::UrlLog, 1_000_000)]; + +/// Filter — share-dict path. Builds a 1-in-7 mask so we keep ~14 % of +/// rows; the cost is dominated by the `codes` segment copy + offsets. +#[divan::bench(args = COMPUTE_CASES)] +fn filter_share_dict(bencher: Bencher, case: (Shape, usize)) { + let (shape, n) = case; + let arr = compress(n, shape); + let mask = Mask::from_iter((0..n).map(|i| i % 7 == 0)); + bencher.bench_local(|| { + let mut ctx = SESSION.create_execution_ctx(); + let result = ::filter(arr.as_view(), &mask, &mut ctx) + .unwrap() + .unwrap(); + divan::black_box(result); + }); +} + +fn main() { + divan::main(); +} diff --git a/encodings/experimental/onpair/goldenfiles/onpair.metadata b/encodings/experimental/onpair/goldenfiles/onpair.metadata new file mode 100644 index 00000000000..e96baf1a0ab --- /dev/null +++ b/encodings/experimental/onpair/goldenfiles/onpair.metadata @@ -0,0 +1 @@ + € €è(08 \ No newline at end of file diff --git a/encodings/experimental/onpair/public-api.lock b/encodings/experimental/onpair/public-api.lock new file mode 100644 index 00000000000..1571f6b5d67 --- /dev/null +++ b/encodings/experimental/onpair/public-api.lock @@ -0,0 +1,235 @@ +pub mod vortex_onpair + +pub use vortex_onpair::Bits + +pub use vortex_onpair::Config + +pub use vortex_onpair::OnPairError + +pub use vortex_onpair::Threshold + +pub struct vortex_onpair::OnPair + +impl vortex_onpair::OnPair + +pub fn 
vortex_onpair::OnPair::try_new(dtype: vortex_array::dtype::DType, dict_bytes: vortex_array::buffer::BufferHandle, dict_offsets: vortex_array::array::erased::ArrayRef, codes: vortex_array::array::erased::ArrayRef, codes_offsets: vortex_array::array::erased::ArrayRef, uncompressed_lengths: vortex_array::array::erased::ArrayRef, validity: vortex_array::validity::Validity, bits: u32) -> vortex_error::VortexResult + +impl vortex_array::array::vtable::VTable for vortex_onpair::OnPair + +pub type vortex_onpair::OnPair::OperationsVTable = vortex_onpair::OnPair + +pub type vortex_onpair::OnPair::TypedArrayData = vortex_onpair::OnPairData + +pub type vortex_onpair::OnPair::ValidityVTable = vortex_onpair::OnPair + +pub fn vortex_onpair::OnPair::append_to_builder(array: vortex_array::array::view::ArrayView<'_, Self>, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + +pub fn vortex_onpair::OnPair::buffer(array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> vortex_array::buffer::BufferHandle + +pub fn vortex_onpair::OnPair::buffer_name(_array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> core::option::Option + +pub fn vortex_onpair::OnPair::deserialize(&self, dtype: &vortex_array::dtype::DType, len: usize, metadata: &[u8], buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren, _session: &vortex_session::VortexSession) -> vortex_error::VortexResult> + +pub fn vortex_onpair::OnPair::execute(array: vortex_array::array::typed::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_onpair::OnPair::execute_parent(array: vortex_array::array::view::ArrayView<'_, Self>, parent: &vortex_array::array::erased::ArrayRef, child_idx: usize, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_onpair::OnPair::id(&self) -> 
vortex_array::array::ArrayId + +pub fn vortex_onpair::OnPair::nbuffers(_array: vortex_array::array::view::ArrayView<'_, Self>) -> usize + +pub fn vortex_onpair::OnPair::reduce_parent(array: vortex_array::array::view::ArrayView<'_, Self>, parent: &vortex_array::array::erased::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + +pub fn vortex_onpair::OnPair::serialize(array: vortex_array::array::view::ArrayView<'_, Self>, _session: &vortex_session::VortexSession) -> vortex_error::VortexResult>> + +pub fn vortex_onpair::OnPair::slot_name(_array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> alloc::string::String + +pub fn vortex_onpair::OnPair::validate(&self, data: &Self::TypedArrayData, dtype: &vortex_array::dtype::DType, len: usize, slots: &[core::option::Option]) -> vortex_error::VortexResult<()> + +impl vortex_array::array::vtable::operations::OperationsVTable for vortex_onpair::OnPair + +pub fn vortex_onpair::OnPair::scalar_at(array: vortex_array::array::view::ArrayView<'_, vortex_onpair::OnPair>, index: usize, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +impl vortex_array::array::vtable::validity::ValidityVTable for vortex_onpair::OnPair + +pub fn vortex_onpair::OnPair::validity(array: vortex_array::array::view::ArrayView<'_, vortex_onpair::OnPair>) -> vortex_error::VortexResult + +impl vortex_array::arrays::filter::kernel::FilterKernel for vortex_onpair::OnPair + +pub fn vortex_onpair::OnPair::filter(array: vortex_array::array::view::ArrayView<'_, Self>, mask: &vortex_mask::Mask, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::arrays::slice::SliceReduce for vortex_onpair::OnPair + +pub fn vortex_onpair::OnPair::slice(array: vortex_array::array::view::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> + +impl vortex_array::scalar_fn::fns::cast::kernel::CastReduce for vortex_onpair::OnPair + +pub fn 
vortex_onpair::OnPair::cast(array: vortex_array::array::view::ArrayView<'_, Self>, dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult> + +pub struct vortex_onpair::OnPairData + +impl vortex_onpair::OnPairData + +pub fn vortex_onpair::OnPairData::bits(&self) -> u32 + +pub fn vortex_onpair::OnPairData::dict_bytes(&self) -> &vortex_buffer::ByteBuffer + +pub fn vortex_onpair::OnPairData::dict_bytes_handle(&self) -> &vortex_array::buffer::BufferHandle + +pub fn vortex_onpair::OnPairData::is_empty(&self) -> bool + +pub fn vortex_onpair::OnPairData::len(&self) -> usize + +pub fn vortex_onpair::OnPairData::new(dict_bytes: vortex_array::buffer::BufferHandle, bits: u32, len: usize) -> Self + +impl core::fmt::Debug for vortex_onpair::OnPairData + +pub fn vortex_onpair::OnPairData::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::fmt::Display for vortex_onpair::OnPairData + +pub fn vortex_onpair::OnPairData::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::hash::ArrayEq for vortex_onpair::OnPairData + +pub fn vortex_onpair::OnPairData::array_eq(&self, other: &Self, precision: vortex_array::hash::Precision) -> bool + +impl vortex_array::hash::ArrayHash for vortex_onpair::OnPairData + +pub fn vortex_onpair::OnPairData::array_hash(&self, state: &mut H, precision: vortex_array::hash::Precision) + +pub struct vortex_onpair::OnPairMetadata + +pub vortex_onpair::OnPairMetadata::bits: u32 + +pub vortex_onpair::OnPairMetadata::codes_offsets_ptype: i32 + +pub vortex_onpair::OnPairMetadata::codes_ptype: i32 + +pub vortex_onpair::OnPairMetadata::dict_offsets_ptype: i32 + +pub vortex_onpair::OnPairMetadata::dict_size: u32 + +pub vortex_onpair::OnPairMetadata::total_tokens: u64 + +pub vortex_onpair::OnPairMetadata::uncompressed_lengths_ptype: i32 + +impl vortex_onpair::OnPairMetadata + +pub fn vortex_onpair::OnPairMetadata::codes_offsets_ptype(&self) -> vortex_array::dtype::ptype::PType + +pub fn 
vortex_onpair::OnPairMetadata::codes_ptype(&self) -> vortex_array::dtype::ptype::PType + +pub fn vortex_onpair::OnPairMetadata::dict_offsets_ptype(&self) -> vortex_array::dtype::ptype::PType + +pub fn vortex_onpair::OnPairMetadata::set_codes_offsets_ptype(&mut self, value: vortex_array::dtype::ptype::PType) + +pub fn vortex_onpair::OnPairMetadata::set_codes_ptype(&mut self, value: vortex_array::dtype::ptype::PType) + +pub fn vortex_onpair::OnPairMetadata::set_dict_offsets_ptype(&mut self, value: vortex_array::dtype::ptype::PType) + +pub fn vortex_onpair::OnPairMetadata::set_uncompressed_lengths_ptype(&mut self, value: vortex_array::dtype::ptype::PType) + +pub fn vortex_onpair::OnPairMetadata::uncompressed_lengths_ptype(&self) -> vortex_array::dtype::ptype::PType + +impl vortex_onpair::OnPairMetadata + +pub fn vortex_onpair::OnPairMetadata::get_uncompressed_lengths_ptype(&self) -> vortex_error::VortexResult + +impl core::default::Default for vortex_onpair::OnPairMetadata + +pub fn vortex_onpair::OnPairMetadata::default() -> Self + +impl core::fmt::Debug for vortex_onpair::OnPairMetadata + +pub fn vortex_onpair::OnPairMetadata::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl prost::message::Message for vortex_onpair::OnPairMetadata + +pub fn vortex_onpair::OnPairMetadata::clear(&mut self) + +pub fn vortex_onpair::OnPairMetadata::encoded_len(&self) -> usize + +pub struct vortex_onpair::OnPairSlots + +pub vortex_onpair::OnPairSlots::codes: vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlots::codes_offsets: vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlots::dict_offsets: vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlots::uncompressed_lengths: vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlots::validity: core::option::Option + +impl vortex_onpair::OnPairSlots + +pub const vortex_onpair::OnPairSlots::CODES: usize + +pub const 
vortex_onpair::OnPairSlots::CODES_OFFSETS: usize + +pub const vortex_onpair::OnPairSlots::COUNT: usize + +pub const vortex_onpair::OnPairSlots::DICT_OFFSETS: usize + +pub const vortex_onpair::OnPairSlots::NAMES: [&'static str; 5] + +pub const vortex_onpair::OnPairSlots::UNCOMPRESSED_LENGTHS: usize + +pub const vortex_onpair::OnPairSlots::VALIDITY: usize + +pub fn vortex_onpair::OnPairSlots::from_slots(slots: vortex_array::array::ArraySlots) -> Self + +pub fn vortex_onpair::OnPairSlots::into_slots(self) -> vortex_array::array::ArraySlots + +pub struct vortex_onpair::OnPairSlotsView<'a> + +pub vortex_onpair::OnPairSlotsView::codes: &'a vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlotsView::codes_offsets: &'a vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlotsView::dict_offsets: &'a vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlotsView::uncompressed_lengths: &'a vortex_array::array::erased::ArrayRef + +pub vortex_onpair::OnPairSlotsView::validity: core::option::Option<&'a vortex_array::array::erased::ArrayRef> + +impl<'a> vortex_onpair::OnPairSlotsView<'a> + +pub fn vortex_onpair::OnPairSlotsView<'a>::from_slots(slots: &'a [core::option::Option]) -> Self + +pub fn vortex_onpair::OnPairSlotsView<'a>::to_owned(&self) -> vortex_onpair::OnPairSlots + +pub const vortex_onpair::DEFAULT_DICT12_CONFIG: onpair::config::Config + +pub trait vortex_onpair::OnPairArrayExt: vortex_onpair::OnPairArraySlotsExt + +pub fn vortex_onpair::OnPairArrayExt::array_validity(&self) -> vortex_array::validity::Validity + +impl vortex_onpair::OnPairArrayExt for T + +pub trait vortex_onpair::OnPairArraySlotsExt: vortex_array::array::typed::TypedArrayRef + +pub fn vortex_onpair::OnPairArraySlotsExt::codes(&self) -> &vortex_array::array::erased::ArrayRef + +pub fn vortex_onpair::OnPairArraySlotsExt::codes_offsets(&self) -> &vortex_array::array::erased::ArrayRef + +pub fn vortex_onpair::OnPairArraySlotsExt::dict_offsets(&self) -> 
&vortex_array::array::erased::ArrayRef + +pub fn vortex_onpair::OnPairArraySlotsExt::slots_view(&self) -> vortex_onpair::OnPairSlotsView<'_> + +pub fn vortex_onpair::OnPairArraySlotsExt::uncompressed_lengths(&self) -> &vortex_array::array::erased::ArrayRef + +pub fn vortex_onpair::OnPairArraySlotsExt::validity(&self) -> core::option::Option<&vortex_array::array::erased::ArrayRef> + +impl> vortex_onpair::OnPairArraySlotsExt for T + +pub fn vortex_onpair::onpair_compress>(array: A, len: usize, dtype: &vortex_array::dtype::DType, config: onpair::config::Config) -> vortex_error::VortexResult + +pub fn vortex_onpair::onpair_compress_array(array: &vortex_array::array::erased::ArrayRef, config: onpair::config::Config, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_onpair::onpair_compress_array_default(array: &vortex_array::array::erased::ArrayRef, config: onpair::config::Config) -> vortex_error::VortexResult + +pub fn vortex_onpair::onpair_compress_iter<'a, I>(iter: I, len: usize, dtype: vortex_array::dtype::DType, config: onpair::config::Config) -> vortex_error::VortexResult where I: core::iter::traits::iterator::Iterator> + +pub type vortex_onpair::OnPairArray = vortex_array::array::typed::Array diff --git a/encodings/experimental/onpair/src/array.rs b/encodings/experimental/onpair/src/array.rs new file mode 100644 index 00000000000..18fa5e942ce --- /dev/null +++ b/encodings/experimental/onpair/src/array.rs @@ -0,0 +1,539 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Debug; +use std::fmt::Display; +use std::fmt::Formatter; +use std::hash::Hasher; + +use prost::Message as _; +use vortex_array::Array; +use vortex_array::ArrayEq; +use vortex_array::ArrayHash; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use 
vortex_array::ExecutionResult; +use vortex_array::IntoArray; +use vortex_array::Precision; +use vortex_array::array_slots; +use vortex_array::buffer::BufferHandle; +use vortex_array::builders::ArrayBuilder; +use vortex_array::builders::VarBinViewBuilder; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::serde::ArrayChildren; +use vortex_array::validity::Validity; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; +use vortex_array::vtable::child_to_validity; +use vortex_array::vtable::validity_to_child; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::canonical::canonicalize_onpair; +use crate::canonical::onpair_decode_views; +use crate::kernel::PARENT_KERNELS; +use crate::rules::RULES; + +/// An [`OnPair`]-encoded Vortex array. +pub type OnPairArray = Array; + +/// Wire-format metadata persisted alongside the OnPair buffer + slot children. +/// +/// On disk the layout is FSST-shape: +/// +/// * Buffer 0 — `dict_bytes`: the dictionary blob built by the OnPair trainer, +/// padded with `onpair::MAX_TOKEN_SIZE` trailing zero +/// bytes so the over-copy decoder can read 16 bytes past the last token. +/// * Slots — see [`OnPairSlots`]. +/// +/// The four integer slot children flow through the standard `compress_child` +/// pipeline (see `vortex-btrblocks::schemes::string::OnPairScheme`), so any +/// encoding registered with the compressor can re-encode them — exactly the +/// same shape as FSST's `codes` `VarBinArray`. +#[derive(Clone, prost::Message)] +pub struct OnPairMetadata { + /// Width of the per-row primitive `uncompressed_lengths` child. 
+ #[prost(enumeration = "PType", tag = "1")] + pub uncompressed_lengths_ptype: i32, + /// Bits-per-token the column was compressed with (9..=16). Every value + /// in the `codes` child only uses its low `bits` bits. + #[prost(uint32, tag = "2")] + pub bits: u32, + /// Number of dictionary tokens. `dict_offsets` has length `dict_size + 1`. + /// Bounded by `2^bits ≤ 2^16 = 65_536`, so `u32` is comfortably wide. + #[prost(uint32, tag = "3")] + pub dict_size: u32, + /// Total number of tokens across all rows. `codes` has this length; + /// `codes_offsets.last() == total_tokens`. + #[prost(uint64, tag = "4")] + pub total_tokens: u64, + /// PType of the `dict_offsets` slot child (defaults to U32, may be + /// narrowed to U16/U8 by the cascading compressor when values fit). + #[prost(enumeration = "PType", tag = "5")] + pub dict_offsets_ptype: i32, + /// PType of the `codes` slot child (typically U16; since `bits` is always in 9..=16, + /// narrowing to U8 can only happen when every code value happens to fit in 8 bits). + #[prost(enumeration = "PType", tag = "6")] + pub codes_ptype: i32, + /// PType of the `codes_offsets` slot child. + #[prost(enumeration = "PType", tag = "7")] + pub codes_offsets_ptype: i32, +} + +impl OnPairMetadata { + pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult { + PType::try_from(self.uncompressed_lengths_ptype) + .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype)) + } +} + +#[array_slots(OnPair)] +pub struct OnPairSlots { + /// `PrimitiveArray`, length `dict_size + 1`. Cascading compressor may + /// narrow the ptype to U16/U8. + pub dict_offsets: ArrayRef, + /// `PrimitiveArray`. Each value only uses its low `bits` bits; + /// downstream `FastLanes::BitPacking` losslessly shrinks the child to + /// exactly `bits`-bit codes on disk. + pub codes: ArrayRef, + /// `PrimitiveArray`, length `num_rows + 1`. FoR / RunEnd / etc. apply + /// naturally via the cascading compressor. + pub codes_offsets: ArrayRef, + /// Integer `PrimitiveArray`, length `num_rows`. 
Used to size the canonical + /// output buffer. + pub uncompressed_lengths: ArrayRef, + /// Optional validity child for the outer string column. + pub validity: Option, +} + +/// Inner data for an OnPair-encoded array. +/// +/// Holds only the dictionary blob (buffer 0). Every other piece — +/// `dict_offsets`, the per-token `codes`, the per-row `codes_offsets`, the +/// per-row `uncompressed_lengths`, and the optional validity child — is a +/// Vortex slot child so it can be re-encoded by the cascading compressor. +#[derive(Clone)] +pub struct OnPairData { + /// The dictionary blob (buffer 0). + /// + /// INVARIANT: this buffer must be over-padded past its logical end + /// (`dict_offsets.last()`) by the decoder's fixed token read width, + /// `onpair::MAX_TOKEN_SIZE`. The over-copy decoder reads + /// every dictionary entry with one fixed-width load and then advances the + /// cursor by the token's true length, so the load for the final, shortest + /// token over-reads past the logical end of the dictionary. This is the + /// same over-read the decoder accounts for on the final few codes; the + /// trailing padding absorbs it so that any entry can be read in bounds. + /// `onpair_compress` establishes this padding (see `parts_to_children`); + /// the over-copy decoder lives in the `onpair` crate. 
+ dict_bytes: BufferHandle, + bits: u32, + len: usize, +} + +impl OnPairData { + pub fn new(dict_bytes: BufferHandle, bits: u32, len: usize) -> Self { + Self { + dict_bytes, + bits, + len, + } + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn bits(&self) -> u32 { + self.bits + } + + pub fn dict_bytes(&self) -> &ByteBuffer { + self.dict_bytes.as_host() + } + + pub fn dict_bytes_handle(&self) -> &BufferHandle { + &self.dict_bytes + } +} + +impl Display for OnPairData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "len: {}, bits: {}, dict_bytes_len: {}", + self.len, + self.bits, + self.dict_bytes.len() + ) + } +} + +impl Debug for OnPairData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OnPairData") + .field("len", &self.len) + .field("bits", &self.bits) + .field("dict_bytes_len", &self.dict_bytes.len()) + .finish() + } +} + +impl ArrayHash for OnPairData { + fn array_hash(&self, state: &mut H, precision: Precision) { + self.dict_bytes.as_host().array_hash(state, precision); + state.write_u32(self.bits); + } +} + +impl ArrayEq for OnPairData { + fn array_eq(&self, other: &Self, precision: Precision) -> bool { + self.bits == other.bits + && self + .dict_bytes + .as_host() + .array_eq(other.dict_bytes.as_host(), precision) + } +} + +/// Zero-sized VTable marker for the OnPair encoding. +#[derive(Clone, Debug)] +pub struct OnPair; + +impl OnPair { + /// Build an [`OnPairArray`] from already-materialised parts. 
+ #[expect(clippy::too_many_arguments, reason = "every child is a real input")] + pub fn try_new( + dtype: DType, + dict_bytes: BufferHandle, + dict_offsets: ArrayRef, + codes: ArrayRef, + codes_offsets: ArrayRef, + uncompressed_lengths: ArrayRef, + validity: Validity, + bits: u32, + ) -> VortexResult { + validate_parts( + &dtype, + &dict_offsets, + &codes, + &codes_offsets, + &uncompressed_lengths, + bits, + )?; + let len = uncompressed_lengths.len(); + let data = OnPairData::new(dict_bytes, bits, len); + let slots = OnPairSlots { + dict_offsets, + codes, + codes_offsets, + uncompressed_lengths, + validity: validity_to_child(&validity, len), + } + .into_slots(); + Ok(unsafe { + Array::from_parts_unchecked(ArrayParts::new(OnPair, dtype, len, data).with_slots(slots)) + }) + } + + #[expect(clippy::too_many_arguments, reason = "every child is a real input")] + pub(crate) unsafe fn new_unchecked( + dtype: DType, + dict_bytes: BufferHandle, + dict_offsets: ArrayRef, + codes: ArrayRef, + codes_offsets: ArrayRef, + uncompressed_lengths: ArrayRef, + validity: Validity, + bits: u32, + ) -> OnPairArray { + let len = uncompressed_lengths.len(); + let data = OnPairData::new(dict_bytes, bits, len); + let slots = OnPairSlots { + dict_offsets, + codes, + codes_offsets, + uncompressed_lengths, + validity: validity_to_child(&validity, len), + } + .into_slots(); + unsafe { + Array::from_parts_unchecked(ArrayParts::new(OnPair, dtype, len, data).with_slots(slots)) + } + } +} + +fn validate_parts( + dtype: &DType, + dict_offsets: &ArrayRef, + codes: &ArrayRef, + codes_offsets: &ArrayRef, + uncompressed_lengths: &ArrayRef, + bits: u32, +) -> VortexResult<()> { + vortex_ensure!( + matches!(dtype, DType::Binary(_) | DType::Utf8(_)), + "OnPair arrays must be Binary or Utf8, found {dtype}" + ); + vortex_ensure!((9..=16).contains(&bits), "bits {bits} out of range [9, 16]"); + + if !dict_offsets.dtype().is_int() || dict_offsets.dtype().is_nullable() { + vortex_bail!(InvalidArgument: 
"dict_offsets must be non-nullable integer"); + } + if !codes.dtype().is_int() || codes.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "codes must be non-nullable integer"); + } + if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "codes_offsets must be non-nullable integer"); + } + if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "uncompressed_lengths must be non-nullable integer"); + } + if codes_offsets.len() != uncompressed_lengths.len() + 1 { + vortex_bail!(InvalidArgument: + "codes_offsets.len ({}) != uncompressed_lengths.len + 1 ({})", + codes_offsets.len(), + uncompressed_lengths.len() + 1 + ); + } + Ok(()) +} + +impl VTable for OnPair { + type TypedArrayData = OnPairData; + type OperationsVTable = Self; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.onpair"); + *ID + } + + fn validate( + &self, + data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + let s = OnPairSlotsView::from_slots(slots); + validate_parts( + dtype, + s.dict_offsets, + s.codes, + s.codes_offsets, + s.uncompressed_lengths, + data.bits, + )?; + if s.uncompressed_lengths.len() != len { + vortex_bail!(InvalidArgument: "uncompressed_lengths must have same len as outer array"); + } + if data.len != len { + vortex_bail!(InvalidArgument: "OnPairData len {} != outer len {}", data.len, len); + } + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 1 + } + + fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + match idx { + 0 => array.dict_bytes_handle().clone(), + _ => vortex_panic!("OnPairArray buffer index {idx} out of bounds"), + } + } + + fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option { + match idx { + 0 => Some("dict_bytes".to_string()), + _ => vortex_panic!("OnPairArray buffer_name index 
{idx} out of bounds"), + } + } + + fn serialize( + array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + let dict_size = u32::try_from(array.dict_offsets().len().saturating_sub(1)) + .map_err(|_| vortex_err!("OnPair dict_size exceeds u32"))?; + let total_tokens = array.codes().len() as u64; + Ok(Some( + OnPairMetadata { + uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(), + bits: array.bits(), + dict_size, + total_tokens, + dict_offsets_ptype: array.dict_offsets().dtype().as_ptype().into(), + codes_ptype: array.codes().dtype().as_ptype().into(), + codes_offsets_ptype: array.codes_offsets().dtype().as_ptype().into(), + } + .encode_to_vec(), + )) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + if buffers.len() != 1 { + vortex_bail!(InvalidArgument: "Expected 1 buffer, got {}", buffers.len()); + } + let metadata = OnPairMetadata::decode(metadata)?; + let uncompressed_ptype = metadata.get_uncompressed_lengths_ptype()?; + + // Slot children. We pass `usize::MAX` for slots whose length we + // don't know up front (`dict_offsets` and `codes`). `codes_offsets` + // has known length `len + 1`. + let dict_offsets_len = metadata.dict_size as usize + 1; + let total_tokens = usize::try_from(metadata.total_tokens) + .map_err(|_| vortex_err!("total_tokens {} overflows usize", metadata.total_tokens))?; + // The cascading compressor may have narrowed any of these integer + // children to a tighter ptype; the recorded ptype tells the framework + // exactly which dtype to materialise as. 
+ let dict_offsets_ptype = PType::try_from(metadata.dict_offsets_ptype).map_err(|_| { + vortex_err!("invalid dict_offsets_ptype {}", metadata.dict_offsets_ptype) + })?; + let codes_ptype = PType::try_from(metadata.codes_ptype) + .map_err(|_| vortex_err!("invalid codes_ptype {}", metadata.codes_ptype))?; + let codes_offsets_ptype = PType::try_from(metadata.codes_offsets_ptype).map_err(|_| { + vortex_err!( + "invalid codes_offsets_ptype {}", + metadata.codes_offsets_ptype + ) + })?; + let dict_offsets = children.get( + 0, + &DType::Primitive(dict_offsets_ptype, Nullability::NonNullable), + dict_offsets_len, + )?; + let codes = children.get( + 1, + &DType::Primitive(codes_ptype, Nullability::NonNullable), + total_tokens, + )?; + let codes_offsets = children.get( + 2, + &DType::Primitive(codes_offsets_ptype, Nullability::NonNullable), + len + 1, + )?; + let uncompressed_lengths = children.get( + 3, + &DType::Primitive(uncompressed_ptype, Nullability::NonNullable), + len, + )?; + let validity = match children.len() { + 4 => Validity::from(dtype.nullability()), + 5 => Validity::Array(children.get(4, &Validity::DTYPE, len)?), + other => vortex_bail!(InvalidArgument: "Expected 4 or 5 children, got {other}"), + }; + + let data = OnPairData::new(buffers[0].clone(), metadata.bits, len); + let slots = OnPairSlots { + dict_offsets, + codes, + codes_offsets, + uncompressed_lengths, + validity: validity_to_child(&validity, len), + } + .into_slots(); + Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots)) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + OnPairSlots::NAMES[idx].to_string() + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + canonicalize_onpair(array.as_view(), ctx).map(ExecutionResult::done) + } + + fn append_to_builder( + array: ArrayView<'_, Self>, + builder: &mut dyn ArrayBuilder, + ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + let Some(builder) = 
builder.as_any_mut().downcast_mut::() else { + builder.extend_from_array( + &array + .array() + .clone() + .execute::(ctx)? + .into_array(), + ); + return Ok(()); + }; + + let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress()); + let (buffers, views) = onpair_decode_views(array, next_buffer_index, ctx)?; + builder.push_buffer_and_adjusted_views( + &buffers, + &views, + array + .array() + .validity()? + .execute_mask(array.array().len(), ctx)?, + ); + Ok(()) + } + + fn execute_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + PARENT_KERNELS.execute(array, parent, child_idx, ctx) + } + + fn reduce_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ) -> VortexResult> { + RULES.evaluate(array, parent, child_idx) + } +} + +impl ValidityVTable for OnPair { + fn validity(array: ArrayView<'_, OnPair>) -> VortexResult { + Ok(child_to_validity( + array.slots()[OnPairSlots::VALIDITY].as_ref(), + array.dtype().nullability(), + )) + } +} + +/// Convenience methods on top of the macro-generated [`OnPairArraySlotsExt`]. +pub trait OnPairArrayExt: OnPairArraySlotsExt { + fn array_validity(&self) -> Validity { + child_to_validity( + self.as_ref().slots()[OnPairSlots::VALIDITY].as_ref(), + self.as_ref().dtype().nullability(), + ) + } +} + +impl OnPairArrayExt for T {} diff --git a/encodings/experimental/onpair/src/canonical.rs b/encodings/experimental/onpair/src/canonical.rs new file mode 100644 index 00000000000..c658b841155 --- /dev/null +++ b/encodings/experimental/onpair/src/canonical.rs @@ -0,0 +1,115 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors +// +//! Convert an [`OnPairArray`] to its canonical `VarBinViewArray` by handing +//! the materialised parts to `onpair::decompress_into`. 
+
+use std::sync::Arc;
+
+use num_traits::AsPrimitive;
+use onpair::Parts;
+use vortex_array::ArrayRef;
+use vortex_array::ArrayView;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::arrays::PrimitiveArray;
+use vortex_array::arrays::VarBinViewArray;
+use vortex_array::arrays::varbinview::build_views::BinaryView;
+use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
+use vortex_array::arrays::varbinview::build_views::build_views;
+use vortex_array::match_each_integer_ptype;
+use vortex_buffer::Buffer;
+use vortex_buffer::ByteBuffer;
+use vortex_buffer::ByteBufferMut;
+use vortex_error::VortexResult;
+use vortex_error::vortex_ensure;
+
+use crate::OnPair;
+use crate::OnPairArraySlotsExt;
+use crate::decode::code_boundary_at;
+use crate::decode::collect_widened;
+
+pub(super) fn canonicalize_onpair(
+    array: ArrayView<'_, OnPair>,
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<ArrayRef> {
+    let (buffers, views) = onpair_decode_views(array, 0, ctx)?;
+    let validity = array.array().validity()?;
+    Ok(unsafe {
+        VarBinViewArray::new_unchecked(views, Arc::from(buffers), array.dtype().clone(), validity)
+            .into_array()
+    })
+}
+
+pub(crate) fn onpair_decode_views(
+    array: ArrayView<'_, OnPair>,
+    start_buf_index: u32,
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<(Vec<ByteBuffer>, Buffer<BinaryView>)> {
+    let lengths = array
+        .uncompressed_lengths()
+        .clone()
+        .execute::<PrimitiveArray>(ctx)?;
+
+    let total_size: usize = match_each_integer_ptype!(lengths.ptype(), |P| {
+        lengths
+            .as_slice::<P>()
+            .iter()
+            .map(|&l| AsPrimitive::<usize>::as_(l))
+            .sum()
+    });
+
+    // `codes_offsets` holds the per-row code boundaries and may itself be a
+    // sliced or filtered view of the original. Its first and last entries
+    // bound the contiguous run of `codes` belonging to the rows present in
+    // this array: `slice` keeps the full `codes` child and only narrows
+    // `codes_offsets` (so `code_start > 0` and/or `code_end < codes.len()`),
+    // while `filter` rebuilds both children so the window is the whole stream.
+    // OnPair has no `TakeExecute`, so a reordering take is served from the
+    // canonical `VarBinView` and never reaches this path. We only need those
+    // two boundaries, so point-look them up rather than decoding every offset.
+    let codes_offsets = array.codes_offsets();
+    let code_start = code_boundary_at(codes_offsets, 0, ctx)?;
+    let code_end = code_boundary_at(codes_offsets, array.len(), ctx)?;
+    vortex_ensure!(
+        code_start <= code_end,
+        "OnPair codes_offsets must be nondecreasing"
+    );
+    vortex_ensure!(
+        code_end <= array.codes().len(),
+        "OnPair codes_offsets end {} exceeds codes len {}",
+        code_end,
+        array.codes().len()
+    );
+
+    // Slice the `codes` child to that window *before* unpacking it, so a sliced
+    // array materialises only its own codes rather than the whole column's. The
+    // contiguous decoder walks `codes` in order and never reads the per-row
+    // boundaries, so an empty boundary slice is sound.
+    let codes = collect_widened::<u16>(&array.codes().slice(code_start..code_end)?, ctx)?;
+    let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;
+
+    let mut out_bytes = ByteBufferMut::with_capacity(total_size);
+    let written = onpair::decompress_into(
+        Parts {
+            dict_bytes: array.dict_bytes().as_slice(),
+            dict_offsets: dict_offsets.as_slice(),
+            bits: array.bits(),
+            codes: codes.as_slice(),
+        },
+        out_bytes.spare_capacity_mut(),
+    );
+    debug_assert_eq!(written, total_size);
+    // SAFETY: `decompress_into` initialised exactly `written` bytes of the
+    // spare capacity reserved above.
+    unsafe { out_bytes.set_len(written) };
+
+    match_each_integer_ptype!(lengths.ptype(), |P| {
+        Ok(build_views(
+            start_buf_index,
+            MAX_BUFFER_LEN,
+            out_bytes,
+            lengths.as_slice::<P>(),
+        ))
+    })
+}
diff --git a/encodings/experimental/onpair/src/compress.rs b/encodings/experimental/onpair/src/compress.rs
new file mode 100644
index 00000000000..226922cf593
--- /dev/null
+++ b/encodings/experimental/onpair/src/compress.rs
@@ -0,0 +1,186 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! Train + compress entry points for the OnPair encoding.
+
+use onpair::Config;
+use onpair::Offset;
+use vortex_array::ArrayRef;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::LEGACY_SESSION;
+use vortex_array::VortexSessionExecute;
+use vortex_array::accessor::ArrayAccessor;
+use vortex_array::arrays::VarBinViewArray;
+use vortex_array::buffer::BufferHandle;
+use vortex_array::dtype::DType;
+use vortex_array::dtype::Nullability;
+use vortex_array::validity::Validity;
+use vortex_buffer::Buffer;
+use vortex_buffer::BufferMut;
+use vortex_buffer::ByteBuffer;
+use vortex_error::VortexExpect;
+use vortex_error::VortexResult;
+use vortex_error::vortex_err;
+
+use crate::OnPair;
+use crate::OnPairArray;
+
+/// Default OnPair training configuration: 12-bit codes ("dict-12").
+pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG;
+
+/// Compress an iterable of optional byte strings via the OnPair encoder.
+pub fn onpair_compress_iter<'a, I>(
+    iter: I,
+    len: usize,
+    dtype: DType,
+    config: Config,
+) -> VortexResult<OnPairArray>
+where
+    I: Iterator<Item = Option<&'a [u8]>>,
+{
+    onpair_compress_iter_with_offsets::<u32, _>(iter, len, dtype, config) // TODO confirm u32
+}
+
+fn onpair_compress_iter_with_offsets<'a, O, I>(
+    iter: I,
+    len: usize,
+    dtype: DType,
+    config: Config,
+) -> VortexResult<OnPairArray>
+where
+    O: Offset,
+    I: Iterator<Item = Option<&'a [u8]>>,
+{
+    let mut flat: Vec<u8> = Vec::with_capacity(len * 16);
+    let mut offsets: Vec<O> = Vec::with_capacity(len + 1);
+    let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
+    let mut validity_bits: Vec<bool> = Vec::with_capacity(len);
+    offsets.push(<O as Offset>::from_usize(0));
+
+    for item in iter {
+        match item {
+            Some(bytes) => {
+                flat.extend_from_slice(bytes);
+                offsets.push(<O as Offset>::from_usize(flat.len()));
+                uncompressed_lengths.push(
+                    i32::try_from(bytes.len()).vortex_expect("string length must fit in i32"),
+                );
+                validity_bits.push(true);
+            }
+            None => {
+                offsets.push(<O as Offset>::from_usize(flat.len()));
+                uncompressed_lengths.push(0);
+                validity_bits.push(false);
+            }
+        }
+    }
+
+    let column = onpair::compress(&flat, &offsets, config)
+        .map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;
+    let bits = column.bits;
+    let dict_bytes = dict_bytes_to_buffer(column.dict_bytes);
+    let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?;
+    let codes = Buffer::from(column.codes).into_array();
+    let dict_offsets = Buffer::from(column.dict_offsets).into_array();
+    let codes_offsets = Buffer::from(codes_offsets).into_array();
+
+    let uncompressed_lengths = uncompressed_lengths.into_array();
+    let validity = match dtype.nullability() {
+        Nullability::NonNullable => Validity::NonNullable,
+        Nullability::Nullable => Validity::from_iter(validity_bits),
+    };
+
+    OnPair::try_new(
+        dtype,
+        dict_bytes,
+        dict_offsets,
+        codes,
+        codes_offsets,
+        uncompressed_lengths,
+        validity,
+        bits,
+    )
+}
+
+/// Lift compressed dictionary bytes into the Vortex buffer slot.
+fn dict_bytes_to_buffer(dict_bytes: Vec<u8>) -> BufferHandle {
+    // Pad the dictionary blob with MAX_TOKEN_SIZE zero bytes so the
+    // over-copy decoder can issue a fixed 16-byte load for every token
+    // without risking an OOB read on the last entry.
+    let mut padded = Vec::with_capacity(dict_bytes.len() + onpair::MAX_TOKEN_SIZE);
+    padded.extend_from_slice(&dict_bytes);
+    padded.resize(dict_bytes.len() + onpair::MAX_TOKEN_SIZE, 0);
+    // Align dict_bytes to 8 bytes so the segment that ultimately holds the
+    // OnPair tree starts at an 8-aligned in-memory address. Without this
+    // anchor, the per-buffer padding the serializer inserts is only
+    // *relative* to the segment start; if the segment lands at a u8-aligned
+    // heap address, downstream `PrimitiveArray::deserialize` panics
+    // with `Misaligned buffer cannot be used to build PrimitiveArray of u32`.
+    BufferHandle::new_host(ByteBuffer::from(padded).aligned(vortex_buffer::Alignment::new(8)))
+}
+
+/// Reconstruct the per-row `codes_offsets` from the flat `codes`, the
+/// dictionary `dict_offsets` (token byte lengths) and the per-row decoded byte
+/// boundaries. Returns `nrows + 1` cumulative code counts (`u32`).
+// TODO(joe): can we compute this while compressing the array, yes but a worse API.
+fn build_codes_offsets<O: Offset>(
+    codes: &[u16],
+    dict_offsets: &[u32],
+    row_byte_offsets: &[O],
+) -> VortexResult<Vec<u32>> {
+    let nrows = row_byte_offsets.len() - 1;
+    let mut codes_offsets = Vec::with_capacity(nrows + 1);
+    codes_offsets.push(0u32);
+    let mut decoded_bytes: u64 = 0;
+    let mut code_idx: usize = 0;
+    for r in 0..nrows {
+        let target = row_byte_offsets[r + 1]
+            .to_usize()
+            .ok_or_else(|| vortex_err!("OnPair row byte offset does not fit usize"))?
+            as u64;
+        while decoded_bytes < target {
+            let code = codes[code_idx] as usize;
+            decoded_bytes += u64::from(dict_offsets[code + 1] - dict_offsets[code]);
+            code_idx += 1;
+        }
+        codes_offsets.push(
+            u32::try_from(code_idx)
+                .map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?,
+        );
+    }
+    Ok(codes_offsets)
+}
+
+/// Compress a byte-string accessor (typically a `VarBinArray` or
+/// `VarBinViewArray`).
+pub fn onpair_compress<A: ArrayAccessor<[u8]>>(
+    array: A,
+    len: usize,
+    dtype: &DType,
+    config: Config,
+) -> VortexResult<OnPairArray> {
+    array.with_iterator(|iter| onpair_compress_iter(iter, len, dtype.clone(), config))
+}
+
+/// Compress any [`ArrayRef`] whose canonical form is a string array, by first
+/// canonicalising to `VarBinViewArray`.
+pub fn onpair_compress_array(
+    array: &ArrayRef,
+    config: Config,
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<OnPairArray> {
+    let view = array.clone().execute::<VarBinViewArray>(ctx)?;
+    let len = view.len();
+    let dtype = view.dtype().clone();
+    onpair_compress(&view, len, &dtype, config)
+}
+
+/// Convenience: build a default `ExecutionCtx` from `LEGACY_SESSION`.
+pub fn onpair_compress_array_default(
+    array: &ArrayRef,
+    config: Config,
+) -> VortexResult<OnPairArray> {
+    let mut ctx = LEGACY_SESSION.create_execution_ctx();
+    onpair_compress_array(array, config, &mut ctx)
+}
diff --git a/encodings/experimental/onpair/src/compute/cast.rs b/encodings/experimental/onpair/src/compute/cast.rs
new file mode 100644
index 00000000000..93e1fdd8f8a
--- /dev/null
+++ b/encodings/experimental/onpair/src/compute/cast.rs
@@ -0,0 +1,43 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use vortex_array::ArrayRef;
+use vortex_array::ArrayView;
+use vortex_array::IntoArray;
+use vortex_array::dtype::DType;
+use vortex_array::scalar_fn::fns::cast::CastReduce;
+use vortex_error::VortexResult;
+
+use crate::OnPair;
+use crate::OnPairArraySlotsExt;
+
+/// Adjust nullability without touching any of the encoded payload — we only
+/// rewrap into a new outer DType. Casts that change the base type are declined.
+impl CastReduce for OnPair {
+    fn cast(array: ArrayView<'_, Self>, dtype: &DType) -> VortexResult<Option<ArrayRef>> {
+        if !array.dtype().eq_ignore_nullability(dtype) {
+            return Ok(None);
+        }
+        let validity = array.array().validity()?;
+        let Some(new_validity) =
+            validity.trivially_cast_nullability(dtype.nullability(), array.array().len())?
+        else {
+            return Ok(None);
+        };
+        Ok(Some(
+            unsafe {
+                OnPair::new_unchecked(
+                    dtype.clone(),
+                    array.dict_bytes_handle().clone(),
+                    array.dict_offsets().clone(),
+                    array.codes().clone(),
+                    array.codes_offsets().clone(),
+                    array.uncompressed_lengths().clone(),
+                    new_validity,
+                    array.bits(),
+                )
+            }
+            .into_array(),
+        ))
+    }
+}
diff --git a/encodings/experimental/onpair/src/compute/filter.rs b/encodings/experimental/onpair/src/compute/filter.rs
new file mode 100644
index 00000000000..c26f3eeeacd
--- /dev/null
+++ b/encodings/experimental/onpair/src/compute/filter.rs
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+//
+//! Filter that **shares the dictionary**. The previous implementation
+//! decoded the whole array, filtered the canonical bytes, and re-trained
+//! a brand-new OnPair dictionary on the surviving rows — order-of-
+//! magnitude regressions on TPC-H Q22 at SF=10 traced back to that cost
+//! (the customer table's `c_phone` column gets two consecutive filters,
+//! each of which was paying full `Column::compress` training overhead).
+//!
+//! FSST-shape filter: keep `dict_bytes` + `dict_offsets` **identical**
+//! to the input; rebuild only `codes`, `codes_offsets`,
+//! `uncompressed_lengths`, and validity. No decode, no retrain on the
+//! read path.
+
+use vortex_array::ArrayRef;
+use vortex_array::ArrayView;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::arrays::List;
+use vortex_array::arrays::ListArray;
+use vortex_array::arrays::filter::FilterKernel;
+use vortex_array::arrays::list::ListArrayExt;
+use vortex_array::validity::Validity;
+use vortex_error::VortexExpect;
+use vortex_error::VortexResult;
+use vortex_mask::Mask;
+
+use crate::OnPair;
+use crate::OnPairArrayExt;
+use crate::OnPairArraySlotsExt;
+
+impl FilterKernel for OnPair {
+    fn filter(
+        array: ArrayView<'_, Self>,
+        mask: &Mask,
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<Option<ArrayRef>> {
+        // OnPair's `codes` + `codes_offsets` are a list of token runs,
+        // analogous to FSST's `codes` VarBin child. Delegate to the standard
+        // List filter so sparse masks can filter the encoded child directly.
+        let codes = unsafe {
+            ListArray::new_unchecked(
+                array.codes().clone(),
+                array.codes_offsets().clone(),
+                Validity::NonNullable,
+            )
+        };
+        let filtered_codes_ref = <List as FilterKernel>::filter(codes.as_view(), mask, ctx)?
+            .vortex_expect("List filter kernel always returns Some");
+        let filtered_codes = filtered_codes_ref
+            .try_downcast::<List>()
+            .ok()
+            .vortex_expect("must be List");
+
+        // uncompressed_lengths + validity flow through the standard
+        // primitive filter — these are short integer arrays so the cost
+        // is negligible compared to the (avoided) recompress.
+        let uncompressed_lengths = array.uncompressed_lengths().clone().filter(mask.clone())?;
+        let validity = array.array_validity().filter(mask)?;
+
+        Ok(Some(
+            unsafe {
+                OnPair::new_unchecked(
+                    array.dtype().clone(),
+                    array.dict_bytes_handle().clone(),
+                    array.dict_offsets().clone(),
+                    filtered_codes.elements().clone(),
+                    filtered_codes.offsets().clone(),
+                    uncompressed_lengths,
+                    validity,
+                    array.bits(),
+                )
+            }
+            .into_array(),
+        ))
+    }
+}
diff --git a/encodings/experimental/onpair/src/compute/mod.rs b/encodings/experimental/onpair/src/compute/mod.rs
new file mode 100644
index 00000000000..4cb15868625
--- /dev/null
+++ b/encodings/experimental/onpair/src/compute/mod.rs
@@ -0,0 +1,6 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+mod cast;
+mod filter;
+mod slice;
diff --git a/encodings/experimental/onpair/src/compute/slice.rs b/encodings/experimental/onpair/src/compute/slice.rs
new file mode 100644
index 00000000000..fcfebf413bf
--- /dev/null
+++ b/encodings/experimental/onpair/src/compute/slice.rs
@@ -0,0 +1,43 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+//
+//! Slicing an `OnPairArray` reuses the same dictionary blob, the full
+//! `codes` child, and the full `dict_offsets` child. Only the
+//! `codes_offsets` child (narrowed to `[start, end + 1)`), the
+//! `uncompressed_lengths` child (narrowed to `[start, end)`) and the
+//! optional validity child change. No decode, no re-training.
+ +use std::ops::Range; + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::IntoArray; +use vortex_array::arrays::slice::SliceReduce; +use vortex_error::VortexResult; + +use crate::OnPair; +use crate::OnPairArrayExt; +use crate::OnPairArraySlotsExt; + +impl SliceReduce for OnPair { + fn slice(array: ArrayView<'_, Self>, range: Range) -> VortexResult> { + let codes_offsets = array.codes_offsets().slice(range.start..range.end + 1)?; + let uncompressed_lengths = array.uncompressed_lengths().slice(range.clone())?; + let validity = array.array_validity().slice(range)?; + Ok(Some( + unsafe { + OnPair::new_unchecked( + array.dtype().clone(), + array.dict_bytes_handle().clone(), + array.dict_offsets().clone(), + array.codes().clone(), + codes_offsets, + uncompressed_lengths, + validity, + array.bits(), + ) + } + .into_array(), + )) + } +} diff --git a/encodings/experimental/onpair/src/decode.rs b/encodings/experimental/onpair/src/decode.rs new file mode 100644 index 00000000000..e3c7346f0d9 --- /dev/null +++ b/encodings/experimental/onpair/src/decode.rs @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors +// +//! Helpers for turning [`OnPair`] slot children into the inputs the upstream +//! `onpair` decoder consumes. + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::NativePType; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +/// Canonicalise a slot child to the decoder's native primitive width. +pub(crate) fn collect_widened( + arr: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let dtype = DType::Primitive(T::PTYPE, arr.dtype().nullability()); + Ok(arr + .cast(dtype)? + .execute::(ctx)? 
+ .into_buffer::()) +} + +/// Read one `codes_offsets` boundary by point lookup. This decodes at most a +/// single chunk of the child — never the whole per-row offsets array — so the +/// callers that only need a row window (`scalar_at`, the canonical decode's +/// start/end bounds) don't pay to materialise every boundary. +pub(crate) fn code_boundary_at( + codes_offsets: &ArrayRef, + index: usize, + ctx: &mut ExecutionCtx, +) -> VortexResult { + codes_offsets + .execute_scalar(index, ctx)? + .as_primitive() + .as_::() + .ok_or_else(|| vortex_err!("OnPair codes_offsets[{index}] is null")) +} diff --git a/encodings/experimental/onpair/src/kernel.rs b/encodings/experimental/onpair/src/kernel.rs new file mode 100644 index 00000000000..fdd521e887e --- /dev/null +++ b/encodings/experimental/onpair/src/kernel.rs @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::arrays::filter::FilterExecuteAdaptor; +use vortex_array::kernel::ParentKernelSet; + +use crate::OnPair; + +// TODO: implement ListExecute & TakeExecute for OnPair +pub(super) const PARENT_KERNELS: ParentKernelSet = + ParentKernelSet::new(&[ParentKernelSet::lift(&FilterExecuteAdaptor(OnPair))]); diff --git a/encodings/experimental/onpair/src/lib.rs b/encodings/experimental/onpair/src/lib.rs new file mode 100644 index 00000000000..94c18b6dec8 --- /dev/null +++ b/encodings/experimental/onpair/src/lib.rs @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Vortex string array backed by the [OnPair][onpair] short-string +//! compression library, with `cast` and `filter` pushdown. +//! +//! The default training preset is `dict-12` (12 bits per token, dictionary +//! capped at 4 096 entries). See [`onpair_compress`] for the entry point and +//! [`OnPairArray`] for the resulting array type. +//! +//! 
[onpair]: https://arxiv.org/abs/2508.02280
+
+mod array;
+mod canonical;
+mod compress;
+mod compute;
+mod decode;
+mod kernel;
+mod ops;
+mod rules;
+#[cfg(test)]
+mod tests;
+
+pub use array::*;
+pub use compress::*;
+pub use onpair::Bits;
+pub use onpair::Config;
+pub use onpair::Error as OnPairError;
+pub use onpair::Threshold;
diff --git a/encodings/experimental/onpair/src/ops.rs b/encodings/experimental/onpair/src/ops.rs
new file mode 100644
index 00000000000..a6e097bbfd4
--- /dev/null
+++ b/encodings/experimental/onpair/src/ops.rs
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use onpair::Parts;
+use vortex_array::ArrayView;
+use vortex_array::ExecutionCtx;
+use vortex_array::arrays::varbin::varbin_scalar;
+use vortex_array::scalar::Scalar;
+use vortex_array::vtable::OperationsVTable;
+use vortex_buffer::ByteBuffer;
+use vortex_error::VortexResult;
+use vortex_error::vortex_err;
+
+use crate::OnPair;
+use crate::OnPairArraySlotsExt;
+use crate::decode::code_boundary_at;
+use crate::decode::collect_widened;
+
+impl OperationsVTable for OnPair {
+    /// Decode row `index` into a scalar of the array's dtype.
+    ///
+    /// Returns an error if any child lookup fails or yields null, or if the
+    /// decoded byte count disagrees with `uncompressed_lengths[index]`.
+    fn scalar_at(
+        array: ArrayView<'_, OnPair>,
+        index: usize,
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<Scalar> {
+        // A row owns a variable-length run of the flat `codes` stream; the
+        // per-row `codes_offsets` boundaries map the row index to that run.
+        // Read just this row's two boundaries (point lookups that decode at
+        // most one chunk of `codes_offsets`) and decode only that run — never
+        // the whole column.
+        let codes_offsets = array.codes_offsets();
+        let row_start = code_boundary_at(codes_offsets, index, ctx)?;
+        let row_end = code_boundary_at(codes_offsets, index + 1, ctx)?;
+
+        let codes = collect_widened::<u16>(&array.codes().slice(row_start..row_end)?, ctx)?;
+        let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;
+        let parts = Parts {
+            dict_bytes: array.dict_bytes().as_slice(),
+            dict_offsets: dict_offsets.as_slice(),
+            bits: array.bits(),
+            codes: codes.as_slice(),
+        };
+
+        // The per-row decoded length is recorded in the `uncompressed_lengths`
+        // child, so read it directly instead of asking the decoder to compute it.
+        let len = array
+            .uncompressed_lengths()
+            .execute_scalar(index, ctx)?
+            .as_primitive()
+            .as_::<usize>()
+            .ok_or_else(|| vortex_err!("OnPair uncompressed_lengths[{index}] is null"))?;
+        let mut buf: Vec<u8> = Vec::with_capacity(len);
+        // Expose exactly `len` bytes: `with_capacity` may reserve more, and the
+        // decoder must not write past the recorded row length.
+        let written = onpair::decompress_into(parts, &mut buf.spare_capacity_mut()[..len]);
+        if written != len {
+            return Err(vortex_err!(
+                "OnPair row {index} decoded to {written} bytes, expected {len}"
+            ));
+        }
+        // SAFETY: `decompress_into` initialised the first `written == len` bytes
+        // of the spare capacity reserved above.
+        unsafe { buf.set_len(written) };
+        Ok(varbin_scalar(ByteBuffer::from(buf), array.dtype()))
+    }
+}
diff --git a/encodings/experimental/onpair/src/rules.rs b/encodings/experimental/onpair/src/rules.rs
new file mode 100644
index 00000000000..279c160c1eb
--- /dev/null
+++ b/encodings/experimental/onpair/src/rules.rs
@@ -0,0 +1,13 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use vortex_array::arrays::slice::SliceReduceAdaptor;
+use vortex_array::optimizer::rules::ParentRuleSet;
+use vortex_array::scalar_fn::fns::cast::CastReduceAdaptor;
+
+use crate::OnPair;
+
+pub(crate) static RULES: ParentRuleSet<OnPair> = ParentRuleSet::new(&[
+    ParentRuleSet::lift(&SliceReduceAdaptor(OnPair)),
+    ParentRuleSet::lift(&CastReduceAdaptor(OnPair)),
+]);
diff --git a/encodings/experimental/onpair/src/tests.rs b/encodings/experimental/onpair/src/tests.rs
new file mode 100644
index 00000000000..dd6fe4b0116
--- /dev/null
+++ b/encodings/experimental/onpair/src/tests.rs
@@ -0,0 +1,485 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::sync::LazyLock;
+
+use prost::Message;
+use vortex_array::IntoArray;
+use vortex_array::VortexSessionExecute;
+use vortex_array::accessor::ArrayAccessor;
+use vortex_array::arrays::PrimitiveArray;
+use vortex_array::arrays::VarBinArray;
+use vortex_array::arrays::VarBinViewArray;
+use vortex_array::arrays::filter::FilterKernel;
+use vortex_array::dtype::DType;
+use vortex_array::dtype::Nullability;
+use vortex_array::dtype::PType;
+use vortex_array::match_each_integer_ptype;
+use vortex_array::session::ArraySession;
+use vortex_array::test_harness::check_metadata;
+use vortex_array::validity::Validity;
+use vortex_buffer::BufferMut;
+use vortex_session::VortexSession;
+
+use crate::OnPair;
+use crate::OnPairArrayExt;
+use crate::OnPairArraySlotsExt;
+use crate::OnPairMetadata;
+use crate::compress::DEFAULT_DICT12_CONFIG;
+use 
crate::compress::onpair_compress; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +fn sample_input() -> VarBinArray { + VarBinArray::from_iter( + [ + Some("https://www.example.com/page"), + Some("https://www.example.com/data"), + Some("https://www.test.org/page"), + Some("ftp://files.example.com/x"), + Some("https://www.example.com/page"), + ], + DType::Utf8(Nullability::NonNullable), + ) +} + +#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_metadata_golden() { + check_metadata( + "onpair.metadata", + &OnPairMetadata { + uncompressed_lengths_ptype: PType::I32 as i32, + bits: 12, + dict_size: 4096, + total_tokens: 128_000, + dict_offsets_ptype: PType::U32 as i32, + codes_ptype: PType::U16 as i32, + codes_offsets_ptype: PType::U32 as i32, + } + .encode_to_vec(), + ); +} + +#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_roundtrip() { + let input = sample_input(); + let len = input.len(); + let dtype = input.dtype().clone(); + + let compressed = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).expect("compress"); + assert!(compressed.clone().into_array().is::()); + + let mut ctx = SESSION.create_execution_ctx(); + let decoded = compressed + .into_array() + .execute::(&mut ctx) + .expect("canonicalize"); + + decoded + .with_iterator(|iter| { + let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); + assert_eq!(got.len(), 5); + assert_eq!( + got[0].as_deref(), + Some(b"https://www.example.com/page".as_ref()) + ); + assert_eq!( + got[3].as_deref(), + Some(b"ftp://files.example.com/x".as_ref()) + ); + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_nullable_canonicalize() { + let input = VarBinArray::from_iter( + [Some("a"), None, Some("bbb"), None, Some("ccccc")], + DType::Utf8(Nullability::Nullable), + ); + let len = input.len(); + let dtype = input.dtype().clone(); + let arr = onpair_compress(&input, len, &dtype, 
DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let canonical = arr + .into_array() + .execute::(&mut ctx) + .unwrap(); + canonical + .with_iterator(|iter| { + let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); + assert_eq!(got[1], None); + assert_eq!(got[3], None); + assert_eq!(got[4].as_deref(), Some(b"ccccc".as_ref())); + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_scalar_at() { + let input = sample_input(); + let len = input.len(); + let dtype = input.dtype().clone(); + let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let s = arr.into_array().execute_scalar(2, &mut ctx).unwrap(); + let v = s.as_utf8().value().unwrap(); + assert_eq!(v.as_bytes(), b"https://www.test.org/page"); +} + +/// `scalar_at` must decode only the requested row's code window — fetching +/// its two `codes_offsets` boundaries via point lookup, not by materialising +/// the whole `codes_offsets`/`codes` children. Verify correctness at several +/// indices (including the last row) on a full array, and on a *sliced* array +/// where `codes_offsets` is itself a narrowed view and the row index is +/// relative to the slice. 
+#[cfg_attr(miri, ignore)]
+#[test]
+fn test_onpair_scalar_at_window() -> vortex_error::VortexResult<()> {
+    let n = 2_000usize;
+    let strings: Vec<String> = (0..n)
+        .map(|i| format!("https://www.example.com/items/{i:08}/page?q={i}"))
+        .collect();
+    let varbin = VarBinArray::from_iter(
+        strings.iter().map(|s| Some(s.as_bytes())),
+        DType::Utf8(Nullability::NonNullable),
+    );
+    let arr =
+        onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG)?.into_array();
+
+    let mut ctx = SESSION.create_execution_ctx();
+    for &i in &[0usize, 1, 999, 1000, n - 1] {
+        let got = arr.execute_scalar(i, &mut ctx)?;
+        assert_eq!(
+            got.as_utf8().value().unwrap().as_bytes(),
+            strings[i].as_bytes(),
+            "full array row {i}"
+        );
+    }
+
+    // Sliced array: `codes_offsets` is a sub-window (first boundary > 0), so the
+    // point lookup must resolve indices relative to the slice.
+    let (start, end) = (700usize, 1300usize);
+    let sliced = arr.slice(start..end)?;
+    assert!(sliced.is::<OnPair>(), "slice dropped OnPair encoding");
+    for &j in &[0usize, 1, 300, end - start - 1] {
+        let got = sliced.execute_scalar(j, &mut ctx)?;
+        assert_eq!(
+            got.as_utf8().value().unwrap().as_bytes(),
+            strings[start + j].as_bytes(),
+            "sliced row {j}"
+        );
+    }
+    Ok(())
+}
+
+/// The hot decode loop is 4×-unrolled with a scalar tail. Anything that
+/// lands in the tail (1-3 leftover tokens, or zero total tokens) must
+/// produce the same bytes as the unrolled body. Hit every row-count
+/// near the boundary.
+#[cfg_attr(miri, ignore)] +#[rstest::rstest] +#[case::n_1(1)] +#[case::n_2(2)] +#[case::n_3(3)] +#[case::n_4(4)] +#[case::n_5(5)] +#[case::n_7(7)] +#[case::n_8(8)] +#[case::n_9(9)] +fn test_onpair_unroll_tail_boundaries(#[case] n: usize) { + let words: &[&str] = &["a", "bb", "ccc", "https://www.example.com/x"]; + let strings: Vec<&str> = (0..n).map(|i| words[i % words.len()]).collect(); + let input = VarBinArray::from_iter( + strings.iter().map(|s| Some(*s)), + DType::Utf8(Nullability::NonNullable), + ); + let len = input.len(); + let dtype = input.dtype().clone(); + let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let canonical = arr + .into_array() + .execute::(&mut ctx) + .unwrap(); + canonical + .with_iterator(|iter| { + let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); + assert_eq!(got.len(), n); + for (i, expected) in strings.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(expected.as_bytes()), "n={n}, i={i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +/// Empty array — the unroll path must short-circuit cleanly. +#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_empty() { + let input = VarBinArray::from_iter( + std::iter::empty::>(), + DType::Utf8(Nullability::NonNullable), + ); + let len = input.len(); + let dtype = input.dtype().clone(); + let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); + assert_eq!(arr.len(), 0); + let mut ctx = SESSION.create_execution_ctx(); + let canonical = arr + .into_array() + .execute::(&mut ctx) + .unwrap(); + assert_eq!(canonical.len(), 0); +} + +/// Filter must share the dictionary — never recompress (this is the +/// regression cause on TPC-H Q22 SF=10). Exercise both selectivities +/// and check that the result is bit-exact and still an OnPairArray. 
+#[cfg_attr(miri, ignore)] +#[test] +fn test_onpair_filter_shares_dict() { + let n = 5_000usize; + let strings: Vec = (0..n) + .map(|i| format!("https://www.example.com/items/{i:08}")) + .collect(); + let varbin = VarBinArray::from_iter( + strings.iter().map(|s| Some(s.as_bytes())), + DType::Utf8(Nullability::NonNullable), + ); + let arr = + onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap(); + let dict_bytes_before = arr.dict_bytes().clone(); + let dict_offsets_len_before = arr.dict_offsets().len(); + + // Keep every 7th row. + let keep: Vec = (0..n).map(|i| i % 7 == 0).collect(); + let mask = vortex_mask::Mask::from_iter(keep.iter().copied()); + let expected: Vec<&str> = strings + .iter() + .enumerate() + .filter_map(|(i, s)| keep[i].then_some(s.as_str())) + .collect(); + + let mut filter_ctx = SESSION.create_execution_ctx(); + let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx) + .unwrap() + .expect("OnPair filter must return Some"); + assert!( + filtered.is::(), + "filter dropped OnPair encoding: got {}", + filtered.encoding_id() + ); + let typed = filtered.try_downcast::().expect("OnPair"); + // Dict must be byte-identical with the input — no retrain, no copy. + assert_eq!(typed.dict_bytes().as_slice(), dict_bytes_before.as_slice()); + assert_eq!(typed.dict_offsets().len(), dict_offsets_len_before); + assert_eq!(typed.len(), expected.len()); + + let mut ctx = SESSION.create_execution_ctx(); + let canonical = typed + .into_array() + .execute::(&mut ctx) + .unwrap(); + canonical + .with_iterator(|iter| { + let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); + assert_eq!(got.len(), expected.len()); + for (i, want) in expected.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +/// Rebuild an OnPair array, swapping `codes_offsets` for a narrowed +/// (smaller-ptype) primitive copy. 
Used by the narrowed-child
+/// regression tests below.
+#[expect(clippy::cognitive_complexity)]
+fn narrow_codes_offsets(arr: &crate::OnPairArray, target: PType) -> crate::OnPairArray {
+    let view = arr.as_view();
+    let mut ctx = SESSION.create_execution_ctx();
+    let original = view
+        .codes_offsets()
+        .clone()
+        .execute::<PrimitiveArray>(&mut ctx)
+        .unwrap();
+
+    let narrowed_array = match_each_integer_ptype!(original.ptype(), |SRC| {
+        let src = original.as_slice::<SRC>();
+        match_each_integer_ptype!(target, |DST| {
+            let mut buf = BufferMut::<DST>::with_capacity(src.len());
+            for &v in src {
+                #[allow(
+                    clippy::unnecessary_cast,
+                    reason = "macro-generated SRC may already be u64"
+                )]
+                buf.push(DST::try_from(v as u64).expect("value must fit in target ptype"));
+            }
+            PrimitiveArray::new(buf.freeze(), Validity::NonNullable).into_array()
+        })
+    });
+    // SAFETY: only `codes_offsets` changes, to a value-identical copy; assumes `arr` was valid.
+    unsafe {
+        OnPair::new_unchecked(
+            view.dtype().clone(),
+            view.dict_bytes_handle().clone(),
+            view.dict_offsets().clone(),
+            view.codes().clone(),
+            narrowed_array,
+            view.uncompressed_lengths().clone(),
+            view.array_validity(),
+            view.bits(),
+        )
+    }
+}
+
+/// Regression: the cascading compressor can narrow `codes_offsets`
+/// from u32 → u16 when the total token count is small. The previous
+/// `filter` impl read the child as `as_slice::<u32>()` and panicked
+/// with `Other error: Attempted to get slice of type u32 from array
+/// of type u16`. The fix dispatches via `match_each_integer_ptype!`.
+#[cfg_attr(miri, ignore)]
+#[test]
+fn test_onpair_filter_with_narrowed_codes_offsets_u16() {
+    let n = 200usize;
+    // Few, short rows so the cumulative token count stays small and
+    // codes_offsets values fit in u16. (We narrow manually below regardless
+    // — this matches the shape the cascading compressor produces in the
+    // wild.)
+ let strings: Vec = (0..n).map(|i| format!("r{:03}", i)).collect(); + let varbin = VarBinArray::from_iter( + strings.iter().map(|s| Some(s.as_bytes())), + DType::Utf8(Nullability::NonNullable), + ); + let arr = + onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap(); + + // Force `codes_offsets` to u16 so the panicking pre-fix + // `as_slice::()` would fire. + let arr = narrow_codes_offsets(&arr, PType::U16); + assert_eq!( + arr.as_view().codes_offsets().dtype().as_ptype(), + PType::U16, + "codes_offsets must be u16 to exercise the regression path" + ); + + let keep: Vec = (0..n).map(|i| i % 3 == 0).collect(); + let mask = vortex_mask::Mask::from_iter(keep.iter().copied()); + let expected: Vec<&str> = strings + .iter() + .enumerate() + .filter_map(|(i, s)| keep[i].then_some(s.as_str())) + .collect(); + + let mut filter_ctx = SESSION.create_execution_ctx(); + // Pre-fix: this call panics with "Attempted to get slice of type + // u32 from array of type u16". Post-fix: succeeds. + let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx) + .unwrap() + .expect("OnPair filter must return Some"); + let typed = filtered.try_downcast::().expect("OnPair"); + assert_eq!(typed.len(), expected.len()); + + let mut ctx = SESSION.create_execution_ctx(); + let canonical = typed + .into_array() + .execute::(&mut ctx) + .unwrap(); + canonical + .with_iterator(|iter| { + let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); + assert_eq!(got.len(), expected.len()); + for (i, want) in expected.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +/// Same regression, narrowed to u8 (smallest possible ptype) — extra +/// coverage that the macro dispatch handles every integer ptype the +/// cascading compressor might pick. 
+#[cfg_attr(miri, ignore)]
+#[test]
+fn test_onpair_filter_with_narrowed_codes_offsets_u8() {
+    let n = 100usize;
+    let strings: Vec<String> = (0..n).map(|i| format!("{i}")).collect();
+    let varbin = VarBinArray::from_iter(
+        strings.iter().map(|s| Some(s.as_bytes())),
+        DType::Utf8(Nullability::NonNullable),
+    );
+    let arr =
+        onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap();
+    let arr = narrow_codes_offsets(&arr, PType::U8);
+    assert_eq!(arr.as_view().codes_offsets().dtype().as_ptype(), PType::U8);
+
+    let mask = vortex_mask::Mask::from_iter((0..n).map(|i| i % 2 == 0));
+
+    let mut filter_ctx = SESSION.create_execution_ctx();
+    let filtered = <OnPair as FilterKernel>::filter(arr.as_view(), &mask, &mut filter_ctx)
+        .unwrap()
+        .expect("OnPair filter must return Some");
+    assert_eq!(filtered.len(), n / 2);
+}
+
+/// Regression: canonicalising a *sliced* OnPair array. `slice` keeps the full
+/// `codes` child and only slices `codes_offsets`, so a sliced array has a
+/// partial code window (`code_start > 0` and/or `code_end <
+/// codes.len()`). `onpair_decode_views` must decode exactly that window;
+/// decoding the whole `codes` stream — as a boundary-agnostic whole-column
+/// decoder would — yields the wrong bytes (and over-runs the output) for any
+/// partial slice. `filter` never produces this shape (it rebuilds `codes`
+/// contiguously), so the existing filter tests do not cover it.
+#[cfg_attr(miri, ignore)]
+#[test]
+fn test_onpair_slice_canonicalize() -> vortex_error::VortexResult<()> {
+    let n = 5_000usize;
+    let strings: Vec<String> = (0..n)
+        .map(|i| format!("https://www.example.com/items/{i:08}"))
+        .collect();
+    let varbin = VarBinArray::from_iter(
+        strings.iter().map(|s| Some(s.as_bytes())),
+        DType::Utf8(Nullability::NonNullable),
+    );
+    let arr =
+        onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG)?.into_array();
+
+    // interior (start>0, end<n), prefix (start=0, end<n), suffix (start>0,
+    // end=n), and a near-full window.
+    for (start, end) in [(1234usize, 1240usize), (0, 7), (4993, n), (1, n - 1)] {
+        let sliced = arr.clone().slice(start..end)?;
+        assert_eq!(sliced.len(), end - start);
+        assert!(
+            sliced.is::<OnPair>(),
+            "slice dropped OnPair encoding: got {}",
+            sliced.encoding_id()
+        );
+
+        let mut ctx = SESSION.create_execution_ctx();
+        let canonical = sliced.execute::<VarBinViewArray>(&mut ctx)?;
+        canonical.with_iterator(|iter| {
+            let got: Vec<Option<Vec<u8>>> = iter.map(|b| b.map(|s| s.to_vec())).collect();
+            assert_eq!(got.len(), end - start, "window {start}..{end} length");
+            for (i, want) in strings[start..end].iter().enumerate() {
+                assert_eq!(
+                    got[i].as_deref(),
+                    Some(want.as_bytes()),
+                    "window {start}..{end} row {i}"
+                );
+            }
+            Ok::<_, vortex_error::VortexError>(())
+        })?;
+    }
+    Ok(())
+}
diff --git a/encodings/experimental/onpair/tests/big_data.rs b/encodings/experimental/onpair/tests/big_data.rs
new file mode 100644
index 00000000000..81b1fa9e10b
--- /dev/null
+++ b/encodings/experimental/onpair/tests/big_data.rs
@@ -0,0 +1,126 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+//
+//! End-to-end smoke test on a realistically-sized input. Validates the
+//! pure-Rust decode path and pushdown predicates end-to-end through the new
+//! u16-codes layout.
+ +#![allow( + clippy::cast_possible_truncation, + clippy::redundant_clone, + clippy::tests_outside_test_module, + clippy::use_debug +)] + +use std::sync::LazyLock; +use std::time::Instant; + +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::accessor::ArrayAccessor; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::session::ArraySession; +use vortex_onpair::DEFAULT_DICT12_CONFIG; +use vortex_onpair::onpair_compress; +use vortex_session::VortexSession; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +fn corpus(n: usize) -> Vec { + let templates: &[&str] = &[ + "GET /api/v1/users/{id}/profile HTTP/1.1", + "POST /api/v1/users/{id}/sessions HTTP/1.1", + "GET /static/js/app.{id}.js HTTP/1.1", + "GET /static/css/app.{id}.css HTTP/1.1", + "https://www.example.com/products/{id}", + "https://cdn.example.com/img/{id}.webp", + "https://api.example.com/v2/orders/{id}", + "ftp://files.example.com/dump/{id}.tar.gz", + "ssh://deploy@build-{id}.internal:22", + "redis://cache-{id}.svc.cluster.local:6379", + "INFO request_id={id} method=GET status=200", + "WARN request_id={id} method=POST status=429", + "ERROR request_id={id} method=PUT status=500", + ]; + let mut out = Vec::with_capacity(n); + let mut state = 0x9e37_79b9_7f4a_7c15_u64; + for _ in 0..n { + state = state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + let pick = (state as usize) % templates.len(); + let id = state as u32; + out.push(templates[pick].replace("{id}", &format!("{:08x}", id))); + } + out +} + +#[test] +#[cfg_attr(miri, ignore)] +fn smoke_100k_rows() { + let n = 100_000; + let strings = corpus(n); + let raw_bytes: usize = 
strings.iter().map(|s| s.len()).sum(); + + let varbin = VarBinArray::from_iter( + strings.iter().map(|s| Some(s.as_bytes())), + DType::Utf8(Nullability::NonNullable), + ); + + let t0 = Instant::now(); + let arr = onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG) + .expect("compress"); + let compress_elapsed = t0.elapsed(); + let bits = arr.bits(); + eprintln!( + "compressed {} rows ({} raw bytes) in {:?}, bits={}", + n, raw_bytes, compress_elapsed, bits + ); + + let arr_ref = arr.into_array(); + let mut ctx = SESSION.create_execution_ctx(); + + // Full canonical round-trip via the pure-Rust decoder. + let t0 = Instant::now(); + let decoded = arr_ref + .clone() + .execute::(&mut ctx) + .expect("canonicalize"); + eprintln!("canonicalized in {:?}", t0.elapsed()); + + assert_eq!(decoded.len(), n); + decoded + .with_iterator(|iter| { + for (i, got) in iter.enumerate() { + let want = strings[i].as_bytes(); + assert_eq!(got, Some(want), "row {} mismatch", i); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); + eprintln!("roundtrip OK on all {} rows", n); + + // Equality pushdown: pick a specific row's value and ensure the kernel + // finds all occurrences. 
+ let needle_row = 42; + let needle = strings[needle_row].clone(); + let want_eq = strings.iter().filter(|s| **s == needle).count(); + let eq = arr_ref + .binary( + ConstantArray::new(needle.as_str(), n).into_array(), + Operator::Eq, + ) + .unwrap() + .execute::(&mut ctx) + .unwrap() + .into_array(); + assert_eq!(eq.as_bool_typed().true_count().unwrap(), want_eq); + eprintln!("eq pushdown matches reference count ({})", want_eq); +} diff --git a/encodings/fastlanes/src/delta/compute/cast.rs b/encodings/fastlanes/src/delta/compute/cast.rs index 43a247df9f0..99c92a06603 100644 --- a/encodings/fastlanes/src/delta/compute/cast.rs +++ b/encodings/fastlanes/src/delta/compute/cast.rs @@ -20,18 +20,31 @@ impl CastReduce for Delta { }; let source_ptype = array.dtype().as_ptype(); - // TODO(DK): narrows can be safe but we must decompress to compute the maximum value. - if target_ptype.is_signed_int() || source_ptype.bit_width() > target_ptype.bit_width() { + // Only a same-width cast (e.g. a nullability change) can be served by + // re-casting the stored components in place. Any width change must defer + // to the decompress-then-cast fallback (`Ok(None)`): + // + // * Widening cannot be done in place. `bases`/`deltas` are held in + // FastLanes transposed layout with `T::LANES` (= 1024 / bit_width) + // entries per chunk, and `T::LANES` changes with the target width. + // Re-widening the buffers element-wise preserves the *source* width's + // layout, but `delta_decompress` then reads them with the *target* + // width's lane count, decoding against a misaligned layout and + // producing wrong (and, for `onpair` dictionary offsets, non-monotonic) + // values for any array larger than a single near-empty chunk. + // * Narrowing is unsafe without first decompressing to check the max + // value fits. + if source_ptype.bit_width() != target_ptype.bit_width() { return Ok(None); } - // Signed sources need a different cast policy than the lossless widening cast - // used here. 
The delta bytes are stored as the result of `wrapping_sub`, so e.g. + // Signed sources need a different cast policy than the lossless cast used + // here. The delta bytes are stored as the result of `wrapping_sub`, so e.g. // a delta of -1i8 has the bit pattern 0xFF. Widening *as a value* (the cast op's // semantics) sign-extends that to 0xFFFFFFFF, which means `wrapping_add(base, delta)` // at the wider type produces a different result than at the source type — round-trip // breaks. Cross-signedness widening has the same hazard for the same reason. Fall // back to decompress-and-re-encode for both cases. - if source_ptype.is_signed_int() { + if target_ptype.is_signed_int() || source_ptype.is_signed_int() { return Ok(None); } @@ -88,6 +101,43 @@ mod tests { assert_arrays_eq!(casted, PrimitiveArray::from_iter([10u32, 20, 30, 40, 50])); } + /// Widening across more than one FastLanes chunk (len > 1024). The in-place + /// component cast is invalid here because `T::LANES` differs between source + /// and target widths, so this must fall back to decompress-then-cast. A + /// previous in-place widen produced non-monotonic values and corrupted + /// round-trips (the `onpair` dictionary-offsets panic). 
+    #[rstest]
+    #[case::u8_to_u32(8)]
+    #[case::u16_to_u32(16)]
+    fn test_cast_delta_widen_multichunk(#[case] src_width: u32) {
+        let n = 4096usize;
+        let expected: Vec<u32> = (0..n as u32).map(|i| (i * 3) % 60_000).collect();
+        let delta = match src_width {
+            8 => Delta::try_from_primitive_array(
+                &PrimitiveArray::from_iter((0..n).map(|i| ((i * 3) % 250) as u8)),
+                &mut SESSION.create_execution_ctx(),
+            ),
+            _ => Delta::try_from_primitive_array(
+                &PrimitiveArray::from_iter(expected.iter().map(|&v| v as u16)),
+                &mut SESSION.create_execution_ctx(),
+            ),
+        }
+        .unwrap();
+        let expected: Vec<u32> = if src_width == 8 {
+            (0..n).map(|i| ((i * 3) % 250) as u32).collect()
+        } else {
+            expected
+        };
+
+        let casted = delta
+            .into_array()
+            .cast(DType::Primitive(PType::U32, Nullability::NonNullable))
+            .unwrap()
+            .execute::<PrimitiveArray>(&mut SESSION.create_execution_ctx())
+            .unwrap();
+        assert_eq!(casted.as_slice::<u32>(), expected.as_slice());
+    }
+
     #[test]
     fn test_cast_delta_nullable() {
         // DeltaArray doesn't support nullable arrays - the validity is handled at the DeltaArray level
diff --git a/encodings/fastlanes/src/delta/vtable/rules.rs b/encodings/fastlanes/src/delta/vtable/rules.rs
index d6892897ab5..adf0cd3cc6a 100644
--- a/encodings/fastlanes/src/delta/vtable/rules.rs
+++ b/encodings/fastlanes/src/delta/vtable/rules.rs
@@ -3,11 +3,15 @@
 
 use vortex_array::arrays::slice::SliceReduceAdaptor;
 use vortex_array::optimizer::rules::ParentRuleSet;
+// Kept (with the registration below) so the Delta cast rule can be re-enabled
+// once the in-place widening is made correct; see the TODO below.
+#[allow(unused_imports)]
 use vortex_array::scalar_fn::fns::cast::CastReduceAdaptor;
 
 use crate::delta::vtable::Delta;
 
 pub(crate) static RULES: ParentRuleSet<Delta> = ParentRuleSet::new(&[
     ParentRuleSet::lift(&SliceReduceAdaptor(Delta)),
-    ParentRuleSet::lift(&CastReduceAdaptor(Delta)),
+    // TODO(joe): fixme, this is incorrect..
+ // ParentRuleSet::lift(&CastReduceAdaptor(Delta)), ]); diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 40b0ae52aae..1adb6508828 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -30,6 +30,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } +vortex-onpair = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } @@ -48,7 +49,11 @@ vortex-session = { workspace = true } [features] # This feature enabled unstable encodings for which we don't guarantee stability. -unstable_encodings = ["dep:vortex-tensor", "vortex-zstd?/unstable_encodings"] +unstable_encodings = [ + "dep:vortex-tensor", + "dep:vortex-onpair", + "vortex-zstd?/unstable_encodings", +] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 43d0c56a80c..61c40341dbc 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -54,7 +54,11 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // String schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &string::StringDictScheme, + // Both string-fragmentation schemes are registered; the sample-based + // selector keeps whichever is smaller per column. &string::FSSTScheme, + #[cfg(feature = "unstable_encodings")] + &string::OnPairScheme, &string::StringConstantScheme, &string::NullDominatedSparseScheme, //////////////////////////////////////////////////////////////////////////////////////////////// @@ -174,7 +178,12 @@ impl BtrBlocksCompressorBuilder { /// preserves the array buffer layout for zero-conversion GPU decompression. Without it, /// interleaved Zstd compression is used. 
pub fn only_cuda_compatible(self) -> Self { - let builder = self.exclude_schemes([ + // String fragmentation schemes (OnPair, FSST) require host-side + // dictionary expansion at decode time, which is incompatible with + // pure-GPU decompression paths. Strip whichever string-fragment + // scheme is enabled by feature. + #[cfg_attr(not(feature = "unstable_encodings"), allow(unused_mut))] + let mut excluded: Vec = vec![ integer::SparseScheme.id(), integer::IntRLEScheme.id(), float::FloatRLEScheme.id(), @@ -182,7 +191,10 @@ impl BtrBlocksCompressorBuilder { string::StringDictScheme.id(), string::FSSTScheme.id(), binary::BinaryDictScheme.id(), - ]); + ]; + #[cfg(feature = "unstable_encodings")] + excluded.push(string::OnPairScheme.id()); + let builder = self.exclude_schemes(excluded); #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] let builder = builder.with_new_scheme(&string::ZstdBuffersScheme); diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index 55b8e5b53db..c38b6f412cc 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -919,7 +919,7 @@ pub(crate) fn rle_compress( } #[cfg(feature = "unstable_encodings")] -fn try_compress_delta( +pub(crate) fn try_compress_delta( compressor: &CascadingCompressor, child: &ArrayRef, parent_ctx: &CompressorContext, diff --git a/vortex-btrblocks/src/schemes/string.rs b/vortex-btrblocks/src/schemes/string.rs index db98559c3da..8d83aaadf1d 100644 --- a/vortex-btrblocks/src/schemes/string.rs +++ b/vortex-btrblocks/src/schemes/string.rs @@ -33,9 +33,17 @@ use crate::Scheme; use crate::SchemeExt; /// FSST (Fast Static Symbol Table) compression. +/// +/// One of the two string-fragmentation schemes in the default +/// [`crate::ALL_SCHEMES`] (alongside `OnPairScheme`); the sample-based selector +/// keeps whichever is smaller per column. FSST compresses faster, OnPair +/// usually wins on ratio. 
#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FSSTScheme; +#[cfg(feature = "unstable_encodings")] +pub use onpair::OnPairScheme; + /// Sparse encoding for null-dominated arrays. /// /// This is the same as the integer `SparseScheme`, but we only use this for null-dominated arrays. @@ -292,6 +300,191 @@ impl Scheme for ZstdBuffersScheme { } } +#[cfg(feature = "unstable_encodings")] +mod onpair { + use vortex_array::ArrayRef; + use vortex_array::Canonical; + use vortex_array::ExecutionCtx; + use vortex_array::IntoArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::primitive::PrimitiveArrayExt; + use vortex_compressor::estimate::CompressionEstimate; + use vortex_compressor::estimate::DeferredEstimate; + use vortex_compressor::scheme::SchemeId; + use vortex_error::VortexResult; + use vortex_onpair::DEFAULT_DICT12_CONFIG; + use vortex_onpair::OnPair; + use vortex_onpair::OnPairArrayExt; + use vortex_onpair::OnPairArraySlotsExt; + use vortex_onpair::onpair_compress; + + use crate::ArrayAndStats; + use crate::CascadingCompressor; + use crate::CompressorContext; + use crate::Scheme; + use crate::SchemeExt; + use crate::schemes::integer::try_compress_delta; + + /// OnPair short-string compression (dict-12). + /// + /// A default string-fragmentation scheme (alongside [`super::FSSTScheme`]) — + /// targets large columns of short-to-medium strings with high lexical + /// overlap, like URLs or log lines. Uses a learned dictionary of frequent + /// adjacent substrings (built by the OnPair trainer at compress time) and + /// 12-bit token codes stored as a u16 child, with offsets / + /// uncompressed-lengths flowing through the cascading compressor like any + /// other primitive children. 
+ #[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub struct OnPairScheme; + + impl Scheme for OnPairScheme { + fn scheme_name(&self) -> &'static str { + "vortex.string.onpair" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_utf8() + } + + /// 4 primitive slot children flow through the cascading compressor: + /// `dict_offsets` (u32 → typically `FoR`/`BitPacked`), `codes` (u16 → + /// `FastLanes::BitPacked` to exactly `bits` = 12 by default), + /// `codes_offsets` (u32 → `FoR`), `uncompressed_lengths` (i32 → narrow + /// + `FoR`). Validity stays untouched. + fn num_children(&self) -> usize { + 4 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let utf8 = data.array_as_varbinview().into_owned(); + let onpair_array = + onpair_compress(&utf8, utf8.len(), utf8.dtype(), DEFAULT_DICT12_CONFIG)?; + + let dict_offsets = compress_offsets_child( + compressor, + onpair_array.dict_offsets(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + let codes = compress_primitive_child( + compressor, + onpair_array.codes(), + &compress_ctx, + self.id(), + 1, + exec_ctx, + )?; + let codes_offsets = compress_offsets_child( + compressor, + onpair_array.codes_offsets(), + &compress_ctx, + self.id(), + 2, + exec_ctx, + )?; + let uncompressed_lengths = compress_primitive_child( + compressor, + onpair_array.uncompressed_lengths(), + &compress_ctx, + self.id(), + 3, + exec_ctx, + )?; + + Ok(OnPair::try_new( + onpair_array.dtype().clone(), + onpair_array.dict_bytes_handle().clone(), + dict_offsets, + codes, + codes_offsets, + uncompressed_lengths, + onpair_array.array_validity(), + onpair_array.bits(), + 
)? + .into_array()) + } + } + + /// Narrow a primitive child to its tightest int type, then forward it to + /// the cascading compressor. + fn compress_primitive_child( + compressor: &CascadingCompressor, + child: &ArrayRef, + compress_ctx: &CompressorContext, + scheme_id: SchemeId, + child_idx: usize, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let narrowed = child + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)? + .into_array(); + compressor.compress_child(&narrowed, compress_ctx, scheme_id, child_idx, exec_ctx) + } + + /// Minimum child length before delta is even attempted. Delta carries fixed + /// overhead (a separate `bases` array plus FastLanes' 1024-element lane + /// packing), so on short children it can only lose. + const OFFSETS_DELTA_MIN_LEN: usize = 2048; + + /// Compress a monotonic offsets child. For children of at least + /// [`OFFSETS_DELTA_MIN_LEN`] it tries both the normal cascading path and a + /// delta path and keeps whichever produces fewer bytes; shorter children + /// skip delta entirely. `dict_offsets` and `codes_offsets` are cumulative + /// (monotonic), so delta (per-entry deltas) usually packs much tighter than + /// FoR+bitpacking over the full range. + fn compress_offsets_child( + compressor: &CascadingCompressor, + child: &ArrayRef, + compress_ctx: &CompressorContext, + scheme_id: SchemeId, + child_idx: usize, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let narrowed = child + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)? 
+ .into_array(); + let plain = + compressor.compress_child(&narrowed, compress_ctx, scheme_id, child_idx, exec_ctx)?; + if narrowed.len() < OFFSETS_DELTA_MIN_LEN { + return Ok(plain); + } + let delta = try_compress_delta( + compressor, + &narrowed, + compress_ctx, + scheme_id, + child_idx, + exec_ctx, + )?; + if delta.nbytes() < plain.nbytes() { + Ok(delta) + } else { + Ok(plain) + } + } +} + #[cfg(test)] mod tests { use std::sync::LazyLock; @@ -410,8 +603,25 @@ mod scheme_selection_tests { Ok(()) } + #[cfg(feature = "unstable_encodings")] #[test] - fn test_fsst_compressed() -> VortexResult<()> { + fn test_onpair_in_default_scheme_list() { + use crate::SchemeExt; + use crate::schemes::string::OnPairScheme; + + let ids: Vec<_> = crate::ALL_SCHEMES.iter().map(|s| s.id()).collect(); + assert!( + ids.contains(&OnPairScheme.id()), + "OnPairScheme not registered in ALL_SCHEMES" + ); + } + + #[cfg(feature = "unstable_encodings")] + #[test] + fn test_onpair_compressed() -> VortexResult<()> { + // Dictionary-style string corpus: high lexical overlap, short rows. + // OnPair beats FSST on this corpus, so it wins the sample-based + // comparison even though both are registered by default. let mut strings = Vec::with_capacity(1000); for i in 0..1000 { strings.push(Some(format!( @@ -422,7 +632,48 @@ mod scheme_selection_tests { let array_ref = array.into_array(); let compressed = BtrBlocksCompressor::default() .compress(&array_ref, &mut SESSION.create_execution_ctx())?; - assert!(compressed.is::()); + assert!( + compressed.is::(), + "expected OnPair, got {}", + compressed.encoding_id() + ); + Ok(()) + } + + /// FSST is registered in the default scheme list (alongside OnPair), and an + /// FSST-only builder still produces an FSST array. + #[test] + fn test_fsst_in_default_scheme_list() -> VortexResult<()> { + use crate::BtrBlocksCompressorBuilder; + use crate::SchemeExt; + use crate::schemes::string::FSSTScheme; + + // FSST is registered by default. 
+ assert!( + crate::ALL_SCHEMES.iter().any(|s| s.id() == FSSTScheme.id()), + "FSSTScheme should be in ALL_SCHEMES", + ); + + // An FSST-only builder still produces an FSST array for FSST-favourable + // input. + let mut strings = Vec::with_capacity(1000); + for i in 0..1000 { + strings.push(Some(format!( + "this_is_a_common_prefix_with_some_variation_{i}_and_a_common_suffix_pattern" + ))); + } + let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)); + let array_ref = array.into_array(); + + let compressor = BtrBlocksCompressorBuilder::empty() + .with_new_scheme(&FSSTScheme) + .build(); + let compressed = compressor.compress(&array_ref, &mut SESSION.create_execution_ctx())?; + assert!( + compressed.is::(), + "expected FSST when only FSSTScheme is registered, got {}", + compressed.encoding_id() + ); Ok(()) } } diff --git a/vortex-btrblocks/tests/onpair_roundtrip.rs b/vortex-btrblocks/tests/onpair_roundtrip.rs new file mode 100644 index 00000000000..3843e02c319 --- /dev/null +++ b/vortex-btrblocks/tests/onpair_roundtrip.rs @@ -0,0 +1,228 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors +// +//! End-to-end round-trip through the full Vortex compressor + decompressor +//! on string arrays. Lives in `vortex-btrblocks` (gated on `unstable_encodings`) +//! so it exercises the same code path the file writer uses, not just the +//! OnPair crate in isolation. 
+ +#![cfg(feature = "unstable_encodings")] +#![allow( + clippy::cast_possible_truncation, + clippy::tests_outside_test_module, + clippy::use_debug +)] + +use std::sync::LazyLock; + +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::accessor::ArrayAccessor; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::session::ArraySession; +use vortex_btrblocks::BtrBlocksCompressor; +use vortex_session::VortexSession; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +/// Helper: synthetic short-string corpus that the cascading compressor should +/// route through OnPair. +fn corpus(n: usize) -> Vec { + let templates: &[&str] = &[ + "https://www.example.com/products/{id}", + "https://cdn.example.com/img/{id}.webp", + "https://api.example.com/v2/orders/{id}", + "https://www.example.com/users/{id}/profile", + "INFO request_id={id} status=200 method=GET", + "WARN request_id={id} status=429 method=POST", + "ERROR request_id={id} status=500 method=PUT", + ]; + let mut out = Vec::with_capacity(n); + let mut state = 0x9e37_79b9_7f4a_7c15_u64; + for _ in 0..n { + state = state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + let pick = (state as usize) % templates.len(); + let id = state as u32; + out.push(templates[pick].replace("{id}", &format!("{:08x}", id))); + } + out +} + +#[test] +fn nonnullable_roundtrip_via_default_compressor() { + let n = 4096; + let strings = corpus(n); + let array = VarBinViewArray::from_iter( + strings.iter().map(|s| Some(s.as_str())), + DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + + let compressed = BtrBlocksCompressor::default() + .compress(&array, &mut SESSION.create_execution_ctx()) + .expect("compress"); + // Don't assert a specific scheme — both OnPair and FSST are registered and + // the sample-based selector keeps whichever is smaller. 
What matters is the + // round-trip. + + let decoded = compressed + .execute::(&mut SESSION.create_execution_ctx()) + .expect("decompress"); + assert_eq!(decoded.len(), n); + decoded + .with_iterator(|iter| { + for (i, got) in iter.enumerate() { + assert_eq!( + got, + Some(strings[i].as_bytes()), + "mismatch at row {i}: got {:?}", + got.map(|b| String::from_utf8_lossy(b).into_owned()), + ); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +#[test] +fn nullable_roundtrip_via_default_compressor() { + let n = 2048; + let strings: Vec> = corpus(n) + .into_iter() + .enumerate() + .map(|(i, s)| (i % 7 != 0).then_some(s)) + .collect(); + + let array = VarBinViewArray::from_iter( + strings.iter().map(|s| s.as_deref()), + DType::Utf8(Nullability::Nullable), + ) + .into_array(); + + let compressed = BtrBlocksCompressor::default() + .compress(&array, &mut SESSION.create_execution_ctx()) + .expect("compress"); + // Don't assert OnPair specifically here — the sample-based selector may + // pick a different scheme on tiny inputs. What matters is the round-trip. + + let decoded = compressed + .execute::(&mut SESSION.create_execution_ctx()) + .expect("decompress"); + assert_eq!(decoded.len(), n); + decoded + .with_iterator(|iter| { + for (i, got) in iter.enumerate() { + let want = strings[i].as_deref().map(str::as_bytes); + assert_eq!(got, want, "mismatch at row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +/// Larger corpus that exercises the offsets-narrowing / delta-encoding paths +/// the cascading compressor enables past 2048 entries. The decoder must +/// reconstruct absolute u32 offsets from whatever encoded shape the +/// compressor chose for each child. +#[test] +fn large_unique_short_strings_roundtrip() { + let n = 1 << 13; // 8192 rows, all unique, short. 
+ let strings: Vec = (0..n).map(|i| format!("k{i:05x}")).collect(); + let array = VarBinViewArray::from_iter( + strings.iter().map(|s| Some(s.as_str())), + DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + + let compressed = BtrBlocksCompressor::default() + .compress(&array, &mut SESSION.create_execution_ctx()) + .expect("compress"); + + let decoded = compressed + .execute::(&mut SESSION.create_execution_ctx()) + .expect("decompress"); + assert_eq!(decoded.len(), n); + decoded + .with_iterator(|iter| { + for (i, got) in iter.enumerate() { + assert_eq!(got, Some(strings[i].as_bytes()), "row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +#[test] +fn empty_and_short_string_roundtrip() { + // Edge cases: empty strings interleaved with short ones. + let strings = vec!["", "a", "", "bb", "ccc", "", "dddd", "eeeee", ""]; + let array = VarBinViewArray::from_iter( + strings.iter().map(|s| Some(*s)), + DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + + let compressed = BtrBlocksCompressor::default() + .compress(&array, &mut SESSION.create_execution_ctx()) + .expect("compress"); + let decoded = compressed + .execute::(&mut SESSION.create_execution_ctx()) + .expect("decompress"); + decoded + .with_iterator(|iter| { + let got: Vec<_> = iter.collect(); + for (i, want) in strings.iter().enumerate() { + assert_eq!(got[i], Some(want.as_bytes()), "row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} + +/// Regression for the Euro2016 compress-bench panic +/// (`onpair::decompress`: "dictionary offsets must be nondecreasing"). +/// +/// A large, high-cardinality corpus fills the OnPair dictionary toward its +/// 4096-entry cap, so the cascading compressor narrows `dict_offsets` to `u16` +/// and Delta-encodes it across multiple FastLanes chunks (len > 1024). 
The old +/// decode path widened it via `arr.cast(u32).execute()`, but the `Delta` cast +/// kernel preserves the Delta wrapping and only widens the inner bases/deltas +/// in place — and the transposed bases layout is keyed on `T::LANES`, which +/// differs between `u16` and `u32`. Decoding the widened Delta against the +/// misaligned layout yields non-monotonic offsets and trips the upstream +/// assert. The fix canonicalises each child to a `PrimitiveArray` first, then +/// widens element-wise. +#[test] +fn delta_dict_offsets_roundtrip() { + let n = 1usize << 16; + // Hex-encoded index plus a hashed suffix: every row is unique with enough + // shared structure to route through OnPair while filling the dictionary. + let strings: Vec = (0..n) + .map(|i| format!("{i:016x}-{:08x}", i.wrapping_mul(2654435761))) + .collect(); + let array = VarBinViewArray::from_iter( + strings.iter().map(|s| Some(s.as_str())), + DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + let compressed = BtrBlocksCompressor::default() + .compress(&array, &mut SESSION.create_execution_ctx()) + .expect("compress"); + let decoded = compressed + .execute::(&mut SESSION.create_execution_ctx()) + .expect("decompress"); + assert_eq!(decoded.len(), n); + decoded + .with_iterator(|iter| { + for (i, got) in iter.enumerate() { + assert_eq!(got, Some(strings[i].as_bytes()), "row {i}"); + } + Ok::<_, vortex_error::VortexError>(()) + }) + .unwrap(); +} diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 77d664a12cb..c4bf980d683 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -46,6 +46,7 @@ vortex-io = { workspace = true } vortex-layout = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } +vortex-onpair = { workspace = true, optional = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } vortex-scan = { workspace = true } @@ -78,6 +79,7 @@ tokio = [ zstd = ["dep:vortex-zstd", 
"vortex-btrblocks/zstd", "vortex-btrblocks/pco"] # This feature enables unstable encodings for which we don't guarantee stability. unstable_encodings = [ + "dep:vortex-onpair", "dep:vortex-tensor", "vortex-zstd?/unstable_encodings", "vortex-btrblocks/unstable_encodings", diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index e69b5848de2..9b927abff54 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -115,6 +115,8 @@ use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; use vortex_bytebool::ByteBool; use vortex_fsst::FSST; +#[cfg(feature = "unstable_encodings")] +use vortex_onpair::OnPair; use vortex_pco::Pco; use vortex_session::VortexSession; use vortex_zigzag::ZigZag; @@ -162,6 +164,8 @@ pub fn register_default_encodings(session: &VortexSession) { arrays.register(ByteBool); arrays.register(Dict); arrays.register(FSST); + #[cfg(feature = "unstable_encodings")] + arrays.register(OnPair); arrays.register(Pco); arrays.register(ZigZag); #[cfg(feature = "zstd")] diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 661e72240d1..ed58f32e11d 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -53,6 +53,8 @@ use vortex_layout::layouts::repartition::RepartitionWriterOptions; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions; use vortex_layout::layouts::zoned::writer::ZonedStrategy; +#[cfg(feature = "unstable_encodings")] +use vortex_onpair::OnPair; use vortex_pco::Pco; use vortex_runend::RunEnd; use vortex_sequence::Sequence; @@ -102,6 +104,8 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { allowed.insert(Delta.id()); allowed.insert(FoR.id()); allowed.insert(FSST.id()); + #[cfg(feature = "unstable_encodings")] + allowed.insert(OnPair.id()); allowed.insert(Pco.id()); allowed.insert(RLE.id()); allowed.insert(RunEnd.id());