diff --git a/Cargo.lock b/Cargo.lock index 045c72176fd..64afffd997d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9195,6 +9195,7 @@ dependencies = [ "static_assertions", "tabled", "termtree", + "test-with", "tracing", "uuid", "vortex-array", diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 666a23c02c4..76e119ae366 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -95,6 +95,7 @@ rand_distr = { workspace = true } rstest = { workspace = true } serde_json = { workspace = true } serde_test = { workspace = true } +test-with = { workspace = true } vortex-array = { path = ".", features = ["_test-harness", "table-display"] } [[bench]] diff --git a/vortex-array/src/arrays/varbinview/build_views.rs b/vortex-array/src/arrays/varbinview/build_views.rs index 87f5c84706b..a13cd62bf78 100644 --- a/vortex-array/src/arrays/varbinview/build_views.rs +++ b/vortex-array/src/arrays/varbinview/build_views.rs @@ -33,6 +33,44 @@ pub fn build_views>( ) -> (Vec, Buffer) { let mut views = BufferMut::::with_capacity(lens.len()); + // Common case: the whole decoded heap fits within a single buffer, so no rollover can occur + // (`bytes.len()` is the total decoded size and therefore an upper bound on every offset). This + // lets the hot loop drop the per-element rollover branch and construct reference views inline, + // avoiding the out-of-line `BinaryView::make_view` call for the common long-string case. + if bytes.len() <= max_buffer_len { + let data = bytes.as_slice(); + let mut offset = 0usize; + // Write directly into the reserved spare capacity rather than `push_unchecked`. The latter + // advances the backing buffer's length on every call, which the optimizer cannot prove is + // loop-invariant, so it reloads and rewrites the output cursor through the stack each + // iteration. Writing into the spare slice keeps the cursor in a register and the length is + // set once after the loop. + let spare = views.spare_capacity_mut(); + for (slot, &len) in spare.iter_mut().zip(lens) { + let len = len.as_(); + let value = &data[offset..offset + len]; + let view = if len > BinaryView::MAX_INLINED_SIZE { + let mut prefix = [0u8; 4]; + prefix.copy_from_slice(&value[..4]); + BinaryView::new_ref(len.as_(), prefix, start_buf_index, offset.as_()) + } else { + BinaryView::make_view(value, start_buf_index, offset.as_()) + }; + slot.write(view); + offset += len; + } + // SAFETY: the loop initialized exactly `lens.len()` contiguous views (`spare` has at least + // `lens.len()` slots, and `zip` stops at the shorter operand). + unsafe { views.set_len(lens.len()) }; + + let buffers = if bytes.is_empty() { + Vec::new() + } else { + vec![bytes.freeze()] + }; + return (buffers, views.freeze()); + } + let mut buffers = Vec::new(); let mut buf_index = start_buf_index; @@ -66,12 +104,275 @@ pub fn build_views>( #[cfg(test)] mod tests { + use rstest::rstest; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use crate::arrays::varbinview::BinaryView; + use crate::arrays::varbinview::build_views::MAX_BUFFER_LEN; use crate::arrays::varbinview::build_views::build_views; + /// Concatenate `values` into a single byte heap and return it alongside the per-element lengths, + /// matching the `(bytes, lens)` inputs that `build_views` consumes. + fn flatten(values: &[&[u8]]) -> (ByteBufferMut, Vec) { + let mut bytes = ByteBufferMut::empty(); + let mut lens = Vec::with_capacity(values.len()); + for v in values { + bytes.extend_from_slice(v); + lens.push(u32::try_from(v.len()).unwrap()); + } + (bytes, lens) + } + + /// Reconstruct the logical value behind each view by dereferencing it through the output + /// buffers. The first buffer corresponds to `start_buf_index`, so buffer indices are rebased by + /// that amount. This is the core correctness invariant: regardless of which code path built the + /// views, every view must point back at its original bytes. + fn reconstruct( + buffers: &[ByteBuffer], + views: &[BinaryView], + start_buf_index: u32, + ) -> Vec> { + views + .iter() + .map(|view| { + if view.is_inlined() { + view.as_inlined().value().to_vec() + } else { + let r = view.as_view(); + let buf = &buffers[(r.buffer_index - start_buf_index) as usize]; + buf[r.as_range()].to_vec() + } + }) + .collect() + } + + /// The single-buffer fast path (`bytes.len() <= max_buffer_len`) must reproduce every input + /// value exactly, emit a single output buffer holding the untouched heap, and reference only + /// `start_buf_index`. We cover a spread of value sets that mix inlined (<= 12 bytes) and + /// reference (> 12 bytes) lengths, including the 12/13 byte inline boundary, empty values, and a + /// fully-inlined set. + #[rstest] + #[case::mixed(&[b"a".as_slice(), b"this is a long reference value", b"short", b"another long value here!!"])] + #[case::inline_boundary(&[&[b'x'; 12] as &[u8], &[b'y'; 13], &[b'z'; 12], &[b'w'; 13]])] + #[case::all_inlined(&[b"".as_slice(), b"a", b"bb", b"ccc", b"dddddddddddd"])] + #[case::all_reference(&[&[b'a'; 100] as &[u8], &[b'b'; 50], &[b'c'; 4096]])] + #[case::empty_values_interleaved(&[b"".as_slice(), b"a long value that is referenced", b"", b"", b"trailing long reference value"])] + #[case::single_long(&[&[7u8; 1 << 16] as &[u8]])] + fn fast_path_roundtrip(#[case] values: &[&[u8]]) { + let (bytes, lens) = flatten(values); + let total = bytes.len(); + let start_buf_index = 3; + + // `max_buffer_len` strictly greater than the heap forces the single-buffer fast path. + let (buffers, views) = build_views(start_buf_index, total + 1, bytes, &lens); + + assert_eq!(views.len(), values.len()); + if total == 0 { + assert!(buffers.is_empty(), "empty heap must not allocate a buffer"); + } else { + assert_eq!(buffers.len(), 1, "whole heap must stay in one buffer"); + // The fast path freezes the input heap unchanged. + let concatenated: Vec = values.concat(); + assert_eq!(buffers[0].as_slice(), concatenated.as_slice()); + } + for view in views.iter() { + if !view.is_inlined() { + assert_eq!(view.as_view().buffer_index, start_buf_index); + } + } + + let expected: Vec> = values.iter().map(|v| v.to_vec()).collect(); + assert_eq!(reconstruct(&buffers, &views, start_buf_index), expected); + } + + /// Offsets and sizes are written into the `u32` `Ref` fields via `as_` truncation, so we must + /// confirm they stay correct once the running offset grows well past the 16-bit range (i.e. is + /// not narrowed to a smaller width). A ~9 MiB heap pushes offsets above 2^23 while remaining far + /// below `MAX_BUFFER_LEN`; each value encodes its index in its first bytes so a misplaced offset + /// would reconstruct the wrong bytes. + #[test] + fn fast_path_large_offsets() { + const N: usize = 9000; + const LEN: usize = 1000; + // The final offset is (N - 1) * LEN, which must exceed 2^23 to be a meaningful check. + const _: () = assert!((N - 1) * LEN > (1 << 23)); + + let values: Vec> = (0..N) + .map(|i| { + let mut v = vec![0u8; LEN]; + v[..4].copy_from_slice(&u32::try_from(i).unwrap().to_le_bytes()); + v + }) + .collect(); + let refs: Vec<&[u8]> = values.iter().map(|v| v.as_slice()).collect(); + + let (bytes, lens) = flatten(&refs); + let total = bytes.len(); + + let (buffers, views) = build_views(0, total + 1, bytes, &lens); + + assert_eq!(buffers.len(), 1); + // The recorded offset must equal the cumulative byte position, exactly, for every view. + for (i, view) in views.iter().enumerate() { + let r = view.as_view(); + assert_eq!(r.offset as usize, i * LEN, "wrong offset for view {i}"); + assert_eq!(r.size as usize, LEN); + } + assert_eq!(reconstruct(&buffers, &views, 0), values); + } + + /// The fast path is taken when `bytes.len() <= max_buffer_len`, so equality at the boundary must + /// still produce a single buffer (not roll over to the slow path). + #[test] + fn fast_path_taken_at_exact_boundary() { + let (bytes, lens) = + flatten(&[b"this value is definitely long", b"and so is this one here"]); + let total = bytes.len(); + + let (buffers, views) = build_views(0, total, bytes, &lens); + + assert_eq!( + buffers.len(), + 1, + "len == max_buffer_len must stay on fast path" + ); + assert_eq!(views.len(), 2); + } + + /// For the same logical data, the fast path (single buffer) and the slow rollover path must + /// reconstruct identical values. Driving the slow path with a small `max_buffer_len` forces + /// buffer splitting while leaving the recovered values unchanged. + #[test] + fn fast_and_slow_paths_agree() { + let values: &[&[u8]] = &[ + b"first long reference value", + b"tiny", + b"second long reference value!!", + b"third looooong reference value", + ]; + let expected: Vec> = values.iter().map(|v| v.to_vec()).collect(); + + let (fast_bytes, lens) = flatten(values); + let total = fast_bytes.len(); + let (fast_buffers, fast_views) = build_views(0, total + 1, fast_bytes, &lens); + assert_eq!(fast_buffers.len(), 1); + assert_eq!(reconstruct(&fast_buffers, &fast_views, 0), expected); + + // Force the rollover path: a small cap (>= the longest value) that the total heap exceeds. + let longest = values.iter().map(|v| v.len()).max().unwrap(); + let (slow_bytes, _) = flatten(values); + let (slow_buffers, slow_views) = build_views(0, longest, slow_bytes, &lens); + assert!( + slow_buffers.len() > 1, + "small cap should split into many buffers" + ); + assert_eq!(reconstruct(&slow_buffers, &slow_views, 0), expected); + + // Same logical contents regardless of how the heap was partitioned. + assert_eq!( + reconstruct(&fast_buffers, &fast_views, 0), + reconstruct(&slow_buffers, &slow_views, 0) + ); + } + + /// Empty input must yield no buffers and no views, exercising the `bytes.is_empty()` branch. + #[test] + fn fast_path_empty_input() { + let lens: Vec = Vec::new(); + let (buffers, views) = build_views(0, 1024, ByteBufferMut::empty(), &lens); + assert!(buffers.is_empty()); + assert!(views.is_empty()); + } + + /// The fast path must produce views byte-identical to the value-inspecting `make_view`, which is + /// what the slow path uses. This pins the inline/reference decision and field layout. + #[test] + fn fast_path_matches_make_view() { + let values: &[&[u8]] = &[b"inline", b"this is a long reference value", b""]; + let (bytes, lens) = flatten(values); + let total = bytes.len(); + let (_buffers, views) = build_views(0, total + 1, bytes, &lens); + + let expected = [ + BinaryView::make_view(b"inline", 0, 0), + BinaryView::make_view(b"this is a long reference value", 0, 6), + BinaryView::make_view(b"", 0, 36), + ]; + assert_eq!(views.as_slice(), &expected); + } + + // TODO(someone): ideally CI would run this in release mode as well, since debug builds make the + // ~2.25 GiB allocation and fill loop substantially slower. + /// Slow regression for the single-buffer fast-path guard. The fast path is only valid when the + /// whole heap fits in one buffer (`bytes.len() <= max_buffer_len`); once the heap exceeds + /// [`MAX_BUFFER_LEN`] (`i32::MAX`, ~2.0 GiB) `build_views` must roll the heap into multiple + /// buffers, resetting the per-buffer offset, so no view references an offset past the + /// `i32`-bounded buffer limit. + /// + /// We build a heap just past `i32::MAX` and assert it rolls over into more than one buffer, that + /// no buffer exceeds `MAX_BUFFER_LEN`, and that values straddling the rollover boundary (where + /// the second buffer's offsets restart from zero) reconstruct exactly. If the guard regressed and + /// the fast path swallowed the whole heap, it would emit a single >2 GiB buffer with offsets past + /// `i32::MAX`, which the buffer-count and buffer-size assertions catch. + /// + /// Allocates ~2.25 GiB, so it is gated to CI and skipped when `VORTEX_SKIP_SLOW_TESTS` is set: + /// + /// ```text + /// CI=1 cargo test --release -p vortex-array build_views_offsets_overflow + /// ``` + /// + /// [`MAX_BUFFER_LEN`]: super::MAX_BUFFER_LEN + #[test_with::env(CI)] + #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)] + fn build_views_offsets_overflow_i32() { + const STRING_LEN: usize = 64 * 1024; + // Comfortably past MAX_BUFFER_LEN (`i32::MAX` ~= 2.0 GiB) so the heap must roll over. + const TOTAL_BYTES: usize = (1usize << 31) + (256 << 20); // ~2.25 GiB + const N: usize = TOTAL_BYTES / STRING_LEN; + + // Each value's first 8 bytes encode its row index, so a misrouted offset is detectable. + let nth_string = |i: usize| { + let mut s = vec![b'x'; STRING_LEN]; + s[..8].copy_from_slice(&(i as u64).to_le_bytes()); + s + }; + + let mut bytes = ByteBufferMut::with_capacity(N * STRING_LEN); + let mut value = vec![b'x'; STRING_LEN]; + for i in 0..N { + value[..8].copy_from_slice(&(i as u64).to_le_bytes()); + bytes.extend_from_slice(&value); + } + + let lens = vec![u32::try_from(STRING_LEN).unwrap(); N]; + let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, &lens); + + assert_eq!(views.len(), N); + assert!( + buffers.len() >= 2, + "heap exceeding MAX_BUFFER_LEN must roll over into multiple buffers, got {}", + buffers.len() + ); + for (i, b) in buffers.iter().enumerate() { + assert!( + b.len() <= MAX_BUFFER_LEN, + "buffer {i} of {} bytes exceeds MAX_BUFFER_LEN", + b.len() + ); + } + + // The boundary row is the first whose offset would cross MAX_BUFFER_LEN on the fast path. + let boundary = MAX_BUFFER_LEN / STRING_LEN; + for i in [0, boundary - 1, boundary, boundary + 1, N / 2, N - 1] { + let view = &views[i]; + let r = view.as_view(); + let got = &buffers[r.buffer_index as usize][r.as_range()]; + assert_eq!(got, nth_string(i).as_slice(), "value mismatch at row {i}"); + assert_eq!(r.size as usize, STRING_LEN); + } + } + #[test] fn test_to_canonical_large() { // We are testing generating views for raw data that should look like diff --git a/vortex-array/src/arrays/varbinview/view.rs b/vortex-array/src/arrays/varbinview/view.rs index 38cbd2798c6..dd411f07cc4 100644 --- a/vortex-array/src/arrays/varbinview/view.rs +++ b/vortex-array/src/arrays/varbinview/view.rs @@ -170,6 +170,27 @@ impl BinaryView { Self { le_bytes: [0; 16] } } + /// Create a reference view directly from its components, without inspecting the value. + /// + /// `size` must be greater than [`MAX_INLINED_SIZE`], and `prefix` must hold the first four + /// bytes of the value. This is the fast path for bulk view construction where the caller has + /// already established that the value is too long to inline; it assembles the 16-byte view as a + /// single `u128` so the compiler can emit one wide store per view. + /// + /// [`MAX_INLINED_SIZE`]: Self::MAX_INLINED_SIZE + #[inline] + pub fn new_ref(size: u32, prefix: [u8; 4], buffer_index: u32, offset: u32) -> Self { + debug_assert!(size as usize > Self::MAX_INLINED_SIZE); + // Matches the little-endian field order of `Ref` (size, prefix, buffer_index, offset), + // consistent with `le_bytes` and the `From`/`as_u128` representation. + Self::from( + u128::from(size) + | (u128::from(u32::from_le_bytes(prefix)) << 32) + | (u128::from(buffer_index) << 64) + | (u128::from(offset) << 96), + ) + } + /// Create a new inlined binary view /// /// # Panics @@ -278,3 +299,31 @@ impl fmt::Debug for BinaryView { s.finish() } } + +#[cfg(test)] +#[expect( + clippy::cast_possible_truncation, + reason = "test values are bounded well within the target types" +)] +mod tests { + use super::*; + + #[test] + fn new_ref_matches_make_view() { + // `new_ref` assembles the reference view as a `u128`; it must be byte-identical to the + // value-inspecting `make_view` for any value longer than the inline limit. + for &len in &[13usize, 20, 255, 4096] { + let value: Vec = (0..len).map(|i| (i % 251) as u8).collect(); + let prefix = [value[0], value[1], value[2], value[3]]; + let made = BinaryView::make_view(&value, 7, 42); + let built = BinaryView::new_ref(len as u32, prefix, 7, 42); + assert_eq!(made.as_u128(), built.as_u128(), "mismatch at len {len}"); + assert!(!built.is_inlined()); + let r = built.as_view(); + assert_eq!(r.size, len as u32); + assert_eq!(r.prefix, prefix); + assert_eq!(r.buffer_index, 7); + assert_eq!(r.offset, 42); + } + } +}