diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
new file mode 100644
index 000000000000..943b751870e9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link PredicateReplaceVisitor} that evaluates partition predicates against a known partition
+ * value, replacing them with {@link AlwaysTrue} or {@link AlwaysFalse}.
+ *
+ * <p>For leaf predicates that only reference partition fields, the predicate is evaluated against
+ * the given partition row (with field indices remapped from table schema to partition schema). If
+ * the evaluation returns true, the predicate is replaced with AlwaysTrue; otherwise with
+ * AlwaysFalse.
+ *
+ * <p>For leaf predicates that reference any non-partition field, the predicate is kept as-is.
+ *
+ * <p>For compound predicates (AND/OR), children are recursively visited and the result is
+ * simplified via {@link PredicateBuilder#and} / {@link PredicateBuilder#or}.
+ */
+public class PartitionValuePredicateVisitor implements PredicateReplaceVisitor {
+
+ private final Set<String> partitionFields;
+
+ /** Mapping from table field index to partition field index. -1 if not a partition field. */
+ private final int[] tableToPartitionMapping;
+
+ private final InternalRow partitionRow;
+
+ public PartitionValuePredicateVisitor(
+ RowType tableType, RowType partitionType, InternalRow partitionRow) {
+ this.partitionRow = partitionRow;
+ this.partitionFields = new HashSet<>(partitionType.getFieldNames());
+
+ List<String> tableFieldNames = tableType.getFieldNames();
+ List<String> partitionFieldNames = partitionType.getFieldNames();
+
+ this.tableToPartitionMapping = new int[tableFieldNames.size()];
+ for (int i = 0; i < tableFieldNames.size(); i++) {
+ tableToPartitionMapping[i] = partitionFieldNames.indexOf(tableFieldNames.get(i));
+ }
+ }
+
+ @Override
+ public Optional<Predicate> visit(LeafPredicate predicate) {
+ Set<String> refFields = PredicateVisitor.collectFieldNames(predicate);
+ if (!partitionFields.containsAll(refFields)) {
+ return Optional.of(predicate);
+ }
+
+ // Remap field indices from table schema to partition schema
+ List<Object> remappedInputs = new ArrayList<>();
+ for (Object input : predicate.transform().inputs()) {
+ if (input instanceof FieldRef) {
+ FieldRef ref = (FieldRef) input;
+ int partIdx = tableToPartitionMapping[ref.index()];
+ remappedInputs.add(new FieldRef(partIdx, ref.name(), ref.type()));
+ } else {
+ remappedInputs.add(input);
+ }
+ }
+
+ // Evaluate the remapped predicate against the known partition row
+ LeafPredicate remapped = predicate.copyWithNewInputs(remappedInputs);
+ boolean result = remapped.test(partitionRow);
+ return Optional.of(result ? PredicateBuilder.alwaysTrue() : PredicateBuilder.alwaysFalse());
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
index aeaa5d5aee7e..b71941c0ccbd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
@@ -37,6 +37,10 @@ default Optional visit(CompoundPredicate predicate) {
return Optional.empty();
}
}
- return Optional.of(new CompoundPredicate(predicate.function(), converted));
+ if (predicate.function() instanceof And) {
+ return Optional.of(PredicateBuilder.and(converted));
+ } else {
+ return Optional.of(PredicateBuilder.or(converted));
+ }
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
new file mode 100644
index 000000000000..8a5de1e63136
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+/** Represents a filter (boolean-valued function) of three arguments. */
+@FunctionalInterface
+public interface TriFilter<T, U, R> {
+
+ TriFilter<?, ?, ?> ALWAYS_TRUE = (t, u, r) -> true;
+
+ boolean test(T t, U u, R r);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static <T, U, R> TriFilter<T, U, R> alwaysTrue() {
+ return (TriFilter<T, U, R>) ALWAYS_TRUE;
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
new file mode 100644
index 000000000000..7486d00372d7
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PartitionValuePredicateVisitor}. */
+public class PartitionValuePredicateVisitorTest {
+
+ // Table schema: (pt INT, a INT, b INT), partition key: pt
+ private static final RowType TABLE_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt", DataTypes.INT()),
+ DataTypes.FIELD(1, "a", DataTypes.INT()),
+ DataTypes.FIELD(2, "b", DataTypes.INT()));
+
+ private static final RowType PARTITION_TYPE =
+ DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT()));
+
+ private static final PredicateBuilder BUILDER = new PredicateBuilder(TABLE_TYPE);
+
+ // ========================== Leaf: partition field ==========================
+
+ @Test
+ public void testPartitionEqualMatch() {
+ // pt = 1, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.equal(0, 1).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionEqualNoMatch() {
+ // pt = 2, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.equal(0, 2).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionGreaterThanMatch() {
+ // pt > 0, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.greaterThan(0, 0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionGreaterThanNoMatch() {
+ // pt > 5, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.greaterThan(0, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionLessOrEqualMatch() {
+ // pt <= 1, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.lessOrEqual(0, 1).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionBetweenMatch() {
+ // pt BETWEEN 0 AND 5, partition value is pt=3 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(3));
+
+ Optional<Predicate> result = BUILDER.between(0, 0, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionBetweenNoMatch() {
+ // pt BETWEEN 5 AND 10, partition value is pt=3 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(3));
+
+ Optional<Predicate> result = BUILDER.between(0, 5, 10).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionIsNullNoMatch() {
+ // pt IS NULL, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.isNull(0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionIsNotNullMatch() {
+ // pt IS NOT NULL, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.isNotNull(0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ // ========================== Leaf: non-partition field ==========================
+
+ @Test
+ public void testNonPartitionFieldKeptAsIs() {
+ // a = 5 is not a partition predicate => kept unchanged
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate original = BUILDER.equal(1, 5);
+ Optional<Predicate> result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Compound: AND ==========================
+
+ @Test
+ public void testAndPartitionMatchAndNonPartition() {
+ // (pt = 1 AND a = 5), partition value is pt=1
+ // => AlwaysTrue simplified away by PredicateBuilder.and(), leaving just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ @Test
+ public void testAndPartitionNoMatchAndNonPartition() {
+ // (pt = 2 AND a = 5), partition value is pt=1
+ // => AlwaysFalse short-circuits the entire AND
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 2), BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testAndAllPartitionMatch() {
+ // (pt = 1 AND pt > 0), partition value is pt=1
+ // => both AlwaysTrue, simplified to AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), BUILDER.greaterThan(0, 0));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ // ========================== Compound: OR ==========================
+
+ @Test
+ public void testOrPartitionPredicates() {
+ // (pt = 1 OR pt = 2), partition value is pt=1
+ // => AlwaysTrue short-circuits the entire OR
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(0, 2));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testOrPartitionMatchAndNonPartition() {
+ // (pt = 1 OR a = 5), partition value is pt=1
+ // => AlwaysTrue short-circuits the entire OR
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(1, 5));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testOrPartitionNoMatchAndNonPartition() {
+ // (pt = 2 OR a = 5), partition value is pt=1
+ // => AlwaysFalse filtered out, leaving just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 2), BUILDER.equal(1, 5));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ // ========================== Multiple partition keys ==========================
+
+ @Test
+ public void testMultiplePartitionFieldsBothMatch() {
+ // Table: (pt1 INT, pt2 STRING, a INT), partition keys: (pt1, pt2)
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+ DataTypes.FIELD(2, "a", DataTypes.INT()));
+ RowType partitionType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ // pt1 = 1 AND pt2 = 'x', partition value is (1, 'x') => both match => AlwaysTrue
+ GenericRow partitionRow = GenericRow.of(1, BinaryString.fromString("x"));
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType, partitionRow);
+
+ Predicate and =
+ PredicateBuilder.and(
+ builder.equal(0, 1), builder.equal(1, BinaryString.fromString("x")));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testMultiplePartitionFieldsPartialMatch() {
+ // pt1 = 1 AND pt2 = 'y', partition value is (1, 'x') => pt1 matches, pt2 doesn't
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+ DataTypes.FIELD(2, "a", DataTypes.INT()));
+ RowType partitionType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ GenericRow partitionRow = GenericRow.of(1, BinaryString.fromString("x"));
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType, partitionRow);
+
+ Predicate and =
+ PredicateBuilder.and(
+ builder.equal(0, 1), builder.equal(1, BinaryString.fromString("y")));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ // pt2 doesn't match => AlwaysFalse short-circuits the entire AND
+ assertAlwaysFalse(result.get());
+ }
+
+ // ========================== Partition field not at index 0 ==========================
+
+ @Test
+ public void testPartitionFieldNotFirstInTable() {
+ // Table: (a INT, pt INT, b INT), partition key: pt (index 1 in table, index 0 in partition)
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "a", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt", DataTypes.INT()),
+ DataTypes.FIELD(2, "b", DataTypes.INT()));
+ RowType partitionType = DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ // pt = 3, partition value is pt=3
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType, GenericRow.of(3));
+
+ Optional<Predicate> result = builder.equal(1, 3).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+
+ // pt = 5, partition value is pt=3 => AlwaysFalse
+ result = builder.equal(1, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+
+ // a = 10 => kept as-is (non-partition field)
+ Predicate original = builder.equal(0, 10);
+ result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Nested compound ==========================
+
+ @Test
+ public void testNestedAndOr() {
+ // ((pt = 1 OR pt = 2) AND a = 5), partition value is pt=1
+ // Inner OR: AlwaysTrue short-circuits to AlwaysTrue
+ // Outer AND: AlwaysTrue simplified away, leaving just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), BUILDER.equal(0, 2));
+ Predicate and = PredicateBuilder.and(or, BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ // ========================== Only non-partition predicates ==========================
+
+ @Test
+ public void testAllNonPartitionPredicatesUnchanged() {
+ // (a = 5 AND b = 10), partition value is pt=1 => kept unchanged
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, GenericRow.of(1));
+
+ Predicate original = PredicateBuilder.and(BUILDER.equal(1, 5), BUILDER.equal(2, 10));
+ Optional<Predicate> result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Helpers ==========================
+
+ private static void assertAlwaysTrue(Predicate predicate) {
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+ assertThat(((LeafPredicate) predicate).function()).isEqualTo(AlwaysTrue.INSTANCE);
+ }
+
+ private static void assertAlwaysFalse(Predicate predicate) {
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+ assertThat(((LeafPredicate) predicate).function()).isEqualTo(AlwaysFalse.INSTANCE);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ad71a1d59518..f0f5e1c6fff5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -30,7 +30,6 @@
import org.apache.paimon.operation.DataEvolutionFileStoreScan;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
@@ -41,12 +40,6 @@
import javax.annotation.Nullable;
import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -143,26 +136,12 @@ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer wr
@Override
public AppendOnlyFileStoreScan newScan() {
BucketSelectConverter bucketSelectConverter =
- predicate -> {
- if (bucketMode() != BucketMode.HASH_FIXED) {
- return Optional.empty();
- }
-
- if (bucketKeyType.getFieldCount() == 0) {
- return Optional.empty();
- }
-
- List<Predicate> bucketFilters =
- pickTransformFieldMapping(
- splitAnd(predicate),
- rowType.getFieldNames(),
- bucketKeyType.getFieldNames());
- if (!bucketFilters.isEmpty()) {
- return BucketSelectConverter.create(
- and(bucketFilters), bucketKeyType, options.bucketFunctionType());
- }
- return Optional.empty();
- };
+ new BucketSelectConverter(
+ bucketMode(),
+ options.bucketFunctionType(),
+ rowType,
+ partitionType,
+ bucketKeyType);
if (options().dataEvolutionEnabled()) {
return new DataEvolutionFileStoreScan(
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 710e2dc3a587..a1ae650b433a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -33,7 +33,6 @@
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -48,18 +47,12 @@
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
import java.util.function.Supplier;
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final boolean crossPartitionUpdate;
- private final RowType bucketKeyType;
private final RowType keyType;
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
@@ -74,7 +67,6 @@ public KeyValueFileStore(
boolean crossPartitionUpdate,
CoreOptions options,
RowType partitionType,
- RowType bucketKeyType,
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
@@ -83,7 +75,6 @@ public KeyValueFileStore(
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
- this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
this.valueType = valueType;
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
@@ -203,25 +194,13 @@ public AbstractFileStoreWrite newWrite(String commitUser, @Nullable In
@Override
public KeyValueFileStoreScan newScan() {
- BucketMode bucketMode = bucketMode();
BucketSelectConverter bucketSelectConverter =
- keyFilter -> {
- if (bucketMode != BucketMode.HASH_FIXED
- && bucketMode != BucketMode.POSTPONE_MODE) {
- return Optional.empty();
- }
-
- List<Predicate> bucketFilters =
- pickTransformFieldMapping(
- splitAnd(keyFilter),
- keyType.getFieldNames(),
- bucketKeyType.getFieldNames());
- if (!bucketFilters.isEmpty()) {
- return BucketSelectConverter.create(
- and(bucketFilters), bucketKeyType, options.bucketFunctionType());
- }
- return Optional.empty();
- };
+ new BucketSelectConverter(
+ bucketMode(),
+ options.bucketFunctionType(),
+ schema.logicalRowType(),
+ partitionType,
+ schema.logicalBucketKeyType());
return new KeyValueFileStoreScan(
newManifestsReader(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
index 45cd074a5ad1..4662d7dab517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
@@ -18,8 +18,9 @@
package org.apache.paimon.manifest;
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.TriFilter;
import javax.annotation.Nullable;
@@ -29,13 +30,13 @@ public class BucketFilter {
private final boolean onlyReadRealBuckets;
private final @Nullable Integer specifiedBucket;
private final @Nullable Filter<Integer> bucketFilter;
- private final @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter;
+ private final @Nullable TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter;
public BucketFilter(
boolean onlyReadRealBuckets,
@Nullable Integer specifiedBucket,
@Nullable Filter<Integer> bucketFilter,
- @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ @Nullable TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter) {
this.onlyReadRealBuckets = onlyReadRealBuckets;
this.specifiedBucket = specifiedBucket;
this.bucketFilter = bucketFilter;
@@ -46,7 +47,7 @@ public BucketFilter(
boolean onlyReadRealBuckets,
@Nullable Integer specifiedBucket,
@Nullable Filter<Integer> bucketFilter,
- @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ @Nullable TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter) {
if (!onlyReadRealBuckets
&& specifiedBucket == null
&& bucketFilter == null
@@ -63,7 +64,7 @@ public Integer specifiedBucket() {
return specifiedBucket;
}
- public boolean test(int bucket, int totalBucket) {
+ public boolean test(BinaryRow partition, int bucket, int totalBucket) {
if (onlyReadRealBuckets && bucket < 0) {
return false;
}
@@ -73,6 +74,7 @@ public boolean test(int bucket, int totalBucket) {
if (bucketFilter != null && !bucketFilter.test(bucket)) {
return false;
}
- return totalAwareBucketFilter == null || totalAwareBucketFilter.test(bucket, totalBucket);
+ return totalAwareBucketFilter == null
+ || totalAwareBucketFilter.test(partition, bucket, totalBucket);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 59b2a34650a4..09afdcc3ac58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -125,10 +125,10 @@ protected List readFromSegments(
List segments = manifestSegments.segments();
// try to do fast filter first
- Optional<BinaryRow> partition = extractSinglePartition(partitionFilter);
- if (partition.isPresent()) {
+ Optional<BinaryRow> singlePartition = extractSinglePartition(partitionFilter);
+ if (singlePartition.isPresent()) {
Map> segMap =
- manifestSegments.indexedSegments().get(partition.get());
+ manifestSegments.indexedSegments().get(singlePartition.get());
if (segMap == null) {
return Collections.emptyList();
}
@@ -147,11 +147,13 @@ protected List readFromSegments(
// do force loop filter
List segmentsList = new ArrayList<>();
for (RichSegments richSegments : segments) {
- if (partitionFilter != null && !partitionFilter.test(richSegments.partition())) {
+ BinaryRow partition = richSegments.partition();
+ if (partitionFilter != null && !partitionFilter.test(partition)) {
continue;
}
if (bucketFilter != null
- && !bucketFilter.test(richSegments.bucket(), richSegments.totalBucket())) {
+ && !bucketFilter.test(
+ partition, richSegments.bucket(), richSegments.totalBucket())) {
continue;
}
segmentsList.add(richSegments.segments());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 68ebacaa805f..30908cce3e43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -46,6 +46,7 @@
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TriFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +88,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private boolean onlyReadRealBuckets = false;
private Integer specifiedBucket = null;
private Filter<Integer> bucketFilter = null;
- private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
+ private TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
private Integer specifiedLevel = null;
private Filter levelFilter = null;
@@ -162,7 +163,7 @@ public FileStoreScan withBucketFilter(Filter bucketFilter) {
@Override
public FileStoreScan withTotalAwareBucketFilter(
- BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter) {
this.totalAwareBucketFilter = totalAwareBucketFilter;
return this;
}
@@ -533,14 +534,21 @@ private Filter createEntryRowFilter() {
Function<InternalRow, Integer> levelGetter = ManifestEntrySerializer.levelGetter();
BucketFilter bucketFilter = createBucketFilter();
return row -> {
- if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) {
- return false;
+ BinaryRow partition = null;
+ if (partitionFilter != null) {
+ partition = partitionGetter.apply(row);
+ if (!partitionFilter.test(partition)) {
+ return false;
+ }
}
if (bucketFilter != null) {
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
- if (!bucketFilter.test(bucket, totalBucket)) {
+ if (partition == null) {
+ partition = partitionGetter.apply(row);
+ }
+ if (!bucketFilter.test(partition, bucket, totalBucket)) {
return false;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 3546a1fcae3c..2714e773a2ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -86,6 +86,11 @@ public AppendOnlyFileStoreScan(
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
this.inputFilter = predicate;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withCompleteFilter(Predicate predicate) {
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
index 6577099aa885..e441641f7e14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
@@ -19,160 +19,55 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.bucket.BucketFunction;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.predicate.Equal;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.In;
-import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
/** Bucket filter push down in scan to skip files. */
-public interface BucketSelectConverter {
-
- int MAX_VALUES = 1000;
-
- Optional> convert(Predicate predicate);
-
- static Optional> create(
- Predicate bucketPredicate,
- RowType bucketKeyType,
- BucketFunctionType bucketFunctionType) {
- @SuppressWarnings("unchecked")
- List[] bucketValues = new List[bucketKeyType.getFieldCount()];
-
- BucketFunction bucketFunction = BucketFunction.create(bucketFunctionType, bucketKeyType);
-
- nextAnd:
- for (Predicate andPredicate : splitAnd(bucketPredicate)) {
- Integer reference = null;
- List values = new ArrayList<>();
- for (Predicate orPredicate : splitOr(andPredicate)) {
- if (orPredicate instanceof LeafPredicate) {
- LeafPredicate leaf = (LeafPredicate) orPredicate;
- Optional fieldRefOptional = leaf.fieldRefOptional();
- if (fieldRefOptional.isPresent()) {
- FieldRef fieldRef = fieldRefOptional.get();
- if (reference == null || reference == fieldRef.index()) {
- reference = fieldRef.index();
- if (leaf.function().equals(Equal.INSTANCE)
- || leaf.function().equals(In.INSTANCE)) {
- values.addAll(
- leaf.literals().stream()
- .filter(Objects::nonNull)
- .collect(Collectors.toList()));
- continue;
- }
- }
- }
- }
-
- // failed, go to next predicate
- continue nextAnd;
- }
- if (reference != null) {
- if (bucketValues[reference] != null) {
- // Repeated equals in And?
- return Optional.empty();
- }
-
- bucketValues[reference] = values;
- }
- }
-
- int rowCount = 1;
- for (List values : bucketValues) {
- if (values == null) {
- return Optional.empty();
- }
-
- rowCount *= values.size();
- if (rowCount > MAX_VALUES) {
- return Optional.empty();
- }
- }
-
- InternalRowSerializer serializer = new InternalRowSerializer(bucketKeyType);
- List bucketKeys = new ArrayList<>();
- assembleRows(
- bucketValues,
- columns ->
- bucketKeys.add(
- serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()),
- new ArrayList<>(),
- 0);
-
- return Optional.of(new Selector(bucketKeys, bucketFunction));
+public class BucketSelectConverter {
+
+ private final BucketMode bucketMode;
+ private final BucketFunctionType bucketFunctionType;
+ private final RowType rowType;
+ private final RowType partitionType;
+ private final RowType bucketKeyType;
+
+ public BucketSelectConverter(
+ BucketMode bucketMode,
+ BucketFunctionType bucketFunctionType,
+ RowType rowType,
+ RowType partitionType,
+ RowType bucketKeyType) {
+ this.bucketMode = bucketMode;
+ this.bucketFunctionType = bucketFunctionType;
+ this.rowType = rowType;
+ this.partitionType = partitionType;
+ this.bucketKeyType = bucketKeyType;
}
- static void assembleRows(
- List[] rowValues,
- Consumer> consumer,
- List stack,
- int columnIndex) {
- List columnValues = rowValues[columnIndex];
- for (Object value : columnValues) {
- stack.add(value);
- if (columnIndex == rowValues.length - 1) {
- // last column, consume row
- consumer.accept(stack);
- } else {
- assembleRows(rowValues, consumer, stack, columnIndex + 1);
- }
- stack.remove(stack.size() - 1);
+ public Optional> convert(Predicate predicate) {
+ if (bucketMode != BucketMode.HASH_FIXED && bucketMode != BucketMode.POSTPONE_MODE) {
+ return Optional.empty();
}
- }
- /** Selector to select bucket from {@link Predicate}. */
- @ThreadSafe
- class Selector implements BiFilter {
-
- private final List bucketKeys;
-
- private final BucketFunction bucketFunction;
-
- private final Map> buckets = new ConcurrentHashMap<>();
-
- public Selector(List bucketKeys, BucketFunction bucketFunction) {
- this.bucketKeys = bucketKeys;
- this.bucketFunction = bucketFunction;
+ if (bucketKeyType.getFieldCount() == 0) {
+ return Optional.empty();
}
- @Override
- public boolean test(Integer bucket, Integer numBucket) {
- return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket))
- .contains(bucket);
+ Set predicateFields = collectFieldNames(predicate);
+ if (!predicateFields.containsAll(bucketKeyType.getFieldNames())) {
+ return Optional.empty();
}
- @VisibleForTesting
- Set createBucketSet(int numBucket) {
- ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
- for (BinaryRow key : bucketKeys) {
- builder.add(bucketFunction.bucket(key, numBucket));
- }
- return builder.build();
- }
+ return Optional.of(
+ new BucketSelector(
+ predicate, bucketFunctionType, rowType, partitionType, bucketKeyType));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
new file mode 100644
index 000000000000..8c28c3c0c641
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions.BucketFunctionType;
+import org.apache.paimon.bucket.BucketFunction;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.PartitionValuePredicateVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
+
+/** Selector to select bucket from {@link Predicate}. */
+@ThreadSafe
+public class BucketSelector implements TriFilter {
+
+ public static final int MAX_VALUES = 1000;
+
+ private final BucketFunctionType bucketFunctionType;
+ private final RowType rowType;
+ private final RowType partitionType;
+ private final RowType bucketKeyType;
+ private final Predicate predicate;
+ private final Map> partitionSelectors;
+
+ public BucketSelector(
+ Predicate predicate,
+ BucketFunctionType bucketFunctionType,
+ RowType rowType,
+ RowType partitionType,
+ RowType bucketKeyType) {
+ this.predicate = predicate;
+ this.bucketFunctionType = bucketFunctionType;
+ this.rowType = rowType;
+ this.partitionType = partitionType;
+ this.bucketKeyType = bucketKeyType;
+ this.partitionSelectors = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public boolean test(BinaryRow partition, Integer bucket, Integer numBucket) {
+ return partitionSelectors
+ .computeIfAbsent(partition, this::createPartitionSelector)
+ .map(selector -> selector.test(bucket, numBucket))
+ .orElse(true);
+ }
+
+ private Optional createPartitionSelector(BinaryRow partition) {
+ Optional partRemoved =
+ predicate.visit(
+ new PartitionValuePredicateVisitor(rowType, partitionType, partition));
+ if (!partRemoved.isPresent()) {
+ return Optional.empty();
+ }
+
+ List bucketFilters =
+ pickTransformFieldMapping(
+ splitAnd(partRemoved.get()),
+ rowType.getFieldNames(),
+ bucketKeyType.getFieldNames());
+ if (bucketFilters.isEmpty()) {
+ return Optional.empty();
+ }
+
+ return createPartitionSelector(and(bucketFilters));
+ }
+
+ private Optional createPartitionSelector(Predicate bucketPredicate) {
+ @SuppressWarnings("unchecked")
+ List[] bucketValues = new List[bucketKeyType.getFieldCount()];
+
+ BucketFunction bucketFunction = BucketFunction.create(bucketFunctionType, bucketKeyType);
+
+ nextAnd:
+ for (Predicate andPredicate : splitAnd(bucketPredicate)) {
+ Integer reference = null;
+ List values = new ArrayList<>();
+ for (Predicate orPredicate : splitOr(andPredicate)) {
+ if (orPredicate instanceof LeafPredicate) {
+ LeafPredicate leaf = (LeafPredicate) orPredicate;
+ Optional fieldRefOptional = leaf.fieldRefOptional();
+ if (fieldRefOptional.isPresent()) {
+ FieldRef fieldRef = fieldRefOptional.get();
+ if (reference == null || reference == fieldRef.index()) {
+ reference = fieldRef.index();
+ if (leaf.function().equals(Equal.INSTANCE)
+ || leaf.function().equals(In.INSTANCE)) {
+ values.addAll(
+ leaf.literals().stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList()));
+ continue;
+ }
+ }
+ }
+ }
+
+ // failed, go to next predicate
+ continue nextAnd;
+ }
+ if (reference != null) {
+ if (bucketValues[reference] != null) {
+ // Same field appears in multiple AND branches,
+ // compute intersection to narrow down possible values
+ bucketValues[reference].retainAll(new HashSet<>(values));
+ if (bucketValues[reference].isEmpty()) {
+ // Empty intersection: contradictory conditions, no match
+ return Optional.empty();
+ }
+ } else {
+ bucketValues[reference] = values;
+ }
+ }
+ }
+
+ int rowCount = 1;
+ for (List values : bucketValues) {
+ if (values == null) {
+ return Optional.empty();
+ }
+
+ rowCount *= values.size();
+ if (rowCount > MAX_VALUES) {
+ return Optional.empty();
+ }
+ }
+
+ InternalRowSerializer serializer = new InternalRowSerializer(bucketKeyType);
+ List bucketKeys = new ArrayList<>();
+ assembleRows(
+ bucketValues,
+ columns ->
+ bucketKeys.add(
+ serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()),
+ new ArrayList<>(),
+ 0);
+
+ return Optional.of(new PartitionSelector(bucketKeys, bucketFunction));
+ }
+
+ private static void assembleRows(
+ List[] rowValues,
+ Consumer> consumer,
+ List stack,
+ int columnIndex) {
+ List columnValues = rowValues[columnIndex];
+ for (Object value : columnValues) {
+ stack.add(value);
+ if (columnIndex == rowValues.length - 1) {
+ // last column, consume row
+ consumer.accept(stack);
+ } else {
+ assembleRows(rowValues, consumer, stack, columnIndex + 1);
+ }
+ stack.remove(stack.size() - 1);
+ }
+ }
+
+ @ThreadSafe
+ private static class PartitionSelector implements BiFilter {
+
+ private final List bucketKeys;
+
+ private final BucketFunction bucketFunction;
+
+ private final Map> buckets = new ConcurrentHashMap<>();
+
+ public PartitionSelector(List bucketKeys, BucketFunction bucketFunction) {
+ this.bucketKeys = bucketKeys;
+ this.bucketFunction = bucketFunction;
+ }
+
+ @Override
+ public boolean test(Integer bucket, Integer numBucket) {
+ return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket))
+ .contains(bucket);
+ }
+
+ private Set createBucketSet(int numBucket) {
+ ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
+ for (BinaryRow key : bucketKeys) {
+ builder.add(bucketFunction.bucket(key, numBucket));
+ }
+ return builder.build();
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 1e044f810f66..8f543ca7d39a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -35,6 +35,7 @@
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RowRangeIndex;
+import org.apache.paimon.utils.TriFilter;
import javax.annotation.Nullable;
@@ -56,13 +57,15 @@ public interface FileStoreScan {
FileStoreScan withPartitionFilter(PartitionPredicate predicate);
+ FileStoreScan withCompleteFilter(Predicate predicate);
+
FileStoreScan withBucket(int bucket);
FileStoreScan onlyReadRealBuckets();
FileStoreScan withBucketFilter(Filter bucketFilter);
- FileStoreScan withTotalAwareBucketFilter(BiFilter bucketFilter);
+ FileStoreScan withTotalAwareBucketFilter(TriFilter bucketFilter);
FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a09fb25ee62d..8595753cf0ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -116,6 +116,11 @@ public KeyValueFileStoreScan(
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
this.keyFilter = predicate;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withCompleteFilter(Predicate predicate) {
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 5e4cffcbf672..a2fee49bfb88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -93,8 +93,6 @@ public KeyValueFileStore store() {
tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
- PrimaryKeyTableUtils.addKeyNamePrefix(
- tableSchema.logicalBucketKeyType()),
keyType,
rowType,
extractor,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a6710ff00848..67d0b9a56633 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -241,6 +241,7 @@ public SnapshotReader withFilter(Predicate predicate) {
if (!pair.getRight().isEmpty()) {
nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight()));
}
+ scan.withCompleteFilter(predicate);
return this;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 8142d44acf7d..09595d91889c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -134,7 +134,6 @@ private TestFileStore(
options,
partitionType,
keyType,
- keyType,
valueType,
keyValueFieldsExtractor,
mfFactory,
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
deleted file mode 100644
index eeec69d20226..000000000000
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.utils.BiFilter;
-import org.apache.paimon.utils.Filter;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketFilter}. */
-public class BucketFilterTest {
-
- @Test
- public void testCreateWithAllNullParameters() {
- // Test that create method returns null when all parameters are null/false
- assertThat(BucketFilter.create(false, null, null, null)).isNull();
- }
-
- @Test
- public void testCreateWithOnlyReadRealBuckets() {
- // Test that create method returns a BucketFilter when onlyReadRealBuckets is true
- BucketFilter filter = BucketFilter.create(true, null, null, null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testCreateWithSpecifiedBucket() {
- // Test that create method returns a BucketFilter when specifiedBucket is not null
- BucketFilter filter = BucketFilter.create(false, 1, null, null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isEqualTo(1);
- }
-
- @Test
- public void testCreateWithBucketFilter() {
- // Test that create method returns a BucketFilter when bucketFilter is not null
- Filter bucketFilter = value -> value > 0;
- BucketFilter filter = BucketFilter.create(false, null, bucketFilter, null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testCreateWithTotalAwareBucketFilter() {
- // Test that create method returns a BucketFilter when totalAwareBucketFilter is not null
- BiFilter totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket;
- BucketFilter filter = BucketFilter.create(false, null, null, totalAwareBucketFilter);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testTestWithOnlyReadRealBuckets() {
- // Test the test method with onlyReadRealBuckets parameter
- BucketFilter filter = BucketFilter.create(true, null, null, null);
-
- // Real buckets (non-negative) should pass
- assertThat(filter.test(0, 1)).isTrue();
- assertThat(filter.test(1, 2)).isTrue();
-
- // Virtual buckets (negative) should not pass
- assertThat(filter.test(-1, 1)).isFalse();
- assertThat(filter.test(-2, 2)).isFalse();
- }
-
- @Test
- public void testTestWithSpecifiedBucket() {
- // Test the test method with specifiedBucket parameter
- BucketFilter filter = BucketFilter.create(false, 1, null, null);
-
- // Only the specified bucket should pass
- assertThat(filter.test(1, 2)).isTrue();
-
- // Other buckets should not pass
- assertThat(filter.test(0, 2)).isFalse();
- assertThat(filter.test(2, 3)).isFalse();
- }
-
- @Test
- public void testTestWithBucketFilter() {
- // Test the test method with bucketFilter parameter
- Filter bucketFilter = value -> value % 2 == 0; // Even buckets only
- BucketFilter filter = BucketFilter.create(false, null, bucketFilter, null);
-
- // Even buckets should pass
- assertThat(filter.test(0, 1)).isTrue();
- assertThat(filter.test(2, 3)).isTrue();
- assertThat(filter.test(4, 5)).isTrue();
-
- // Odd buckets should not pass
- assertThat(filter.test(1, 2)).isFalse();
- assertThat(filter.test(3, 4)).isFalse();
- assertThat(filter.test(5, 6)).isFalse();
- }
-
- @Test
- public void testTestWithTotalAwareBucketFilter() {
- // Test the test method with totalAwareBucketFilter parameter
- BiFilter totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket / 2;
- BucketFilter filter = BucketFilter.create(false, null, null, totalAwareBucketFilter);
-
- // Buckets less than half of totalBucket should pass
- assertThat(filter.test(0, 4)).isTrue();
- assertThat(filter.test(1, 4)).isTrue();
-
- // Buckets greater than or equal to half of totalBucket should not pass
- assertThat(filter.test(2, 4)).isFalse();
- assertThat(filter.test(3, 4)).isFalse();
- }
-
- @Test
- public void testTestWithMultipleFilters() {
- // Test the test method with multiple filters combined
- Filter bucketFilter = value -> value > 0; // Positive buckets only
- BiFilter totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket - 1;
- BucketFilter filter = BucketFilter.create(true, 1, bucketFilter, totalAwareBucketFilter);
-
- // Bucket 1 is positive, is the specified bucket, and is less than totalBucket-1 for
- // totalBucket=3
- assertThat(filter.test(1, 3)).isTrue();
-
- // Bucket 0 is not positive, so it should not pass
- assertThat(filter.test(0, 3)).isFalse();
-
- // Bucket 2 is not the specified bucket, so it should not pass
- assertThat(filter.test(2, 3)).isFalse();
-
- // Bucket 1 with totalBucket=2 should not pass because 1 >= 2-1
- assertThat(filter.test(1, 2)).isFalse();
-
- // Negative bucket should not pass because onlyReadRealBuckets is true
- assertThat(filter.test(-1, 3)).isFalse();
- }
-
- @Test
- public void testSpecifiedBucket() {
- // Test the specifiedBucket method
- BucketFilter filterWithSpecifiedBucket = BucketFilter.create(false, 2, null, null);
- assertThat(filterWithSpecifiedBucket.specifiedBucket()).isEqualTo(2);
- }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
deleted file mode 100644
index dc64bdb596d4..000000000000
--- a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.operation.BucketSelectConverter.Selector;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static org.apache.paimon.predicate.PredicateBuilder.or;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketSelectConverter}. */
-public class BucketSelectConverterTest {
-
- private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
-
- private final PredicateBuilder builder = new PredicateBuilder(rowType);
-
- @Test
- public void testRepeatEqual() {
- assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0, 1)))).isEmpty();
- }
-
- @Test
- public void testNotFull() {
- assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty();
- }
-
- @Test
- public void testOtherPredicate() {
- assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty();
- }
-
- @Test
- public void testOrIllegal() {
- assertThat(
- newSelector(
- and(
- or(builder.equal(0, 5), builder.equal(1, 6)),
- builder.equal(1, 1),
- builder.equal(2, 2))))
- .isEmpty();
- }
-
- @Test
- public void testNormal() {
- Selector selector =
- newSelector(and(builder.equal(0, 0), builder.equal(1, 1), builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(11);
- }
-
- @Test
- public void testIn() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testOr() {
- Selector selector =
- newSelector(
- and(
- or(
- builder.equal(0, 5),
- builder.equal(0, 6),
- builder.equal(0, 7)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testInNull() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7, null)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testMultipleIn() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7)),
- builder.in(1, Arrays.asList(1, 8)),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
- }
-
- @Test
- public void testMultipleOr() {
- Selector selector =
- newSelector(
- and(
- or(
- builder.equal(0, 5),
- builder.equal(0, 6),
- builder.equal(0, 7)),
- or(builder.equal(1, 1), builder.equal(1, 8)),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19);
- }
-
- private Optional newSelector(Predicate predicate) {
- return (Optional)
- BucketSelectConverter.create(predicate, rowType, BucketFunctionType.DEFAULT);
- }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
new file mode 100644
index 000000000000..3128ff20db22
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions.BucketFunctionType;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BucketSelector}. */
+public class BucketSelectorTest {
+
+ private static final int NUM_BUCKETS = 10;
+
+ // ========================== Single bucket key, non-partitioned ==========================
+
+ @Test
+ public void testEqualPredicate() {
+ // k = 5 => should select exactly one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = pb.equal(0, 5);
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testEqualAndRangePredicate() {
+ // k = 5 AND k < 100 => should still select bucket for k=5
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.lessThan(0, 100));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testEqualAndInWithOverlap() {
+ // k = 5 AND k IN (5, 10) => intersection is {5}, should select one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.in(0, Arrays.asList(5, 10)));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testInAndInWithOverlap() {
+ // k IN (1, 5) AND k IN (5, 10) => intersection is {5}, should select one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate =
+ PredicateBuilder.and(pb.in(0, Arrays.asList(1, 5)), pb.in(0, Arrays.asList(5, 10)));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testRedundantEquals() {
+ // k = 5 AND k = 5 => redundant, should still select one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 5));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testContradictoryEquals() {
+ // k = 5 AND k = 10 => empty intersection, no bucket can match
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 10));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ // Empty intersection => Optional.empty() => orElse(true) => all buckets pass
+ // (conservative: no filtering when we can't determine)
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(NUM_BUCKETS);
+ }
+
+ @Test
+ public void testRangeOnlyFallsBackToFullScan() {
+ // k < 100 => no Equal/In to extract, full scan
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = pb.lessThan(0, 100);
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(NUM_BUCKETS);
+ }
+
+ // ========================== Multi-field bucket key ==========================
+
+ @Test
+ public void testMultiFieldBucketKey() {
+ // k1 = 5 AND k2 = 10 => one combination, one bucket
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k1", DataTypes.INT()),
+ DataTypes.FIELD(1, "k2", DataTypes.INT()));
+ RowType partType = RowType.of();
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(1, 10));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, rowType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testMultiFieldWithIntersection() {
+ // k1 IN (1, 5) AND k1 IN (5, 10) AND k2 = 3 => k1={5}, k2={3} => one combination
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k1", DataTypes.INT()),
+ DataTypes.FIELD(1, "k2", DataTypes.INT()));
+ RowType partType = RowType.of();
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate =
+ PredicateBuilder.and(
+ pb.in(0, Arrays.asList(1, 5)),
+ pb.in(0, Arrays.asList(5, 10)),
+ pb.equal(1, 3));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, rowType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ // ========================== Partitioned table ==========================
+
+ @Test
+ public void testPartitionedTableWithBucketFilter() {
+ // Table: (pt INT, k INT), partition: (pt), bucket key: (k)
+ // Predicate: pt = 1 AND k = 5
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt", DataTypes.INT()),
+ DataTypes.FIELD(1, "k", DataTypes.INT()));
+ RowType partType = DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT()));
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 1), pb.equal(1, 5));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType, partType, bucketKeyType);
+
+ // For partition pt=1 (matching), bucket filtering should work
+ BinaryRow partition1 =
+ new org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+ .toBinaryRow(org.apache.paimon.data.GenericRow.of(1))
+ .copy();
+ Set<Integer> selected1 = selectedBuckets(selector, partition1, NUM_BUCKETS);
+ assertThat(selected1).hasSize(1);
+
+ // For partition pt=2 (not matching), predicate becomes AlwaysFalse
+ // => no bucket key values extracted => full scan (conservative)
+ BinaryRow partition2 =
+ new org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+ .toBinaryRow(org.apache.paimon.data.GenericRow.of(2))
+ .copy();
+ Set<Integer> selected2 = selectedBuckets(selector, partition2, NUM_BUCKETS);
+ assertThat(selected2).hasSize(NUM_BUCKETS);
+ }
+
+ // ========================== Helpers ==========================
+
+ private static Set<Integer> selectedBuckets(
+ BucketSelector selector, BinaryRow partition, int numBuckets) {
+ Set<Integer> selected = new HashSet<>();
+ for (int b = 0; b < numBuckets; b++) {
+ if (selector.test(partition, b, numBuckets)) {
+ selected.add(b);
+ }
+ }
+ return selected;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
new file mode 100644
index 000000000000..94494e040661
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests bucket filtering with compound predicates on a partitioned, fixed-bucket, append-only
+ * table.
+ */
+public class BucketFilterScanTest extends TableTestBase {
+
+ @Test
+ public void testBucketFilterWithCompoundPredicateOnAppendTable() throws Exception {
+ testBucketFilterWithCompoundPredicate(false);
+ }
+
+ @Test
+ public void testBucketFilterWithCompoundPredicateOnPkTable() throws Exception {
+ testBucketFilterWithCompoundPredicate(true);
+ }
+
+ @Test
+ public void testCompositeBucketFilterWithCompoundPredicateOnAppendTable() throws Exception {
+ testCompositeBucketFilterWithCompoundPredicate(false);
+ }
+
+ @Test
+ public void testCompositeBucketFilterWithCompoundPredicateOnPkTable() throws Exception {
+ testCompositeBucketFilterWithCompoundPredicate(true);
+ }
+
+ /**
+ * Tests bucket filtering with compound predicates on a single-field bucket key.
+ *
+ * <p>Table schema:
+ *
+ * <ul>
+ *   <li>Partition key: column 'a' (INT)
+ *   <li>Bucket key: column 'b' (INT)
+ *   <li>Bucket count: 10
+ * </ul>
+ *
+ * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) = 100 rows.
+ *
+ * <p>Test scenarios:
+ *
+ * <ul>
+ *   <li>Predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) - Tests partition range filter
+ *       with bucket equality, combined with OR. Expected: buckets for partition 1,2 with b=5
+ *       and partition 3 with b=7.
+ *   <li>Predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) - Tests partition range with
+ *       bucket equality, OR partition equality with bucket range. Expected: mixed buckets from
+ *       partition 3 and specific buckets from partitions 1,2.
+ *   <li>Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests partition equality with
+ *       bucket equality in both OR branches. Expected: exact bucket matching for each
+ *       partition-b combination.
+ * </ul>
+ *
+ */
+ private void testBucketFilterWithCompoundPredicate(boolean pk) throws Exception {
+ // ---- schema & table ----
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .partitionKeys("a")
+ .option(BUCKET.key(), "10");
+ if (pk) {
+ builder.primaryKey("a", "b");
+ } else {
+ builder.option(BUCKET_KEY.key(), "b");
+ }
+ Schema schema = builder.build();
+
+ Identifier tableId = identifier("test_bucket_filter");
+ catalog.createTable(tableId, schema, false);
+ Table table = catalog.getTable(tableId);
+
+ // ---- write data: 5 partitions × 20 b-values = 100 rows ----
+ GenericRow[] rows = new GenericRow[100];
+ int idx = 0;
+ for (int a = 1; a <= 5; a++) {
+ for (int b = 1; b <= 20; b++) {
+ rows[idx++] = GenericRow.of(a, b, a * 100 + b);
+ }
+ }
+ write(table, rows);
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+ // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) ----
+ Predicate predicate1 =
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
+ assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("3,1", "1,6", "2,6");
+
+ // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) ----
+ Predicate predicate2 =
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100)));
+ assertThat(plan(table, predicate2))
+ .containsExactlyInAnyOrder(
+ "3,0", "3,1", "1,6", "3,4", "3,5", "2,6", "3,6", "3,7", "3,8");
+
+ // ---- build predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) ----
+ Predicate predicate3 =
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
+ assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("3,1", "2,6");
+ }
+
+ /**
+ * Tests bucket filtering with compound predicates on a composite (multi-field) bucket key.
+ *
+ * <p>Table schema:
+ *
+ * <ul>
+ *   <li>Partition key: column 'a' (INT)
+ *   <li>Bucket key: columns 'b' and 'c' (composite, INT)
+ *   <li>Bucket count: 10
+ * </ul>
+ *
+ * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) × 10 c-values (c=0 to
+ *     9) = 1000 rows.
+ *
+ * <p>Test scenarios:
+ *
+ * <ul>
+ *   <li>Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests nested OR
+ *       within AND, with partition range, bucket field equality, and additional bucket field
+ *       filter. The 'c = 5' condition is part of the composite bucket key, affecting bucket
+ *       selection.
+ *   <li>Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 - Tests range
+ *       predicate on one bucket field (b) combined with equality on another (c). Validates
+ *       handling of multiple bucket key fields with different predicate types.
+ *   <li>Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests exact matching on
+ *       both partition and bucket fields. The composite bucket key (b,c) ensures precise bucket
+ *       targeting.
+ * </ul>
+ *
+ */
+ private void testCompositeBucketFilterWithCompoundPredicate(boolean pk) throws Exception {
+ // ---- schema & table ----
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .partitionKeys("a")
+ .option(BUCKET.key(), "10");
+ if (pk) {
+ builder.primaryKey("a", "b", "c");
+ } else {
+ builder.option(BUCKET_KEY.key(), "b,c");
+ }
+ Schema schema = builder.build();
+
+ Identifier tableId = identifier("test_composite_bucket_filter");
+ catalog.createTable(tableId, schema, false);
+ Table table = catalog.getTable(tableId);
+
+ // ---- write data: 5 partitions × 20 b-values x 10 c-values = 1000 rows ----
+ GenericRow[] rows = new GenericRow[1000];
+ int idx = 0;
+ for (int a = 1; a <= 5; a++) {
+ for (int b = 1; b <= 20; b++) {
+ for (int c = 0; c < 10; c++) {
+ rows[idx++] = GenericRow.of(a, b, c);
+ }
+ }
+ }
+ write(table, rows);
+ PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+ // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
+ Predicate predicate1 =
+ PredicateBuilder.and(
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
+ pb.equal(2, 5));
+ assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("1,0", "2,0", "3,5");
+
+ // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 ----
+ Predicate predicate2 =
+ PredicateBuilder.and(
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100))),
+ pb.equal(2, 5));
+ assertThat(plan(table, predicate2))
+ .containsExactlyInAnyOrder(
+ "3,9", "1,0", "2,0", "3,0", "3,1", "3,2", "3,3", "3,4", "3,5", "3,6", "3,7",
+ "3,8");
+
+ // ---- build predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
+ Predicate predicate3 =
+ PredicateBuilder.and(
+ PredicateBuilder.or(
+ PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
+ PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
+ pb.equal(2, 5));
+ assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("2,0", "3,5");
+ }
+
+ private Set<String> plan(Table table, Predicate predicate) {
+ return table.newReadBuilder().withFilter(predicate).newScan().plan().splits().stream()
+ .map(
+ split -> {
+ DataSplit dataSplit = (DataSplit) split;
+ int partitionA = dataSplit.partition().getInt(0);
+ int bucket = dataSplit.bucket();
+ return partitionA + "," + bucket;
+ })
+ .collect(Collectors.toSet());
+ }
+}