Skip to content
9 changes: 9 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ default void setRuntimeContext(Map<String, String> options) {}
*/
PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException;

/**
* Whether {@link #newOutputStream(Path, boolean)} with {@code overwrite=false} can create this
* path without first listing or probing the parent directory, and can fail atomically if the
* object already exists.
*/
default boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException {
return false;
}

/**
* Opens a TwoPhaseOutputStream at the indicated Path for transactional writing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws
return wrap(() -> fileIO(path).newOutputStream(path, overwrite));
}

@Override
public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException {
return wrap(() -> fileIO(path).supportsAtomicCreateWithoutOverwrite(path));
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
return wrap(() -> fileIO(path).getFileStatus(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws
return wrap(() -> fileIO(path).newOutputStream(path, overwrite));
}

@Override
public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException {
return wrap(() -> fileIO(path).supportsAtomicCreateWithoutOverwrite(path));
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
return wrap(() -> fileIO(path).getFileStatus(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws
return delegate.newOutputStream(path, overwrite);
}

@Override
public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException {
return delegate.supportsAtomicCreateWithoutOverwrite(path);
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
return delegate.getFileStatus(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws
return fileIO().newOutputStream(path, overwrite);
}

@Override
public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException {
return fileIO().supportsAtomicCreateWithoutOverwrite(path);
}

@Override
public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.List;
import java.util.concurrent.Callable;

Expand All @@ -37,6 +38,9 @@
*/
public class RenamingSnapshotCommit implements SnapshotCommit {

private static final String HADOOP_FILE_ALREADY_EXISTS_EXCEPTION =
"org.apache.hadoop.fs.FileAlreadyExistsException";

private final SnapshotManager snapshotManager;
private final FileIO fileIO;
private final Lock lock;
Expand All @@ -50,30 +54,24 @@ public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) {
@Override
public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
throws Exception {
Path newSnapshotPath =
SnapshotManager targetSnapshotManager =
snapshotManager.branch().equals(branch)
? snapshotManager.snapshotPath(snapshot.id())
: snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id());
? snapshotManager
: snapshotManager.copyWithBranch(branch);
Path newSnapshotPath = targetSnapshotManager.snapshotPath(snapshot.id());
boolean supportsNoListCommit =
fileIO.isObjectStore()
&& fileIO.supportsAtomicCreateWithoutOverwrite(newSnapshotPath);

Callable<Boolean> callable =
() -> {
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
if (!committed) {
if (!fileIO.exists(newSnapshotPath)) {
throw new IOException(
"Commit snapshot "
+ snapshot.id()
+ " failed and "
+ newSnapshotPath
+ " not found");
}
committed =
snapshot.equals(
Snapshot.fromJson(fileIO.readFileUtf8(newSnapshotPath)));
}
boolean committed =
supportsNoListCommit
? writeNoOverwriteOrRecover(snapshot, newSnapshotPath)
: writeAtomicOrRecover(snapshot, newSnapshotPath);

if (committed) {
snapshotManager.commitLatestHint(snapshot.id());
targetSnapshotManager.commitLatestHint(snapshot.id());
}
return committed;
};
Expand All @@ -84,11 +82,102 @@ public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics
// as we're relying on external locking, we can first
// check if file exist then rename to work around this
// case
!fileIO.exists(newSnapshotPath) && callable.call());
(supportsNoListCommit || !fileIO.exists(newSnapshotPath))
&& callable.call());
}

@Override
public void close() throws Exception {
this.lock.close();
}

private boolean writeNoOverwriteOrRecover(Snapshot snapshot, Path newSnapshotPath)
throws IOException {
try {
fileIO.writeFile(newSnapshotPath, snapshot.toJson(), false);
return true;
} catch (IOException e) {
if (!isFileAlreadyExists(e)) {
throw e;
}
return recoverExistingSnapshot(snapshot, newSnapshotPath);
}
}

private boolean writeAtomicOrRecover(Snapshot snapshot, Path newSnapshotPath)
throws IOException {
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
if (!committed) {
if (!fileIO.exists(newSnapshotPath)) {
throw new IOException(
"Commit snapshot "
+ snapshot.id()
+ " failed and "
+ newSnapshotPath
+ " not found");
}
committed = snapshot.equals(Snapshot.fromJson(fileIO.readFileUtf8(newSnapshotPath)));
}
return committed;
}

private boolean recoverExistingSnapshot(Snapshot snapshot, Path newSnapshotPath)
throws IOException {
String existingSnapshotJson;
try {
existingSnapshotJson = fileIO.readFileUtf8(newSnapshotPath);
} catch (IOException | RuntimeException e) {
throw recoveryRequired(
snapshot, newSnapshotPath, "already exists but cannot be read", e);
}

Snapshot existingSnapshot;
try {
existingSnapshot = Snapshot.fromJson(existingSnapshotJson);
} catch (RuntimeException e) {
throw recoveryRequired(snapshot, newSnapshotPath, "already exists but is malformed", e);
}

if (snapshot.equals(existingSnapshot)) {
return true;
}

throw recoveryRequired(
snapshot, newSnapshotPath, "already exists with different content", null);
}

private static boolean isFileAlreadyExists(Throwable throwable) {
while (throwable != null) {
if (throwable instanceof FileAlreadyExistsException
|| HADOOP_FILE_ALREADY_EXISTS_EXCEPTION.equals(
throwable.getClass().getName())) {
return true;
}
throwable = throwable.getCause();
}
return false;
}

private static SnapshotCommitConflictRequiresListRecoveryException recoveryRequired(
Snapshot snapshot, Path newSnapshotPath, String reason, Throwable cause) {
return new SnapshotCommitConflictRequiresListRecoveryException(
"Cannot safely commit snapshot "
+ snapshot.id()
+ " because "
+ newSnapshotPath
+ " "
+ reason
+ "; recovery requires listing to discover the latest committed snapshot.",
cause);
}

/** Signals that snapshot commit conflict recovery requires listing snapshot metadata. */
public static class SnapshotCommitConflictRequiresListRecoveryException
extends RuntimeException {

public SnapshotCommitConflictRequiresListRecoveryException(
String message, Throwable cause) {
super(message, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.HintFileUtils.LatestLookupMode;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.ListUtils;
Expand Down Expand Up @@ -718,9 +720,14 @@ private int tryCommit(
@Nullable String statsFileName) {
int retryCount = 0;
RetryCommitResult retryResult = null;
Snapshot recoveredLatestSnapshot = null;
long startMillis = System.currentTimeMillis();
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Snapshot latestSnapshot =
recoveredLatestSnapshot == null
? snapshotManager.latestSnapshot()
: recoveredLatestSnapshot;
recoveredLatestSnapshot = null;
CommitChanges changes = changesProvider.provide(latestSnapshot);
CommitResult result =
tryCommitOnce(
Expand All @@ -742,22 +749,54 @@ private int tryCommit(
}

retryResult = (RetryCommitResult) result;

if (System.currentTimeMillis() - startMillis > options.commitTimeout()
|| retryCount >= options.commitMaxRetries()) {
boolean canRetry =
System.currentTimeMillis() - startMillis <= options.commitTimeout()
&& retryCount < options.commitMaxRetries();
if (!canRetry) {
String message =
String.format(
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
options.commitTimeout(), retryCount);
throw new RuntimeException(message, retryResult.exception);
}

if (retryResult instanceof CommitFailRetryResult
&& ((CommitFailRetryResult) retryResult).requiresLatestSnapshotRecovery) {
recoveredLatestSnapshot =
recoverLatestSnapshotByListing(
((CommitFailRetryResult) retryResult).exception);
}

retryWaiter.retryWait(retryCount);
retryCount++;
}
return retryCount + 1;
}

private Snapshot recoverLatestSnapshotByListing(Exception cause) {
try {
Snapshot latestSnapshot =
snapshotManager.latestSnapshotFromFileSystem(
LatestLookupMode.RECOVERY_REQUIRING_LIST);
if (latestSnapshot == null) {
throw new RuntimeException(
"Cannot recover latest snapshot after snapshot commit conflict because listing returned no snapshots.");
}
return latestSnapshot;
} catch (RuntimeException e) {
RuntimeException recoveryException =
new RuntimeException(
"Cannot recover latest snapshot after missing or stale LATEST. "
+ "Recovery requires listing the snapshot directory; grant S3 ListBucket "
+ "for this table prefix or restore a valid LATEST hint.",
e);
if (cause != null) {
recoveryException.addSuppressed(cause);
}
throw recoveryException;
}
}

private void checkSameBucketFromSnapshot(
List<ManifestEntry> deltaFiles, @Nullable Snapshot latestSnapshot) {
if (latestSnapshot == null) {
Expand Down Expand Up @@ -1130,6 +1169,9 @@ CommitResult tryCommitOnce(
callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
try {
success = commitSnapshotImpl(newSnapshot, deltaStatistics);
} catch (SnapshotCommitConflictRequiresListRecoveryException e) {
LOG.warn("Recover latest snapshot by listing for snapshot commit conflict.", e);
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e, null, true);
} catch (Exception e) {
// commit exception, not sure about the situation and should not clean up the files
LOG.warn("Retry commit for exception.", e);
Expand Down Expand Up @@ -1263,14 +1305,39 @@ public boolean replaceManifestList(
latest.properties(),
nextRowId);

return commitSnapshotImpl(newSnapshot, emptyList());
try {
return commitSnapshotImpl(newSnapshot, emptyList());
} catch (SnapshotCommitConflictRequiresListRecoveryException e) {
LOG.warn("Manifest list replacement conflicted with a newer snapshot.", e);
return false;
}
}

public void compactManifest() {
int retryCount = 0;
Snapshot recoveredLatestSnapshot = null;
long startMillis = System.currentTimeMillis();
while (true) {
boolean success = compactManifestOnce();
boolean success;
try {
success = compactManifestOnce(recoveredLatestSnapshot);
recoveredLatestSnapshot = null;
} catch (SnapshotCommitConflictRequiresListRecoveryException e) {
LOG.warn("Recover latest snapshot by listing for manifest compaction conflict.", e);
if (System.currentTimeMillis() - startMillis > options.commitTimeout()
|| retryCount >= options.commitMaxRetries()) {
throw new RuntimeException(
String.format(
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
options.commitTimeout(), retryCount),
e);
}

recoveredLatestSnapshot = recoverLatestSnapshotByListing(e);
retryWaiter.retryWait(retryCount);
retryCount++;
continue;
}
if (success) {
break;
}
Expand All @@ -1288,8 +1355,11 @@ public void compactManifest() {
}
}

private boolean compactManifestOnce() {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
private boolean compactManifestOnce(@Nullable Snapshot recoveredLatestSnapshot) {
Snapshot latestSnapshot =
recoveredLatestSnapshot == null
? snapshotManager.latestSnapshot()
: recoveredLatestSnapshot;

if (latestSnapshot == null) {
return true;
Expand Down Expand Up @@ -1352,6 +1422,8 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de
statistics.add(entry.toPartitionStatistics(partitionComputer));
}
return snapshotCommit.commit(newSnapshot, options.branch(), statistics);
} catch (SnapshotCommitConflictRequiresListRecoveryException e) {
throw e;
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
Expand Down
Loading
Loading