diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java index 87d39bf16e..444df95f03 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java @@ -19,6 +19,9 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.Float16; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -28,6 +31,7 @@ public class BinaryStatistics extends Statistics { private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("fake_binary_type"); + private final boolean isFloat16; private Binary max; private Binary min; @@ -41,26 +45,51 @@ public BinaryStatistics() { BinaryStatistics(PrimitiveType type) { super(type); + this.isFloat16 = type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation; + if (isFloat16) { + incrementNanCount(0); + } } private BinaryStatistics(BinaryStatistics other) { super(other.type()); + this.isFloat16 = other.isFloat16; if (other.hasNonNullValue()) { initializeStats(other.min, other.max); } setNumNulls(other.getNumNulls()); + incrementNanCount(other.getNanCount()); } @Override public void updateStats(Binary value) { + if (isFloat16 && Float16.isNaN(value.get2BytesLittleEndian())) { + incrementNanCount(); + } if (!this.hasNonNullValue()) { min = value.copy(); max = value.copy(); this.markAsNotEmpty(); - } else if (comparator().compare(min, value) > 0) { - min = value.copy(); - } else if (comparator().compare(max, value) < 0) { - max = value.copy(); + } else { + if (isFloat16 && 
type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + if (!Float16.isNaN(value.get2BytesLittleEndian())) { + if (Float16.isNaN(min.get2BytesLittleEndian()) + || comparator().compare(min, value) > 0) { + min = value.copy(); + } + if (Float16.isNaN(max.get2BytesLittleEndian()) + || comparator().compare(max, value) < 0) { + max = value.copy(); + } + } + return; + } + + if (comparator().compare(min, value) > 0) { + min = value.copy(); + } else if (comparator().compare(max, value) < 0) { + max = value.copy(); + } } } @@ -126,6 +155,20 @@ public boolean isSmallerThanWithTruncation(long size, int truncationLength) { */ @Deprecated public void updateStats(Binary min_value, Binary max_value) { + if (isFloat16 && type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + if (!Float16.isNaN(min_value.get2BytesLittleEndian())) { + if (Float16.isNaN(min.get2BytesLittleEndian()) || comparator().compare(min, min_value) > 0) { + min = min_value.copy(); + } + } + if (!Float16.isNaN(max_value.get2BytesLittleEndian())) { + if (Float16.isNaN(max.get2BytesLittleEndian()) || comparator().compare(max, max_value) < 0) { + max = max_value.copy(); + } + } + return; + } + if (comparator().compare(min, min_value) > 0) { min = min_value.copy(); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java index 3fe8a35530..2aad980692 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -41,6 +42,7 @@ public DoubleStatistics() { DoubleStatistics(PrimitiveType type) { 
super(type); + incrementNanCount(0); } private DoubleStatistics(DoubleStatistics other) { @@ -49,10 +51,14 @@ private DoubleStatistics(DoubleStatistics other) { initializeStats(other.min, other.max); } setNumNulls(other.getNumNulls()); + incrementNanCount(other.getNanCount()); } @Override public void updateStats(double value) { + if (Double.isNaN(value)) { + incrementNanCount(); + } if (!this.hasNonNullValue()) { initializeStats(value, value); } else { @@ -98,6 +104,20 @@ public boolean isSmallerThan(long size) { } public void updateStats(double min_value, double max_value) { + if (type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + if (!Double.isNaN(min_value)) { + if (Double.isNaN(min) || comparator().compare(min, min_value) > 0) { + min = min_value; + } + } + if (!Double.isNaN(max_value)) { + if (Double.isNaN(max) || comparator().compare(max, max_value) < 0) { + max = max_value; + } + } + return; + } + if (comparator().compare(min, min_value) > 0) { min = min_value; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java index 5b743b6884..ce85cf013e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -42,6 +43,7 @@ public FloatStatistics() { FloatStatistics(PrimitiveType type) { super(type); + incrementNanCount(0); } private FloatStatistics(FloatStatistics other) { @@ -50,10 +52,14 @@ private FloatStatistics(FloatStatistics other) { initializeStats(other.min, other.max); } setNumNulls(other.getNumNulls()); + 
incrementNanCount(other.getNanCount()); } @Override public void updateStats(float value) { + if (Float.isNaN(value)) { + incrementNanCount(); + } if (!this.hasNonNullValue()) { initializeStats(value, value); } else { @@ -99,6 +105,20 @@ public boolean isSmallerThan(long size) { } public void updateStats(float min_value, float max_value) { + if (type().columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + if (!Float.isNaN(min_value)) { + if (Float.isNaN(min) || comparator().compare(min, min_value) > 0) { + min = min_value; + } + } + if (!Float.isNaN(max_value)) { + if (Float.isNaN(max) || comparator().compare(max, max_value) < 0) { + max = max_value; + } + } + return; + } + if (comparator().compare(min, min_value) > 0) { min = min_value; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java index 206ddadadc..7147dab9a6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java @@ -19,8 +19,10 @@ package org.apache.parquet.column.statistics; import java.util.Arrays; +import org.apache.parquet.Preconditions; import org.apache.parquet.column.UnknownColumnTypeException; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.Float16; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveComparator; @@ -40,10 +42,11 @@ public abstract class Statistics> { * Builder class to build Statistics objects. Used to read the statistics from the Parquet file. 
*/ public static class Builder { - private final PrimitiveType type; + protected final PrimitiveType type; private byte[] min; private byte[] max; private long numNulls = -1; + private long nanCount = -1; private Builder(PrimitiveType type) { this.type = type; @@ -64,12 +67,21 @@ public Builder withNumNulls(long numNulls) { return this; } + public Builder withNanCount(long nanCount) { + this.nanCount = nanCount; + return this; + } + public Statistics build() { Statistics stats = createStats(type); if (min != null && max != null) { stats.setMinMaxFromBytes(min, max); } stats.num_nulls = this.numNulls; + stats.nan_count = this.nanCount; + Preconditions.checkState( + !type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()) || stats.nan_count >= 0, + "nan_count is required by IEEE 754 column order with type " + type); return stats; } } @@ -87,10 +99,12 @@ public Statistics build() { if (stats.hasNonNullValue()) { Float min = stats.genericGetMin(); Float max = stats.genericGetMax(); - // Drop min/max values in case of NaN as the sorting order of values is undefined for this case if (min.isNaN() || max.isNaN()) { - stats.setMinMax(0.0f, 0.0f); - ((Statistics) stats).hasNonNullValue = false; + if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + // For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined + stats.setMinMax(0.0f, 0.0f); + ((Statistics) stats).hasNonNullValue = false; + } } else { // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped if (Float.compare(min, 0.0f) == 0) { @@ -120,10 +134,12 @@ public Statistics build() { if (stats.hasNonNullValue()) { Double min = stats.genericGetMin(); Double max = stats.genericGetMax(); - // Drop min/max values in case of NaN as the sorting order of values is undefined for this case if (min.isNaN() || max.isNaN()) { - stats.setMinMax(0.0, 0.0); - ((Statistics) stats).hasNonNullValue = false; + if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { 
+ // For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined + stats.setMinMax(0.0, 0.0); + ((Statistics) stats).hasNonNullValue = false; + } } else { // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped if (Double.compare(min, 0.0) == 0) { @@ -160,10 +176,12 @@ public Statistics build() { Binary bMax = stats.genericGetMax(); short min = bMin.get2BytesLittleEndian(); short max = bMax.get2BytesLittleEndian(); - // Drop min/max values in case of NaN as the sorting order of values is undefined for this case if (Float16.isNaN(min) || Float16.isNaN(max)) { - stats.setMinMax(POSITIVE_ZERO_LITTLE_ENDIAN, NEGATIVE_ZERO_LITTLE_ENDIAN); - ((Statistics) stats).hasNonNullValue = false; + if (!type.columnOrder().equals(ColumnOrder.ieee754TotalOrder())) { + // For TYPE_DEFINED_ORDER: drop min/max values as NaN ordering is undefined + stats.setMinMax(POSITIVE_ZERO_LITTLE_ENDIAN, NEGATIVE_ZERO_LITTLE_ENDIAN); + ((Statistics) stats).hasNonNullValue = false; + } } else { // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped if (min == (short) 0x0000) { @@ -182,6 +200,7 @@ public Statistics build() { private final PrimitiveComparator comparator; private boolean hasNonNullValue; private long num_nulls; + private long nan_count = -1; final PrimitiveStringifier stringifier; Statistics(PrimitiveType type) { @@ -351,7 +370,8 @@ public boolean equals(Object other) { return type.equals(stats.type) && Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) && Arrays.equals(stats.getMinBytes(), this.getMinBytes()) - && stats.getNumNulls() == this.getNumNulls(); + && stats.getNumNulls() == this.getNumNulls() + && stats.getNanCount() == this.getNanCount(); } /** @@ -384,6 +404,11 @@ public void mergeStatistics(Statistics stats) { mergeStatisticsMinMax(stats); markAsNotEmpty(); } + if (isNanCountSet() && stats.isNanCountSet()) { + incrementNanCount(stats.getNanCount()); + } else { + unsetNanCount(); + } } else { 
throw StatisticsClassException.create(this, stats); } @@ -535,6 +560,53 @@ public void incrementNumNulls(long increment) { num_nulls += increment; } + /** + * Increments the NaN count by one. If nan_count was not set (-1), initializes it to 1. + */ + public void incrementNanCount() { + if (nan_count < 0) { + nan_count = 1; + } else { + nan_count++; + } + } + + /** + * Increments the NaN count by the parameter value. If nan_count was not set (-1), initializes it to increment. + * + * @param increment value to increment the NaN count by + */ + public void incrementNanCount(long increment) { + if (nan_count < 0) { + nan_count = increment; + } else { + nan_count += increment; + } + } + + /** + * Returns the NaN count + * + * @return NaN count or {@code -1} if the NaN count is not set + */ + public long getNanCount() { + return nan_count; + } + + /** + * @return whether nanCount is set and can be used + */ + public boolean isNanCountSet() { + return nan_count >= 0; + } + + /** + * Unsets the NaN count to -1. 
+ */ + public void unsetNanCount() { + nan_count = -1; + } + /** * Returns the null count * diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java index 1c546b5160..4255cf2803 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java @@ -23,6 +23,9 @@ import java.util.List; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.Float16; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveType; @@ -78,10 +81,18 @@ int compareValueToMax(int arrayIndex) { } } + private static final Binary FLOAT16_NAN = Binary.fromConstantByteArray(new byte[] {0x00, 0x7e}); + private static final Binary POSITIVE_ZERO_LITTLE_ENDIAN = Binary.fromConstantByteArray(new byte[] {0x00, 0x00}); + private static final Binary NEGATIVE_ZERO_LITTLE_ENDIAN = + Binary.fromConstantByteArray(new byte[] {0x00, (byte) 0x80}); + private final List minValues = new ArrayList<>(); private final List maxValues = new ArrayList<>(); private final BinaryTruncator truncator; private final int truncateLength; + private final boolean isFloat16; + private final boolean isIeee754TotalOrder; + private boolean invalid; private static Binary convert(ByteBuffer buffer) { return Binary.fromReusedByteBuffer(buffer); @@ -94,6 +105,8 @@ private static ByteBuffer convert(Binary value) { BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) { truncator = BinaryTruncator.getTruncator(type); this.truncateLength = truncateLength; + this.isFloat16 = 
type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation; + this.isIeee754TotalOrder = type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()); } @Override @@ -104,12 +117,46 @@ void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) { @Override void addMinMax(Object min, Object max) { - minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength)); - maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength)); + Binary bMin = (Binary) min; + Binary bMax = (Binary) max; + + if (isFloat16) { + boolean minIsNaN = bMin != null && Float16.isNaN(bMin.get2BytesLittleEndian()); + boolean maxIsNaN = bMax != null && Float16.isNaN(bMax.get2BytesLittleEndian()); + if (minIsNaN || maxIsNaN) { + if (isIeee754TotalOrder) { + bMin = FLOAT16_NAN; + bMax = FLOAT16_NAN; + } else { + invalid = true; + } + } + } + + // Float16 only: sorting order is undefined for -0.0 so let min = -0.0 and max = +0.0 to ensure that no 0.0 values are skipped + if (isFloat16 && bMin != null && Binary.lexicographicCompare(bMin, POSITIVE_ZERO_LITTLE_ENDIAN) == 0) { + bMin = NEGATIVE_ZERO_LITTLE_ENDIAN; + } + if (isFloat16 && bMax != null && Binary.lexicographicCompare(bMax, NEGATIVE_ZERO_LITTLE_ENDIAN) == 0) { + bMax = POSITIVE_ZERO_LITTLE_ENDIAN; + } + + minValues.add(bMin == null ? null : truncator.truncateMin(bMin, truncateLength)); + maxValues.add(bMax == null ? 
null : truncator.truncateMax(bMax, truncateLength)); + } + + @Override + void onNanEncountered() { + if (isFloat16 && !isIeee754TotalOrder) { + invalid = true; + } } @Override ColumnIndexBase createColumnIndex(PrimitiveType type) { + if (invalid) { + return null; + } BinaryColumnIndex columnIndex = new BinaryColumnIndex(type); columnIndex.minValues = minValues.toArray(new Binary[0]); columnIndex.maxValues = maxValues.toArray(new Binary[0]); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java index 86099717df..1cf73784c0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java @@ -71,4 +71,12 @@ default List getRepetitionLevelHistogram() { default List getDefinitionLevelHistogram() { throw new UnsupportedOperationException("Definition level histogram is not implemented"); } + + /** + * @return the unmodifiable list of NaN counts for each page, or {@code null} if NaN counts are not available; + * used for converting to the related thrift object + */ + default List getNanCounts() { + return null; + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index e78b2ceae1..0014de76e7 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -37,6 +37,7 @@ import java.util.PrimitiveIterator; import java.util.Set; import java.util.function.IntPredicate; +import org.apache.parquet.Preconditions; import org.apache.parquet.column.MinMax; import 
org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; @@ -56,6 +57,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveStringifier; import org.apache.parquet.schema.PrimitiveType; @@ -102,6 +104,8 @@ int translate(int arrayIndex) { // might be null private long[] nullCounts; // might be null + private long[] nanCounts; + // might be null private long[] repLevelHistogram; // might be null private long[] defLevelHistogram; @@ -133,6 +137,14 @@ public List getNullCounts() { return LongLists.unmodifiable(LongArrayList.wrap(nullCounts)); } + @Override + public List getNanCounts() { + if (nanCounts == null) { + return null; + } + return LongLists.unmodifiable(LongArrayList.wrap(nanCounts)); + } + @Override public List getNullPages() { return BooleanLists.unmodifiable(BooleanArrayList.wrap(nullPages)); @@ -517,6 +529,7 @@ public long getMinMaxSize() { private PrimitiveType type; private final BooleanList nullPages = new BooleanArrayList(); private final LongList nullCounts = new LongArrayList(); + private LongList nanCounts = new LongArrayList(); private final IntList pageIndexes = new IntArrayList(); private int nextPageIndex; private LongList repLevelHistogram = new LongArrayList(); @@ -550,9 +563,9 @@ private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int trunc case BOOLEAN: return new BooleanColumnIndexBuilder(); case DOUBLE: - return new DoubleColumnIndexBuilder(); + return new DoubleColumnIndexBuilder(type); case FLOAT: - return new FloatColumnIndexBuilder(); + return new FloatColumnIndexBuilder(type); case INT32: return new IntColumnIndexBuilder(); case INT64: @@ -611,10 +624,53 @@ public static ColumnIndex build( List 
maxValues, List repLevelHistogram, List defLevelHistogram) { + return build( + type, + boundaryOrder, + nullPages, + nullCounts, + null, + minValues, + maxValues, + repLevelHistogram, + defLevelHistogram); + } - ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); + /** + * @param type + * the primitive type + * @param boundaryOrder + * the boundary order of the min/max values + * @param nullPages + * the null pages (one boolean value for each page that signifies whether the page consists of nulls + * entirely) + * @param nullCounts + * the number of null values for each page + * @param nanCounts + * the number of NaN values for each page (may be null) + * @param minValues + * the min values for each page + * @param maxValues + * the max values for each page + * @param repLevelHistogram + * the repetition level histogram for all levels of each page + * @param defLevelHistogram + * the definition level histogram for all levels of each page + * @return the newly created {@link ColumnIndex} object based on the specified arguments + */ + public static ColumnIndex build( + PrimitiveType type, + BoundaryOrder boundaryOrder, + List nullPages, + List nullCounts, + List nanCounts, + List minValues, + List maxValues, + List repLevelHistogram, + List defLevelHistogram) { - builder.fill(nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); + ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); + builder.fill(nullPages, nullCounts, nanCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); ColumnIndexBase columnIndex = builder.build(type); columnIndex.boundaryOrder = requireNonNull(boundaryOrder); return columnIndex; @@ -653,6 +709,15 @@ public void add(Statistics stats, SizeStatistics sizeStats) { } nullCounts.add(stats.getNumNulls()); + if (nanCounts != null && stats.isNanCountSet()) { + nanCounts.add(stats.getNanCount()); + if (stats.getNanCount() > 0) { + onNanEncountered(); + } + } else { + 
nanCounts = null; + } + // Collect repetition and definition level histograms only when all pages are valid. if (sizeStats != null && sizeStats.isValid() && repLevelHistogram != null && defLevelHistogram != null) { repLevelHistogram.addAll(sizeStats.getRepetitionLevelHistogram()); @@ -669,9 +734,18 @@ public void add(Statistics stats, SizeStatistics sizeStats) { abstract void addMinMax(Object min, Object max); + /** + * Called when a page with NaN values is encountered (nan_count > 0) and the page also has non-NaN values. + * Subclasses should override to handle NaN presence (e.g., invalidate for TYPE_DEFINED_ORDER). + */ + void onNanEncountered() { + throw new UnsupportedOperationException("Cannot call onNanEncountered on type: " + type); + } + private void fill( List nullPages, List nullCounts, + List nanCounts, List minValues, List maxValues, List repLevelHistogram, @@ -679,12 +753,14 @@ private void fill( clear(); int pageCount = nullPages.size(); if ((nullCounts != null && nullCounts.size() != pageCount) + || (nanCounts != null && nanCounts.size() != pageCount) || minValues.size() != pageCount || maxValues.size() != pageCount) { throw new IllegalArgumentException(String.format( - "Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d", + "Not all sizes are equal (nullPages:%d, nullCounts:%s, nanCounts:%s, minValues:%d, maxValues:%d", nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), + nanCounts == null ? 
"null" : nanCounts.size(), minValues.size(), maxValues.size())); } @@ -705,6 +781,10 @@ private void fill( if (nullCounts != null) { this.nullCounts.addAll(nullCounts); } + // NaN counts is optional in the format + if (nanCounts != null) { + this.nanCounts.addAll(nanCounts); + } for (int i = 0; i < pageCount; ++i) { if (!nullPages.get(i)) { @@ -750,6 +830,14 @@ private ColumnIndexBase build(PrimitiveType type) { if (!nullCounts.isEmpty()) { columnIndex.nullCounts = nullCounts.toLongArray(); } + // NaN counts is optional so keep it null if the builder has no values + if (nanCounts != null && !nanCounts.isEmpty()) { + columnIndex.nanCounts = nanCounts.toLongArray(); + } else { + Preconditions.checkState( + !type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()), + "NaN counts must be provided for types with IEEE_754_TOTAL_ORDER column order"); + } columnIndex.pageIndexes = pageIndexes.toIntArray(); // Repetition and definition level histograms are optional so keep them null if the builder has no values if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) { @@ -802,8 +890,21 @@ private void clear() { clearMinMax(); nextPageIndex = 0; pageIndexes.clear(); - repLevelHistogram.clear(); - defLevelHistogram.clear(); + if (nanCounts != null) { + nanCounts.clear(); + } else { + nanCounts = new LongArrayList(); + } + if (repLevelHistogram == null) { + repLevelHistogram = new LongArrayList(); + } else { + repLevelHistogram.clear(); + } + if (defLevelHistogram == null) { + defLevelHistogram = new LongArrayList(); + } else { + defLevelHistogram.clear(); + } } abstract void clearMinMax(); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java index 5d5d54aa76..ba4ca49732 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java +++ 
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveType; @@ -83,6 +84,11 @@ int compareValueToMax(int arrayIndex) { private final DoubleList minValues = new DoubleArrayList(); private final DoubleList maxValues = new DoubleArrayList(); private boolean invalid; + private final boolean isIeee754TotalOrder; + + DoubleColumnIndexBuilder(PrimitiveType type) { + this.isIeee754TotalOrder = type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()); + } private static double convert(ByteBuffer buffer) { return buffer.order(LITTLE_ENDIAN).getDouble(0); @@ -103,8 +109,12 @@ void addMinMax(Object min, Object max) { double dMin = (double) min; double dMax = (double) max; if (Double.isNaN(dMin) || Double.isNaN(dMax)) { - // Invalidate this column index in case of NaN as the sorting order of values is undefined for this case - invalid = true; + if (isIeee754TotalOrder) { + dMin = Double.NaN; + dMax = Double.NaN; + } else { + invalid = true; + } } // Sorting order is undefined for -0.0 so let min = -0.0 and max = +0.0 to ensure that no 0.0 values are skipped @@ -119,6 +129,13 @@ void addMinMax(Object min, Object max) { maxValues.add(dMax); } + @Override + void onNanEncountered() { + if (!isIeee754TotalOrder) { + invalid = true; + } + } + @Override ColumnIndexBase createColumnIndex(PrimitiveType type) { if (invalid) { diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java index be66f85d15..0acf482c4c 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveType; @@ -83,6 +84,11 @@ int compareValueToMax(int arrayIndex) { private final FloatList minValues = new FloatArrayList(); private final FloatList maxValues = new FloatArrayList(); private boolean invalid; + private final boolean isIeee754TotalOrder; + + FloatColumnIndexBuilder(PrimitiveType type) { + this.isIeee754TotalOrder = type.columnOrder().equals(ColumnOrder.ieee754TotalOrder()); + } private static float convert(ByteBuffer buffer) { return buffer.order(LITTLE_ENDIAN).getFloat(0); @@ -103,8 +109,12 @@ void addMinMax(Object min, Object max) { float fMin = (float) min; float fMax = (float) max; if (Float.isNaN(fMin) || Float.isNaN(fMax)) { - // Invalidate this column index in case of NaN as the sorting order of values is undefined for this case - invalid = true; + if (isIeee754TotalOrder) { + fMin = Float.NaN; + fMax = Float.NaN; + } else { + invalid = true; + } } // Sorting order is undefined for -0.0 so let min = -0.0 and max = +0.0 to ensure that no 0.0 values are skipped @@ -119,6 +129,13 @@ void addMinMax(Object min, Object max) { maxValues.add(fMax); } + @Override + void onNanEncountered() { + if (!isIeee754TotalOrder) { + invalid = true; + } + } + @Override ColumnIndexBase createColumnIndex(PrimitiveType type) { if (invalid) { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java index 94a1275569..35ef0ec9d2 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java @@ -36,11 +36,16 @@ public enum ColumnOrderName { /** * Type defined order meaning that the comparison order of the elements are based on its type. */ - TYPE_DEFINED_ORDER + TYPE_DEFINED_ORDER, + /** + * The column order is defined by the IEEE 754 standard. + */ + IEEE_754_TOTAL_ORDER, } private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED); private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER); + private static final ColumnOrder IEEE_754_TOTAL_ORDER = new ColumnOrder(ColumnOrderName.IEEE_754_TOTAL_ORDER); /** * @return a {@link ColumnOrder} instance representing an undefined order @@ -58,6 +63,14 @@ public static ColumnOrder typeDefined() { return TYPE_DEFINED_COLUMN_ORDER; } + /** + * @return a {@link ColumnOrder} instance representing an IEEE 754 total order + * @see ColumnOrderName#IEEE_754_TOTAL_ORDER + */ + public static ColumnOrder ieee754TotalOrder() { + return IEEE_754_TOTAL_ORDER; + } + private final ColumnOrderName columnOrderName; private ColumnOrder(ColumnOrderName columnOrderName) { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java index 98bc5c0237..625e9fd9d3 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java @@ -19,6 +19,7 @@ package org.apache.parquet.schema; import static java.util.Optional.empty; +import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.IEEE_754_TOTAL_ORDER; import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.TYPE_DEFINED_ORDER; import static 
org.apache.parquet.schema.ColumnOrder.ColumnOrderName.UNDEFINED; import static org.apache.parquet.schema.PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER; @@ -1047,6 +1048,13 @@ LogicalTypeToken getType() { PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { return PrimitiveStringifier.FLOAT16_STRINGIFIER; } + + @Override + boolean isValidColumnOrder(ColumnOrder columnOrder) { + return columnOrder.getColumnOrderName() == UNDEFINED + || columnOrder.getColumnOrderName() == TYPE_DEFINED_ORDER + || columnOrder.getColumnOrderName() == IEEE_754_TOTAL_ORDER; + } } public static class UnknownLogicalTypeAnnotation extends LogicalTypeAnnotation { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 50c4acd4c9..9d22d25312 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -293,4 +293,65 @@ public String toString() { return "BINARY_AS_FLOAT16_COMPARATOR"; } }; + + static final PrimitiveComparator FLOAT_IEEE_754_TOTAL_ORDER_COMPARATOR = new PrimitiveComparator() { + @Override + int compareNotNulls(Float o1, Float o2) { + return compare(o1.floatValue(), o2.floatValue()); + } + + @Override + public int compare(float f1, float f2) { + int f1Int = Float.floatToRawIntBits(f1); + int f2Int = Float.floatToRawIntBits(f2); + f1Int ^= ((f1Int >> 31) >>> 1); + f2Int ^= ((f2Int >> 31) >>> 1); + return Integer.compare(f1Int, f2Int); + } + + @Override + public String toString() { + return "FLOAT_IEEE_754_TOTAL_ORDER_COMPARATOR"; + } + }; + + static final PrimitiveComparator DOUBLE_IEEE_754_TOTAL_ORDER_COMPARATOR = + new PrimitiveComparator() { + @Override + int compareNotNulls(Double o1, Double o2) { + return compare(o1.doubleValue(), o2.doubleValue()); + } + + @Override + public int compare(double d1, double d2) { + 
long d1Long = Double.doubleToRawLongBits(d1); + long d2Long = Double.doubleToRawLongBits(d2); + d1Long ^= ((d1Long >> 63) >>> 1); + d2Long ^= ((d2Long >> 63) >>> 1); + return Long.compare(d1Long, d2Long); + } + + @Override + public String toString() { + return "DOUBLE_IEEE_754_TOTAL_ORDER_COMPARATOR"; + } + }; + + static final PrimitiveComparator BINARY_AS_FLOAT16_IEEE_754_TOTAL_ORDER_COMPARATOR = + new BinaryComparator() { + + @Override + int compareBinary(Binary b1, Binary b2) { + int b1Short = b1.get2BytesLittleEndian(); + int b2Short = b2.get2BytesLittleEndian(); + b1Short ^= ((b1Short >> 15) >>> 1); + b2Short ^= ((b2Short >> 15) >>> 1); + return Integer.compare(b1Short, b2Short); + } + + @Override + public String toString() { + return "BINARY_AS_FLOAT16_IEEE_754_TOTAL_ORDER_COMPARATOR"; + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 944cfb58eb..74149197fc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -88,7 +88,7 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { if (logicalType == null) { return PrimitiveComparator.SIGNED_INT64_COMPARATOR; } @@ -152,7 +152,7 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { if (logicalType == null) { return PrimitiveComparator.SIGNED_INT32_COMPARATOR; } @@ -222,7 +222,7 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + 
PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { return PrimitiveComparator.BOOLEAN_COMPARATOR; } }, @@ -248,7 +248,7 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { if (logicalType == null) { return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR; } @@ -328,8 +328,10 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { - return PrimitiveComparator.FLOAT_COMPARATOR; + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { + return columnOrder != null && columnOrder.getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER + ? PrimitiveComparator.FLOAT_IEEE_754_TOTAL_ORDER_COMPARATOR + : PrimitiveComparator.FLOAT_COMPARATOR; } }, DOUBLE("getDouble", Double.TYPE) { @@ -354,8 +356,10 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { - return PrimitiveComparator.DOUBLE_COMPARATOR; + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { + return columnOrder != null && columnOrder.getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER + ? 
PrimitiveComparator.DOUBLE_IEEE_754_TOTAL_ORDER_COMPARATOR + : PrimitiveComparator.DOUBLE_COMPARATOR; } }, INT96("getBinary", Binary.class) { @@ -380,7 +384,7 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; } }, @@ -406,11 +410,17 @@ public T convert(PrimitiveTypeNameConverter conve } @Override - PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder) { if (logicalType == null) { return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR; } + if (logicalType.getType() == LogicalTypeAnnotation.LogicalTypeToken.FLOAT16 + && columnOrder != null + && columnOrder.getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER) { + return PrimitiveComparator.BINARY_AS_FLOAT16_IEEE_754_TOTAL_ORDER_COMPARATOR; + } + return logicalType .accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { @Override @@ -477,7 +487,11 @@ public abstract void addValueToPrimitiveConverter( public abstract T convert(PrimitiveTypeNameConverter converter) throws E; - abstract PrimitiveComparator comparator(LogicalTypeAnnotation logicalType); + abstract PrimitiveComparator comparator(LogicalTypeAnnotation logicalType, ColumnOrder columnOrder); + + public PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { + return comparator(logicalType, ColumnOrder.typeDefined()); + } } private final PrimitiveTypeName primitive; @@ -569,6 +583,12 @@ public PrimitiveType( columnOrder = primitive == PrimitiveTypeName.INT96 || originalType == OriginalType.INTERVAL ? 
ColumnOrder.undefined() : ColumnOrder.typeDefined(); + } else if (columnOrder.getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER) { + Preconditions.checkArgument( + primitive == PrimitiveTypeName.FLOAT || primitive == PrimitiveTypeName.DOUBLE, + "The column order %s is not supported by type %s", + columnOrder, + primitive); } this.columnOrder = requireValidColumnOrder(columnOrder); } @@ -615,6 +635,17 @@ public PrimitiveType( || logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation ? ColumnOrder.undefined() : ColumnOrder.typeDefined(); + } else if (columnOrder.getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER) { + Preconditions.checkArgument( + primitive == PrimitiveTypeName.FLOAT + || primitive == PrimitiveTypeName.DOUBLE + || (logicalTypeAnnotation != null + && logicalTypeAnnotation.getType() + == LogicalTypeAnnotation.LogicalTypeToken.FLOAT16), + "The column order %s is not supported by type %s logical type %s", + columnOrder, + primitive, + logicalTypeAnnotation); } this.columnOrder = requireValidColumnOrder(columnOrder); } @@ -655,6 +686,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId()); } + /** + * @param columnOrder the column order + * @return the same type with the column order set + */ + public Type withColumnOrder(ColumnOrder columnOrder) { + return new PrimitiveType( + getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder); + } + /** * @return the primitive type */ @@ -869,7 +909,7 @@ protected Type union(Type toMerge, boolean strict) { */ @SuppressWarnings("unchecked") public PrimitiveComparator comparator() { - return (PrimitiveComparator) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation()); + return (PrimitiveComparator) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation(), columnOrder()); } /** diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatisticsNanCount.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatisticsNanCount.java new file mode 100644 index 0000000000..796dc3cff9 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatisticsNanCount.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.statistics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.Float16; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +public class TestStatisticsNanCount { + + private static final PrimitiveType FLOAT_TYPE = + Types.optional(PrimitiveTypeName.FLOAT).named("test_float"); + private static final PrimitiveType DOUBLE_TYPE = + Types.optional(PrimitiveTypeName.DOUBLE).named("test_double"); + private static final PrimitiveType FLOAT16_TYPE = Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .as(LogicalTypeAnnotation.float16Type()) + .named("test_float16"); + + private static final PrimitiveType FLOAT_IEEE754_TYPE = Types.optional(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_float_ieee754"); + private static final PrimitiveType DOUBLE_IEEE754_TYPE = Types.optional(PrimitiveTypeName.DOUBLE) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_double_ieee754"); + private static final PrimitiveType FLOAT16_IEEE754_TYPE = Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .as(LogicalTypeAnnotation.float16Type()) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_float16_ieee754"); + + private static Binary float16Binary(short h) { + return Binary.fromConstantByteArray(new byte[] {(byte) (h & 0xFF), (byte) ((h >> 8) & 0xFF)}); + } + + private static final Binary FLOAT16_NAN = float16Binary((short) 0x7e00); + private static final Binary FLOAT16_ONE = float16Binary((short) 0x3C00); + private static final Binary FLOAT16_TWO = 
float16Binary((short) 0x4000); + + @Test + public void testFloatNanCountMixedValues() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats.updateStats(1.0f); + stats.updateStats(Float.NaN); + stats.updateStats(2.0f); + stats.updateStats(Float.NaN); + stats.updateStats(3.0f); + + assertTrue(stats.isNanCountSet()); + assertEquals(2, stats.getNanCount()); + assertTrue(Float.isNaN(stats.getMax()) || Float.isNaN(stats.getMin())); + } + + @Test + public void testFloatNanCountAllNaN() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats.updateStats(Float.NaN); + stats.updateStats(Float.NaN); + + assertTrue(stats.isNanCountSet()); + assertEquals(2, stats.getNanCount()); + assertTrue(stats.hasNonNullValue()); + } + + @Test + public void testFloatNanCountNoNaN() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats.updateStats(1.0f); + stats.updateStats(2.0f); + + assertTrue(stats.isNanCountSet()); + assertEquals(0, stats.getNanCount()); + } + + @Test + public void testDoubleNanCountMixedValues() { + DoubleStatistics stats = (DoubleStatistics) Statistics.createStats(DOUBLE_TYPE); + stats.updateStats(1.0); + stats.updateStats(Double.NaN); + stats.updateStats(2.0); + + assertTrue(stats.isNanCountSet()); + assertEquals(1, stats.getNanCount()); + assertTrue(Double.isNaN(stats.getMax()) || Double.isNaN(stats.getMin())); + } + + @Test + public void testFloat16NanCountMixedValues() { + BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(FLOAT16_TYPE); + stats.updateStats(FLOAT16_ONE); + stats.updateStats(FLOAT16_NAN); + stats.updateStats(FLOAT16_TWO); + + assertTrue(stats.isNanCountSet()); + assertEquals(1, stats.getNanCount()); + assertTrue(stats.hasNonNullValue()); + assertTrue(Float16.isNaN(stats.getMin().get2BytesLittleEndian()) + || Float16.isNaN(stats.getMax().get2BytesLittleEndian())); + } + + @Test + public void testFloat16NanCountNoNaN() { + 
BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(FLOAT16_TYPE); + stats.updateStats(FLOAT16_ONE); + + assertTrue(stats.isNanCountSet()); + assertEquals(0, stats.getNanCount()); + } + + @Test + public void testMergeNanCounts() { + FloatStatistics stats1 = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats1.updateStats(1.0f); + stats1.updateStats(Float.NaN); + + FloatStatistics stats2 = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats2.updateStats(2.0f); + stats2.updateStats(Float.NaN); + stats2.updateStats(Float.NaN); + + stats1.mergeStatistics(stats2); + assertEquals(3, stats1.getNanCount()); + } + + @Test + public void testCopyPreservesNanCount() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats.updateStats(1.0f); + stats.updateStats(Float.NaN); + stats.updateStats(Float.NaN); + + FloatStatistics copy = stats.copy(); + assertEquals(stats.getNanCount(), copy.getNanCount()); + assertTrue(copy.isNanCountSet()); + assertEquals(2, copy.getNanCount()); + } + + @Test + public void testFloatBuilderIEEE754KeepsNanMinMax() { + Statistics.Builder builder = Statistics.getBuilderForReading(FLOAT_IEEE754_TYPE); + byte[] nanBytes = org.apache.parquet.bytes.BytesUtils.intToBytes(Float.floatToIntBits(Float.NaN)); + Statistics stats = builder.withMin(nanBytes) + .withMax(nanBytes) + .withNanCount(10) + .withNumNulls(0) + .build(); + + assertTrue(stats.hasNonNullValue()); + assertTrue(Float.isNaN(((FloatStatistics) stats).getMin())); + assertTrue(Float.isNaN(((FloatStatistics) stats).getMax())); + assertEquals(10, stats.getNanCount()); + } + + @Test + public void testFloatBuilderTypeDefinedDropsNanMinMax() { + PrimitiveType type = FLOAT_TYPE; + Statistics.Builder builder = Statistics.getBuilderForReading(type); + byte[] nanBytes = org.apache.parquet.bytes.BytesUtils.intToBytes(Float.floatToIntBits(Float.NaN)); + Statistics stats = + builder.withMin(nanBytes).withMax(nanBytes).withNumNulls(0).build(); + 
+ assertFalse(stats.hasNonNullValue()); + assertFalse(Float.isNaN(((FloatStatistics) stats).getMin())); + assertFalse(Float.isNaN(((FloatStatistics) stats).getMax())); + } + + @Test + public void testDoubleBuilderIEEE754KeepsNanMinMax() { + PrimitiveType type = DOUBLE_IEEE754_TYPE; + Statistics.Builder builder = Statistics.getBuilderForReading(type); + byte[] nanBytes = org.apache.parquet.bytes.BytesUtils.longToBytes(Double.doubleToLongBits(Double.NaN)); + Statistics stats = builder.withMin(nanBytes) + .withMax(nanBytes) + .withNanCount(10) + .withNumNulls(0) + .build(); + + assertTrue(stats.hasNonNullValue()); + assertTrue(Double.isNaN(((DoubleStatistics) stats).getMin())); + assertTrue(Double.isNaN(((DoubleStatistics) stats).getMax())); + assertEquals(10, stats.getNanCount()); + } + + @Test + public void testFloatIEEE754NanOnlySetHasNonNullValue() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_IEEE754_TYPE); + stats.updateStats(Float.NaN); + stats.updateStats(Float.NaN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(2, stats.getNanCount()); + assertTrue(Float.isNaN(stats.getMin())); + assertTrue(Float.isNaN(stats.getMax())); + } + + @Test + public void testDoubleIEEE754NanOnlySetHasNonNullValue() { + DoubleStatistics stats = (DoubleStatistics) Statistics.createStats(DOUBLE_IEEE754_TYPE); + stats.updateStats(Double.NaN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(1, stats.getNanCount()); + assertTrue(Double.isNaN(stats.getMin())); + assertTrue(Double.isNaN(stats.getMax())); + } + + @Test + public void testFloat16IEEE754NanOnlySetHasNonNullValue() { + BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(FLOAT16_IEEE754_TYPE); + stats.updateStats(FLOAT16_NAN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(1, stats.getNanCount()); + assertTrue(Float16.isNaN(stats.getMin().get2BytesLittleEndian())); + assertTrue(Float16.isNaN(stats.getMax().get2BytesLittleEndian())); + } + + @Test + public void 
testFloatIEEE754NanExcludedFromMax() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_IEEE754_TYPE); + stats.updateStats(1.0f); + stats.updateStats(Float.NaN); + stats.updateStats(2.0f); + + // NaN is excluded from min/max on the write path for all column orders + assertEquals(2.0f, stats.getMax(), 0.0f); + assertEquals(1.0f, stats.getMin(), 0.0f); + assertEquals(1, stats.getNanCount()); + } + + @Test + public void testDoubleIEEE754NanExcludedFromMax() { + DoubleStatistics stats = (DoubleStatistics) Statistics.createStats(DOUBLE_IEEE754_TYPE); + stats.updateStats(1.0); + stats.updateStats(Double.NaN); + stats.updateStats(2.0); + + // NaN is excluded from min/max on the write path for all column orders + assertEquals(2.0, stats.getMax(), 0.0); + assertEquals(1.0, stats.getMin(), 0.0); + assertEquals(1, stats.getNanCount()); + } + + @Test + public void testFloatTypeDefinedNanOnlySetHasNonNullValue() { + FloatStatistics stats = (FloatStatistics) Statistics.createStats(FLOAT_TYPE); + stats.updateStats(Float.NaN); + stats.updateStats(Float.NaN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(2, stats.getNanCount()); + } + + @Test + public void testDoubleTypeDefinedNanOnlySetHasNonNullValue() { + DoubleStatistics stats = (DoubleStatistics) Statistics.createStats(DOUBLE_TYPE); + stats.updateStats(Double.NaN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(1, stats.getNanCount()); + } + + @Test + public void testFloat16TypeDefinedNanOnlySetHasNonNullValue() { + BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(FLOAT16_TYPE); + stats.updateStats(FLOAT16_NAN); + + assertTrue(stats.hasNonNullValue()); + assertEquals(1, stats.getNanCount()); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilderNaN.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilderNaN.java new file mode 100644 index 
0000000000..737bc0cca0 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilderNaN.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.internal.column.columnindex; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.List; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +/** + * Tests for ColumnIndex NaN handling under IEEE 754 total order and TYPE_DEFINED_ORDER. 
+ */ +public class TestColumnIndexBuilderNaN { + + private static final PrimitiveType FLOAT_TYPE = + Types.required(PrimitiveTypeName.FLOAT).named("test_float"); + private static final PrimitiveType FLOAT_IEEE754_TYPE = Types.required(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_float_ieee754"); + private static final PrimitiveType DOUBLE_TYPE = + Types.required(PrimitiveTypeName.DOUBLE).named("test_double"); + private static final PrimitiveType DOUBLE_IEEE754_TYPE = Types.required(PrimitiveTypeName.DOUBLE) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_double_ieee754"); + private static final PrimitiveType FLOAT16_TYPE = Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .as(LogicalTypeAnnotation.float16Type()) + .named("test_float16"); + private static final PrimitiveType FLOAT16_IEEE754_TYPE = Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .as(LogicalTypeAnnotation.float16Type()) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_float16_ieee754"); + + private static Binary float16Binary(short h) { + return Binary.fromConstantByteArray(new byte[] {(byte) (h & 0xFF), (byte) ((h >> 8) & 0xFF)}); + } + + private static final Binary FLOAT16_NAN = float16Binary((short) 0x7e00); + private static final Binary FLOAT16_ONE = float16Binary((short) 0x3C00); + private static final Binary FLOAT16_TWO = float16Binary((short) 0x4000); + + private Statistics floatStats(PrimitiveType type, Object... values) { + Statistics stats = Statistics.createStats(type); + for (Object value : values) { + if (value == null) { + stats.incrementNumNulls(); + } else { + stats.updateStats((float) value); + } + } + return stats; + } + + private Statistics doubleStats(PrimitiveType type, Object... 
values) { + Statistics stats = Statistics.createStats(type); + for (Object value : values) { + if (value == null) { + stats.incrementNumNulls(); + } else { + stats.updateStats((double) value); + } + } + return stats; + } + + private Statistics binaryStats(PrimitiveType type, Binary... values) { + Statistics stats = Statistics.createStats(type); + for (Binary value : values) { + if (value == null) { + stats.incrementNumNulls(); + } else { + stats.updateStats(value); + } + } + return stats; + } + + @Test + public void testFloatTypeDefinedOrderNaNOnlyPage() { + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(FLOAT_TYPE, Integer.MAX_VALUE); + builder.add(floatStats(FLOAT_TYPE, 1.0f, 2.0f)); + builder.add(floatStats(FLOAT_TYPE, Float.NaN)); + builder.add(floatStats(FLOAT_TYPE, 3.0f, 4.0f)); + ColumnIndex ci = builder.build(); + assertNull(ci); + } + + @Test + public void testFloatIeee754NaNOnlyPage() { + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(FLOAT_IEEE754_TYPE, Integer.MAX_VALUE); + builder.add(floatStats(FLOAT_IEEE754_TYPE, 1.0f, 2.0f)); + builder.add(floatStats(FLOAT_IEEE754_TYPE, Float.NaN, Float.NaN)); + builder.add(floatStats(FLOAT_IEEE754_TYPE, 3.0f, Float.NaN, 4.0f)); + ColumnIndex ci = builder.build(); + assertNotNull(ci); + assertEquals(3, ci.getNullPages().size()); + List nanCounts = ci.getNanCounts(); + assertNotNull(nanCounts); + assertEquals(3, nanCounts.size()); + assertEquals(0L, (long) nanCounts.get(0)); + assertEquals(2L, (long) nanCounts.get(1)); + assertEquals(1L, (long) nanCounts.get(2)); + } + + @Test + public void testDoubleTypeDefinedOrderNaNOnlyPage() { + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(DOUBLE_TYPE, Integer.MAX_VALUE); + builder.add(doubleStats(DOUBLE_TYPE, 1.0, 2.0)); + builder.add(doubleStats(DOUBLE_TYPE, Double.NaN)); + builder.add(doubleStats(DOUBLE_TYPE, 3.0, 4.0)); + ColumnIndex ci = builder.build(); + assertNull(ci); + } + + @Test + public void testDoubleIeee754NaNOnlyPage() { + 
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(DOUBLE_IEEE754_TYPE, Integer.MAX_VALUE); + builder.add(doubleStats(DOUBLE_IEEE754_TYPE, 1.0, 2.0)); + builder.add(doubleStats(DOUBLE_IEEE754_TYPE, Double.NaN, Double.NaN, Double.NaN)); + builder.add(doubleStats(DOUBLE_IEEE754_TYPE, 3.0, Double.NaN, 4.0)); + ColumnIndex ci = builder.build(); + assertNotNull(ci); + assertEquals(3, ci.getNullPages().size()); + List nanCounts = ci.getNanCounts(); + assertNotNull(nanCounts); + assertEquals(3, nanCounts.size()); + assertEquals(0L, (long) nanCounts.get(0)); + assertEquals(3L, (long) nanCounts.get(1)); + assertEquals(1L, (long) nanCounts.get(2)); + } + + @Test + public void testFloat16TypeDefinedOrderNaNOnlyPage() { + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(FLOAT16_TYPE, Integer.MAX_VALUE); + builder.add(binaryStats(FLOAT16_TYPE, FLOAT16_ONE, FLOAT16_TWO)); + builder.add(binaryStats(FLOAT16_TYPE, FLOAT16_NAN)); + builder.add(binaryStats(FLOAT16_TYPE, FLOAT16_ONE)); + ColumnIndex ci = builder.build(); + assertNull(ci); + } + + @Test + public void testFloat16Ieee754NaNOnlyPage() { + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(FLOAT16_IEEE754_TYPE, Integer.MAX_VALUE); + builder.add(binaryStats(FLOAT16_IEEE754_TYPE, FLOAT16_ONE, FLOAT16_NAN, FLOAT16_TWO)); + builder.add(binaryStats(FLOAT16_IEEE754_TYPE, FLOAT16_NAN, FLOAT16_NAN)); + builder.add(binaryStats(FLOAT16_IEEE754_TYPE, FLOAT16_ONE)); + ColumnIndex ci = builder.build(); + assertNotNull(ci); + assertEquals(3, ci.getNullPages().size()); + List nanCounts = ci.getNanCounts(); + assertNotNull(nanCounts); + assertEquals(3, nanCounts.size()); + assertEquals(1L, (long) nanCounts.get(0)); + assertEquals(2L, (long) nanCounts.get(1)); + assertEquals(0L, (long) nanCounts.get(2)); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index 
8fb53aca0f..ec7425141e 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -19,10 +19,13 @@ package org.apache.parquet.schema; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_IEEE_754_TOTAL_ORDER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_IEEE_754_TOTAL_ORDER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.FLOAT_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.FLOAT_IEEE_754_TOTAL_ORDER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.SIGNED_INT32_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.SIGNED_INT64_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.UNSIGNED_INT32_COMPARATOR; @@ -167,6 +170,22 @@ private void testInt64Comparator(PrimitiveComparator comparator, Long... v checkThrowingUnsupportedException(comparator, Long.TYPE); } + private void testFloatComparator(PrimitiveComparator comparator, Float... 
valuesInAscendingOrder) { + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + Float vi = valuesInAscendingOrder[i]; + Float vj = valuesInAscendingOrder[j]; + int exp = i - j; + assertSignumEquals(vi, vj, exp, comparator.compare(vi, vj)); + if (vi != null && vj != null) { + assertSignumEquals(vi, vj, exp, comparator.compare(vi.floatValue(), vj.floatValue())); + } + } + } + + checkThrowingUnsupportedException(comparator, Float.TYPE); + } + @Test public void testFloatComparator() { Float[] valuesInAscendingOrder = { @@ -182,19 +201,46 @@ public void testFloatComparator() { Float.POSITIVE_INFINITY }; + testFloatComparator(FLOAT_COMPARATOR, valuesInAscendingOrder); + } + + @Test + public void testFloatIEEE754TotalOrderComparator() { + Float[] valuesInAscendingOrder = { + null, + Float.intBitsToFloat(0xFFFFFFFF), // -NaN (smallest) + Float.intBitsToFloat(0xFFF00001), // -NaN (largest) + Float.NEGATIVE_INFINITY, + -Float.MAX_VALUE, + -1234.5678F, + -Float.MIN_VALUE, + -0.0F, + 0.0F, + Float.MIN_VALUE, + 1234.5678F, + Float.MAX_VALUE, + Float.POSITIVE_INFINITY, + Float.intBitsToFloat(0x7FF00001), // +NaN (smallest) + Float.intBitsToFloat(0x7FFFFFFF), // +NaN (largest) + }; + + testFloatComparator(FLOAT_IEEE_754_TOTAL_ORDER_COMPARATOR, valuesInAscendingOrder); + } + + private void testDoubleComparator(PrimitiveComparator comparator, Double... 
valuesInAscendingOrder) { for (int i = 0; i < valuesInAscendingOrder.length; ++i) { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { - Float vi = valuesInAscendingOrder[i]; - Float vj = valuesInAscendingOrder[j]; + Double vi = valuesInAscendingOrder[i]; + Double vj = valuesInAscendingOrder[j]; int exp = i - j; - assertSignumEquals(vi, vj, exp, FLOAT_COMPARATOR.compare(vi, vj)); + assertSignumEquals(vi, vj, exp, comparator.compare(vi, vj)); if (vi != null && vj != null) { - assertSignumEquals(vi, vj, exp, FLOAT_COMPARATOR.compare(vi.floatValue(), vj.floatValue())); + assertSignumEquals(vi, vj, exp, comparator.compare(vi.doubleValue(), vj.doubleValue())); } } } - checkThrowingUnsupportedException(FLOAT_COMPARATOR, Float.TYPE); + checkThrowingUnsupportedException(comparator, Double.TYPE); } @Test @@ -212,19 +258,30 @@ public void testDoubleComparator() { Double.POSITIVE_INFINITY }; - for (int i = 0; i < valuesInAscendingOrder.length; ++i) { - for (int j = 0; j < valuesInAscendingOrder.length; ++j) { - Double vi = valuesInAscendingOrder[i]; - Double vj = valuesInAscendingOrder[j]; - int exp = i - j; - assertSignumEquals(vi, vj, exp, DOUBLE_COMPARATOR.compare(vi, vj)); - if (vi != null && vj != null) { - assertSignumEquals(vi, vj, exp, DOUBLE_COMPARATOR.compare(vi.doubleValue(), vj.doubleValue())); - } - } - } + testDoubleComparator(DOUBLE_COMPARATOR, valuesInAscendingOrder); + } - checkThrowingUnsupportedException(DOUBLE_COMPARATOR, Double.TYPE); + @Test + public void testDoubleIEEE754TotalOrderComparator() { + Double[] valuesInAscendingOrder = { + null, + Double.longBitsToDouble(0xFFFFFFFFFFFFFFFFL), // -NaN (smallest) + Double.longBitsToDouble(0xFFF0000000000001L), // -NaN (largest) + Double.NEGATIVE_INFINITY, + -Double.MAX_VALUE, + -123456.7890123456789, + -Double.MIN_VALUE, + -0.0, + +0.0, + Double.MIN_VALUE, + 123456.7890123456789, + Double.MAX_VALUE, + Double.POSITIVE_INFINITY, + Double.longBitsToDouble(0x7FF0000000000001L), // +NaN (smallest) + 
Double.longBitsToDouble(0x7FFFFFFFFFFFFFFFL), // +NaN (largest) + }; + + testDoubleComparator(DOUBLE_IEEE_754_TOTAL_ORDER_COMPARATOR, valuesInAscendingOrder); } @Test @@ -324,6 +381,34 @@ public void testFloat16Comparator() { } } + @Test + public void testBinaryAsFloat16IEEE754TotalOrderComparator() { + Binary[] valuesInAscendingOrder = { + null, + Binary.fromConstantByteArray(new byte[] {(byte) 0xff, (byte) 0xff}), // -NaN (smallest) + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0xfc}), // -NaN (largest) + Binary.fromConstantByteArray(new byte[] {0x00, (byte) 0xfc}), // -Infinity + Binary.fromConstantByteArray(new byte[] {0x00, (byte) 0xc0}), // -2.0 + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x84}), // -6.109476E-5 + Binary.fromConstantByteArray(new byte[] {0x00, (byte) 0x80}), // -0 + Binary.fromConstantByteArray(new byte[] {0x00, 0x00}), // +0 + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x00}), // 5.9604645E-8 + Binary.fromConstantByteArray(new byte[] {(byte) 0xff, (byte) 0x7b}), // 65504.0 + Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x7c}), // Infinity + Binary.fromConstantByteArray(new byte[] {0x01, 0x7c}), // +NaN (smallest) + Binary.fromConstantByteArray(new byte[] {(byte) 0xff, 0x7f}) // +NaN (largest) + }; + + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + Binary vi = valuesInAscendingOrder[i]; + Binary vj = valuesInAscendingOrder[j]; + int exp = i - j; + assertSignumEquals(vi, vj, exp, BINARY_AS_FLOAT16_IEEE_754_TOTAL_ORDER_COMPARATOR.compare(vi, vj)); + } + } + } + private void testObjectComparator(PrimitiveComparator comparator, T... 
valuesInAscendingOrder) { for (int i = 0; i < valuesInAscendingOrder.length; ++i) { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 60150439a6..03909a7ef5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -47,6 +47,7 @@ import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -86,6 +87,7 @@ import org.apache.parquet.format.GeographyType; import org.apache.parquet.format.GeometryType; import org.apache.parquet.format.GeospatialStatistics; +import org.apache.parquet.format.IEEE754TotalOrder; import org.apache.parquet.format.IntType; import org.apache.parquet.format.KeyValue; import org.apache.parquet.format.LogicalType; @@ -143,6 +145,7 @@ public class ParquetMetadataConverter { private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder(); + private static final IEEE754TotalOrder IEEE_754_TOTAL_ORDER = new IEEE754TotalOrder(); public static final MetadataFilter NO_FILTER = new NoFilter(); public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k @@ -278,11 +281,23 @@ public FileMetaData toParquetMetadata( private List getColumnOrders(MessageType schema) { List columnOrders = new ArrayList<>(); - // Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with - 
// TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders. - for (int i = 0, n = schema.getPaths().size(); i < n; ++i) { + for (ColumnDescriptor column : schema.getColumns()) { ColumnOrder columnOrder = new ColumnOrder(); - columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); + switch (column.getPrimitiveType().columnOrder().getColumnOrderName()) { + case TYPE_DEFINED_ORDER: + columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); + break; + case IEEE_754_TOTAL_ORDER: + columnOrder.setIEEE_754_TOTAL_ORDER(IEEE_754_TOTAL_ORDER); + break; + case UNDEFINED: + // Use TypeDefinedOrder if some types (e.g. INT96) have undefined column orders. + columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); + break; + default: + throw new IllegalArgumentException( + "Unknown column order: " + column.getPrimitiveType().columnOrder()); + } columnOrders.add(columnOrder); } return columnOrders; @@ -804,6 +819,9 @@ public static Statistics toParquetStatistics( // value has been truncated and is a lower bound and not in the page. 
if (!stats.isEmpty() && withinLimit(stats, truncateLength)) { formatStats.setNull_count(stats.getNumNulls()); + if (stats.isNanCountSet()) { + formatStats.setNan_count(stats.getNanCount()); + } if (stats.hasNonNullValue()) { byte[] min; byte[] max; @@ -889,7 +907,8 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, } private static boolean isMinMaxStatsSupported(PrimitiveType type) { - return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; + return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER + || type.columnOrder().getColumnOrderName() == ColumnOrderName.IEEE_754_TOTAL_ORDER; } /** @@ -958,6 +977,9 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte if (formatStats.isSetNull_count()) { statsBuilder.withNumNulls(formatStats.null_count); } + if (formatStats.isSetNan_count()) { + statsBuilder.withNanCount(formatStats.getNan_count()); + } } return statsBuilder.build(); } @@ -2088,6 +2110,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu if (columnOrder.isSetTYPE_ORDER()) { return org.apache.parquet.schema.ColumnOrder.typeDefined(); } + if (columnOrder.isSetIEEE_754_TOTAL_ORDER()) { + return org.apache.parquet.schema.ColumnOrder.ieee754TotalOrder(); + } // The column order is not yet supported by this API return org.apache.parquet.schema.ColumnOrder.undefined(); } @@ -2547,6 +2572,10 @@ public static ColumnIndex toParquetColumnIndex( columnIndex.getMaxValues(), toParquetBoundaryOrder(columnIndex.getBoundaryOrder())); parquetColumnIndex.setNull_counts(columnIndex.getNullCounts()); + List nanCounts = columnIndex.getNanCounts(); + if (nanCounts != null && !nanCounts.isEmpty()) { + parquetColumnIndex.setNan_counts(nanCounts); + } List repLevelHistogram = columnIndex.getRepetitionLevelHistogram(); if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) { 
parquetColumnIndex.setRepetition_level_histograms(repLevelHistogram); @@ -2568,6 +2597,7 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar fromParquetBoundaryOrder(parquetColumnIndex.getBoundary_order()), parquetColumnIndex.getNull_pages(), parquetColumnIndex.getNull_counts(), + parquetColumnIndex.getNan_counts(), parquetColumnIndex.getMin_values(), parquetColumnIndex.getMax_values(), parquetColumnIndex.getRepetition_level_histograms(), diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 264017a1f0..42ed5db024 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -1961,4 +1961,160 @@ public void testEdgeInterpolationAlgorithmConversion() { assertNull(ParquetMetadataConverter.fromParquetEdgeInterpolationAlgorithm(null)); assertNull(ParquetMetadataConverter.toParquetEdgeInterpolationAlgorithm(null)); } + + @Test + public void testIEEE754TotalOrderColumnOrder() throws IOException { + MessageType schema = Types.buildMessage() + .required(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("float_ieee754") + .required(PrimitiveTypeName.DOUBLE) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("double_ieee754") + .named("Message"); + + org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = + new org.apache.parquet.hadoop.metadata.FileMetaData(schema, new HashMap(), null); + ParquetMetadata metadata = new ParquetMetadata(fileMetaData, new ArrayList()); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + FileMetaData formatMetadata = converter.toParquetMetadata(1, metadata); + + List columnOrders = formatMetadata.getColumn_orders(); + 
assertEquals(2, columnOrders.size()); + for (org.apache.parquet.format.ColumnOrder columnOrder : columnOrders) { + assertTrue(columnOrder.isSetIEEE_754_TOTAL_ORDER()); + } + + MessageType resultSchema = + converter.fromParquetMetadata(formatMetadata).getFileMetaData().getSchema(); + assertEquals( + ColumnOrder.ieee754TotalOrder(), + resultSchema.getType("float_ieee754").asPrimitiveType().columnOrder()); + assertEquals( + ColumnOrder.ieee754TotalOrder(), + resultSchema.getType("double_ieee754").asPrimitiveType().columnOrder()); + } + + @Test + public void testStatisticsNanCountRoundTripFloat() { + PrimitiveType type = Types.required(PrimitiveTypeName.FLOAT).named("test_float"); + FloatStatistics stats = (FloatStatistics) Statistics.createStats(type); + stats.updateStats(1.0f); + stats.updateStats(Float.NaN); + stats.updateStats(3.0f); + stats.updateStats(Float.NaN); + + org.apache.parquet.format.Statistics formatStats = ParquetMetadataConverter.toParquetStatistics(stats); + assertTrue("nan_count should be set", formatStats.isSetNan_count()); + assertEquals(2, formatStats.getNan_count()); + + Statistics roundTrip = ParquetMetadataConverter.fromParquetStatisticsInternal( + Version.FULL_VERSION, formatStats, type, ParquetMetadataConverter.SortOrder.SIGNED); + assertTrue(roundTrip.isNanCountSet()); + assertEquals(2, roundTrip.getNanCount()); + } + + @Test + public void testStatisticsNanCountRoundTripDouble() { + PrimitiveType type = Types.required(PrimitiveTypeName.DOUBLE).named("test_double"); + DoubleStatistics stats = (DoubleStatistics) Statistics.createStats(type); + stats.updateStats(1.0); + stats.updateStats(Double.NaN); + stats.updateStats(3.0); + + org.apache.parquet.format.Statistics formatStats = ParquetMetadataConverter.toParquetStatistics(stats); + assertTrue("nan_count should be set", formatStats.isSetNan_count()); + assertEquals(1, formatStats.getNan_count()); + + Statistics roundTrip = ParquetMetadataConverter.fromParquetStatisticsInternal( + 
Version.FULL_VERSION, formatStats, type, ParquetMetadataConverter.SortOrder.SIGNED); + assertTrue(roundTrip.isNanCountSet()); + assertEquals(1, roundTrip.getNanCount()); + } + + @Test + public void testStatisticsNanCountRoundTripFloat16() { + PrimitiveType type = Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .as(LogicalTypeAnnotation.float16Type()) + .named("test_float16"); + BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(type); + // FLOAT16 1.0 = 0x3C00 + stats.updateStats(Binary.fromConstantByteArray(new byte[] {0x00, 0x3C})); + // FLOAT16 NaN = 0x7E00 + stats.updateStats(Binary.fromConstantByteArray(new byte[] {0x00, 0x7E})); + + org.apache.parquet.format.Statistics formatStats = ParquetMetadataConverter.toParquetStatistics(stats); + assertTrue("nan_count should be set", formatStats.isSetNan_count()); + assertEquals(1, formatStats.getNan_count()); + + Statistics roundTrip = ParquetMetadataConverter.fromParquetStatisticsInternal( + Version.FULL_VERSION, formatStats, type, ParquetMetadataConverter.SortOrder.SIGNED); + assertTrue(roundTrip.isNanCountSet()); + assertEquals(1, roundTrip.getNanCount()); + } + + @Test + public void testStatisticsNanCountZeroRoundTrip() { + PrimitiveType type = Types.required(PrimitiveTypeName.FLOAT).named("test_float"); + FloatStatistics stats = (FloatStatistics) Statistics.createStats(type); + stats.updateStats(1.0f); + stats.updateStats(2.0f); + + org.apache.parquet.format.Statistics formatStats = ParquetMetadataConverter.toParquetStatistics(stats); + assertTrue("nan_count should be set even when zero", formatStats.isSetNan_count()); + assertEquals(0, formatStats.getNan_count()); + + Statistics roundTrip = ParquetMetadataConverter.fromParquetStatisticsInternal( + Version.FULL_VERSION, formatStats, type, ParquetMetadataConverter.SortOrder.SIGNED); + assertTrue(roundTrip.isNanCountSet()); + assertEquals(0, roundTrip.getNanCount()); + } + + @Test + public void 
testStatisticsNanCountBackwardCompatibility() { + PrimitiveType type = Types.required(PrimitiveTypeName.FLOAT).named("test_float"); + org.apache.parquet.format.Statistics formatStats = new org.apache.parquet.format.Statistics(); + formatStats.setNull_count(5); + + Statistics stats = ParquetMetadataConverter.fromParquetStatisticsInternal( + Version.FULL_VERSION, formatStats, type, ParquetMetadataConverter.SortOrder.SIGNED); + assertFalse("nan_count should not be set for old files", stats.isNanCountSet()); + } + + @Test + public void testColumnIndexNanCountsRoundTrip() { + PrimitiveType type = Types.required(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("test_float"); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); + + // Page 1: mixed NaN and non-NaN + FloatStatistics stats1 = (FloatStatistics) Statistics.createStats(type); + stats1.updateStats(1.0f); + stats1.updateStats(Float.NaN); + stats1.updateStats(3.0f); + builder.add(stats1); + + // Page 2: all nulls + FloatStatistics stats2 = (FloatStatistics) Statistics.createStats(type); + stats2.incrementNumNulls(10); + builder.add(stats2); + + // Page 3: no NaN + FloatStatistics stats3 = (FloatStatistics) Statistics.createStats(type); + stats3.updateStats(5.0f); + stats3.updateStats(10.0f); + builder.add(stats3); + + ColumnIndex columnIndex = builder.build(); + org.apache.parquet.format.ColumnIndex parquetColumnIndex = + ParquetMetadataConverter.toParquetColumnIndex(type, columnIndex); + assertNotNull("nan_counts should be set", parquetColumnIndex.getNan_counts()); + assertEquals(List.of(1L, 0L, 0L), parquetColumnIndex.getNan_counts()); + + ColumnIndex roundTrip = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); + assertNotNull(roundTrip); + assertEquals(List.of(1L, 0L, 0L), roundTrip.getNanCounts()); + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16ReadWriteRoundTrip.java 
b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16ReadWriteRoundTrip.java index 8251ab2123..8040b60ec2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16ReadWriteRoundTrip.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16ReadWriteRoundTrip.java @@ -21,6 +21,7 @@ import static org.apache.parquet.schema.LogicalTypeAnnotation.float16Type; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.io.File; import java.io.IOException; @@ -56,7 +57,7 @@ public class TestFloat16ReadWriteRoundTrip { private Binary[] valuesInAscendingOrder = { Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xfc}), // -Infinity Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xc0}), // -2.0 - Binary.fromConstantByteArray(new byte[] {(byte) 0xff, (byte) 0x7b}), // -6.109476E-5 + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x84}), // -6.109476E-5 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}), // -0 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}), // +0 Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x00}), // 5.9604645E-8 @@ -70,7 +71,7 @@ public class TestFloat16ReadWriteRoundTrip { Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x00}), // 5.9604645E-8 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}), // +0 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}), // -0 - Binary.fromConstantByteArray(new byte[] {(byte) 0xff, (byte) 0x7b}), // -6.109476E-5 + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x84}), // -6.109476E-5 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xc0}), // -2.0 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xfc}) }; // -Infinity @@ -82,7 +83,7 @@ public class 
TestFloat16ReadWriteRoundTrip { Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x00}), // 5.9604645E-8 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}), // +0 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xc0}), // -2.0 - Binary.fromConstantByteArray(new byte[] {(byte) 0xff, (byte) 0x7b}), // -6.109476E-5 + Binary.fromConstantByteArray(new byte[] {(byte) 0x01, (byte) 0x84}), // -6.109476E-5 Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xfc}) }; // -Infinity @@ -123,20 +124,15 @@ public class TestFloat16ReadWriteRoundTrip { }; // Infinity private Binary[] valuesAllPositiveZeroMinMax = { - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}), // +0 - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}) - }; // +0 + Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}), // -0 (adjusted min) + Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}) // +0 (adjusted max) + }; private Binary[] valuesAllNegativeZeroMinMax = { - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}), // -0 - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}) + Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x80}), + Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x00}) }; // -0 - private Binary[] valuesWithNaNMinMax = { - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0xc0}), // -2.0 - Binary.fromConstantByteArray(new byte[] {(byte) 0x00, (byte) 0x7e}) - }; // NaN - @Test public void testFloat16ColumnIndex() throws IOException { List testValues = List.of( @@ -144,15 +140,13 @@ public void testFloat16ColumnIndex() throws IOException { valuesInDescendingOrder, valuesUndefinedOrder, valuesAllPositiveZero, - valuesAllNegativeZero, - valuesWithNaN); + valuesAllNegativeZero); List expectedValues = List.of( valuesInAscendingOrderMinMax, valuesInDescendingOrderMinMax, valuesUndefinedOrderMinMax, 
valuesAllPositiveZeroMinMax, - valuesAllNegativeZeroMinMax, - valuesWithNaNMinMax); + valuesAllNegativeZeroMinMax); for (int i = 0; i < testValues.size(); i++) { MessageType schema = Types.buildMessage() @@ -185,6 +179,38 @@ public void testFloat16ColumnIndex() throws IOException { assertEquals(Collections.singletonList(expectedValues.get(i)[1]), toFloat16List(index.getMaxValues())); } } + + // NaN values with TYPE_DEFINED_ORDER result in a null column index + { + MessageType schema = Types.buildMessage() + .required(FIXED_LEN_BYTE_ARRAY) + .as(float16Type()) + .length(2) + .named("col_float16") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + + for (Binary value : valuesWithNaN) { + writer.write(factory.newGroup().append("col_float16", value)); + } + } + + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ColumnChunkMetaData column = + reader.getFooter().getBlocks().get(0).getColumns().get(0); + ColumnIndex index = reader.readColumnIndex(column); + assertNull(index); + } + } } private Path newTempPath() throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestIeee754TotalOrderEndToEnd.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestIeee754TotalOrderEndToEnd.java new file mode 100644 index 0000000000..9adff7734c --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestIeee754TotalOrderEndToEnd.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.DoubleStatistics; +import org.apache.parquet.column.statistics.FloatStatistics; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import 
org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIeee754TotalOrderEndToEnd { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static final MessageType FLOAT_SCHEMA = Types.buildMessage() + .required(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("float_col") + .named("msg"); + + private static final MessageType DOUBLE_SCHEMA = Types.buildMessage() + .required(PrimitiveTypeName.DOUBLE) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("double_col") + .named("msg"); + + private static final MessageType FLOAT_DOUBLE_SCHEMA = Types.buildMessage() + .required(PrimitiveTypeName.FLOAT) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("float_col") + .required(PrimitiveTypeName.DOUBLE) + .columnOrder(ColumnOrder.ieee754TotalOrder()) + .named("double_col") + .named("msg"); + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.getAbsolutePath()); + } + + private Path writeFloatFile(float... values) throws IOException { + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withType(FLOAT_SCHEMA) + .withDictionaryEncoding(false) + .build()) { + GroupFactory factory = new SimpleGroupFactory(FLOAT_SCHEMA); + for (float v : values) { + writer.write(factory.newGroup().append("float_col", v)); + } + } + return path; + } + + private Path writeDoubleFile(double... 
values) throws IOException { + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withType(DOUBLE_SCHEMA) + .withDictionaryEncoding(false) + .build()) { + GroupFactory factory = new SimpleGroupFactory(DOUBLE_SCHEMA); + for (double v : values) { + writer.write(factory.newGroup().append("double_col", v)); + } + } + return path; + } + + private Path writeFloatDoubleFile(float[] floatValues, double[] doubleValues) throws IOException { + Preconditions.checkArgument(floatValues.length == doubleValues.length, "Arrays must have same length"); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withType(FLOAT_DOUBLE_SCHEMA) + .withDictionaryEncoding(false) + .build()) { + GroupFactory factory = new SimpleGroupFactory(FLOAT_DOUBLE_SCHEMA); + for (int i = 0; i < floatValues.length; i++) { + writer.write( + factory.newGroup().append("float_col", floatValues[i]).append("double_col", doubleValues[i])); + } + } + return path; + } + + private List readFloatValues(Path path) throws IOException { + return readFloatValues(path, null); + } + + private List readFloatValues(Path path, FilterCompat.Filter filter) throws IOException { + List result = new ArrayList<>(); + ParquetReader.Builder builder = ParquetReader.builder(new GroupReadSupport(), path); + if (filter != null) { + builder.withFilter(filter); + } + try (ParquetReader reader = builder.build()) { + Group group; + while ((group = reader.read()) != null) { + result.add(group.getFloat("float_col", 0)); + } + } + return result; + } + + private List readDoubleValues(Path path) throws IOException { + return readDoubleValues(path, null); + } + + private List readDoubleValues(Path path, FilterCompat.Filter filter) throws IOException { + List result = new ArrayList<>(); + ParquetReader.Builder builder = ParquetReader.builder(new GroupReadSupport(), path); + if (filter != null) { + builder.withFilter(filter); + } + try (ParquetReader reader = 
builder.build()) { + Group group; + while ((group = reader.read()) != null) { + result.add(group.getDouble("double_col", 0)); + } + } + return result; + } + + @Test + public void testFloatStatisticsWithNaN() throws IOException { + Path path = writeFloatFile(1.0f, Float.NaN, 3.0f, Float.NaN, 5.0f); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + BlockMetaData block = reader.getFooter().getBlocks().get(0); + ColumnChunkMetaData column = block.getColumns().get(0); + + // Verify column order is IEEE 754 + assertEquals( + ColumnOrder.ieee754TotalOrder(), column.getPrimitiveType().columnOrder()); + + // Verify statistics + Statistics stats = column.getStatistics(); + assertNotNull(stats); + assertTrue("nan_count should be set", stats.isNanCountSet()); + assertEquals("nan_count should be 2", 2, stats.getNanCount()); + + // Min/max should exclude NaN + FloatStatistics floatStats = (FloatStatistics) stats; + assertEquals(1.0f, floatStats.getMin(), 0.0f); + assertEquals(5.0f, floatStats.getMax(), 0.0f); + + // Verify column index + ColumnIndex columnIndex = reader.readColumnIndex(column); + assertNotNull("ColumnIndex should not be null for IEEE 754 with NaN", columnIndex); + List nanCounts = columnIndex.getNanCounts(); + assertNotNull("nan_counts should be set in column index", nanCounts); + // All values in one page, so one entry with nan_count = 2 + assertEquals(1, nanCounts.size()); + assertEquals(Long.valueOf(2), nanCounts.get(0)); + + // Verify min/max in column index exclude NaN + List minValues = columnIndex.getMinValues(); + List maxValues = columnIndex.getMaxValues(); + assertEquals(1, minValues.size()); + float ciMin = minValues.get(0).order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + float ciMax = maxValues.get(0).order(ByteOrder.LITTLE_ENDIAN).getFloat(0); + assertEquals(1.0f, ciMin, 0.0f); + assertEquals(5.0f, ciMax, 0.0f); + } + } + + @Test + public void testDoubleStatisticsWithNaN() throws 
IOException { + // Write: -10.0, NaN, 20.0, NaN, NaN + Path path = writeDoubleFile(-10.0, Double.NaN, 20.0, Double.NaN, Double.NaN); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + BlockMetaData block = reader.getFooter().getBlocks().get(0); + ColumnChunkMetaData column = block.getColumns().get(0); + + assertEquals( + ColumnOrder.ieee754TotalOrder(), column.getPrimitiveType().columnOrder()); + + Statistics stats = column.getStatistics(); + assertTrue(stats.isNanCountSet()); + assertEquals(3, stats.getNanCount()); + + DoubleStatistics doubleStats = (DoubleStatistics) stats; + assertEquals(-10.0, doubleStats.getMin(), 0.0); + assertEquals(20.0, doubleStats.getMax(), 0.0); + + ColumnIndex columnIndex = reader.readColumnIndex(column); + assertNotNull(columnIndex); + List nanCounts = columnIndex.getNanCounts(); + assertNotNull(nanCounts); + assertEquals(1, nanCounts.size()); + assertEquals(Long.valueOf(3), nanCounts.get(0)); + } + } + + @Test + public void testFloatStatisticsNoNaN() throws IOException { + Path path = writeFloatFile(1.0f, 2.0f, 3.0f); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ColumnChunkMetaData column = + reader.getFooter().getBlocks().get(0).getColumns().get(0); + Statistics stats = column.getStatistics(); + + assertTrue(stats.isNanCountSet()); + assertEquals(0, stats.getNanCount()); + + FloatStatistics floatStats = (FloatStatistics) stats; + assertEquals(1.0f, floatStats.getMin(), 0.0f); + assertEquals(3.0f, floatStats.getMax(), 0.0f); + } + } + + @Test + public void testFloatStatisticsAllNaN() throws IOException { + Path path = writeFloatFile(Float.NaN, Float.NaN, Float.NaN); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ColumnChunkMetaData column = + reader.getFooter().getBlocks().get(0).getColumns().get(0); + Statistics stats = 
column.getStatistics(); + + assertTrue(stats.isNanCountSet()); + assertEquals(3, stats.getNanCount()); + + assertTrue("All-NaN column should have non-null min/max values", stats.hasNonNullValue()); + assertTrue("All-NaN min should be NaN", Float.isNaN(((FloatStatistics) stats).getMin())); + assertTrue("All-NaN max should be NaN", Float.isNaN(((FloatStatistics) stats).getMax())); + } + } + + @Test + public void testFloatDoubleColumnsWithNaN() throws IOException { + float[] floatValues = {1.0f, Float.NaN, 3.0f}; + double[] doubleValues = {-5.0, Double.NaN, 10.0}; + Path path = writeFloatDoubleFile(floatValues, doubleValues); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + BlockMetaData block = reader.getFooter().getBlocks().get(0); + + // Verify float column + ColumnChunkMetaData floatCol = block.getColumns().get(0); + assertEquals( + ColumnOrder.ieee754TotalOrder(), floatCol.getPrimitiveType().columnOrder()); + Statistics floatStats = floatCol.getStatistics(); + assertTrue(floatStats.isNanCountSet()); + assertEquals(1, floatStats.getNanCount()); + assertEquals(1.0f, ((FloatStatistics) floatStats).getMin(), 0.0f); + assertEquals(3.0f, ((FloatStatistics) floatStats).getMax(), 0.0f); + + // Verify double column + ColumnChunkMetaData doubleCol = block.getColumns().get(1); + assertEquals( + ColumnOrder.ieee754TotalOrder(), + doubleCol.getPrimitiveType().columnOrder()); + Statistics doubleStats = doubleCol.getStatistics(); + assertTrue(doubleStats.isNanCountSet()); + assertEquals(1, doubleStats.getNanCount()); + assertEquals(-5.0, ((DoubleStatistics) doubleStats).getMin(), 0.0); + assertEquals(10.0, ((DoubleStatistics) doubleStats).getMax(), 0.0); + + // Verify column indexes for both + ColumnIndex floatCI = reader.readColumnIndex(floatCol); + assertNotNull(floatCI); + assertNotNull(floatCI.getNanCounts()); + assertEquals(Long.valueOf(1), floatCI.getNanCounts().get(0)); + + ColumnIndex doubleCI = 
reader.readColumnIndex(doubleCol); + assertNotNull(doubleCI); + assertNotNull(doubleCI.getNanCounts()); + assertEquals(Long.valueOf(1), doubleCI.getNanCounts().get(0)); + } + } +} diff --git a/pom.xml b/pom.xml index 88c8b50594..f644fd3b38 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ shaded.parquet 3.3.0 - 2.12.0 + 2.13.0-SNAPSHOT 1.17.0 thrift ${thrift.executable}