diff --git a/Cargo.lock b/Cargo.lock index d4d5236b130..4bd788683ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10517,7 +10517,6 @@ dependencies = [ name = "vortex-fastlanes" version = "0.1.0" dependencies = [ - "arrayref", "codspeed-divan-compat", "fastlanes", "itertools 0.14.0", diff --git a/Cargo.toml b/Cargo.toml index 90436b4f1a9..fc302ad1b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,6 @@ anyhow = "1.0.97" arbitrary = "1.3.2" arc-swap = "1.8" arcref = "0.2.0" -arrayref = "0.3.7" arrow-arith = "58" arrow-array = "58" arrow-buffer = "58" diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index ac30b2032b4..a14e19389bc 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -17,7 +17,6 @@ version = { workspace = true } workspace = true [dependencies] -arrayref = { workspace = true } fastlanes = { workspace = true } itertools = { workspace = true } lending-iterator = { workspace = true } diff --git a/encodings/fastlanes/src/rle/array/rle_compress.rs b/encodings/fastlanes/src/rle/array/rle_compress.rs index b4ec00f6b7d..36ec3a22a03 100644 --- a/encodings/fastlanes/src/rle/array/rle_compress.rs +++ b/encodings/fastlanes/src/rle/array/rle_compress.rs @@ -1,7 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use arrayref::array_mut_ref; +use std::mem; + use fastlanes::RLE as FastLanesRLE; use vortex_array::IntoArray; use vortex_array::ToCanonical; @@ -51,38 +52,40 @@ where let mut values_idx_offsets = BufferMut::::with_capacity(len.div_ceil(FL_CHUNK_SIZE)); let values_uninit = values_buf.spare_capacity_mut(); - let indices_uninit = indices_buf.spare_capacity_mut(); + // We don't care about the trailing chunk that exists due to overallocation by the underlying allocator. + let (indices_uninit, _) = indices_buf + .spare_capacity_mut() + .as_chunks_mut::(); let mut value_count_acc = 0; // Chunk value count prefix sum. let (chunks, remainder) = values.as_chunks::(); - let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| { - // SAFETY: NativeValue is repr(transparent) - let input: &[NativeValue; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) }; - - // SAFETY: `MaybeUninit>` and `NativeValue` have the same layout. - let rle_vals: &mut [NativeValue] = - unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) }; + let mut process_chunk = + |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [mem::MaybeUninit; FL_CHUNK_SIZE]| { + // SAFETY: NativeValue is repr(transparent) + let input: &[NativeValue; FL_CHUNK_SIZE] = unsafe { mem::transmute(input) }; + let rle_idxs: &mut [u16; FL_CHUNK_SIZE] = unsafe { mem::transmute(rle_idxs) }; - // SAFETY: `MaybeUninit` and `u16` have the same layout. - let rle_idxs: &mut [u16] = - unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) }; + // SAFETY: `MaybeUninit>` and `NativeValue` have the same layout. + let rle_vals: &mut [NativeValue] = + unsafe { mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) }; - // Capture chunk start indices. This is necessary as indices - // returned from `T::encode` are relative to the chunk. - values_idx_offsets.push(value_count_acc as u64); + // Capture chunk start indices. This is necessary as indices + // returned from `T::encode` are relative to the chunk. + values_idx_offsets.push(value_count_acc as u64); - let value_count = NativeValue::::encode( - input, - array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE], - array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE], - ); + let value_count = NativeValue::::encode( + input, + unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) }, + rle_idxs, + ); - value_count_acc += value_count; - }; + value_count_acc += value_count; + }; - for (chunk_idx, chunk_slice) in chunks.iter().enumerate() { - process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice); + for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) { + // SAFETY: `MaybeUninit` and `u16` have the same layout. + process_chunk(chunk_slice, rle_idxs); } if !remainder.is_empty() { @@ -90,7 +93,10 @@ where // accounting for an additional value change. let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE]; padded_chunk[..remainder.len()].copy_from_slice(remainder); - process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk); + // There might be more entries in indices_uninit than necessary if the allocator gave us extra memory. + // Remainder has to go to the last chunk after full chunks have been processed. + let last_idx_chunk = &mut indices_uninit[chunks.len()]; + process_chunk(&padded_chunk, last_idx_chunk); } unsafe { @@ -143,11 +149,14 @@ mod tests { use rstest::rstest; use vortex_array::IntoArray; use vortex_array::ToCanonical; + use vortex_array::arrays::ConstantArray; + use vortex_array::arrays::MaskedArray; + use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::dtype::half::f16; use vortex_buffer::Buffer; use vortex_buffer::buffer; - use vortex_error::VortexExpect; + use vortex_error::VortexResult; use super::*; use crate::rle::array::RLEArrayExt; @@ -271,6 +280,186 @@ mod tests { assert_arrays_eq!(decoded, expected); } + /// Replaces the indices of an RLE array with MaskedArray(ConstantArray(1u16), validity). + /// + /// Simulates a compressor that represents indices as a masked constant. + /// Valid when every chunk has at least two RLE dictionary entries (the + /// fill-forward default at index 0 and the actual value at index 1), which + /// holds whenever the first position of each chunk is null. + fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult { + let indices_prim = rle.indices().to_primitive(); + let masked_indices = MaskedArray::try_new( + ConstantArray::new(1u16, indices_prim.len()).into_array(), + indices_prim.validity()?, + )? + .into_array(); + RLE::try_new( + rle.values().clone(), + masked_indices, + rle.values_idx_offsets().clone(), + rle.offset(), + rle.len(), + ) + } + + #[test] + fn test_encode_all_null_chunk() -> VortexResult<()> { + let values: Vec> = vec![None; FL_CHUNK_SIZE]; + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> { + // First chunk is entirely null, second chunk has a value preceded by nulls. + let mut values: Vec> = vec![None; 2 * FL_CHUNK_SIZE]; + values[FL_CHUNK_SIZE + 100] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_one_value_near_end() -> VortexResult<()> { + // Single distinct value near the end of the chunk. + let mut values: Vec> = vec![None; FL_CHUNK_SIZE]; + values[1000] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> { + // 1085 elements (2 chunks: 1024 + 61 padded to 1024). + // Chunk 0 has -1i16 at scattered positions (273..=366), rest null. + // Chunk 1 (the remainder) is entirely null. + const NEG1_POSITIONS: &[usize] = &[ + 273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296, + 298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335, + 336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358, + 359, 362, 363, 364, 366, + ]; + let mut values: Vec> = vec![None; 1085]; + for &pos in NEG1_POSITIONS { + values[pos] = Some(-1); + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + /// Replaces indices at invalid (null) positions with random garbage values. + /// + /// This simulates a compressor that doesn't preserve index values at null + /// positions, which can happen when indices are further compressed and the + /// compressor clobbers invalid entries with arbitrary data. + fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult { + let indices_prim = rle.indices().to_primitive(); + let mut indices_data: Vec = indices_prim.as_slice::().to_vec(); + + // Use a simple deterministic "random" sequence. + let mut rng_state: u32 = 0xDEAD_BEEF; + let validity = indices_prim.validity()?; + for (i, idx) in indices_data.iter_mut().enumerate() { + if !validity.is_valid(i).unwrap_or(true) { + // xorshift32 + rng_state ^= rng_state << 13; + rng_state ^= rng_state >> 17; + rng_state ^= rng_state << 5; + *idx = rng_state as u16; + } + } + + let clobbered_indices = + PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()?).into_array(); + + RLE::try_new( + rle.values().clone(), + clobbered_indices, + rle.values_idx_offsets().clone(), + rle.offset(), + rle.len(), + ) + } + + #[test] + fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> { + let values: Vec> = vec![None; FL_CHUNK_SIZE]; + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_sparse_values() -> VortexResult<()> { + let mut values: Vec> = vec![None; FL_CHUNK_SIZE]; + values[0] = Some(10); + values[500] = Some(20); + values[1000] = Some(30); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> { + // Two chunks: first has scattered values, second is all null. + let mut values: Vec> = vec![None; 2 * FL_CHUNK_SIZE]; + values[0] = Some(10); + values[500] = Some(20); + values[FL_CHUNK_SIZE + 100] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> { + // 1085 elements: chunk 0 has values at scattered positions, chunk 1 is + // a partial (61 elements padded to 1024) that is entirely null. + let mut values: Vec> = vec![None; 1085]; + for i in (100..200).step_by(7) { + values[i] = Some(i as u32); + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> { + // Most positions are valid, only a few are null with garbage indices. + let mut values: Vec> = + (0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect(); + // Sprinkle in some nulls. + for i in (0..FL_CHUNK_SIZE).step_by(37) { + values[i] = None; + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + // Regression test: RLE compression properly supports decoding pos/neg zeros // See #[rstest] diff --git a/encodings/fastlanes/src/rle/array/rle_decompress.rs b/encodings/fastlanes/src/rle/array/rle_decompress.rs index 356dcb06dbe..d9c31b5de60 100644 --- a/encodings/fastlanes/src/rle/array/rle_decompress.rs +++ b/encodings/fastlanes/src/rle/array/rle_decompress.rs @@ -1,10 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use arrayref::array_mut_ref; -use arrayref::array_ref; use fastlanes::RLE; use num_traits::AsPrimitive; +use num_traits::NumCast; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::PrimitiveArray; @@ -13,6 +12,7 @@ use vortex_array::match_each_native_ptype; use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::validity::Validity; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; @@ -55,15 +55,16 @@ where let values = values.as_slice::(); let indices = array.indices().clone().execute::(ctx)?; - let indices = indices.as_slice::(); assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE)); + let has_invalid = !indices.all_valid()?; + let (indices_sl, _) = indices.as_slice::().as_chunks::(); let chunk_start_idx = array.offset() / FL_CHUNK_SIZE; let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE); let num_chunks = chunk_end_idx - chunk_start_idx; let mut buffer = BufferMut::::with_capacity(num_chunks * FL_CHUNK_SIZE); - let buffer_uninit = buffer.spare_capacity_mut(); + let (out_buf, _) = buffer.spare_capacity_mut().as_chunks_mut::(); let values_idx_offsets = array .values_idx_offsets() @@ -71,26 +72,46 @@ where .execute::(ctx)?; let values_idx_offsets = values_idx_offsets.as_slice::(); - for chunk_idx in 0..num_chunks { + for (chunk_idx, (chunk_indices, chunk_out)) in + indices_sl.iter().zip(out_buf.iter_mut()).enumerate() + { // Offsets in `values_idx_offsets` are absolute and need to be shifted - // by the offset of the first chunk, respective the current slice, in - // order to make them relative. + // by the offset of the first chunk, respective of the current slice, + // to make them relative. let value_idx_offset = (values_idx_offsets[chunk_idx].as_() - values_idx_offsets[0].as_()) as usize; - let chunk_values = &values[value_idx_offset..]; - let chunk_indices = &indices[chunk_idx * FL_CHUNK_SIZE..]; - - // SAFETY: `MaybeUninit` and `T` have the same layout. - let buffer_values: &mut [V] = unsafe { - std::mem::transmute(&mut buffer_uninit[chunk_idx * FL_CHUNK_SIZE..][..FL_CHUNK_SIZE]) + let next_value_idx_offset = if chunk_idx + 1 < num_chunks { + (values_idx_offsets[chunk_idx + 1].as_() - values_idx_offsets[0].as_()) as usize + } else { + values.len() }; + let num_chunk_values = u16::try_from(next_value_idx_offset - value_idx_offset) + .vortex_expect("There can be at most 1024 values in RLE chunk"); - V::decode( - chunk_values, - array_ref![chunk_indices, 0, FL_CHUNK_SIZE], - array_mut_ref![buffer_values, 0, FL_CHUNK_SIZE], - ); + // SAFETY: `MaybeUninit` and `T` have the same layout. + let buffer_values: &mut [V; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(chunk_out) }; + let chunk_values = &values[value_idx_offset..]; + if num_chunk_values == 1 { + // Single-value chunk: fill directly to avoid out-of-bounds index + // access. The indices may contain values other than 0 when they + // have been further compressed (e.g., as a masked constant). + buffer_values.fill(chunk_values[0]); + } else if has_invalid { + // When the indices array has invalid (null) positions, those + // positions may contain arbitrary garbage values after further + // compression. Clamp all indices into [0, num_chunk_values) to + // prevent out-of-bounds access in the fastlanes decoder. + let mut sanitized: [u16; FL_CHUNK_SIZE] = [0; FL_CHUNK_SIZE]; + for (idx_out, idx) in sanitized.iter_mut().zip(chunk_indices) { + let idx: u16 = + NumCast::from(*idx).vortex_expect("RLE indices are always less than u16"); + *idx_out = idx.min(num_chunk_values - 1); + } + V::decode(chunk_values, &sanitized, buffer_values); + } else { + V::decode(chunk_values, chunk_indices, buffer_values); + } } unsafe {