diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index 08c96c481d7..b418f9699a8 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -68,3 +68,7 @@ harness = false name = "cast_bitpacked" harness = false required-features = ["_test-harness"] + +[[bench]] +name = "patched_unpack_locality" +harness = false diff --git a/encodings/fastlanes/benches/patched_unpack_locality.rs b/encodings/fastlanes/benches/patched_unpack_locality.rs new file mode 100644 index 00000000000..12a2a2cd312 --- /dev/null +++ b/encodings/fastlanes/benches/patched_unpack_locality.rs @@ -0,0 +1,176 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Microbenchmark isolating the cache-locality question for patched bitpacked arrays: +//! is it faster to unpack every 1024-element block and *then* scatter all patches +//! (`unpack_then_patch`), or to unpack one block and immediately patch it while the +//! freshly-decoded block is still hot in cache (`fused`)? +//! +//! Both strategies perform identical total work (same unpack kernel calls, same number of +//! patch stores); only the loop ordering differs, so any delta is attributable to locality. + +#![expect(clippy::cast_possible_truncation)] + +use divan::Bencher; +use fastlanes::BitPacking; + +fn main() { + // Correctness guard: both strategies must produce identical output. + let case = (1usize << 18, 9u8, 50u32); + let data = Setup::new(case.0, case.1, case.2); + let a = { + let mut out = vec![0u32; data.n_padded]; + unpack_then_patch(&data, &mut out); + out + }; + let b = { + let mut out = vec![0u32; data.n_padded]; + fused(&data, &mut out); + out + }; + assert_eq!(a, b, "fused and unpack_then_patch must agree"); + + divan::main(); +} + +/// (num_values, bit_width, patch_stride) — one patch every `patch_stride` elements. +const CASES: &[(usize, u8, u32)] = &[ + (1 << 16, 9, 200), + (1 << 16, 9, 20), + (1 << 16, 9, 5), +]; + +struct Setup { + bit_width: usize, + elems_per_chunk: usize, + num_chunks: usize, + n_padded: usize, + packed: Vec, + /// Patch indices, globally sorted. + indices: Vec, + /// Patch values, parallel to `indices`. + values: Vec, + /// `chunk_offsets[c]..chunk_offsets[c + 1]` is the patch range for chunk `c`. + chunk_offsets: Vec, +} + +impl Setup { + fn new(n: usize, bit_width: u8, patch_stride: u32) -> Self { + let bit_width = bit_width as usize; + let num_chunks = n.div_ceil(1024); + let n_padded = num_chunks * 1024; + let elems_per_chunk = 1024 * bit_width / 32; + let mask = if bit_width == 32 { + u32::MAX + } else { + (1u32 << bit_width) - 1 + }; + + // Deterministic, low-entropy values that fit in `bit_width` bits. + let values_in: Vec = (0..n_padded as u32) + .map(|i| i.wrapping_mul(2654435761) & mask) + .collect(); + + let mut packed = vec![0u32; num_chunks * elems_per_chunk]; + for c in 0..num_chunks { + // SAFETY: input is exactly 1024 elements, output exactly `elems_per_chunk`. + unsafe { + BitPacking::unchecked_pack( + bit_width, + &values_in[c * 1024..][..1024], + &mut packed[c * elems_per_chunk..][..elems_per_chunk], + ); + } + } + + // Uniformly-spread patches: one every `patch_stride` elements. + let stride = patch_stride as usize; + let mut indices = Vec::new(); + let mut values = Vec::new(); + let mut chunk_offsets = vec![0usize; num_chunks + 1]; + let mut idx = 0usize; + while idx < n_padded { + indices.push(idx); + values.push(0xDEAD_BEEF ^ idx as u32); + idx += stride; + } + // Build chunk offsets from the sorted indices. + let mut p = 0usize; + for c in 0..num_chunks { + let chunk_end = (c + 1) * 1024; + while p < indices.len() && indices[p] < chunk_end { + p += 1; + } + chunk_offsets[c + 1] = p; + } + + Self { + bit_width, + elems_per_chunk, + num_chunks, + n_padded, + packed, + indices, + values, + chunk_offsets, + } + } +} + +/// Approach A: unpack every block into the output, then scatter all patches in a second pass. +#[inline] +fn unpack_then_patch(s: &Setup, output: &mut [u32]) { + for c in 0..s.num_chunks { + // SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024. + unsafe { + BitPacking::unchecked_unpack( + s.bit_width, + &s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk], + &mut output[c * 1024..][..1024], + ); + } + } + for (i, &idx) in s.indices.iter().enumerate() { + output[idx] = s.values[i]; + } +} + +/// Approach B: unpack one block and immediately patch it while still hot in cache. +#[inline] +fn fused(s: &Setup, output: &mut [u32]) { + for c in 0..s.num_chunks { + // SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024. + unsafe { + BitPacking::unchecked_unpack( + s.bit_width, + &s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk], + &mut output[c * 1024..][..1024], + ); + } + for p in s.chunk_offsets[c]..s.chunk_offsets[c + 1] { + output[s.indices[p]] = s.values[p]; + } + } +} + +#[divan::bench(args = CASES)] +fn unpack_then_patch_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) { + let setup = Setup::new(n, bit_width, stride); + bencher + .with_inputs(|| vec![0u32; setup.n_padded]) + .bench_local_values(|mut output| { + unpack_then_patch(&setup, &mut output); + divan::black_box(output); + }); +} + +#[divan::bench(args = CASES)] +fn fused_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) { + let setup = Setup::new(n, bit_width, stride); + bencher + .with_inputs(|| vec![0u32; setup.n_padded]) + .bench_local_values(|mut output| { + fused(&setup, &mut output); + divan::black_box(output); + }); +} diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index 42b5cd46724..fcd24d0e17f 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -65,9 +65,11 @@ mod canonical; mod compute; mod kernel; mod ops; +mod plugin; mod rules; mod slice; +pub use plugin::SparsePatchedPlugin; use vortex_array::aggregate_fn::AggregateFnVTable as _; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::min_max::MinMax; diff --git a/encodings/sparse/src/plugin.rs b/encodings/sparse/src/plugin.rs new file mode 100644 index 00000000000..24cf57d9877 --- /dev/null +++ b/encodings/sparse/src/plugin.rs @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A custom [`ArrayPlugin`] that lets you load in and deserialize a `Sparse` array as a +//! `PatchedArray` that wraps a constant fill array. +//! +//! A `Sparse` array is logically a set of patches applied on top of a constant fill value, which +//! is exactly what a `Patched` array over a [`ConstantArray`] represents. This plugin externalizes +//! that representation on deserialize when the array is primitive with non-null patches, which is +//! the subset that `Patched` can represent. All other sparse arrays are returned unchanged. + +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayPlugin; +use vortex_array::ArrayRef; +use vortex_array::ArrayVTable; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::Patched; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; +use vortex_array::serde::ArrayChildren; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use crate::Sparse; +use crate::SparseExt; + +/// Custom deserialization plugin that converts a primitive `Sparse` array into a `PatchedArray` +/// holding a [`ConstantArray`] fill. +#[derive(Debug, Clone)] +pub struct SparsePatchedPlugin; + +impl ArrayPlugin for SparsePatchedPlugin { + fn id(&self) -> ArrayId { + // We reuse the existing `Sparse` ID so that we can take over its deserialization pathway. + ArrayVTable::id(&Sparse) + } + + fn serialize( + &self, + array: &ArrayRef, + session: &VortexSession, + ) -> VortexResult>> { + // Delegate to the Sparse VTable for serialization. + Sparse.serialize(array, session) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + session: &VortexSession, + ) -> VortexResult { + let sparse = Array::::try_from_parts(ArrayVTable::deserialize( + &Sparse, dtype, len, metadata, buffers, children, session, + )?) + .map_err(|_| vortex_err!("Sparse plugin should only deserialize vortex.sparse"))?; + + // `Patched` can only represent primitive inners with non-null patch values, so anything + // else (bool, varbin, struct, fixed-size-list, nullable patches) stays a Sparse array. + if !dtype.is_primitive() { + return Ok(sparse.into_array()); + } + + let patches = sparse.patches(); + let mut ctx = session.create_execution_ctx(); + if !patches.values().all_valid(&mut ctx)? { + return Ok(sparse.into_array()); + } + + let fill = ConstantArray::new(sparse.fill_scalar().clone(), len).into_array(); + let patched = Patched::from_array_and_patches(fill, &patches, &mut ctx)?; + + Ok(patched.into_array()) + } + + fn is_supported_encoding(&self, id: &ArrayId) -> bool { + id == ArrayVTable::id(&Sparse) || id == ArrayVTable::id(&Patched) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::ArrayPlugin; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::PatchedArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::patched::PatchedArraySlotsExt; + use vortex_array::buffer::BufferHandle; + use vortex_array::patches::Patches; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + use vortex_session::VortexSession; + + use super::SparsePatchedPlugin; + use crate::Sparse; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(SparsePatchedPlugin); + session + }); + + fn primitive_sparse() -> VortexResult { + let patches = Patches::new( + 10, + 0, + PrimitiveArray::from_iter([1u32, 3, 7]).into_array(), + PrimitiveArray::from_iter([10u32, 30, 70]).into_array(), + None, + )?; + Sparse::try_new_from_patches( + patches, + Scalar::primitive(0u32, vortex_array::dtype::Nullability::NonNullable), + ) + } + + fn round_trip(array: &vortex_array::ArrayRef) -> VortexResult { + let metadata = SESSION.array_serialize(array)?.unwrap(); + let children = array.children(); + let buffers = array + .buffers() + .into_iter() + .map(BufferHandle::new_host) + .collect::>(); + + SparsePatchedPlugin.deserialize( + array.dtype(), + array.len(), + &metadata, + &buffers, + &children, + &SESSION, + ) + } + + #[test] + fn primitive_sparse_becomes_patched() -> VortexResult<()> { + let sparse = primitive_sparse()?.into_array(); + let deserialized = round_trip(&sparse)?; + + let patched: PatchedArray = deserialized + .try_downcast() + .map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?; + + // The inner is the constant fill. + assert!(patched.inner().as_constant().is_some()); + + // The decoded values must match the original sparse array. + let mut ctx = SESSION.create_execution_ctx(); + let expected = sparse + .execute::(&mut ctx)? + .into_buffer::(); + let actual = patched + .into_array() + .execute::(&mut ctx)? + .into_buffer::(); + assert_eq!(expected, actual); + + Ok(()) + } + + #[test] + fn non_primitive_sparse_stays_sparse() -> VortexResult<()> { + use vortex_array::arrays::BoolArray; + + let patches = Patches::new( + 5, + 0, + PrimitiveArray::from_iter([1u32, 3]).into_array(), + BoolArray::from_iter([true, false]).into_array(), + None, + )?; + let sparse = Sparse::try_new_from_patches( + patches, + Scalar::bool(false, vortex_array::dtype::Nullability::NonNullable), + )? + .into_array(); + + let deserialized = round_trip(&sparse)?; + assert!( + deserialized.is::(), + "non-primitive sparse should stay sparse, got {}", + deserialized.encoding_id() + ); + + Ok(()) + } +} diff --git a/vortex-array/src/arrays/patched/array.rs b/vortex-array/src/arrays/patched/array.rs index b1e5367607b..86198889622 100644 --- a/vortex-array/src/arrays/patched/array.rs +++ b/vortex-array/src/arrays/patched/array.rs @@ -3,69 +3,63 @@ use std::fmt::Display; use std::fmt::Formatter; -use std::ops::Range; -use vortex_buffer::Buffer; +use num_traits::AsPrimitive; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_ensure; -use vortex_error::vortex_err; use crate::ArrayRef; use crate::ArraySlots; -use crate::Canonical; use crate::ExecutionCtx; use crate::IntoArray; -use crate::LEGACY_SESSION; -use crate::VortexSessionExecute; use crate::array::Array; use crate::array::ArrayParts; use crate::array::TypedArrayRef; use crate::array_slots; use crate::arrays::Patched; use crate::arrays::PrimitiveArray; -use crate::arrays::patched::TransposedPatches; -use crate::arrays::patched::patch_lanes; -use crate::buffer::BufferHandle; use crate::dtype::DType; -use crate::dtype::IntegerPType; -use crate::dtype::NativePType; -use crate::dtype::PType; -use crate::match_each_native_ptype; use crate::match_each_unsigned_integer_ptype; use crate::patches::Patches; use crate::validity::Validity; #[derive(Debug, Clone)] pub struct PatchedData { - /// Number of lanes the patch indices and values have been split into. Each of the `n_chunks` - /// of 1024 values is split into `n_lanes` lanes horizontally, each lane having 1024 / n_lanes - /// values that might be patched. - pub(super) n_lanes: usize, - - /// The offset into that first chunk that is considered in bounds. + /// The absolute offset of the first in-view element, accounting for any slicing. /// - /// The patch indices of the first chunk less than `offset` should be skipped, and the offset - /// should be subtracted out of the remaining offsets to get their final position in the - /// executed array. + /// Patch indices are stored as global positions, so the final position of a patch within the + /// executed array is `index - offset`. pub(super) offset: usize, + + /// Number of patches sliced off the start of the first in-view chunk. + /// + /// `chunk_offsets` are sliced at chunk granularity while the patches themselves are sliced at + /// element granularity, so this records how many leading patches of the first chunk fall + /// outside the view. + pub(super) offset_within_chunk: usize, } #[array_slots(Patched)] pub struct PatchedSlots { /// The inner array containing the base unpatched values. pub inner: ArrayRef, - /// The lane offsets array for locating patches within lanes. - pub lane_offsets: ArrayRef, - /// The indices of patched (exception) values. + /// The sorted global indices of patched (exception) values. pub patch_indices: ArrayRef, /// The patched (exception) values at the corresponding indices. pub patch_values: ArrayRef, + /// One offset per 1024-element chunk into `patch_indices`/`patch_values`. + pub chunk_offsets: ArrayRef, } impl Display for PatchedData { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "n_lanes: {}, offset: {}", self.n_lanes, self.offset) + write!( + f, + "offset: {}, offset_within_chunk: {}", + self.offset, self.offset_within_chunk + ) } } @@ -94,76 +88,42 @@ impl PatchedData { slots.patch_indices.len(), slots.patch_values.len() ); + vortex_ensure!( + slots.patch_indices.dtype().is_unsigned_int(), + "PatchedArray patch indices must be unsigned integers, got {}", + slots.patch_indices.dtype() + ); Ok(()) } } pub trait PatchedArrayExt: PatchedArraySlotsExt { - #[inline] - fn n_lanes(&self) -> usize { - self.n_lanes - } - + /// The absolute offset of the first in-view element. #[inline] fn offset(&self) -> usize { self.offset } + /// Number of patches sliced off the start of the first in-view chunk. #[inline] - fn lane_range(&self, chunk: usize, lane: usize) -> VortexResult> { - assert!(chunk * 1024 <= self.as_ref().len() + self.offset()); - assert!(lane < self.n_lanes()); - - let start = self.lane_offsets().execute_scalar( - chunk * self.n_lanes() + lane, - &mut LEGACY_SESSION.create_execution_ctx(), - )?; - let stop = self.lane_offsets().execute_scalar( - chunk * self.n_lanes() + lane + 1, - &mut LEGACY_SESSION.create_execution_ctx(), - )?; - - let start = start - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("could not cast lane_offset to usize"))?; - - let stop = stop - .as_primitive() - .as_::() - .ok_or_else(|| vortex_err!("could not cast lane_offset to usize"))?; - - Ok(start..stop) + fn offset_within_chunk(&self) -> usize { + self.offset_within_chunk } - fn slice_chunks(&self, chunks: Range) -> VortexResult> { - let lane_offsets_start = chunks.start * self.n_lanes(); - let lane_offsets_stop = chunks.end * self.n_lanes() + 1; - - let sliced_lane_offsets = self - .lane_offsets() - .slice(lane_offsets_start..lane_offsets_stop)?; - let indices = self.patch_indices().clone(); - let values = self.patch_values().clone(); - - let begin = (chunks.start * 1024).saturating_sub(self.offset()); - let end = (chunks.end * 1024) - .saturating_sub(self.offset()) - .min(self.as_ref().len()); - - let offset = if chunks.start == 0 { self.offset() } else { 0 }; - let inner = self.inner().slice(begin..end)?; - let len = inner.len(); - let dtype = self.as_ref().dtype().clone(); - let slots = PatchedSlots { - inner, - lane_offsets: sliced_lane_offsets, - patch_indices: indices, - patch_values: values, + /// Reconstruct the untransposed [`Patches`] backing this array. + fn patches(&self) -> Patches { + // SAFETY: a `Patched` array is only ever constructed from valid, sorted patches with + // matching index/value lengths and chunk offsets. + unsafe { + Patches::new_unchecked( + self.as_ref().len(), + self.offset(), + self.patch_indices().clone(), + self.patch_values().clone(), + Some(self.chunk_offsets().clone()), + Some(self.offset_within_chunk()), + ) } - .into_slots(); - - Ok(unsafe { Patched::new_unchecked(dtype, len, slots, self.n_lanes(), offset) }) } } @@ -195,169 +155,110 @@ impl Patched { "PatchedArray cannot be built from Patches with nulls" ); - let values_ptype = patches.dtype().as_ptype(); - - let TransposedPatches { - n_lanes, - lane_offsets, - indices, - values, - } = transpose_patches(patches, ctx)?; - - let lane_offsets = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(lane_offsets), - PType::U32, - Validity::NonNullable, - ) - .into_array(); - let indices = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(indices), - PType::U16, - Validity::NonNullable, - ) - .into_array(); - let values = PrimitiveArray::from_buffer_handle( - BufferHandle::new_host(values), - values_ptype, - Validity::NonNullable, - ) - .into_array(); + // Ensure the patches carry a chunk offset for every 1024-element chunk, computing them + // when the source patches don't already provide them. + let patches = match patches.chunk_offsets() { + Some(_) => patches.clone(), + None => { + let chunk_offsets = compute_chunk_offsets(patches, ctx)?; + // SAFETY: we only attach freshly computed chunk offsets to existing valid patches. + unsafe { + Patches::new_unchecked( + patches.array_len(), + patches.offset(), + patches.indices().clone(), + patches.values().clone(), + Some(chunk_offsets), + Some(0), + ) + } + } + }; + + Ok(Self::wrap(inner, &patches)) + } + /// Wrap an `inner` array and untransposed `patches` (which must carry chunk offsets) into a + /// [`Patched`] array. + pub(super) fn wrap(inner: ArrayRef, patches: &Patches) -> Array { + let chunk_offsets = patches + .chunk_offsets() + .clone() + .vortex_expect("Patched requires chunk offsets"); let dtype = inner.dtype().clone(); let len = inner.len(); let slots = PatchedSlots { inner, - lane_offsets, - patch_indices: indices, - patch_values: values, + patch_indices: patches.indices().clone(), + patch_values: patches.values().clone(), + chunk_offsets, } .into_slots(); - Ok(unsafe { Self::new_unchecked(dtype, len, slots, n_lanes, 0) }) + unsafe { + Self::new_unchecked( + dtype, + len, + slots, + patches.offset(), + patches.offset_within_chunk().unwrap_or(0), + ) + } } pub(crate) unsafe fn new_unchecked( dtype: DType, len: usize, slots: ArraySlots, - n_lanes: usize, offset: usize, + offset_within_chunk: usize, ) -> Array { unsafe { Array::from_parts_unchecked( - ArrayParts::new(Patched, dtype, len, PatchedData { n_lanes, offset }) - .with_slots(slots), + ArrayParts::new( + Patched, + dtype, + len, + PatchedData { + offset, + offset_within_chunk, + }, + ) + .with_slots(slots), ) } } } -/// Transpose a set of patches from the default sorted layout into the data parallel layout. -fn transpose_patches(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult { +/// Compute one `u64` chunk offset per 1024-element chunk for a set of sorted patches. +/// +/// `chunk_offsets[c]` is the position in the patch arrays at which the patches for chunk `c` +/// begin, i.e. the number of patches whose global index is less than `c * 1024`. +fn compute_chunk_offsets(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult { let array_len = patches.array_len(); let offset = patches.offset(); + let total_chunks = array_len.div_ceil(1024); - let indices = patches - .indices() - .clone() - .execute::(ctx)? - .into_primitive(); - - let values = patches - .values() - .clone() - .execute::(ctx)? - .into_primitive(); - + let indices = patches.indices().clone().execute::(ctx)?; let indices_ptype = indices.ptype(); - let values_ptype = values.ptype(); - - let indices = indices.buffer_handle().clone().unwrap_host(); - let values = values.buffer_handle().clone().unwrap_host(); - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - match_each_native_ptype!(values_ptype, |V| { - let indices: Buffer = Buffer::from_byte_buffer(indices); - let values: Buffer = Buffer::from_byte_buffer(values); - - Ok(transpose( - indices.as_slice(), - values.as_slice(), - offset, - array_len, - )) - }) - }) -} - -#[expect(clippy::cast_possible_truncation)] -fn transpose( - indices_in: &[I], - values_in: &[V], - offset: usize, - array_len: usize, -) -> TransposedPatches { - // Total number of slots is number of chunks times number of lanes. - let n_chunks = array_len.div_ceil(1024); - assert!( - n_chunks <= u32::MAX as usize, - "Cannot transpose patches for array with >= 4 trillion elements" - ); - - let n_lanes = patch_lanes::(); - - // We know upfront how many indices and values we'll have. - let mut indices_buffer = BufferMut::with_capacity(indices_in.len()); - let mut values_buffer = BufferMut::with_capacity(values_in.len()); - - // Number of patches in each chunk/lane. - let mut lane_offsets: BufferMut = BufferMut::zeroed(n_chunks * n_lanes + 1); - // Scan the index/value pairs once to get chunk/lane counts. - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - lane_offsets[chunk * n_lanes + lane + 1] += 1; - } - - for index in 1..lane_offsets.len() { - lane_offsets[index] += lane_offsets[index - 1]; - } - - // Loop over patches, writing them to final positions. - let indices_out = indices_buffer.spare_capacity_mut(); - let values_out = values_buffer.spare_capacity_mut(); - for (index, &value) in std::iter::zip(indices_in, values_in) { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - let position = &mut lane_offsets[chunk * n_lanes + lane]; - indices_out[*position as usize].write((index % 1024) as u16); - values_out[*position as usize].write(value); - *position += 1; - } - - unsafe { - indices_buffer.set_len(indices_in.len()); - values_buffer.set_len(values_in.len()); - } - - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - lane_offsets[chunk * n_lanes + lane] -= 1; - } + let chunk_offsets = match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let indices = indices.as_slice::(); + let mut offsets: BufferMut = BufferMut::with_capacity(total_chunks); + let mut pos = 0usize; + for chunk in 0..total_chunks { + let chunk_start = chunk * 1024; + while pos < indices.len() && { + let index: usize = indices[pos].as_(); + index - offset < chunk_start + } { + pos += 1; + } + offsets.push(pos as u64); + } + offsets.freeze() + }); - TransposedPatches { - n_lanes, - lane_offsets: lane_offsets.freeze().into_byte_buffer(), - indices: indices_buffer.freeze().into_byte_buffer(), - values: values_buffer.freeze().into_byte_buffer(), - } + Ok(PrimitiveArray::new(chunk_offsets, Validity::NonNullable).into_array()) } #[cfg(test)] diff --git a/vortex-array/src/arrays/patched/compute/compare.rs b/vortex-array/src/arrays/patched/compute/compare.rs index c7d879323cb..b0e9e9e9df8 100644 --- a/vortex-array/src/arrays/patched/compute/compare.rs +++ b/vortex-array/src/arrays/patched/compute/compare.rs @@ -20,12 +20,18 @@ use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; use crate::arrays::primitive::NativeValue; use crate::builtins::ArrayBuiltins; +use crate::dtype::IntegerPType; use crate::dtype::NativePType; use crate::match_each_native_ptype; +use crate::match_each_unsigned_integer_ptype; use crate::scalar_fn::fns::binary::CompareKernel; use crate::scalar_fn::fns::operators::CompareOperator; impl CompareKernel for Patched { + #[expect( + clippy::cognitive_complexity, + reason = "complexity is from nested match_each_* macros" + )] fn compare( lhs: ArrayView<'_, Self>, rhs: &ArrayRef, @@ -59,50 +65,48 @@ impl CompareKernel for Patched { let mut bits = BitBufferMut::from_buffer(bits.unwrap_host().into_mut(), offset, len); - let lane_offsets = lhs.lane_offsets().clone().execute::(ctx)?; let indices = lhs.patch_indices().clone().execute::(ctx)?; let values = lhs.patch_values().clone().execute::(ctx)?; - let n_lanes = lhs.n_lanes(); + let indices_ptype = indices.ptype(); match_each_native_ptype!(values.ptype(), |V| { let offset = lhs.offset(); - let indices = indices.as_slice::(); let values = values.as_slice::(); let constant = constant .as_primitive() .as_::() .vortex_expect("compare constant not null"); - let apply_patches = ApplyPatches { - bits: &mut bits, - offset, - n_lanes, - lane_offsets: lane_offsets.as_slice::(), - indices, - values, - constant, - }; - - match operator { - CompareOperator::Eq => { - apply_patches.apply(|l, r| NativeValue(l) == NativeValue(r))?; + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let apply_patches = ApplyPatches { + bits: &mut bits, + offset, + indices: indices.as_slice::(), + values, + constant, + }; + + match operator { + CompareOperator::Eq => { + apply_patches.apply(|l, r| NativeValue(l) == NativeValue(r)); + } + CompareOperator::NotEq => { + apply_patches.apply(|l, r| NativeValue(l) != NativeValue(r)); + } + CompareOperator::Gt => { + apply_patches.apply(|l, r| NativeValue(l) > NativeValue(r)); + } + CompareOperator::Gte => { + apply_patches.apply(|l, r| NativeValue(l) >= NativeValue(r)); + } + CompareOperator::Lt => { + apply_patches.apply(|l, r| NativeValue(l) < NativeValue(r)); + } + CompareOperator::Lte => { + apply_patches.apply(|l, r| NativeValue(l) <= NativeValue(r)); + } } - CompareOperator::NotEq => { - apply_patches.apply(|l, r| NativeValue(l) != NativeValue(r))?; - } - CompareOperator::Gt => { - apply_patches.apply(|l, r| NativeValue(l) > NativeValue(r))?; - } - CompareOperator::Gte => { - apply_patches.apply(|l, r| NativeValue(l) >= NativeValue(r))?; - } - CompareOperator::Lt => { - apply_patches.apply(|l, r| NativeValue(l) < NativeValue(r))?; - } - CompareOperator::Lte => { - apply_patches.apply(|l, r| NativeValue(l) <= NativeValue(r))?; - } - } + }); }); let result = BoolArray::new(bits.freeze(), validity); @@ -110,49 +114,35 @@ impl CompareKernel for Patched { } } -struct ApplyPatches<'a, V: NativePType> { +struct ApplyPatches<'a, I: IntegerPType, V: NativePType> { bits: &'a mut BitBufferMut, offset: usize, - n_lanes: usize, - lane_offsets: &'a [u32], - indices: &'a [u16], + indices: &'a [I], values: &'a [V], constant: V, } -impl ApplyPatches<'_, V> { - fn apply(self, cmp: F) -> VortexResult<()> +impl ApplyPatches<'_, I, V> { + fn apply(self, cmp: F) where F: Fn(V, V) -> bool, { - for index in 0..(self.lane_offsets.len() - 1) { - let chunk = index / self.n_lanes; - - let lane_start = self.lane_offsets[index] as usize; - let lane_end = self.lane_offsets[index + 1] as usize; - - for (&patch_index, &patch_value) in std::iter::zip( - &self.indices[lane_start..lane_end], - &self.values[lane_start..lane_end], - ) { - let bit_index = chunk * 1024 + patch_index as usize; - // Skip any indices < the offset. - if bit_index < self.offset { - continue; - } - let bit_index = bit_index - self.offset; - if bit_index >= self.bits.len() { - break; - } - if cmp(patch_value, self.constant) { - self.bits.set(bit_index) - } else { - self.bits.unset(bit_index) - } + for (&patch_index, &patch_value) in std::iter::zip(self.indices, self.values) { + let bit_index: usize = patch_index.as_(); + // Skip any indices < the offset. + if bit_index < self.offset { + continue; + } + let bit_index = bit_index - self.offset; + if bit_index >= self.bits.len() { + continue; + } + if cmp(patch_value, self.constant) { + self.bits.set(bit_index) + } else { + self.bits.unset(bit_index) } } - - Ok(()) } } diff --git a/vortex-array/src/arrays/patched/compute/filter.rs b/vortex-array/src/arrays/patched/compute/filter.rs index d2c60941059..73a4430cd56 100644 --- a/vortex-array/src/arrays/patched/compute/filter.rs +++ b/vortex-array/src/arrays/patched/compute/filter.rs @@ -2,62 +2,39 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; -use vortex_mask::AllOr; use vortex_mask::Mask; use crate::ArrayRef; +use crate::Canonical; +use crate::ExecutionCtx; use crate::IntoArray; use crate::array::ArrayView; -use crate::arrays::FilterArray; use crate::arrays::Patched; -use crate::arrays::filter::FilterReduce; +use crate::arrays::filter::FilterKernel; use crate::arrays::patched::PatchedArrayExt; - -impl FilterReduce for Patched { - fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { - // Find the contiguous chunk range that the mask covers. We use this to slice the inner - // components, then wrap the rest up with another FilterArray. - // - // This is helpful when we have a very selective filter that is clustered to a small - // range. - let (chunk_start, chunk_stop) = match mask.slices() { - AllOr::All | AllOr::None => { - // This is handled as the precondition to this method, see the FilterReduce - // documentation. - unreachable!("mask must be a MaskValues here") - } - AllOr::Some(slices) => { - let (first, _) = slices[0]; - let (_, last) = slices[slices.len() - 1]; - - // Convert mask indices to absolute positions by adding offset - ( - (array.offset() + first) / 1024, - (array.offset() + last).div_ceil(1024), - ) - } - }; - - let n_chunks = (array.offset() + array.len()).div_ceil(1024); - - // If all chunks already covered, there is nothing to do. - if chunk_start == 0 && chunk_stop == n_chunks { - return Ok(None); +use crate::arrays::patched::PatchedArraySlotsExt; + +impl FilterKernel for Patched { + fn filter( + array: ArrayView<'_, Self>, + mask: &Mask, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let filtered_inner = array + .inner() + .clone() + .filter(mask.clone())? + .execute::(ctx)? + .into_array(); + + let filtered_patches = array.patches().filter(mask, ctx)?; + + match filtered_patches { + None => Ok(Some(filtered_inner)), + Some(patches) => Ok(Some( + Patched::from_array_and_patches(filtered_inner, &patches, ctx)?.into_array(), + )), } - - let sliced = array.slice_chunks(chunk_start..chunk_stop)?; - - // Slice the mask according to if the chunk is sliced. - // Convert chunk bounds back to mask indices by subtracting offset. - let mask_start = (chunk_start * 1024).saturating_sub(array.offset()); - let mask_end = (chunk_stop * 1024) - .saturating_sub(array.offset()) - .min(array.len()); - let remainder = mask.slice(mask_start..mask_end); - - Ok(Some( - FilterArray::new(sliced.into_array(), remainder).into_array(), - )) } } diff --git a/vortex-array/src/arrays/patched/compute/rules.rs b/vortex-array/src/arrays/patched/compute/rules.rs index 3ecb25c1efa..9146b78960f 100644 --- a/vortex-array/src/arrays/patched/compute/rules.rs +++ b/vortex-array/src/arrays/patched/compute/rules.rs @@ -2,11 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use crate::arrays::Patched; -use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduceAdaptor; use crate::optimizer::rules::ParentRuleSet; -pub(crate) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new(&[ - ParentRuleSet::lift(&FilterReduceAdaptor(Patched)), - ParentRuleSet::lift(&SliceReduceAdaptor(Patched)), -]); +pub(crate) const PARENT_RULES: ParentRuleSet = + ParentRuleSet::new(&[ParentRuleSet::lift(&SliceReduceAdaptor(Patched))]); diff --git a/vortex-array/src/arrays/patched/compute/take.rs b/vortex-array/src/arrays/patched/compute/take.rs index 893d18db1ac..bd1f59da3c8 100644 --- a/vortex-array/src/arrays/patched/compute/take.rs +++ b/vortex-array/src/arrays/patched/compute/take.rs @@ -1,24 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use rustc_hash::FxHashMap; -use vortex_buffer::Buffer; use vortex_error::VortexResult; use crate::ArrayRef; +use crate::Canonical; use crate::ExecutionCtx; use crate::IntoArray; use crate::array::ArrayView; use crate::arrays::Patched; -use crate::arrays::PrimitiveArray; use crate::arrays::dict::TakeExecute; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::arrays::primitive::PrimitiveDataParts; -use crate::dtype::IntegerPType; -use crate::dtype::NativePType; -use crate::match_each_native_ptype; -use crate::match_each_unsigned_integer_ptype; impl TakeExecute for Patched { fn take( @@ -31,98 +24,20 @@ impl TakeExecute for Patched { return Ok(None); } - // Perform take on the inner array, including the placeholders. - let inner = array + let taken_inner = array .inner() + .clone() .take(indices.clone())? - .execute::(ctx)?; - - let PrimitiveDataParts { - buffer, - validity, - ptype, - } = inner.into_data_parts(); - - let indices_ptype = indices.dtype().as_ptype(); - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - match_each_native_ptype!(ptype, |V| { - let indices = indices.clone().execute::(ctx)?; - let lane_offsets = array - .lane_offsets() - .clone() - .execute::(ctx)?; - let patch_indices = array - .patch_indices() - .clone() - .execute::(ctx)?; - let patch_values = array - .patch_values() - .clone() - .execute::(ctx)?; - let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); - take_map( - output.as_mut(), - indices.as_slice::(), - array.offset(), - array.len(), - array.n_lanes(), - lane_offsets.as_slice::(), - patch_indices.as_slice::(), - patch_values.as_slice::(), - ); - - // SAFETY: output and validity still have same length after take_map returns. - unsafe { - Ok(Some( - PrimitiveArray::new_unchecked(output.freeze(), validity).into_array(), - )) - } - }) - }) - } -} + .execute::(ctx)? + .into_array(); -/// Take patches for the given `indices` and apply them onto an `output` using a hash map. -/// -/// First, builds a hashmap from index to patch value, then uses the hashmap in a loop to collect -/// the values. -#[expect(clippy::too_many_arguments)] -fn take_map( - output: &mut [V], - indices: &[I], - offset: usize, - len: usize, - n_lanes: usize, - lane_offsets: &[u32], - patch_index: &[u16], - patch_value: &[V], -) { - let n_chunks = (offset + len).div_ceil(1024); - // Build a hashmap of patch_index -> values. - let mut index_map = FxHashMap::with_capacity_and_hasher(patch_index.len(), Default::default()); - for chunk in 0..n_chunks { - for lane in 0..n_lanes { - let lane_start = lane_offsets[chunk * n_lanes + lane]; - let lane_end = lane_offsets[chunk * n_lanes + lane + 1]; - for i in lane_start..lane_end { - let patch_idx = patch_index[i as usize]; - let patch_value = patch_value[i as usize]; - - let index = chunk * 1024 + patch_idx as usize; - if index >= offset && index < offset + len { - index_map.insert(index - offset, patch_value); - } - } - } - } + let taken_patches = array.patches().take(indices, ctx)?; - // Now, iterate the take indices using the prebuilt hashmap. - // Undefined/null indices will miss the hash map, which we can ignore. - for (output_index, index) in indices.iter().enumerate() { - let index = index.as_(); - if let Some(&patch_value) = index_map.get(&index) { - output[output_index] = patch_value; + match taken_patches { + None => Ok(Some(taken_inner)), + Some(patches) => Ok(Some( + Patched::from_array_and_patches(taken_inner, &patches, ctx)?.into_array(), + )), } } } diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs index 0ba25ab7929..47630506d90 100644 --- a/vortex-array/src/arrays/patched/mod.rs +++ b/vortex-array/src/arrays/patched/mod.rs @@ -22,50 +22,24 @@ //! //! # Details //! -//! To patch an array, we first divide it into a set of chunks of length 1024, and then within -//! each chunk, we assign each position to a lane. The number of lanes depends on the width of -//! the underlying type. -//! -//! Thus, rather than sorting patch indices and values by their global offset, they are sorted -//! primarily by their chunk, and then subsequently by their lanes. +//! Patch indices and values are kept in their natural sorted (untransposed) layout, exactly like +//! the [`Patches`](crate::patches::Patches) helper. To allow constant-time seeking to the patches +//! belonging to a given chunk, we additionally store a `chunk_offsets` array holding one offset +//! per 1024-element chunk. //! //! The Patched array layout has 4 children //! //! * `inner`: the inner array is the one containing encoded values, including the filler values //! that need to be patched over at execution time -//! * `lane_offsets`: this is an indexing buffer that allows you to see into ranges of the other -//! two children -//! * `indices`: An array of `u16` chunk indices, indicating where within the chunk should the value -//! be overwritten by the patch value -//! * `values`: The child array containing the patch values, which should be inserted over -//! the values of the `inner` at the locations provided by `indices` -//! -//! `indices` and `values` are aligned and accessed together. -//! -//! ```text +//! * `patch_indices`: a sorted array of unsigned global indices indicating which positions of +//! `inner` should be overwritten by the patch value +//! * `patch_values`: the child array containing the patch values, which should be inserted over +//! the values of the `inner` at the locations provided by `patch_indices` +//! * `chunk_offsets`: an indexing buffer with one entry per 1024-element chunk, so that the +//! patches for chunk `c` are `patch_indices[chunk_offsets[c]..chunk_offsets[c + 1]]` //! -//! chunk 0 chunk 0 chunk 0 chunk 0 chunk 0 chunk 0 -//! lane 0 lane 1 lane 2 lane 3 lane 4 lane 5 -//! ┌────────────┬────────────┬────────────┬────────────┬────────────┬────────────┐ -//! lane_offsets │ 0 │ 0 │ 2 │ 2 │ 3 │ 5 │ ... -//! └─────┬──────┴─────┬──────┴─────┬──────┴──────┬─────┴──────┬─────┴──────┬─────┘ -//! │ │ │ │ │ │ -//! │ │ │ │ │ │ -//! ┌─────┴────────────┘ └──────┬──────┘ ┌──────┘ └─────┐ -//! │ │ │ │ -//! │ │ │ │ -//! │ │ │ │ -//! ▼────────────┬────────────┬────────────▼────────────▼────────────┬────────────▼ -//! indices │ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ -//! ├────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤ -//! values │ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ -//! └────────────┴────────────┴────────────┴────────────┴────────────┴────────────┘ -//! ``` -//! -//! It turns out that this layout is optimal for executing patching on GPUs, because the -//! `lane_offsets` allows each thread in a warp to seek to its patches in constant time. +//! `patch_indices` and `patch_values` are aligned and accessed together. The patches are stored +//! untransposed. mod array; mod compute; @@ -75,31 +49,8 @@ use std::env; use std::sync::LazyLock; pub use array::*; -use vortex_buffer::ByteBuffer; pub use vtable::*; -/// Patches that have been transposed into GPU format. -struct TransposedPatches { - n_lanes: usize, - lane_offsets: ByteBuffer, - indices: ByteBuffer, - values: ByteBuffer, -} - -/// Number of lanes used at patch time for a value of type `V`. -/// -/// This is *NOT* equal to the number of FastLanes lanes for the type `V`, rather this is going to -/// correspond to how many "lanes" we will end up copying data on. -/// -/// When applied on the CPU, this configuration doesn't really matter. On the GPU, it is based -/// on the number of patches involved here. -const fn patch_lanes() -> usize { - // For types 32-bits or smaller, we use a 32 lane configuration, and for 64-bit we use 16 lanes. - // This matches up with the number of lanes we use to execute copying results from bit-unpacking - // from shared to global memory. - if size_of::() < 8 { 32 } else { 16 } -} - /// Flag indicating if experimental patched array support is enabled. /// /// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. diff --git a/vortex-array/src/arrays/patched/vtable/kernels.rs b/vortex-array/src/arrays/patched/vtable/kernels.rs index 7994b19e02e..e5e8a91b2ac 100644 --- a/vortex-array/src/arrays/patched/vtable/kernels.rs +++ b/vortex-array/src/arrays/patched/vtable/kernels.rs @@ -3,10 +3,12 @@ use crate::arrays::Patched; use crate::arrays::dict::TakeExecuteAdaptor; +use crate::arrays::filter::FilterExecuteAdaptor; use crate::kernel::ParentKernelSet; use crate::scalar_fn::fns::binary::CompareExecuteAdaptor; pub(super) const PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ ParentKernelSet::lift(&CompareExecuteAdaptor(Patched)), + ParentKernelSet::lift(&FilterExecuteAdaptor(Patched)), ParentKernelSet::lift(&TakeExecuteAdaptor(Patched)), ]); diff --git a/vortex-array/src/arrays/patched/vtable/mod.rs b/vortex-array/src/arrays/patched/vtable/mod.rs index f35a13600c5..601ec372d60 100644 --- a/vortex-array/src/arrays/patched/vtable/mod.rs +++ b/vortex-array/src/arrays/patched/vtable/mod.rs @@ -12,9 +12,9 @@ mod slice; use std::hash::Hash; use std::hash::Hasher; -use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_session::VortexSession; use vortex_session::registry::CachedId; @@ -32,7 +32,6 @@ use crate::array::ArrayView; use crate::array::VTable; use crate::array::ValidityChild; use crate::array::ValidityVTableFromChild; -use crate::arrays::Primitive; use crate::arrays::PrimitiveArray; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; @@ -41,15 +40,16 @@ use crate::arrays::patched::PatchedSlots; use crate::arrays::patched::PatchedSlotsView; use crate::arrays::patched::compute::rules::PARENT_RULES; use crate::arrays::patched::vtable::kernels::PARENT_KERNELS; -use crate::arrays::primitive::PrimitiveDataParts; use crate::buffer::BufferHandle; use crate::builders::ArrayBuilder; use crate::builders::PrimitiveBuilder; use crate::dtype::DType; +use crate::dtype::IntegerPType; use crate::dtype::NativePType; use crate::dtype::PType; use crate::match_each_native_ptype; -use crate::require_child; +use crate::match_each_unsigned_integer_ptype; +use crate::patches::Patches; use crate::serde::ArrayChildren; /// A [`Patched`]-encoded Vortex array. @@ -70,27 +70,37 @@ pub struct PatchedMetadata { #[prost(uint32, tag = "1")] pub(crate) n_patches: u32, - /// The number of lanes used for patch indexing. Must be a power of two between 1 and 128. - #[prost(uint32, tag = "2")] - pub(crate) n_lanes: u32, + /// The absolute offset of the first in-view element. + #[prost(uint64, tag = "3")] + pub(crate) offset: u64, - /// An offset into the first chunk's patches that should be considered in-view. - /// - /// Always between 0 and 1023. - #[prost(uint32, tag = "3")] - pub(crate) offset: u32, + /// Number of patches sliced off the start of the first in-view chunk. + #[prost(uint64, tag = "4")] + pub(crate) offset_within_chunk: u64, + + /// The number of chunk offsets, one per 1024-element chunk. + #[prost(uint64, tag = "5")] + pub(crate) chunk_offsets_len: u64, + + /// The primitive type of the patch indices child array. + #[prost(enumeration = "PType", tag = "6")] + pub(crate) indices_ptype: i32, + + /// The primitive type of the chunk offsets child array. + #[prost(enumeration = "PType", tag = "7")] + pub(crate) chunk_offsets_ptype: i32, } impl ArrayHash for PatchedData { fn array_hash(&self, state: &mut H, _precision: Precision) { self.offset.hash(state); - self.n_lanes.hash(state); + self.offset_within_chunk.hash(state); } } impl ArrayEq for PatchedData { fn array_eq(&self, other: &Self, _precision: Precision) -> bool { - self.offset == other.offset && self.n_lanes == other.n_lanes + self.offset == other.offset && self.offset_within_chunk == other.offset_within_chunk } } @@ -129,9 +139,9 @@ impl VTable for Patched { fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef { match idx { PatchedSlots::INNER => array.inner().clone(), - PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(), PatchedSlots::PATCH_INDICES => array.patch_indices().clone(), PatchedSlots::PATCH_VALUES => array.patch_values().clone(), + PatchedSlots::CHUNK_OFFSETS => array.chunk_offsets().clone(), _ => vortex_panic!("invalid child index for PatchedArray: {idx}"), } } @@ -143,8 +153,11 @@ impl VTable for Patched { Ok(Some( PatchedMetadata { n_patches: u32::try_from(array.patch_indices().len())?, - n_lanes: u32::try_from(array.n_lanes())?, - offset: u32::try_from(array.offset())?, + offset: array.offset() as u64, + offset_within_chunk: array.offset_within_chunk() as u64, + chunk_offsets_len: array.chunk_offsets().len() as u64, + indices_ptype: array.patch_indices().dtype().as_ptype() as i32, + chunk_offsets_ptype: array.chunk_offsets().dtype().as_ptype() as i32, } .encode_to_vec(), )) @@ -161,24 +174,30 @@ impl VTable for Patched { ) -> VortexResult> { let metadata = PatchedMetadata::decode(metadata)?; let n_patches = metadata.n_patches as usize; - let n_lanes = metadata.n_lanes as usize; - let offset = metadata.offset as usize; + let offset = usize::try_from(metadata.offset) + .map_err(|_| vortex_err!("patched offset does not fit in usize"))?; + let offset_within_chunk = usize::try_from(metadata.offset_within_chunk) + .map_err(|_| vortex_err!("patched offset_within_chunk does not fit in usize"))?; + let chunk_offsets_len = usize::try_from(metadata.chunk_offsets_len) + .map_err(|_| vortex_err!("patched chunk_offsets_len does not fit in usize"))?; - // n_chunks should correspond to the chunk in the `inner`. - // After slicing when offset > 0, there may be additional chunks. - let n_chunks = (len + offset).div_ceil(1024); + let indices_dtype: DType = PType::try_from(metadata.indices_ptype)?.into(); + let chunk_offsets_dtype: DType = PType::try_from(metadata.chunk_offsets_ptype)?.into(); let inner = children.get(0, dtype, len)?; - let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?; - let indices = children.get(2, PType::U16.into(), n_patches)?; - let values = children.get(3, dtype, n_patches)?; + let indices = children.get(1, &indices_dtype, n_patches)?; + let values = children.get(2, dtype, n_patches)?; + let chunk_offsets = children.get(3, &chunk_offsets_dtype, chunk_offsets_len)?; - let data = PatchedData { n_lanes, offset }; + let data = PatchedData { + offset, + offset_within_chunk, + }; let slots = PatchedSlots { inner, - lane_offsets, patch_indices: indices, patch_values: values, + chunk_offsets, } .into_slots(); Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots)) @@ -209,10 +228,6 @@ impl VTable for Patched { array.inner().append_to_builder(builder, ctx)?; let offset = array.offset(); - let lane_offsets = array - .lane_offsets() - .clone() - .execute::(ctx)?; let indices = array .patch_indices() .clone() @@ -221,6 +236,7 @@ impl VTable for Patched { .patch_values() .clone() .execute::(ctx)?; + let indices_ptype = indices.ptype(); match_each_native_ptype!(ptype, |V| { let typed_builder = builder @@ -232,16 +248,16 @@ impl VTable for Patched { // populated by the inner.append_to_builder() call above. let output = typed_builder.values_mut(); let trailer = output.len() - len; - - apply_patches_primitive::( - &mut output[trailer..], - offset, - len, - array.n_lanes(), - lane_offsets.as_slice::(), - indices.as_slice::(), - values.as_slice::(), - ); + let values = values.as_slice::(); + + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + apply_patches_primitive::( + &mut output[trailer..], + offset, + indices.as_slice::(), + values, + ); + }); }); Ok(()) @@ -251,54 +267,30 @@ impl VTable for Patched { PatchedSlots::NAMES[idx].to_string() } - fn execute(array: Array, _ctx: &mut ExecutionCtx) -> VortexResult { - let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive); - let array = - require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive); - let array = - require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive); - let array = - require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive); - + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { let len = array.len(); - - let n_lanes = array.n_lanes; let offset = array.offset; - let slots = match array.try_into_parts() { - Ok(parts) => PatchedSlots::from_slots(parts.slots), - Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(), - }; - - // TODO(joe): use iterative execution - let PrimitiveDataParts { - buffer, - ptype, - validity, - } = slots.inner.downcast::().into_data_parts(); - - let values = slots.patch_values.downcast::(); - let lane_offsets = slots.lane_offsets.downcast::(); - let patch_indices = slots.patch_indices.downcast::(); + let offset_within_chunk = array.offset_within_chunk; - let patched_values = match_each_native_ptype!(values.ptype(), |V| { - let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); - - apply_patches_primitive::( - &mut output, - offset, + let view = PatchedSlotsView::from_slots(array.slots()); + let inner = view.inner.clone(); + // SAFETY: a `Patched` array always holds valid, sorted patches with chunk offsets. + let patches = unsafe { + Patches::new_unchecked( len, - n_lanes, - lane_offsets.as_slice::(), - patch_indices.as_slice::(), - values.as_slice::(), - ); - - let output = output.freeze(); + offset, + view.patch_indices.clone(), + view.patch_values.clone(), + Some(view.chunk_offsets.clone()), + Some(offset_within_chunk), + ) + }; - PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity) - }); + // TODO(joe): use iterative execution + let inner = inner.execute::(ctx)?; + let patched = inner.patch(&patches, ctx)?; - Ok(ExecutionResult::done(patched_values.into_array())) + Ok(ExecutionResult::done(patched.into_array())) } fn execute_parent( @@ -319,31 +311,27 @@ impl VTable for Patched { } } -/// Apply patches on top of the existing value types. -fn apply_patches_primitive( +/// Apply untransposed patches on top of an output buffer. +/// +/// Patch indices are global; a patch lands at `index - offset` and any index outside +/// `[offset, offset + output.len())` is skipped. +fn apply_patches_primitive( output: &mut [V], offset: usize, - len: usize, - n_lanes: usize, - lane_offsets: &[u32], - indices: &[u16], + indices: &[I], values: &[V], ) { - let n_chunks = (offset + len).div_ceil(1024); - for chunk in 0..n_chunks { - let start = lane_offsets[chunk * n_lanes] as usize; - let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize; - - for idx in start..stop { - // the indices slice is measured as an offset into the 1024-value chunk. - let index = chunk * 1024 + indices[idx] as usize; - if index < offset || index >= offset + len { - continue; - } - - let value = values[idx]; - output[index - offset] = value; + let len = output.len(); + for (patch, &value) in std::iter::zip(indices, values) { + let index: usize = patch.as_(); + if index < offset { + continue; + } + let position = index - offset; + if position >= len { + continue; } + output[position] = value; } } @@ -663,9 +651,9 @@ mod tests { let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array(); let slots = PatchedSlots { inner: new_inner, - lane_offsets: array.lane_offsets().clone(), patch_indices: array.patch_indices().clone(), patch_values: array.patch_values().clone(), + chunk_offsets: array.chunk_offsets().clone(), }; let array_ref = array.into_array(); diff --git a/vortex-array/src/arrays/patched/vtable/operations.rs b/vortex-array/src/arrays/patched/vtable/operations.rs index f0491666e56..0a0f1ba2cd1 100644 --- a/vortex-array/src/arrays/patched/vtable/operations.rs +++ b/vortex-array/src/arrays/patched/vtable/operations.rs @@ -6,11 +6,9 @@ use vortex_error::VortexResult; use crate::ExecutionCtx; use crate::array::ArrayView; use crate::array::OperationsVTable; -use crate::arrays::PrimitiveArray; use crate::arrays::patched::Patched; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::optimizer::ArrayOptimizer; use crate::scalar::Scalar; impl OperationsVTable for Patched { @@ -19,35 +17,9 @@ impl OperationsVTable for Patched { index: usize, ctx: &mut ExecutionCtx, ) -> VortexResult { - let chunk = (index + array.offset()) / 1024; - - #[expect( - clippy::cast_possible_truncation, - reason = "N % 1024 always fits in u16" - )] - let chunk_index = ((index + array.offset()) % 1024) as u16; - - let lane = (index + array.offset()) % array.n_lanes(); - - let range = array.lane_range(chunk, lane)?; - - // Get the range of indices corresponding to the lane, potentially decoding them to avoid - // the overhead of repeated scalar_at calls. - let patch_indices = array - .patch_indices() - .slice(range.clone())? - .optimize()? - .execute::(ctx)?; - - // NOTE: we do linear scan as lane has <= 32 patches, binary search would likely - // be slower. - for (&patch_index, idx) in std::iter::zip(patch_indices.as_slice::(), range) { - if patch_index == chunk_index { - return array - .patch_values() - .execute_scalar(idx, ctx)? - .cast(array.dtype()); - } + // Constant-time chunked lookup via the untransposed patches' chunk offsets. + if let Some(patch) = array.patches().get_patched(index)? { + return patch.cast(array.dtype()); } // Otherwise, access the underlying value. diff --git a/vortex-array/src/arrays/patched/vtable/slice.rs b/vortex-array/src/arrays/patched/vtable/slice.rs index 8655e0c7d84..d3bb3e4f3c4 100644 --- a/vortex-array/src/arrays/patched/vtable/slice.rs +++ b/vortex-array/src/arrays/patched/vtable/slice.rs @@ -11,46 +11,20 @@ use crate::array::ArrayView; use crate::arrays::Patched; use crate::arrays::patched::PatchedArrayExt; use crate::arrays::patched::PatchedArraySlotsExt; -use crate::arrays::patched::PatchedSlots; use crate::arrays::slice::SliceReduce; impl SliceReduce for Patched { fn slice(array: ArrayView<'_, Self>, range: Range) -> VortexResult> { - // We **always** slice the patches at 1024-element chunk boundaries. We keep the offset + len - // around so that when we execute we know how much to chop off. - let new_offset = (range.start + array.offset()) % 1024; - let chunk_start = (range.start + array.offset()) / 1024; - let chunk_stop = (range.end + array.offset()).div_ceil(1024); - let sliced_lane_offsets = array - .lane_offsets() - .slice((chunk_start * array.n_lanes())..(chunk_stop * array.n_lanes()) + 1)?; - - // Unlike the patches, we slice the inner to the exact range. This is handled - // at execution time by making sure to skip patch indices that are < offset - // or >= len. + // Slice the inner to the exact range; the patches keep their own offset bookkeeping and + // are sliced at chunk granularity via `Patches::slice`. let inner = array.inner().slice(range.start..range.end)?; - let len = inner.len(); - let slots = PatchedSlots { - inner, - lane_offsets: sliced_lane_offsets, - patch_indices: array.patch_indices().clone(), - patch_values: array.patch_values().clone(), + match array.patches().slice(range)? { + // Patches remain in the sliced range: rewrap them around the sliced inner. + Some(patches) => Ok(Some(Patched::wrap(inner, &patches).into_array())), + // No patches overlap the slice, so the inner array already holds the final values. + None => Ok(Some(inner)), } - .into_slots(); - - Ok(Some( - unsafe { - Patched::new_unchecked( - array.dtype().clone(), - len, - slots, - array.n_lanes(), - new_offset, - ) - } - .into_array(), - )) } } @@ -92,9 +66,9 @@ mod tests { @r#" root: vortex.patched(u16, len=9) inner: vortex.primitive(u16, len=9) - lane_offsets: vortex.primitive(u32, len=33) - patch_indices: vortex.primitive(u16, len=3) - patch_values: vortex.primitive(u16, len=3) + patch_indices: vortex.primitive(u32, len=2) + patch_values: vortex.primitive(u16, len=2) + chunk_offsets: vortex.primitive(u64, len=1) "#); let executed = sliced.execute::(&mut ctx)?.into_primitive(); diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index e69b5848de2..5d4c520cd98 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -117,6 +117,7 @@ use vortex_bytebool::ByteBool; use vortex_fsst::FSST; use vortex_pco::Pco; use vortex_session::VortexSession; +use vortex_sparse::SparsePatchedPlugin; use vortex_zigzag::ZigZag; pub use writer::*; @@ -183,6 +184,13 @@ pub fn register_default_encodings(session: &VortexSession) { vortex_sequence::initialize(session); vortex_sparse::initialize(session); + if use_experimental_patches() { + // Override Sparse registration with a plugin that decodes primitive Sparse arrays + // as a Patched array over a constant fill. Must run after `vortex_sparse::initialize` + // so it takes priority for the same encoding id. + session.arrays().register(SparsePatchedPlugin); + } + #[cfg(feature = "unstable_encodings")] vortex_tensor::initialize(session); }