From 389f108e83a8d3fd2eae3a1cd7f706ba67df5ed7 Mon Sep 17 00:00:00 2001 From: David Wang Date: Tue, 16 Jun 2026 12:27:50 +1000 Subject: [PATCH 1/4] Log partition and bucket in compaction task. --- .../apache/paimon/compact/CompactTask.java | 38 ++++++++++++++++++- .../compact/MergeTreeCompactManager.java | 19 ++++++++++ .../MergeTreeCompactManagerFactory.java | 33 ++++++++-------- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index 69e68949c3c9..7d3e19ffe582 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -37,22 +37,36 @@ public abstract class CompactTask implements Callable { @Nullable private final CompactionMetrics.Reporter metricsReporter; + @Nullable private String partitionString; + private int bucket = -1; + public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) { this.metricsReporter = metricsReporter; } + /** Set partition and bucket info for logging purposes. */ + public void setPartitionBucketInfo(@Nullable String partitionString, int bucket) { + this.partitionString = partitionString; + this.bucket = bucket; + } + @Override public CompactResult call() throws Exception { MetricUtils.safeCall(this::startTimer, LOG); + LOG.info( + "Paimon compact task started: partition={}, bucket={}, taskType={}", + partitionString, + bucket, + getClass().getSimpleName()); try { long startMillis = System.currentTimeMillis(); CompactResult result = doCompact(); + long durationMs = System.currentTimeMillis() - startMillis; MetricUtils.safeCall( () -> { if (metricsReporter != null) { - metricsReporter.reportCompactionTime( - System.currentTimeMillis() - startMillis); + metricsReporter.reportCompactionTime(durationMs); metricsReporter.increaseCompactionsCompletedCount(); metricsReporter.reportCompactionInputSize( result.before().stream() @@ -68,10 +82,30 @@ public CompactResult call() throws Exception { }, LOG); + LOG.info( + "Paimon compact task finished: partition={}, bucket={}, taskType={}, " + + "inputFiles={}, inputBytes={}, outputFiles={}, outputBytes={}, durationMs={}", + partitionString, + bucket, + getClass().getSimpleName(), + result.before().size(), + result.before().stream().mapToLong(DataFileMeta::fileSize).sum(), + result.after().size(), + result.after().stream().mapToLong(DataFileMeta::fileSize).sum(), + durationMs); + if (LOG.isDebugEnabled()) { LOG.debug(logMetric(startMillis, result.before(), result.after())); } return result; + } catch (Exception e) { + LOG.warn( + "Paimon compact task failed: partition={}, bucket={}, taskType={}", + partitionString, + bucket, + getClass().getSimpleName(), + e); + throw e; } finally { MetricUtils.safeCall(this::stopTimer, LOG); MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index ee431f4e0340..f14b58c0cd21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -25,6 +25,7 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; @@ -33,6 +34,7 @@ import org.apache.paimon.mergetree.Levels; import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.operation.metrics.MetricUtils; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; @@ -72,6 +74,9 @@ public class MergeTreeCompactManager extends CompactFutureManager { @Nullable private final RecordLevelExpire recordLevelExpire; + @Nullable private String partitionString; + private int partitionBucket = -1; + public MergeTreeCompactManager( ExecutorService executor, Levels levels, @@ -105,6 +110,19 @@ public MergeTreeCompactManager( MetricUtils.safeCall(this::reportMetrics, LOG); } + /** + * Set partition and bucket info so compact tasks can log readable partition/bucket identifiers. + */ + public void setPartitionBucketInfo( + BinaryRow partition, int bucket, FileStorePathFactory pathFactory) { + try { + this.partitionString = pathFactory.getPartitionString(partition); + } catch (Exception e) { + this.partitionString = partition.toString(); + } + this.partitionBucket = bucket; + } + @Override public boolean shouldWaitForLatestCompaction() { return levels.numberOfSortedRuns() > numSortedRunStopTrigger; @@ -244,6 +262,7 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) { file.fileName(), file.level(), file.fileSize())) .collect(Collectors.joining(", "))); } + task.setPartitionBucketInfo(partitionString, partitionBucket); taskFuture = executor.submit(task); if (metricsReporter != null) { metricsReporter.increaseCompactionsQueuedCount(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java index 89d19313844e..ca646c3a7c45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java @@ -171,21 +171,24 @@ public CompactManager create( if (metricsReporter != null) { rewriter.setMetricsReporter(metricsReporter); } - return new MergeTreeCompactManager( - compactExecutor, - levels, - compactStrategy, - keyComparator, - options.compactionFileSize(true), - options.numSortedRunStopTrigger(), - rewriter, - metricsReporter, - dvMaintainer, - options.prepareCommitWaitCompaction(), - options.needLookup(), - recordLevelExpire, - options.forceRewriteAllFiles(), - options.isChainTable()); + MergeTreeCompactManager compactManager = + new MergeTreeCompactManager( + compactExecutor, + levels, + compactStrategy, + keyComparator, + options.compactionFileSize(true), + options.numSortedRunStopTrigger(), + rewriter, + metricsReporter, + dvMaintainer, + options.prepareCommitWaitCompaction(), + options.needLookup(), + recordLevelExpire, + options.forceRewriteAllFiles(), + options.isChainTable()); + compactManager.setPartitionBucketInfo(partition, bucket, keyReaderFactory.pathFactory()); + return compactManager; } private CompactStrategy createCompactStrategy( From 56596b219b691cf001e3c965c9fe130166cd1fc4 Mon Sep 17 00:00:00 2001 From: David Wang Date: Thu, 18 Jun 2026 11:36:19 +1000 Subject: [PATCH 2/4] Get partition info from binaryrow --- .../paimon/io/KeyValueFileReaderFactory.java | 4 ++ .../MergeTreeCompactManagerFactory.java | 3 +- .../operation/AbstractFileStoreWrite.java | 42 ++++++++++++------- .../apache/paimon/operation/WriteRestore.java | 11 ++++- .../coordinator/TableWriteCoordinator.java | 21 +++++++++- 5 files changed, 62 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 5f7e3741927e..081abb6668b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -300,6 +300,10 @@ public RowType readValueType() { return readValueType; } + public FileStorePathFactory pathFactory() { + return pathFactory; + } + public KeyValueFileReaderFactory build( BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) { return build(partition, bucket, dvFactory, true, Collections.emptyList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java index ca646c3a7c45..6f0fcf1374e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java @@ -187,7 +187,8 @@ public CompactManager create( recordLevelExpire, options.forceRewriteAllFiles(), options.isChainTable()); - compactManager.setPartitionBucketInfo(partition, bucket, keyReaderFactory.pathFactory()); + compactManager.setPartitionBucketInfo( + partition, bucket, readerFactoryBuilder.pathFactory()); return compactManager; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index a49fff06d0ce..400e8444daab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -496,27 +496,28 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { } private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) { - RestoreFiles restored = - restore.restoreFiles( - partition, - bucket, - dbMaintainerFactory != null, - dvMaintainerFactory != null); + String partInfo = partitionInfo(partition); + RestoreFiles restored; + try { + restored = + restore.restoreFiles( + partition, + bucket, + dbMaintainerFactory != null, + dvMaintainerFactory != null); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Failed to restore existing files for %s, bucket %d.", + partInfo, bucket), + e); + } Integer restoredTotalBuckets = restored.totalBuckets(); int totalBuckets = numBuckets; if (restoredTotalBuckets != null) { totalBuckets = restoredTotalBuckets; } if (!ignoreNumBucketCheck && totalBuckets != numBuckets) { - String partInfo = - partitionType.getFieldCount() > 0 - ? "partition " - + getPartitionComputer( - partitionType, - PARTITION_DEFAULT_NAME.defaultValue(), - legacyPartitionName) - .generatePartValues(partition) - : "table"; throw new RuntimeException( String.format( "Try to write %s with a new bucket num %d, but the previous bucket num is %d. " @@ -526,6 +527,17 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) { return restored; } + private String partitionInfo(BinaryRow partition) { + return partitionType.getFieldCount() > 0 + ? "partition " + + getPartitionComputer( + partitionType, + PARTITION_DEFAULT_NAME.defaultValue(), + legacyPartitionName) + .generatePartValues(partition) + : "table"; + } + private ExecutorService compactExecutor() { if (lazyCompactExecutor == null) { lazyCompactExecutor = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java index 5d4e335571d1..0d8db17313fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java @@ -39,13 +39,20 @@ RestoreFiles restoreFiles( @Nullable static Integer extractDataFiles(List entries, List dataFiles) { + return extractDataFiles(entries, dataFiles, null); + } + + @Nullable + static Integer extractDataFiles( + List entries, List dataFiles, @Nullable String context) { Integer totalBuckets = null; for (ManifestEntry entry : entries) { if (totalBuckets != null && totalBuckets != entry.totalBuckets()) { + String contextInfo = context == null ? "" : " for " + context; throw new RuntimeException( String.format( - "Bucket data files has different total bucket number, %s vs %s, this should be a bug.", - totalBuckets, entry.totalBuckets())); + "Bucket data files%s has different total bucket number, %s vs %s, this should be a bug.", + contextInfo, totalBuckets, entry.totalBuckets())); } totalBuckets = entry.totalBuckets(); dataFiles.add(entry.file()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java index 1cff11eeffc1..338c556321c0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java @@ -28,6 +28,7 @@ import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.WriteRestore; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -42,6 +43,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.utils.InstantiationUtil.deserializeObject; import static org.apache.paimon.utils.InstantiationUtil.serializeObject; @@ -169,7 +171,11 @@ public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest reques List restoreFiles = new ArrayList<>(); List entries = scan.withPartitionBucket(partition, bucket).plan().files(); - Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles); + Integer totalBuckets = + WriteRestore.extractDataFiles( + entries, + restoreFiles, + String.format("%s, bucket %d", partitionInfo(partition), bucket)); IndexFileMeta dynamicBucketIndex = null; if (request.scanDynamicBucketIndex()) { @@ -212,6 +218,19 @@ public void checkpoint() { latestCommittedIdentifiers.clear(); } + private String partitionInfo(BinaryRow partition) { + if (table.schema().logicalPartitionType().getFieldCount() == 0) { + return "table"; + } + return "partition " + + FileStorePathFactory + .getPartitionComputer( + table.schema().logicalPartitionType(), + table.coreOptions().toConfiguration().get(PARTITION_DEFAULT_NAME), + table.coreOptions().legacyPartitionName()) + .generatePartValues(partition); + } + private static class CoordinationKey { private final byte[] content; From a4428894d9a48a818bb31a4ef3e49c4f6bb93810 Mon Sep 17 00:00:00 2001 From: David Wang Date: Tue, 23 Jun 2026 17:30:17 +1000 Subject: [PATCH 3/4] Fix lint. --- .../paimon/flink/sink/coordinator/TableWriteCoordinator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java index 338c556321c0..4ca62281248a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java @@ -223,8 +223,7 @@ private String partitionInfo(BinaryRow partition) { return "table"; } return "partition " - + FileStorePathFactory - .getPartitionComputer( + + FileStorePathFactory.getPartitionComputer( table.schema().logicalPartitionType(), table.coreOptions().toConfiguration().get(PARTITION_DEFAULT_NAME), table.coreOptions().legacyPartitionName()) From e67361493570f92f9c8105eb060581437a534ca3 Mon Sep 17 00:00:00 2001 From: David Wang Date: Tue, 23 Jun 2026 19:18:24 +1000 Subject: [PATCH 4/4] Fix UT. --- .../org/apache/paimon/table/AppendOnlySimpleTableTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index fca17b5b9800..c0c3f5e738d7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -195,7 +195,8 @@ public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Except try (BatchTableWrite write = writeBuilder.newWrite()) { if (ordered) { assertThatThrownBy(() -> write.write(rowData(1, 10, 100L))) - .hasMessageContaining("FileNotFoundException"); + .hasMessageContaining("Failed to restore existing files") + .hasRootCauseInstanceOf(java.io.FileNotFoundException.class); } else { // no exception write.write(rowData(1, 10, 100L));