-
Notifications
You must be signed in to change notification settings - Fork 4.6k
SQL API Extensions: Expose planning APIs and make classes public #38951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d8a5bfb
25c9c24
132a7bb
fb42b31
5939a99
097e41f
d57e276
d9b8938
3b002e8
6769387
15ceb1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -29,8 +29,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.options.PipelineOptions; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.config.CalciteConnectionConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -41,6 +41,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.prepare.CalciteCatalogReader; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelRoot; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.BuiltInMetadata; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -64,16 +65,21 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParser; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserImplFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.util.SqlOperatorTables; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformance; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformanceEnum; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.SqlToRelConverter; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.FrameworkConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Frameworks; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Planner; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Program; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelConversionException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.ValidationException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.BuiltInMethod; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -90,11 +96,27 @@ public class CalciteQueryPlanner implements QueryPlanner { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final Planner planner; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final JdbcConnection connection; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final FrameworkConfig config; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.connection = connection; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.config = defaultConfig(connection, ruleSets); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.planner = Frameworks.getPlanner(config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Returns a RelBuilder instance configured with the same Calcite components used by this | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * QueryPlanner. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public RelBuilder getRelBuilder() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return RelBuilder.create(config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public SqlOperatorTable getOperatorTable() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return config.getOperatorTable(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static final Factory FACTORY = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -120,12 +142,20 @@ private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final CalciteConnectionConfig config = connection.config(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Resolve the parser conformance. Calcite's Avatica JDBC connect path silently drops the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // {@code conformance} connection property (it is not in the driver's registered property set), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // so {@code config.conformance()} is always DEFAULT here even when callers set it via | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // {@code BeamSqlPipelineOptions.calciteConnectionProperties}. We therefore read that map | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // directly from the connection's pipeline options and let it override. This keeps the behavior | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // opt-in: with no {@code conformance} property the connection's own (DEFAULT) value is used, so | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // existing Beam SQL behavior is unchanged. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final SqlConformance conformance = resolveConformance(connection, config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final SqlParser.ConfigBuilder parserConfig = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SqlParser.configBuilder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setQuotedCasing(config.quotedCasing()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setUnquotedCasing(config.unquotedCasing()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setQuoting(config.quoting()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setConformance(config.conformance()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setConformance(conformance) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .setCaseSensitive(config.caseSensitive()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final SqlParserImplFactory parserFactory = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| config.parserFactory(SqlParserImplFactory.class, null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -163,6 +193,43 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleS | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Resolves the {@link SqlConformance} for the parser. Prefers an explicit {@code conformance} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * entry in {@link BeamSqlPipelineOptions#getCalciteConnectionProperties()} (looked up | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * case-insensitively, value matched against {@link SqlConformanceEnum}); otherwise falls back to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the connection's own conformance. This is the bridge for {@code conformance=BABEL}, which the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Avatica JDBC connect path drops, enabling Spark-SQL spellings (e.g. the {@code !=} operator) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the default conformance rejects. Returns the connection default on any unrecognized value. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static SqlConformance resolveConformance( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| JdbcConnection connection, CalciteConnectionConfig config) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PipelineOptions options = connection.getPipelineOptions(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (options == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return config.conformance(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BeamSqlPipelineOptions sqlOptions = options.as(BeamSqlPipelineOptions.class); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Map<String, String> props = sqlOptions.getCalciteConnectionProperties(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (props == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return config.conformance(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String value = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (Map.Entry<String, String> e : props.entrySet()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ("conformance".equalsIgnoreCase(e.getKey())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| value = e.getValue(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (value == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return config.conformance(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return SqlConformanceEnum.valueOf(value.trim().toUpperCase()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (IllegalArgumentException ex) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOG.warn("Unrecognized calcite conformance '{}', using {}", value, config.conformance()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return config.conformance(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public SqlNode parse(String sqlStatement) throws ParseException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -179,20 +246,18 @@ public SqlNode parse(String sqlStatement) throws ParseException { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * It parses and validate the input query, then convert into a {@link BeamRelNode} tree. Note that | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * query parameters are not yet supported. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * query parameters are now supported for positional parameters. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws ParseException, SqlConversionException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkArgument( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Beam SQL Calcite dialect only supports positional query parameters."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BeamRelNode beamRelNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Beam SQL Calcite dialect only supports positional query parameters or no parameters."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SqlNode parsed = planner.parse(sqlStatement); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TableResolutionUtils.setupCustomTableResolution(connection, parsed); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SqlNode validated = planner.validate(parsed); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOG.info("SQL:\n{}", validated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // root of original logical plan | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelRoot root = planner.rel(validated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -203,13 +268,92 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelTraitSet desiredTraits = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .getTraitSet() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .replace(BeamLogicalConvention.INSTANCE) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .replace(root.collation) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .simplify(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return convertToBeamRel(relNode, root.collation); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (RelConversionException | CannotPlanException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planner.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new SqlConversionException( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.format("Unable to convert query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (SqlParseException | ValidationException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planner.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
damccorm marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static RelNode bindParameters(RelNode rel, RexShuttle binder) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelNode newRel = rel.accept(binder); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| java.util.List<RelNode> inputs = newRel.getInputs(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean inputsChanged = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (RelNode input : inputs) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelNode newInput = bindParameters(input, binder); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| newInputs.add(newInput); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (newInput != input) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| inputsChanged = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
damccorm marked this conversation as resolved.
Comment on lines
+282
to
+295
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws ParseException, SqlConversionException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkNotNull(sqlStatement, "sqlStatement cannot be null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkArgument( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| queryParameters.getKind() == Kind.NONE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Query parameters are not supported during logical plan parsing; please provide them when converting to a Beam physical plan instead."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean success = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
damccorm marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SqlNode parsed = planner.parse(sqlStatement); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TableResolutionUtils.setupCustomTableResolution(connection, parsed); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SqlNode validated = planner.validate(parsed); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // root of original logical plan | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelRoot root = planner.rel(validated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| success = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return root.rel; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (RelConversionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new SqlConversionException( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.format("Unable to convert query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (SqlParseException | ValidationException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!success) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planner.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws SqlConversionException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkNotNull(relNode, "relNode cannot be null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkNotNull(queryParameters, "queryParameters cannot be null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean success = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
damccorm marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (queryParameters.getKind() == Kind.POSITIONAL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bindParameters( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new ParameterBinder(relNode.getCluster().getRexBuilder(), queryParameters)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BeamRelNode result = convertToBeamRel(relNode, (RelCollation) null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| success = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!success) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planner.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
damccorm marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws SqlConversionException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| BeamRelNode beamRelNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (collation != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| desiredTraits = desiredTraits.replace(collation); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| desiredTraits = desiredTraits.simplify(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // beam physical plan | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .getCluster() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -224,28 +368,36 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelMetadataQuery.THREAD_PROVIDERS.set( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode.getCluster().invalidateMetadataQuery(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (RelConversionException | CannotPlanException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (config.getPrograms().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new SqlConversionException("No planning programs configured in FrameworkConfig."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Program program = config.getPrograms().get(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelNode optimizedNode = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| program.run( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode.getCluster().getPlanner(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| relNode, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| desiredTraits, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ImmutableList.of(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ImmutableList.of()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!(optimizedNode instanceof BeamRelNode)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new SqlConversionException( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.format( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "The optimizer was unable to produce a Beam physical plan. " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + "Expected BeamRelNode, but got: %s", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| optimizedNode.getClass().getName())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| beamRelNode = (BeamRelNode) optimizedNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (CannotPlanException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new SqlConversionException( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.format("Unable to convert query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (SqlParseException | ValidationException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.format("Unable to convert relNode to Beam: %s", relNode), e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| planner.close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
395
to
397
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return beamRelNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static RelNode bindParameters(RelNode rel, RexShuttle binder) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RelNode newRel = rel.accept(binder); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| java.util.List<RelNode> newInputs = new java.util.ArrayList<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (RelNode input : newRel.getInputs()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| newInputs.add(bindParameters(input, binder)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return newRel.copy(newRel.getTraitSet(), newInputs); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // It needs to be public so that the generated code in Calcite can access it. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static class NonCumulativeCostImpl | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.