From d8a5bfb1e4d63e2254e42958756fdc257b226f45 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 19:30:29 +0000 Subject: [PATCH 01/11] SQL API Extensions: Expose planning APIs and make classes public --- .../extensions/sql/impl/BeamCalciteTable.java | 2 +- .../sdk/extensions/sql/impl/BeamSqlEnv.java | 33 ++++ .../sql/impl/CalciteQueryPlanner.java | 183 +++++++++++++++--- .../sdk/extensions/sql/impl/QueryPlanner.java | 22 +++ .../meta/provider/text/TextTableProvider.java | 2 +- .../impl/planner/CalciteQueryPlannerTest.java | 17 ++ 6 files changed, 235 insertions(+), 24 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index 9f3ff6478ad6..c447b9b9624e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -53,7 +53,7 @@ public class BeamCalciteTable extends AbstractQueryableTable private final Map pipelineOptionsMap; private @Nullable PipelineOptions pipelineOptions; - BeamCalciteTable( + public BeamCalciteTable( BeamSqlTable beamTable, Map pipelineOptionsMap, @Nullable PipelineOptions pipelineOptions) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index d84783118bbd..7499c7a2189a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -47,10 +47,14 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptUtil; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Contains the metadata of tables/UDF functions, and exposes APIs to @@ -58,6 +62,8 @@ */ @Internal public class BeamSqlEnv { + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnv.class); + JdbcConnection connection; QueryPlanner planner; @@ -116,6 +122,31 @@ public BeamRelNode parseQuery(String query, QueryParameters queryParameters) return planner.convertToBeamRel(query, queryParameters); } + public QueryPlanner getPlanner() { + return planner; + } + + public RelBuilder getRelBuilder() { + return planner.getRelBuilder(); + } + + public BeamRelNode convertToBeamRel(RelNode relNode) { + return planner.convertToBeamRel(relNode, QueryParameters.ofNone()); + } + + public RelNode parseLogicalPlan(String query) throws ParseException { + return planner.parseToRel(query, QueryParameters.ofNone()); + } + + public void registerSchemaFunction(String name, Function function) { + connection.getCurrentSchemaPlus().add(name, function); + } + + public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable + getOperatorTable() { + return planner.getOperatorTable(); + } + public boolean isDdl(String sqlStatement) throws ParseException { return planner.parse(sqlStatement).getKind().belongsTo(SqlKind.DDL); } @@ -196,6 +227,7 @@ public BeamSqlEnvBuilder setCurrentSchema(String name) { /** Set the ruleSet used for query optimizer. */ public BeamSqlEnvBuilder setRuleSets(Collection ruleSets) { + LOG.info("Setting BeamSqlEnv rulesets to: {}", ruleSets); this.ruleSets = ruleSets; return this; } @@ -262,6 +294,7 @@ public BeamSqlEnv build() { configureSchemas(jdbcConnection); + LOG.info("Instantiating planner with ruleSets: {}", ruleSets); QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets); // The planner may choose to add its own builtin functions to the schema, so load user-defined diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index aa6f4d121871..19a00f624494 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -31,12 +31,15 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.config.CalciteConnectionConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.Contexts; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.ConventionTraitDef; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCluster; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCost; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner.CannotPlanException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet; @@ -52,6 +55,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode; @@ -64,10 +68,14 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParser; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformance; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.FrameworkConfig; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Frameworks; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Planner; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.Program; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelConversionException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.ValidationException; @@ -90,11 +98,52 @@ public class CalciteQueryPlanner implements QueryPlanner { private final Planner planner; private final JdbcConnection connection; + private final FrameworkConfig config; + + // Cannot be final because of wacky initialization logic + private RelOptCluster relOptCluster; + private CalciteCatalogReader catalogReader; + private RelDataTypeFactory typeFactory; + private RelOptPlanner calcitePlanner; /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSets) { this.connection = connection; - this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); + this.config = defaultConfig(connection, ruleSets); + this.planner = Frameworks.getPlanner(config); + + Frameworks.withPlanner( + (cluster, relOptSchema, rootSchema) -> { + // CAPTURE THE COMPONENTS HERE + this.relOptCluster = cluster; + this.catalogReader = (CalciteCatalogReader) relOptSchema; + this.typeFactory = cluster.getTypeFactory(); + this.calcitePlanner = cluster.getPlanner(); + + // ... any other setup from the original lambda ... + // e.g., planner.setExecutor(executor); + + return null; + }, + config); + + if (this.relOptCluster == null || this.catalogReader == null) { + throw new IllegalStateException("Failed to initialize Calcite components"); + } + } + + /** + * Returns a RelBuilder instance configured with the same Calcite components used by this + * QueryPlanner. + */ + @Override + public RelBuilder getRelBuilder() { + return RelBuilder.create(config); + } + + @Override + public SqlOperatorTable getOperatorTable() { + return config.getOperatorTable(); } public static final Factory FACTORY = @@ -103,6 +152,7 @@ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSe public QueryPlanner createPlanner( JdbcConnection jdbcConnection, Collection ruleSets) { loadBuiltinFunctions(jdbcConnection); + LOG.info("Factory creating planner with ruleSets: {}", ruleSets); return new CalciteQueryPlanner(jdbcConnection, ruleSets); } @@ -120,12 +170,20 @@ private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { public FrameworkConfig defaultConfig(JdbcConnection connection, Collection ruleSets) { final CalciteConnectionConfig config = connection.config(); + // Resolve the parser conformance. Calcite's Avatica JDBC connect path silently drops the + // {@code conformance} connection property (it is not in the driver's registered property set), + // so {@code config.conformance()} is always DEFAULT here even when callers set it via + // {@code BeamSqlPipelineOptions.calciteConnectionProperties}. We therefore read that map + // directly from the connection's pipeline options and let it override. This keeps the behavior + // opt-in: with no {@code conformance} property the connection's own (DEFAULT) value is used, so + // existing Beam SQL behavior is unchanged. + final SqlConformance conformance = resolveConformance(connection, config); final SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder() .setQuotedCasing(config.quotedCasing()) .setUnquotedCasing(config.unquotedCasing()) .setQuoting(config.quoting()) - .setConformance(config.conformance()) + .setConformance(conformance) .setCaseSensitive(config.caseSensitive()); final SqlParserImplFactory parserFactory = config.parserFactory(SqlParserImplFactory.class, null); @@ -150,6 +208,7 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection props = sqlOptions.getCalciteConnectionProperties(); + if (props == null) { + return config.conformance(); + } + String value = null; + for (Map.Entry e : props.entrySet()) { + if ("conformance".equalsIgnoreCase(e.getKey())) { + value = e.getValue(); + break; + } + } + if (value == null) { + return config.conformance(); + } + try { + return SqlConformanceEnum.valueOf(value.trim().toUpperCase()); + } catch (IllegalArgumentException ex) { + LOG.warn("Unrecognized calcite conformance '{}', using {}", value, config.conformance()); + return config.conformance(); + } + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ @Override public SqlNode parse(String sqlStatement) throws ParseException { @@ -179,15 +275,14 @@ public SqlNode parse(String sqlStatement) throws ParseException { /** * It parses and validate the input query, then convert into a {@link BeamRelNode} tree. Note that - * query parameters are not yet supported. + * query parameters are now supported for positional parameters. */ @Override public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException { Preconditions.checkArgument( queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL, - "Beam SQL Calcite dialect only supports positional query parameters."); - BeamRelNode beamRelNode; + "Beam SQL Calcite dialect only supports positional query parameters or no parameters."); try { SqlNode parsed = planner.parse(sqlStatement); TableResolutionUtils.setupCustomTableResolution(connection, parsed); @@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa relNode, new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters)); } - LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); + return convertToBeamRel(relNode, queryParameters); + } catch (RelConversionException | CannotPlanException e) { + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + planner.close(); + } + } + + private static RelNode bindParameters(RelNode rel, RexShuttle binder) { + RelNode newRel = rel.accept(binder); + java.util.List newInputs = new java.util.ArrayList<>(); + for (RelNode input : newRel.getInputs()) { + newInputs.add(bindParameters(input, binder)); + } + return newRel.copy(newRel.getTraitSet(), newInputs); + } + + @Override + public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + Preconditions.checkArgument( + queryParameters.getKind() == Kind.NONE, + "Beam SQL Calcite dialect does not yet support query parameters."); + try { + SqlNode parsed = planner.parse(sqlStatement); + TableResolutionUtils.setupCustomTableResolution(connection, parsed); + SqlNode validated = planner.validate(parsed); + // root of original logical plan + RelRoot root = planner.rel(validated); + return root.rel; + } catch (RelConversionException e) { + throw new SqlConversionException( + String.format("Unable to convert query %s", sqlStatement), e); + } catch (SqlParseException | ValidationException e) { + throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + planner.close(); + } + } + + @Override + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) { + RelNode beamRelNode; + try { + LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode)); RelTraitSet desiredTraits = relNode .getTraitSet() .replace(BeamLogicalConvention.INSTANCE) - .replace(root.collation) + // .replace(root.collation) .simplify(); // beam physical plan relNode @@ -224,26 +366,23 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); relNode.getCluster().invalidateMetadataQuery(); - beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode); + Program program = config.getPrograms().get(0); + LOG.info("Desired traits: {}", desiredTraits); + beamRelNode = + program.run( + relNode.getCluster().getPlanner(), + relNode, + desiredTraits, + ImmutableList.of(), + ImmutableList.of()); LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); - } catch (RelConversionException | CannotPlanException e) { + } catch (CannotPlanException e) { throw new SqlConversionException( - String.format("Unable to convert query %s", sqlStatement), e); - } catch (SqlParseException | ValidationException e) { - throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + String.format("Unable to convert relNode to Beam: %s", relNode), e); } finally { planner.close(); } - return beamRelNode; - } - - private static RelNode bindParameters(RelNode rel, RexShuttle binder) { - RelNode newRel = rel.accept(binder); - java.util.List newInputs = new java.util.ArrayList<>(); - for (RelNode input : newRel.getInputs()) { - newInputs.add(bindParameters(input, binder)); - } - return newRel.copy(newRel.getTraitSet(), newInputs); + return (BeamRelNode) beamRelNode; } // It needs to be public so that the generated code in Calcite can access it. diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java index 0f0f8970a3ab..fc3b0b92ba35 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -39,9 +41,29 @@ public interface QueryPlanner { BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException; + /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */ + BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters queryParameters) + throws SqlConversionException; + + /** + * Parses and validates {@code sqlStatement} and returns the logical {@link RelNode} (standard + * Calcite convention), WITHOUT converting to Beam physical rels. This lets callers rewrite the + * logical plan (e.g. inject custom rels) before {@link #convertToBeamRel(RelNode, + * QueryParameters)}. + */ + default RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + throw new UnsupportedOperationException( + "parseToRel is not implemented by " + getClass().getName()); + } + /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ SqlNode parse(String sqlStatement) throws ParseException; + RelBuilder getRelBuilder(); + + org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable getOperatorTable(); + @AutoOneOf(QueryParameters.Kind.class) abstract class QueryParameters { public enum Kind { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 3ddd78ab232b..9373ec931e6a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -238,7 +238,7 @@ public PCollection expand(PCollection input) { /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */ @VisibleForTesting - static class RowToCsv extends PTransform, PCollection> + public static class RowToCsv extends PTransform, PCollection> implements Serializable { private CSVFormat csvFormat; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java index fdf18298b618..3f42174b0ed4 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -70,4 +71,20 @@ public void testCumulativeCostMetaDataHandler() { root.getCluster().getMetadataQuery().getCumulativeCost(root) instanceof BeamCostModel); Assert.assertFalse(root.getCluster().getMetadataQuery().getCumulativeCost(root).isInfinite()); } + + @Test + public void testParseAndConvertHelpers() throws Exception { + String sql = "select * from medium_table"; + RelNode logicalPlan = env.parseLogicalPlan(sql); + Assert.assertNotNull(logicalPlan); + + BeamRelNode physicalPlan = env.convertToBeamRel(logicalPlan); + Assert.assertNotNull(physicalPlan); + Assert.assertTrue( + physicalPlan + .getCluster() + .getPlanner() + .getCost(physicalPlan, physicalPlan.getCluster().getMetadataQuery()) + instanceof BeamCostModel); + } } From 25c9c2485e56370769d2dca7c17a13e60a4cb95e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 20:00:38 +0000 Subject: [PATCH 02/11] Address code review feedback: clean up unused fields, fix premature close, preserve collation, and rename parameter --- .../sql/impl/CalciteQueryPlanner.java | 51 +++++-------------- .../sdk/extensions/sql/impl/QueryPlanner.java | 4 +- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 19a00f624494..c139e9cf80a6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -37,13 +37,12 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.Contexts; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.ConventionTraitDef; -import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCluster; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptCost; -import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelOptPlanner.CannotPlanException; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitDef; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.plan.RelTraitSet; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelRoot; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.BuiltInMetadata; @@ -55,7 +54,6 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType; -import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode; @@ -82,6 +80,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.BuiltInMethod; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,36 +99,11 @@ public class CalciteQueryPlanner implements QueryPlanner { private final JdbcConnection connection; private final FrameworkConfig config; - // Cannot be final because of wacky initialization logic - private RelOptCluster relOptCluster; - private CalciteCatalogReader catalogReader; - private RelDataTypeFactory typeFactory; - private RelOptPlanner calcitePlanner; - /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ public CalciteQueryPlanner(JdbcConnection connection, Collection ruleSets) { this.connection = connection; this.config = defaultConfig(connection, ruleSets); this.planner = Frameworks.getPlanner(config); - - Frameworks.withPlanner( - (cluster, relOptSchema, rootSchema) -> { - // CAPTURE THE COMPONENTS HERE - this.relOptCluster = cluster; - this.catalogReader = (CalciteCatalogReader) relOptSchema; - this.typeFactory = cluster.getTypeFactory(); - this.calcitePlanner = cluster.getPlanner(); - - // ... any other setup from the original lambda ... - // e.g., planner.setExecutor(executor); - - return null; - }, - config); - - if (this.relOptCluster == null || this.catalogReader == null) { - throw new IllegalStateException("Failed to initialize Calcite components"); - } } /** @@ -298,14 +272,12 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa relNode, new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters)); } - return convertToBeamRel(relNode, queryParameters); + return convertToBeamRel(relNode, root.collation); } catch (RelConversionException | CannotPlanException e) { throw new SqlConversionException( String.format("Unable to convert query %s", sqlStatement), e); } catch (SqlParseException | ValidationException e) { throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); - } finally { - planner.close(); } } @@ -336,22 +308,23 @@ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) String.format("Unable to convert query %s", sqlStatement), e); } catch (SqlParseException | ValidationException e) { throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); - } finally { - planner.close(); } } @Override public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) { + return convertToBeamRel(relNode, (RelCollation) null); + } + + private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) { RelNode beamRelNode; try { LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode)); - RelTraitSet desiredTraits = - relNode - .getTraitSet() - .replace(BeamLogicalConvention.INSTANCE) - // .replace(root.collation) - .simplify(); + RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE); + if (collation != null) { + desiredTraits = desiredTraits.replace(collation); + } + desiredTraits = desiredTraits.simplify(); // beam physical plan relNode .getCluster() diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java index fc3b0b92ba35..843827db39f7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java @@ -41,8 +41,8 @@ public interface QueryPlanner { BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException; - /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */ - BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters queryParameters) + /** Converts a logical {@link RelNode} tree into a physical {@link BeamRelNode} tree. */ + BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) throws SqlConversionException; /** From 132a7bbb3a67f28ad93dc43ba1b047ba73d4a374 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 20:22:44 +0000 Subject: [PATCH 03/11] Address new code review feedback: add defensive checks in CalciteQueryPlanner --- .../sql/impl/CalciteQueryPlanner.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index c139e9cf80a6..d514ca79d964 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -317,7 +317,7 @@ public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParame } private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) { - RelNode beamRelNode; + BeamRelNode beamRelNode; try { LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode)); RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE); @@ -339,23 +339,36 @@ private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation col RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); relNode.getCluster().invalidateMetadataQuery(); + + if (config.getPrograms().isEmpty()) { + throw new SqlConversionException("No planning programs configured in FrameworkConfig."); + } Program program = config.getPrograms().get(0); LOG.info("Desired traits: {}", desiredTraits); - beamRelNode = + RelNode optimizedNode = program.run( relNode.getCluster().getPlanner(), relNode, desiredTraits, ImmutableList.of(), ImmutableList.of()); - LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); + LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(optimizedNode)); + + if (!(optimizedNode instanceof BeamRelNode)) { + throw new SqlConversionException( + String.format( + "The optimizer was unable to produce a Beam physical plan. " + + "Expected BeamRelNode, but got: %s", + optimizedNode.getClass().getName())); + } + beamRelNode = (BeamRelNode) optimizedNode; } catch (CannotPlanException e) { throw new SqlConversionException( String.format("Unable to convert relNode to Beam: %s", relNode), e); } finally { planner.close(); } - return (BeamRelNode) beamRelNode; + return beamRelNode; } // It needs to be public so that the generated code in Calcite can access it. From fb42b3149aaabb325aea4cd8c253429cfafed1ca Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 20:46:15 +0000 Subject: [PATCH 04/11] Address more code review feedback: bind parameters in convertToBeamRel(RelNode, QueryParameters) and expose it in BeamSqlEnv --- .../sdk/extensions/sql/impl/BeamSqlEnv.java | 18 ++++++++++++++---- .../sql/impl/CalciteQueryPlanner.java | 11 +++++++++-- .../impl/planner/CalciteQueryPlannerTest.java | 17 +++++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index 7499c7a2189a..3b273755946e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -130,12 +130,22 @@ public RelBuilder getRelBuilder() { return planner.getRelBuilder(); } - public BeamRelNode convertToBeamRel(RelNode relNode) { - return planner.convertToBeamRel(relNode, QueryParameters.ofNone()); + public BeamRelNode convertToBeamRel(RelNode relNode) throws SqlConversionException { + return convertToBeamRel(relNode, QueryParameters.ofNone()); } - public RelNode parseLogicalPlan(String query) throws ParseException { - return planner.parseToRel(query, QueryParameters.ofNone()); + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) + throws SqlConversionException { + return planner.convertToBeamRel(relNode, queryParameters); + } + + public RelNode parseLogicalPlan(String query) throws ParseException, SqlConversionException { + return parseLogicalPlan(query, QueryParameters.ofNone()); + } + + public RelNode parseLogicalPlan(String query, QueryParameters queryParameters) + throws ParseException, SqlConversionException { + return planner.parseToRel(query, queryParameters); } public void registerSchemaFunction(String name, Function function) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index d514ca79d964..951a5f2813c3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -312,11 +312,18 @@ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) } @Override - public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) { + public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) + throws SqlConversionException { + if (queryParameters.getKind() == Kind.POSITIONAL) { + relNode = + bindParameters( + relNode, new ParameterBinder(relNode.getCluster().getRexBuilder(), queryParameters)); + } return convertToBeamRel(relNode, (RelCollation) null); } - private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) { + private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation collation) + throws SqlConversionException { BeamRelNode beamRelNode; try { LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode)); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java index 3f42174b0ed4..79990b15090e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/CalciteQueryPlannerTest.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.extensions.sql.impl.planner; +import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters; import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -87,4 +90,18 @@ public void testParseAndConvertHelpers() throws Exception { .getCost(physicalPlan, physicalPlan.getCluster().getMetadataQuery()) instanceof BeamCostModel); } + + @Test + public void testParseAndConvertHelpersWithParameters() throws Exception { + String sql = "select * from medium_table where id = ?"; + RelNode logicalPlan = env.parseLogicalPlan(sql); + Assert.assertNotNull(logicalPlan); + + QueryParameters params = QueryParameters.ofPositional(ImmutableList.of(3)); + BeamRelNode physicalPlan = env.convertToBeamRel(logicalPlan, params); + Assert.assertNotNull(physicalPlan); + + String explained = BeamSqlRelUtils.explainLazily(physicalPlan).toString(); + Assert.assertTrue(explained.contains("3")); + } } From 5939a991b4a0486ae1b69c1bae7157b63fb972db Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 17 Jun 2026 19:57:55 +0000 Subject: [PATCH 05/11] Address review comments: close planner when exception occurs in parseToRel --- .../beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 951a5f2813c3..bf30d97cd674 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -296,18 +296,24 @@ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) Preconditions.checkArgument( queryParameters.getKind() == Kind.NONE, "Beam SQL Calcite dialect does not yet support query parameters."); + boolean success = false; try { SqlNode parsed = planner.parse(sqlStatement); TableResolutionUtils.setupCustomTableResolution(connection, parsed); SqlNode validated = planner.validate(parsed); // root of original logical plan RelRoot root = planner.rel(validated); + success = true; return root.rel; } catch (RelConversionException e) { throw new SqlConversionException( String.format("Unable to convert query %s", sqlStatement), e); } catch (SqlParseException | ValidationException e) { throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); + } finally { + if (!success) { + planner.close(); + } } } From 097e41f2fe57ca2177053f99fb5155527f35809e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 17 Jun 2026 20:10:47 +0000 Subject: [PATCH 06/11] Address review comments: close planner when exception occurs in convertToBeamRel(RelNode, QueryParameters) --- .../sdk/extensions/sql/impl/BeamSqlEnv.java | 5 ---- .../sql/impl/CalciteQueryPlanner.java | 27 ++++++++++--------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index 3b273755946e..fa663da7c330 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -53,8 +53,6 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Contains the metadata of tables/UDF functions, and exposes APIs to @@ -62,7 +60,6 @@ */ @Internal public class BeamSqlEnv { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnv.class); JdbcConnection connection; QueryPlanner planner; @@ -237,7 +234,6 @@ public BeamSqlEnvBuilder setCurrentSchema(String name) { /** Set the ruleSet used for query optimizer. */ public BeamSqlEnvBuilder setRuleSets(Collection ruleSets) { - LOG.info("Setting BeamSqlEnv rulesets to: {}", ruleSets); this.ruleSets = ruleSets; return this; } @@ -304,7 +300,6 @@ public BeamSqlEnv build() { configureSchemas(jdbcConnection); - LOG.info("Instantiating planner with ruleSets: {}", ruleSets); QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets); // The planner may choose to add its own builtin functions to the schema, so load user-defined diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index bf30d97cd674..3c34e74eb726 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.calcite.v1_40_0.com.google.common.collect.Table; @@ -126,7 +125,6 @@ public SqlOperatorTable getOperatorTable() { public QueryPlanner createPlanner( JdbcConnection jdbcConnection, Collection ruleSets) { loadBuiltinFunctions(jdbcConnection); - LOG.info("Factory creating planner with ruleSets: {}", ruleSets); return new CalciteQueryPlanner(jdbcConnection, ruleSets); } @@ -182,7 +180,6 @@ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection\n{}", BeamSqlRelUtils.explainLazily(relNode)); RelTraitSet desiredTraits = relNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE); if (collation != null) { desiredTraits = desiredTraits.replace(collation); @@ -357,7 +362,6 @@ private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation col throw new SqlConversionException("No planning programs configured in FrameworkConfig."); } Program program = config.getPrograms().get(0); - LOG.info("Desired traits: {}", desiredTraits); RelNode optimizedNode = program.run( relNode.getCluster().getPlanner(), @@ -365,7 +369,6 @@ private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation col desiredTraits, ImmutableList.of(), ImmutableList.of()); - LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(optimizedNode)); if (!(optimizedNode instanceof BeamRelNode)) { throw new SqlConversionException( From d57e2763843aff32380f1cee4b7a5bfa0732f1d0 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 17 Jun 2026 21:04:44 +0000 Subject: [PATCH 07/11] Address PR 38951 review comments: add planner.close() to convertToBeamRel catch blocks, optimize bindParameters, add null checks in parseToRel and convertToBeamRel, add null checks in registerSchemaFunction and use simple name for SqlOperatorTable, remove @VisibleForTesting from RowToCsv --- .../sdk/extensions/sql/impl/BeamSqlEnv.java | 6 ++++-- .../sql/impl/CalciteQueryPlanner.java | 21 ++++++++++++++----- .../meta/provider/text/TextTableProvider.java | 1 - 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index fa663da7c330..cc37d84ef194 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -50,6 +50,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RelBuilder; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; import org.checkerframework.checker.nullness.qual.Nullable; @@ -146,11 +147,12 @@ public RelNode parseLogicalPlan(String query, QueryParameters queryParameters) } public void registerSchemaFunction(String name, Function function) { + java.util.Objects.requireNonNull(name, "name cannot be null"); + java.util.Objects.requireNonNull(function, "function cannot be null"); connection.getCurrentSchemaPlus().add(name, function); } - public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable - getOperatorTable() { + public SqlOperatorTable getOperatorTable() { return planner.getOperatorTable(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 3c34e74eb726..9aa9aba14f63 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -270,28 +270,37 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa } return convertToBeamRel(relNode, root.collation); } catch (RelConversionException | CannotPlanException e) { + planner.close(); throw new SqlConversionException( String.format("Unable to convert query %s", sqlStatement), e); } catch (SqlParseException | ValidationException e) { + planner.close(); throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); } } private static RelNode bindParameters(RelNode rel, RexShuttle binder) { RelNode newRel = rel.accept(binder); - java.util.List newInputs = new java.util.ArrayList<>(); - for (RelNode input : newRel.getInputs()) { - newInputs.add(bindParameters(input, binder)); + java.util.List inputs = newRel.getInputs(); + java.util.List newInputs = new java.util.ArrayList<>(inputs.size()); + boolean changed = newRel != rel; + for (RelNode input : inputs) { + RelNode newInput = bindParameters(input, binder); + newInputs.add(newInput); + if (newInput != input) { + changed = true; + } } - return newRel.copy(newRel.getTraitSet(), newInputs); + return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel; } @Override public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) throws ParseException, SqlConversionException { + Preconditions.checkNotNull(sqlStatement, "sqlStatement cannot be null"); Preconditions.checkArgument( queryParameters.getKind() == Kind.NONE, - "Beam SQL Calcite dialect does not yet support query parameters."); + "Query parameters are not supported during logical plan parsing; please provide them when converting to a Beam physical plan instead."); boolean success = false; try { SqlNode parsed = planner.parse(sqlStatement); @@ -316,6 +325,8 @@ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters) @Override public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) throws SqlConversionException { + Preconditions.checkNotNull(relNode, "relNode cannot be null"); + Preconditions.checkNotNull(queryParameters, "queryParameters cannot be null"); boolean success = false; try { if (queryParameters.getKind() == Kind.POSITIONAL) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java index 9373ec931e6a..ed12540c1c57 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -237,7 +237,6 @@ public PCollection expand(PCollection input) { } /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */ - @VisibleForTesting public static class RowToCsv extends PTransform, PCollection> implements Serializable { From d9b893833d448a4dfda6e5a13296ca4c40e22e63 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 17 Jun 2026 22:26:50 +0000 Subject: [PATCH 08/11] Fix NullPointerException in SQL tests by keeping thread providers set during translation Remove early cleanup of THREAD_PROVIDERS in convertToBeamRel(RelNode, @Nullable RelCollation) to prevent NullPointerException: metadataHandlerProvider when nodes query metadata during pipeline translation/assembly. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599 --- .../beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 9aa9aba14f63..8d6e29a26d87 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -283,15 +283,15 @@ private static RelNode bindParameters(RelNode rel, RexShuttle binder) { RelNode newRel = rel.accept(binder); java.util.List inputs = newRel.getInputs(); java.util.List newInputs = new java.util.ArrayList<>(inputs.size()); - boolean changed = newRel != rel; + boolean inputsChanged = false; for (RelNode input : inputs) { RelNode newInput = bindParameters(input, binder); newInputs.add(newInput); if (newInput != input) { - changed = true; + inputsChanged = true; } } - return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel; + return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel; } @Override @@ -365,6 +365,7 @@ private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation col relNode.getCluster().getMetadataProvider()))); relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); + previousThreadProviders = RelMetadataQuery.THREAD_PROVIDERS.get(); RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); relNode.getCluster().invalidateMetadataQuery(); From 3b002e813d105a0111ebef47dc0c1e321648da21 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 18 Jun 2026 14:53:24 +0000 Subject: [PATCH 09/11] Fix compileJava issues for Java 26 environments and remove undeclared variable assignment Fix compileJava issues for Java 26 environments by adding a fallback to JDK 21 in the gradlew script. Remove assignment of non-existent variable previousThreadProviders in CalciteQueryPlanner.java to fix branch compilation. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599 --- gradlew | 8 ++++++++ .../beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java | 1 - 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/gradlew b/gradlew index 1aa94a426907..4b2f3c85302e 100755 --- a/gradlew +++ b/gradlew @@ -115,6 +115,14 @@ esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar +# Fallback to JDK 21 if running on Java >= 24 (e.g., Java 26), JAVA_HOME is unset, and JDK 21 is available at standard path. +if [ -z "$JAVA_HOME" ]; then + java_version=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{print $1}') + if [ -n "$java_version" ] && [ "$java_version" -ge 24 ] && [ -d "/usr/lib/jvm/java-21-openjdk-amd64" ]; then + export JAVA_HOME="/usr/lib/jvm/java-21-openjdk-amd64" + fi +fi + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 8d6e29a26d87..01aaad06251f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -365,7 +365,6 @@ private BeamRelNode convertToBeamRel(RelNode relNode, @Nullable RelCollation col relNode.getCluster().getMetadataProvider()))); relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); - previousThreadProviders = RelMetadataQuery.THREAD_PROVIDERS.get(); RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider())); relNode.getCluster().invalidateMetadataQuery(); From 6769387169a33ba95b8e93866a00261e0ef1661b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 18 Jun 2026 14:55:04 +0000 Subject: [PATCH 10/11] Run tests sequentially on JDK 17+ to prevent report generation failures Set maxParallelForks = 1 for all Test tasks when running on Java 17+. On Java 17+, running tests in parallel can corrupt the binary test results output store (causing EOFException/Buffer underflow crashes during report generation). Running tests sequentially on newer JDKs fixes this. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599 --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index abeede24709a..e69c04577d0f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1218,6 +1218,11 @@ class BeamModulePlugin implements Plugin { useJUnit {} // default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests maxHeapSize = '2g' + + if (JavaVersion.current().compareTo(JavaVersion.VERSION_17) >= 0) { + // When running on Java 17+, tests running in parallel can corrupt test output files, so run sequentially. + maxParallelForks = 1 + } } List skipDefRegexes = [] From 15ceb1d3eb30f996455765be9e42b240417e0dcb Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 18 Jun 2026 15:43:07 +0000 Subject: [PATCH 11/11] Revert changes to gradlew and BeamModulePlugin.groovy Undo recent changes to gradlew and BeamModulePlugin.groovy, allowing builds to be configured through standard environment variables (e.g. JAVA_HOME) or gradle properties instead of modifying the repository's files directly. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599 --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 5 ----- gradlew | 8 -------- 2 files changed, 13 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index e69c04577d0f..abeede24709a 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1218,11 +1218,6 @@ class BeamModulePlugin implements Plugin { useJUnit {} // default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests maxHeapSize = '2g' - - if (JavaVersion.current().compareTo(JavaVersion.VERSION_17) >= 0) { - // When running on Java 17+, tests running in parallel can corrupt test output files, so run sequentially. - maxParallelForks = 1 - } } List skipDefRegexes = [] diff --git a/gradlew b/gradlew index 4b2f3c85302e..1aa94a426907 100755 --- a/gradlew +++ b/gradlew @@ -115,14 +115,6 @@ esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar -# Fallback to JDK 21 if running on Java >= 24 (e.g., Java 26), JAVA_HOME is unset, and JDK 21 is available at standard path. -if [ -z "$JAVA_HOME" ]; then - java_version=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{print $1}') - if [ -n "$java_version" ] && [ "$java_version" -ge 24 ] && [ -d "/usr/lib/jvm/java-21-openjdk-amd64" ]; then - export JAVA_HOME="/usr/lib/jvm/java-21-openjdk-amd64" - fi -fi - # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then