| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.impl; |
| |
| import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; |
| import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.calcite.config.CalciteConnectionConfig; |
| import org.apache.calcite.jdbc.CalciteSchema; |
| import org.apache.calcite.plan.Contexts; |
| import org.apache.calcite.plan.ConventionTraitDef; |
| import org.apache.calcite.plan.RelOptPlanner.CannotPlanException; |
| import org.apache.calcite.plan.RelOptUtil; |
| import org.apache.calcite.plan.RelTraitDef; |
| import org.apache.calcite.plan.RelTraitSet; |
| import org.apache.calcite.prepare.CalciteCatalogReader; |
| import org.apache.calcite.rel.RelRoot; |
| import org.apache.calcite.schema.SchemaPlus; |
| import org.apache.calcite.sql.SqlNode; |
| import org.apache.calcite.sql.SqlOperatorTable; |
| import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| import org.apache.calcite.sql.parser.SqlParseException; |
| import org.apache.calcite.sql.parser.SqlParser; |
| import org.apache.calcite.sql.parser.SqlParserImplFactory; |
| import org.apache.calcite.sql.util.ChainedSqlOperatorTable; |
| import org.apache.calcite.tools.FrameworkConfig; |
| import org.apache.calcite.tools.Frameworks; |
| import org.apache.calcite.tools.Planner; |
| import org.apache.calcite.tools.RelConversionException; |
| import org.apache.calcite.tools.ValidationException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The core component to handle through a SQL statement, from explain execution plan, to generate a |
| * Beam pipeline. |
| */ |
| class CalciteQueryPlanner implements QueryPlanner { |
| private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class); |
| |
| private final Planner planner; |
| |
| CalciteQueryPlanner(JdbcConnection connection) { |
| planner = Frameworks.getPlanner(defaultConfig(connection)); |
| } |
| |
| public FrameworkConfig defaultConfig(JdbcConnection connection) { |
| final CalciteConnectionConfig config = connection.config(); |
| final SqlParser.ConfigBuilder parserConfig = |
| SqlParser.configBuilder() |
| .setQuotedCasing(config.quotedCasing()) |
| .setUnquotedCasing(config.unquotedCasing()) |
| .setQuoting(config.quoting()) |
| .setConformance(config.conformance()) |
| .setCaseSensitive(config.caseSensitive()); |
| final SqlParserImplFactory parserFactory = |
| config.parserFactory(SqlParserImplFactory.class, null); |
| if (parserFactory != null) { |
| parserConfig.setParserFactory(parserFactory); |
| } |
| |
| final SchemaPlus schema = connection.getRootSchema(); |
| final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus(); |
| |
| final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE); |
| |
| final CalciteCatalogReader catalogReader = |
| new CalciteCatalogReader( |
| CalciteSchema.from(schema), |
| ImmutableList.of(defaultSchema.getName()), |
| connection.getTypeFactory(), |
| connection.config()); |
| final SqlOperatorTable opTab0 = |
| connection.config().fun(SqlOperatorTable.class, SqlStdOperatorTable.instance()); |
| |
| return Frameworks.newConfigBuilder() |
| .parserConfig(parserConfig.build()) |
| .defaultSchema(defaultSchema) |
| .traitDefs(traitDefs) |
| .context(Contexts.of(connection.config())) |
| .ruleSets(BeamRuleSets.getRuleSets()) |
| .costFactory(null) |
| .typeSystem(connection.getTypeFactory().getTypeSystem()) |
| .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader)) |
| .build(); |
| } |
| |
| /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */ |
| @Override |
| public SqlNode parse(String sqlStatement) throws ParseException { |
| SqlNode parsed; |
| try { |
| parsed = planner.parse(sqlStatement); |
| } catch (SqlParseException e) { |
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); |
| } finally { |
| planner.close(); |
| } |
| return parsed; |
| } |
| |
| /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */ |
| @Override |
| public BeamRelNode convertToBeamRel(String sqlStatement) |
| throws ParseException, SqlConversionException { |
| BeamRelNode beamRelNode; |
| try { |
| SqlNode parsed = planner.parse(sqlStatement); |
| SqlNode validated = planner.validate(parsed); |
| LOG.info("SQL:\n" + validated); |
| |
| // root of original logical plan |
| RelRoot root = planner.rel(validated); |
| LOG.info("SQLPlan>\n" + RelOptUtil.toString(root.rel)); |
| |
| RelTraitSet desiredTraits = |
| root.rel |
| .getTraitSet() |
| .replace(BeamLogicalConvention.INSTANCE) |
| .replace(root.collation) |
| .simplify(); |
| |
| // beam physical plan |
| beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel); |
| LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode)); |
| } catch (RelConversionException | CannotPlanException e) { |
| throw new SqlConversionException( |
| String.format("Unable to convert query %s", sqlStatement), e); |
| } catch (SqlParseException | ValidationException e) { |
| throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e); |
| } finally { |
| planner.close(); |
| } |
| return beamRelNode; |
| } |
| } |