Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class BeamCalciteTable extends AbstractQueryableTable
private final Map<String, String> pipelineOptionsMap;
private @Nullable PipelineOptions pipelineOptions;

BeamCalciteTable(
public BeamCalciteTable(
BeamSqlTable beamTable,
Map<String, String> pipelineOptionsMap,
@Nullable PipelineOptions pipelineOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder;
Comment thread
damccorm marked this conversation as resolved.
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -58,6 +61,7 @@
*/
@Internal
public class BeamSqlEnv {

JdbcConnection connection;
QueryPlanner planner;

Expand Down Expand Up @@ -116,6 +120,42 @@ public BeamRelNode parseQuery(String query, QueryParameters queryParameters)
return planner.convertToBeamRel(query, queryParameters);
}

/**
 * Exposes the {@link QueryPlanner} held by this environment.
 *
 * @return the planner instance stored in this environment's {@code planner} field
 */
public QueryPlanner getPlanner() {
  return this.planner;
}

/**
 * Obtains a {@link RelBuilder} from the underlying planner.
 *
 * @return whatever {@code planner.getRelBuilder()} returns
 */
public RelBuilder getRelBuilder() {
  return this.planner.getRelBuilder();
}

/**
 * Converts a relational plan into a {@link BeamRelNode} without binding any query parameters.
 *
 * <p>Shorthand for the two-argument overload called with {@link QueryParameters#ofNone()}.
 *
 * @param relNode the plan to convert
 * @return the converted Beam plan
 * @throws SqlConversionException propagated from the two-argument overload
 */
public BeamRelNode convertToBeamRel(RelNode relNode) throws SqlConversionException {
  QueryParameters noParameters = QueryParameters.ofNone();
  return convertToBeamRel(relNode, noParameters);
}

/**
 * Converts a relational plan into a {@link BeamRelNode} by delegating to the planner.
 *
 * @param relNode the plan to convert
 * @param queryParameters the parameters handed to the planner together with the plan
 * @return the result of {@code planner.convertToBeamRel(relNode, queryParameters)}
 * @throws SqlConversionException propagated from the planner
 */
public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters)
    throws SqlConversionException {
  BeamRelNode beamRel = this.planner.convertToBeamRel(relNode, queryParameters);
  return beamRel;
}

/**
 * Parses a query into a logical {@link RelNode} plan without query parameters.
 *
 * <p>Shorthand for the two-argument overload called with {@link QueryParameters#ofNone()}.
 *
 * @param query the SQL text to parse
 * @return the logical plan produced by the two-argument overload
 * @throws ParseException propagated from the two-argument overload
 * @throws SqlConversionException propagated from the two-argument overload
 */
public RelNode parseLogicalPlan(String query) throws ParseException, SqlConversionException {
  QueryParameters noParameters = QueryParameters.ofNone();
  return parseLogicalPlan(query, noParameters);
}

/**
 * Parses a query into a logical {@link RelNode} plan by delegating to the planner.
 *
 * @param query the SQL text to parse
 * @param queryParameters the parameters handed to the planner together with the query
 * @return the result of {@code planner.parseToRel(query, queryParameters)}
 * @throws ParseException propagated from the planner
 * @throws SqlConversionException propagated from the planner
 */
public RelNode parseLogicalPlan(String query, QueryParameters queryParameters)
    throws ParseException, SqlConversionException {
  RelNode logicalPlan = this.planner.parseToRel(query, queryParameters);
  return logicalPlan;
}

/**
 * Adds a function under the given name to the connection's current schema.
 *
 * @param name the name to register the function under; must not be null
 * @param function the function to register; must not be null
 * @throws NullPointerException if {@code name} or {@code function} is null
 */
public void registerSchemaFunction(String name, Function function) {
  // Validate both arguments up front, in declaration order, before touching the schema.
  if (name == null) {
    throw new NullPointerException("name cannot be null");
  }
  if (function == null) {
    throw new NullPointerException("function cannot be null");
  }
  this.connection.getCurrentSchemaPlus().add(name, function);
}
Comment thread
damccorm marked this conversation as resolved.

/**
 * Obtains the {@link SqlOperatorTable} from the underlying planner.
 *
 * @return whatever {@code planner.getOperatorTable()} returns
 */
public SqlOperatorTable getOperatorTable() {
  return this.planner.getOperatorTable();
}

/**
 * Tells whether a SQL statement is a DDL statement.
 *
 * <p>The statement is parsed by the planner and the kind of the resulting node is tested against
 * {@link SqlKind#DDL}.
 *
 * @param sqlStatement the SQL text to classify
 * @return {@code true} if the parsed statement's kind belongs to {@code SqlKind.DDL}
 * @throws ParseException propagated from the planner's {@code parse} call
 */
public boolean isDdl(String sqlStatement) throws ParseException {
  SqlKind statementKind = this.planner.parse(sqlStatement).getKind();
  return statementKind.belongsTo(SqlKind.DDL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
Expand All @@ -41,6 +41,7 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelRoot;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.BuiltInMetadata;
Expand All @@ -64,16 +65,21 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParser;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformance;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.FrameworkConfig;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Frameworks;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Planner;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Program;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelConversionException;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.ValidationException;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.BuiltInMethod;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -90,11 +96,27 @@ public class CalciteQueryPlanner implements QueryPlanner {

private final Planner planner;
private final JdbcConnection connection;
private final FrameworkConfig config;

/** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */
public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) {
this.connection = connection;
this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
this.config = defaultConfig(connection, ruleSets);
this.planner = Frameworks.getPlanner(config);
}

/**
 * Builds a {@link RelBuilder} from the same {@link FrameworkConfig} this planner was constructed
 * with, so the builder shares this planner's Calcite configuration.
 *
 * @return the result of {@code RelBuilder.create(config)}
 */
@Override
public RelBuilder getRelBuilder() {
  final FrameworkConfig frameworkConfig = this.config;
  return RelBuilder.create(frameworkConfig);
}

/**
 * Returns the operator table carried by this planner's {@link FrameworkConfig}.
 *
 * @return the result of {@code config.getOperatorTable()}
 */
@Override
public SqlOperatorTable getOperatorTable() {
  return this.config.getOperatorTable();
}

public static final Factory FACTORY =
Expand All @@ -120,12 +142,20 @@ private void loadBuiltinFunctions(JdbcConnection jdbcConnection) {

public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) {
final CalciteConnectionConfig config = connection.config();
// Resolve the parser conformance. Calcite's Avatica JDBC connect path silently drops the
// {@code conformance} connection property (it is not in the driver's registered property set),
// so {@code config.conformance()} is always DEFAULT here even when callers set it via
// {@code BeamSqlPipelineOptions.calciteConnectionProperties}. We therefore read that map
// directly from the connection's pipeline options and let it override. This keeps the behavior
// opt-in: with no {@code conformance} property the connection's own (DEFAULT) value is used, so
// existing Beam SQL behavior is unchanged.
final SqlConformance conformance = resolveConformance(connection, config);
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
.setQuotedCasing(config.quotedCasing())
.setUnquotedCasing(config.unquotedCasing())
.setQuoting(config.quoting())
.setConformance(config.conformance())
.setConformance(conformance)
.setCaseSensitive(config.caseSensitive());
final SqlParserImplFactory parserFactory =
config.parserFactory(SqlParserImplFactory.class, null);
Expand Down Expand Up @@ -163,6 +193,43 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleS
.build();
}

/**
 * Resolves the {@link SqlConformance} for the parser. Prefers an explicit {@code conformance}
 * entry in {@link BeamSqlPipelineOptions#getCalciteConnectionProperties()} (key looked up
 * case-insensitively, value trimmed and matched case-insensitively against the constant names of
 * {@link SqlConformanceEnum}); otherwise falls back to the connection's own conformance. This is
 * the bridge for {@code conformance=BABEL}, which the Avatica JDBC connect path drops, enabling
 * Spark-SQL spellings (e.g. the {@code !=} operator) the default conformance rejects.
 *
 * <p>The value is upper-cased with {@link java.util.Locale#ROOT} so that matching does not depend
 * on the JVM's default locale (for example, a Turkish default locale would otherwise turn the
 * {@code i} in {@code lenient} into a dotted capital and make the lookup fail).
 *
 * @param connection the connection whose pipeline options are consulted; its options may be null
 * @param config the connection config supplying the fallback conformance
 * @return the conformance named by the property, or {@code config.conformance()} when the
 *     pipeline options, the property map, or the property itself is absent, or when the value is
 *     not a recognized {@link SqlConformanceEnum} name (a warning is logged in that last case)
 */
private static SqlConformance resolveConformance(
    JdbcConnection connection, CalciteConnectionConfig config) {
  PipelineOptions options = connection.getPipelineOptions();
  if (options == null) {
    return config.conformance();
  }
  BeamSqlPipelineOptions sqlOptions = options.as(BeamSqlPipelineOptions.class);
  Map<String, String> props = sqlOptions.getCalciteConnectionProperties();
  if (props == null) {
    return config.conformance();
  }
  // Scan rather than get(): the key may be spelled in any letter case.
  String value = null;
  for (Map.Entry<String, String> e : props.entrySet()) {
    if ("conformance".equalsIgnoreCase(e.getKey())) {
      value = e.getValue();
      break;
    }
  }
  if (value == null) {
    return config.conformance();
  }
  try {
    // Locale.ROOT keeps the case mapping stable regardless of the JVM default locale.
    return SqlConformanceEnum.valueOf(value.trim().toUpperCase(java.util.Locale.ROOT));
  } catch (IllegalArgumentException ex) {
    LOG.warn("Unrecognized calcite conformance '{}', using {}", value, config.conformance());
    return config.conformance();
  }
}

/** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
@Override
public SqlNode parse(String sqlStatement) throws ParseException {
Expand All @@ -179,20 +246,18 @@ public SqlNode parse(String sqlStatement) throws ParseException {

/**
* It parses and validates the input query, then converts it into a {@link BeamRelNode} tree. Note that
* query parameters are not yet supported.
* query parameters are now supported for positional parameters.
*/
@Override
public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
Preconditions.checkArgument(
queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL,
"Beam SQL Calcite dialect only supports positional query parameters.");
BeamRelNode beamRelNode;
"Beam SQL Calcite dialect only supports positional query parameters or no parameters.");
try {
SqlNode parsed = planner.parse(sqlStatement);
TableResolutionUtils.setupCustomTableResolution(connection, parsed);
SqlNode validated = planner.validate(parsed);
LOG.info("SQL:\n{}", validated);

// root of original logical plan
RelRoot root = planner.rel(validated);
Expand All @@ -203,13 +268,92 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters));
}
LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
RelTraitSet desiredTraits =
relNode
.getTraitSet()
.replace(BeamLogicalConvention.INSTANCE)
.replace(root.collation)
.simplify();
return convertToBeamRel(relNode, root.collation);
} catch (RelConversionException | CannotPlanException e) {
planner.close();
throw new SqlConversionException(
String.format("Unable to convert query %s", sqlStatement), e);
} catch (SqlParseException | ValidationException e) {
planner.close();
throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
}
Comment thread
damccorm marked this conversation as resolved.
}

/**
 * Applies {@code binder} to {@code rel} and, recursively, to every input beneath it.
 *
 * <p>The node itself is rewritten first via {@code rel.accept(binder)}; each of the rewritten
 * node's inputs is then processed the same way. The node is copied with the new inputs only when
 * at least one input came back as a different instance; otherwise the rewritten node is returned
 * as is.
 *
 * @param rel the root of the subtree to rewrite
 * @param binder the shuttle applied to every node of the subtree
 * @return the rewritten subtree
 */
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
  final RelNode rewritten = rel.accept(binder);
  final java.util.List<RelNode> children = rewritten.getInputs();
  final java.util.List<RelNode> reboundChildren = new java.util.ArrayList<>(children.size());
  boolean anyChildReplaced = false;
  for (RelNode child : children) {
    final RelNode reboundChild = bindParameters(child, binder);
    reboundChildren.add(reboundChild);
    // Identity comparison: only a different instance counts as a change.
    anyChildReplaced |= reboundChild != child;
  }
  if (!anyChildReplaced) {
    return rewritten;
  }
  return rewritten.copy(rewritten.getTraitSet(), reboundChildren);
}
Comment thread
damccorm marked this conversation as resolved.
Comment on lines +282 to +295

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In bindParameters, calling newRel.copy(...) when only the row expressions of newRel changed (but none of its child inputs changed) is redundant because newRel already contains the updated expressions and the original inputs. We should only perform a copy if the child inputs actually changed (inputsChanged is true). This avoids unnecessary node copies during parameter binding.

Suggested change
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> inputs = newRel.getInputs();
java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
boolean changed = newRel != rel;
for (RelNode input : inputs) {
RelNode newInput = bindParameters(input, binder);
newInputs.add(newInput);
if (newInput != input) {
changed = true;
}
}
return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> inputs = newRel.getInputs();
java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
boolean inputsChanged = false;
for (RelNode input : inputs) {
RelNode newInput = bindParameters(input, binder);
newInputs.add(newInput);
if (newInput != input) {
inputsChanged = true;
}
}
return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}


@Override
public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
Preconditions.checkNotNull(sqlStatement, "sqlStatement cannot be null");
Preconditions.checkArgument(
queryParameters.getKind() == Kind.NONE,
"Query parameters are not supported during logical plan parsing; please provide them when converting to a Beam physical plan instead.");
boolean success = false;
Comment thread
damccorm marked this conversation as resolved.
try {
SqlNode parsed = planner.parse(sqlStatement);
TableResolutionUtils.setupCustomTableResolution(connection, parsed);
SqlNode validated = planner.validate(parsed);
// root of original logical plan
RelRoot root = planner.rel(validated);
success = true;
return root.rel;
} catch (RelConversionException e) {
throw new SqlConversionException(
String.format("Unable to convert query %s", sqlStatement), e);
} catch (SqlParseException | ValidationException e) {
throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
} finally {
if (!success) {
planner.close();
}
}
}

@Override
public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters)
throws SqlConversionException {
Preconditions.checkNotNull(relNode, "relNode cannot be null");
Preconditions.checkNotNull(queryParameters, "queryParameters cannot be null");
boolean success = false;
Comment thread
damccorm marked this conversation as resolved.
try {
if (queryParameters.getKind() == Kind.POSITIONAL) {
relNode =
bindParameters(
relNode,
new ParameterBinder(relNode.getCluster().getRexBuilder(), queryParameters));
}
BeamRelNode result = convertToBeamRel(relNode, (RelCollation) null);
success = true;
return result;
} finally {
if (!success) {
planner.close();
}
}
}
Comment thread
damccorm marked this conversation as resolved.

private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation)
throws SqlConversionException {
BeamRelNode beamRelNode;
try {
RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
if (collation != null) {
desiredTraits = desiredTraits.replace(collation);
}
desiredTraits = desiredTraits.simplify();
// beam physical plan
relNode
.getCluster()
Expand All @@ -224,28 +368,36 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
relNode.getCluster().invalidateMetadataQuery();
beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
} catch (RelConversionException | CannotPlanException e) {

if (config.getPrograms().isEmpty()) {
throw new SqlConversionException("No planning programs configured in FrameworkConfig.");
}
Program program = config.getPrograms().get(0);
RelNode optimizedNode =
program.run(
relNode.getCluster().getPlanner(),
relNode,
desiredTraits,
ImmutableList.of(),
ImmutableList.of());

if (!(optimizedNode instanceof BeamRelNode)) {
throw new SqlConversionException(
String.format(
"The optimizer was unable to produce a Beam physical plan. "
+ "Expected BeamRelNode, but got: %s",
optimizedNode.getClass().getName()));
}
beamRelNode = (BeamRelNode) optimizedNode;
} catch (CannotPlanException e) {
throw new SqlConversionException(
String.format("Unable to convert query %s", sqlStatement), e);
} catch (SqlParseException | ValidationException e) {
throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
String.format("Unable to convert relNode to Beam: %s", relNode), e);
} finally {
planner.close();
}
Comment on lines 395 to 397

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The RelMetadataQuery.THREAD_PROVIDERS is a ThreadLocal variable that is set at the beginning of convertToBeamRel but never cleared. In long-running environments or thread pools, failing to clear ThreadLocal variables can lead to classloader memory leaks because the metadata provider holds references to the RelOptCluster and other heavy objects. We should remove the thread-local value in the finally block.

Suggested change
} finally {
planner.close();
}
} finally {
RelMetadataQuery.THREAD_PROVIDERS.remove();
planner.close();
}

return beamRelNode;
}

private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
for (RelNode input : newRel.getInputs()) {
newInputs.add(bindParameters(input, binder));
}
return newRel.copy(newRel.getTraitSet(), newInputs);
}

// It needs to be public so that the generated code in Calcite can access it.
public static class NonCumulativeCostImpl
implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> {
Expand Down
Loading
Loading