From 2e2fde740bdf6cafcab413a07874442052ea44b9 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 08:57:06 +0000 Subject: [PATCH 1/6] Store Patched array patches untransposed with chunk offsets Remove the eager GPU lane transpose from the Patched array. Patches are now kept in their natural sorted layout with one chunk offset per 1024-element chunk, mirroring the Patches helper, and execute/slice/take/compare/filter reuse the existing untransposed patch machinery. The transpose count (n_lanes) is retained as metadata so the data-parallel GPU transpose can be re-added in the future without a format change. Signed-off-by: Claude --- vortex-array/src/arrays/patched/array.rs | 335 +++++++----------- .../src/arrays/patched/compute/compare.rs | 118 +++--- .../src/arrays/patched/compute/filter.rs | 34 +- .../src/arrays/patched/compute/take.rs | 62 ++-- vortex-array/src/arrays/patched/mod.rs | 70 +--- vortex-array/src/arrays/patched/vtable/mod.rs | 198 +++++------ .../src/arrays/patched/vtable/operations.rs | 34 +- .../src/arrays/patched/vtable/slice.rs | 46 +-- 8 files changed, 359 insertions(+), 538 deletions(-) diff --git a/vortex-array/src/arrays/patched/array.rs b/vortex-array/src/arrays/patched/array.rs index b1e5367607b..6c2e3f49cd8 100644 --- a/vortex-array/src/arrays/patched/array.rs +++ b/vortex-array/src/arrays/patched/array.rs @@ -3,34 +3,25 @@ use std::fmt::Display; use std::fmt::Formatter; -use std::ops::Range; -use vortex_buffer::Buffer; +use num_traits::AsPrimitive; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_ensure; -use vortex_error::vortex_err; use crate::ArrayRef; use crate::ArraySlots; -use crate::Canonical; use crate::ExecutionCtx; use crate::IntoArray; -use crate::LEGACY_SESSION; -use crate::VortexSessionExecute; use crate::array::Array; use crate::array::ArrayParts; use crate::array::TypedArrayRef; use crate::array_slots; use crate::arrays::Patched; use crate::arrays::PrimitiveArray; -use crate::arrays::patched::TransposedPatches; use crate::arrays::patched::patch_lanes; -use crate::buffer::BufferHandle; use crate::dtype::DType; -use crate::dtype::IntegerPType; -use crate::dtype::NativePType; -use crate::dtype::PType; use crate::match_each_native_ptype; use crate::match_each_unsigned_integer_ptype; use crate::patches::Patches; @@ -38,34 +29,47 @@ use crate::validity::Validity; #[derive(Debug, Clone)] pub struct PatchedData { - /// Number of lanes the patch indices and values have been split into. Each of the `n_chunks` - /// of 1024 values is split into `n_lanes` lanes horizontally, each lane having 1024 / n_lanes - /// values that might be patched. + /// Number of lanes that *would* be used if these patches were transposed into the + /// data-parallel GPU layout. + /// + /// The patches are stored untransposed; this value is retained only as metadata so the + /// transpose configuration can be recovered later. + // Kept so the data-parallel GPU transpose can be re-added in the future without a format change. pub(super) n_lanes: usize, - /// The offset into that first chunk that is considered in bounds. + /// The absolute offset of the first in-view element, accounting for any slicing. /// - /// The patch indices of the first chunk less than `offset` should be skipped, and the offset - /// should be subtracted out of the remaining offsets to get their final position in the - /// executed array. + /// Patch indices are stored as global positions, so the final position of a patch within the + /// executed array is `index - offset`. pub(super) offset: usize, + + /// Number of patches sliced off the start of the first in-view chunk. + /// + /// `chunk_offsets` are sliced at chunk granularity while the patches themselves are sliced at + /// element granularity, so this records how many leading patches of the first chunk fall + /// outside the view. + pub(super) offset_within_chunk: usize, } #[array_slots(Patched)] pub struct PatchedSlots { /// The inner array containing the base unpatched values. pub inner: ArrayRef, - /// The lane offsets array for locating patches within lanes. - pub lane_offsets: ArrayRef, - /// The indices of patched (exception) values. + /// The sorted global indices of patched (exception) values. pub patch_indices: ArrayRef, /// The patched (exception) values at the corresponding indices. pub patch_values: ArrayRef, + /// One offset per 1024-element chunk into `patch_indices`/`patch_values`. + pub chunk_offsets: ArrayRef, } impl Display for PatchedData { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "n_lanes: {}, offset: {}", self.n_lanes, self.offset) + write!( + f, + "n_lanes: {}, offset: {}, offset_within_chunk: {}", + self.n_lanes, self.offset, self.offset_within_chunk + ) } } @@ -94,76 +98,48 @@ impl PatchedData { slots.patch_indices.len(), slots.patch_values.len() ); + vortex_ensure!( + slots.patch_indices.dtype().is_unsigned_int(), + "PatchedArray patch indices must be unsigned integers, got {}", + slots.patch_indices.dtype() + ); Ok(()) } } pub trait PatchedArrayExt: PatchedArraySlotsExt { + /// The transpose count retained as metadata. See `PatchedData::n_lanes`. #[inline] fn n_lanes(&self) -> usize { self.n_lanes } + /// The absolute offset of the first in-view element. #[inline] fn offset(&self) -> usize { self.offset } + /// Number of patches sliced off the start of the first in-view chunk. #[inline] - fn lane_range(&self, chunk: usize, lane: usize) -> VortexResult> { - assert!(chunk * 1024 <= self.as_ref().len() + self.offset()); - assert!(lane < self.n_lanes()); - - let start = self.lane_offsets().execute_scalar( - chunk * self.n_lanes() + lane, - &mut LEGACY_SESSION.create_execution_ctx(), - )?; - let stop = self.lane_offsets().execute_scalar( - chunk * self.n_lanes() + lane + 1, - &mut LEGACY_SESSION.create_execution_ctx(), - )?; - - let start = start - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("could not cast lane_offset to usize"))?; - - let stop = stop - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("could not cast lane_offset to usize"))?; - - Ok(start..stop) + fn offset_within_chunk(&self) -> usize { + self.offset_within_chunk } - fn slice_chunks(&self, chunks: Range) -> VortexResult> { - let lane_offsets_start = chunks.start * self.n_lanes(); - let lane_offsets_stop = chunks.end * self.n_lanes() + 1; - - let sliced_lane_offsets = self - .lane_offsets() - .slice(lane_offsets_start..lane_offsets_stop)?; - let indices = self.patch_indices().clone(); - let values = self.patch_values().clone(); - - let begin = (chunks.start * 1024).saturating_sub(self.offset()); - let end = (chunks.end * 1024) - .saturating_sub(self.offset()) - .min(self.as_ref().len()); - - let offset = if chunks.start == 0 { self.offset() } else { 0 }; - let inner = self.inner().slice(begin..end)?; - let len = inner.len(); - let dtype = self.as_ref().dtype().clone(); - let slots = PatchedSlots { - inner, - lane_offsets: sliced_lane_offsets, - patch_indices: indices, - patch_values: values, + /// Reconstruct the untransposed [`Patches`] backing this array. + fn patches(&self) -> Patches { + // SAFETY: a `Patched` array is only ever constructed from valid, sorted patches with + // matching index/value lengths and chunk offsets. + unsafe { + Patches::new_unchecked( + self.as_ref().len(), + self.offset(), + self.patch_indices().clone(), + self.patch_values().clone(), + Some(self.chunk_offsets().clone()), + Some(self.offset_within_chunk()), + ) } - .into_slots(); - - Ok(unsafe { Patched::new_unchecked(dtype, len, slots, self.n_lanes(), offset) }) } } @@ -196,43 +172,57 @@ impl Patched { ); let values_ptype = patches.dtype().as_ptype(); + let n_lanes = match_each_native_ptype!(values_ptype, |V| { patch_lanes::() }); + + // Ensure the patches carry a chunk offset for every 1024-element chunk, computing them + // when the source patches don't already provide them. + let patches = match patches.chunk_offsets() { + Some(_) => patches.clone(), + None => { + let chunk_offsets = compute_chunk_offsets(patches, ctx)?; + // SAFETY: we only attach freshly computed chunk offsets to existing valid patches. + unsafe { + Patches::new_unchecked( + patches.array_len(), + patches.offset(), + patches.indices().clone(), + patches.values().clone(), + Some(chunk_offsets), + Some(0), + ) + } + } + }; + + Ok(Self::wrap(inner, &patches, n_lanes)) + } - let TransposedPatches { - n_lanes, - lane_offsets, - indices, - values, - } = transpose_patches(patches, ctx)?; - - let lane_offsets = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(lane_offsets), - PType::U32, - Validity::NonNullable, - ) - .into_array(); - let indices = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(indices), - PType::U16, - Validity::NonNullable, - ) - .into_array(); - let values = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(values), - values_ptype, - Validity::NonNullable, - ) - .into_array(); - + /// Wrap an `inner` array and untransposed `patches` (which must carry chunk offsets) into a + /// [`Patched`] array. + pub(super) fn wrap(inner: ArrayRef, patches: &Patches, n_lanes: usize) -> Array { + let chunk_offsets = patches + .chunk_offsets() + .clone() + .vortex_expect("Patched requires chunk offsets"); let dtype = inner.dtype().clone(); let len = inner.len(); let slots = PatchedSlots { inner, - lane_offsets, - patch_indices: indices, - patch_values: values, + patch_indices: patches.indices().clone(), + patch_values: patches.values().clone(), + chunk_offsets, } .into_slots(); - Ok(unsafe { Self::new_unchecked(dtype, len, slots, n_lanes, 0) }) + unsafe { + Self::new_unchecked( + dtype, + len, + slots, + n_lanes, + patches.offset(), + patches.offset_within_chunk().unwrap_or(0), + ) + } } pub(crate) unsafe fn new_unchecked( @@ -241,123 +231,56 @@ impl Patched { slots: ArraySlots, n_lanes: usize, offset: usize, + offset_within_chunk: usize, ) -> Array { unsafe { Array::from_parts_unchecked( - ArrayParts::new(Patched, dtype, len, PatchedData { n_lanes, offset }) - .with_slots(slots), + ArrayParts::new( + Patched, + dtype, + len, + PatchedData { + n_lanes, + offset, + offset_within_chunk, + }, + ) + .with_slots(slots), ) } } } -/// Transpose a set of patches from the default sorted layout into the data parallel layout. -fn transpose_patches(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult { +/// Compute one `u64` chunk offset per 1024-element chunk for a set of sorted patches. +/// +/// `chunk_offsets[c]` is the position in the patch arrays at which the patches for chunk `c` +/// begin, i.e. the number of patches whose global index is less than `c * 1024`. +fn compute_chunk_offsets(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult { let array_len = patches.array_len(); let offset = patches.offset(); + let total_chunks = array_len.div_ceil(1024); - let indices = patches - .indices() - .clone() - .execute::(ctx)? - .into_primitive(); - - let values = patches - .values() - .clone() - .execute::(ctx)? - .into_primitive(); - + let indices = patches.indices().clone().execute::(ctx)?; let indices_ptype = indices.ptype(); - let values_ptype = values.ptype(); - - let indices = indices.buffer_handle().clone().unwrap_host(); - let values = values.buffer_handle().clone().unwrap_host(); - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - match_each_native_ptype!(values_ptype, |V| { - let indices: Buffer = Buffer::from_byte_buffer(indices); - let values: Buffer = Buffer::from_byte_buffer(values); - - Ok(transpose( - indices.as_slice(), - values.as_slice(), - offset, - array_len, - )) - }) - }) -} -#[expect(clippy::cast_possible_truncation)] -fn transpose( - indices_in: &[I], - values_in: &[V], - offset: usize, - array_len: usize, -) -> TransposedPatches { - // Total number of slots is number of chunks times number of lanes. - let n_chunks = array_len.div_ceil(1024); - assert!( - n_chunks <= u32::MAX as usize, - "Cannot transpose patches for array with >= 4 trillion elements" - ); - - let n_lanes = patch_lanes::(); - - // We know upfront how many indices and values we'll have. - let mut indices_buffer = BufferMut::with_capacity(indices_in.len()); - let mut values_buffer = BufferMut::with_capacity(values_in.len()); - - // Number of patches in each chunk/lane. - let mut lane_offsets: BufferMut = BufferMut::zeroed(n_chunks * n_lanes + 1); - - // Scan the index/value pairs once to get chunk/lane counts. - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - lane_offsets[chunk * n_lanes + lane + 1] += 1; - } - - for index in 1..lane_offsets.len() { - lane_offsets[index] += lane_offsets[index - 1]; - } - - // Loop over patches, writing them to final positions. - let indices_out = indices_buffer.spare_capacity_mut(); - let values_out = values_buffer.spare_capacity_mut(); - for (index, &value) in std::iter::zip(indices_in, values_in) { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - let position = &mut lane_offsets[chunk * n_lanes + lane]; - indices_out[*position as usize].write((index % 1024) as u16); - values_out[*position as usize].write(value); - *position += 1; - } - - unsafe { - indices_buffer.set_len(indices_in.len()); - values_buffer.set_len(values_in.len()); - } - - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - lane_offsets[chunk * n_lanes + lane] -= 1; - } + let chunk_offsets = match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let indices = indices.as_slice::(); + let mut offsets: BufferMut = BufferMut::with_capacity(total_chunks); + let mut pos = 0usize; + for chunk in 0..total_chunks { + let chunk_start = chunk * 1024; + while pos < indices.len() && { + let index: usize = indices[pos].as_(); + index - offset < chunk_start + } { + pos += 1; + } + offsets.push(pos as u64); + } + offsets.freeze() + }); - TransposedPatches { - n_lanes, - lane_offsets: lane_offsets.freeze().into_byte_buffer(), - indices: indices_buffer.freeze().into_byte_buffer(), - values: values_buffer.freeze().into_byte_buffer(), - } + Ok(PrimitiveArray::new(chunk_offsets, Validity::NonNullable).into_array()) } #[cfg(test)] diff --git a/vortex-array/src/arrays/patched/compute/compare.rs b/vortex-array/src/arrays/patched/compute/compare.rs index c7d879323cb..b0e9e9e9df8 100644 --- a/vortex-array/src/arrays/patched/compute/compare.rs +++ b/vortex-array/src/arrays/patched/compute/compare.rs @@ -20,12 +20,18 @@ use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; use crate::arrays::primitive::NativeValue; use crate::builtins::ArrayBuiltins; +use crate::dtype::IntegerPType; use crate::dtype::NativePType; use crate::match_each_native_ptype; +use crate::match_each_unsigned_integer_ptype; use crate::scalar_fn::fns::binary::CompareKernel; use crate::scalar_fn::fns::operators::CompareOperator; impl CompareKernel for Patched { + #[expect( + clippy::cognitive_complexity, + reason = "complexity is from nested match_each_* macros" + )] fn compare( lhs: ArrayView<'_, Self>, rhs: &ArrayRef, @@ -59,50 +65,48 @@ impl CompareKernel for Patched { let mut bits = BitBufferMut::from_buffer(bits.unwrap_host().into_mut(), offset, len); - let lane_offsets = lhs.lane_offsets().clone().execute::(ctx)?; let indices = lhs.patch_indices().clone().execute::(ctx)?; let values = lhs.patch_values().clone().execute::(ctx)?; - let n_lanes = lhs.n_lanes(); + let indices_ptype = indices.ptype(); match_each_native_ptype!(values.ptype(), |V| { let offset = lhs.offset(); - let indices = indices.as_slice::(); let values = values.as_slice::(); let constant = constant .as_primitive() .as_::() .vortex_expect("compare constant not null"); - let apply_patches = ApplyPatches { - bits: &mut bits, - offset, - n_lanes, - lane_offsets: lane_offsets.as_slice::(), - indices, - values, - constant, - }; - - match operator { - CompareOperator::Eq => { - apply_patches.apply(|l, r| NativeValue(l) == NativeValue(r))?; + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let apply_patches = ApplyPatches { + bits: &mut bits, + offset, + indices: indices.as_slice::(), + values, + constant, + }; + + match operator { + CompareOperator::Eq => { + apply_patches.apply(|l, r| NativeValue(l) == NativeValue(r)); + } + CompareOperator::NotEq => { + apply_patches.apply(|l, r| NativeValue(l) != NativeValue(r)); + } + CompareOperator::Gt => { + apply_patches.apply(|l, r| NativeValue(l) > NativeValue(r)); + } + CompareOperator::Gte => { + apply_patches.apply(|l, r| NativeValue(l) >= NativeValue(r)); + } + CompareOperator::Lt => { + apply_patches.apply(|l, r| NativeValue(l) < NativeValue(r)); + } + CompareOperator::Lte => { + apply_patches.apply(|l, r| NativeValue(l) <= NativeValue(r)); + } } - CompareOperator::NotEq => { - apply_patches.apply(|l, r| NativeValue(l) != NativeValue(r))?; - } - CompareOperator::Gt => { - apply_patches.apply(|l, r| NativeValue(l) > NativeValue(r))?; - } - CompareOperator::Gte => { - apply_patches.apply(|l, r| NativeValue(l) >= NativeValue(r))?; - } - CompareOperator::Lt => { - apply_patches.apply(|l, r| NativeValue(l) < NativeValue(r))?; - } - CompareOperator::Lte => { - apply_patches.apply(|l, r| NativeValue(l) <= NativeValue(r))?; - } - } + }); }); let result = BoolArray::new(bits.freeze(), validity); @@ -110,49 +114,35 @@ impl CompareKernel for Patched { } } -struct ApplyPatches<'a, V: NativePType> { +struct ApplyPatches<'a, I: IntegerPType, V: NativePType> { bits: &'a mut BitBufferMut, offset: usize, - n_lanes: usize, - lane_offsets: &'a [u32], - indices: &'a [u16], + indices: &'a [I], values: &'a [V], constant: V, } -impl ApplyPatches<'_, V> { - fn apply(self, cmp: F) -> VortexResult<()> +impl ApplyPatches<'_, I, V> { + fn apply(self, cmp: F) where F: Fn(V, V) -> bool, { - for index in 0..(self.lane_offsets.len() - 1) { - let chunk = index / self.n_lanes; - - let lane_start = self.lane_offsets[index] as usize; - let lane_end = self.lane_offsets[index + 1] as usize; - - for (&patch_index, &patch_value) in std::iter::zip( - &self.indices[lane_start..lane_end], - &self.values[lane_start..lane_end], - ) { - let bit_index = chunk * 1024 + patch_index as usize; - // Skip any indices < the offset. - if bit_index < self.offset { - continue; - } - let bit_index = bit_index - self.offset; - if bit_index >= self.bits.len() { - break; - } - if cmp(patch_value, self.constant) { - self.bits.set(bit_index) - } else { - self.bits.unset(bit_index) - } + for (&patch_index, &patch_value) in std::iter::zip(self.indices, self.values) { + let bit_index: usize = patch_index.as_(); + // Skip any indices < the offset. + if bit_index < self.offset { + continue; + } + let bit_index = bit_index - self.offset; + if bit_index >= self.bits.len() { + continue; + } + if cmp(patch_value, self.constant) { + self.bits.set(bit_index) + } else { + self.bits.unset(bit_index) } } - - Ok(()) } } diff --git a/vortex-array/src/arrays/patched/compute/filter.rs b/vortex-array/src/arrays/patched/compute/filter.rs index d2c60941059..9a76d801b36 100644 --- a/vortex-array/src/arrays/patched/compute/filter.rs +++ b/vortex-array/src/arrays/patched/compute/filter.rs @@ -12,6 +12,7 @@ use crate::arrays::FilterArray; use crate::arrays::Patched; use crate::arrays::filter::FilterReduce; use crate::arrays::patched::PatchedArrayExt; +use crate::arrays::patched::PatchedArraySlotsExt; impl FilterReduce for Patched { fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { @@ -20,6 +21,11 @@ impl FilterReduce for Patched { // // This is helpful when we have a very selective filter that is clustered to a small // range. + // + // Chunk math is done relative to the position within the first chunk: the array's offset + // can be an arbitrary absolute value (after slicing) but only its remainder mod 1024 + // shifts the chunk boundaries within this array's local coordinates. + let offset_within_chunk = array.offset() % 1024; let (chunk_start, chunk_stop) = match mask.slices() { AllOr::All | AllOr::None => { // This is handled as the precondition to this method, see the FilterReduce @@ -30,34 +36,36 @@ impl FilterReduce for Patched { let (first, _) = slices[0]; let (_, last) = slices[slices.len() - 1]; - // Convert mask indices to absolute positions by adding offset ( - (array.offset() + first) / 1024, - (array.offset() + last).div_ceil(1024), + (offset_within_chunk + first) / 1024, + (offset_within_chunk + last).div_ceil(1024), ) } }; - let n_chunks = (array.offset() + array.len()).div_ceil(1024); + let n_chunks = (offset_within_chunk + array.len()).div_ceil(1024); // If all chunks already covered, there is nothing to do. if chunk_start == 0 && chunk_stop == n_chunks { return Ok(None); } - let sliced = array.slice_chunks(chunk_start..chunk_stop)?; - - // Slice the mask according to if the chunk is sliced. - // Convert chunk bounds back to mask indices by subtracting offset. - let mask_start = (chunk_start * 1024).saturating_sub(array.offset()); + // Convert chunk bounds back to local element indices, then slice the patched array down to + // the covered range. + let mask_start = (chunk_start * 1024).saturating_sub(offset_within_chunk); let mask_end = (chunk_stop * 1024) - .saturating_sub(array.offset()) + .saturating_sub(offset_within_chunk) .min(array.len()); + + let inner = array.inner().slice(mask_start..mask_end)?; + let sliced = match array.patches().slice(mask_start..mask_end)? { + Some(patches) => Patched::wrap(inner, &patches, array.n_lanes()).into_array(), + None => inner, + }; + let remainder = mask.slice(mask_start..mask_end); - Ok(Some( - FilterArray::new(sliced.into_array(), remainder).into_array(), - )) + Ok(Some(FilterArray::new(sliced, remainder).into_array())) } } diff --git a/vortex-array/src/arrays/patched/compute/take.rs b/vortex-array/src/arrays/patched/compute/take.rs index 893d18db1ac..55b5c2ad8bd 100644 --- a/vortex-array/src/arrays/patched/compute/take.rs +++ b/vortex-array/src/arrays/patched/compute/take.rs @@ -21,6 +21,10 @@ use crate::match_each_native_ptype; use crate::match_each_unsigned_integer_ptype; impl TakeExecute for Patched { + #[expect( + clippy::cognitive_complexity, + reason = "complexity is from nested match_each_* macros" + )] fn take( array: ArrayView<'_, Self>, indices: &ArrayRef, @@ -44,33 +48,30 @@ impl TakeExecute for Patched { } = inner.into_data_parts(); let indices_ptype = indices.dtype().as_ptype(); + let patch_indices = array + .patch_indices() + .clone() + .execute::(ctx)?; + let patch_indices_ptype = patch_indices.ptype(); match_each_unsigned_integer_ptype!(indices_ptype, |I| { match_each_native_ptype!(ptype, |V| { let indices = indices.clone().execute::(ctx)?; - let lane_offsets = array - .lane_offsets() - .clone() - .execute::(ctx)?; - let patch_indices = array - .patch_indices() - .clone() - .execute::(ctx)?; let patch_values = array .patch_values() .clone() .execute::(ctx)?; let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); - take_map( - output.as_mut(), - indices.as_slice::(), - array.offset(), - array.len(), - array.n_lanes(), - lane_offsets.as_slice::(), - patch_indices.as_slice::(), - patch_values.as_slice::(), - ); + match_each_unsigned_integer_ptype!(patch_indices_ptype, |P| { + take_map( + output.as_mut(), + indices.as_slice::(), + array.offset(), + array.len(), + patch_indices.as_slice::

(), + patch_values.as_slice::(), + ); + }); // SAFETY: output and validity still have same length after take_map returns. unsafe { @@ -87,33 +88,20 @@ impl TakeExecute for Patched { /// /// First, builds a hashmap from index to patch value, then uses the hashmap in a loop to collect /// the values. -#[expect(clippy::too_many_arguments)] -fn take_map( +fn take_map( output: &mut [V], indices: &[I], offset: usize, len: usize, - n_lanes: usize, - lane_offsets: &[u32], - patch_index: &[u16], + patch_index: &[P], patch_value: &[V], ) { - let n_chunks = (offset + len).div_ceil(1024); // Build a hashmap of patch_index -> values. let mut index_map = FxHashMap::with_capacity_and_hasher(patch_index.len(), Default::default()); - for chunk in 0..n_chunks { - for lane in 0..n_lanes { - let lane_start = lane_offsets[chunk * n_lanes + lane]; - let lane_end = lane_offsets[chunk * n_lanes + lane + 1]; - for i in lane_start..lane_end { - let patch_idx = patch_index[i as usize]; - let patch_value = patch_value[i as usize]; - - let index = chunk * 1024 + patch_idx as usize; - if index >= offset && index < offset + len { - index_map.insert(index - offset, patch_value); - } - } + for (&patch_idx, &patch_value) in std::iter::zip(patch_index, patch_value) { + let index: usize = patch_idx.as_(); + if index >= offset && index < offset + len { + index_map.insert(index - offset, patch_value); } } diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs index 0ba25ab7929..4b205d8527c 100644 --- a/vortex-array/src/arrays/patched/mod.rs +++ b/vortex-array/src/arrays/patched/mod.rs @@ -22,50 +22,27 @@ //! //! # Details //! -//! To patch an array, we first divide it into a set of chunks of length 1024, and then within -//! each chunk, we assign each position to a lane. The number of lanes depends on the width of -//! the underlying type. -//! -//! Thus, rather than sorting patch indices and values by their global offset, they are sorted -//! primarily by their chunk, and then subsequently by their lanes. +//! Patch indices and values are kept in their natural sorted (untransposed) layout, exactly like +//! the [`Patches`](crate::patches::Patches) helper. To allow constant-time seeking to the patches +//! belonging to a given chunk, we additionally store a `chunk_offsets` array holding one offset +//! per 1024-element chunk. //! //! The Patched array layout has 4 children //! //! * `inner`: the inner array is the one containing encoded values, including the filler values //! that need to be patched over at execution time -//! * `lane_offsets`: this is an indexing buffer that allows you to see into ranges of the other -//! two children -//! * `indices`: An array of `u16` chunk indices, indicating where within the chunk should the value -//! be overwritten by the patch value -//! * `values`: The child array containing the patch values, which should be inserted over -//! the values of the `inner` at the locations provided by `indices` -//! -//! `indices` and `values` are aligned and accessed together. -//! -//! ```text +//! * `patch_indices`: a sorted array of unsigned global indices indicating which positions of +//! `inner` should be overwritten by the patch value +//! * `patch_values`: the child array containing the patch values, which should be inserted over +//! the values of the `inner` at the locations provided by `patch_indices` +//! * `chunk_offsets`: an indexing buffer with one entry per 1024-element chunk, so that the +//! patches for chunk `c` are `patch_indices[chunk_offsets[c]..chunk_offsets[c + 1]]` //! -//! chunk 0 chunk 0 chunk 0 chunk 0 chunk 0 chunk 0 -//! lane 0 lane 1 lane 2 lane 3 lane 4 lane 5 -//! ┌────────────┬────────────┬────────────┬────────────┬────────────┬────────────┐ -//! lane_offsets │ 0 │ 0 │ 2 │ 2 │ 3 │ 5 │ ... -//! └─────┬──────┴─────┬──────┴─────┬──────┴──────┬─────┴──────┬─────┴──────┬─────┘ -//! │ │ │ │ │ │ -//! │ │ │ │ │ │ -//! ┌─────┴────────────┘ └──────┬──────┘ ┌──────┘ └─────┐ -//! │ │ │ │ -//! │ │ │ │ -//! │ │ │ │ -//! ▼────────────┬────────────┬────────────▼────────────▼────────────┬────────────▼ -//! indices │ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ -//! ├────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤ -//! values │ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ -//! └────────────┴────────────┴────────────┴────────────┴────────────┴────────────┘ -//! ``` +//! `patch_indices` and `patch_values` are aligned and accessed together. //! -//! It turns out that this layout is optimal for executing patching on GPUs, because the -//! `lane_offsets` allows each thread in a warp to seek to its patches in constant time. +//! The number of lanes that *would* be used if these patches were transposed into the +//! data-parallel GPU layout is retained as `n_lanes` metadata, but no transpose is performed: the +//! patches are stored untransposed. mod array; mod compute; @@ -75,25 +52,16 @@ use std::env; use std::sync::LazyLock; pub use array::*; -use vortex_buffer::ByteBuffer; pub use vtable::*; -/// Patches that have been transposed into GPU format. -struct TransposedPatches { - n_lanes: usize, - lane_offsets: ByteBuffer, - indices: ByteBuffer, - values: ByteBuffer, -} - -/// Number of lanes used at patch time for a value of type `V`. +/// Number of lanes that would be used at patch time for a value of type `V` if the patches were +/// transposed into the data-parallel GPU layout. /// /// This is *NOT* equal to the number of FastLanes lanes for the type `V`, rather this is going to -/// correspond to how many "lanes" we will end up copying data on. +/// correspond to how many "lanes" we would end up copying data on. /// -/// When applied on the CPU, this configuration doesn't really matter. On the GPU, it is based -/// on the number of patches involved here. -const fn patch_lanes() -> usize { +/// The patches themselves are stored untransposed; this value is retained only as metadata. +pub(crate) const fn patch_lanes() -> usize { // For types 32-bits or smaller, we use a 32 lane configuration, and for 64-bit we use 16 lanes. // This matches up with the number of lanes we use to execute copying results from bit-unpacking // from shared to global memory. diff --git a/vortex-array/src/arrays/patched/vtable/mod.rs b/vortex-array/src/arrays/patched/vtable/mod.rs index f35a13600c5..99af2df6eea 100644 --- a/vortex-array/src/arrays/patched/vtable/mod.rs +++ b/vortex-array/src/arrays/patched/vtable/mod.rs @@ -12,9 +12,9 @@ mod slice; use std::hash::Hash; use std::hash::Hasher; -use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_session::VortexSession; use vortex_session::registry::CachedId; @@ -32,7 +32,6 @@ use crate::array::ArrayView; use crate::array::VTable; use crate::array::ValidityChild; use crate::array::ValidityVTableFromChild; -use crate::arrays::Primitive; use crate::arrays::PrimitiveArray; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; @@ -41,15 +40,16 @@ use crate::arrays::patched::PatchedSlots; use crate::arrays::patched::PatchedSlotsView; use crate::arrays::patched::compute::rules::PARENT_RULES; use crate::arrays::patched::vtable::kernels::PARENT_KERNELS; -use crate::arrays::primitive::PrimitiveDataParts; use crate::buffer::BufferHandle; use crate::builders::ArrayBuilder; use crate::builders::PrimitiveBuilder; use crate::dtype::DType; +use crate::dtype::IntegerPType; use crate::dtype::NativePType; use crate::dtype::PType; use crate::match_each_native_ptype; -use crate::require_child; +use crate::match_each_unsigned_integer_ptype; +use crate::patches::Patches; use crate::serde::ArrayChildren; /// A [`Patched`]-encoded Vortex array. @@ -70,27 +70,44 @@ pub struct PatchedMetadata { #[prost(uint32, tag = "1")] pub(crate) n_patches: u32, - /// The number of lanes used for patch indexing. Must be a power of two between 1 and 128. + /// The transpose count retained as metadata. See `PatchedData::n_lanes`. #[prost(uint32, tag = "2")] pub(crate) n_lanes: u32, - /// An offset into the first chunk's patches that should be considered in-view. - /// - /// Always between 0 and 1023. - #[prost(uint32, tag = "3")] - pub(crate) offset: u32, + /// The absolute offset of the first in-view element. + #[prost(uint64, tag = "3")] + pub(crate) offset: u64, + + /// Number of patches sliced off the start of the first in-view chunk. + #[prost(uint64, tag = "4")] + pub(crate) offset_within_chunk: u64, + + /// The number of chunk offsets, one per 1024-element chunk. + #[prost(uint64, tag = "5")] + pub(crate) chunk_offsets_len: u64, + + /// The primitive type of the patch indices child array. + #[prost(enumeration = "PType", tag = "6")] + pub(crate) indices_ptype: i32, + + /// The primitive type of the chunk offsets child array. + #[prost(enumeration = "PType", tag = "7")] + pub(crate) chunk_offsets_ptype: i32, } impl ArrayHash for PatchedData { fn array_hash(&self, state: &mut H, _precision: Precision) { self.offset.hash(state); + self.offset_within_chunk.hash(state); self.n_lanes.hash(state); } } impl ArrayEq for PatchedData { fn array_eq(&self, other: &Self, _precision: Precision) -> bool { - self.offset == other.offset && self.n_lanes == other.n_lanes + self.offset == other.offset + && self.offset_within_chunk == other.offset_within_chunk + && self.n_lanes == other.n_lanes } } @@ -129,9 +146,9 @@ impl VTable for Patched { fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef { match idx { PatchedSlots::INNER => array.inner().clone(), - PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(), PatchedSlots::PATCH_INDICES => array.patch_indices().clone(), PatchedSlots::PATCH_VALUES => array.patch_values().clone(), + PatchedSlots::CHUNK_OFFSETS => array.chunk_offsets().clone(), _ => vortex_panic!("invalid child index for PatchedArray: {idx}"), } } @@ -144,7 +161,11 @@ impl VTable for Patched { PatchedMetadata { n_patches: u32::try_from(array.patch_indices().len())?, n_lanes: u32::try_from(array.n_lanes())?, - offset: u32::try_from(array.offset())?, + offset: array.offset() as u64, + offset_within_chunk: array.offset_within_chunk() as u64, + chunk_offsets_len: array.chunk_offsets().len() as u64, + indices_ptype: array.patch_indices().dtype().as_ptype() as i32, + chunk_offsets_ptype: array.chunk_offsets().dtype().as_ptype() as i32, } .encode_to_vec(), )) @@ -162,23 +183,31 @@ impl VTable for Patched { let metadata = PatchedMetadata::decode(metadata)?; let n_patches = metadata.n_patches as usize; let n_lanes = metadata.n_lanes as usize; - let offset = metadata.offset as usize; + let offset = usize::try_from(metadata.offset) + .map_err(|_| vortex_err!("patched offset does not fit in usize"))?; + let offset_within_chunk = usize::try_from(metadata.offset_within_chunk) + .map_err(|_| vortex_err!("patched offset_within_chunk does not fit in usize"))?; + let chunk_offsets_len = usize::try_from(metadata.chunk_offsets_len) + .map_err(|_| vortex_err!("patched chunk_offsets_len does not fit in usize"))?; - // n_chunks should correspond to the chunk in the `inner`. - // After slicing when offset > 0, there may be additional chunks. - let n_chunks = (len + offset).div_ceil(1024); + let indices_dtype: DType = PType::try_from(metadata.indices_ptype)?.into(); + let chunk_offsets_dtype: DType = PType::try_from(metadata.chunk_offsets_ptype)?.into(); let inner = children.get(0, dtype, len)?; - let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?; - let indices = children.get(2, PType::U16.into(), n_patches)?; - let values = children.get(3, dtype, n_patches)?; - - let data = PatchedData { n_lanes, offset }; + let indices = children.get(1, &indices_dtype, n_patches)?; + let values = children.get(2, dtype, n_patches)?; + let chunk_offsets = children.get(3, &chunk_offsets_dtype, chunk_offsets_len)?; + + let data = PatchedData { + n_lanes, + offset, + offset_within_chunk, + }; let slots = PatchedSlots { inner, - lane_offsets, patch_indices: indices, patch_values: values, + chunk_offsets, } .into_slots(); Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots)) @@ -209,10 +238,6 @@ impl VTable for Patched { array.inner().append_to_builder(builder, ctx)?; let offset = array.offset(); - let lane_offsets = array - .lane_offsets() - .clone() - .execute::(ctx)?; let indices = array .patch_indices() .clone() @@ -221,6 +246,7 @@ impl VTable for Patched { .patch_values() .clone() .execute::(ctx)?; + let indices_ptype = indices.ptype(); match_each_native_ptype!(ptype, |V| { let typed_builder = builder @@ -232,16 +258,16 @@ impl VTable for Patched { // populated by the inner.append_to_builder() call above. let output = typed_builder.values_mut(); let trailer = output.len() - len; - - apply_patches_primitive::( - &mut output[trailer..], - offset, - len, - array.n_lanes(), - lane_offsets.as_slice::(), - indices.as_slice::(), - values.as_slice::(), - ); + let values = values.as_slice::(); + + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + apply_patches_primitive::( + &mut output[trailer..], + offset, + indices.as_slice::(), + values, + ); + }); }); Ok(()) @@ -251,54 +277,30 @@ impl VTable for Patched { PatchedSlots::NAMES[idx].to_string() } - fn execute(array: Array, _ctx: &mut ExecutionCtx) -> VortexResult { - let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive); - let array = - require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive); - let array = - require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive); - let array = - require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive); - + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { let len = array.len(); - - let n_lanes = array.n_lanes; let offset = array.offset; - let slots = match array.try_into_parts() { - Ok(parts) => PatchedSlots::from_slots(parts.slots), - Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(), - }; + let offset_within_chunk = array.offset_within_chunk; - // TODO(joe): use iterative execution - let PrimitiveDataParts { - buffer, - ptype, - validity, - } = slots.inner.downcast::().into_data_parts(); - - let values = slots.patch_values.downcast::(); - let lane_offsets = slots.lane_offsets.downcast::(); - let patch_indices = slots.patch_indices.downcast::(); - - let patched_values = match_each_native_ptype!(values.ptype(), |V| { - let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); - - apply_patches_primitive::( - &mut output, - offset, + let view = PatchedSlotsView::from_slots(array.slots()); + let inner = view.inner.clone(); + // SAFETY: a `Patched` array always holds valid, sorted patches with chunk offsets. + let patches = unsafe { + Patches::new_unchecked( len, - n_lanes, - lane_offsets.as_slice::(), - patch_indices.as_slice::(), - values.as_slice::(), - ); - - let output = output.freeze(); + offset, + view.patch_indices.clone(), + view.patch_values.clone(), + Some(view.chunk_offsets.clone()), + Some(offset_within_chunk), + ) + }; - PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity) - }); + // TODO(joe): use iterative execution + let inner = inner.execute::(ctx)?; + let patched = inner.patch(&patches, ctx)?; - Ok(ExecutionResult::done(patched_values.into_array())) + Ok(ExecutionResult::done(patched.into_array())) } fn execute_parent( @@ -319,31 +321,27 @@ impl VTable for Patched { } } -/// Apply patches on top of the existing value types. -fn apply_patches_primitive( +/// Apply untransposed patches on top of an output buffer. +/// +/// Patch indices are global; a patch lands at `index - offset` and any index outside +/// `[offset, offset + output.len())` is skipped. +fn apply_patches_primitive( output: &mut [V], offset: usize, - len: usize, - n_lanes: usize, - lane_offsets: &[u32], - indices: &[u16], + indices: &[I], values: &[V], ) { - let n_chunks = (offset + len).div_ceil(1024); - for chunk in 0..n_chunks { - let start = lane_offsets[chunk * n_lanes] as usize; - let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize; - - for idx in start..stop { - // the indices slice is measured as an offset into the 1024-value chunk. - let index = chunk * 1024 + indices[idx] as usize; - if index < offset || index >= offset + len { - continue; - } - - let value = values[idx]; - output[index - offset] = value; + let len = output.len(); + for (patch, &value) in std::iter::zip(indices, values) { + let index: usize = patch.as_(); + if index < offset { + continue; + } + let position = index - offset; + if position >= len { + continue; } + output[position] = value; } } @@ -663,9 +661,9 @@ mod tests { let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array(); let slots = PatchedSlots { inner: new_inner, - lane_offsets: array.lane_offsets().clone(), patch_indices: array.patch_indices().clone(), patch_values: array.patch_values().clone(), + chunk_offsets: array.chunk_offsets().clone(), }; let array_ref = array.into_array(); diff --git a/vortex-array/src/arrays/patched/vtable/operations.rs b/vortex-array/src/arrays/patched/vtable/operations.rs index f0491666e56..0a0f1ba2cd1 100644 --- a/vortex-array/src/arrays/patched/vtable/operations.rs +++ b/vortex-array/src/arrays/patched/vtable/operations.rs @@ -6,11 +6,9 @@ use vortex_error::VortexResult; use crate::ExecutionCtx; use crate::array::ArrayView; use crate::array::OperationsVTable; -use crate::arrays::PrimitiveArray; use crate::arrays::patched::Patched; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::optimizer::ArrayOptimizer; use crate::scalar::Scalar; impl OperationsVTable for Patched { @@ -19,35 +17,9 @@ impl OperationsVTable for Patched { index: usize, ctx: &mut ExecutionCtx, ) -> VortexResult { - let chunk = (index + array.offset()) / 1024; - - #[expect( - clippy::cast_possible_truncation, - reason = "N % 1024 always fits in u16" - )] - let chunk_index = ((index + array.offset()) % 1024) as u16; - - let lane = (index + array.offset()) % array.n_lanes(); - - let range = array.lane_range(chunk, lane)?; - - // Get the range of indices corresponding to the lane, potentially decoding them to avoid - // the overhead of repeated scalar_at calls. - let patch_indices = array - .patch_indices() - .slice(range.clone())? - .optimize()? - .execute::(ctx)?; - - // NOTE: we do linear scan as lane has <= 32 patches, binary search would likely - // be slower. - for (&patch_index, idx) in std::iter::zip(patch_indices.as_slice::(), range) { - if patch_index == chunk_index { - return array - .patch_values() - .execute_scalar(idx, ctx)? - .cast(array.dtype()); - } + // Constant-time chunked lookup via the untransposed patches' chunk offsets. + if let Some(patch) = array.patches().get_patched(index)? { + return patch.cast(array.dtype()); } // Otherwise, access the underlying value. diff --git a/vortex-array/src/arrays/patched/vtable/slice.rs b/vortex-array/src/arrays/patched/vtable/slice.rs index 8655e0c7d84..3567bb0e596 100644 --- a/vortex-array/src/arrays/patched/vtable/slice.rs +++ b/vortex-array/src/arrays/patched/vtable/slice.rs @@ -11,46 +11,20 @@ use crate::array::ArrayView; use crate::arrays::Patched; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::arrays::patched::PatchedSlots; use crate::arrays::slice::SliceReduce; impl SliceReduce for Patched { fn slice(array: ArrayView<'_, Self>, range: Range) -> VortexResult> { - // We **always** slice the patches at 1024-element chunk boundaries. We keep the offset + len - // around so that when we execute we know how much to chop off. - let new_offset = (range.start + array.offset()) % 1024; - let chunk_start = (range.start + array.offset()) / 1024; - let chunk_stop = (range.end + array.offset()).div_ceil(1024); - let sliced_lane_offsets = array - .lane_offsets() - .slice((chunk_start * array.n_lanes())..(chunk_stop * array.n_lanes()) + 1)?; - - // Unlike the patches, we slice the inner to the exact range. This is handled - // at execution time by making sure to skip patch indices that are < offset - // or >= len. + // Slice the inner to the exact range; the patches keep their own offset bookkeeping and + // are sliced at chunk granularity via `Patches::slice`. let inner = array.inner().slice(range.start..range.end)?; - let len = inner.len(); - let slots = PatchedSlots { - inner, - lane_offsets: sliced_lane_offsets, - patch_indices: array.patch_indices().clone(), - patch_values: array.patch_values().clone(), + match array.patches().slice(range)? { + // Patches remain in the sliced range: rewrap them around the sliced inner. + Some(patches) => Ok(Some(Patched::wrap(inner, &patches, array.n_lanes()).into_array())), + // No patches overlap the slice, so the inner array already holds the final values. + None => Ok(Some(inner)), } - .into_slots(); - - Ok(Some( - unsafe { - Patched::new_unchecked( - array.dtype().clone(), - len, - slots, - array.n_lanes(), - new_offset, - ) - } - .into_array(), - )) } } @@ -92,9 +66,9 @@ mod tests { @r#" root: vortex.patched(u16, len=9) inner: vortex.primitive(u16, len=9) - lane_offsets: vortex.primitive(u32, len=33) - patch_indices: vortex.primitive(u16, len=3) - patch_values: vortex.primitive(u16, len=3) + patch_indices: vortex.primitive(u32, len=2) + patch_values: vortex.primitive(u16, len=2) + chunk_offsets: vortex.primitive(u64, len=1) "#); let executed = sliced.execute::(&mut ctx)?.into_primitive(); From 9f74b886fb9c97242d9e6a3ccc603a68b33edac8 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 19:45:22 +0000 Subject: [PATCH 2/6] Decode primitive Sparse arrays as Patched via a plugin Add SparsePatchedPlugin which, when the experimental Patched encoding is enabled, deserializes a primitive Sparse array as a Patched array over a constant fill. A Sparse array is logically patches on top of a constant, so this reuses the same externalization shim already used for BitPacked and ALP. Non-primitive (bool/varbin/struct/fixed-size-list) and nullable-patch sparse arrays remain Sparse, since Patched only represents primitive inners with non-null patch values. Signed-off-by: Claude --- encodings/sparse/src/lib.rs | 2 + encodings/sparse/src/plugin.rs | 201 +++++++++++++++++++++++++++++++++ vortex-file/src/lib.rs | 8 ++ 3 files changed, 211 insertions(+) create mode 100644 encodings/sparse/src/plugin.rs diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index 42b5cd46724..fcd24d0e17f 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -65,9 +65,11 @@ mod canonical; mod compute; mod kernel; mod ops; +mod plugin; mod rules; mod slice; +pub use plugin::SparsePatchedPlugin; use vortex_array::aggregate_fn::AggregateFnVTable as _; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::min_max::MinMax; diff --git a/encodings/sparse/src/plugin.rs b/encodings/sparse/src/plugin.rs new file mode 100644 index 00000000000..24cf57d9877 --- /dev/null +++ b/encodings/sparse/src/plugin.rs @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A custom [`ArrayPlugin`] that lets you load in and deserialize a `Sparse` array as a +//! `PatchedArray` that wraps a constant fill array. +//! +//! A `Sparse` array is logically a set of patches applied on top of a constant fill value, which +//! is exactly what a `Patched` array over a [`ConstantArray`] represents. This plugin externalizes +//! that representation on deserialize when the array is primitive with non-null patches, which is +//! the subset that `Patched` can represent. All other sparse arrays are returned unchanged. + +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayPlugin; +use vortex_array::ArrayRef; +use vortex_array::ArrayVTable; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::Patched; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; +use vortex_array::serde::ArrayChildren; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use crate::Sparse; +use crate::SparseExt; + +/// Custom deserialization plugin that converts a primitive `Sparse` array into a `PatchedArray` +/// holding a [`ConstantArray`] fill. +#[derive(Debug, Clone)] +pub struct SparsePatchedPlugin; + +impl ArrayPlugin for SparsePatchedPlugin { + fn id(&self) -> ArrayId { + // We reuse the existing `Sparse` ID so that we can take over its deserialization pathway. + ArrayVTable::id(&Sparse) + } + + fn serialize( + &self, + array: &ArrayRef, + session: &VortexSession, + ) -> VortexResult>> { + // Delegate to the Sparse VTable for serialization. + Sparse.serialize(array, session) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + session: &VortexSession, + ) -> VortexResult { + let sparse = Array::::try_from_parts(ArrayVTable::deserialize( + &Sparse, dtype, len, metadata, buffers, children, session, + )?) + .map_err(|_| vortex_err!("Sparse plugin should only deserialize vortex.sparse"))?; + + // `Patched` can only represent primitive inners with non-null patch values, so anything + // else (bool, varbin, struct, fixed-size-list, nullable patches) stays a Sparse array. + if !dtype.is_primitive() { + return Ok(sparse.into_array()); + } + + let patches = sparse.patches(); + let mut ctx = session.create_execution_ctx(); + if !patches.values().all_valid(&mut ctx)? { + return Ok(sparse.into_array()); + } + + let fill = ConstantArray::new(sparse.fill_scalar().clone(), len).into_array(); + let patched = Patched::from_array_and_patches(fill, &patches, &mut ctx)?; + + Ok(patched.into_array()) + } + + fn is_supported_encoding(&self, id: &ArrayId) -> bool { + id == ArrayVTable::id(&Sparse) || id == ArrayVTable::id(&Patched) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::ArrayPlugin; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::PatchedArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::patched::PatchedArraySlotsExt; + use vortex_array::buffer::BufferHandle; + use vortex_array::patches::Patches; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + use vortex_session::VortexSession; + + use super::SparsePatchedPlugin; + use crate::Sparse; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(SparsePatchedPlugin); + session + }); + + fn primitive_sparse() -> VortexResult { + let patches = Patches::new( + 10, + 0, + PrimitiveArray::from_iter([1u32, 3, 7]).into_array(), + PrimitiveArray::from_iter([10u32, 30, 70]).into_array(), + None, + )?; + Sparse::try_new_from_patches( + patches, + Scalar::primitive(0u32, vortex_array::dtype::Nullability::NonNullable), + ) + } + + fn round_trip(array: &vortex_array::ArrayRef) -> VortexResult { + let metadata = SESSION.array_serialize(array)?.unwrap(); + let children = array.children(); + let buffers = array + .buffers() + .into_iter() + .map(BufferHandle::new_host) + .collect::>(); + + SparsePatchedPlugin.deserialize( + array.dtype(), + array.len(), + &metadata, + &buffers, + &children, + &SESSION, + ) + } + + #[test] + fn primitive_sparse_becomes_patched() -> VortexResult<()> { + let sparse = primitive_sparse()?.into_array(); + let deserialized = round_trip(&sparse)?; + + let patched: PatchedArray = deserialized + .try_downcast() + .map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?; + + // The inner is the constant fill. + assert!(patched.inner().as_constant().is_some()); + + // The decoded values must match the original sparse array. + let mut ctx = SESSION.create_execution_ctx(); + let expected = sparse + .execute::(&mut ctx)? + .into_buffer::(); + let actual = patched + .into_array() + .execute::(&mut ctx)? + .into_buffer::(); + assert_eq!(expected, actual); + + Ok(()) + } + + #[test] + fn non_primitive_sparse_stays_sparse() -> VortexResult<()> { + use vortex_array::arrays::BoolArray; + + let patches = Patches::new( + 5, + 0, + PrimitiveArray::from_iter([1u32, 3]).into_array(), + BoolArray::from_iter([true, false]).into_array(), + None, + )?; + let sparse = Sparse::try_new_from_patches( + patches, + Scalar::bool(false, vortex_array::dtype::Nullability::NonNullable), + )? + .into_array(); + + let deserialized = round_trip(&sparse)?; + assert!( + deserialized.is::(), + "non-primitive sparse should stay sparse, got {}", + deserialized.encoding_id() + ); + + Ok(()) + } +} diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index e69b5848de2..5d4c520cd98 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -117,6 +117,7 @@ use vortex_bytebool::ByteBool; use vortex_fsst::FSST; use vortex_pco::Pco; use vortex_session::VortexSession; +use vortex_sparse::SparsePatchedPlugin; use vortex_zigzag::ZigZag; pub use writer::*; @@ -183,6 +184,13 @@ pub fn register_default_encodings(session: &VortexSession) { vortex_sequence::initialize(session); vortex_sparse::initialize(session); + if use_experimental_patches() { + // Override Sparse registration with a plugin that decodes primitive Sparse arrays + // as a Patched array over a constant fill. Must run after `vortex_sparse::initialize` + // so it takes priority for the same encoding id. + session.arrays().register(SparsePatchedPlugin); + } + #[cfg(feature = "unstable_encodings")] vortex_tensor::initialize(session); } From 0a4cd32bd4fa6be1cb04a682ac01d884792a6ef1 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 20:45:19 +0000 Subject: [PATCH 3/6] Add benchmark comparing fused vs two-pass patch decode Microbenchmark isolating loop ordering for patched bitpacked decode: unpack every 1024-block then scatter all patches, versus unpacking one block and patching it while still hot in cache. Both do identical work, so any delta is cache locality. Fused wins 20-40% once the output exceeds L2, peaking around moderate patch density. Signed-off-by: Claude --- encodings/fastlanes/Cargo.toml | 4 + .../benches/patched_unpack_locality.rs | 189 ++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 encodings/fastlanes/benches/patched_unpack_locality.rs diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index 08c96c481d7..b418f9699a8 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -68,3 +68,7 @@ harness = false name = "cast_bitpacked" harness = false required-features = ["_test-harness"] + +[[bench]] +name = "patched_unpack_locality" +harness = false diff --git a/encodings/fastlanes/benches/patched_unpack_locality.rs b/encodings/fastlanes/benches/patched_unpack_locality.rs new file mode 100644 index 00000000000..9273210333e --- /dev/null +++ b/encodings/fastlanes/benches/patched_unpack_locality.rs @@ -0,0 +1,189 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Microbenchmark isolating the cache-locality question for patched bitpacked arrays: +//! is it faster to unpack every 1024-element block and *then* scatter all patches +//! (`unpack_then_patch`), or to unpack one block and immediately patch it while the +//! freshly-decoded block is still hot in cache (`fused`)? +//! +//! Both strategies perform identical total work (same unpack kernel calls, same number of +//! patch stores); only the loop ordering differs, so any delta is attributable to locality. + +#![expect(clippy::cast_possible_truncation)] + +use divan::Bencher; +use fastlanes::BitPacking; + +fn main() { + // Correctness guard: both strategies must produce identical output. + let case = (1usize << 18, 9u8, 50u32); + let data = Setup::new(case.0, case.1, case.2); + let a = { + let mut out = vec![0u32; data.n_padded]; + unpack_then_patch(&data, &mut out); + out + }; + let b = { + let mut out = vec![0u32; data.n_padded]; + fused(&data, &mut out); + out + }; + assert_eq!(a, b, "fused and unpack_then_patch must agree"); + + divan::main(); +} + +/// (num_values, bit_width, patch_stride) — one patch every `patch_stride` elements. +const CASES: &[(usize, u8, u32)] = &[ + // 256 KiB output, fits in L2. + (1 << 16, 9, 200), + (1 << 16, 9, 20), + (1 << 16, 9, 5), + // 1 MiB output, around L2. + (1 << 18, 9, 200), + (1 << 18, 9, 20), + (1 << 18, 9, 5), + // 4 MiB output, exceeds typical L2. + (1 << 20, 9, 200), + (1 << 20, 9, 20), + (1 << 20, 9, 5), + // 16 MiB output, far exceeds L2. + (1 << 22, 9, 200), + (1 << 22, 9, 20), + (1 << 22, 9, 5), +]; + +struct Setup { + bit_width: usize, + elems_per_chunk: usize, + num_chunks: usize, + n_padded: usize, + packed: Vec, + /// Patch indices, globally sorted. + indices: Vec, + /// Patch values, parallel to `indices`. + values: Vec, + /// `chunk_offsets[c]..chunk_offsets[c + 1]` is the patch range for chunk `c`. + chunk_offsets: Vec, +} + +impl Setup { + fn new(n: usize, bit_width: u8, patch_stride: u32) -> Self { + let bit_width = bit_width as usize; + let num_chunks = n.div_ceil(1024); + let n_padded = num_chunks * 1024; + let elems_per_chunk = 1024 * bit_width / 32; + let mask = if bit_width == 32 { + u32::MAX + } else { + (1u32 << bit_width) - 1 + }; + + // Deterministic, low-entropy values that fit in `bit_width` bits. + let values_in: Vec = (0..n_padded as u32) + .map(|i| i.wrapping_mul(2654435761) & mask) + .collect(); + + let mut packed = vec![0u32; num_chunks * elems_per_chunk]; + for c in 0..num_chunks { + // SAFETY: input is exactly 1024 elements, output exactly `elems_per_chunk`. + unsafe { + BitPacking::unchecked_pack( + bit_width, + &values_in[c * 1024..][..1024], + &mut packed[c * elems_per_chunk..][..elems_per_chunk], + ); + } + } + + // Uniformly-spread patches: one every `patch_stride` elements. + let stride = patch_stride as usize; + let mut indices = Vec::new(); + let mut values = Vec::new(); + let mut chunk_offsets = vec![0usize; num_chunks + 1]; + let mut idx = 0usize; + while idx < n_padded { + indices.push(idx); + values.push(0xDEAD_BEEF ^ idx as u32); + idx += stride; + } + // Build chunk offsets from the sorted indices. + let mut p = 0usize; + for c in 0..num_chunks { + let chunk_end = (c + 1) * 1024; + while p < indices.len() && indices[p] < chunk_end { + p += 1; + } + chunk_offsets[c + 1] = p; + } + + Self { + bit_width, + elems_per_chunk, + num_chunks, + n_padded, + packed, + indices, + values, + chunk_offsets, + } + } +} + +/// Approach A: unpack every block into the output, then scatter all patches in a second pass. +#[inline] +fn unpack_then_patch(s: &Setup, output: &mut [u32]) { + for c in 0..s.num_chunks { + // SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024. + unsafe { + BitPacking::unchecked_unpack( + s.bit_width, + &s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk], + &mut output[c * 1024..][..1024], + ); + } + } + for (i, &idx) in s.indices.iter().enumerate() { + output[idx] = s.values[i]; + } +} + +/// Approach B: unpack one block and immediately patch it while still hot in cache. +#[inline] +fn fused(s: &Setup, output: &mut [u32]) { + for c in 0..s.num_chunks { + // SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024. + unsafe { + BitPacking::unchecked_unpack( + s.bit_width, + &s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk], + &mut output[c * 1024..][..1024], + ); + } + for p in s.chunk_offsets[c]..s.chunk_offsets[c + 1] { + output[s.indices[p]] = s.values[p]; + } + } +} + +#[divan::bench(args = CASES)] +fn unpack_then_patch_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) { + let setup = Setup::new(n, bit_width, stride); + bencher + .with_inputs(|| vec![0u32; setup.n_padded]) + .bench_local_values(|mut output| { + unpack_then_patch(&setup, &mut output); + divan::black_box(output); + }); +} + +#[divan::bench(args = CASES)] +fn fused_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) { + let setup = Setup::new(n, bit_width, stride); + bencher + .with_inputs(|| vec![0u32; setup.n_padded]) + .bench_local_values(|mut output| { + fused(&setup, &mut output); + divan::black_box(output); + }); +} From b38b783881d19fcf5e74a41d762363ab4eaddff5 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 27 May 2026 13:41:49 +0100 Subject: [PATCH 4/6] chore[array]: rustfmt Patched slice.rs Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/patched/vtable/slice.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vortex-array/src/arrays/patched/vtable/slice.rs b/vortex-array/src/arrays/patched/vtable/slice.rs index 3567bb0e596..124fa72c82a 100644 --- a/vortex-array/src/arrays/patched/vtable/slice.rs +++ b/vortex-array/src/arrays/patched/vtable/slice.rs @@ -21,7 +21,9 @@ impl SliceReduce for Patched { match array.patches().slice(range)? { // Patches remain in the sliced range: rewrap them around the sliced inner. - Some(patches) => Ok(Some(Patched::wrap(inner, &patches, array.n_lanes()).into_array())), + Some(patches) => Ok(Some( + Patched::wrap(inner, &patches, array.n_lanes()).into_array(), + )), // No patches overlap the slice, so the inner array already holds the final values. None => Ok(Some(inner)), } From bcc8e774eaf0d32a1e93903a538652c9a2bcfbe6 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 27 May 2026 18:27:45 +0100 Subject: [PATCH 5/6] fix Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/patched/array.rs | 30 +----- .../src/arrays/patched/compute/filter.rs | 77 +++++---------- .../src/arrays/patched/compute/rules.rs | 7 +- .../src/arrays/patched/compute/take.rs | 95 +++---------------- vortex-array/src/arrays/patched/mod.rs | 21 +--- .../src/arrays/patched/vtable/kernels.rs | 2 + vortex-array/src/arrays/patched/vtable/mod.rs | 12 +-- .../src/arrays/patched/vtable/slice.rs | 4 +- 8 files changed, 46 insertions(+), 202 deletions(-) diff --git a/vortex-array/src/arrays/patched/array.rs b/vortex-array/src/arrays/patched/array.rs index 6c2e3f49cd8..86198889622 100644 --- a/vortex-array/src/arrays/patched/array.rs +++ b/vortex-array/src/arrays/patched/array.rs @@ -20,23 +20,13 @@ use crate::array::TypedArrayRef; use crate::array_slots; use crate::arrays::Patched; use crate::arrays::PrimitiveArray; -use crate::arrays::patched::patch_lanes; use crate::dtype::DType; -use crate::match_each_native_ptype; use crate::match_each_unsigned_integer_ptype; use crate::patches::Patches; use crate::validity::Validity; #[derive(Debug, Clone)] pub struct PatchedData { - /// Number of lanes that *would* be used if these patches were transposed into the - /// data-parallel GPU layout. - /// - /// The patches are stored untransposed; this value is retained only as metadata so the - /// transpose configuration can be recovered later. - // Kept so the data-parallel GPU transpose can be re-added in the future without a format change. - pub(super) n_lanes: usize, - /// The absolute offset of the first in-view element, accounting for any slicing. /// /// Patch indices are stored as global positions, so the final position of a patch within the @@ -67,8 +57,8 @@ impl Display for PatchedData { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "n_lanes: {}, offset: {}, offset_within_chunk: {}", - self.n_lanes, self.offset, self.offset_within_chunk + "offset: {}, offset_within_chunk: {}", + self.offset, self.offset_within_chunk ) } } @@ -108,12 +98,6 @@ impl PatchedData { } pub trait PatchedArrayExt: PatchedArraySlotsExt { - /// The transpose count retained as metadata. See `PatchedData::n_lanes`. - #[inline] - fn n_lanes(&self) -> usize { - self.n_lanes - } - /// The absolute offset of the first in-view element. #[inline] fn offset(&self) -> usize { @@ -171,9 +155,6 @@ impl Patched { "PatchedArray cannot be built from Patches with nulls" ); - let values_ptype = patches.dtype().as_ptype(); - let n_lanes = match_each_native_ptype!(values_ptype, |V| { patch_lanes::() }); - // Ensure the patches carry a chunk offset for every 1024-element chunk, computing them // when the source patches don't already provide them. let patches = match patches.chunk_offsets() { @@ -194,12 +175,12 @@ impl Patched { } }; - Ok(Self::wrap(inner, &patches, n_lanes)) + Ok(Self::wrap(inner, &patches)) } /// Wrap an `inner` array and untransposed `patches` (which must carry chunk offsets) into a /// [`Patched`] array. - pub(super) fn wrap(inner: ArrayRef, patches: &Patches, n_lanes: usize) -> Array { + pub(super) fn wrap(inner: ArrayRef, patches: &Patches) -> Array { let chunk_offsets = patches .chunk_offsets() .clone() @@ -218,7 +199,6 @@ impl Patched { dtype, len, slots, - n_lanes, patches.offset(), patches.offset_within_chunk().unwrap_or(0), ) @@ -229,7 +209,6 @@ impl Patched { dtype: DType, len: usize, slots: ArraySlots, - n_lanes: usize, offset: usize, offset_within_chunk: usize, ) -> Array { @@ -240,7 +219,6 @@ impl Patched { dtype, len, PatchedData { - n_lanes, offset, offset_within_chunk, }, diff --git a/vortex-array/src/arrays/patched/compute/filter.rs b/vortex-array/src/arrays/patched/compute/filter.rs index 9a76d801b36..73a4430cd56 100644 --- a/vortex-array/src/arrays/patched/compute/filter.rs +++ b/vortex-array/src/arrays/patched/compute/filter.rs @@ -2,70 +2,39 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; -use vortex_mask::AllOr; use vortex_mask::Mask; use crate::ArrayRef; +use crate::Canonical; +use crate::ExecutionCtx; use crate::IntoArray; use crate::array::ArrayView; -use crate::arrays::FilterArray; use crate::arrays::Patched; -use crate::arrays::filter::FilterReduce; +use crate::arrays::filter::FilterKernel; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -impl FilterReduce for Patched { - fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { - // Find the contiguous chunk range that the mask covers. We use this to slice the inner - // components, then wrap the rest up with another FilterArray. - // - // This is helpful when we have a very selective filter that is clustered to a small - // range. - // - // Chunk math is done relative to the position within the first chunk: the array's offset - // can be an arbitrary absolute value (after slicing) but only its remainder mod 1024 - // shifts the chunk boundaries within this array's local coordinates. - let offset_within_chunk = array.offset() % 1024; - let (chunk_start, chunk_stop) = match mask.slices() { - AllOr::All | AllOr::None => { - // This is handled as the precondition to this method, see the FilterReduce - // documentation. - unreachable!("mask must be a MaskValues here") - } - AllOr::Some(slices) => { - let (first, _) = slices[0]; - let (_, last) = slices[slices.len() - 1]; - - ( - (offset_within_chunk + first) / 1024, - (offset_within_chunk + last).div_ceil(1024), - ) - } - }; - - let n_chunks = (offset_within_chunk + array.len()).div_ceil(1024); - - // If all chunks already covered, there is nothing to do. - if chunk_start == 0 && chunk_stop == n_chunks { - return Ok(None); +impl FilterKernel for Patched { + fn filter( + array: ArrayView<'_, Self>, + mask: &Mask, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let filtered_inner = array + .inner() + .clone() + .filter(mask.clone())? + .execute::(ctx)? + .into_array(); + + let filtered_patches = array.patches().filter(mask, ctx)?; + + match filtered_patches { + None => Ok(Some(filtered_inner)), + Some(patches) => Ok(Some( + Patched::from_array_and_patches(filtered_inner, &patches, ctx)?.into_array(), + )), } - - // Convert chunk bounds back to local element indices, then slice the patched array down to - // the covered range. - let mask_start = (chunk_start * 1024).saturating_sub(offset_within_chunk); - let mask_end = (chunk_stop * 1024) - .saturating_sub(offset_within_chunk) - .min(array.len()); - - let inner = array.inner().slice(mask_start..mask_end)?; - let sliced = match array.patches().slice(mask_start..mask_end)? { - Some(patches) => Patched::wrap(inner, &patches, array.n_lanes()).into_array(), - None => inner, - }; - - let remainder = mask.slice(mask_start..mask_end); - - Ok(Some(FilterArray::new(sliced, remainder).into_array())) } } diff --git a/vortex-array/src/arrays/patched/compute/rules.rs b/vortex-array/src/arrays/patched/compute/rules.rs index 3ecb25c1efa..9146b78960f 100644 --- a/vortex-array/src/arrays/patched/compute/rules.rs +++ b/vortex-array/src/arrays/patched/compute/rules.rs @@ -2,11 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use crate::arrays::Patched; -use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduceAdaptor; use crate::optimizer::rules::ParentRuleSet; -pub(crate) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new(&[ - ParentRuleSet::lift(&FilterReduceAdaptor(Patched)), - ParentRuleSet::lift(&SliceReduceAdaptor(Patched)), -]); +pub(crate) const PARENT_RULES: ParentRuleSet = + ParentRuleSet::new(&[ParentRuleSet::lift(&SliceReduceAdaptor(Patched))]); diff --git a/vortex-array/src/arrays/patched/compute/take.rs b/vortex-array/src/arrays/patched/compute/take.rs index 55b5c2ad8bd..bd1f59da3c8 100644 --- a/vortex-array/src/arrays/patched/compute/take.rs +++ b/vortex-array/src/arrays/patched/compute/take.rs @@ -1,30 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use rustc_hash::FxHashMap; -use vortex_buffer::Buffer; use vortex_error::VortexResult; use crate::ArrayRef; +use crate::Canonical; use crate::ExecutionCtx; use crate::IntoArray; use crate::array::ArrayView; use crate::arrays::Patched; -use crate::arrays::PrimitiveArray; use crate::arrays::dict::TakeExecute; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::arrays::primitive::PrimitiveDataParts; -use crate::dtype::IntegerPType; -use crate::dtype::NativePType; -use crate::match_each_native_ptype; -use crate::match_each_unsigned_integer_ptype; impl TakeExecute for Patched { - #[expect( - clippy::cognitive_complexity, - reason = "complexity is from nested match_each_* macros" - )] fn take( array: ArrayView<'_, Self>, indices: &ArrayRef, @@ -35,82 +24,20 @@ impl TakeExecute for Patched { return Ok(None); } - // Perform take on the inner array, including the placeholders. - let inner = array + let taken_inner = array .inner() - .take(indices.clone())? - .execute::(ctx)?; - - let PrimitiveDataParts { - buffer, - validity, - ptype, - } = inner.into_data_parts(); - - let indices_ptype = indices.dtype().as_ptype(); - let patch_indices = array - .patch_indices() .clone() - .execute::(ctx)?; - let patch_indices_ptype = patch_indices.ptype(); - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - match_each_native_ptype!(ptype, |V| { - let indices = indices.clone().execute::(ctx)?; - let patch_values = array - .patch_values() - .clone() - .execute::(ctx)?; - let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); - match_each_unsigned_integer_ptype!(patch_indices_ptype, |P| { - take_map( - output.as_mut(), - indices.as_slice::(), - array.offset(), - array.len(), - patch_indices.as_slice::

(), - patch_values.as_slice::(), - ); - }); - - // SAFETY: output and validity still have same length after take_map returns. - unsafe { - Ok(Some( - PrimitiveArray::new_unchecked(output.freeze(), validity).into_array(), - )) - } - }) - }) - } -} + .take(indices.clone())? + .execute::(ctx)? + .into_array(); -/// Take patches for the given `indices` and apply them onto an `output` using a hash map. -/// -/// First, builds a hashmap from index to patch value, then uses the hashmap in a loop to collect -/// the values. -fn take_map( - output: &mut [V], - indices: &[I], - offset: usize, - len: usize, - patch_index: &[P], - patch_value: &[V], -) { - // Build a hashmap of patch_index -> values. - let mut index_map = FxHashMap::with_capacity_and_hasher(patch_index.len(), Default::default()); - for (&patch_idx, &patch_value) in std::iter::zip(patch_index, patch_value) { - let index: usize = patch_idx.as_(); - if index >= offset && index < offset + len { - index_map.insert(index - offset, patch_value); - } - } + let taken_patches = array.patches().take(indices, ctx)?; - // Now, iterate the take indices using the prebuilt hashmap. - // Undefined/null indices will miss the hash map, which we can ignore. - for (output_index, index) in indices.iter().enumerate() { - let index = index.as_(); - if let Some(&patch_value) = index_map.get(&index) { - output[output_index] = patch_value; + match taken_patches { + None => Ok(Some(taken_inner)), + Some(patches) => Ok(Some( + Patched::from_array_and_patches(taken_inner, &patches, ctx)?.into_array(), + )), } } } diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs index 4b205d8527c..47630506d90 100644 --- a/vortex-array/src/arrays/patched/mod.rs +++ b/vortex-array/src/arrays/patched/mod.rs @@ -38,11 +38,8 @@ //! * `chunk_offsets`: an indexing buffer with one entry per 1024-element chunk, so that the //! patches for chunk `c` are `patch_indices[chunk_offsets[c]..chunk_offsets[c + 1]]` //! -//! `patch_indices` and `patch_values` are aligned and accessed together. -//! -//! The number of lanes that *would* be used if these patches were transposed into the -//! data-parallel GPU layout is retained as `n_lanes` metadata, but no transpose is performed: the -//! patches are stored untransposed. +//! `patch_indices` and `patch_values` are aligned and accessed together. The patches are stored +//! untransposed. mod array; mod compute; @@ -54,20 +51,6 @@ use std::sync::LazyLock; pub use array::*; pub use vtable::*; -/// Number of lanes that would be used at patch time for a value of type `V` if the patches were -/// transposed into the data-parallel GPU layout. -/// -/// This is *NOT* equal to the number of FastLanes lanes for the type `V`, rather this is going to -/// correspond to how many "lanes" we would end up copying data on. -/// -/// The patches themselves are stored untransposed; this value is retained only as metadata. -pub(crate) const fn patch_lanes() -> usize { - // For types 32-bits or smaller, we use a 32 lane configuration, and for 64-bit we use 16 lanes. - // This matches up with the number of lanes we use to execute copying results from bit-unpacking - // from shared to global memory. - if size_of::() < 8 { 32 } else { 16 } -} - /// Flag indicating if experimental patched array support is enabled. /// /// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. diff --git a/vortex-array/src/arrays/patched/vtable/kernels.rs b/vortex-array/src/arrays/patched/vtable/kernels.rs index 7994b19e02e..e5e8a91b2ac 100644 --- a/vortex-array/src/arrays/patched/vtable/kernels.rs +++ b/vortex-array/src/arrays/patched/vtable/kernels.rs @@ -3,10 +3,12 @@ use crate::arrays::Patched; use crate::arrays::dict::TakeExecuteAdaptor; +use crate::arrays::filter::FilterExecuteAdaptor; use crate::kernel::ParentKernelSet; use crate::scalar_fn::fns::binary::CompareExecuteAdaptor; pub(super) const PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ ParentKernelSet::lift(&CompareExecuteAdaptor(Patched)), + ParentKernelSet::lift(&FilterExecuteAdaptor(Patched)), ParentKernelSet::lift(&TakeExecuteAdaptor(Patched)), ]); diff --git a/vortex-array/src/arrays/patched/vtable/mod.rs b/vortex-array/src/arrays/patched/vtable/mod.rs index 99af2df6eea..601ec372d60 100644 --- a/vortex-array/src/arrays/patched/vtable/mod.rs +++ b/vortex-array/src/arrays/patched/vtable/mod.rs @@ -70,10 +70,6 @@ pub struct PatchedMetadata { #[prost(uint32, tag = "1")] pub(crate) n_patches: u32, - /// The transpose count retained as metadata. See `PatchedData::n_lanes`. - #[prost(uint32, tag = "2")] - pub(crate) n_lanes: u32, - /// The absolute offset of the first in-view element. #[prost(uint64, tag = "3")] pub(crate) offset: u64, @@ -99,15 +95,12 @@ impl ArrayHash for PatchedData { fn array_hash(&self, state: &mut H, _precision: Precision) { self.offset.hash(state); self.offset_within_chunk.hash(state); - self.n_lanes.hash(state); } } impl ArrayEq for PatchedData { fn array_eq(&self, other: &Self, _precision: Precision) -> bool { - self.offset == other.offset - && self.offset_within_chunk == other.offset_within_chunk - && self.n_lanes == other.n_lanes + self.offset == other.offset && self.offset_within_chunk == other.offset_within_chunk } } @@ -160,7 +153,6 @@ impl VTable for Patched { Ok(Some( PatchedMetadata { n_patches: u32::try_from(array.patch_indices().len())?, - n_lanes: u32::try_from(array.n_lanes())?, offset: array.offset() as u64, offset_within_chunk: array.offset_within_chunk() as u64, chunk_offsets_len: array.chunk_offsets().len() as u64, @@ -182,7 +174,6 @@ impl VTable for Patched { ) -> VortexResult> { let metadata = PatchedMetadata::decode(metadata)?; let n_patches = metadata.n_patches as usize; - let n_lanes = metadata.n_lanes as usize; let offset = usize::try_from(metadata.offset) .map_err(|_| vortex_err!("patched offset does not fit in usize"))?; let offset_within_chunk = usize::try_from(metadata.offset_within_chunk) @@ -199,7 +190,6 @@ impl VTable for Patched { let chunk_offsets = children.get(3, &chunk_offsets_dtype, chunk_offsets_len)?; let data = PatchedData { - n_lanes, offset, offset_within_chunk, }; diff --git a/vortex-array/src/arrays/patched/vtable/slice.rs b/vortex-array/src/arrays/patched/vtable/slice.rs index 124fa72c82a..d3bb3e4f3c4 100644 --- a/vortex-array/src/arrays/patched/vtable/slice.rs +++ b/vortex-array/src/arrays/patched/vtable/slice.rs @@ -21,9 +21,7 @@ impl SliceReduce for Patched { match array.patches().slice(range)? { // Patches remain in the sliced range: rewrap them around the sliced inner. - Some(patches) => Ok(Some( - Patched::wrap(inner, &patches, array.n_lanes()).into_array(), - )), + Some(patches) => Ok(Some(Patched::wrap(inner, &patches).into_array())), // No patches overlap the slice, so the inner array already holds the final values. None => Ok(Some(inner)), } From 79363ef0e84e2245adfe929dd147c3237a84a8b7 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 27 May 2026 18:51:31 +0100 Subject: [PATCH 6/6] fix Signed-off-by: Joe Isaacs --- .../fastlanes/benches/patched_unpack_locality.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/encodings/fastlanes/benches/patched_unpack_locality.rs b/encodings/fastlanes/benches/patched_unpack_locality.rs index 9273210333e..12a2a2cd312 100644 --- a/encodings/fastlanes/benches/patched_unpack_locality.rs +++ b/encodings/fastlanes/benches/patched_unpack_locality.rs @@ -35,22 +35,9 @@ fn main() { /// (num_values, bit_width, patch_stride) — one patch every `patch_stride` elements. const CASES: &[(usize, u8, u32)] = &[ - // 256 KiB output, fits in L2. (1 << 16, 9, 200), (1 << 16, 9, 20), (1 << 16, 9, 5), - // 1 MiB output, around L2. - (1 << 18, 9, 200), - (1 << 18, 9, 20), - (1 << 18, 9, 5), - // 4 MiB output, exceeds typical L2. - (1 << 20, 9, 200), - (1 << 20, 9, 20), - (1 << 20, 9, 5), - // 16 MiB output, far exceeds L2. - (1 << 22, 9, 200), - (1 << 22, 9, 20), - (1 << 22, 9, 5), ]; struct Setup {