diff --git a/Cargo.lock b/Cargo.lock index 045c72176fd..b070caff1de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9324,6 +9324,7 @@ dependencies = [ "codspeed-divan-compat", "itertools 0.14.0", "num-traits", + "parquet-variant-compute", "pco", "rand 0.10.1", "rstest", @@ -9340,6 +9341,7 @@ dependencies = [ "vortex-fastlanes", "vortex-fsst", "vortex-mask", + "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-sequence", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 980af8a75a1..cb5a8492fea 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -12,7 +12,7 @@ readme = { workspace = true } repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } -publish = false +publish = true [lints] workspace = true diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 40b0ae52aae..2c9f299063d 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -16,6 +16,7 @@ version = { workspace = true } [dependencies] itertools = { workspace = true } num-traits = { workspace = true } +parquet-variant-compute = { workspace = true, optional = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } @@ -30,6 +31,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } +vortex-parquet-variant = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } @@ -41,6 +43,7 @@ vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } +rand = { workspace = true } rstest = { workspace = true } test-with = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } @@ -49,6 +52,11 @@ vortex-session = { workspace = true } [features] # This feature enabled unstable encodings for which we don't guarantee stability. unstable_encodings = ["dep:vortex-tensor", "vortex-zstd?/unstable_encodings"] +parquet-variant = [ + "dep:vortex-parquet-variant", + "dep:parquet-variant-compute", + "zstd", +] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 43d0c56a80c..5a3b772ad2e 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -61,6 +61,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // Binary schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &binary::BinaryDictScheme, + &binary::BinaryFSSTScheme, &binary::BinaryConstantScheme, // Decimal schemes. &decimal::DecimalScheme, @@ -142,7 +143,9 @@ impl BtrBlocksCompressorBuilder { /// Panics if any of the compact schemes are already present. #[cfg(feature = "zstd")] pub fn with_compact(self) -> Self { - let builder = self.with_new_scheme(&string::ZstdScheme); + let builder = self + .with_new_scheme(&string::ZstdScheme) + .with_new_scheme(&binary::BinaryZstdScheme); #[cfg(feature = "pco")] let builder = builder @@ -182,12 +185,17 @@ impl BtrBlocksCompressorBuilder { string::StringDictScheme.id(), string::FSSTScheme.id(), binary::BinaryDictScheme.id(), + binary::BinaryFSSTScheme.id(), ]); #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - let builder = builder.with_new_scheme(&string::ZstdBuffersScheme); + let builder = builder + .with_new_scheme(&string::ZstdBuffersScheme) + .with_new_scheme(&binary::BinaryZstdBuffersScheme); #[cfg(all(feature = "zstd", not(feature = "unstable_encodings")))] - let builder = builder.with_new_scheme(&string::ZstdScheme); + let builder = builder + .with_new_scheme(&string::ZstdScheme) + .with_new_scheme(&binary::BinaryZstdScheme); builder } diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 39db05246a6..6001d77684f 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -58,6 +58,8 @@ mod builder; mod canonical_compressor; /// Compression scheme implementations. pub mod schemes; +#[cfg(feature = "parquet-variant")] +pub mod variant; // Re-export framework types from vortex-compressor for backwards compatibility. // Btrblocks-specific exports. diff --git a/vortex-btrblocks/src/schemes/binary.rs b/vortex-btrblocks/src/schemes/binary.rs index 2e8b28cd396..7839e59138c 100644 --- a/vortex-btrblocks/src/schemes/binary.rs +++ b/vortex-btrblocks/src/schemes/binary.rs @@ -3,6 +3,263 @@ //! Binary compression schemes. -// Re-export builtin schemes from vortex-compressor. +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::arrays::varbin::VarBinArrayExt; pub use vortex_compressor::builtins::BinaryConstantScheme; pub use vortex_compressor::builtins::BinaryDictScheme; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_error::VortexResult; +use vortex_fsst::FSST; +use vortex_fsst::FSSTArrayExt; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; + +use crate::ArrayAndStats; +use crate::CascadingCompressor; +use crate::CompressorContext; +use crate::Scheme; +use crate::SchemeExt; + +/// FSST compression for binary values. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryFSSTScheme; + +/// Zstd compression for binary values. +#[cfg(feature = "zstd")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryZstdScheme; + +/// Buffer-level Zstd compression for binary values. +#[cfg(all(feature = "zstd", feature = "unstable_encodings"))] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryZstdBuffersScheme; + +impl Scheme for BinaryFSSTScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.fsst" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + /// Children: lengths=0, code_offsets=1. + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let binary = data.array_as_varbinview().into_owned(); + let compressor_fsst = fsst_train_compressor(&binary); + let fsst = fsst_compress( + &binary, + binary.len(), + binary.dtype(), + &compressor_fsst, + exec_ctx, + ); + + let uncompressed_lengths_primitive = fsst + .uncompressed_lengths() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_original_lengths = compressor.compress_child( + &uncompressed_lengths_primitive.into_array(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + + let codes_offsets_primitive = fsst + .codes() + .offsets() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_codes_offsets = compressor.compress_child( + &codes_offsets_primitive.into_array(), + &compress_ctx, + self.id(), + 1, + exec_ctx, + )?; + let compressed_codes = VarBinArray::try_new( + compressed_codes_offsets, + fsst.codes().bytes().clone(), + fsst.codes().dtype().clone(), + fsst.codes().validity()?, + )?; + + let fsst = FSST::try_new( + fsst.dtype().clone(), + fsst.symbols().clone(), + fsst.symbol_lengths().clone(), + compressed_codes, + compressed_original_lengths, + exec_ctx, + )?; + + Ok(fsst.into_array()) + } +} + +#[cfg(feature = "zstd")] +impl Scheme for BinaryZstdScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.zstd" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + data: &ArrayAndStats, + _compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let compacted = data.array_as_varbinview().into_owned().compact_buffers()?; + Ok( + vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)? + .into_array(), + ) + } +} + +#[cfg(all(feature = "zstd", feature = "unstable_encodings"))] +impl Scheme for BinaryZstdBuffersScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.zstd_buffers" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + data: &ArrayAndStats, + _compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + Ok(vortex_zstd::ZstdBuffers::compress(data.array(), 3, exec_ctx.session())?.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_fsst::FSST; + use vortex_session::VortexSession; + + use crate::BtrBlocksCompressor; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn binary_fsst_data() -> VarBinViewArray { + VarBinViewArray::from_iter( + (0..1024).map(|idx| { + Some(format!("variant-key-{idx:04}-invoice-total-line-items").into_bytes()) + }), + DType::Binary(Nullability::NonNullable), + ) + } + + #[test] + fn default_compressor_uses_fsst_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = + BtrBlocksCompressor::default().compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected binary data to be FSST-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } + + #[cfg(feature = "zstd")] + #[test] + fn compact_compressor_uses_zstd_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = crate::BtrBlocksCompressorBuilder::default() + .with_compact() + .build() + .compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected compact binary data to be Zstd-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } +} diff --git a/vortex-btrblocks/src/schemes/mod.rs b/vortex-btrblocks/src/schemes/mod.rs index 16123429e86..8b8629d3f0a 100644 --- a/vortex-btrblocks/src/schemes/mod.rs +++ b/vortex-btrblocks/src/schemes/mod.rs @@ -5,11 +5,10 @@ pub mod binary; pub mod bool; +pub mod decimal; pub mod float; pub mod integer; pub mod string; - -pub mod decimal; pub mod temporal; pub(crate) mod patches; diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs new file mode 100644 index 00000000000..12a97ab425c --- /dev/null +++ b/vortex-btrblocks/src/variant/mod.rs @@ -0,0 +1,461 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression scheme for JSON data into binary variant representation + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::dtype::extension::ExtId; +use vortex_array::dtype::extension::ExtVTable; +use vortex_array::extension::EmptyMetadata; +use vortex_array::scalar::ScalarValue; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_parquet_variant::ParquetVariant; +use vortex_parquet_variant::ParquetVariantArrayExt; + +use crate::CascadingCompressor; + +/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +#[derive(Debug)] +pub struct JsonToVariantScheme; + +/// Child indices for recursively compressed Parquet Variant binary children. +mod parquet_variant_children { + /// The Parquet Variant metadata child. + pub const METADATA: usize = 0; + /// The raw Parquet Variant value child. + pub const VALUE: usize = 1; +} + +/// JSON logical type backed by UTF-8 string storage. +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct Json; + +impl ExtVTable for Json { + type Metadata = EmptyMetadata; + type NativeValue<'a> = &'a str; + + fn id(&self) -> ExtId { + ExtId::new("vortex.json") + } + + fn serialize_metadata(&self, _metadata: &Self::Metadata) -> VortexResult> { + Ok(Vec::new()) + } + + fn deserialize_metadata(&self, metadata: &[u8]) -> VortexResult { + vortex_ensure!(metadata.is_empty(), "JSON metadata must be empty"); + Ok(EmptyMetadata) + } + + fn validate_dtype(ext_dtype: &ExtDType) -> VortexResult<()> { + vortex_ensure!( + ext_dtype.storage_dtype().is_utf8(), + "JSON storage dtype must be utf8, got {}", + ext_dtype.storage_dtype() + ); + Ok(()) + } + + fn unpack_native<'a>( + _ext_dtype: &'a ExtDType, + storage_value: &'a ScalarValue, + ) -> VortexResult> { + let ScalarValue::Utf8(value) = storage_value else { + vortex_bail!("JSON storage scalar must be utf8, got {storage_value}"); + }; + Ok(value.as_str()) + } +} + +impl Scheme for JsonToVariantScheme { + fn scheme_name(&self) -> &'static str { + "json_to_variant" + } + + fn matches(&self, canonical: &Canonical) -> bool { + let Canonical::Extension(ext_array) = canonical else { + return false; + }; + + ext_array.ext_dtype().is::() + } + + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let array = data.array().clone().execute::(exec_ctx)?; + let storage = array.storage_array().clone(); + + if !storage.dtype().is_utf8() { + vortex_bail!("storage must be utf8"); + } + + let arrow_array = { + let session = exec_ctx.session().clone(); + let arrow = session.arrow(); + arrow.execute_arrow(storage, None, exec_ctx)? + }; + + let array = parquet_variant_compute::json_to_variant(&arrow_array)?; + + let parquet_variant = + ParquetVariant::from_arrow_variant(&array)?.downcast::(); + + let compressed_metadata = compressor.compress_child( + parquet_variant.metadata_array(), + &compress_ctx, + self.id(), + parquet_variant_children::METADATA, + exec_ctx, + )?; + let compressed_value = parquet_variant + .value_array() + .map(|value| { + compressor.compress_child( + value, + &compress_ctx, + self.id(), + parquet_variant_children::VALUE, + exec_ctx, + ) + }) + .transpose()?; + + ParquetVariant::try_new( + parquet_variant.validity()?, + compressed_metadata, + compressed_value, + parquet_variant.typed_value_array().cloned(), + ) + .map(IntoArray::into_array) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rand::RngExt; + use rand::SeedableRng; + use rand::rngs::StdRng; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::Extension; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinView; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::session::ArraySession; + use vortex_compressor::builtins::BinaryDictScheme; + use vortex_compressor::builtins::IntConstantScheme; + use vortex_compressor::builtins::StringConstantScheme; + use vortex_compressor::builtins::StringDictScheme; + use vortex_fsst::FSST; + use vortex_session::VortexSession; + use vortex_zstd::Zstd; + + use super::*; + use crate::schemes::binary::BinaryFSSTScheme; + use crate::schemes::integer::BitPackingScheme; + use crate::schemes::integer::FoRScheme; + use crate::schemes::integer::RunEndScheme; + use crate::schemes::integer::SequenceScheme; + use crate::schemes::integer::SparseScheme; + use crate::schemes::integer::ZigZagScheme; + use crate::schemes::string::FSSTScheme; + use crate::schemes::string::ZstdScheme; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn json_data() -> Vec { + let mut rng = StdRng::seed_from_u64(0); + const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; + const REGION_KEYS: &[&str] = &["region", "market", "country"]; + const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; + const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; + const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; + const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; + const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; + const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; + + (0..1024) + .map(|_| { + let mut fields = vec![ + format!( + r#""{}":"acct_{:04x}""#, + ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], + rng.random::(), + ), + format!( + r#""invoice_total":{}.{:02}"#, + rng.random_range(10_u32..100_000), + rng.random_range(0_u32..100), + ), + format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), + ]; + + if rng.random_bool(0.85) { + fields.push(format!( + r#""{}":"{}""#, + STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], + STATUSES[rng.random_range(0..STATUSES.len())], + )); + } + if rng.random_bool(0.75) { + fields.push(format!( + r#""{}":"{}""#, + REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], + REGIONS[rng.random_range(0..REGIONS.len())], + )); + } + if rng.random_bool(0.55) { + fields.push(format!( + r#""{}":{}.{:02}"#, + AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], + rng.random_range(0_u32..2_500), + rng.random_range(0_u32..100), + )); + } + if rng.random_bool(0.40) { + fields.push(format!( + r#""{}":{}"#, + FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], + rng.random_bool(0.5), + )); + } + if rng.random_bool(0.30) { + fields.push(format!( + r#""tags":["{}","{}"]"#, + TAGS[rng.random_range(0..TAGS.len())], + TAGS[rng.random_range(0..TAGS.len())], + )); + } + + format!("{{{}}}", fields.join(",")) + }) + .collect() + } + + fn json_array(values: &[String]) -> VortexResult { + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) + } + + fn parquet_variant_child_compressor() -> CascadingCompressor { + CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &BinaryFSSTScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + } + + fn print_comparison_output( + array: &ArrayRef, + string_compressed: &ArrayRef, + compressed: &ArrayRef, + ) { + let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; + let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; + println!( + "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", + array.nbytes(), + string_compressed.nbytes(), + compressed.nbytes(), + ); + println!("Compressed output ratio: {compressed_ratio:.2}x\n"); + println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); + println!("JSON input encoding tree:\n{}", array.tree_display()); + println!( + "String-compressed encoding tree:\n{}", + string_compressed.tree_display() + ); + println!( + "Compressed output encoding tree:\n{}", + compressed.tree_display() + ); + } + + #[test] + fn parquet_variant_compresses_repeated_json_keys() -> VortexResult<()> { + let array = json_array(&json_data())?; + + let string_compressor = + CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert!( + variant_compressed.is::(), + "expected ParquetVariant output, got encoding {} with dtype {} and {} bytes", + variant_compressed.encoding_id(), + variant_compressed.dtype(), + variant_compressed.nbytes() + ); + assert!( + variant_compressed.nbytes() < string_compressed.nbytes(), + "Parquet Variant conversion should compress repeated JSON keys: \ + variant={} bytes, input={} bytes", + variant_compressed.nbytes(), + string_compressed.nbytes(), + ); + + print_comparison_output(&array, &string_compressed, &variant_compressed); + + Ok(()) + } + + #[test] + fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + let parquet_variant = compressed.clone().downcast::(); + + assert!( + !parquet_variant.metadata_array().is::(), + "expected Parquet Variant metadata child to be compressed, got {}", + parquet_variant.metadata_array().encoding_id(), + ); + assert!(parquet_variant.value_array().is_some()); + assert!(parquet_variant.typed_value_array().is_none()); + + Ok(()) + } + + #[test] + fn binary_fsst_improves_parquet_variant_child_compression() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let without_binary_fsst = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + .compress(&array, &mut exec_ctx)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let with_binary_fsst = + parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; + let parquet_variant = with_binary_fsst.clone().downcast::(); + + assert!( + with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), + "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", + with_binary_fsst.nbytes(), + without_binary_fsst.nbytes(), + ); + assert!( + parquet_variant + .value_array() + .is_some_and(|value| value.is::()), + "expected Parquet Variant value child to use binary FSST, got {}", + parquet_variant.value_array().map_or_else( + || "missing".to_string(), + |value| value.encoding_id().to_string() + ), + ); + + Ok(()) + } + + #[test] + fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let string_compressor = CascadingCompressor::new(vec![ + &StringDictScheme, + &FSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &FSSTScheme, + &BinaryFSSTScheme, + // &ZstdScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + // let extension = compressed.clone().downcast::(); + // let storage = extension.storage_array(); + // assert!( + // storage.is::(), + // "expected JSON extension storage fallback to use zstd, got {}", + // storage.encoding_id(), + // ); + + print_comparison_output(&array, &string_compressed, &compressed); + + Ok(()) + } +}