Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,24 @@
<td>Integer</td>
<td>Level threshold of lookup to generate remote lookup files. Level files below this threshold will not generate remote lookup files.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.<br />Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
Expand Down Expand Up @@ -939,24 +957,6 @@
<td>Integer</td>
<td>To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.<br />Note: when 'manifest-sort.enabled' is true, this minimum-count gate is only applied to the trailing sub-segment of a section that exceeds 'manifest-sort.max-rewrite-size'. Small under-budget sections are sorted and rewritten directly, so two small manifest files may be merged into one even when their count is below this threshold and full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.<br />Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
<td style="word-wrap: break-word;">8 mb</td>
Expand Down Expand Up @@ -1662,6 +1662,12 @@
<td>Boolean</td>
<td>Whether to process distributed vector search.</td>
</tr>
<tr>
<td><h5>vector-search.lateral-join.batch-size</h5></td>
<td style="word-wrap: break-word;">256</td>
<td>Integer</td>
<td>The batch size for lateral vector search. Each batch executes vector topK search and table lookup for multiple query vectors.</td>
</tr>
<tr>
<td><h5>vector.file.format</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -1728,12 +1734,6 @@
<td>Boolean</td>
<td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
</tr>
<tr>
<td><h5>write.sequence-number-init-mode</h5></td>
<td style="word-wrap: break-word;">scan</td>
<td><p>Enum</p></td>
<td>Specify how to initialize the next sequence number for primary key table writers.<br /><br />Possible values:<ul><li>"scan": initialize by scanning existing file metadata.</li><li>"snapshot": initialize from the maximum sequence number recorded in snapshot properties, which can avoid scanning existing file metadata in write-only mode.</li></ul></td>
</tr>
<tr>
<td><h5>write.batch-memory</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
Expand All @@ -1746,6 +1746,12 @@
<td>Integer</td>
<td>Write batch size for any file format if it supports.</td>
</tr>
<tr>
<td><h5>write.sequence-number-init-mode</h5></td>
<td style="word-wrap: break-word;">scan</td>
<td><p>Enum</p></td>
<td>Specify how to initialize the next sequence number for primary key table writers.<br /><br />Possible values:<ul><li>"scan": initialize by scanning existing file metadata.</li><li>"snapshot": initialize from the maximum sequence number recorded in snapshot properties, which can avoid scanning existing file metadata in write-only mode.</li></ul></td>
</tr>
<tr>
<td><h5>zorder.var-length-contribution</h5></td>
<td style="word-wrap: break-word;">8</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2627,6 +2627,14 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Whether to process distributed vector search.");

public static final ConfigOption<Integer> VECTOR_SEARCH_LATERAL_JOIN_BATCH_SIZE =
key("vector-search.lateral-join.batch-size")
.intType()
.defaultValue(256)
.withDescription(
"The batch size for lateral vector search. Each batch executes vector "
+ "topK search and table lookup for multiple query vectors.");

@Immutable
public static final ConfigOption<Boolean> PK_CLUSTERING_OVERRIDE =
key("pk-clustering-override")
Expand Down Expand Up @@ -4120,6 +4128,10 @@ public boolean vectorSearchDistributeEnabled() {
return options.get(VECTOR_SEARCH_DISTRIBUTE_ENABLED);
}

public int vectorSearchLateralJoinBatchSize() {
return options.get(VECTOR_SEARCH_LATERAL_JOIN_BATCH_SIZE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.catalyst.plans.logical.{PaimonTableValuedFunctions, PaimonTableValueFunction}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.LateralSubquery
import org.apache.spark.sql.catalyst.plans.logical.LateralJoin
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

Expand All @@ -32,6 +34,12 @@ case class PaimonIncompatibleResolutionRules(session: SparkSession) extends Rule
case func: PaimonTableValueFunction if func.args.forall(_.resolved) =>
PaimonTableValuedFunctions.resolvePaimonTableValuedFunction(session, func)

case LateralJoin(left, lateralSubquery: LateralSubquery, joinType, condition)
if left.resolved && lateralSubquery.plan.resolved =>
PaimonTableValuedFunctions
.resolveLateralVectorSearch(left, lateralSubquery.plan, joinType, condition)
.getOrElse(plan)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.optimizer

import org.apache.paimon.spark.catalyst.plans.logical.LateralVectorSearch

import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

/** Pushes filters on the query side below lateral vector search. */
object PushDownLateralVectorSearchFilter extends Rule[LogicalPlan] with PredicateHelper {

override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case filter @ Filter(condition, lvs: LateralVectorSearch) =>
val predicates = splitConjunctivePredicates(condition)
val (pushDownToLeft, otherPredicates) = predicates.partition {
predicate => predicate.deterministic && predicate.references.subsetOf(lvs.child.outputSet)
}
val (pushDownToSearch, stayUp) = otherPredicates.partition {
predicate =>
predicate.deterministic &&
predicate.references.nonEmpty &&
predicate.references.subsetOf(lvs.searchFilterOutputSet)
}

if (pushDownToLeft.isEmpty && pushDownToSearch.isEmpty) {
filter
} else {
val lvsWithPushedLeft = if (pushDownToLeft.isEmpty) {
lvs
} else {
lvs.copy(left = Filter(buildBalancedPredicate(pushDownToLeft, And), lvs.child))
}
val newLateralVectorSearch = if (pushDownToSearch.isEmpty) {
lvsWithPushedLeft
} else {
lvsWithPushedLeft.copy(
searchFilters = lvsWithPushedLeft.searchFilters ++ pushDownToSearch)
}
if (stayUp.isEmpty) {
newLateralVectorSearch
} else {
Filter(buildBalancedPredicate(stayUp, And), newLateralVectorSearch)
}
}
}
}
Loading
Loading