diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index 9f3ff6478ad6..c447b9b9624e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -53,7 +53,7 @@ public class BeamCalciteTable extends AbstractQueryableTable private final Map pipelineOptionsMap; private @Nullable PipelineOptions pipelineOptions; - BeamCalciteTable( + public BeamCalciteTable( BeamSqlTable beamTable, Map pipelineOptionsMap, @Nullable PipelineOptions pipelineOptions) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index d84783118bbd..cc37d84ef194 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -47,8 +47,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.checkerframework.checker.nullness.qual.Nullable; @@ -58,6 +61,7 @@ */ @Internal public class BeamSqlEnv { + JdbcConnection connection; QueryPlanner planner; @@ -116,6 +120,42 @@ public BeamRelNode parseQuery(String query, QueryParameters queryParameters) return planner.convertToBeamRel(query, queryParameters); } + public QueryPlanner getPlanner() { + return planner; + } + + public RelBuilder getRelBuilder() { + return planner.getRelBuilder(); + } + + public BeamRelNode convertToBeamRel(RelNode relNode) throws SqlConversionException { + return convertToBeamRel(relNode, QueryParameters.ofNone()); + } + + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) + throws SqlConversionException { + return planner.convertToBeamRel(relNode, queryParameters); + } + + public RelNode parseLogicalPlan(String query) throws ParseException, SqlConversionException { + return parseLogicalPlan(query, QueryParameters.ofNone()); + } + + public RelNode parseLogicalPlan(String query, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + return planner.parseToRel(query, queryParameters); + } + + public void registerSchemaFunction(String name, Function function) { + java.util.Objects.requireNonNull(name, "name cannot be null"); + java.util.Objects.requireNonNull(function, "function cannot be null"); + connection.getCurrentSchemaPlus().add(name, function); + } + + public SqlOperatorTable getOperatorTable() { + return planner.getOperatorTable(); + } + public boolean isDdl(String sqlStatement) throws ParseException { return planner.parse(sqlStatement).getKind().belongsTo(SqlKind.DDL); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index aa6f4d121871..01aaad06251f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -29,8 +29,8 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.config.CalciteConnectionConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; @@ -41,6 +41,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelRoot; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.BuiltInMetadata; @@ -64,16 +65,21 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParser; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformance; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.FrameworkConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Frameworks; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Planner; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Program; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelConversionException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.ValidationException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.BuiltInMethod; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,11 +96,27 @@ public class CalciteQueryPlanner implements QueryPlanner { private final Planner planner; private final JdbcConnection connection; + private final FrameworkConfig config; /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSets) { this.connection = connection; - this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); + this.config = defaultConfig(connection, ruleSets); + this.planner = Frameworks.getPlanner(config); + } + + /** + * Returns a RelBuilder instance configured with the same Calcite components used by this + * QueryPlanner. + */ + @Override + public RelBuilder getRelBuilder() { + return RelBuilder.create(config); + } + + @Override + public SqlOperatorTable getOperatorTable() { + return config.getOperatorTable(); } public static final Factory FACTORY = @@ -120,12 +142,20 @@ private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { public FrameworkConfig defaultConfig(JdbcConnection connection, Collection ruleSets) { final CalciteConnectionConfig config = connection.config(); + // Resolve the parser conformance. Calcite's Avatica JDBC connect path silently drops the + // {@code conformance} connection property (it is not in the driver's registered property set), + // so {@code config.conformance()} is always DEFAULT here even when callers set it via + // {@code BeamSqlPipelineOptions.calciteConnectionProperties}. We therefore read that map + // directly from the connection's pipeline options and let it override. This keeps the behavior + // opt-in: with no {@code conformance} property the connection's own (DEFAULT) value is used, so + // existing Beam SQL behavior is unchanged. + final SqlConformance conformance = resolveConformance(connection, config); final SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder() .setQuotedCasing(config.quotedCasing()) .setUnquotedCasing(config.unquotedCasing()) .setQuoting(config.quoting()) - .setConformance(config.conformance()) + .setConformance(conformance) .setCaseSensitive(config.caseSensitive()); final SqlParserImplFactory parserFactory = config.parserFactory(SqlParserImplFactory.class, null); @@ -163,6 +193,43 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection props = sqlOptions.getCalciteConnectionProperties(); + if (props == null) { + return config.conformance(); + } + String value = null; + for (Map.Entry e : props.entrySet()) { + if ("conformance".equalsIgnoreCase(e.getKey())) { + value = e.getValue(); + break; + } + } + if (value == null) { + return config.conformance(); + } + try { + return SqlConformanceEnum.valueOf(value.trim().toUpperCase()); + } catch (IllegalArgumentException ex) { + LOG.warn("Unrecognized calcite conformance '{}', using {}", value, config.conformance()); + return config.conformance(); + } + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ @Override public SqlNode parse(String sqlStatement) throws ParseException { @@ -179,20 +246,18 @@ public SqlNode parse(String sqlStatement) throws ParseException { /** * It parses and validate the input query, then convert into a {@link BeamRelNode} tree. Note that - * query parameters are not yet supported. + * query parameters are now supported for positional parameters. */ @Override public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException { Preconditions.checkArgument( queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL, - "Beam SQL Calcite dialect only supports positional query parameters."); - BeamRelNode beamRelNode; + "Beam SQL Calcite dialect only supports positional query parameters or no parameters."); try { SqlNode parsed = planner.parse(sqlStatement); TableResolutionUtils.setupCustomTableResolution(connection, parsed); SqlNode validated = planner.validate(parsed); - LOG.info("SQL:\n{}", validated); // root of original logical plan RelRoot root = planner.rel(validated); @@ -203,13 +268,92 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa relNode, new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters)); } - LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); - RelTraitSet desiredTraits = - relNode - .getTraitSet() - .replace(BeamLogicalConvention.INSTANCE) - .replace(root.collation) - .simplify(); + return convertToBeamRel(relNode, root.collation); + } catch (RelConversionException | CannotPlanException e) { + planner.close(); + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + planner.close(); + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } + } + + private static RelNode bindParameters(RelNode rel, RexShuttle binder) { + RelNode newRel = rel.accept(binder); + java.util.List inputs = newRel.getInputs(); + java.util.List newInputs = new java.util.ArrayList<>(inputs.size()); + boolean inputsChanged = false; + for (RelNode input : inputs) { + RelNode newInput = bindParameters(input, binder); + newInputs.add(newInput); + if (newInput != input) { + inputsChanged = true; + } + } + return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel; + } + + @Override + public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + Preconditions.checkNotNull(sqlStatement, "sqlStatement cannot be null"); + Preconditions.checkArgument( + queryParameters.getKind() == Kind.NONE, + "Query parameters are not supported during logical plan parsing; please provide them when converting to a Beam physical plan instead."); + boolean success = false; + try { + SqlNode parsed = planner.parse(sqlStatement); + TableResolutionUtils.setupCustomTableResolution(connection, parsed); + SqlNode validated = planner.validate(parsed); + // root of original logical plan + RelRoot root = planner.rel(validated); + success = true; + return root.rel; + } catch (RelConversionException e) { + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + if (!success) { + planner.close(); + } + } + } + + @Override + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) + throws SqlConversionException { + Preconditions.checkNotNull(relNode, "relNode cannot be null"); + Preconditions.checkNotNull(queryParameters, "queryParameters cannot be null"); + boolean success = false; + try { + if (queryParameters.getKind() == Kind.POSITIONAL) { + relNode = + bindParameters( + relNode, + new ParameterBinder(relNode.getCluster().getRexBuilder(), queryParameters)); + } + BeamRelNode result = convertToBeamRel(relNode, (RelCollation) null); + success = true; + return result; + } finally { + if (!success) { + planner.close(); + } + } + } + + private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) + throws SqlConversionException { + BeamRelNode beamRelNode; + try { + RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE); + if (collation != null) { + desiredTraits = desiredTraits.replace(collation); + } + desiredTraits = desiredTraits.simplify(); // beam physical plan relNode .getCluster() @@ -224,28 +368,36 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); relNode.getCluster().invalidateMetadataQuery(); - beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode); - LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); - } catch (RelConversionException | CannotPlanException e) { + + if (config.getPrograms().isEmpty()) { + throw new SqlConversionException("No planning programs configured in FrameworkConfig."); + } + Program program = config.getPrograms().get(0); + RelNode optimizedNode = + program.run( + relNode.getCluster().getPlanner(), + relNode, + desiredTraits, + ImmutableList.of(), + ImmutableList.of()); + + if (!(optimizedNode instanceof BeamRelNode)) { + throw new SqlConversionException( + String.format( + "The optimizer was unable to produce a Beam physical plan. " + + "Expected BeamRelNode, but got: %s", + optimizedNode.getClass().getName())); + } + beamRelNode = (BeamRelNode) optimizedNode; + } catch (CannotPlanException e) { throw new SqlConversionException( - String.format("Unable to convert query %s", sqlStatement), e); - } catch (SqlParseException | ValidationException e) { - throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + String.format("Unable to convert relNode to Beam: %s", relNode), e); } finally { planner.close(); } return beamRelNode; } - private static RelNode bindParameters(RelNode rel, RexShuttle binder) { - RelNode newRel = rel.accept(binder); - java.util.List newInputs = new java.util.ArrayList<>(); - for (RelNode input : newRel.getInputs()) { - newInputs.add(bindParameters(input, binder)); - } - return newRel.copy(newRel.getTraitSet(), newInputs); - } - // It needs to be public so that the generated code in Calcite can access it. public static class NonCumulativeCostImpl implements MetadataHandler { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java index 0f0f8970a3ab..843827db39f7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -39,9 +41,29 @@ public interface QueryPlanner { BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException; + /** Converts a logical {@link RelNode} tree into a physical {@link BeamRelNode} tree. */ + BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) + throws SqlConversionException; + + /** + * Parses and validates {@code sqlStatement} and returns the logical {@link RelNode} (standard + * Calcite convention), WITHOUT converting to Beam physical rels. This lets callers rewrite the + * logical plan (e.g. inject custom rels) before {@link #convertToBeamRel(RelNode, + * QueryParameters)}. + */ + default RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + throw new UnsupportedOperationException( + "parseToRel is not implemented by " + getClass().getName()); + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ SqlNode parse(String sqlStatement) throws ParseException; + RelBuilder getRelBuilder(); + + org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable getOperatorTable(); + @AutoOneOf(QueryParameters.Kind.class) abstract class QueryParameters { public enum Kind { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 3ddd78ab232b..ed12540c1c57 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -237,8 +237,7 @@ public PCollection expand(PCollection input) { } /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */ - @VisibleForTesting - static class RowToCsv extends PTransform, PCollection> + public static class RowToCsv extends PTransform, PCollection> implements Serializable { private CSVFormat csvFormat; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java index fdf18298b618..79990b15090e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java @@ -17,10 +17,14 @@ */ package org.apache.beam.sdk.extensions.sql.impl.planner; +import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters; import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -70,4 +74,34 @@ public void testCumulativeCostMetaDataHandler() { root.getCluster().getMetadataQuery().getCumulativeCost(root) instanceof BeamCostModel); Assert.assertFalse(root.getCluster().getMetadataQuery().getCumulativeCost(root).isInfinite()); } + + @Test + public void testParseAndConvertHelpers() throws Exception { + String sql = "select * from medium_table"; + RelNode logicalPlan = env.parseLogicalPlan(sql); + Assert.assertNotNull(logicalPlan); + + BeamRelNode physicalPlan = env.convertToBeamRel(logicalPlan); + Assert.assertNotNull(physicalPlan); + Assert.assertTrue( + physicalPlan + .getCluster() + .getPlanner() + .getCost(physicalPlan, physicalPlan.getCluster().getMetadataQuery()) + instanceof BeamCostModel); + } + + @Test + public void testParseAndConvertHelpersWithParameters() throws Exception { + String sql = "select * from medium_table where id = ?"; + RelNode logicalPlan = env.parseLogicalPlan(sql); + Assert.assertNotNull(logicalPlan); + + QueryParameters params = QueryParameters.ofPositional(ImmutableList.of(3)); + BeamRelNode physicalPlan = env.convertToBeamRel(logicalPlan, params); + Assert.assertNotNull(physicalPlan); + + String explained = BeamSqlRelUtils.explainLazily(physicalPlan).toString(); + Assert.assertTrue(explained.contains("3")); + } }