blob: 340868c131b8fe35ffd6c4d25bcec2f23b354552 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.sql.parser.plan;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlCreateView;
import org.apache.flink.sql.parser.ddl.SqlNodeInfo;
import org.apache.flink.sql.parser.errorcode.ParserErrors;
import org.apache.flink.sql.parser.node.SqlToTreeConverter;
import org.apache.flink.sql.parser.plan.builder.BlinkRelBuilder;
import org.apache.flink.sql.parser.util.SqlInfo;
import org.apache.flink.sql.parser.util.SqlLists;

import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
 * Blink SQL planner.
 *
 * <p>Splits a SQL script into statements, parses each of them with the configured Calcite
 * parser, runs the DDL nodes' own {@code validate()} checks and converts query nodes to
 * relational expressions through {@link SqlToRelConverter}.
 */
public class FlinkPlannerImpl {

    /** Table type given to an untyped CREATE TABLE that is the target of an INSERT statement. */
    private static final String TABLE_TYPE_SINK = "SINK";

    /** Table type given to an untyped CREATE TABLE that no INSERT statement writes to. */
    private static final String TABLE_TYPE_SOURCE = "SOURCE";

    private final RelOptPlanner planner;
    private final RelDataTypeFactory typeFactory;
    private final SqlOperatorTable operatorTable;
    private final List<RelTraitDef> traitDefs;
    private final SqlRexConvertletTable convertletTable;
    private final SchemaPlus defaultSchema;
    private final SqlParser.Config parserConfig;
    private final FlinkSqlValidator validator;
    private final BlinkRelBuilder relBuilder;

    /**
     * Creates a planner from the given framework configuration.
     *
     * <p>The operator table chains the configured operator table with a catalog reader built on
     * the configured default schema; the validator is created with identifier expansion enabled.
     *
     * @param frameworkConfig source of the parser config, trait definitions, convertlet table,
     *                        default schema and built-in operator table
     */
    public FlinkPlannerImpl(FrameworkConfig frameworkConfig) {
        this.relBuilder = BlinkRelBuilder.create(frameworkConfig);
        this.planner = relBuilder.getPlanner();
        this.typeFactory = relBuilder.getTypeFactory();
        this.traitDefs = frameworkConfig.getTraitDefs();
        this.parserConfig = frameworkConfig.getParserConfig();
        this.convertletTable = frameworkConfig.getConvertletTable();
        this.defaultSchema = frameworkConfig.getDefaultSchema();
        SqlOperatorTable builtinOperatorTable = frameworkConfig.getOperatorTable();
        // createCatalogReader() only reads typeFactory, parserConfig and defaultSchema,
        // all of which are already assigned at this point.
        CalciteCatalogReader catalogReader = this.createCatalogReader();
        this.operatorTable = ChainedSqlOperatorTable.of(builtinOperatorTable, catalogReader);
        this.validator = new FlinkSqlValidator(this.operatorTable, catalogReader, this.typeFactory);
        this.validator.setIdentifierExpansion(true);
    }

    /**
     * Returns the type factory obtained from the rel builder at construction time.
     */
    public RelDataTypeFactory getTypeFactory() {
        return this.typeFactory;
    }

    /**
     * Parses a SQL script into one {@link SqlNodeInfo} per non-blank statement.
     *
     * <p>Each statement is prefixed with newlines up to its starting line so that parser
     * positions refer to lines of the whole script. After parsing, every INSERT target must
     * match the name of a CREATE TABLE in the same script, and every CREATE TABLE without an
     * explicit table type is marked as {@code "SINK"} (it is an INSERT target) or
     * {@code "SOURCE"} (it is not).
     *
     * @param sqlContext the SQL script, possibly containing several statements
     * @return the parsed statements in script order
     * @throws SqlParseException if a statement cannot be parsed or an INSERT targets a table
     *                           that is not created in the script
     */
    public List<SqlNodeInfo> parseContext(String sqlContext) throws SqlParseException {
        this.ready();
        List<SqlInfo> sqlList = SqlLists.getSQLList(sqlContext);
        for (SqlInfo sqlInfo : sqlList) {
            padToOriginalLine(sqlInfo);
        }

        List<SqlNodeInfo> sqlNodeInfoList = new ArrayList<>();
        // Insertion-ordered so that the first undefined sink in script order is the one reported.
        Map<String, SqlParserPos> sinkTables = new LinkedHashMap<>();
        Set<String> tableNames = new HashSet<>();
        for (SqlInfo sqlInfo : sqlList) {
            if (StringUtils.isBlank(sqlInfo.getSqlContent())) {
                continue;
            }
            SqlNode sqlNode = parseStatement(sqlInfo);
            if (sqlNode instanceof SqlCreateView) {
                SqlCreateView createView = (SqlCreateView) sqlNode;
                createView.setSubQuerySql(getViewSubQuerySql(sqlInfo, createView));
            } else if (sqlNode instanceof SqlInsert) {
                SqlIdentifier targetTable = (SqlIdentifier) ((SqlInsert) sqlNode).getTargetTable();
                // Keep the position of the first INSERT that targets this table.
                sinkTables.putIfAbsent(targetTable.toString(), targetTable.getParserPosition());
            } else if (sqlNode instanceof SqlCreateTable) {
                tableNames.add(((SqlCreateTable) sqlNode).getTableName().toString());
            }
            SqlNodeInfo sqlNodeInfo = new SqlNodeInfo();
            sqlNodeInfo.setOriginSql(sqlInfo.getSqlContent());
            sqlNodeInfo.setSqlNode(sqlNode);
            sqlNodeInfoList.add(sqlNodeInfo);
        }

        validateSinkTables(sinkTables, tableNames);
        assignTableTypes(sqlNodeInfoList, sinkTables);
        return sqlNodeInfoList;
    }

    /** Prefixes the statement with newlines so that it starts on its line of the script. */
    private static void padToOriginalLine(SqlInfo sqlInfo) {
        int startLine = sqlInfo.getLine();
        StringBuilder padded = new StringBuilder();
        for (int i = 0; i < startLine - 1; i++) {
            padded.append('\n');
        }
        sqlInfo.setSqlContent(padded.append(sqlInfo.getSqlContent()).toString());
    }

    /** Parses one statement, translating Calcite parse failures into {@link SqlParseException}. */
    private SqlNode parseStatement(SqlInfo sqlInfo) throws SqlParseException {
        SqlParser sqlParser = SqlParser.create(sqlInfo.getSqlContent(), parserConfig);
        try {
            return sqlParser.parseStmt();
        } catch (org.apache.calcite.sql.parser.SqlParseException e) {
            // TODO: log the failure and decide whether the Calcite message should be cut down
            // to its first line before it is reported.
            throw new SqlParseException(
                e.getPos() == null ? new SqlParserPos(-1, -1) : e.getPos(),
                ParserErrors.INST.parParseContextError(e.getMessage()),
                e);
        }
    }

    /** Fails on the first INSERT target that has no CREATE TABLE with the same name. */
    private static void validateSinkTables(
            Map<String, SqlParserPos> sinkTables,
            Set<String> tableNames) throws SqlParseException {
        for (Map.Entry<String, SqlParserPos> sinkTable : sinkTables.entrySet()) {
            if (!tableNames.contains(sinkTable.getKey())) {
                throw new SqlParseException(sinkTable.getValue(),
                    ParserErrors.INST.parParseContextError(
                        "Undefined target table [" + sinkTable.getKey() + "]"));
            }
        }
    }

    /** Marks every CREATE TABLE that has no table type yet as sink or source. */
    private static void assignTableTypes(
            List<SqlNodeInfo> sqlNodeInfoList,
            Map<String, SqlParserPos> sinkTables) {
        for (SqlNodeInfo sqlNodeInfo : sqlNodeInfoList) {
            if (!(sqlNodeInfo.getSqlNode() instanceof SqlCreateTable)) {
                continue;
            }
            SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNodeInfo.getSqlNode();
            if (sqlCreateTable.getTableType() != null) {
                // An explicitly declared type is never overridden.
                continue;
            }
            String tableName = sqlCreateTable.getTableName().toString();
            sqlCreateTable.setTableType(
                sinkTables.containsKey(tableName) ? TABLE_TYPE_SINK : TABLE_TYPE_SOURCE);
        }
    }

    /**
     * Extracts the query text of a CREATE VIEW statement while keeping its position.
     *
     * <p>Lines before the query become empty lines and the part of the query's first line that
     * precedes the query is replaced by spaces, so line and column numbers inside the returned
     * text match those of the statement text. Every kept line is terminated with {@code '\n'}.
     *
     * @param sqlInfo the statement text the view was parsed from
     * @param sqlNode the parsed CREATE VIEW node
     * @return the blank-padded sub-query text
     */
    private String getViewSubQuerySql(SqlInfo sqlInfo, SqlCreateView sqlNode) {
        SqlParserPos queryPos = sqlNode.getQuery().getParserPosition();
        int lineNum = queryPos.getLineNum();
        int columnNum = queryPos.getColumnNum();
        StringBuilder subQuery = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new StringReader(sqlInfo.getSqlContent()))) {
            String line;
            int currentLine = 1;
            while ((line = reader.readLine()) != null) {
                if (currentLine < lineNum) {
                    subQuery.append('\n');
                } else if (currentLine == lineNum) {
                    for (int j = 1; j < columnNum; j++) {
                        subQuery.append(' ');
                    }
                    // Clamp so that a degenerate parser position cannot raise
                    // StringIndexOutOfBoundsException.
                    int start = Math.min(Math.max(columnNum - 1, 0), line.length());
                    subQuery.append(line, start, line.length()).append('\n');
                } else {
                    subQuery.append(line).append('\n');
                }
                currentLine++;
            }
        } catch (IOException e) {
            // Not expected for an in-memory reader; never hand back a truncated query silently.
            throw new UncheckedIOException(
                "Failed to extract the sub-query of a CREATE VIEW statement", e);
        }
        return subQuery.toString();
    }

    /**
     * Returns the type the validator recorded for the given node.
     *
     * @param sqlnode a node looked up via {@code getValidatedNodeType} on this planner's validator
     */
    public RelDataType getRelDataType(SqlNode sqlnode) {
        return this.validator.getValidatedNodeType(sqlnode);
    }

    /**
     * Runs the node-level {@code validate()} of every CREATE TABLE, CREATE FUNCTION and
     * CREATE VIEW node; other node kinds are skipped.
     *
     * @param sqlNodeInfoList the parsed statements
     * @return always {@code true}; failures are reported by exception
     * @throws SqlParseException declared for failures raised by the nodes' {@code validate()}
     */
    public boolean validate(List<SqlNodeInfo> sqlNodeInfoList) throws SqlParseException {
        for (SqlNodeInfo nodeInfo : sqlNodeInfoList) {
            SqlNode sqlNode = nodeInfo.getSqlNode();
            if (sqlNode instanceof SqlCreateTable) {
                ((SqlCreateTable) sqlNode).validate();
            } else if (sqlNode instanceof SqlCreateFunction) {
                ((SqlCreateFunction) sqlNode).validate();
            } else if (sqlNode instanceof SqlCreateView) {
                ((SqlCreateView) sqlNode).validate();
            }
        }
        return true;
    }

    /**
     * Parses the script and returns the JSON produced by {@link SqlToTreeConverter} after every
     * parsed statement has been fed to it in script order.
     *
     * @param sql the SQL script
     * @return the JSON plan
     * @throws SqlParseException if the script cannot be parsed
     */
    public String getJSONPlan(String sql) throws SqlParseException {
        List<SqlNodeInfo> nodeInfos = parseContext(sql);
        SqlToTreeConverter sql2Tree = new SqlToTreeConverter(this.validator);
        for (SqlNodeInfo info : nodeInfos) {
            sql2Tree.convertSql(info.getSqlNode());
        }
        return sql2Tree.getJSON();
    }

    /**
     * Converts a query node to a relational expression, then flattens types and decorrelates.
     *
     * <p>The node is converted with {@code needsValidation = false}, so it is not validated
     * again here.
     *
     * @param sqlNode the query node; must not be {@code null}
     * @return the converted root
     */
    public RelRoot sqlToRel(SqlNode sqlNode) {
        assert sqlNode != null;
        RexBuilder rexBuilder = this.createRexBuilder();
        RelOptCluster cluster = RelOptCluster.create(this.planner, rexBuilder);
        SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(
            new ViewExpanderImpl(), this.validator, this.createCatalogReader(),
            cluster, this.convertletTable, SqlToRelConverter.Config.DEFAULT);
        RelRoot tempRoot = sqlToRelConverter.convertQuery(sqlNode, false, true);
        tempRoot = tempRoot.withRel(sqlToRelConverter.flattenTypes(tempRoot.project(), true));
        tempRoot = tempRoot.withRel(RelDecorrelator.decorrelateQuery(tempRoot.project()));
        return tempRoot;
    }

    /**
     * Returns the planner currently exposed by the rel builder.
     */
    public RelOptPlanner getPlanner() {
        return this.relBuilder.getPlanner();
    }

    /**
     * View expander handed to {@link SqlToRelConverter}.
     *
     * <p>NOTE(review): view expansion is not implemented yet. {@link #expandView} parses and
     * validates the view query but then returns {@code null}; callers that use the result
     * will fail.
     */
    class ViewExpanderImpl implements RelOptTable.ViewExpander {

        /**
         * Parses and validates the view query; the conversion to a {@link RelRoot} is still
         * disabled.
         *
         * @param rowType     expected row type of the view (currently unused)
         * @param queryString SQL text of the view query
         * @param schemaPath  schema path applied to the catalog reader
         * @param list1       second path argument of the interface (currently unused)
         * @return currently always {@code null}
         * @throws RuntimeException wrapping any failure raised while parsing or validating
         */
        @Override
        public RelRoot expandView(
                RelDataType rowType,
                String queryString,
                List<String> schemaPath,
                List<String> list1) {
            SqlParser sqlParser = SqlParser.create(queryString, parserConfig);
            try {
                SqlNode sqlNode = sqlParser.parseQuery();
                CalciteCatalogReader catalogReader = createCatalogReader().withSchemaPath(schemaPath);
                SqlNode validatedSqlNode = validator.validate(sqlNode);
                RexBuilder rexBuilder = createRexBuilder();
                RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
                SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(
                    new ViewExpanderImpl(), validator, catalogReader, cluster,
                    convertletTable, SqlToRelConverter.Config.DEFAULT);
                // TODO: conversion was left disabled by the original author ("unknown ?").
                // Intended steps, kept for reference:
                //   root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false);
                //   root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
                //   root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel));
                //   return root;
                return null;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Creates a fresh {@link RexBuilder} on this planner's type factory. */
    private RexBuilder createRexBuilder() {
        return new RexBuilder(this.typeFactory);
    }

    /**
     * Creates a catalog reader rooted at the top-most ancestor of the default schema, using the
     * default schema's path as default path and the parser's case-sensitivity setting.
     */
    private CalciteCatalogReader createCatalogReader() {
        SchemaPlus rootSchema = FlinkPlannerImpl.rootSchema(this.defaultSchema);
        Properties prop = new Properties();
        prop.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
            String.valueOf(parserConfig.caseSensitive()));
        CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(prop);
        return new CalciteCatalogReader(
            CalciteSchema.from(rootSchema),
            CalciteSchema.from(this.defaultSchema).path(null),
            this.typeFactory,
            connectionConfig);
    }

    /** Walks up the parent chain and returns the schema that has no parent. */
    private static SchemaPlus rootSchema(SchemaPlus schema) {
        SchemaPlus current = schema;
        while (current.getParentSchema() != null) {
            current = current.getParentSchema();
        }
        return current;
    }

    /** Resets the planner's trait definitions to the configured ones, if any were configured. */
    private void ready() {
        if (this.traitDefs != null) {
            this.planner.clearRelTraitDefs();
            for (RelTraitDef traitDef : this.traitDefs) {
                this.planner.addRelTraitDef(traitDef);
            }
        }
    }

    /**
     * Returns the validator created in the constructor.
     */
    public FlinkSqlValidator getValidator() {
        return validator;
    }
}