diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml index 6825d8e7a369..f22b56014fe2 100644 --- a/.github/workflows/utitcase-flink-1.x-common.yml +++ b/.github/workflows/utitcase-flink-1.x-common.yml @@ -38,7 +38,7 @@ concurrency: jobs: build_test: runs-on: ubuntu-latest - timeout-minutes: 60 + timeout-minutes: 100 steps: - name: Checkout code diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 1003aff77cfb..9cde0eefe38a 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -80,6 +80,12 @@ Duration Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition. + +
lookup.dynamic-partition.refresh.async
+ false + Boolean + Whether to refresh dynamic partition lookup table asynchronously. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded. +
lookup.refresh.async
false @@ -357,4 +363,4 @@ Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. - + \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 8febafc5e3c5..bfb4b7b8d1e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -276,6 +276,17 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to refresh lookup table in an async thread."); + public static final ConfigOption LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC = + ConfigOptions.key("lookup.dynamic-partition.refresh.async") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to refresh dynamic partition lookup table asynchronously. " + + "When enabled, partition changes will be loaded in a background thread " + + "while the old partition data continues serving queries. " + + "When disabled (default), partition refresh is synchronous and blocks queries " + + "until the new partition data is fully loaded."); + public static final ConfigOption LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT = ConfigOptions.key("lookup.refresh.async.pending-snapshot-count") .intType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 5b71664ca6a9..4cb1474e8009 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -68,6 +68,7 @@ import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; @@ -95,6 +96,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable { private transient File path; private transient LookupTable lookupTable; + // partition refresh + @Nullable private transient PartitionRefresher partitionRefresher; + @Nullable private transient List scanPartitions; + // interval of refreshing lookup table private transient Duration refreshInterval; // timestamp when refreshing lookup table @@ -235,6 +240,7 @@ private void open() throws Exception { if (!partitions.isEmpty()) { lookupTable.specifyPartitions( partitions, partitionLoader.createSpecificPartFilter()); + this.scanPartitions = partitions; } } @@ -242,6 +248,14 @@ private void open() throws Exception { lookupTable.specifyCacheRowFilter(cacheRowFilter); } lookupTable.open(); + + // Initialize partition refresher for FullCacheLookupTable + if (lookupTable instanceof FullCacheLookupTable) { + this.partitionRefresher = + new PartitionRefresher( + options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC), table.name()); + this.partitionRefresher.init(); + } } @Nullable @@ -271,13 +285,14 @@ public Collection lookup(RowData keyRow) { if (partitionLoader == null) { return lookupInternal(key); } - - if (partitionLoader.partitions().isEmpty()) { + List partitions = + scanPartitions == null ? partitionLoader.partitions() : scanPartitions; + if (partitions.isEmpty()) { return Collections.emptyList(); } List rows = new ArrayList<>(); - for (BinaryRow partition : partitionLoader.partitions()) { + for (BinaryRow partition : partitions) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } return rows; @@ -324,7 +339,18 @@ void tryRefresh() throws Exception { return; } - // 2. refresh dynamic partition + // 2. check if async partition refresh has completed, and switch if so + if (partitionRefresher != null) { + LookupTable switchedTable = partitionRefresher.checkPartitionRefreshCompletion(); + if (switchedTable != null) { + lookupTable.close(); + lookupTable = switchedTable; + path = ((FullCacheLookupTable) switchedTable).context.tempPath; + scanPartitions = partitionLoader.partitions(); + } + } + + // 3. refresh dynamic partition if (partitionLoader != null) { boolean partitionChanged = partitionLoader.checkRefresh(); List partitions = partitionLoader.partitions(); @@ -334,18 +360,20 @@ void tryRefresh() throws Exception { } if (partitionChanged) { - // reopen with latest partition - lookupTable.specifyPartitions( - partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); - lookupTable.close(); - lookupTable.open(); + if (partitionRefresher != null) { + partitionRefresher.startPartitionRefresh( + partitions, + partitionLoader.createSpecificPartFilter(), + lookupTable, + ((FullCacheLookupTable) lookupTable).context, + cacheRowFilter); + } nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); - // no need to refresh the lookup table because it is reopened return; } } - // 3. refresh lookup table + // 4. refresh lookup table if (shouldRefreshLookupTable()) { // Check if we should do full load (close and reopen table) instead of incremental // refresh @@ -415,6 +443,10 @@ long nextBlacklistCheckTime() { @Override public void close() throws IOException { + if (partitionRefresher != null) { + partitionRefresher.close(); + } + if (lookupTable != null) { lookupTable.close(); lookupTable = null; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 81af38ea8ff2..6b02084d8ada 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -414,5 +414,16 @@ public Context copy(int[] newProjection) { joinKey, requiredCachedBucketIds); } + + public Context copy(File newTempPath) { + return new Context( + table.wrapped(), + projection, + tablePredicate, + projectedPredicate, + newTempPath, + joinKey, + requiredCachedBucketIds); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java new file mode 100644 index 000000000000..bc0f43a57174 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.utils.ExecutorThreadFactory; +import org.apache.paimon.utils.ExecutorUtils; +import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.Filter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS; + +/** Manages partition refresh logic for {@link FullCacheLookupTable}. */ +public class PartitionRefresher implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionRefresher.class); + + private final boolean partitionRefreshAsync; + private final String tableName; + + @Nullable private ExecutorService partitionRefreshExecutor; + private AtomicReference pendingLookupTable; + private AtomicReference partitionRefreshException; + + public PartitionRefresher(boolean partitionRefreshAsync, String tableName) { + this.partitionRefreshAsync = partitionRefreshAsync; + this.tableName = tableName; + } + + /** Initialize partition refresh resources. Should be called during table initialization. */ + public void init() { + if (!partitionRefreshAsync) { + return; + } + this.pendingLookupTable = new AtomicReference<>(null); + this.partitionRefreshException = new AtomicReference<>(null); + this.partitionRefreshExecutor = + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh-partition", + Thread.currentThread().getName()))); + } + + /** + * Start partition refresh. Chooses sync or async mode based on configuration. + * + * @param newPartitions the new partitions to refresh to + * @param partitionFilter the partition filter for the new partitions + * @param lookupTable the current lookup table to refresh + * @param context the context of the current lookup table + * @param cacheRowFilter the cache row filter, may be null + */ + public void startPartitionRefresh( + List newPartitions, + @Nullable Predicate partitionFilter, + LookupTable lookupTable, + FullCacheLookupTable.Context context, + @Nullable Filter cacheRowFilter) + throws Exception { + if (partitionRefreshAsync) { + asyncPartitionRefresh( + newPartitions, partitionFilter, lookupTable, context, cacheRowFilter); + } else { + syncPartitionRefresh(newPartitions, partitionFilter, lookupTable); + } + } + + private void syncPartitionRefresh( + List newPartitions, + @Nullable Predicate partitionFilter, + LookupTable lookupTable) + throws Exception { + LOG.info( + "Synchronously refreshing partition for table {}, new partitions detected.", + tableName); + lookupTable.close(); + lookupTable.specifyPartitions(newPartitions, partitionFilter); + lookupTable.open(); + LOG.info("Synchronous partition refresh completed for table {}.", tableName); + } + + private void asyncPartitionRefresh( + List newPartitions, + @Nullable Predicate partitionFilter, + LookupTable lookupTable, + FullCacheLookupTable.Context context, + @Nullable Filter cacheRowFilter) { + + LOG.info( + "Starting async partition refresh for table {}, new partitions detected.", + tableName); + + partitionRefreshExecutor.submit( + () -> { + File newPath = null; + try { + newPath = + new File( + context.tempPath.getParent(), + "lookup-" + UUID.randomUUID()); + if (!newPath.mkdirs()) { + throw new RuntimeException("Failed to create dir: " + newPath); + } + LookupTable newTable = copyWithNewPath(newPath, context, cacheRowFilter); + newTable.specifyPartitions(newPartitions, partitionFilter); + newTable.open(); + + pendingLookupTable.set(newTable); + LOG.info("Async partition refresh completed for table {}.", tableName); + } catch (Exception e) { + LOG.error("Async partition refresh failed for table {}.", tableName, e); + partitionRefreshException.set(e); + if (newPath != null) { + FileIOUtils.deleteDirectoryQuietly(newPath); + } + } + }); + } + + /** + * Create a new LookupTable instance with the same configuration but a different temp path. + * + * @param newPath the new temp path + * @param context the context of the current lookup table + * @param cacheRowFilter the cache row filter, may be null + * @return a new LookupTable instance (not yet opened) + */ + public LookupTable copyWithNewPath( + File newPath, + FullCacheLookupTable.Context context, + @Nullable Filter cacheRowFilter) { + FullCacheLookupTable.Context newContext = context.copy(newPath); + Options options = Options.fromMap(context.table.options()); + FullCacheLookupTable newTable = + FullCacheLookupTable.create(newContext, options.get(LOOKUP_CACHE_ROWS)); + if (cacheRowFilter != null) { + newTable.specifyCacheRowFilter(cacheRowFilter); + } + return newTable; + } + + /** + * Check if an async partition refresh has completed. + * + * @return the new lookup table if ready, or null if no switch is needed + */ + @Nullable + public LookupTable checkPartitionRefreshCompletion() throws Exception { + if (!partitionRefreshAsync) { + return null; + } + + Exception asyncException = partitionRefreshException.getAndSet(null); + if (asyncException != null) { + LOG.error( + "Async partition refresh failed for table {}, will stop running.", + tableName, + asyncException); + throw asyncException; + } + + LookupTable newTable = pendingLookupTable.getAndSet(null); + if (newTable == null) { + return null; + } + + LOG.info("Switched to new lookup table for table {} with new partitions.", tableName); + return newTable; + } + + /** Close partition refresh resources. */ + @Override + public void close() throws IOException { + if (partitionRefreshExecutor != null) { + ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, partitionRefreshExecutor); + } + if (pendingLookupTable != null) { + LookupTable pending = pendingLookupTable.getAndSet(null); + if (pending != null) { + pending.close(); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 408d74ea0cd9..bda4bcc567d0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1250,4 +1250,408 @@ public void testFallbackCacheMode() throws Exception { iterator.close(); } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL"}) + public void testSyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies synchronous partition refresh (default mode): + // when max_pt() changes, the lookup table is refreshed synchronously, + // so queries immediately return new partition data after refresh. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert data into a new partition '2', which will trigger sync partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefresh(LookupCacheMode mode) throws Exception { + // This test verifies asynchronous partition refresh: + // when max_pt() changes, the lookup table is refreshed in a background thread, + // old partition data continues serving queries until the new partition is fully loaded. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + // insert data into partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // verify initial lookup returns partition '1' data + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert data into a new partition '2', which will trigger async partition refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + Thread.sleep(500); // wait for async refresh to complete + // trigger a lookup to check async completion and switch to new partition + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + // insert another new partition '3' and verify switch again + sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 20000)"); + Thread.sleep(500); // wait for async refresh to complete + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), Row.of(2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode) + throws Exception { + // Verify that during async refresh, queries still return old partition data + // until the new partition is fully loaded and switched. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + + // immediately query before async refresh completes — should still return old partition data + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // old partition data (100, 200) should still be served + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // now wait for async refresh to complete and trigger switch + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + // after switch, new partition data should be returned + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithDataUpdateInOldPartition(LookupCacheMode mode) + throws Exception { + // Verify that incremental data updates in the old partition are visible + // before async partition switch happens. + sql( + "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // update data in the current partition '1' + sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 150), ('1', 2, 250)"); + Thread.sleep(500); // wait for incremental refresh + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // now insert new partition '2' to trigger async refresh + sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(2); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws Exception { + // Verify async partition refresh works correctly with multi-level partition keys. + sql( + "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt1`, `pt2`) WITH (" + + "'bucket' = '1', " + + "'scan.partitions' = 'pt1=max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2024', 1, 1, 100), ('2024', 1, 2, 200), " + + "('2024', 2, 1, 300), ('2024', 2, 2, 400)"); + + String query = + "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2024", 1, 1, 100), + Row.of("2024", 1, 2, 200), + Row.of("2024", 2, 1, 300), + Row.of("2024", 2, 2, 400)); + + // insert new max partition '2025' with sub-partitions + sql( + "INSERT INTO PARTITIONED_DIM VALUES " + + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), " + + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2025", 1, 1, 1000), + Row.of("2025", 1, 2, 2000), + Row.of("2025", 2, 1, 3000), + Row.of("2025", 2, 2, 4000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithOverwrite() throws Exception { + // Verify async partition refresh works correctly when a new max partition + // is created via INSERT OVERWRITE. + sql( + "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200)); + + // overwrite current max partition with new data + sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 150), (2, 250)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 250)); + + // overwrite to create a new max partition + sql( + "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES (1, 1000), (2, 2000), (3, 3000)"); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2), (3)"); + iterator.collect(3); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2), (3)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), Row.of(3, 3000)); + + iterator.close(); + } + + @Test + public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception { + // Verify async partition refresh works correctly with max_two_pt() strategy. + sql( + "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)" + + "PARTITIONED BY (`pt`) WITH (" + + "'bucket' = '1', " + + "'lookup.dynamic-partition' = 'max_two_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + LookupCacheMode.FULL); + + // insert data into partitions '1' and '2' + sql( + "INSERT INTO TWO_PT_DIM VALUES " + + "('1', 1, 100), ('1', 2, 200), " + + "('2', 1, 300), ('2', 2, 400)"); + + String query = + "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("1", 1, 100), + Row.of("1", 2, 200), + Row.of("2", 1, 300), + Row.of("2", 2, 400)); + + // insert new partition '3', now max_two_pt should be '2' and '3' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + // should now see data from partitions '2' and '3' + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("2", 1, 300), + Row.of("2", 2, 400), + Row.of("3", 1, 1000), + Row.of("3", 2, 2000)); + + // insert another partition '4', max_two_pt should be '3' and '4' + sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 20000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(4); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("3", 1, 1000), + Row.of("3", 2, 2000), + Row.of("4", 1, 10000), + Row.of("4", 2, 20000)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource( + value = LookupCacheMode.class, + names = {"FULL", "MEMORY"}) + public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) throws Exception { + // Verify async partition refresh works correctly with non-primary-key append tables. + sql( + "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)" + + "PARTITIONED BY (`pt`) WITH (" + + "'lookup.dynamic-partition' = 'max_pt()', " + + "'lookup.dynamic-partition.refresh-interval' = '1 ms', " + + "'lookup.dynamic-partition.refresh.async' = 'true', " + + "'lookup.cache' = '%s', " + + "'continuous.discovery-interval'='1 ms')", + mode); + + sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 2, 200)"); + + String query = + "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM " + + "for system_time as of T.proctime AS D ON T.i = D.k"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2)"); + List result = iterator.collect(3); + // non-pk table may return multiple matches + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), Row.of(2, 200)); + + // insert new partition '2' to trigger async refresh + sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), ('2', 2, 2000)"); + sql("INSERT INTO T VALUES (1), (2)"); + iterator.collect(3); + Thread.sleep(500); + sql("INSERT INTO T VALUES (1), (2)"); + result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), Row.of(2, 2000)); + + iterator.close(); + } }