| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.impl; |
| |
| import java.lang.reflect.Method; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.ServiceLoader; |
| import java.util.stream.Collectors; |
| import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind; |
| import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; |
| import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery; |
| import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; |
| import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; |
| import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.Table; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.config.CalciteConnectionConfig; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteSchema; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.Contexts; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.ConventionTraitDef; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCost; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner.CannotPlanException; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitDef; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.prepare.CalciteCatalogReader; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelRoot; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.BuiltInMetadata; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.MetadataDef; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.MetadataHandler; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.RelMetadataProvider; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.RelMetadataQuery; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.SchemaPlus; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperatorTable; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParser; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserImplFactory; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.util.SqlOperatorTables; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.FrameworkConfig; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Frameworks; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Planner; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.RelConversionException; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.RuleSet; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.ValidationException; |
| import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.BuiltInMethod; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The core component to handle through a SQL statement, from explain execution plan, to generate a |
| * Beam pipeline. |
| */ |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) |
| "nullness" // TODO(https://github.com/apache/beam/issues/20497) |
| }) |
| public class CalciteQueryPlanner implements QueryPlanner { |
| private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class); |
| |
| private final Planner planner; |
| private final JdbcConnection connection; |
| |
| /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */ |
| public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) { |
| this.connection = connection; |
| this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); |
| } |
| |
| public static final Factory FACTORY = |
| new Factory() { |
| @Override |
| public QueryPlanner createPlanner( |
| JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) { |
| loadBuiltinFunctions(jdbcConnection); |
| return new CalciteQueryPlanner(jdbcConnection, ruleSets); |
| } |
| |
| private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { |
| for (BeamBuiltinFunctionProvider provider : |
| ServiceLoader.load(BeamBuiltinFunctionProvider.class)) { |
| for (Map.Entry<String, List<Method>> entry : provider.getBuiltinMethods().entrySet()) { |
| for (Method method : entry.getValue()) { |
| jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method)); |
| } |
| } |
| } |
| } |
| }; |
| |
| public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) { |
| final CalciteConnectionConfig config = connection.config(); |
| final SqlParser.ConfigBuilder parserConfig = |
| SqlParser.configBuilder() |
| .setQuotedCasing(config.quotedCasing()) |
| .setUnquotedCasing(config.unquotedCasing()) |
| .setQuoting(config.quoting()) |
| .setConformance(config.conformance()) |
| .setCaseSensitive(config.caseSensitive()); |
| final SqlParserImplFactory parserFactory = |
| config.parserFactory(SqlParserImplFactory.class, null); |
| if (parserFactory != null) { |
| parserConfig.setParserFactory(parserFactory); |
| } |
| |
| final SchemaPlus schema = connection.getRootSchema(); |
| final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus(); |
| |
| final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE); |
| |
| final CalciteCatalogReader catalogReader = |
| new CalciteCatalogReader( |
| CalciteSchema.from(schema), |
| ImmutableList.of(defaultSchema.getName()), |
| connection.getTypeFactory(), |
| connection.config()); |
| final SqlOperatorTable opTab0 = |
| connection.config().fun(SqlOperatorTable.class, SqlStdOperatorTable.instance()); |
| |
| return Frameworks.newConfigBuilder() |
| .parserConfig(parserConfig.build()) |
| .defaultSchema(defaultSchema) |
| .traitDefs(traitDefs) |
| .context(Contexts.of(connection.config())) |
| .ruleSets(ruleSets.toArray(new RuleSet[0])) |
| .costFactory(BeamCostModel.FACTORY) |
| .typeSystem(connection.getTypeFactory().getTypeSystem()) |
| .operatorTable(SqlOperatorTables.chain(opTab0, catalogReader)) |
| .build(); |
| } |
| |
| /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ |
| @Override |
| public SqlNode parse(String sqlStatement) throws ParseException { |
| SqlNode parsed; |
| try { |
| parsed = planner.parse(sqlStatement); |
| } catch (SqlParseException e) { |
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); |
| } finally { |
| planner.close(); |
| } |
| return parsed; |
| } |
| |
| /** |
| * It parses and validate the input query, then convert into a {@link BeamRelNode} tree. Note that |
| * query parameters are not yet supported. |
| */ |
| @Override |
| public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters) |
| throws ParseException, SqlConversionException { |
| Preconditions.checkArgument( |
| queryParameters.getKind() == Kind.NONE, |
| "Beam SQL Calcite dialect does not yet support query parameters."); |
| BeamRelNode beamRelNode; |
| try { |
| SqlNode parsed = planner.parse(sqlStatement); |
| TableResolutionUtils.setupCustomTableResolution(connection, parsed); |
| SqlNode validated = planner.validate(parsed); |
| LOG.info("SQL:\n{}", validated); |
| |
| // root of original logical plan |
| RelRoot root = planner.rel(validated); |
| LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel)); |
| RelTraitSet desiredTraits = |
| root.rel |
| .getTraitSet() |
| .replace(BeamLogicalConvention.INSTANCE) |
| .replace(root.collation) |
| .simplify(); |
| // beam physical plan |
| root.rel |
| .getCluster() |
| .setMetadataProvider( |
| ChainedRelMetadataProvider.of( |
| ImmutableList.of( |
| NonCumulativeCostImpl.SOURCE, |
| RelMdNodeStats.SOURCE, |
| root.rel.getCluster().getMetadataProvider()))); |
| |
| root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance); |
| RelMetadataQuery.THREAD_PROVIDERS.set( |
| JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider())); |
| root.rel.getCluster().invalidateMetadataQuery(); |
| beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel); |
| LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode)); |
| } catch (RelConversionException | CannotPlanException e) { |
| throw new SqlConversionException( |
| String.format("Unable to convert query %s", sqlStatement), e); |
| } catch (SqlParseException | ValidationException e) { |
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); |
| } finally { |
| planner.close(); |
| } |
| return beamRelNode; |
| } |
| |
| // It needs to be public so that the generated code in Calcite can access it. |
| public static class NonCumulativeCostImpl |
| implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> { |
| |
| public static final RelMetadataProvider SOURCE = |
| ReflectiveRelMetadataProvider.reflectiveSource( |
| BuiltInMethod.NON_CUMULATIVE_COST.method, new NonCumulativeCostImpl()); |
| |
| @Override |
| public MetadataDef<BuiltInMetadata.NonCumulativeCost> getDef() { |
| return BuiltInMetadata.NonCumulativeCost.DEF; |
| } |
| |
| @SuppressWarnings("UnusedDeclaration") |
| public RelOptCost getNonCumulativeCost(RelNode rel, RelMetadataQuery mq) { |
| assert mq instanceof BeamRelMetadataQuery; |
| BeamRelMetadataQuery bmq = (BeamRelMetadataQuery) mq; |
| |
| // This is called by a generated code in calcite MetadataQuery. |
| // If the rel is Calcite rel or we are in JDBC path and cost factory is not set yet we should |
| // use calcite cost estimation |
| if (!(rel instanceof BeamRelNode)) { |
| return rel.computeSelfCost(rel.getCluster().getPlanner(), bmq); |
| } |
| |
| // Currently we do nothing in this case, however, we can plug our own cost estimation method |
| // here and based on the design we also need to remove the cached values |
| |
| // We need to first remove the cached values. |
| List<Table.Cell<RelNode, List, Object>> costKeys = |
| bmq.map.cellSet().stream() |
| .filter(entry -> entry.getValue() instanceof BeamCostModel) |
| .filter(entry -> ((BeamCostModel) entry.getValue()).isInfinite()) |
| .collect(Collectors.toList()); |
| |
| costKeys.forEach(cell -> bmq.map.remove(cell.getRowKey(), cell.getColumnKey())); |
| |
| return ((BeamRelNode) rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq); |
| } |
| } |
| } |