From 70734bebdbf0d07675e767ff85d6d4378887cac5 Mon Sep 17 00:00:00 2001 From: Arnav Balyan Date: Sun, 3 May 2026 09:52:53 +0530 Subject: [PATCH] update --- .../java/org/apache/paimon/CoreOptions.java | 10 +++ .../table/FallbackReadFileStoreTable.java | 22 ++++- .../table/FallbackReadFileStoreTableTest.java | 90 +++++++++++++++++++ 3 files changed, 120 insertions(+), 2 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 2f24db43f4db..3364ca4d3928 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1972,6 +1972,16 @@ public InlineElement getDescription() { "When a batch job queries from a table, if a partition does not exist in the current branch, " + "the reader will try to get this partition from this fallback branch."); + public static final ConfigOption SCAN_FALLBACK_BRANCH_READ_FAIL_FAST = + key("scan.fallback-branch.read-fail-fast") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to fail the read immediately when reading from a fallback branch throws. " + + "By default the failure is logged with the full stack trace and the reader " + + "falls through to the current branch, which can mask data issues. " + + "Set this to true to surface fallback branch errors to the caller instead."); + public static final ConfigOption SCAN_PRIMARY_BRANCH = key("scan.primary-branch") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index addca008d197..5c32135f764a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -573,12 +573,17 @@ private class Read implements InnerTableRead { private final InnerTableRead mainRead; private final InnerTableRead fallbackRead; + private final boolean fallbackReadFailFast; private Read() { FileStoreTable first = wrappedFirst ? wrapped : other; FileStoreTable second = wrappedFirst ? other : wrapped; this.mainRead = first.newRead(); this.fallbackRead = second.newRead(); + this.fallbackReadFailFast = + wrapped.coreOptions() + .toConfiguration() + .get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST); } @Override @@ -623,10 +628,23 @@ public RecordReader createReader(Split split) throws IOException { if (fallbackSplit.isFallback()) { try { return fallbackRead.createReader(fallbackSplit.wrapped()); - } catch (Exception ignored) { + } catch (Exception e) { + if (fallbackReadFailFast) { + if (e instanceof IOException) { + throw (IOException) e; + } + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new IOException( + "Failed to read fallback branch split: " + + fallbackSplit.wrapped(), + e); + } LOG.error( "Reading from supplemental branch has problems: {}", - fallbackSplit.wrapped()); + fallbackSplit.wrapped(), + e); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 6a1f782fd2d6..7f586875a193 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -34,7 +35,9 @@ import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -42,10 +45,13 @@ import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -53,6 +59,7 @@ import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link FallbackReadFileStoreTable}. */ public class FallbackReadFileStoreTableTest { @@ -242,6 +249,89 @@ public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception { assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3); } + @Test + public void testFallbackReadFailFastDefaultSwallowsException() throws Exception { + FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(false); + Split split = onlyFallbackSplit(table); + + // Default behavior: the failing fallback read is swallowed and the reader + // falls through to the main branch, which has no data for partition 3 and + // either returns an empty reader or throws something other than the + // injected fallback exception. + try { + table.newRead().createReader(split); + } catch (Exception e) { + assertThat(e.getMessage()) + .as("fallback exception must not propagate when fail-fast is disabled") + .doesNotContain("injected fallback failure"); + } + } + + @Test + public void testFallbackReadFailFastPropagatesException() throws Exception { + FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(true); + Split split = onlyFallbackSplit(table); + + assertThatThrownBy(() -> table.newRead().createReader(split)) + .hasMessageContaining("injected fallback failure"); + } + + private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean failFast) + throws Exception { + String branchName = "bc"; + FileStoreTable mainTable = createTable(); + writeDataIntoTable(mainTable, 0, rowData(1, 10)); + mainTable.createBranch(branchName); + FileStoreTable branchTable = createTableFromBranch(mainTable, branchName); + writeDataIntoTable(branchTable, 0, rowData(3, 60)); + + Options overrides = new Options(); + overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST, failFast); + FileStoreTable mainWithOption = mainTable.copy(overrides.toMap()); + + FileStoreTable spyBranch = Mockito.spy(branchTable); + InnerTableRead throwing = throwingInnerTableRead(); + Mockito.doReturn(throwing).when(spyBranch).newRead(); + + return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true); + } + + private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) { + DataTableScan scan = table.newScan(); + scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3)); + List splits = scan.plan().splits(); + assertThat(splits).hasSize(1); + FallbackReadFileStoreTable.FallbackSplit fs = + (FallbackReadFileStoreTable.FallbackSplit) splits.get(0); + assertThat(fs.isFallback()).isTrue(); + return splits.get(0); + } + + private static InnerTableRead throwingInnerTableRead() { + return new InnerTableRead() { + @Override + public InnerTableRead withFilter(Predicate predicate) { + return this; + } + + @Override + public InnerTableRead withReadType(RowType readType) { + return this; + } + + @Override + public TableRead withIOManager(org.apache.paimon.disk.IOManager ioManager) { + return this; + } + + @Override + public org.apache.paimon.reader.RecordReader createReader(Split split) + throws IOException { + throw new IOException("injected fallback failure"); + } + }; + } + private void writeDataIntoTable( FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception { StreamTableWrite write = table.newWrite(commitUser);