diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml
index 6825d8e7a369..f22b56014fe2 100644
--- a/.github/workflows/utitcase-flink-1.x-common.yml
+++ b/.github/workflows/utitcase-flink-1.x-common.yml
@@ -38,7 +38,7 @@ concurrency:
jobs:
build_test:
runs-on: ubuntu-latest
- timeout-minutes: 60
+ timeout-minutes: 100
steps:
- name: Checkout code
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 1003aff77cfb..9cde0eefe38a 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@
Duration |
Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition. |
+
+ lookup.dynamic-partition.refresh.async |
+ false |
+ Boolean |
+ Whether to refresh dynamic partition lookup table asynchronously. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded. |
+
lookup.refresh.async |
false |
@@ -357,4 +363,4 @@
Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
-
+
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 8febafc5e3c5..bfb4b7b8d1e2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -276,6 +276,17 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to refresh lookup table in an async thread.");
+ public static final ConfigOption<Boolean> LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC =
+ ConfigOptions.key("lookup.dynamic-partition.refresh.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to refresh dynamic partition lookup table asynchronously. "
+ + "When enabled, partition changes will be loaded in a background thread "
+ + "while the old partition data continues serving queries. "
+ + "When disabled (default), partition refresh is synchronous and blocks queries "
+ + "until the new partition data is fully loaded.");
+
public static final ConfigOption<Integer> LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
.intType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 5b71664ca6a9..4cb1474e8009 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -68,6 +68,7 @@
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
@@ -95,6 +96,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private transient File path;
private transient LookupTable lookupTable;
+ // partition refresh
+ @Nullable private transient PartitionRefresher partitionRefresher;
+ @Nullable private transient List<BinaryRow> scanPartitions;
+
// interval of refreshing lookup table
private transient Duration refreshInterval;
// timestamp when refreshing lookup table
@@ -235,6 +240,7 @@ private void open() throws Exception {
if (!partitions.isEmpty()) {
lookupTable.specifyPartitions(
partitions, partitionLoader.createSpecificPartFilter());
+ this.scanPartitions = partitions;
}
}
@@ -242,6 +248,14 @@ private void open() throws Exception {
lookupTable.specifyCacheRowFilter(cacheRowFilter);
}
lookupTable.open();
+
+ // Initialize partition refresher for FullCacheLookupTable
+ if (lookupTable instanceof FullCacheLookupTable) {
+ this.partitionRefresher =
+ new PartitionRefresher(
+ options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC), table.name());
+ this.partitionRefresher.init();
+ }
}
@Nullable
@@ -271,13 +285,14 @@ public Collection<RowData> lookup(RowData keyRow) {
if (partitionLoader == null) {
return lookupInternal(key);
}
-
- if (partitionLoader.partitions().isEmpty()) {
+ List<BinaryRow> partitions =
+ scanPartitions == null ? partitionLoader.partitions() : scanPartitions;
+ if (partitions.isEmpty()) {
return Collections.emptyList();
}
List rows = new ArrayList<>();
- for (BinaryRow partition : partitionLoader.partitions()) {
+ for (BinaryRow partition : partitions) {
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
@@ -324,7 +339,18 @@ void tryRefresh() throws Exception {
return;
}
- // 2. refresh dynamic partition
+ // 2. check if async partition refresh has completed, and switch if so
+ if (partitionRefresher != null) {
+ LookupTable switchedTable = partitionRefresher.checkPartitionRefreshCompletion();
+ if (switchedTable != null) {
+ lookupTable.close();
+ lookupTable = switchedTable;
+ path = ((FullCacheLookupTable) switchedTable).context.tempPath;
+ scanPartitions = partitionLoader.partitions();
+ }
+ }
+
+ // 3. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
@@ -334,18 +360,20 @@ void tryRefresh() throws Exception {
}
if (partitionChanged) {
- // reopen with latest partition
- lookupTable.specifyPartitions(
- partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
- lookupTable.close();
- lookupTable.open();
+ if (partitionRefresher != null) {
+ partitionRefresher.startPartitionRefresh(
+ partitions,
+ partitionLoader.createSpecificPartFilter(),
+ lookupTable,
+ ((FullCacheLookupTable) lookupTable).context,
+ cacheRowFilter);
+ }
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
- // no need to refresh the lookup table because it is reopened
return;
}
}
- // 3. refresh lookup table
+ // 4. refresh lookup table
if (shouldRefreshLookupTable()) {
// Check if we should do full load (close and reopen table) instead of incremental
// refresh
@@ -415,6 +443,10 @@ long nextBlacklistCheckTime() {
@Override
public void close() throws IOException {
+ if (partitionRefresher != null) {
+ partitionRefresher.close();
+ }
+
if (lookupTable != null) {
lookupTable.close();
lookupTable = null;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 81af38ea8ff2..6b02084d8ada 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -414,5 +414,16 @@ public Context copy(int[] newProjection) {
joinKey,
requiredCachedBucketIds);
}
+
+ public Context copy(File newTempPath) {
+ return new Context(
+ table.wrapped(),
+ projection,
+ tablePredicate,
+ projectedPredicate,
+ newTempPath,
+ joinKey,
+ requiredCachedBucketIds);
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
new file mode 100644
index 000000000000..bc0f43a57174
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.Filter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
+
+/** Manages partition refresh logic for {@link FullCacheLookupTable}. */
+public class PartitionRefresher implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionRefresher.class);
+
+ private final boolean partitionRefreshAsync;
+ private final String tableName;
+
+ @Nullable private ExecutorService partitionRefreshExecutor;
+ private AtomicReference<LookupTable> pendingLookupTable;
+ private AtomicReference<Exception> partitionRefreshException;
+
+ public PartitionRefresher(boolean partitionRefreshAsync, String tableName) {
+ this.partitionRefreshAsync = partitionRefreshAsync;
+ this.tableName = tableName;
+ }
+
+ /** Initialize partition refresh resources. Should be called during table initialization. */
+ public void init() {
+ if (!partitionRefreshAsync) {
+ return;
+ }
+ this.pendingLookupTable = new AtomicReference<>(null);
+ this.partitionRefreshException = new AtomicReference<>(null);
+ this.partitionRefreshExecutor =
+ Executors.newSingleThreadExecutor(
+ new ExecutorThreadFactory(
+ String.format(
+ "%s-lookup-refresh-partition",
+ Thread.currentThread().getName())));
+ }
+
+ /**
+ * Start partition refresh. Chooses sync or async mode based on configuration.
+ *
+ * @param newPartitions the new partitions to refresh to
+ * @param partitionFilter the partition filter for the new partitions
+ * @param lookupTable the current lookup table to refresh
+ * @param context the context of the current lookup table
+ * @param cacheRowFilter the cache row filter, may be null
+ */
+ public void startPartitionRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ LookupTable lookupTable,
+ FullCacheLookupTable.Context context,
+ @Nullable Filter<InternalRow> cacheRowFilter)
+ throws Exception {
+ if (partitionRefreshAsync) {
+ asyncPartitionRefresh(
+ newPartitions, partitionFilter, lookupTable, context, cacheRowFilter);
+ } else {
+ syncPartitionRefresh(newPartitions, partitionFilter, lookupTable);
+ }
+ }
+
+ private void syncPartitionRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ LookupTable lookupTable)
+ throws Exception {
+ LOG.info(
+ "Synchronously refreshing partition for table {}, new partitions detected.",
+ tableName);
+ lookupTable.close();
+ lookupTable.specifyPartitions(newPartitions, partitionFilter);
+ lookupTable.open();
+ LOG.info("Synchronous partition refresh completed for table {}.", tableName);
+ }
+
+ private void asyncPartitionRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ LookupTable lookupTable,
+ FullCacheLookupTable.Context context,
+ @Nullable Filter<InternalRow> cacheRowFilter) {
+
+ LOG.info(
+ "Starting async partition refresh for table {}, new partitions detected.",
+ tableName);
+
+ partitionRefreshExecutor.submit(
+ () -> {
+ File newPath = null;
+ try {
+ newPath =
+ new File(
+ context.tempPath.getParent(),
+ "lookup-" + UUID.randomUUID());
+ if (!newPath.mkdirs()) {
+ throw new RuntimeException("Failed to create dir: " + newPath);
+ }
+ LookupTable newTable = copyWithNewPath(newPath, context, cacheRowFilter);
+ newTable.specifyPartitions(newPartitions, partitionFilter);
+ newTable.open();
+
+ pendingLookupTable.set(newTable);
+ LOG.info("Async partition refresh completed for table {}.", tableName);
+ } catch (Exception e) {
+ LOG.error("Async partition refresh failed for table {}.", tableName, e);
+ partitionRefreshException.set(e);
+ if (newPath != null) {
+ FileIOUtils.deleteDirectoryQuietly(newPath);
+ }
+ }
+ });
+ }
+
+ /**
+ * Create a new LookupTable instance with the same configuration but a different temp path.
+ *
+ * @param newPath the new temp path
+ * @param context the context of the current lookup table
+ * @param cacheRowFilter the cache row filter, may be null
+ * @return a new LookupTable instance (not yet opened)
+ */
+ public LookupTable copyWithNewPath(
+ File newPath,
+ FullCacheLookupTable.Context context,
+ @Nullable Filter<InternalRow> cacheRowFilter) {
+ FullCacheLookupTable.Context newContext = context.copy(newPath);
+ Options options = Options.fromMap(context.table.options());
+ FullCacheLookupTable newTable =
+ FullCacheLookupTable.create(newContext, options.get(LOOKUP_CACHE_ROWS));
+ if (cacheRowFilter != null) {
+ newTable.specifyCacheRowFilter(cacheRowFilter);
+ }
+ return newTable;
+ }
+
+ /**
+ * Check if an async partition refresh has completed.
+ *
+ * @return the new lookup table if ready, or null if no switch is needed
+ */
+ @Nullable
+ public LookupTable checkPartitionRefreshCompletion() throws Exception {
+ if (!partitionRefreshAsync) {
+ return null;
+ }
+
+ Exception asyncException = partitionRefreshException.getAndSet(null);
+ if (asyncException != null) {
+ LOG.error(
+ "Async partition refresh failed for table {}, will stop running.",
+ tableName,
+ asyncException);
+ throw asyncException;
+ }
+
+ LookupTable newTable = pendingLookupTable.getAndSet(null);
+ if (newTable == null) {
+ return null;
+ }
+
+ LOG.info("Switched to new lookup table for table {} with new partitions.", tableName);
+ return newTable;
+ }
+
+ /** Close partition refresh resources. */
+ @Override
+ public void close() throws IOException {
+ if (partitionRefreshExecutor != null) {
+ ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, partitionRefreshExecutor);
+ }
+ if (pendingLookupTable != null) {
+ LookupTable pending = pendingLookupTable.getAndSet(null);
+ if (pending != null) {
+ pending.close();
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 408d74ea0cd9..bda4bcc567d0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1250,4 +1250,408 @@ public void testFallbackCacheMode() throws Exception {
iterator.close();
}
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL"})
+ public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception {
+ // This test verifies synchronous partition refresh (default mode):
+ // when max_pt() changes, the lookup table is refreshed synchronously,
+ // so queries immediately return new partition data after refresh.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ // insert data into partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // verify initial lookup returns partition '1' data
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert data into a new partition '2', which will trigger sync partition refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception {
+ // This test verifies asynchronous partition refresh:
+ // when max_pt() changes, the lookup table is refreshed in a background thread,
+ // old partition data continues serving queries until the new partition is fully loaded.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ // insert data into partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // verify initial lookup returns partition '1' data
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert data into a new partition '2', which will trigger async partition refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ // trigger a lookup to check async completion and switch to new partition
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ // insert another new partition '3' and verify switch again
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode)
+ throws Exception {
+ // Verify that during async refresh, queries still return old partition data
+ // until the new partition is fully loaded and switched.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+
+ // immediately query before async refresh completes — should still return old partition data
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // old partition data (100, 200) should still be served
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // now wait for async refresh to complete and trigger switch
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // after switch, new partition data should be returned
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMode mode)
+ throws Exception {
+ // Verify that incremental data updates in the old partition are visible
+ // before async partition switch happens.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // update data in the current partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)");
+ Thread.sleep(500); // wait for incremental refresh
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250));
+
+ // now insert new partition '2' to trigger async refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception {
+ // Verify async partition refresh works correctly with multi-level partition keys.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+ + "'bucket' = '1', "
+ + "'scan.partitions' = 'pt1=max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2024', 1, 1, 100), ('2024', 1, 2, 200), "
+ + "('2024', 2, 1, 300), ('2024', 2, 2, 400)");
+
+ String query =
+ "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2024", 1, 1, 100),
+ Row.of("2024", 1, 2, 200),
+ Row.of("2024", 2, 1, 300),
+ Row.of("2024", 2, 2, 400));
+
+ // insert new max partition '2025' with sub-partitions
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), "
+ + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2025", 1, 1, 1000),
+ Row.of("2025", 1, 2, 2000),
+ Row.of("2025", 2, 1, 3000),
+ Row.of("2025", 2, 2, 4000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithOverwrite() throws Exception {
+ // Verify async partition refresh works correctly when a new max partition
+ // is created via INSERT OVERWRITE.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ // overwrite current max partition with new data
+ sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250));
+
+ // overwrite to create a new max partition
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), Row.of(3, 3000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception {
+ // Verify async partition refresh works correctly with max_two_pt() strategy.
+ sql(
+ "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_two_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ // insert data into partitions '1' and '2'
+ sql(
+ "INSERT INTO TWO_PT_DIM VALUES "
+ + "('1', 1, 100), ('1', 2, 200), "
+ + "('2', 1, 300), ('2', 2, 400)");
+
+ String query =
+ "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("1", 1, 100),
+ Row.of("1", 2, 200),
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400));
+
+ // insert new partition '3', now max_two_pt should be '2' and '3'
+ sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ // should now see data from partitions '2' and '3'
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400),
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000));
+
+ // insert another partition '4', max_two_pt should be '3' and '4'
+ sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 20000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000),
+ Row.of("4", 1, 10000),
+ Row.of("4", 2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws Exception {
+ // Verify async partition refresh works correctly with non-primary-key append tables.
+ sql(
+ "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(3);
+ // non-pk table may return multiple matches
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), Row.of(2, 200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), Row.of(2, 2000));
+
+ iterator.close();
+ }
}