Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,16 @@ public InlineElement getDescription() {
"When a batch job queries from a table, if a partition does not exist in the current branch, "
+ "the reader will try to get this partition from this fallback branch.");

public static final ConfigOption<Boolean> SCAN_FALLBACK_BRANCH_READ_FAIL_FAST =
key("scan.fallback-branch.read-fail-fast")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to fail the read immediately when reading from a fallback branch throws. "
+ "By default the failure is logged with the full stack trace and the reader "
+ "falls through to the current branch, which can mask data issues. "
+ "Set this to true to surface fallback branch errors to the caller instead.");

public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
key("scan.primary-branch")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,17 @@ private class Read implements InnerTableRead {

private final InnerTableRead mainRead;
private final InnerTableRead fallbackRead;
private final boolean fallbackReadFailFast;

private Read() {
FileStoreTable first = wrappedFirst ? wrapped : other;
FileStoreTable second = wrappedFirst ? other : wrapped;
this.mainRead = first.newRead();
this.fallbackRead = second.newRead();
this.fallbackReadFailFast =
wrapped.coreOptions()
.toConfiguration()
.get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST);
}

@Override
Expand Down Expand Up @@ -623,10 +628,23 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (fallbackSplit.isFallback()) {
try {
return fallbackRead.createReader(fallbackSplit.wrapped());
} catch (Exception ignored) {
} catch (Exception e) {
if (fallbackReadFailFast) {
if (e instanceof IOException) {
throw (IOException) e;
}
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new IOException(
"Failed to read fallback branch split: "
+ fallbackSplit.wrapped(),
e);
}
LOG.error(
"Reading from supplemental branch has problems: {}",
fallbackSplit.wrapped());
fallbackSplit.wrapped(),
e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
Expand All @@ -34,25 +35,31 @@
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link FallbackReadFileStoreTable}. */
public class FallbackReadFileStoreTableTest {
Expand Down Expand Up @@ -242,6 +249,89 @@ public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception {
assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3);
}

@Test
public void testFallbackReadFailFastDefaultSwallowsException() throws Exception {
FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(false);
Split split = onlyFallbackSplit(table);

// Default behavior: the failing fallback read is swallowed and the reader
// falls through to the main branch, which has no data for partition 3 and
// either returns an empty reader or throws something other than the
// injected fallback exception.
try {
table.newRead().createReader(split);
} catch (Exception e) {
assertThat(e.getMessage())
.as("fallback exception must not propagate when fail-fast is disabled")
.doesNotContain("injected fallback failure");
}
}

@Test
public void testFallbackReadFailFastPropagatesException() throws Exception {
FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(true);
Split split = onlyFallbackSplit(table);

assertThatThrownBy(() -> table.newRead().createReader(split))
.hasMessageContaining("injected fallback failure");
}

private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean failFast)
throws Exception {
String branchName = "bc";
FileStoreTable mainTable = createTable();
writeDataIntoTable(mainTable, 0, rowData(1, 10));
mainTable.createBranch(branchName);
FileStoreTable branchTable = createTableFromBranch(mainTable, branchName);
writeDataIntoTable(branchTable, 0, rowData(3, 60));

Options overrides = new Options();
overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST, failFast);
FileStoreTable mainWithOption = mainTable.copy(overrides.toMap());

FileStoreTable spyBranch = Mockito.spy(branchTable);
InnerTableRead throwing = throwingInnerTableRead();
Mockito.doReturn(throwing).when(spyBranch).newRead();

return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true);
}

private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) {
DataTableScan scan = table.newScan();
scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3));
List<Split> splits = scan.plan().splits();
assertThat(splits).hasSize(1);
FallbackReadFileStoreTable.FallbackSplit fs =
(FallbackReadFileStoreTable.FallbackSplit) splits.get(0);
assertThat(fs.isFallback()).isTrue();
return splits.get(0);
}

private static InnerTableRead throwingInnerTableRead() {
return new InnerTableRead() {
@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableRead withReadType(RowType readType) {
return this;
}

@Override
public TableRead withIOManager(org.apache.paimon.disk.IOManager ioManager) {
return this;
}

@Override
public org.apache.paimon.reader.RecordReader<InternalRow> createReader(Split split)
throws IOException {
throw new IOException("injected fallback failure");
}
};
}

private void writeDataIntoTable(
FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception {
StreamTableWrite write = table.newWrite(commitUser);
Expand Down
Loading