diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index bb2a9b00d81..97723eb97e8 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -348,6 +348,8 @@ pub fn vortex_file::WriteStrategyBuilder::build(self) -> alloc::sync::Arc) -> Self +pub fn vortex_file::WriteStrategyBuilder::with_array_tree(self, bool) -> Self + pub fn vortex_file::WriteStrategyBuilder::with_btrblocks_builder(self, vortex_btrblocks::builder::BtrBlocksCompressorBuilder) -> Self pub fn vortex_file::WriteStrategyBuilder::with_compressor(self, C) -> Self diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 71c72ffc904..4fcbf0fcd9a 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -40,6 +40,7 @@ use vortex_fastlanes::FoR; use vortex_fastlanes::RLE; use vortex_fsst::FSST; use vortex_layout::LayoutStrategy; +use vortex_layout::layouts::array_tree::writer as array_tree_writer; use vortex_layout::layouts::buffered::BufferedStrategy; use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy; use vortex_layout::layouts::collect::CollectStrategy; @@ -143,6 +144,7 @@ pub struct WriteStrategyBuilder { field_writers: HashMap>, allow_encodings: Option>, flat_strategy: Option>, + array_tree: bool, } impl Default for WriteStrategyBuilder { @@ -155,6 +157,7 @@ impl Default for WriteStrategyBuilder { field_writers: HashMap::new(), allow_encodings: Some(ALLOWED_ENCODINGS.clone()), flat_strategy: None, + array_tree: false, } } } @@ -187,11 +190,32 @@ impl WriteStrategyBuilder { /// /// By default, this uses [`FlatLayoutStrategy`]. This can be used to substitute a custom /// layout strategy, e.g. one that inlines constant array buffers for GPU reads. + /// + /// Passing a custom flat strategy implicitly disables the array-tree outlining feature + /// (see [`Self::with_array_tree`]), since the custom strategy owns the leaf format. 
pub fn with_flat_strategy(mut self, flat: Arc) -> Self { self.flat_strategy = Some(flat); self } + /// Enable array-tree outlining: each chunk's encoding tree (without per-chunk statistics) + /// is collected into a single auxiliary segment per column rather than being inlined + /// alongside the chunk's data. + /// + /// Disabled by default. When enabled, the written file uses two encodings that older + /// readers will not understand: + /// [`vortex_layout::layouts::array_tree::ArrayTreeFlatLayout`] at the data leaves, and a + /// wrapping [`vortex_layout::layouts::array_tree::ArrayTreeLayout`] that owns the + /// consolidated auxiliary segment. Files written by this builder with the feature on + /// require a reader that recognizes both encodings. + /// + /// Has no effect if a custom flat strategy is provided via + /// [`Self::with_flat_strategy`] — the user-supplied leaf format wins. + pub fn with_array_tree(mut self, array_tree: bool) -> Self { + self.array_tree = array_tree; + self + } + /// Override the default [`BtrBlocksCompressorBuilder`] used for compression. /// /// The builder is finalized during [`build`](Self::build), producing two compressors: one for @@ -212,23 +236,17 @@ impl WriteStrategyBuilder { /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides /// applied. pub fn build(self) -> Arc { - let flat: Arc = if let Some(flat) = self.flat_strategy { - flat - } else if let Some(allow_encodings) = self.allow_encodings { - Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings)) + let flat: Arc = if let Some(flat) = &self.flat_strategy { + Arc::clone(flat) + } else if let Some(allow_encodings) = &self.allow_encodings { + Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings.clone())) } else { Arc::new(FlatLayoutStrategy::default()) }; - // 7. for each chunk create a flat layout - let chunked = ChunkedLayoutStrategy::new(Arc::clone(&flat)); - // 6. 
buffer chunks so they end up with closer segment ids physically - let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB - - // 5. compress each chunk. - // Exclude IntDictScheme from the data compressor because DictStrategy (step 3) already - // dictionary-encodes columns. Allowing IntDictScheme here would redundantly - // dictionary-encode the integer codes produced by that earlier step. + // Data compressor: excludes IntDictScheme because DictStrategy (step 3 below) already + // dictionary-encodes columns; allowing it here would redundantly dictionary-encode the + // integer codes produced by that earlier step. let data_compressor: Arc = match &self.compressor { CompressorConfig::BtrBlocks(builder) => Arc::new( builder @@ -238,6 +256,37 @@ impl WriteStrategyBuilder { ), CompressorConfig::Opaque(compressor) => Arc::clone(compressor), }; + // Stats compressor: used for zone-map tables, dict values, and (when enabled) the + // consolidated array-trees segment. + let stats_compressor: Arc = match &self.compressor { + CompressorConfig::BtrBlocks(builder) => Arc::new(builder.clone().build()), + CompressorConfig::Opaque(compressor) => Arc::clone(compressor), + }; + let compress_then_flat = + CompressingStrategy::new(Arc::clone(&flat), Arc::clone(&stats_compressor)); + let compress_then_flat_arc: Arc = Arc::new(compress_then_flat.clone()); + + let array_tree_enabled = self.array_tree && self.flat_strategy.is_none(); + let (data_leaf, array_tree_collector): (Arc, _) = if !array_tree_enabled + { + (Arc::clone(&flat), None) + } else { + let data_flat = if let Some(allow_encodings) = &self.allow_encodings { + FlatLayoutStrategy::default().with_allow_encodings(allow_encodings.clone()) + } else { + FlatLayoutStrategy::default() + }; + let (collector, leaf) = + array_tree_writer::writer(data_flat, Arc::clone(&compress_then_flat_arc)); + (Arc::new(leaf), Some(collector)) + }; + + // 7. 
for each chunk create a flat layout + let chunked = ChunkedLayoutStrategy::new(data_leaf); + // 6. buffer chunks so they end up with closer segment ids physically + let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB + + // 5. compress each chunk. let compressing = CompressingStrategy::new(buffered, data_compressor); // 4. prior to compression, coalesce up to a minimum size @@ -257,13 +306,6 @@ impl WriteStrategyBuilder { }, ); - // 2.1. | 3.1. compress stats tables and dict values. - let stats_compressor: Arc = match self.compressor { - CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()), - CompressorConfig::Opaque(compressor) => compressor, - }; - let compress_then_flat = CompressingStrategy::new(flat, stats_compressor); - // 3. apply dict encoding or fallback let dict = DictStrategy::new( coalescing.clone(), @@ -272,9 +314,16 @@ impl WriteStrategyBuilder { Default::default(), ); + // 2.5. wrap dict in the array-tree collector if outlining is enabled. + let data_pipeline: Arc = if let Some(collector) = array_tree_collector { + Arc::new(collector.wrap(dict)) + } else { + Arc::new(dict) + }; + // 2. calculate stats for each row group let stats = ZonedStrategy::new( - dict, + data_pipeline, compress_then_flat.clone(), ZonedLayoutOptions { block_size: self.row_block_size, diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 2ba10d96684..f4f9a504b13 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1953,3 +1953,233 @@ async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> { Ok(()) } + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_segment_ordering_array_trees_consolidated_and_after_data() -> VortexResult<()> { + // Multi-column struct large enough to produce chunked data, so each column will have + // many ArrayTreeFlat leaves. The collector should consolidate their compact trees into a + // single segment per ArrayTreeLayout. 
+ let n = 100_000; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + let numbers = PrimitiveArray::from_iter(0..n as i32).into_array(); + let floats = PrimitiveArray::from_iter((0..n).map(|i| i as f64 * 0.1)).into_array(); + + let st = StructArray::from_fields(&[ + ("strings", strings), + ("numbers", numbers), + ("floats", floats), + ]) + .unwrap(); + + let mut buf = ByteBufferMut::empty(); + let strategy = crate::WriteStrategyBuilder::default() + .with_array_tree(true) + .build(); + let summary = SESSION + .write_options() + .with_strategy(strategy) + .write(&mut buf, st.into_array().to_array_stream()) + .await?; + + let footer = summary.footer(); + let segment_specs = footer.segment_map(); + let root = footer.layout(); + + // For each ArrayTreeLayout in the tree, assert: + // 1. **Consolidation:** the auxiliary `array_trees` child writes exactly one segment. + // 2. **Per-column ordering:** every data segment under child 0 appears before the + // array_trees segment under child 1. 
+ fn check_array_tree_layouts( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + found_any: &mut bool, + ) { + if layout.encoding_id().as_ref() == "vortex.array_tree" { + *found_any = true; + + let data_child = layout.child(0).unwrap(); + let array_trees_child = layout.child(1).unwrap(); + + let data_offsets = collect_segment_offsets(data_child.as_ref(), segment_specs); + let array_trees_offsets = + collect_segment_offsets(array_trees_child.as_ref(), segment_specs); + + assert_eq!( + array_trees_offsets.len(), + 1, + "array_tree: auxiliary child must consolidate to exactly 1 segment, got {} segments at offsets {:?}", + array_trees_offsets.len(), + array_trees_offsets, + ); + + assert!( + !data_offsets.is_empty(), + "array_tree: data child must have at least one segment" + ); + + assert_offsets_ordered( + &data_offsets, + &array_trees_offsets, + "array_tree: all data segments should come before the array_trees segment", + ); + } + + for child in layout.children().unwrap() { + check_array_tree_layouts(child.as_ref(), segment_specs, found_any); + } + } + + let mut found_any = false; + check_array_tree_layouts(root.as_ref(), segment_specs, &mut found_any); + assert!( + found_any, + "test setup expected the default write strategy to produce at least one ArrayTreeLayout" + ); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_segment_ordering_array_trees_before_zones() -> VortexResult<()> { + // The write strategy wraps every column in `ZonedStrategy { data: ArrayTree, zones }`. + // Assert per-Zoned-layout that the array_trees segment (inside the data child) appears + // before every zone-map segment in the same column. 
+ let n = 100_000; + let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(values).into_array(); + let numbers = PrimitiveArray::from_iter(0..n as i32).into_array(); + let floats = PrimitiveArray::from_iter((0..n).map(|i| i as f64 * 0.1)).into_array(); + + let st = StructArray::from_fields(&[ + ("strings", strings), + ("numbers", numbers), + ("floats", floats), + ]) + .unwrap(); + + let mut buf = ByteBufferMut::empty(); + let strategy = crate::WriteStrategyBuilder::default() + .with_array_tree(true) + .build(); + let summary = SESSION + .write_options() + .with_strategy(strategy) + .write(&mut buf, st.into_array().to_array_stream()) + .await?; + + let footer = summary.footer(); + let segment_specs = footer.segment_map(); + let root = footer.layout(); + + fn check_zoned_with_array_tree( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + found_any: &mut bool, + ) { + if layout.encoding_id().as_ref() == "vortex.stats" { + let data_child = layout.child(0).unwrap(); + let zones_child = layout.child(1).unwrap(); + + if data_child.encoding_id().as_ref() == "vortex.array_tree" { + *found_any = true; + let array_trees_offsets = + collect_segment_offsets(data_child.child(1).unwrap().as_ref(), segment_specs); + let zones_offsets = collect_segment_offsets(zones_child.as_ref(), segment_specs); + + assert_offsets_ordered( + &array_trees_offsets, + &zones_offsets, + "zoned wrapping array_tree: the array_trees segment should come before zone-map segments", + ); + } + } + + for child in layout.children().unwrap() { + check_zoned_with_array_tree(child.as_ref(), segment_specs, found_any); + } + } + + let mut found_any = false; + check_zoned_with_array_tree(root.as_ref(), segment_specs, &mut found_any); + assert!( + found_any, + "test setup expected the default write strategy to produce at least one Zoned wrapping an ArrayTree" + ); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn 
test_roundtrip_array_tree_layout() -> VortexResult<()> { + // End-to-end coverage: write with `with_array_tree(true)`, then read back through the + // source-publishing reader-ctx path and assert the data matches. Exercises: + // - ArrayTreeCollectorStrategy collecting compact trees from leaf transient state + // - ArrayTreeLayout::new_reader publishing the ArrayTreesSource into the reader ctx + // - ArrayTreeFlatReader pulling the source and resolving its tree by segment_id + // - ColumnarSerializedArray::from_segment_and_tree decoding the data segment + let mut ctx = SESSION.create_execution_ctx(); + + let n = 10_000; + let strings_in: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect(); + let strings = VarBinArray::from(strings_in.clone()).into_array(); + let numbers_in: Vec = (0..n as i32).collect(); + let numbers = PrimitiveArray::from_iter(numbers_in.iter().copied()).into_array(); + + let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)])?.into_array(); + let dtype = st.dtype().clone(); + + let mut buf = ByteBufferMut::empty(); + let strategy = crate::WriteStrategyBuilder::default() + .with_array_tree(true) + .build(); + SESSION + .write_options() + .with_strategy(strategy) + .write(&mut buf, st.to_array_stream()) + .await?; + + // Sanity-check we actually wrote ArrayTreeLayout nodes — otherwise the test would + // silently pass on the default code path. 
+ let file = SESSION.open_options().open_buffer(buf)?; + fn has_array_tree(layout: &dyn Layout) -> bool { + if layout.encoding_id().as_ref() == "vortex.array_tree" { + return true; + } + layout + .children() + .map(|cs| cs.iter().any(|c| has_array_tree(c.as_ref()))) + .unwrap_or(false) + } + assert!( + has_array_tree(file.footer().layout().as_ref()), + "test expected ArrayTreeLayout in the written file" + ); + + let result = file.scan()?.into_array_stream()?.read_all().await?; + assert_eq!(result.len(), n); + assert_eq!(result.dtype(), &dtype); + + let struct_array = result.execute::(&mut ctx)?; + + let read_numbers = struct_array.unmasked_field_by_name("numbers").cloned()?; + let expected_numbers = PrimitiveArray::from_iter(numbers_in.iter().copied()).into_array(); + assert_arrays_eq!(read_numbers, expected_numbers); + + let read_strings = struct_array + .unmasked_field_by_name("strings") + .cloned()? + .execute::(&mut ctx)? + .with_iterator(|iter| { + iter.map(|s| s.map(|st| unsafe { String::from_utf8_unchecked(st.to_vec()) })) + .collect::>() + }); + let expected_strings: Vec> = + strings_in.iter().map(|s| Some((*s).to_string())).collect(); + assert_eq!(read_strings, expected_strings); + + Ok(()) +} diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index c794e93aa41..df74ef6cedb 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -20,6 +20,234 @@ pub fn vortex_layout::display::DisplayLayoutTree::fmt(&self, &mut core::fmt::For pub mod vortex_layout::layouts +pub mod vortex_layout::layouts::array_tree + +pub mod vortex_layout::layouts::array_tree::writer + +pub struct vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy + +impl vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy::wrap(self, impl vortex_layout::LayoutStrategy) -> Self + +impl vortex_layout::LayoutStrategy for 
vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy::buffered_bytes(&self) -> u64 + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy::write_stream<'life0, 'life1, 'async_trait>(&'life0 self, vortex_array::ArrayContext, vortex_layout::segments::SegmentSinkRef, vortex_layout::sequence::SendableSequentialStream, vortex_layout::sequence::SequencePointer, &'life1 vortex_session::VortexSession) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +pub struct vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy + +impl core::clone::Clone for vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy::clone(&self) -> vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy + +impl vortex_layout::LayoutStrategy for vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy::buffered_bytes(&self) -> u64 + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy::write_stream<'life0, 'life1, 'async_trait>(&'life0 self, vortex_array::ArrayContext, vortex_layout::segments::SegmentSinkRef, vortex_layout::sequence::SendableSequentialStream, vortex_layout::sequence::SequencePointer, &'life1 vortex_session::VortexSession) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +pub fn vortex_layout::layouts::array_tree::writer::writer(vortex_layout::layouts::flat::writer::FlatLayoutStrategy, alloc::sync::Arc) -> (vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy, vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy) + +pub struct 
vortex_layout::layouts::array_tree::ArrayTree + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTree + +pub fn vortex_layout::layouts::array_tree::ArrayTree::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTree + +pub type vortex_layout::layouts::array_tree::ArrayTree::Encoding = vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTree::Layout = vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub type vortex_layout::layouts::array_tree::ArrayTree::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::dtype(&Self::Layout) -> &vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTree::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn vortex_layout::layouts::array_tree::ArrayTree::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTree::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn 
vortex_layout::layouts::array_tree::ArrayTree::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTree::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTree::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + +pub struct vortex_layout::layouts::array_tree::ArrayTreeFlat + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreeFlat + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTreeFlat + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Encoding = vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Layout = vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::dtype(&Self::Layout) -> &vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn 
vortex_layout::layouts::array_tree::ArrayTreeFlat::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + +pub struct vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +impl vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::inner(&self) -> &vortex_layout::layouts::flat::FlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::new(vortex_layout::layouts::flat::FlatLayout) -> Self + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::take_tree(&self) -> core::option::Option + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::with_tree(vortex_layout::layouts::flat::FlatLayout, vortex_array::serde::columnar::ColumnarArrayTree) -> Self + +impl core::clone::Clone for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::clone(&self) -> vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +impl core::convert::AsRef for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::as_ref(&self) -> &dyn vortex_layout::Layout + +impl core::convert::From for vortex_layout::LayoutRef + +pub fn 
vortex_layout::LayoutRef::from(vortex_layout::layouts::array_tree::ArrayTreeFlatLayout) -> vortex_layout::LayoutRef + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::Target = dyn vortex_layout::Layout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::deref(&self) -> &Self::Target + +impl vortex_layout::IntoLayout for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::into_layout(self) -> vortex_layout::LayoutRef + +pub struct vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +impl core::convert::AsRef for vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding::as_ref(&self) -> &dyn vortex_layout::LayoutEncoding + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding::Target = dyn vortex_layout::LayoutEncoding + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding::deref(&self) -> &Self::Target + +pub struct vortex_layout::layouts::array_tree::ArrayTreeLayout + +impl vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::array_trees_dtype() -> vortex_array::dtype::DType + +pub fn 
vortex_layout::layouts::array_tree::ArrayTreeLayout::derive_reader_ctx(&self, &str, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::new(vortex_layout::LayoutRef, vortex_layout::LayoutRef) -> Self + +impl core::clone::Clone for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::clone(&self) -> vortex_layout::layouts::array_tree::ArrayTreeLayout + +impl core::convert::AsRef for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::as_ref(&self) -> &dyn vortex_layout::Layout + +impl core::convert::From for vortex_layout::LayoutRef + +pub fn vortex_layout::LayoutRef::from(vortex_layout::layouts::array_tree::ArrayTreeLayout) -> vortex_layout::LayoutRef + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub type vortex_layout::layouts::array_tree::ArrayTreeLayout::Target = dyn vortex_layout::Layout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::deref(&self) -> &Self::Target + +impl vortex_layout::IntoLayout for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::into_layout(self) -> vortex_layout::LayoutRef + +pub struct vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +impl core::convert::AsRef for vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding::as_ref(&self) -> &dyn vortex_layout::LayoutEncoding + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub fn 
vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::ops::deref::Deref for vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding::Target = dyn vortex_layout::LayoutEncoding + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding::deref(&self) -> &Self::Target + +pub struct vortex_layout::layouts::array_tree::ArrayTreesSource + +impl vortex_layout::layouts::array_tree::ArrayTreesSource + +pub fn vortex_layout::layouts::array_tree::ArrayTreesSource::get_for_segment(&self, vortex_layout::segments::SegmentId) -> vortex_layout::layouts::array_tree::SharedSegmentTreeFuture + +pub fn vortex_layout::layouts::array_tree::ArrayTreesSource::new(vortex_layout::LayoutReaderRef, vortex_session::VortexSession) -> Self + +impl core::fmt::Debug for vortex_layout::layouts::array_tree::ArrayTreesSource + +pub fn vortex_layout::layouts::array_tree::ArrayTreesSource::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub static vortex_layout::layouts::array_tree::ARRAY_TREES_SOURCE_ID: std::sync::lazy_lock::LazyLock + +pub type vortex_layout::layouts::array_tree::SharedSegmentTreeFuture = futures_util::future::future::shared::Shared>>> + pub mod vortex_layout::layouts::buffered pub struct vortex_layout::layouts::buffered::BufferedStrategy @@ -1464,6 +1692,70 @@ pub fn vortex_layout::vtable::VTable::segment_ids(&Self::Layout) -> alloc::vec:: pub fn vortex_layout::vtable::VTable::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTree + +pub type vortex_layout::layouts::array_tree::ArrayTree::Encoding = vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTree::Layout = vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub type 
vortex_layout::layouts::array_tree::ArrayTree::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::dtype(&Self::Layout) -> &vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTree::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn vortex_layout::layouts::array_tree::ArrayTree::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTree::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTree::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTree::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTreeFlat + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Encoding = vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Layout = 
vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::dtype(&Self::Layout) -> &vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + impl vortex_layout::VTable for vortex_layout::layouts::chunked::Chunked pub type vortex_layout::layouts::chunked::Chunked::Encoding = vortex_layout::layouts::chunked::ChunkedLayoutEncoding @@ 
-1782,6 +2074,14 @@ pub trait vortex_layout::IntoLayout pub fn vortex_layout::IntoLayout::into_layout(self) -> vortex_layout::LayoutRef +impl vortex_layout::IntoLayout for vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlatLayout::into_layout(self) -> vortex_layout::LayoutRef + +impl vortex_layout::IntoLayout for vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub fn vortex_layout::layouts::array_tree::ArrayTreeLayout::into_layout(self) -> vortex_layout::LayoutRef + impl vortex_layout::IntoLayout for vortex_layout::layouts::chunked::ChunkedLayout pub fn vortex_layout::layouts::chunked::ChunkedLayout::into_layout(self) -> vortex_layout::LayoutRef @@ -1938,6 +2238,18 @@ pub fn alloc::sync::Arc::buffered_bytes(&self pub fn alloc::sync::Arc::write_stream<'life0, 'life1, 'async_trait>(&'life0 self, vortex_array::ArrayContext, vortex_layout::segments::SegmentSinkRef, vortex_layout::sequence::SendableSequentialStream, vortex_layout::sequence::SequencePointer, &'life1 vortex_session::VortexSession) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait +impl vortex_layout::LayoutStrategy for vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy::buffered_bytes(&self) -> u64 + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeCollectorStrategy::write_stream<'life0, 'life1, 'async_trait>(&'life0 self, vortex_array::ArrayContext, vortex_layout::segments::SegmentSinkRef, vortex_layout::sequence::SendableSequentialStream, vortex_layout::sequence::SequencePointer, &'life1 vortex_session::VortexSession) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + +impl vortex_layout::LayoutStrategy for 
vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy::buffered_bytes(&self) -> u64 + +pub fn vortex_layout::layouts::array_tree::writer::ArrayTreeFlatStrategy::write_stream<'life0, 'life1, 'async_trait>(&'life0 self, vortex_array::ArrayContext, vortex_layout::segments::SegmentSinkRef, vortex_layout::sequence::SendableSequentialStream, vortex_layout::sequence::SequencePointer, &'life1 vortex_session::VortexSession) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait + impl vortex_layout::LayoutStrategy for vortex_layout::layouts::buffered::BufferedStrategy pub fn vortex_layout::layouts::buffered::BufferedStrategy::buffered_bytes(&self) -> u64 @@ -2024,6 +2336,70 @@ pub fn vortex_layout::VTable::segment_ids(&Self::Layout) -> alloc::vec::Vec) -> vortex_error::VortexResult<()> +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTree + +pub type vortex_layout::layouts::array_tree::ArrayTree::Encoding = vortex_layout::layouts::array_tree::ArrayTreeLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTree::Layout = vortex_layout::layouts::array_tree::ArrayTreeLayout + +pub type vortex_layout::layouts::array_tree::ArrayTree::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::dtype(&Self::Layout) -> 
&vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTree::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTree::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn vortex_layout::layouts::array_tree::ArrayTree::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTree::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTree::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTree::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTree::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTree::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + +impl vortex_layout::VTable for vortex_layout::layouts::array_tree::ArrayTreeFlat + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Encoding = vortex_layout::layouts::array_tree::ArrayTreeFlatLayoutEncoding + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Layout = vortex_layout::layouts::array_tree::ArrayTreeFlatLayout + +pub type vortex_layout::layouts::array_tree::ArrayTreeFlat::Metadata = vortex_array::metadata::EmptyMetadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::build(&Self::Encoding, &vortex_array::dtype::DType, u64, &vortex_array::metadata::EmptyMetadata, alloc::vec::Vec, &dyn vortex_layout::LayoutChildren, &vortex_session::registry::ReadContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child(&Self::Layout, usize) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::child_type(&Self::Layout, usize) -> vortex_layout::LayoutChildType + +pub fn 
vortex_layout::layouts::array_tree::ArrayTreeFlat::dtype(&Self::Layout) -> &vortex_array::dtype::DType + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::encoding(&Self::Layout) -> vortex_layout::LayoutEncodingRef + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::id(&Self::Encoding) -> vortex_layout::LayoutId + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::metadata(&Self::Layout) -> Self::Metadata + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::nchildren(&Self::Layout) -> usize + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::new_reader(&Self::Layout, alloc::sync::Arc, alloc::sync::Arc, &vortex_session::VortexSession, &vortex_layout::LayoutReaderContext) -> vortex_error::VortexResult + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::row_count(&Self::Layout) -> u64 + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::segment_ids(&Self::Layout) -> alloc::vec::Vec + +pub fn vortex_layout::layouts::array_tree::ArrayTreeFlat::with_children(&mut Self::Layout, alloc::vec::Vec) -> vortex_error::VortexResult<()> + impl vortex_layout::VTable for vortex_layout::layouts::chunked::Chunked pub type vortex_layout::layouts::chunked::Chunked::Encoding = vortex_layout::layouts::chunked::ChunkedLayoutEncoding diff --git a/vortex-layout/src/layouts/array_tree/flat.rs b/vortex-layout/src/layouts/array_tree/flat.rs new file mode 100644 index 00000000000..33e4823c483 --- /dev/null +++ b/vortex-layout/src/layouts/array_tree/flat.rs @@ -0,0 +1,173 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use parking_lot::Mutex; +use vortex_array::EmptyMetadata; +use vortex_array::dtype::DType; +use vortex_array::serde::ColumnarArrayTree; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::ReadContext; + +use 
crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::layouts::array_tree::ARRAY_TREES_SOURCE_ID; +use crate::layouts::array_tree::ArrayTreesSource; +use crate::layouts::array_tree::reader::ArrayTreeFlatReader; +use crate::layouts::flat::FlatLayout; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +vtable!(ArrayTreeFlat); + +/// Encoding marker for [`ArrayTreeFlatLayout`]. +#[derive(Debug)] +pub struct ArrayTreeFlatLayoutEncoding; + +/// Flat-layout variant that retrieves its encoding tree from a sibling consolidated array +/// rather than from a per-segment trailing flatbuffer. +/// +/// At write time, the leaf strategy attaches the chunk's [`ColumnarArrayTree`] as transient +/// state (consumed by the collector strategy in its post-write subtree walk). +/// +/// At read time, the reader pulls a shared [`ArrayTreesSource`] from the +/// [`LayoutReaderContext`] and resolves its tree by segment id. The source must be +/// published by an ancestor [`super::ArrayTreeLayout`]; constructing a reader without one +/// fails with a clear error. +#[derive(Clone, Debug)] +pub struct ArrayTreeFlatLayout { + inner: FlatLayout, + /// Transient write-time state: the leaf strategy attaches its [`ColumnarArrayTree`] for + /// the collector to pluck via [`Self::take_tree`]. `Mutex<Option<_>>` so the collector + /// can take ownership cheaply during its post-write walk. Read-path construction (via + /// `build`) leaves this `None`; the field is never serialized to disk. + tree: Arc<Mutex<Option<ColumnarArrayTree>>>, +} + +impl ArrayTreeFlatLayout { + /// Creates a new layout from the inner flat layout without any attached tree. + pub fn new(inner: FlatLayout) -> Self { + Self { + inner, + tree: Arc::new(Mutex::new(None)), + } + } + + /// Creates a new layout with an attached transient [`ColumnarArrayTree`].
Used only by + /// the array-tree writer; the tree is consumed by the collector and never serialized. + pub fn with_tree(inner: FlatLayout, tree: ColumnarArrayTree) -> Self { + Self { + inner, + tree: Arc::new(Mutex::new(Some(tree))), + } + } + + /// Returns the inner flat layout. + pub fn inner(&self) -> &FlatLayout { + &self.inner + } + + /// Take ownership of any attached transient tree, leaving `None` behind. + pub fn take_tree(&self) -> Option<ColumnarArrayTree> { + self.tree.lock().take() + } +} + +impl VTable for ArrayTreeFlat { + type Layout = ArrayTreeFlatLayout; + type Encoding = ArrayTreeFlatLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_static("vortex.array_tree_flat") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ArrayTreeFlatLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.inner.row_count() + } + + fn dtype(layout: &Self::Layout) -> &DType { + layout.inner.dtype() + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(layout: &Self::Layout) -> Vec<SegmentId> { + vec![layout.inner.segment_id()] + } + + fn nchildren(_layout: &Self::Layout) -> usize { + 0 + } + + fn child(_layout: &Self::Layout, idx: usize) -> VortexResult<LayoutRef> { + vortex_bail!("ArrayTreeFlatLayout has no children, got index {}", idx) + } + + fn child_type(_layout: &Self::Layout, idx: usize) -> LayoutChildType { + vortex_panic!("ArrayTreeFlatLayout has no children, got index {}", idx) + } + + fn new_reader( + layout: &Self::Layout, + name: Arc<str>, + segment_source: Arc<dyn SegmentSource>, + session: &VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult<LayoutReaderRef> { + // The tree source is not stored in this layout: it must have been published into the + // reader context by an enclosing ArrayTreeLayout, otherwise this leaf cannot decode. + let source = ctx + .get::<ArrayTreesSource>(*ARRAY_TREES_SOURCE_ID) + .ok_or_else(|| { + vortex_error::vortex_err!( + "ArrayTreeFlatLayout requires an ancestor ArrayTreeLayout to publish an \ + ArrayTreesSource into the reader context; call \ + ArrayTreeLayout::derive_reader_ctx on each
ArrayTreeLayout ancestor before \ + constructing a reader for this layout" + ) + })?; + Ok(Arc::new(ArrayTreeFlatReader::new( + layout.clone(), + name, + segment_source, + session.clone(), + source, + ))) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &EmptyMetadata, + segment_ids: Vec<SegmentId>, + _children: &dyn LayoutChildren, + ctx: &ReadContext, + ) -> VortexResult<Self::Layout> { + if segment_ids.len() != 1 { + vortex_bail!("ArrayTreeFlatLayout must have exactly one segment ID"); + } + Ok(ArrayTreeFlatLayout::new(FlatLayout::new( + row_count, + dtype.clone(), + segment_ids[0], + ctx.clone(), + ))) + } +} diff --git a/vortex-layout/src/layouts/array_tree/mod.rs b/vortex-layout/src/layouts/array_tree/mod.rs new file mode 100644 index 00000000000..f651ab7a70b --- /dev/null +++ b/vortex-layout/src/layouts/array_tree/mod.rs @@ -0,0 +1,512 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Array tree layout: stores per-chunk encoding trees as one consolidated columnar struct +//! array (one row per data segment), alongside the data layout itself. The leaves write only +//! data buffers — their encoding-tree metadata (plus per-node stats and buffer descriptors) +//! lives in the auxiliary `array_trees` child, which is BtrBlocks-compressed end-to-end. At +//! read time, the source published by [`ArrayTreeLayout::new_reader`] resolves a segment id +//! to its [`ColumnarArrayTree`] in one lookup, then [`ArrayTreeFlatReader`] pairs it with +//! the fetched data segment for decode.
+ +mod flat; +mod reader; +pub mod writer; + +use std::sync::Arc; +use std::sync::LazyLock; +use std::sync::OnceLock; + +use futures::FutureExt; +use vortex_array::EmptyMetadata; +use vortex_array::Executable; +use vortex_array::MaskFuture; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ListViewArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::Struct; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::VarBinView; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::list::ListArrayExt; +use vortex_array::arrays::listview::list_from_list_view; +use vortex_array::arrays::struct_::StructArrayExt; +use vortex_array::dtype::DType; +use vortex_array::dtype::FieldName; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::dtype::StructFields; +use vortex_array::expr::root; +use vortex_array::serde::BUFFER_COLUMNS_DTYPE; +use vortex_array::serde::ColumnarArrayTree; +use vortex_array::serde::NODES_COLUMNS_DTYPE; +use vortex_array::serde::StatsColumns; +use vortex_error::SharedVortexResult; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::Id; +use vortex_session::registry::ReadContext; +use vortex_utils::aliases::hash_map::HashMap; + +pub use self::flat::ArrayTreeFlat; +pub use self::flat::ArrayTreeFlatLayout; +pub use self::flat::ArrayTreeFlatLayoutEncoding; +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::children::OwnedLayoutChildren; +use crate::layouts::array_tree::reader::ArrayTreeReader; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + 
+vtable!(ArrayTree); + +/// Well-known [`LayoutReaderContext`] key under which [`ArrayTreeLayout::derive_reader_ctx`] +/// publishes its [`ArrayTreesSource`]. +/// +/// Both the publisher (parent [`ArrayTreeLayout`]) and the consumer +/// ([`ArrayTreeFlatLayout`]'s `new_reader`) hardcode this id, so no metadata persistence is +/// needed to bind them. Two stacked `ArrayTreeLayout`s both publish under this id; the +/// inner one overrides the outer in the descendant's view — exactly the "nearest ancestor +/// wins" semantic each `ArrayTreeFlat` leaf wants. +pub static ARRAY_TREES_SOURCE_ID: LazyLock<Id> = + LazyLock::new(|| Id::new_static("vortex.array_tree.source")); + +/// Encoding marker for [`ArrayTreeLayout`]. +#[derive(Debug)] +pub struct ArrayTreeLayoutEncoding; + +/// Stores per-chunk [`ColumnarArrayTree`]s as a consolidated columnar struct array, sharing +/// schema (and compression) across all chunks. +/// +/// # Children +/// +/// - Child 0 (`Transparent "data"`): The actual data layout tree, with [`ArrayTreeFlatLayout`] +/// at the leaves. +/// - Child 1 (`Auxiliary "array_trees"`): A struct array (`{segment_id, nodes, buffers}`) with +/// one row per data leaf; see [`Self::array_trees_dtype`] for the schema. +#[derive(Clone, Debug)] +pub struct ArrayTreeLayout { + dtype: DType, + children: Arc<dyn LayoutChildren>, +} + +impl ArrayTreeLayout { + /// Creates a new `ArrayTreeLayout` from the data and array_trees children. + /// + /// The layout's dtype is taken from `data`; `array_trees` is stored as child 1. + pub fn new(data: LayoutRef, array_trees: LayoutRef) -> Self { + Self { + dtype: data.dtype().clone(), + children: OwnedLayoutChildren::layout_children(vec![data, array_trees]), + } + } + + /// Returns the dtype of the auxiliary `array_trees` child.
+ /// + /// The consolidated form is a struct array with one row per data-segment chunk: + /// + /// ```text + /// Struct { + /// segment_id: u32, + /// nodes: List<NODES_COLUMNS_DTYPE>, // one element per ArrayNode in the chunk's tree + /// buffers: List<BUFFER_COLUMNS_DTYPE>, // one element per data buffer for the chunk + /// } + /// ``` + /// + /// The List<> element types are exactly the canonical per-tree schemas defined by + /// [`vortex_array::serde::NODES_COLUMNS_DTYPE`] / [`vortex_array::serde::BUFFER_COLUMNS_DTYPE`], + /// so slicing one row's nodes (resp. buffers) yields a [`StructArray`] matching the inner + /// shape of a [`ColumnarArrayTree`]. All three fields, and the struct itself, are non-nullable. + pub fn array_trees_dtype() -> DType { + let nn = Nullability::NonNullable; + DType::Struct( + StructFields::new( + vec![ + FieldName::from("segment_id"), + FieldName::from("nodes"), + FieldName::from("buffers"), + ] + .into(), + vec![ + DType::Primitive(PType::U32, nn), + DType::List(Arc::new(NODES_COLUMNS_DTYPE.clone()), nn), + DType::List(Arc::new(BUFFER_COLUMNS_DTYPE.clone()), nn), + ], + ), + nn, + ) + } + + /// Derive a [`LayoutReaderContext`] that publishes an [`ArrayTreesSource`] backed by this + /// layout's auxiliary `array_trees` child under [`ARRAY_TREES_SOURCE_ID`]. Descendant + /// [`ArrayTreeFlatLayout`] readers pull the source by the same id to resolve their + /// compact trees. + /// + /// Used by: + /// - The normal [`crate::VTable::new_reader`] dispatch on `ArrayTreeLayout` (production path). + /// - Tools that construct readers at arbitrary points in the layout tree (explorers, + /// debuggers): walk from the root to the target node, calling this method for each + /// `ArrayTreeLayout` ancestor on the path so the accumulated ctx carries the right + /// source when the leaf is finally constructed.
+ pub fn derive_reader_ctx( + &self, + name: &str, + segment_source: Arc, + session: &VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + let array_trees_child = self.children.child(1, &Self::array_trees_dtype())?; + let trees_reader = array_trees_child.new_reader( + Arc::from(format!("{name}/array_trees")), + segment_source, + session, + ctx, + )?; + let source = Arc::new(ArrayTreesSource::new(trees_reader, session.clone())); + Ok(ctx.with(*ARRAY_TREES_SOURCE_ID, source)) + } +} + +impl VTable for ArrayTree { + type Layout = ArrayTreeLayout; + type Encoding = ArrayTreeLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_static("vortex.array_tree") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ArrayTreeLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.children.child_row_count(0) + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(_layout: &Self::Layout) -> usize { + 2 + } + + fn child(layout: &Self::Layout, idx: usize) -> VortexResult { + match idx { + 0 => layout.children.child(0, &layout.dtype), + 1 => layout.children.child(1, &Self::Layout::array_trees_dtype()), + _ => vortex_bail!("ArrayTreeLayout has 2 children, got index {}", idx), + } + } + + fn child_type(_layout: &Self::Layout, idx: usize) -> LayoutChildType { + match idx { + 0 => LayoutChildType::Transparent("data".into()), + 1 => LayoutChildType::Auxiliary("array_trees".into()), + _ => vortex_panic!("ArrayTreeLayout has 2 children, got index {}", idx), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + let derived_ctx = + 
layout.derive_reader_ctx(&name, Arc::clone(&segment_source), session, ctx)?; + let data_child = Self::child(layout, 0)?; + let data_reader = + data_child.new_reader(Arc::clone(&name), segment_source, session, &derived_ctx)?; + Ok(Arc::new(ArrayTreeReader::new(name, data_reader))) + } + + /// Rebuilds the layout from its serialized children. + /// + /// Fails unless exactly two children (`data`, `array_trees`) are supplied: `row_count`, + /// `child` and `derive_reader_ctx` index children 0 and 1 unconditionally, so a malformed + /// footer is rejected here instead of surfacing at first read. + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + _row_count: u64, + _metadata: &EmptyMetadata, + _segment_ids: Vec<SegmentId>, + children: &dyn LayoutChildren, + _ctx: &ReadContext, + ) -> VortexResult<Self::Layout> { + let nchildren = children.nchildren(); + if nchildren != 2 { + vortex_bail!( + "ArrayTreeLayout expects 2 children (data, array_trees), got {}", + nchildren + ); + } + Ok(ArrayTreeLayout { + dtype: dtype.clone(), + children: children.to_arc(), + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec<LayoutRef>) -> VortexResult<()> { + if children.len() != 2 { + vortex_bail!( + "ArrayTreeLayout expects 2 children (data, array_trees), got {}", + children.len() + ); + } + layout.children = OwnedLayoutChildren::layout_children(children); + Ok(()) + } +} + +/// Shared source for per-segment [`ColumnarArrayTree`]s. Holds a reader for the auxiliary +/// `array_trees` child; on first lookup materializes the consolidated array and builds a +/// `HashMap<SegmentId, Arc<ColumnarArrayTree>>`, then serves all subsequent lookups from +/// the cached map. +/// +/// Published by [`ArrayTreeLayout::derive_reader_ctx`] into the [`LayoutReaderContext`] +/// passed to descendants under [`ARRAY_TREES_SOURCE_ID`]; pulled by +/// [`ArrayTreeFlatLayout`]'s reader by the same id. +pub struct ArrayTreesSource { + reader: LayoutReaderRef, + /// Session used to create execution contexts when canonicalizing the consolidated array + /// (its fields may be compressed, depending on the writer's `array_trees_strategy`). + session: VortexSession, + /// Lazily initialized shared future for the segment-keyed lookup map. + map: OnceLock<SharedSegmentMapFuture>, +} + +type SharedSegmentMapFuture = futures::future::Shared< + futures::future::BoxFuture< + 'static, + SharedVortexResult<Arc<HashMap<SegmentId, Arc<ColumnarArrayTree>>>>, + >, +>; + +/// Future returned by [`ArrayTreesSource::get_for_segment`].
+pub type SharedSegmentTreeFuture = futures::future::Shared< + futures::future::BoxFuture<'static, SharedVortexResult<Arc<ColumnarArrayTree>>>, +>; + +impl std::fmt::Debug for ArrayTreesSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArrayTreesSource").finish_non_exhaustive() + } +} + +impl ArrayTreesSource { + /// Creates a new source backed by the given array_trees reader and session. + pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self { + Self { + reader, + session, + map: OnceLock::new(), + } + } + + /// Future resolving to the per-chunk [`ColumnarArrayTree`] for the given data-leaf segment. + /// + /// First call triggers materialization of the entire consolidated struct + lookup map; + /// subsequent calls reuse the cached map. + pub fn get_for_segment(&self, segment_id: SegmentId) -> SharedSegmentTreeFuture { + let map_fut = self.map_future(); + async move { + let map = map_fut.await?; + map.get(&segment_id).cloned().ok_or_else(|| { + Arc::new(vortex_err!( + "no columnar array tree found for segment id {}", + *segment_id + )) + }) + } + .boxed() + .shared() + } + + /// Returns a clone of the shared future for the segment-keyed map, creating it on first use. + /// + /// The future reads every row of the auxiliary reader with the identity projection and an + /// all-true mask, then decodes the result with `build_segment_map`. + fn map_future(&self) -> SharedSegmentMapFuture { + self.map + .get_or_init(|| { + let row_count = self.reader.row_count(); + let reader = Arc::clone(&self.reader); + let session = self.session.clone(); + async move { + let array = reader + .projection_evaluation( + &(0..row_count), + &root(), + MaskFuture::new_true( + usize::try_from(row_count) + .vortex_expect("row count must fit in usize"), + ), + ) + .map_err(Arc::new)? + .await + .map_err(Arc::new)?; + let mut ctx = session.create_execution_ctx(); + build_segment_map(array, &mut ctx) + .map(Arc::new) + .map_err(Arc::new) + } + .boxed() + .shared() + }) + .clone() + } +} + +/// Decode the consolidated `array_trees` struct array into a per-segment lookup of +/// [`ColumnarArrayTree`].
+/// +/// Canonicalizes each field once (it may be in a compressed encoding), then slices the +/// per-row List<> ranges into typed columns and feeds them to +/// [`ColumnarArrayTree::try_new`]. +/// +/// Returns an error if a field is missing, a row's tree fails validation, or the same +/// segment id appears in more than one row. +fn build_segment_map( + array: vortex_array::ArrayRef, + ctx: &mut vortex_array::ExecutionCtx, +) -> VortexResult<HashMap<SegmentId, Arc<ColumnarArrayTree>>> { + let outer = StructArray::execute(array, ctx)?; + + let segment_ids = PrimitiveArray::execute(field_clone(&outer, "segment_id")?, ctx)?; + let segment_ids = segment_ids.as_slice::<u32>(); + + let nodes_list = + list_from_list_view(field_clone(&outer, "nodes")?.execute::<ListViewArray>(ctx)?)?; + let nodes_inner = StructArray::execute(nodes_list.elements().clone(), ctx)?; + let nodes_cols = NodesColumns::extract(&nodes_inner, ctx)?; + + let buffers_list = + list_from_list_view(field_clone(&outer, "buffers")?.execute::<ListViewArray>(ctx)?)?; + let buffers_inner = StructArray::execute(buffers_list.elements().clone(), ctx)?; + let buffers_cols = BuffersColumns::extract(&buffers_inner, ctx)?; + + let mut map = HashMap::with_capacity(segment_ids.len()); + for (row, &seg) in segment_ids.iter().enumerate() { + let n_start = nodes_list.offset_at(row)?; + let n_end = nodes_list.offset_at(row + 1)?; + let b_start = buffers_list.offset_at(row)?; + let b_end = buffers_list.offset_at(row + 1)?; + + let tree = nodes_cols.tree_at(&buffers_cols, n_start..n_end, b_start..b_end)?; + // One row per data leaf: a repeated segment id means the auxiliary child is corrupt, + // so fail loudly instead of silently keeping only the last tree. + if map.insert(SegmentId::from(seg), Arc::new(tree)).is_some() { + vortex_bail!("duplicate segment id {} in array_trees", seg); + } + } + Ok(map) +} + +/// Clones the named field out of `s`, erroring if the struct has no such field. +fn field_clone(s: &StructArray, name: &str) -> VortexResult<vortex_array::ArrayRef> { + Ok(s.unmasked_field_by_name_opt(name) + .ok_or_else(|| vortex_err!("array_trees struct missing field '{}'", name))? + .clone()) +} + +/// Canonical typed handles into the nodes inner struct, plus a `tree_at` row-slice helper.
+struct NodesColumns { + encoding_ids: PrimitiveArray, + child_counts: PrimitiveArray, + metadata: VarBinViewArray, + buffers_per_node: PrimitiveArray, + subtree_sizes: PrimitiveArray, + buffer_offsets: PrimitiveArray, + stats: StructArray, +} + +impl NodesColumns { + /// Canonicalizes every column of the nodes inner struct once, erroring on a missing field. + fn extract(inner: &StructArray, ctx: &mut vortex_array::ExecutionCtx) -> VortexResult<Self> { + Ok(Self { + encoding_ids: PrimitiveArray::execute(field_clone(inner, "encoding_id")?, ctx)?, + child_counts: PrimitiveArray::execute(field_clone(inner, "child_count")?, ctx)?, + metadata: VarBinViewArray::execute(field_clone(inner, "metadata")?, ctx)?, + buffers_per_node: PrimitiveArray::execute( + field_clone(inner, "buffers_per_node")?, + ctx, + )?, + subtree_sizes: PrimitiveArray::execute(field_clone(inner, "subtree_size")?, ctx)?, + buffer_offsets: PrimitiveArray::execute(field_clone(inner, "buffer_offset")?, ctx)?, + stats: StructArray::execute(field_clone(inner, "stats")?, ctx)?, + }) + } + + /// Builds the [`ColumnarArrayTree`] for one row by slicing `node_range` out of the node + /// columns and `buffer_range` out of the buffer columns. + fn tree_at( + &self, + buffers: &BuffersColumns, + node_range: std::ops::Range<usize>, + buffer_range: std::ops::Range<usize>, + ) -> VortexResult<ColumnarArrayTree> { + // `Range::len` yields 0 for a reversed range instead of underflowing the way + // `end - start` would on malformed, file-derived offsets. + let n_len = node_range.len(); + let b_len = buffer_range.len(); + ColumnarArrayTree::try_new( + slice_primitive(&self.encoding_ids, node_range.clone(), n_len)?, + slice_primitive(&self.child_counts, node_range.clone(), n_len)?, + slice_varbinview(&self.metadata, node_range.clone(), n_len)?, + slice_primitive(&self.buffers_per_node, node_range.clone(), n_len)?, + slice_primitive(&self.subtree_sizes, node_range.clone(), n_len)?, + slice_primitive(&self.buffer_offsets, node_range.clone(), n_len)?, + StatsColumns::new(slice_struct(&self.stats, node_range, n_len)?)?, + slice_primitive(&buffers.padding, buffer_range.clone(), b_len)?, + slice_primitive(&buffers.alignment_exponent, buffer_range.clone(), b_len)?, + slice_primitive(&buffers.length, buffer_range, b_len)?, + ) + } +} + +struct BuffersColumns { + padding: PrimitiveArray, + alignment_exponent:
PrimitiveArray, + length: PrimitiveArray, +} + +impl BuffersColumns { + fn extract(inner: &StructArray, ctx: &mut vortex_array::ExecutionCtx) -> VortexResult { + Ok(Self { + padding: PrimitiveArray::execute(field_clone(inner, "padding")?, ctx)?, + alignment_exponent: PrimitiveArray::execute( + field_clone(inner, "alignment_exponent")?, + ctx, + )?, + length: PrimitiveArray::execute(field_clone(inner, "length")?, ctx)?, + }) + } +} + +fn slice_primitive( + arr: &PrimitiveArray, + range: std::ops::Range, + expected_len: usize, +) -> VortexResult { + let sliced = arr.as_ref().slice(range)?; + debug_assert_eq!(sliced.len(), expected_len); + sliced + .try_downcast::() + .map_err(|_| vortex_err!("sliced array_trees field is not a canonical PrimitiveArray")) +} + +fn slice_varbinview( + arr: &VarBinViewArray, + range: std::ops::Range, + expected_len: usize, +) -> VortexResult { + let sliced = arr.as_ref().slice(range)?; + debug_assert_eq!(sliced.len(), expected_len); + sliced + .try_downcast::() + .map_err(|_| vortex_err!("sliced array_trees field is not a canonical VarBinViewArray")) +} + +fn slice_struct( + arr: &StructArray, + range: std::ops::Range, + expected_len: usize, +) -> VortexResult { + let sliced = arr.as_ref().slice(range)?; + debug_assert_eq!(sliced.len(), expected_len); + sliced + .try_downcast::() + .map_err(|_| vortex_err!("sliced array_trees stats field is not a canonical StructArray")) +} diff --git a/vortex-layout/src/layouts/array_tree/reader.rs b/vortex-layout/src/layouts/array_tree/reader.rs new file mode 100644 index 00000000000..5e9ab306b10 --- /dev/null +++ b/vortex-layout/src/layouts/array_tree/reader.rs @@ -0,0 +1,278 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; + +use futures::FutureExt; +use vortex_array::MaskFuture; +use vortex_array::VortexSessionExecute; +use 
vortex_array::dtype::DType; +use vortex_array::dtype::FieldMask; +use vortex_array::expr::Expression; +use vortex_array::serde::ColumnarSerializedArray; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::layouts::SharedArrayFuture; +use crate::layouts::array_tree::ArrayTreesSource; +use crate::layouts::array_tree::flat::ArrayTreeFlatLayout; +use crate::reader::ArrayFuture; +use crate::reader::SplitRange; +use crate::segments::SegmentSource; + +/// Transparent reader for [`super::ArrayTreeLayout`]. Delegates all operations to the data +/// child reader; the auxiliary `array_trees` child is consumed at construction time (via +/// [`super::ArrayTreeLayout::derive_reader_ctx`]) to publish the source descendants need. +pub struct ArrayTreeReader { + name: Arc<str>, + data_reader: LayoutReaderRef, +} + +impl ArrayTreeReader { + /// Wraps `data_reader` under the given reader name. + pub fn new(name: Arc<str>, data_reader: LayoutReaderRef) -> Self { + Self { name, data_reader } + } +} + +impl LayoutReader for ArrayTreeReader { + fn name(&self) -> &Arc<str> { + &self.name + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn dtype(&self) -> &DType { + self.data_reader.dtype() + } + + fn row_count(&self) -> u64 { + self.data_reader.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + split_range: &SplitRange, + splits: &mut BTreeSet<u64>, + ) -> VortexResult<()> { + self.data_reader + .register_splits(field_mask, split_range, splits) + } + + fn pruning_evaluation( + &self, + row_range: &Range<u64>, + expr: &Expression, + mask: Mask, + ) -> VortexResult<MaskFuture> { + self.data_reader.pruning_evaluation(row_range, expr, mask) + } + + fn filter_evaluation( + &self, + row_range: &Range<u64>, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult<MaskFuture> { + self.data_reader.filter_evaluation(row_range, expr, mask) + } + + fn projection_evaluation( + &self, + row_range: &Range<u64>, + expr:
&Expression, + mask: MaskFuture, + ) -> VortexResult<ArrayFuture> { + self.data_reader + .projection_evaluation(row_range, expr, mask) + } +} + +/// Mask-density threshold below which we evaluate expressions over the filtered subset and +/// above which we evaluate over all rows then filter. +const EXPR_EVAL_THRESHOLD: f64 = 0.2; + +/// Reader for [`ArrayTreeFlatLayout`]. Pulls its compact tree from the shared +/// [`ArrayTreesSource`] (keyed by its own segment id) and pairs it with the fetched data +/// segment for decode. +pub struct ArrayTreeFlatReader { + layout: ArrayTreeFlatLayout, + name: Arc<str>, + segment_source: Arc<dyn SegmentSource>, + session: VortexSession, + source: Arc<ArrayTreesSource>, +} + +impl ArrayTreeFlatReader { + pub(crate) fn new( + layout: ArrayTreeFlatLayout, + name: Arc<str>, + segment_source: Arc<dyn SegmentSource>, + session: VortexSession, + source: Arc<ArrayTreesSource>, + ) -> Self { + Self { + layout, + name, + segment_source, + session, + source, + } + } + + /// Resolve the columnar array tree from the shared source and the data segment from the + /// segment source concurrently, then combine them into a decoded array.
+ fn array_future(&self) -> SharedArrayFuture { + let row_count = usize::try_from(self.layout.inner().row_count()) + .vortex_expect("row count must fit in usize"); + + let segment_id = self.layout.inner().segment_id(); + let segment_fut = self.segment_source.request(segment_id); + let tree_fut = self.source.get_for_segment(segment_id); + + let ctx = self.layout.inner().array_ctx().clone(); + let session = self.session.clone(); + let dtype = self.layout.inner().dtype().clone(); + + async move { + let segment_fut = async move { segment_fut.await.map_err(Arc::new) }; + let (segment, tree) = futures::try_join!(segment_fut, tree_fut)?; + let parts = + ColumnarSerializedArray::from_segment_and_tree(segment, tree).map_err(Arc::new)?; + parts + .decode(&dtype, row_count, &ctx, &session) + .map_err(Arc::new) + } + .boxed() + .shared() + } +} + +impl LayoutReader for ArrayTreeFlatReader { + fn name(&self) -> &Arc { + &self.name + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn dtype(&self) -> &DType { + self.layout.inner().dtype() + } + + fn row_count(&self) -> u64 { + self.layout.inner().row_count() + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + split_range: &SplitRange, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + split_range.check_bounds(self.layout.inner().row_count())?; + splits.insert(split_range.root_row_range().end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = usize::try_from(row_range.start) + .vortex_expect("Row range begin must fit within layout size") + ..usize::try_from(row_range.end) + .vortex_expect("Row range end must fit within layout size"); + let name = Arc::clone(&self.name); + let array = self.array_future(); + let expr = expr.clone(); + let session = 
self.session.clone(); + + Ok(MaskFuture::new(mask.len(), async move { + let mut array = array.clone().await?; + let mask = mask.await?; + + if row_range.start > 0 || row_range.end < array.len() { + array = array.slice(row_range.clone())?; + } + + let array_mask = if mask.density() < EXPR_EVAL_THRESHOLD { + let array = array.apply(&expr)?; + let array = array.filter(mask.clone())?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + mask.intersect_by_rank(&array_mask) + } else { + let array = array.apply(&expr)?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + mask.bitand(&array_mask) + }; + + tracing::debug!( + "ArrayTreeFlat mask evaluation {} - {} (mask = {}) => {}", + name, + expr, + mask.density(), + array_mask.density(), + ); + + Ok(array_mask) + })) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = usize::try_from(row_range.start) + .vortex_expect("Row range begin must fit within layout size") + ..usize::try_from(row_range.end) + .vortex_expect("Row range end must fit within layout size"); + let name = Arc::clone(&self.name); + let array = self.array_future(); + let expr = expr.clone(); + + Ok(async move { + tracing::debug!("ArrayTreeFlat array evaluation {} - {}", name, expr); + + let mut array = array.clone().await?; + let mask = mask.await?; + + if row_range.start > 0 || row_range.end < array.len() { + array = array.slice(row_range.clone())?; + } + + if !mask.all_true() { + array = array.filter(mask)?; + } + + array = array.apply(&expr)?; + Ok(array) + } + .boxed()) + } +} diff --git a/vortex-layout/src/layouts/array_tree/writer.rs b/vortex-layout/src/layouts/array_tree/writer.rs new file mode 100644 index 00000000000..137bddce541 --- /dev/null +++ b/vortex-layout/src/layouts/array_tree/writer.rs @@ -0,0 +1,282 @@ +// SPDX-License-Identifier: Apache-2.0 +// 
SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt as _; +use vortex_array::ArrayContext; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::list::ListArray; +use vortex_array::dtype::FieldName; +use vortex_array::serde::ColumnarArrayTree; +use vortex_array::serde::SerializeOptions; +use vortex_array::serde::serialize_to_columnar_tree; +use vortex_array::validity::Validity; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_session::VortexSession; +use vortex_session::registry::ReadContext; + +use crate::IntoLayout; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::array_tree::ArrayTreeLayout; +use crate::layouts::array_tree::flat::ArrayTreeFlat; +use crate::layouts::array_tree::flat::ArrayTreeFlatLayout; +use crate::layouts::flat::FlatLayout; +use crate::layouts::flat::writer::FlatLayoutStrategy; +use crate::segments::SegmentId; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialArrayStreamExt; + +/// Returns a `(collector, leaf)` pair of cooperating strategies for array-tree collection. +/// +/// The leaf strategy replaces [`FlatLayoutStrategy`] in the data pipeline and attaches each +/// chunk's [`ColumnarArrayTree`] to the resulting [`ArrayTreeFlatLayout`]. The collector +/// wraps the data pipeline and, after it completes, walks the data subtree to extract those +/// attached trees and writes them as a consolidated columnar struct array via the +/// `array_trees_strategy` (typically the same compress-then-flat strategy used for data). 
+/// +/// Why attach trees to the leaf layout rather than push to a shared sink: column writers in +/// [`crate::layouts::table::TableStrategy`] run concurrently, and a shared sink would mix +/// leaves across collector invocations. The leaf-attached design keeps every collector +/// scoped to its own subtree. +pub fn writer( + flat: FlatLayoutStrategy, + array_trees_strategy: Arc, +) -> (ArrayTreeCollectorStrategy, ArrayTreeFlatStrategy) { + let leaf = ArrayTreeFlatStrategy { flat }; + let collector = ArrayTreeCollectorStrategy { + child: None, + array_trees_strategy, + }; + (collector, leaf) +} + +/// Leaf strategy (TX) that replaces [`FlatLayoutStrategy`]. +/// +/// Walks the chunk's array tree once via [`serialize_to_columnar_tree`] to produce data +/// buffers and a [`ColumnarArrayTree`], writes the buffers as a data-only segment (no +/// trailing flatbuffer), then attaches the tree to the returned layout for the collector to +/// extract. +#[derive(Clone)] +pub struct ArrayTreeFlatStrategy { + flat: FlatLayoutStrategy, +} + +#[async_trait] +impl LayoutStrategy for ArrayTreeFlatStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + mut stream: SendableSequentialStream, + _eof: SequencePointer, + session: &VortexSession, + ) -> VortexResult { + let ctx = ctx.clone(); + let Some(chunk) = stream.next().await else { + vortex_bail!("array tree flat layout needs a single chunk"); + }; + let (sequence_id, chunk) = chunk?; + + let row_count = chunk.len() as u64; + + // Normalize if the flat strategy restricts encodings. + let chunk = if let Some(allowed) = &self.flat.allowed_encodings { + use vortex_array::normalize::NormalizeOptions; + use vortex_array::normalize::Operation; + chunk.normalize(&mut NormalizeOptions { + allowed, + operation: Operation::Error, + })? + } else { + chunk + }; + + // Single walk: data buffers + ColumnarArrayTree. 
No trailing flatbuffer in the + // segment — the consolidated array_trees segment is the sole source of encoding + // metadata for this leaf. + let (buffers, tree) = serialize_to_columnar_tree( + &chunk, + &ctx, + session, + &SerializeOptions { + offset: 0, + include_padding: self.flat.include_padding, + }, + )?; + + // IMPORTANT ORDERING CONSTRAINT: write the segment first, then advance past the + // sequence id. `segment_sink.write` consumes the SequenceId and only drops it on + // return; doing more work while holding it would let later leaves wait on + // SequenceId::collapse. + let segment_id = segment_sink.write(sequence_id, buffers).await?; + + let None = stream.next().await else { + vortex_bail!("array tree flat layout received stream with more than a single chunk"); + }; + + Ok(ArrayTreeFlatLayout::with_tree( + FlatLayout::new( + row_count, + stream.dtype().clone(), + segment_id, + ReadContext::new(ctx.to_ids()), + ), + tree, + ) + .into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + 0 + } +} + +/// Collector strategy (RX) that wraps the data pipeline. +/// +/// After the data child completes, walks the resulting subtree to extract each +/// [`ArrayTreeFlatLayout`]'s attached [`ColumnarArrayTree`], builds the consolidated +/// `{segment_id, nodes, buffers}` struct array (see [`ArrayTreeLayout::array_trees_dtype`]), +/// and writes it via the configured `array_trees_strategy`. +pub struct ArrayTreeCollectorStrategy { + child: Option>, + array_trees_strategy: Arc, +} + +impl ArrayTreeCollectorStrategy { + /// Sets the data child pipeline that this collector wraps. 
+ pub fn wrap(mut self, child: impl LayoutStrategy) -> Self { + self.child = Some(Arc::new(child)); + self + } +} + +#[async_trait] +impl LayoutStrategy for ArrayTreeCollectorStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + session: &VortexSession, + ) -> VortexResult { + let Some(child) = self.child.as_ref() else { + vortex_bail!("ArrayTreeCollectorStrategy must have a child set via wrap()") + }; + + // Data segments get earlier sequence IDs than the consolidated array_trees segment. + let data_eof = eof.split_off(); + + let data_layout = child + .write_stream( + ctx.clone(), + Arc::clone(&segment_sink), + stream, + data_eof, + session, + ) + .await?; + + // Walk the data subtree to extract per-leaf trees. Each ArrayTreeFlatLayout leaf + // carries its tree attached as transient write-time state (not serialized to disk). + let mut entries: Vec<(SegmentId, ColumnarArrayTree)> = Vec::new(); + for layout_ref in data_layout.depth_first_traversal() { + let layout_ref = layout_ref?; + if let Some(atf) = layout_ref.as_opt::() + && let Some(tree) = atf.take_tree() + { + entries.push((atf.inner().segment_id(), tree)); + } + } + + // Sort by segment ID so on-disk row order matches segment-write order. + entries.sort_by_key(|(seg, _)| *seg); + + let array_trees_array = build_consolidated_array(&entries)?; + + // Write the consolidated array via the array_trees strategy. 
+ let trees_stream = array_trees_array + .to_array_stream() + .sequenced(eof.split_off()); + let array_trees_layout = self + .array_trees_strategy + .write_stream(ctx, segment_sink, trees_stream, eof, session) + .await?; + + Ok(ArrayTreeLayout::new(data_layout, array_trees_layout).into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + self.child.as_ref().map(|c| c.buffered_bytes()).unwrap_or(0) + + self.array_trees_strategy.buffered_bytes() + } +} + +/// Build the consolidated `{segment_id, nodes, buffers}` struct array from a sorted list of +/// per-chunk entries. Each List<>'s elements are the concatenated per-chunk nodes (resp. +/// buffers) struct arrays, with offsets recorded per row. +fn build_consolidated_array(entries: &[(SegmentId, ColumnarArrayTree)]) -> VortexResult { + let nrows = entries.len(); + + let segment_ids: Buffer = entries.iter().map(|(seg, _)| **seg).collect(); + let segment_ids_array = PrimitiveArray::new(segment_ids, Validity::NonNullable).into_array(); + + // Build the nodes list: concatenate every chunk's `tree.nodes` and record per-row offsets. + let mut nodes_offsets: Vec = Vec::with_capacity(nrows + 1); + nodes_offsets.push(0); + let mut cum: i32 = 0; + for (_, tree) in entries { + cum += i32::try_from(tree.nodes.as_ref().len()) + .map_err(|_| vortex_err!("array tree node count overflows i32 offsets"))?; + nodes_offsets.push(cum); + } + let nodes_inner = StructArray::try_concat(entries.iter().map(|(_, t)| &t.nodes))?.into_array(); + let nodes_list = ListArray::try_new( + nodes_inner, + PrimitiveArray::new(Buffer::from(nodes_offsets), Validity::NonNullable).into_array(), + Validity::NonNullable, + )? + .into_array(); + + // Build the buffers list: same pattern. 
+ let mut buffers_offsets: Vec = Vec::with_capacity(nrows + 1); + buffers_offsets.push(0); + let mut cum: i32 = 0; + for (_, tree) in entries { + cum += i32::try_from(tree.buffers.as_ref().len()) + .map_err(|_| vortex_err!("array tree buffer count overflows i32 offsets"))?; + buffers_offsets.push(cum); + } + let buffers_inner = + StructArray::try_concat(entries.iter().map(|(_, t)| &t.buffers))?.into_array(); + let buffers_list = ListArray::try_new( + buffers_inner, + PrimitiveArray::new(Buffer::from(buffers_offsets), Validity::NonNullable).into_array(), + Validity::NonNullable, + )? + .into_array(); + + Ok(StructArray::try_new( + vec![ + FieldName::from("segment_id"), + FieldName::from("nodes"), + FieldName::from("buffers"), + ] + .into(), + vec![segment_ids_array, nodes_list, buffers_list], + nrows, + Validity::NonNullable, + )? + .into_array()) +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 18df5b8f347..c483020c029 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -8,6 +8,7 @@ use futures::future::Shared; use vortex_array::ArrayRef; use vortex_error::SharedVortexResult; +pub mod array_tree; pub mod buffered; pub mod chunked; pub mod collect; diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 370fe391ca0..8a388dc3d46 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -9,6 +9,8 @@ use vortex_session::SessionVar; use vortex_session::registry::Registry; use crate::LayoutEncodingRef; +use crate::layouts::array_tree::ArrayTreeFlatLayoutEncoding; +use crate::layouts::array_tree::ArrayTreeLayoutEncoding; use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; @@ -52,6 +54,14 @@ impl Default for LayoutSession { layouts.register(StructLayoutEncoding.id(), StructLayoutEncoding.as_ref()); layouts.register(ZonedLayoutEncoding.id(), 
ZonedLayoutEncoding.as_ref()); layouts.register(DictLayoutEncoding.id(), DictLayoutEncoding.as_ref()); + layouts.register( + ArrayTreeLayoutEncoding.id(), + ArrayTreeLayoutEncoding.as_ref(), + ); + layouts.register( + ArrayTreeFlatLayoutEncoding.id(), + ArrayTreeFlatLayoutEncoding.as_ref(), + ); Self { registry: layouts } }