From 930957acb45eabe06e8caa872c80161016502234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 29 Apr 2026 19:52:03 +0800 Subject: [PATCH 1/3] [Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel Previously, the channel() method used the total number of buckets (knownNumBuckets) as the bucket parameter for ChannelComputer.select(). Since all records in the same partition share the same total bucket count, they were all routed to the same downstream channel, causing only one subtask to process data in batch mode. This fix computes a per-record bucket by hashing the trimmedPrimaryKey and taking modulo numBuckets, so records with different primary keys are distributed across different channels/subtasks. --- .../flink/sink/PostponeFixedBucketChannelComputer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java index 5fd55db13e47..a1f99ccfae82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java @@ -55,7 +55,12 @@ public void setup(int numChannels) { @Override public int channel(InternalRow record) { BinaryRow partition = partitionKeyExtractor.partition(record); - int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels); + int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels); + int hash = partitionKeyExtractor.trimmedPrimaryKey(record).hashCode(); + if (hash == Integer.MIN_VALUE) { + hash = Integer.MAX_VALUE; + } + int bucket = Math.abs(hash) % numBuckets; return ChannelComputer.select(partition, bucket, numChannels); } From 
5698511bb23f02d97efd5a9d71c82cad57342db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 29 Apr 2026 22:13:44 +0800 Subject: [PATCH 2/3] [Flink] Add tests for PostponeFixedBucketChannelComputer fix --- ...ostponeFixedBucketChannelComputerTest.java | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java new file mode 100644 index 000000000000..991ab5d2ddbd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PostponeFixedBucketChannelComputer}. */ +public class PostponeFixedBucketChannelComputerTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testRecordsDistributedAcrossChannels() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()}, + new String[] {"pt", "k", "v"}); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + TableSchema schema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + Collections.singletonList("pt"), + Arrays.asList("pt", "k"), + new HashMap<String, String>() { + { + put("bucket", "-1"); + put("postpone.bucket-mode", "true"); + } + }, + "")); + + int numChannels = 8; + Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>(); + PostponeFixedBucketChannelComputer computer = + new PostponeFixedBucketChannelComputer(schema, knownNumBuckets); + computer.setup(numChannels); + + Set<Integer> channels = new HashSet<>(); + for (long i = 0; i < 100; i++) { + InternalRow row = GenericRow.of(1, i, (double) i); + int channel = computer.channel(row); +
assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels); + channels.add(channel); + } + + // With 100 distinct keys and 8 channels, we should hit more than 1 channel + assertThat(channels.size()).isGreaterThan(1); + } + + @Test + public void testNoPartitionDistribution() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()}, + new String[] {"k", "v"}); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + TableSchema schema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.singletonList("k"), + new HashMap<String, String>() { + { + put("bucket", "-1"); + put("postpone.bucket-mode", "true"); + } + }, + "")); + + int numChannels = 8; + Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>(); + PostponeFixedBucketChannelComputer computer = + new PostponeFixedBucketChannelComputer(schema, knownNumBuckets); + computer.setup(numChannels); + + Set<Integer> channels = new HashSet<>(); + for (long i = 0; i < 100; i++) { + InternalRow row = GenericRow.of(i, (double) i); + int channel = computer.channel(row); + assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels); + channels.add(channel); + } + + // Without the fix, all records would go to the same channel + // With the fix, 100 distinct keys across 8 channels should use multiple channels + assertThat(channels.size()).isGreaterThan(1); + } + + @Test + public void testSameKeyGoesToSameChannel() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()}, + new String[] {"k", "v"}); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + TableSchema schema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.singletonList("k"), + new HashMap<String, String>() { + { + put("bucket", "-1"); + put("postpone.bucket-mode", "true"); + }
+ }, + "")); + + int numChannels = 8; + Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>(); + PostponeFixedBucketChannelComputer computer = + new PostponeFixedBucketChannelComputer(schema, knownNumBuckets); + computer.setup(numChannels); + + // Same key should always route to the same channel + for (long key = 0; key < 50; key++) { + InternalRow row1 = GenericRow.of(key, 1.0); + InternalRow row2 = GenericRow.of(key, 2.0); + assertThat(computer.channel(row1)).isEqualTo(computer.channel(row2)); + } + } +} From aa071b7c6b21335fb623cfbb8406ba2844fa3b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Sun, 3 May 2026 15:04:11 +0800 Subject: [PATCH 3/3] [Flink] Reuse fixed bucket extractor for postpone channel --- .../table/sink/FixedBucketRowKeyExtractor.java | 7 +++++-- .../sink/PostponeFixedBucketChannelComputer.java | 15 ++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java index b0cfc2d2057f..146a45b43713 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java @@ -69,10 +69,13 @@ private BinaryRow bucketKey() { @Override public int bucket() { - BinaryRow bucketKey = bucketKey(); if (reuseBucket == null) { - reuseBucket = bucketFunction.bucket(bucketKey, numBuckets); + reuseBucket = bucket(numBuckets); } return reuseBucket; } + + public int bucket(int numBuckets) { + return bucketFunction.bucket(bucketKey(), numBuckets); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java index a1f99ccfae82..e1bf08d8434c 100644 ---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.ChannelComputer; -import org.apache.paimon.table.sink.RowPartitionKeyExtractor; +import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor; import java.util.Map; @@ -38,7 +38,7 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer<InternalRow> { private final TableSchema schema; private final Map<BinaryRow, Integer> knownNumBuckets; private transient int numChannels; - private transient RowPartitionKeyExtractor partitionKeyExtractor; + private transient FixedBucketRowKeyExtractor keyExtractor; public PostponeFixedBucketChannelComputer( TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) { @@ -49,18 +49,15 @@ public PostponeFixedBucketChannelComputer( @Override public void setup(int numChannels) { this.numChannels = numChannels; - this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema); + this.keyExtractor = new FixedBucketRowKeyExtractor(schema); } @Override public int channel(InternalRow record) { - BinaryRow partition = partitionKeyExtractor.partition(record); + keyExtractor.setRecord(record); + BinaryRow partition = keyExtractor.partition(); int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels); - int hash = partitionKeyExtractor.trimmedPrimaryKey(record).hashCode(); - if (hash == Integer.MIN_VALUE) { - hash = Integer.MAX_VALUE; - } - int bucket = Math.abs(hash) % numBuckets; + int bucket = keyExtractor.bucket(numBuckets); return ChannelComputer.select(partition, bucket, numChannels); }