diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java new file mode 100644 index 000000000000..943b751870e9 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * A {@link PredicateReplaceVisitor} that evaluates partition predicates against a known partition + * value, replacing them with {@link AlwaysTrue} or {@link AlwaysFalse}. + * + *

For leaf predicates that only reference partition fields, the predicate is evaluated against + * the given partition row (with field indices remapped from table schema to partition schema). If + * the evaluation returns true, the predicate is replaced with AlwaysTrue; otherwise with + * AlwaysFalse. + * + *

For leaf predicates that reference any non-partition field, the predicate is kept as-is. + * + *

For compound predicates (AND/OR), children are recursively visited and the result is + * simplified via {@link PredicateBuilder#and} / {@link PredicateBuilder#or}. + */ +public class PartitionValuePredicateVisitor implements PredicateReplaceVisitor { + + private final Set partitionFields; + + /** Mapping from table field index to partition field index. -1 if not a partition field. */ + private final int[] tableToPartitionMapping; + + private final InternalRow partitionRow; + + public PartitionValuePredicateVisitor( + RowType tableType, RowType partitionType, InternalRow partitionRow) { + this.partitionRow = partitionRow; + this.partitionFields = new HashSet<>(partitionType.getFieldNames()); + + List tableFieldNames = tableType.getFieldNames(); + List partitionFieldNames = partitionType.getFieldNames(); + + this.tableToPartitionMapping = new int[tableFieldNames.size()]; + for (int i = 0; i < tableFieldNames.size(); i++) { + tableToPartitionMapping[i] = partitionFieldNames.indexOf(tableFieldNames.get(i)); + } + } + + @Override + public Optional visit(LeafPredicate predicate) { + Set refFields = PredicateVisitor.collectFieldNames(predicate); + if (!partitionFields.containsAll(refFields)) { + return Optional.of(predicate); + } + + // Remap field indices from table schema to partition schema + List remappedInputs = new ArrayList<>(); + for (Object input : predicate.transform().inputs()) { + if (input instanceof FieldRef) { + FieldRef ref = (FieldRef) input; + int partIdx = tableToPartitionMapping[ref.index()]; + remappedInputs.add(new FieldRef(partIdx, ref.name(), ref.type())); + } else { + remappedInputs.add(input); + } + } + + // Evaluate the remapped predicate against the known partition row + LeafPredicate remapped = predicate.copyWithNewInputs(remappedInputs); + boolean result = remapped.test(partitionRow); + return Optional.of(result ? 
PredicateBuilder.alwaysTrue() : PredicateBuilder.alwaysFalse()); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java index aeaa5d5aee7e..b71941c0ccbd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java @@ -37,6 +37,10 @@ default Optional visit(CompoundPredicate predicate) { return Optional.empty(); } } - return Optional.of(new CompoundPredicate(predicate.function(), converted)); + if (predicate.function() instanceof And) { + return Optional.of(PredicateBuilder.and(converted)); + } else { + return Optional.of(PredicateBuilder.or(converted)); + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java new file mode 100644 index 000000000000..8a5de1e63136 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +/** Represents a filter (boolean-valued function) of three arguments. */ +@FunctionalInterface +public interface TriFilter { + + TriFilter ALWAYS_TRUE = (t, u, r) -> true; + + boolean test(T t, U u, R r); + + @SuppressWarnings({"unchecked", "rawtypes"}) + static TriFilter alwaysTrue() { + return (TriFilter) ALWAYS_TRUE; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java new file mode 100644 index 000000000000..7486d00372d7 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +public class PartitionValuePredicateVisitorTest { + + // Table schema: (pt INT, a INT, b INT), partition key: pt + private static final RowType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD(0, "pt", DataTypes.INT()), + DataTypes.FIELD(1, "a", DataTypes.INT()), + DataTypes.FIELD(2, "b", DataTypes.INT())); + + private static final RowType PARTITION_TYPE = + DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT())); + + private static final PredicateBuilder BUILDER = new PredicateBuilder(TABLE_TYPE); + + // ========================== Leaf: partition field ========================== + + @Test + public void testPartitionEqualMatch() { + // pt = 1, partition value is pt=1 => AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.equal(0, 1).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testPartitionEqualNoMatch() { + // pt = 2, partition value is pt=1 => AlwaysFalse + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.equal(0, 2).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + } + + @Test + public void testPartitionGreaterThanMatch() { + // pt > 0, partition value is pt=1 => AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.greaterThan(0, 0).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testPartitionGreaterThanNoMatch() { + // pt > 5, partition value is pt=1 => AlwaysFalse + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.greaterThan(0, 5).visit(visitor); + 
assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + } + + @Test + public void testPartitionLessOrEqualMatch() { + // pt <= 1, partition value is pt=1 => AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.lessOrEqual(0, 1).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testPartitionBetweenMatch() { + // pt BETWEEN 0 AND 5, partition value is pt=3 => AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(3)); + + Optional result = BUILDER.between(0, 0, 5).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testPartitionBetweenNoMatch() { + // pt BETWEEN 5 AND 10, partition value is pt=3 => AlwaysFalse + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(3)); + + Optional result = BUILDER.between(0, 5, 10).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + } + + @Test + public void testPartitionIsNullNoMatch() { + // pt IS NULL, partition value is pt=1 => AlwaysFalse + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.isNull(0).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + } + + @Test + public void testPartitionIsNotNullMatch() { + // pt IS NOT NULL, partition value is pt=1 => AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Optional result = BUILDER.isNotNull(0).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + // ========================== Leaf: 
non-partition field ========================== + + @Test + public void testNonPartitionFieldKeptAsIs() { + // a = 5 is not a partition predicate => kept unchanged + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate original = BUILDER.equal(1, 5); + Optional result = original.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(original); + } + + // ========================== Compound: AND ========================== + + @Test + public void testAndPartitionMatchAndNonPartition() { + // (pt = 1 AND a = 5), partition value is pt=1 + // => AlwaysTrue simplified away by PredicateBuilder.and(), leaving just a=5 + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), BUILDER.equal(1, 5)); + Optional result = and.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5)); + } + + @Test + public void testAndPartitionNoMatchAndNonPartition() { + // (pt = 2 AND a = 5), partition value is pt=1 + // => AlwaysFalse short-circuits the entire AND + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate and = PredicateBuilder.and(BUILDER.equal(0, 2), BUILDER.equal(1, 5)); + Optional result = and.visit(visitor); + assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + } + + @Test + public void testAndAllPartitionMatch() { + // (pt = 1 AND pt > 0), partition value is pt=1 + // => both AlwaysTrue, simplified to AlwaysTrue + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), BUILDER.greaterThan(0, 0)); + Optional result = and.visit(visitor); + 
assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + // ========================== Compound: OR ========================== + + @Test + public void testOrPartitionPredicates() { + // (pt = 1 OR pt = 2), partition value is pt=1 + // => AlwaysTrue short-circuits the entire OR + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(0, 2)); + Optional result = or.visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testOrPartitionMatchAndNonPartition() { + // (pt = 1 OR a = 5), partition value is pt=1 + // => AlwaysTrue short-circuits the entire OR + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(1, 5)); + Optional result = or.visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testOrPartitionNoMatchAndNonPartition() { + // (pt = 2 OR a = 5), partition value is pt=1 + // => AlwaysFalse filtered out, leaving just a=5 + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate or = PredicateBuilder.or(BUILDER.equal(0, 2), BUILDER.equal(1, 5)); + Optional result = or.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5)); + } + + // ========================== Multiple partition keys ========================== + + @Test + public void testMultiplePartitionFieldsBothMatch() { + // Table: (pt1 INT, pt2 STRING, a INT), partition keys: (pt1, pt2) + RowType tableType = + DataTypes.ROW( + DataTypes.FIELD(0, "pt1", DataTypes.INT()), + DataTypes.FIELD(1, "pt2", DataTypes.STRING()), + DataTypes.FIELD(2, "a", 
DataTypes.INT())); + RowType partitionType = + DataTypes.ROW( + DataTypes.FIELD(0, "pt1", DataTypes.INT()), + DataTypes.FIELD(1, "pt2", DataTypes.STRING())); + PredicateBuilder builder = new PredicateBuilder(tableType); + + // pt1 = 1 AND pt2 = 'x', partition value is (1, 'x') => both match => AlwaysTrue + GenericRow partitionRow = GenericRow.of(1, BinaryString.fromString("x")); + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(tableType, partitionType, partitionRow); + + Predicate and = + PredicateBuilder.and( + builder.equal(0, 1), builder.equal(1, BinaryString.fromString("x"))); + Optional result = and.visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + } + + @Test + public void testMultiplePartitionFieldsPartialMatch() { + // pt1 = 1 AND pt2 = 'y', partition value is (1, 'x') => pt1 matches, pt2 doesn't + RowType tableType = + DataTypes.ROW( + DataTypes.FIELD(0, "pt1", DataTypes.INT()), + DataTypes.FIELD(1, "pt2", DataTypes.STRING()), + DataTypes.FIELD(2, "a", DataTypes.INT())); + RowType partitionType = + DataTypes.ROW( + DataTypes.FIELD(0, "pt1", DataTypes.INT()), + DataTypes.FIELD(1, "pt2", DataTypes.STRING())); + PredicateBuilder builder = new PredicateBuilder(tableType); + + GenericRow partitionRow = GenericRow.of(1, BinaryString.fromString("x")); + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(tableType, partitionType, partitionRow); + + Predicate and = + PredicateBuilder.and( + builder.equal(0, 1), builder.equal(1, BinaryString.fromString("y"))); + Optional result = and.visit(visitor); + assertThat(result).isPresent(); + // pt2 doesn't match => AlwaysFalse short-circuits the entire AND + assertAlwaysFalse(result.get()); + } + + // ========================== Partition field not at index 0 ========================== + + @Test + public void testPartitionFieldNotFirstInTable() { + // Table: (a INT, pt INT, b INT), partition key: pt (index 1 in table, index 0 in 
partition) + RowType tableType = + DataTypes.ROW( + DataTypes.FIELD(0, "a", DataTypes.INT()), + DataTypes.FIELD(1, "pt", DataTypes.INT()), + DataTypes.FIELD(2, "b", DataTypes.INT())); + RowType partitionType = DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT())); + PredicateBuilder builder = new PredicateBuilder(tableType); + + // pt = 3, partition value is pt=3 + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(tableType, partitionType, GenericRow.of(3)); + + Optional result = builder.equal(1, 3).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysTrue(result.get()); + + // pt = 5, partition value is pt=3 => AlwaysFalse + result = builder.equal(1, 5).visit(visitor); + assertThat(result).isPresent(); + assertAlwaysFalse(result.get()); + + // a = 10 => kept as-is (non-partition field) + Predicate original = builder.equal(0, 10); + result = original.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(original); + } + + // ========================== Nested compound ========================== + + @Test + public void testNestedAndOr() { + // ((pt = 1 OR pt = 2) AND a = 5), partition value is pt=1 + // Inner OR: AlwaysTrue short-circuits to AlwaysTrue + // Outer AND: AlwaysTrue simplified away, leaving just a=5 + PartitionValuePredicateVisitor visitor = + new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(0, 2)); + Predicate and = PredicateBuilder.and(or, BUILDER.equal(1, 5)); + Optional result = and.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5)); + } + + // ========================== Only non-partition predicates ========================== + + @Test + public void testAllNonPartitionPredicatesUnchanged() { + // (a = 5 AND b = 10), partition value is pt=1 => kept unchanged + PartitionValuePredicateVisitor visitor = + new 
PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1)); + + Predicate original = PredicateBuilder.and(BUILDER.equal(1, 5), BUILDER.equal(2, 10)); + Optional result = original.visit(visitor); + assertThat(result).isPresent(); + assertThat(result.get()).isEqualTo(original); + } + + // ========================== Helpers ========================== + + private static void assertAlwaysTrue(Predicate predicate) { + assertThat(predicate).isInstanceOf(LeafPredicate.class); + assertThat(((LeafPredicate) predicate).function()).isEqualTo(AlwaysTrue.INSTANCE); + } + + private static void assertAlwaysFalse(Predicate predicate) { + assertThat(predicate).isInstanceOf(LeafPredicate.class); + assertThat(((LeafPredicate) predicate).function()).isEqualTo(AlwaysFalse.INSTANCE); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index ad71a1d59518..f0f5e1c6fff5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -30,7 +30,6 @@ import org.apache.paimon.operation.DataEvolutionFileStoreScan; import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; @@ -41,12 +40,6 @@ import javax.annotation.Nullable; import java.util.Comparator; -import java.util.List; -import java.util.Optional; - -import static org.apache.paimon.predicate.PredicateBuilder.and; -import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; -import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; /** {@link FileStore} for reading and writing {@link InternalRow}. 
*/ public class AppendOnlyFileStore extends AbstractFileStore { @@ -143,26 +136,12 @@ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer wr @Override public AppendOnlyFileStoreScan newScan() { BucketSelectConverter bucketSelectConverter = - predicate -> { - if (bucketMode() != BucketMode.HASH_FIXED) { - return Optional.empty(); - } - - if (bucketKeyType.getFieldCount() == 0) { - return Optional.empty(); - } - - List bucketFilters = - pickTransformFieldMapping( - splitAnd(predicate), - rowType.getFieldNames(), - bucketKeyType.getFieldNames()); - if (!bucketFilters.isEmpty()) { - return BucketSelectConverter.create( - and(bucketFilters), bucketKeyType, options.bucketFunctionType()); - } - return Optional.empty(); - }; + new BucketSelectConverter( + bucketMode(), + options.bucketFunctionType(), + rowType, + partitionType, + bucketKeyType); if (options().dataEvolutionEnabled()) { return new DataEvolutionFileStoreScan( diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 710e2dc3a587..a1ae650b433a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -33,7 +33,6 @@ import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; -import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -48,18 +47,12 @@ import java.util.Comparator; import java.util.List; -import java.util.Optional; import java.util.function.Supplier; -import static org.apache.paimon.predicate.PredicateBuilder.and; -import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; -import static 
org.apache.paimon.predicate.PredicateBuilder.splitAnd; - /** {@link FileStore} for querying and updating {@link KeyValue}s. */ public class KeyValueFileStore extends AbstractFileStore { private final boolean crossPartitionUpdate; - private final RowType bucketKeyType; private final RowType keyType; private final RowType valueType; private final KeyValueFieldsExtractor keyValueFieldsExtractor; @@ -74,7 +67,6 @@ public KeyValueFileStore( boolean crossPartitionUpdate, CoreOptions options, RowType partitionType, - RowType bucketKeyType, RowType keyType, RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, @@ -83,7 +75,6 @@ public KeyValueFileStore( CatalogEnvironment catalogEnvironment) { super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment); this.crossPartitionUpdate = crossPartitionUpdate; - this.bucketKeyType = bucketKeyType; this.keyType = keyType; this.valueType = valueType; this.keyValueFieldsExtractor = keyValueFieldsExtractor; @@ -203,25 +194,13 @@ public AbstractFileStoreWrite newWrite(String commitUser, @Nullable In @Override public KeyValueFileStoreScan newScan() { - BucketMode bucketMode = bucketMode(); BucketSelectConverter bucketSelectConverter = - keyFilter -> { - if (bucketMode != BucketMode.HASH_FIXED - && bucketMode != BucketMode.POSTPONE_MODE) { - return Optional.empty(); - } - - List bucketFilters = - pickTransformFieldMapping( - splitAnd(keyFilter), - keyType.getFieldNames(), - bucketKeyType.getFieldNames()); - if (!bucketFilters.isEmpty()) { - return BucketSelectConverter.create( - and(bucketFilters), bucketKeyType, options.bucketFunctionType()); - } - return Optional.empty(); - }; + new BucketSelectConverter( + bucketMode(), + options.bucketFunctionType(), + schema.logicalRowType(), + partitionType, + schema.logicalBucketKeyType()); return new KeyValueFileStoreScan( newManifestsReader(), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java index 45cd074a5ad1..4662d7dab517 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java @@ -18,8 +18,9 @@ package org.apache.paimon.manifest; -import org.apache.paimon.utils.BiFilter; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.TriFilter; import javax.annotation.Nullable; @@ -29,13 +30,13 @@ public class BucketFilter { private final boolean onlyReadRealBuckets; private final @Nullable Integer specifiedBucket; private final @Nullable Filter bucketFilter; - private final @Nullable BiFilter totalAwareBucketFilter; + private final @Nullable TriFilter totalAwareBucketFilter; public BucketFilter( boolean onlyReadRealBuckets, @Nullable Integer specifiedBucket, @Nullable Filter bucketFilter, - @Nullable BiFilter totalAwareBucketFilter) { + @Nullable TriFilter totalAwareBucketFilter) { this.onlyReadRealBuckets = onlyReadRealBuckets; this.specifiedBucket = specifiedBucket; this.bucketFilter = bucketFilter; @@ -46,7 +47,7 @@ public BucketFilter( boolean onlyReadRealBuckets, @Nullable Integer specifiedBucket, @Nullable Filter bucketFilter, - @Nullable BiFilter totalAwareBucketFilter) { + @Nullable TriFilter totalAwareBucketFilter) { if (!onlyReadRealBuckets && specifiedBucket == null && bucketFilter == null @@ -63,7 +64,7 @@ public Integer specifiedBucket() { return specifiedBucket; } - public boolean test(int bucket, int totalBucket) { + public boolean test(BinaryRow partition, int bucket, int totalBucket) { if (onlyReadRealBuckets && bucket < 0) { return false; } @@ -73,6 +74,7 @@ public boolean test(int bucket, int totalBucket) { if (bucketFilter != null && !bucketFilter.test(bucket)) { return false; } - return totalAwareBucketFilter == null || totalAwareBucketFilter.test(bucket, totalBucket); + return totalAwareBucketFilter == 
null + || totalAwareBucketFilter.test(partition, bucket, totalBucket); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java index 59b2a34650a4..09afdcc3ac58 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java @@ -125,10 +125,10 @@ protected List readFromSegments( List segments = manifestSegments.segments(); // try to do fast filter first - Optional partition = extractSinglePartition(partitionFilter); - if (partition.isPresent()) { + Optional singlePartition = extractSinglePartition(partitionFilter); + if (singlePartition.isPresent()) { Map> segMap = - manifestSegments.indexedSegments().get(partition.get()); + manifestSegments.indexedSegments().get(singlePartition.get()); if (segMap == null) { return Collections.emptyList(); } @@ -147,11 +147,13 @@ protected List readFromSegments( // do force loop filter List segmentsList = new ArrayList<>(); for (RichSegments richSegments : segments) { - if (partitionFilter != null && !partitionFilter.test(richSegments.partition())) { + BinaryRow partition = richSegments.partition(); + if (partitionFilter != null && !partitionFilter.test(partition)) { continue; } if (bucketFilter != null - && !bucketFilter.test(richSegments.bucket(), richSegments.totalBucket())) { + && !bucketFilter.test( + partition, richSegments.bucket(), richSegments.totalBucket())) { continue; } segmentsList.add(richSegments.segments()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 68ebacaa805f..30908cce3e43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ 
-46,6 +46,7 @@ import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RowRangeIndex; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TriFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +88,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private boolean onlyReadRealBuckets = false; private Integer specifiedBucket = null; private Filter bucketFilter = null; - private BiFilter totalAwareBucketFilter = null; + private TriFilter totalAwareBucketFilter = null; protected ScanMode scanMode = ScanMode.ALL; private Integer specifiedLevel = null; private Filter levelFilter = null; @@ -162,7 +163,7 @@ public FileStoreScan withBucketFilter(Filter bucketFilter) { @Override public FileStoreScan withTotalAwareBucketFilter( - BiFilter totalAwareBucketFilter) { + TriFilter totalAwareBucketFilter) { this.totalAwareBucketFilter = totalAwareBucketFilter; return this; } @@ -533,14 +534,21 @@ private Filter createEntryRowFilter() { Function levelGetter = ManifestEntrySerializer.levelGetter(); BucketFilter bucketFilter = createBucketFilter(); return row -> { - if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) { - return false; + BinaryRow partition = null; + if (partitionFilter != null) { + partition = partitionGetter.apply(row); + if (!partitionFilter.test(partition)) { + return false; + } } if (bucketFilter != null) { int bucket = bucketGetter.apply(row); int totalBucket = totalBucketGetter.apply(row); - if (!bucketFilter.test(bucket, totalBucket)) { + if (partition == null) { + partition = partitionGetter.apply(row); + } + if (!bucketFilter.test(partition, bucket, totalBucket)) { return false; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 3546a1fcae3c..2714e773a2ca 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -86,6 +86,11 @@ public AppendOnlyFileStoreScan( public AppendOnlyFileStoreScan withFilter(Predicate predicate) { this.inputFilter = predicate; + return this; + } + + @Override + public FileStoreScan withCompleteFilter(Predicate predicate) { this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java index 6577099aa885..e441641f7e14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java @@ -19,160 +19,55 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions.BucketFunctionType; -import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.bucket.BucketFunction; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.predicate.Equal; -import org.apache.paimon.predicate.FieldRef; -import org.apache.paimon.predicate.In; -import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.BiFilter; +import org.apache.paimon.utils.TriFilter; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; - -import javax.annotation.concurrent.ThreadSafe; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import 
java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -import static org.apache.paimon.predicate.PredicateBuilder.splitOr; +import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames; /** Bucket filter push down in scan to skip files. */ -public interface BucketSelectConverter { - - int MAX_VALUES = 1000; - - Optional> convert(Predicate predicate); - - static Optional> create( - Predicate bucketPredicate, - RowType bucketKeyType, - BucketFunctionType bucketFunctionType) { - @SuppressWarnings("unchecked") - List[] bucketValues = new List[bucketKeyType.getFieldCount()]; - - BucketFunction bucketFunction = BucketFunction.create(bucketFunctionType, bucketKeyType); - - nextAnd: - for (Predicate andPredicate : splitAnd(bucketPredicate)) { - Integer reference = null; - List values = new ArrayList<>(); - for (Predicate orPredicate : splitOr(andPredicate)) { - if (orPredicate instanceof LeafPredicate) { - LeafPredicate leaf = (LeafPredicate) orPredicate; - Optional fieldRefOptional = leaf.fieldRefOptional(); - if (fieldRefOptional.isPresent()) { - FieldRef fieldRef = fieldRefOptional.get(); - if (reference == null || reference == fieldRef.index()) { - reference = fieldRef.index(); - if (leaf.function().equals(Equal.INSTANCE) - || leaf.function().equals(In.INSTANCE)) { - values.addAll( - leaf.literals().stream() - .filter(Objects::nonNull) - .collect(Collectors.toList())); - continue; - } - } - } - } - - // failed, go to next predicate - continue nextAnd; - } - if (reference != null) { - if (bucketValues[reference] != null) { - // Repeated equals in And? 
- return Optional.empty(); - } - - bucketValues[reference] = values; - } - } - - int rowCount = 1; - for (List values : bucketValues) { - if (values == null) { - return Optional.empty(); - } - - rowCount *= values.size(); - if (rowCount > MAX_VALUES) { - return Optional.empty(); - } - } - - InternalRowSerializer serializer = new InternalRowSerializer(bucketKeyType); - List bucketKeys = new ArrayList<>(); - assembleRows( - bucketValues, - columns -> - bucketKeys.add( - serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()), - new ArrayList<>(), - 0); - - return Optional.of(new Selector(bucketKeys, bucketFunction)); +public class BucketSelectConverter { + + private final BucketMode bucketMode; + private final BucketFunctionType bucketFunctionType; + private final RowType rowType; + private final RowType partitionType; + private final RowType bucketKeyType; + + public BucketSelectConverter( + BucketMode bucketMode, + BucketFunctionType bucketFunctionType, + RowType rowType, + RowType partitionType, + RowType bucketKeyType) { + this.bucketMode = bucketMode; + this.bucketFunctionType = bucketFunctionType; + this.rowType = rowType; + this.partitionType = partitionType; + this.bucketKeyType = bucketKeyType; } - static void assembleRows( - List[] rowValues, - Consumer> consumer, - List stack, - int columnIndex) { - List columnValues = rowValues[columnIndex]; - for (Object value : columnValues) { - stack.add(value); - if (columnIndex == rowValues.length - 1) { - // last column, consume row - consumer.accept(stack); - } else { - assembleRows(rowValues, consumer, stack, columnIndex + 1); - } - stack.remove(stack.size() - 1); + public Optional> convert(Predicate predicate) { + if (bucketMode != BucketMode.HASH_FIXED && bucketMode != BucketMode.POSTPONE_MODE) { + return Optional.empty(); } - } - /** Selector to select bucket from {@link Predicate}. 
*/ - @ThreadSafe - class Selector implements BiFilter { - - private final List bucketKeys; - - private final BucketFunction bucketFunction; - - private final Map> buckets = new ConcurrentHashMap<>(); - - public Selector(List bucketKeys, BucketFunction bucketFunction) { - this.bucketKeys = bucketKeys; - this.bucketFunction = bucketFunction; + if (bucketKeyType.getFieldCount() == 0) { + return Optional.empty(); } - @Override - public boolean test(Integer bucket, Integer numBucket) { - return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket)) - .contains(bucket); + Set predicateFields = collectFieldNames(predicate); + if (!predicateFields.containsAll(bucketKeyType.getFieldNames())) { + return Optional.empty(); } - @VisibleForTesting - Set createBucketSet(int numBucket) { - ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); - for (BinaryRow key : bucketKeys) { - builder.add(bucketFunction.bucket(key, numBucket)); - } - return builder.build(); - } + return Optional.of( + new BucketSelector( + predicate, bucketFunctionType, rowType, partitionType, bucketKeyType)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java new file mode 100644 index 000000000000..8c28c3c0c641 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions.BucketFunctionType; +import org.apache.paimon.bucket.BucketFunction; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.PartitionValuePredicateVisitor; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BiFilter; +import org.apache.paimon.utils.TriFilter; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.paimon.predicate.PredicateBuilder.and; +import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; +import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; +import static org.apache.paimon.predicate.PredicateBuilder.splitOr; + +/** Selector to select bucket from {@link Predicate}. 
*/ +@ThreadSafe +public class BucketSelector implements TriFilter { + + public static final int MAX_VALUES = 1000; + + private final BucketFunctionType bucketFunctionType; + private final RowType rowType; + private final RowType partitionType; + private final RowType bucketKeyType; + private final Predicate predicate; + private final Map> partitionSelectors; + + public BucketSelector( + Predicate predicate, + BucketFunctionType bucketFunctionType, + RowType rowType, + RowType partitionType, + RowType bucketKeyType) { + this.predicate = predicate; + this.bucketFunctionType = bucketFunctionType; + this.rowType = rowType; + this.partitionType = partitionType; + this.bucketKeyType = bucketKeyType; + this.partitionSelectors = new ConcurrentHashMap<>(); + } + + @Override + public boolean test(BinaryRow partition, Integer bucket, Integer numBucket) { + return partitionSelectors + .computeIfAbsent(partition, this::createPartitionSelector) + .map(selector -> selector.test(bucket, numBucket)) + .orElse(true); + } + + private Optional createPartitionSelector(BinaryRow partition) { + Optional partRemoved = + predicate.visit( + new PartitionValuePredicateVisitor(rowType, partitionType, partition)); + if (!partRemoved.isPresent()) { + return Optional.empty(); + } + + List bucketFilters = + pickTransformFieldMapping( + splitAnd(partRemoved.get()), + rowType.getFieldNames(), + bucketKeyType.getFieldNames()); + if (bucketFilters.isEmpty()) { + return Optional.empty(); + } + + return createPartitionSelector(and(bucketFilters)); + } + + private Optional createPartitionSelector(Predicate bucketPredicate) { + @SuppressWarnings("unchecked") + List[] bucketValues = new List[bucketKeyType.getFieldCount()]; + + BucketFunction bucketFunction = BucketFunction.create(bucketFunctionType, bucketKeyType); + + nextAnd: + for (Predicate andPredicate : splitAnd(bucketPredicate)) { + Integer reference = null; + List values = new ArrayList<>(); + for (Predicate orPredicate : splitOr(andPredicate)) { 
+ if (orPredicate instanceof LeafPredicate) { + LeafPredicate leaf = (LeafPredicate) orPredicate; + Optional fieldRefOptional = leaf.fieldRefOptional(); + if (fieldRefOptional.isPresent()) { + FieldRef fieldRef = fieldRefOptional.get(); + if (reference == null || reference == fieldRef.index()) { + reference = fieldRef.index(); + if (leaf.function().equals(Equal.INSTANCE) + || leaf.function().equals(In.INSTANCE)) { + values.addAll( + leaf.literals().stream() + .filter(Objects::nonNull) + .collect(Collectors.toList())); + continue; + } + } + } + } + + // failed, go to next predicate + continue nextAnd; + } + if (reference != null) { + if (bucketValues[reference] != null) { + // Same field appears in multiple AND branches, + // compute intersection to narrow down possible values + bucketValues[reference].retainAll(new HashSet<>(values)); + if (bucketValues[reference].isEmpty()) { + // Empty intersection: contradictory conditions, no match + return Optional.empty(); + } + } else { + bucketValues[reference] = values; + } + } + } + + int rowCount = 1; + for (List values : bucketValues) { + if (values == null) { + return Optional.empty(); + } + + rowCount *= values.size(); + if (rowCount > MAX_VALUES) { + return Optional.empty(); + } + } + + InternalRowSerializer serializer = new InternalRowSerializer(bucketKeyType); + List bucketKeys = new ArrayList<>(); + assembleRows( + bucketValues, + columns -> + bucketKeys.add( + serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()), + new ArrayList<>(), + 0); + + return Optional.of(new PartitionSelector(bucketKeys, bucketFunction)); + } + + private static void assembleRows( + List[] rowValues, + Consumer> consumer, + List stack, + int columnIndex) { + List columnValues = rowValues[columnIndex]; + for (Object value : columnValues) { + stack.add(value); + if (columnIndex == rowValues.length - 1) { + // last column, consume row + consumer.accept(stack); + } else { + assembleRows(rowValues, consumer, stack, columnIndex + 1); 
+ } + stack.remove(stack.size() - 1); + } + } + + @ThreadSafe + private static class PartitionSelector implements BiFilter { + + private final List bucketKeys; + + private final BucketFunction bucketFunction; + + private final Map> buckets = new ConcurrentHashMap<>(); + + public PartitionSelector(List bucketKeys, BucketFunction bucketFunction) { + this.bucketKeys = bucketKeys; + this.bucketFunction = bucketFunction; + } + + @Override + public boolean test(Integer bucket, Integer numBucket) { + return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket)) + .contains(bucket); + } + + private Set createBucketSet(int numBucket) { + ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); + for (BinaryRow key : bucketKeys) { + builder.add(bucketFunction.bucket(key, numBucket)); + } + return builder.build(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 1e044f810f66..8f543ca7d39a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -35,6 +35,7 @@ import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RowRangeIndex; +import org.apache.paimon.utils.TriFilter; import javax.annotation.Nullable; @@ -56,13 +57,15 @@ public interface FileStoreScan { FileStoreScan withPartitionFilter(PartitionPredicate predicate); + FileStoreScan withCompleteFilter(Predicate predicate); + FileStoreScan withBucket(int bucket); FileStoreScan onlyReadRealBuckets(); FileStoreScan withBucketFilter(Filter bucketFilter); - FileStoreScan withTotalAwareBucketFilter(BiFilter bucketFilter); + FileStoreScan withTotalAwareBucketFilter(TriFilter bucketFilter); FileStoreScan withPartitionBucket(BinaryRow partition, int bucket); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index a09fb25ee62d..8595753cf0ab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -116,6 +116,11 @@ public KeyValueFileStoreScan( public KeyValueFileStoreScan withKeyFilter(Predicate predicate) { this.keyFilter = predicate; + return this; + } + + @Override + public FileStoreScan withCompleteFilter(Predicate predicate) { this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 5e4cffcbf672..a2fee49bfb88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -93,8 +93,6 @@ public KeyValueFileStore store() { tableSchema.crossPartitionUpdate(), options, tableSchema.logicalPartitionType(), - PrimaryKeyTableUtils.addKeyNamePrefix( - tableSchema.logicalBucketKeyType()), keyType, rowType, extractor, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index a6710ff00848..67d0b9a56633 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -241,6 +241,7 @@ public SnapshotReader withFilter(Predicate predicate) { if (!pair.getRight().isEmpty()) { nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight())); } + 
scan.withCompleteFilter(predicate); return this; } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 8142d44acf7d..09595d91889c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -134,7 +134,6 @@ private TestFileStore( options, partitionType, keyType, - keyType, valueType, keyValueFieldsExtractor, mfFactory, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java deleted file mode 100644 index eeec69d20226..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.utils.BiFilter; -import org.apache.paimon.utils.Filter; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link BucketFilter}. 
*/ -public class BucketFilterTest { - - @Test - public void testCreateWithAllNullParameters() { - // Test that create method returns null when all parameters are null/false - assertThat(BucketFilter.create(false, null, null, null)).isNull(); - } - - @Test - public void testCreateWithOnlyReadRealBuckets() { - // Test that create method returns a BucketFilter when onlyReadRealBuckets is true - BucketFilter filter = BucketFilter.create(true, null, null, null); - assertThat(filter).isNotNull(); - assertThat(filter.specifiedBucket()).isNull(); - } - - @Test - public void testCreateWithSpecifiedBucket() { - // Test that create method returns a BucketFilter when specifiedBucket is not null - BucketFilter filter = BucketFilter.create(false, 1, null, null); - assertThat(filter).isNotNull(); - assertThat(filter.specifiedBucket()).isEqualTo(1); - } - - @Test - public void testCreateWithBucketFilter() { - // Test that create method returns a BucketFilter when bucketFilter is not null - Filter bucketFilter = value -> value > 0; - BucketFilter filter = BucketFilter.create(false, null, bucketFilter, null); - assertThat(filter).isNotNull(); - assertThat(filter.specifiedBucket()).isNull(); - } - - @Test - public void testCreateWithTotalAwareBucketFilter() { - // Test that create method returns a BucketFilter when totalAwareBucketFilter is not null - BiFilter totalAwareBucketFilter = - (bucket, totalBucket) -> bucket < totalBucket; - BucketFilter filter = BucketFilter.create(false, null, null, totalAwareBucketFilter); - assertThat(filter).isNotNull(); - assertThat(filter.specifiedBucket()).isNull(); - } - - @Test - public void testTestWithOnlyReadRealBuckets() { - // Test the test method with onlyReadRealBuckets parameter - BucketFilter filter = BucketFilter.create(true, null, null, null); - - // Real buckets (non-negative) should pass - assertThat(filter.test(0, 1)).isTrue(); - assertThat(filter.test(1, 2)).isTrue(); - - // Virtual buckets (negative) should not pass - 
assertThat(filter.test(-1, 1)).isFalse(); - assertThat(filter.test(-2, 2)).isFalse(); - } - - @Test - public void testTestWithSpecifiedBucket() { - // Test the test method with specifiedBucket parameter - BucketFilter filter = BucketFilter.create(false, 1, null, null); - - // Only the specified bucket should pass - assertThat(filter.test(1, 2)).isTrue(); - - // Other buckets should not pass - assertThat(filter.test(0, 2)).isFalse(); - assertThat(filter.test(2, 3)).isFalse(); - } - - @Test - public void testTestWithBucketFilter() { - // Test the test method with bucketFilter parameter - Filter bucketFilter = value -> value % 2 == 0; // Even buckets only - BucketFilter filter = BucketFilter.create(false, null, bucketFilter, null); - - // Even buckets should pass - assertThat(filter.test(0, 1)).isTrue(); - assertThat(filter.test(2, 3)).isTrue(); - assertThat(filter.test(4, 5)).isTrue(); - - // Odd buckets should not pass - assertThat(filter.test(1, 2)).isFalse(); - assertThat(filter.test(3, 4)).isFalse(); - assertThat(filter.test(5, 6)).isFalse(); - } - - @Test - public void testTestWithTotalAwareBucketFilter() { - // Test the test method with totalAwareBucketFilter parameter - BiFilter totalAwareBucketFilter = - (bucket, totalBucket) -> bucket < totalBucket / 2; - BucketFilter filter = BucketFilter.create(false, null, null, totalAwareBucketFilter); - - // Buckets less than half of totalBucket should pass - assertThat(filter.test(0, 4)).isTrue(); - assertThat(filter.test(1, 4)).isTrue(); - - // Buckets greater than or equal to half of totalBucket should not pass - assertThat(filter.test(2, 4)).isFalse(); - assertThat(filter.test(3, 4)).isFalse(); - } - - @Test - public void testTestWithMultipleFilters() { - // Test the test method with multiple filters combined - Filter bucketFilter = value -> value > 0; // Positive buckets only - BiFilter totalAwareBucketFilter = - (bucket, totalBucket) -> bucket < totalBucket - 1; - BucketFilter filter = BucketFilter.create(true, 1, 
bucketFilter, totalAwareBucketFilter); - - // Bucket 1 is positive, is the specified bucket, and is less than totalBucket-1 for - // totalBucket=3 - assertThat(filter.test(1, 3)).isTrue(); - - // Bucket 0 is not positive, so it should not pass - assertThat(filter.test(0, 3)).isFalse(); - - // Bucket 2 is not the specified bucket, so it should not pass - assertThat(filter.test(2, 3)).isFalse(); - - // Bucket 1 with totalBucket=2 should not pass because 1 >= 2-1 - assertThat(filter.test(1, 2)).isFalse(); - - // Negative bucket should not pass because onlyReadRealBuckets is true - assertThat(filter.test(-1, 3)).isFalse(); - } - - @Test - public void testSpecifiedBucket() { - // Test the specifiedBucket method - BucketFilter filterWithSpecifiedBucket = BucketFilter.create(false, 2, null, null); - assertThat(filterWithSpecifiedBucket.specifiedBucket()).isEqualTo(2); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java deleted file mode 100644 index dc64bdb596d4..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation; - -import org.apache.paimon.CoreOptions.BucketFunctionType; -import org.apache.paimon.operation.BucketSelectConverter.Selector; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.Optional; - -import static org.apache.paimon.predicate.PredicateBuilder.and; -import static org.apache.paimon.predicate.PredicateBuilder.or; -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link BucketSelectConverter}. */ -public class BucketSelectConverterTest { - - private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType()); - - private final PredicateBuilder builder = new PredicateBuilder(rowType); - - @Test - public void testRepeatEqual() { - assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0, 1)))).isEmpty(); - } - - @Test - public void testNotFull() { - assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty(); - } - - @Test - public void testOtherPredicate() { - assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty(); - } - - @Test - public void testOrIllegal() { - assertThat( - newSelector( - and( - or(builder.equal(0, 5), builder.equal(1, 6)), - builder.equal(1, 1), - builder.equal(2, 2)))) - .isEmpty(); - } - - @Test - public void testNormal() { - Selector selector = - newSelector(and(builder.equal(0, 0), builder.equal(1, 1), builder.equal(2, 2))) - .get(); - assertThat(selector.createBucketSet(20)).containsExactly(11); - } - - @Test - public void testIn() { - Selector selector = - newSelector( - and( - builder.in(0, Arrays.asList(5, 6, 7)), - builder.equal(1, 1), - builder.equal(2, 2))) - .get(); - 
assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10); - } - - @Test - public void testOr() { - Selector selector = - newSelector( - and( - or( - builder.equal(0, 5), - builder.equal(0, 6), - builder.equal(0, 7)), - builder.equal(1, 1), - builder.equal(2, 2))) - .get(); - assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10); - } - - @Test - public void testInNull() { - Selector selector = - newSelector( - and( - builder.in(0, Arrays.asList(5, 6, 7, null)), - builder.equal(1, 1), - builder.equal(2, 2))) - .get(); - assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10); - } - - @Test - public void testMultipleIn() { - Selector selector = - newSelector( - and( - builder.in(0, Arrays.asList(5, 6, 7)), - builder.in(1, Arrays.asList(1, 8)), - builder.equal(2, 2))) - .get(); - assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19); - } - - @Test - public void testMultipleOr() { - Selector selector = - newSelector( - and( - or( - builder.equal(0, 5), - builder.equal(0, 6), - builder.equal(0, 7)), - or(builder.equal(1, 1), builder.equal(1, 8)), - builder.equal(2, 2))) - .get(); - assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19); - } - - private Optional newSelector(Predicate predicate) { - return (Optional) - BucketSelectConverter.create(predicate, rowType, BucketFunctionType.DEFAULT); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java new file mode 100644 index 000000000000..3128ff20db22 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions.BucketFunctionType; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BucketSelector}. 
*/ +public class BucketSelectorTest { + + private static final int NUM_BUCKETS = 10; + + // ========================== Single bucket key, non-partitioned ========================== + + @Test + public void testEqualPredicate() { + // k = 5 => should select exactly one bucket + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = pb.equal(0, 5); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testEqualAndRangePredicate() { + // k = 5 AND k < 100 => should still select bucket for k=5 + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.lessThan(0, 100)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testEqualAndInWithOverlap() { + // k = 5 AND k IN (5, 10) => intersection is {5}, should select one bucket + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.in(0, Arrays.asList(5, 10))); + 
BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testInAndInWithOverlap() { + // k IN (1, 5) AND k IN (5, 10) => intersection is {5}, should select one bucket + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = + PredicateBuilder.and(pb.in(0, Arrays.asList(1, 5)), pb.in(0, Arrays.asList(5, 10))); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testRedundantEquals() { + // k = 5 AND k = 5 => redundant, should still select one bucket + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 5)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testContradictoryEquals() { + // k = 5 AND k = 10 => empty intersection, no bucket can match + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + 
PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 10)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + // Empty intersection => Optional.empty() => orElse(true) => all buckets pass + // (conservative: no filtering when we can't determine) + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(NUM_BUCKETS); + } + + @Test + public void testRangeOnlyFallsBackToFullScan() { + // k < 100 => no Equal/In to extract, full scan + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + RowType partType = RowType.of(); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = pb.lessThan(0, 100); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(NUM_BUCKETS); + } + + // ========================== Multi-field bucket key ========================== + + @Test + public void testMultiFieldBucketKey() { + // k1 = 5 AND k2 = 10 => one combination, one bucket + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k1", DataTypes.INT()), + DataTypes.FIELD(1, "k2", DataTypes.INT())); + RowType partType = RowType.of(); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(1, 10)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, rowType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + @Test + public void testMultiFieldWithIntersection() { + // k1 IN (1, 5) AND 
k1 IN (5, 10) AND k2 = 3 => k1={5}, k2={3} => one combination + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k1", DataTypes.INT()), + DataTypes.FIELD(1, "k2", DataTypes.INT())); + RowType partType = RowType.of(); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = + PredicateBuilder.and( + pb.in(0, Arrays.asList(1, 5)), + pb.in(0, Arrays.asList(5, 10)), + pb.equal(1, 3)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, rowType); + + Set selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS); + assertThat(selected).hasSize(1); + } + + // ========================== Partitioned table ========================== + + @Test + public void testPartitionedTableWithBucketFilter() { + // Table: (pt INT, k INT), partition: (pt), bucket key: (k) + // Predicate: pt = 1 AND k = 5 + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD(0, "pt", DataTypes.INT()), + DataTypes.FIELD(1, "k", DataTypes.INT())); + RowType partType = DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT())); + RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT())); + PredicateBuilder pb = new PredicateBuilder(rowType); + + Predicate predicate = PredicateBuilder.and(pb.equal(0, 1), pb.equal(1, 5)); + BucketSelector selector = + new BucketSelector( + predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType); + + // For partition pt=1 (matching), bucket filtering should work + BinaryRow partition1 = + new org.apache.paimon.data.serializer.InternalRowSerializer(partType) + .toBinaryRow(org.apache.paimon.data.GenericRow.of(1)) + .copy(); + Set selected1 = selectedBuckets(selector, partition1, NUM_BUCKETS); + assertThat(selected1).hasSize(1); + + // For partition pt=2 (not matching), predicate becomes AlwaysFalse + // => no bucket key values extracted => full scan (conservative) + BinaryRow partition2 = + new 
org.apache.paimon.data.serializer.InternalRowSerializer(partType) + .toBinaryRow(org.apache.paimon.data.GenericRow.of(2)) + .copy(); + Set selected2 = selectedBuckets(selector, partition2, NUM_BUCKETS); + assertThat(selected2).hasSize(NUM_BUCKETS); + } + + // ========================== Helpers ========================== + + private static Set selectedBuckets( + BucketSelector selector, BinaryRow partition, int numBuckets) { + Set selected = new HashSet<>(); + for (int b = 0; b < numBuckets; b++) { + if (selector.test(partition, b, numBuckets)) { + selected.add(b); + } + } + return selected; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java new file mode 100644 index 000000000000..94494e040661 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.CoreOptions.BUCKET; +import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests bucket filtering with compound predicates on a partitioned, fixed-bucket, append-only + * table. + */ +public class BucketFilterScanTest extends TableTestBase { + + @Test + public void testBucketFilterWithCompoundPredicateOnAppendTable() throws Exception { + testBucketFilterWithCompoundPredicate(false); + } + + @Test + public void testBucketFilterWithCompoundPredicateOnPkTable() throws Exception { + testBucketFilterWithCompoundPredicate(true); + } + + @Test + public void testCompositeBucketFilterWithCompoundPredicateOnAppendTable() throws Exception { + testCompositeBucketFilterWithCompoundPredicate(false); + } + + @Test + public void testCompositeBucketFilterWithCompoundPredicateOnPkTable() throws Exception { + testCompositeBucketFilterWithCompoundPredicate(true); + } + + /** + * Tests bucket filtering with compound predicates on a single-field bucket key. + * + *

<p>Table schema:
+     *
+     * <ul>
+     *   <li>Partition key: column 'a' (INT)
+     *   <li>Bucket key: column 'b' (INT)
+     *   <li>Bucket count: 10
+     * </ul>
+     *
+     * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) = 100 rows.
+     *
+     * <p>Test scenarios:
+     *
+     * <ol>
+     *   <li>Predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) - Tests partition range filter
+     *       with bucket equality, combined with OR. Expected: buckets for partition 1,2 with b=5
+     *       and partition 3 with b=7.
+     *   <li>Predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) - Tests partition range with
+     *       bucket equality, OR partition equality with bucket range. Expected: mixed buckets from
+     *       partition 3 and specific buckets from partitions 1,2.
+     *   <li>Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests partition equality with
+     *       bucket equality in both OR branches. Expected: exact bucket matching for each
+     *       partition-b combination.
+     * </ol>
+ */ + private void testBucketFilterWithCompoundPredicate(boolean pk) throws Exception { + // ---- schema & table ---- + Schema.Builder builder = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.INT()) + .partitionKeys("a") + .option(BUCKET.key(), "10"); + if (pk) { + builder.primaryKey("a", "b"); + } else { + builder.option(BUCKET_KEY.key(), "b"); + } + Schema schema = builder.build(); + + Identifier tableId = identifier("test_bucket_filter"); + catalog.createTable(tableId, schema, false); + Table table = catalog.getTable(tableId); + + // ---- write data: 5 partitions × 20 b-values = 100 rows ---- + GenericRow[] rows = new GenericRow[100]; + int idx = 0; + for (int a = 1; a <= 5; a++) { + for (int b = 1; b <= 20; b++) { + rows[idx++] = GenericRow.of(a, b, a * 100 + b); + } + } + write(table, rows); + PredicateBuilder pb = new PredicateBuilder(table.rowType()); + + // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) ---- + Predicate predicate1 = + PredicateBuilder.or( + PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)), + PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))); + assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("3,1", "1,6", "2,6"); + + // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) ---- + Predicate predicate2 = + PredicateBuilder.or( + PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)), + PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100))); + assertThat(plan(table, predicate2)) + .containsExactlyInAnyOrder( + "3,0", "3,1", "1,6", "3,4", "3,5", "2,6", "3,6", "3,7", "3,8"); + + // ---- build predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) ---- + Predicate predicate3 = + PredicateBuilder.or( + PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)), + PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))); + assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("3,1", "2,6"); + } + + /** + * Tests bucket filtering with 
compound predicates on a composite (multi-field) bucket key.
+     *
+     * <p>Table schema:
+     *
+     * <ul>
+     *   <li>Partition key: column 'a' (INT)
+     *   <li>Bucket key: columns 'b' and 'c' (composite, INT)
+     *   <li>Bucket count: 10
+     * </ul>
+     *
+     * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) × 10 c-values (c=0 to
+     *     9) = 1000 rows.
+     *
+     * <p>Test scenarios:
+     *
+     * <ol>
+     *   <li>Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests nested OR
+     *       within AND, with partition range, bucket field equality, and additional bucket field
+     *       filter. The 'c = 5' condition is part of the composite bucket key, affecting bucket
+     *       selection.
+     *   <li>Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 - Tests range
+     *       predicate on one bucket field (b) combined with equality on another (c). Validates
+     *       handling of multiple bucket key fields with different predicate types.
+     *   <li>Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests exact matching on
+     *       both partition and bucket fields. The composite bucket key (b,c) ensures precise bucket
+     *       targeting.
+     * </ol>
+ */ + private void testCompositeBucketFilterWithCompoundPredicate(boolean pk) throws Exception { + // ---- schema & table ---- + Schema.Builder builder = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.INT()) + .partitionKeys("a") + .option(BUCKET.key(), "10"); + if (pk) { + builder.primaryKey("a", "b", "c"); + } else { + builder.option(BUCKET_KEY.key(), "b,c"); + } + Schema schema = builder.build(); + + Identifier tableId = identifier("test_composite_bucket_filter"); + catalog.createTable(tableId, schema, false); + Table table = catalog.getTable(tableId); + + // ---- write data: 5 partitions × 20 b-values x 10 c-values = 1000 rows ---- + GenericRow[] rows = new GenericRow[1000]; + int idx = 0; + for (int a = 1; a <= 5; a++) { + for (int b = 1; b <= 20; b++) { + for (int c = 0; c < 10; c++) { + rows[idx++] = GenericRow.of(a, b, c); + } + } + } + write(table, rows); + PredicateBuilder pb = new PredicateBuilder(table.rowType()); + + // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ---- + Predicate predicate1 = + PredicateBuilder.and( + PredicateBuilder.or( + PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)), + PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))), + pb.equal(2, 5)); + assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("1,0", "2,0", "3,5"); + + // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 ---- + Predicate predicate2 = + PredicateBuilder.and( + PredicateBuilder.or( + PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)), + PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100))), + pb.equal(2, 5)); + assertThat(plan(table, predicate2)) + .containsExactlyInAnyOrder( + "3,9", "1,0", "2,0", "3,0", "3,1", "3,2", "3,3", "3,4", "3,5", "3,6", "3,7", + "3,8"); + + // ---- build predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ---- + Predicate predicate3 = + PredicateBuilder.and( + PredicateBuilder.or( + 
PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
+                                PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
+                        pb.equal(2, 5));
+        assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("2,0", "3,5");
+    }
+
+    private Set<String> plan(Table table, Predicate predicate) {
+        return table.newReadBuilder().withFilter(predicate).newScan().plan().splits().stream()
+                .map(
+                        split -> {
+                            DataSplit dataSplit = (DataSplit) split;
+                            int partitionA = dataSplit.partition().getInt(0);
+                            int bucket = dataSplit.bucket();
+                            return partitionA + "," + bucket;
+                        })
+                .collect(Collectors.toSet());
+    }
+}