diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index 69e68949c3c9..7d3e19ffe582 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -37,22 +37,36 @@ public abstract class CompactTask implements Callable { @Nullable private final CompactionMetrics.Reporter metricsReporter; + @Nullable private String partitionString; + private int bucket = -1; + public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) { this.metricsReporter = metricsReporter; } + /** Set partition and bucket info for logging purposes. */ + public void setPartitionBucketInfo(@Nullable String partitionString, int bucket) { + this.partitionString = partitionString; + this.bucket = bucket; + } + @Override public CompactResult call() throws Exception { MetricUtils.safeCall(this::startTimer, LOG); + LOG.info( + "Paimon compact task started: partition={}, bucket={}, taskType={}", + partitionString, + bucket, + getClass().getSimpleName()); try { long startMillis = System.currentTimeMillis(); CompactResult result = doCompact(); + long durationMs = System.currentTimeMillis() - startMillis; MetricUtils.safeCall( () -> { if (metricsReporter != null) { - metricsReporter.reportCompactionTime( - System.currentTimeMillis() - startMillis); + metricsReporter.reportCompactionTime(durationMs); metricsReporter.increaseCompactionsCompletedCount(); metricsReporter.reportCompactionInputSize( result.before().stream() @@ -68,10 +82,30 @@ public CompactResult call() throws Exception { }, LOG); + LOG.info( + "Paimon compact task finished: partition={}, bucket={}, taskType={}, " + + "inputFiles={}, inputBytes={}, outputFiles={}, outputBytes={}, durationMs={}", + partitionString, + bucket, + getClass().getSimpleName(), + result.before().size(), + result.before().stream().mapToLong(DataFileMeta::fileSize).sum(), + result.after().size(), + result.after().stream().mapToLong(DataFileMeta::fileSize).sum(), + durationMs); + if (LOG.isDebugEnabled()) { LOG.debug(logMetric(startMillis, result.before(), result.after())); } return result; + } catch (Exception e) { + LOG.warn( + "Paimon compact task failed: partition={}, bucket={}, taskType={}", + partitionString, + bucket, + getClass().getSimpleName(), + e); + throw e; } finally { MetricUtils.safeCall(this::stopTimer, LOG); MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 5f7e3741927e..081abb6668b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -300,6 +300,10 @@ public RowType readValueType() { return readValueType; } + public FileStorePathFactory pathFactory() { + return pathFactory; + } + public KeyValueFileReaderFactory build( BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) { return build(partition, bucket, dvFactory, true, Collections.emptyList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index ee431f4e0340..f14b58c0cd21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -25,6 +25,7 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; @@ -33,6 +34,7 @@ import org.apache.paimon.mergetree.Levels; import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.operation.metrics.MetricUtils; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; @@ -72,6 +74,9 @@ public class MergeTreeCompactManager extends CompactFutureManager { @Nullable private final RecordLevelExpire recordLevelExpire; + @Nullable private String partitionString; + private int partitionBucket = -1; + public MergeTreeCompactManager( ExecutorService executor, Levels levels, @@ -105,6 +110,19 @@ public MergeTreeCompactManager( MetricUtils.safeCall(this::reportMetrics, LOG); } + /** + * Set partition and bucket info so compact tasks can log readable partition/bucket identifiers. + */ + public void setPartitionBucketInfo( + BinaryRow partition, int bucket, FileStorePathFactory pathFactory) { + try { + this.partitionString = pathFactory.getPartitionString(partition); + } catch (Exception e) { + this.partitionString = partition.toString(); + } + this.partitionBucket = bucket; + } + @Override public boolean shouldWaitForLatestCompaction() { return levels.numberOfSortedRuns() > numSortedRunStopTrigger; @@ -244,6 +262,7 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) { file.fileName(), file.level(), file.fileSize())) .collect(Collectors.joining(", "))); } + task.setPartitionBucketInfo(partitionString, partitionBucket); taskFuture = executor.submit(task); if (metricsReporter != null) { metricsReporter.increaseCompactionsQueuedCount(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java index 8b858e6592ce..bd7bbb55a837 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java @@ -174,21 +174,25 @@ public CompactManager create( if (metricsReporter != null) { rewriter.setMetricsReporter(metricsReporter); } - return new MergeTreeCompactManager( - compactExecutor, - levels, - compactStrategy, - keyComparator, - options.compactionFileSize(true), - options.numSortedRunStopTrigger(), - rewriter, - metricsReporter, - dvMaintainer, - lookupEnabled && options.prepareCommitWaitCompaction(), - lookupEnabled, - recordLevelExpire, - options.forceRewriteAllFiles(), - options.isChainTable()); + MergeTreeCompactManager compactManager = + new MergeTreeCompactManager( + compactExecutor, + levels, + compactStrategy, + keyComparator, + options.compactionFileSize(true), + options.numSortedRunStopTrigger(), + rewriter, + metricsReporter, + dvMaintainer, + lookupEnabled && options.prepareCommitWaitCompaction(), + lookupEnabled, + recordLevelExpire, + options.forceRewriteAllFiles(), + options.isChainTable()); + compactManager.setPartitionBucketInfo( + partition, bucket, readerFactoryBuilder.pathFactory()); + return compactManager; } private CompactStrategy createCompactStrategy( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 953942d738a6..2961a3702910 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -540,27 +540,28 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { } private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) { - RestoreFiles restored = - restore.restoreFiles( - partition, - bucket, - dbMaintainerFactory != null, - dvMaintainerFactory != null); + String partInfo = partitionInfo(partition); + RestoreFiles restored; + try { + restored = + restore.restoreFiles( + partition, + bucket, + dbMaintainerFactory != null, + dvMaintainerFactory != null); + } catch (RuntimeException e) { + throw new RuntimeException( + String.format( + "Failed to restore existing files for %s, bucket %d.", + partInfo, bucket), + e); + } Integer restoredTotalBuckets = restored.totalBuckets(); int totalBuckets = numBuckets; if (restoredTotalBuckets != null) { totalBuckets = restoredTotalBuckets; } if (!ignoreNumBucketCheck && totalBuckets != numBuckets) { - String partInfo = - partitionType.getFieldCount() > 0 - ? "partition " - + getPartitionComputer( - partitionType, - PARTITION_DEFAULT_NAME.defaultValue(), - legacyPartitionName) - .generatePartValues(partition) - : "table"; throw new RuntimeException( String.format( "Try to write %s with a new bucket num %d, but the previous bucket num is %d. " @@ -570,6 +571,17 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) { return restored; } + private String partitionInfo(BinaryRow partition) { + return partitionType.getFieldCount() > 0 + ? "partition " + + getPartitionComputer( + partitionType, + PARTITION_DEFAULT_NAME.defaultValue(), + legacyPartitionName) + .generatePartValues(partition) + : "table"; + } + private ExecutorService compactExecutor() { if (lazyCompactExecutor == null) { lazyCompactExecutor = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java index 5d4e335571d1..0d8db17313fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/WriteRestore.java @@ -39,13 +39,20 @@ RestoreFiles restoreFiles( @Nullable static Integer extractDataFiles(List entries, List dataFiles) { + return extractDataFiles(entries, dataFiles, null); + } + + @Nullable + static Integer extractDataFiles( + List entries, List dataFiles, @Nullable String context) { Integer totalBuckets = null; for (ManifestEntry entry : entries) { if (totalBuckets != null && totalBuckets != entry.totalBuckets()) { + String contextInfo = context == null ? "" : " for " + context; throw new RuntimeException( String.format( - "Bucket data files has different total bucket number, %s vs %s, this should be a bug.", - totalBuckets, entry.totalBuckets())); + "Bucket data files%s has different total bucket number, %s vs %s, this should be a bug.", + contextInfo, totalBuckets, entry.totalBuckets())); } totalBuckets = entry.totalBuckets(); dataFiles.add(entry.file()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index fca17b5b9800..c0c3f5e738d7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -195,7 +195,8 @@ public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Except try (BatchTableWrite write = writeBuilder.newWrite()) { if (ordered) { assertThatThrownBy(() -> write.write(rowData(1, 10, 100L))) - .hasMessageContaining("FileNotFoundException"); + .hasMessageContaining("Failed to restore existing files") + .hasRootCauseInstanceOf(java.io.FileNotFoundException.class); } else { // no exception write.write(rowData(1, 10, 100L)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java index 1cff11eeffc1..4ca62281248a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java @@ -28,6 +28,7 @@ import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.WriteRestore; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -42,6 +43,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.utils.InstantiationUtil.deserializeObject; import static org.apache.paimon.utils.InstantiationUtil.serializeObject; @@ -169,7 +171,11 @@ public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest reques List restoreFiles = new ArrayList<>(); List entries = scan.withPartitionBucket(partition, bucket).plan().files(); - Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles); + Integer totalBuckets = + WriteRestore.extractDataFiles( + entries, + restoreFiles, + String.format("%s, bucket %d", partitionInfo(partition), bucket)); IndexFileMeta dynamicBucketIndex = null; if (request.scanDynamicBucketIndex()) { @@ -212,6 +218,18 @@ public void checkpoint() { latestCommittedIdentifiers.clear(); } + private String partitionInfo(BinaryRow partition) { + if (table.schema().logicalPartitionType().getFieldCount() == 0) { + return "table"; + } + return "partition " + + FileStorePathFactory.getPartitionComputer( + table.schema().logicalPartitionType(), + table.coreOptions().toConfiguration().get(PARTITION_DEFAULT_NAME), + table.coreOptions().legacyPartitionName()) + .generatePartValues(partition); + } + private static class CoordinationKey { private final byte[] content;