Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions encodings/fsst/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub type vortex_fsst::FSST::ArrayData = vortex_fsst::FSSTData

pub type vortex_fsst::FSST::OperationsVTable = vortex_fsst::FSST

pub type vortex_fsst::FSST::ValidityVTable = vortex_array::array::vtable::validity::ValidityVTableFromChild
pub type vortex_fsst::FSST::ValidityVTable = vortex_fsst::FSST

pub fn vortex_fsst::FSST::append_to_builder(array: vortex_array::array::view::ArrayView<'_, Self>, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>

Expand Down Expand Up @@ -52,9 +52,9 @@ impl vortex_array::array::vtable::operations::OperationsVTable<vortex_fsst::FSST

pub fn vortex_fsst::FSST::scalar_at(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>, index: usize, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

impl vortex_array::array::vtable::validity::ValidityChild<vortex_fsst::FSST> for vortex_fsst::FSST
impl vortex_array::array::vtable::validity::ValidityVTable<vortex_fsst::FSST> for vortex_fsst::FSST

pub fn vortex_fsst::FSST::validity_child(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>) -> vortex_array::array::erased::ArrayRef
pub fn vortex_fsst::FSST::validity(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>) -> vortex_error::VortexResult<vortex_array::validity::Validity>

impl vortex_array::arrays::dict::take::TakeExecute for vortex_fsst::FSST

Expand Down
21 changes: 10 additions & 11 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use vortex_array::serde::ArrayChildren;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityChild;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_array::vtable::ValidityVTable;
use vortex_array::vtable::child_to_validity;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
Expand Down Expand Up @@ -98,7 +98,7 @@ impl ArrayEq for FSSTData {
impl VTable for FSST {
type ArrayData = FSSTData;
type OperationsVTable = Self;
type ValidityVTable = ValidityVTableFromChild;
type ValidityVTable = Self;

fn id(&self) -> ArrayId {
Self::ID
Expand Down Expand Up @@ -319,9 +319,8 @@ pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
pub struct FSSTData {
symbols: Buffer<Symbol>,
symbol_lengths: Buffer<u8>,
// TODO(joe): this was broken by a previous pr. This will not be updated if a slot is replaced.
codes: VarBinArray,
/// NOTE(ngates): this === codes, but is stored as an ArrayRef so we can return &ArrayRef!
codes_array: ArrayRef,

/// Memoized compressor used for push-down of compute by compressing the RHS.
compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
Expand Down Expand Up @@ -505,13 +504,10 @@ impl FSSTData {
Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
})
as Box<dyn Fn() -> Compressor + Send>));
let codes_array = codes.clone().into_array();

Self {
symbols,
symbol_lengths,
codes,
codes_array,
compressor,
}
}
Expand Down Expand Up @@ -577,9 +573,12 @@ pub trait FSSTArrayExt: TypedArrayRef<FSST> {

impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}

impl ValidityChild<FSST> for FSST {
fn validity_child(array: ArrayView<'_, FSST>) -> ArrayRef {
array.codes_array.clone()
impl ValidityVTable<FSST> for FSST {
fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
Ok(child_to_validity(
&array.slots()[CODES_VALIDITY_SLOT],
array.dtype().nullability(),
))
}
}

Expand Down
18 changes: 9 additions & 9 deletions encodings/pco/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub type vortex_pco::Pco::ArrayData = vortex_pco::PcoData

pub type vortex_pco::Pco::OperationsVTable = vortex_pco::Pco

pub type vortex_pco::Pco::ValidityVTable = vortex_array::array::vtable::validity::ValidityVTableFromValiditySliceHelper
pub type vortex_pco::Pco::ValidityVTable = vortex_pco::Pco

pub fn vortex_pco::Pco::buffer(array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> vortex_array::buffer::BufferHandle

Expand All @@ -42,12 +42,16 @@ pub fn vortex_pco::Pco::serialize(array: vortex_array::array::view::ArrayView<'_

pub fn vortex_pco::Pco::slot_name(_array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> alloc::string::String

pub fn vortex_pco::Pco::validate(&self, data: &vortex_pco::PcoData, dtype: &vortex_array::dtype::DType, len: usize, _slots: &[core::option::Option<vortex_array::array::erased::ArrayRef>]) -> vortex_error::VortexResult<()>
pub fn vortex_pco::Pco::validate(&self, data: &vortex_pco::PcoData, dtype: &vortex_array::dtype::DType, len: usize, slots: &[core::option::Option<vortex_array::array::erased::ArrayRef>]) -> vortex_error::VortexResult<()>

impl vortex_array::array::vtable::operations::OperationsVTable<vortex_pco::Pco> for vortex_pco::Pco

pub fn vortex_pco::Pco::scalar_at(array: vortex_array::array::view::ArrayView<'_, vortex_pco::Pco>, index: usize, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

impl vortex_array::array::vtable::validity::ValidityVTable<vortex_pco::Pco> for vortex_pco::Pco

pub fn vortex_pco::Pco::validity(array: vortex_array::array::view::ArrayView<'_, vortex_pco::Pco>) -> vortex_error::VortexResult<vortex_array::validity::Validity>

impl vortex_array::arrays::slice::SliceReduce for vortex_pco::Pco

pub fn vortex_pco::Pco::slice(array: vortex_array::array::view::ArrayView<'_, Self>, range: core::ops::range::Range<usize>) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::erased::ArrayRef>>
Expand Down Expand Up @@ -82,7 +86,7 @@ pub struct vortex_pco::PcoData

impl vortex_pco::PcoData

pub fn vortex_pco::PcoData::decompress(&self, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::vtable::PrimitiveArray>
pub fn vortex_pco::PcoData::decompress(&self, unsliced_validity: &vortex_array::validity::Validity, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::vtable::PrimitiveArray>

pub fn vortex_pco::PcoData::from_array(array: vortex_array::array::erased::ArrayRef, level: usize, nums_per_page: usize) -> vortex_error::VortexResult<Self>

Expand All @@ -92,9 +96,9 @@ pub fn vortex_pco::PcoData::is_empty(&self) -> bool

pub fn vortex_pco::PcoData::len(&self) -> usize

pub fn vortex_pco::PcoData::new(chunk_metas: alloc::vec::Vec<vortex_buffer::ByteBuffer>, pages: alloc::vec::Vec<vortex_buffer::ByteBuffer>, ptype: vortex_array::dtype::ptype::PType, metadata: vortex_pco::PcoMetadata, len: usize, validity: vortex_array::validity::Validity) -> Self
pub fn vortex_pco::PcoData::new(chunk_metas: alloc::vec::Vec<vortex_buffer::ByteBuffer>, pages: alloc::vec::Vec<vortex_buffer::ByteBuffer>, ptype: vortex_array::dtype::ptype::PType, metadata: vortex_pco::PcoMetadata, len: usize) -> Self

pub fn vortex_pco::PcoData::validate(&self, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<()>
pub fn vortex_pco::PcoData::validate(&self, dtype: &vortex_array::dtype::DType, len: usize, validity: &vortex_array::validity::Validity) -> vortex_error::VortexResult<()>

impl core::clone::Clone for vortex_pco::PcoData

Expand All @@ -104,10 +108,6 @@ impl core::fmt::Debug for vortex_pco::PcoData

pub fn vortex_pco::PcoData::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::array::vtable::validity::ValiditySliceHelper for vortex_pco::PcoData

pub fn vortex_pco::PcoData::unsliced_validity_and_slice(&self) -> (&vortex_array::validity::Validity, usize, usize)

impl vortex_array::hash::ArrayEq for vortex_pco::PcoData

pub fn vortex_pco::PcoData::array_eq(&self, other: &Self, precision: vortex_array::hash::Precision) -> bool
Expand Down
88 changes: 44 additions & 44 deletions encodings/pco/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::OperationsVTable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValiditySliceHelper;
use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
use vortex_array::vtable::ValidityVTable;
use vortex_array::vtable::child_to_validity;
use vortex_array::vtable::validity_to_child;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
Expand Down Expand Up @@ -83,7 +83,6 @@ vtable!(Pco, Pco, PcoData);

impl ArrayHash for PcoData {
fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
self.unsliced_validity.array_hash(state, precision);
self.unsliced_n_rows.hash(state);
self.slice_start.hash(state);
self.slice_stop.hash(state);
Expand All @@ -99,10 +98,7 @@ impl ArrayHash for PcoData {

impl ArrayEq for PcoData {
fn array_eq(&self, other: &Self, precision: Precision) -> bool {
if !self
.unsliced_validity
.array_eq(&other.unsliced_validity, precision)
|| self.unsliced_n_rows != other.unsliced_n_rows
if self.unsliced_n_rows != other.unsliced_n_rows
|| self.slice_start != other.slice_start
|| self.slice_stop != other.slice_stop
|| self.chunk_metas.len() != other.chunk_metas.len()
Expand All @@ -128,7 +124,7 @@ impl VTable for Pco {
type ArrayData = PcoData;

type OperationsVTable = Self;
type ValidityVTable = ValidityVTableFromValiditySliceHelper;
type ValidityVTable = Self;

fn id(&self) -> ArrayId {
Self::ID
Expand All @@ -139,9 +135,10 @@ impl VTable for Pco {
data: &PcoData,
dtype: &DType,
len: usize,
_slots: &[Option<ArrayRef>],
slots: &[Option<ArrayRef>],
) -> VortexResult<()> {
data.validate(dtype, len)
let validity = child_to_validity(&slots[0], dtype.nullability());
data.validate(dtype, len, &validity)
}

fn nbuffers(array: ArrayView<'_, Self>) -> usize {
Expand Down Expand Up @@ -206,14 +203,7 @@ impl VTable for Pco {
vortex_ensure!(pages.len() == expected_n_pages);

let slots = vec![validity_to_child(&validity, len)];
let data = PcoData::new(
chunk_metas,
pages,
dtype.as_ptype(),
metadata,
len,
validity,
);
let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
}

Expand All @@ -222,7 +212,14 @@ impl VTable for Pco {
}

fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
Ok(ExecutionResult::done(array.decompress(ctx)?.into_array()))
let unsliced_validity =
child_to_validity(&array.as_ref().slots()[0], array.dtype().nullability());
Ok(ExecutionResult::done(
array
.data()
.decompress(&unsliced_validity, ctx)?
.into_array(),
))
}

fn reduce_parent(
Expand Down Expand Up @@ -273,13 +270,14 @@ pub struct Pco;
impl Pco {
pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");

pub(crate) fn try_new(dtype: DType, data: PcoData) -> VortexResult<PcoArray> {
pub(crate) fn try_new(
dtype: DType,
data: PcoData,
validity: Validity,
) -> VortexResult<PcoArray> {
let len = data.len();
data.validate(&dtype, len)?;
let slots = vec![validity_to_child(
&data.unsliced_validity,
data.unsliced_n_rows,
)];
data.validate(&dtype, len, &validity)?;
let slots = vec![validity_to_child(&validity, data.unsliced_n_rows())];
Ok(unsafe {
Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
})
Expand All @@ -292,8 +290,9 @@ impl Pco {
values_per_page: usize,
) -> VortexResult<PcoArray> {
let dtype = parray.dtype().clone();
let validity = parray.validity()?;
let data = PcoData::from_primitive(parray, level, values_per_page)?;
Self::try_new(dtype, data)
Self::try_new(dtype, data, validity)
}
}

Expand All @@ -307,14 +306,13 @@ pub struct PcoData {
pub(crate) pages: Vec<ByteBuffer>,
pub(crate) metadata: PcoMetadata,
ptype: PType,
pub(crate) unsliced_validity: Validity,
unsliced_n_rows: usize,
slice_start: usize,
slice_stop: usize,
}

impl PcoData {
pub fn validate(&self, dtype: &DType, len: usize) -> VortexResult<()> {
pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
let _ = number_type_from_ptype(self.ptype);
vortex_ensure!(
dtype.as_ptype() == self.ptype,
Expand All @@ -323,9 +321,9 @@ impl PcoData {
dtype.as_ptype()
);
vortex_ensure!(
dtype.nullability() == self.unsliced_validity.nullability(),
dtype.nullability() == validity.nullability(),
"expected nullability {}, got {}",
self.unsliced_validity.nullability(),
validity.nullability(),
dtype.nullability()
);
vortex_ensure!(
Expand All @@ -340,7 +338,7 @@ impl PcoData {
"expected len {len}, got {}",
self.slice_stop - self.slice_start
);
if let Some(validity_len) = self.unsliced_validity.maybe_len() {
if let Some(validity_len) = validity.maybe_len() {
vortex_ensure!(
validity_len == self.unsliced_n_rows,
"expected validity len {}, got {}",
Expand Down Expand Up @@ -373,14 +371,12 @@ impl PcoData {
ptype: PType,
metadata: PcoMetadata,
len: usize,
validity: Validity,
) -> Self {
Self {
chunk_metas,
pages,
metadata,
ptype,
unsliced_validity: validity,
unsliced_n_rows: len,
slice_start: 0,
slice_stop: len,
Expand Down Expand Up @@ -464,7 +460,6 @@ impl PcoData {
parray.dtype().as_ptype(),
metadata,
parray.len(),
parray.validity()?,
))
}

Expand All @@ -478,33 +473,36 @@ impl PcoData {
Self::from_primitive(&parray, level, nums_per_page)
}

pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
pub fn decompress(
&self,
unsliced_validity: &Validity,
ctx: &mut ExecutionCtx,
) -> VortexResult<PrimitiveArray> {
// To start, we figure out which chunks and pages we need to decompress, and with
// what value offset into the first such page.
let number_type = number_type_from_ptype(self.ptype);
let values_byte_buffer = match_number_enum!(
number_type,
NumberType<T> => {
self.decompress_values_typed::<T>(ctx)?
self.decompress_values_typed::<T>(unsliced_validity, ctx)?
}
);

Ok(PrimitiveArray::from_values_byte_buffer(
values_byte_buffer,
self.ptype,
self.unsliced_validity
.slice(self.slice_start..self.slice_stop)?,
unsliced_validity.slice(self.slice_start..self.slice_stop)?,
self.slice_stop - self.slice_start,
))
}

fn decompress_values_typed<T: Number>(
&self,
unsliced_validity: &Validity,
ctx: &mut ExecutionCtx,
) -> VortexResult<ByteBuffer> {
// To start, we figure out what range of values we need to decompress.
let slice_value_indices = self
.unsliced_validity
let slice_value_indices = unsliced_validity
.execute_mask(self.unsliced_n_rows, ctx)?
.valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
let slice_value_start = slice_value_indices[0];
Expand Down Expand Up @@ -605,9 +603,10 @@ impl PcoData {
}
}

impl ValiditySliceHelper for PcoData {
fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
(&self.unsliced_validity, self.slice_start, self.slice_stop)
impl ValidityVTable<Pco> for Pco {
fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
unsliced_validity.slice(array.slice_start()..array.slice_stop())
}
}

Expand All @@ -618,9 +617,10 @@ impl OperationsVTable<Pco> for Pco {
_ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
array
._slice(index, index + 1)
.decompress(&mut ctx)?
.decompress(&unsliced_validity, &mut ctx)?
.into_array()
.scalar_at(0)
}
Expand Down
Loading
Loading