blob: 3730857994b21b93ec7657e5b51e7d3252534776 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql;
import com.google.zetasql.Value;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Contexts;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitDef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelRoot;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperatorTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParser;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RelConversionException;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/** ZetaSQLQueryPlanner. */
public class ZetaSQLQueryPlanner implements QueryPlanner {
private final ZetaSQLPlannerImpl plannerImpl;
public ZetaSQLQueryPlanner(FrameworkConfig config) {
plannerImpl = new ZetaSQLPlannerImpl(config);
}
public ZetaSQLQueryPlanner(JdbcConnection jdbcConnection, RuleSet[] ruleSets) {
plannerImpl = new ZetaSQLPlannerImpl(defaultConfig(jdbcConnection, ruleSets));
}
@Override
public BeamRelNode convertToBeamRel(String sqlStatement)
throws ParseException, SqlConversionException {
try {
return parseQuery(sqlStatement);
} catch (RelConversionException e) {
throw new SqlConversionException(e.getCause());
}
}
@Override
public SqlNode parse(String sqlStatement) throws ParseException {
return null;
}
public BeamRelNode convertToBeamRel(String sqlStatement, Map<String, Value> queryParams)
throws ParseException, SqlConversionException {
try {
return parseQuery(sqlStatement, queryParams);
} catch (RelConversionException e) {
throw new SqlConversionException(e.getCause());
}
}
public BeamRelNode parseQuery(String sql) throws RelConversionException {
return parseQuery(sql, Collections.emptyMap());
}
public BeamRelNode parseQuery(String sql, Map<String, Value> queryParams)
throws RelConversionException {
RelRoot root = plannerImpl.rel(sql, queryParams);
RelTraitSet desiredTraits =
root.rel
.getTraitSet()
.replace(BeamLogicalConvention.INSTANCE)
.replace(root.collation)
.simplify();
BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
return beamRelNode;
}
private FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
final CalciteConnectionConfig config = connection.config();
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
.setQuotedCasing(config.quotedCasing())
.setUnquotedCasing(config.unquotedCasing())
.setQuoting(config.quoting())
.setConformance(config.conformance())
.setCaseSensitive(config.caseSensitive());
final SqlParserImplFactory parserFactory =
config.parserFactory(SqlParserImplFactory.class, null);
if (parserFactory != null) {
parserConfig.setParserFactory(parserFactory);
}
final SchemaPlus schema = connection.getRootSchema();
final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus();
final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE);
final CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
CalciteSchema.from(schema),
ImmutableList.of(defaultSchema.getName()),
connection.getTypeFactory(),
connection.config());
final SqlOperatorTable opTab0 =
connection.config().fun(SqlOperatorTable.class, SqlStdOperatorTable.instance());
return Frameworks.newConfigBuilder()
.parserConfig(parserConfig.build())
.defaultSchema(defaultSchema)
.traitDefs(traitDefs)
.context(Contexts.of(connection.config()))
.ruleSets(ruleSets)
.costFactory(null)
.typeSystem(connection.getTypeFactory().getTypeSystem())
.operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
.build();
}
}