/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.utils;

import org.apache.flink.sql.parser.ddl.SqlAnalyzeTable;
import org.apache.flink.sql.parser.ddl.SqlColumnType;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlNodeInfo;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.plan.FlinkPlannerImpl;
import org.apache.flink.sql.parser.plan.SqlParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.BatchTableEnvironment;
import org.apache.flink.table.api.Column;
import org.apache.flink.table.api.RichTableSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.functions.UserDefinedFunction;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.DecimalType;
import org.apache.flink.table.api.types.GenericType;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ReadableWritableCatalog;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.errorcode.TableErrors;
import org.apache.flink.table.plan.stats.AnalyzeStatistic;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.runtime.functions.python.PythonUDFUtil;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.util.StringUtils;

import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.config.Lex;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlProperty;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Utility to transform a SQL context (a sequence of SQL statements) into a Flink job.
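 *
 * <p>A typical flow, as a minimal sketch (the DDL and the {@code tableEnv} variable are
 * illustrative only):
 * <pre>{@code
 * List<SqlNodeInfo> nodes = SqlJobUtil.parseSqlContext(
 *     "CREATE TABLE orders (id INT, amount DOUBLE) WITH (type = 'CSV');");
 * for (SqlNodeInfo node : nodes) {
 *     if (node.getSqlNode() instanceof SqlCreateTable) {
 *         SqlJobUtil.registerExternalTable(tableEnv, node);
 *     }
 * }
 * }</pre>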
*/
public class SqlJobUtil {

	private static final Logger LOG = LoggerFactory.getLogger(SqlJobUtil.class);

/**
	 * Registers an external table in the given table environment.
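	 *
	 * <p>Example DDL handled here (an illustrative sketch; the available property keys
	 * depend on the connector):
	 * <pre>{@code
	 * CREATE TABLE kafka_source (
	 *     user_id BIGINT,
	 *     ts TIMESTAMP,
	 *     WATERMARK FOR ts AS withOffset(ts, 5000)
	 * ) WITH (
	 *     type = 'KAFKA'
	 * );
	 * }</pre>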
	 *
	 * @param tableEnv the table environment to register the table in
	 * @param sqlNodeInfo the parsed CREATE TABLE DDL node describing the external table
*/
public static void registerExternalTable(TableEnvironment tableEnv, SqlNodeInfo sqlNodeInfo) {
final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNodeInfo.getSqlNode();
final String tableName = sqlCreateTable.getTableName().toString();
		// collect the properties from the WITH (...) clause
SqlNodeList propertyList = sqlCreateTable.getPropertyList();
Map<String, String> properties = new HashMap<>();
for (SqlNode sqlNode : propertyList) {
SqlProperty sqlProperty = (SqlProperty) sqlNode;
properties.put(
sqlProperty.getKeyString(), sqlProperty.getValueString());
}
		String tableType = properties.get("type");
		if (StringUtils.isNullOrWhitespaceOnly(tableType)) {
			throw new SqlExecutionException(
					"Missing or empty 'type' property in the WITH clause of CREATE TABLE " + tableName + ".");
		}
TableSchema tableSchema = SqlJobUtil.getTableSchema(tableEnv, sqlCreateTable);
List<RexNode> exprList = SqlJobUtil.getComputedColumns(tableEnv, sqlCreateTable);
		Map<String, RexNode> computedColumns = null;
		if (exprList != null) {
			// the table schema and the expression list are both derived from the same
			// generated SELECT statement, so the two are aligned by position
			computedColumns = new HashMap<>();
for (int i = 0; i < tableSchema.getColumns().length; i++) {
computedColumns.put(tableSchema.getColumnName(i), exprList.get(i));
}
}
String rowtimeField = null;
long offset = -1;
if (sqlCreateTable.getWatermark() != null) {
try {
rowtimeField = sqlCreateTable.getWatermark().getColumnName().toString();
offset = sqlCreateTable.getWatermark().getWatermarkOffset();
} catch (SqlParseException e) {
throw new SqlExecutionException(e.getMessage(), e);
}
}
		boolean isStreaming = !(tableEnv instanceof BatchTableEnvironment);
long now = System.currentTimeMillis();
CatalogTable catalogTable = new CatalogTable(
tableType,
tableSchema,
properties,
SqlJobUtil.createBlinkTableSchema(sqlCreateTable),
null,
"SQL Client Table", // TODO We don't have a table comment from DDL
null,
false,
computedColumns,
rowtimeField,
offset,
now,
now,
isStreaming);
ReadableWritableCatalog catalog = tableEnv.getDefaultCatalog();
// TODO: need to consider if a default db doesn't exist
catalog.createTable(
new ObjectPath(tableEnv.getDefaultDatabaseName(), tableName),
catalogTable,
false);
}

	/**
	 * Configuration for the Flink SQL parser: back-tick quoting, case-preserving
	 * identifiers, Java lexical policy, and a maximum identifier length of 256.
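	 *
	 * <p>For example, with {@link Casing#UNCHANGED} an identifier written as
	 * {@code myTable} keeps its exact casing instead of being upper-cased as it
	 * would be under Calcite's default (Oracle-style) settings.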
*/
private static final SqlParser.Config PARSER_CONFIG = SqlParser.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setQuoting(Quoting.BACK_TICK)
.setQuotedCasing(Casing.UNCHANGED)
.setUnquotedCasing(Casing.UNCHANGED)
.setConformance(SqlConformanceEnum.DEFAULT)
.setIdentifierMaxLength(256)
.setLex(Lex.JAVA)
.build();

	private static final SchemaPlus ROOT_SCHEMA = Frameworks.createRootSchema(true);

private static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks
.newConfigBuilder()
.defaultSchema(ROOT_SCHEMA)
.parserConfig(PARSER_CONFIG)
.typeSystem(RelDataTypeSystem.DEFAULT)
.build();

	/**
	 * Parses a SQL context into a list of {@link SqlNodeInfo} and validates the result.
*
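	 * <p>A minimal usage sketch (the statement shown is illustrative):
	 * <pre>{@code
	 * List<SqlNodeInfo> nodes = SqlJobUtil.parseSqlContext(
	 *     "CREATE TABLE t (a INT, b VARCHAR) WITH (type = 'CSV');");
	 * }</pre>
	 *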
* @throws SqlParseException if there is any syntactic error
*/
public static List<SqlNodeInfo> parseSqlContext(String sqlContext) throws SqlParseException {
FlinkPlannerImpl planner = new FlinkPlannerImpl(FRAMEWORK_CONFIG);
List<SqlNodeInfo> sqlNodeList = planner.parseContext(sqlContext);
planner.validate(sqlNodeList);
return sqlNodeList;
}

	/**
	 * Registers the functions declared in the SQL context with the given table environment.
*
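	 * <p>Statements handled here, as an illustrative sketch (the actual prefix for
	 * Python UDF class names is {@link PythonUDFUtil#PYUDF_PREFIX}; {@code pyudf:}
	 * below is only a stand-in):
	 * <pre>{@code
	 * CREATE FUNCTION javaUpper AS 'com.example.udf.Upper';
	 * CREATE FUNCTION pyLower AS 'pyudf:my_module.lower';
	 * }</pre>
	 *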
	 * @param tableEnv the {@link TableEnvironment} of the SQL job
	 * @param sqlNodeInfoList the parsed result of a SQL context
	 * @param userPyLibs comma-separated paths of the user's Python UDF files
	 * @return currently always {@code true}; failures are reported via exceptions
*/
public static boolean registerFunctions(
TableEnvironment tableEnv,
List<SqlNodeInfo> sqlNodeInfoList,
String userPyLibs) {
Map<String, String> pyUdfNameClass = new HashMap<>();
for (SqlNodeInfo sqlNodeInfo : sqlNodeInfoList) {
if (sqlNodeInfo.getSqlNode() instanceof SqlCreateFunction) {
SqlCreateFunction sqlCreateFunction = (SqlCreateFunction) sqlNodeInfo.getSqlNode();
String functionName = sqlCreateFunction.getFunctionName().toString();
boolean isPyUdf = sqlCreateFunction.getClassName().startsWith(PythonUDFUtil.PYUDF_PREFIX);
if (isPyUdf) {
String className = sqlCreateFunction.getClassName()
.substring(PythonUDFUtil.PYUDF_PREFIX.length());
pyUdfNameClass.put(functionName, className);
continue;
}
// Register in catalog
// TODO: [BLINK-18570607] re-enable register external functions in SqlJobUtil
// tableEnv.registerExternalFunction(
// null, functionName, sqlCreateFunction.getClassName(), false);
				throw new UnsupportedOperationException(
						"Catalogs do not support registering functions yet");
}
}
		if (!pyUdfNameClass.isEmpty()) {
ArrayList<String> pyFiles = new ArrayList<>(Arrays.asList(userPyLibs.split(",")));
Map<String, UserDefinedFunction> pyUDFs =
PythonUDFUtil.registerPyUdfsToTableEnvironment(tableEnv, pyFiles, pyUdfNameClass);
// TODO How to handle the python function?
}
return true;
}
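
	/**
	 * Converts a {@link SqlCreateTable} DDL node into a {@link RichTableSchema}, including
	 * primary key, unique keys, indexes, and header fields.
	 *
	 * <p>For a DDL such as the following sketch, the resulting schema has columns
	 * {@code (a, b)} and primary key {@code a}:
	 * <pre>{@code
	 * CREATE TABLE t (
	 *     a BIGINT,
	 *     b VARCHAR,
	 *     PRIMARY KEY (a)
	 * ) WITH (type = 'CSV');
	 * }</pre>
	 */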
public static RichTableSchema createBlinkTableSchema(SqlCreateTable sqlCreateTable) {
if (!"SOURCE".equals(sqlCreateTable.getTableType())) {
if (sqlCreateTable.getWatermark() != null) {
throw new IllegalArgumentException(
TableErrors.INST.sqlTableTypeNotSupportWaterMark(
sqlCreateTable.getTableName().toString(),
sqlCreateTable.getTableType()));
}
if (sqlCreateTable.containsComputedColumn()) {
throw new IllegalArgumentException(
TableErrors.INST.sqlTableTypeNotSupportComputedCol(
sqlCreateTable.getTableName().toString(),
sqlCreateTable.getTableType()));
}
}
		// set the column list
SqlNodeList columnList = sqlCreateTable.getColumnList();
int columnCount = columnList.size();
String[] columnNames = new String[columnCount];
boolean[] nullables = new boolean[columnCount];
InternalType[] columnTypes = new InternalType[columnCount];
List<String> headerFields = new ArrayList<>();
RichTableSchema schema;
		if (!sqlCreateTable.containsComputedColumn()) {
			// all columns are plain SqlTableColumns
for (int i = 0; i < columnCount; i++) {
SqlTableColumn columnNode = (SqlTableColumn) columnList.get(i);
String columnName = columnNode.getName().getSimple();
columnNames[i] = columnName;
try {
columnTypes[i] = getInternalType(columnNode.getType());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
TableErrors.INST.sqlUnSupportedColumnType(
sqlCreateTable.getTableName().toString(),
columnName,
columnNode.getType().getTypeName().getSimple()));
}
				// nullable unless explicitly declared NOT NULL
				nullables[i] = !Boolean.FALSE.equals(columnNode.getType().getNullable());
if (columnNode.isHeader()) {
headerFields.add(columnName);
}
}
schema = new RichTableSchema(columnNames, columnTypes, nullables);
		} else {
			// some of the columns are computed columns
List<String> originNameList = new ArrayList<>();
List<InternalType> originTypeList = new ArrayList<>();
List<Boolean> originNullableList = new ArrayList<>();
for (int i = 0; i < columnCount; i++) {
SqlNode node = columnList.get(i);
if (node instanceof SqlTableColumn) {
					SqlTableColumn columnNode = (SqlTableColumn) node;
String columnName = columnNode.getName().getSimple();
try {
InternalType columnType = getInternalType(columnNode.getType());
originTypeList.add(columnType);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
TableErrors.INST.sqlUnSupportedColumnType(
sqlCreateTable.getTableName().toString(),
columnName,
columnNode.getType().getTypeName().getSimple()));
}
originNameList.add(columnName);
					// nullable unless explicitly declared NOT NULL
					originNullableList.add(!Boolean.FALSE.equals(columnNode.getType().getNullable()));
if (columnNode.isHeader()) {
headerFields.add(columnName);
}
}
}
String[] originColumnNames = originNameList.toArray(new String[originNameList.size()]);
InternalType[] originColumnTypes = originTypeList.toArray(new InternalType[originTypeList.size()]);
boolean[] originNullables = new boolean[originNullableList.size()];
for (int i = 0; i < originNullables.length; i++) {
originNullables[i] = originNullableList.get(i);
}
schema = new RichTableSchema(originColumnNames, originColumnTypes, originNullables);
}
schema.setHeaderFields(headerFields);
		// set the primary key
if (sqlCreateTable.getPrimaryKeyList() != null) {
String[] primaryKeys = new String[sqlCreateTable.getPrimaryKeyList().size()];
for (int i = 0; i < primaryKeys.length; i++) {
primaryKeys[i] = sqlCreateTable.getPrimaryKeyList().get(i).toString();
}
schema.setPrimaryKey(primaryKeys);
}
		// set unique keys
List<SqlNodeList> uniqueKeyList = sqlCreateTable.getUniqueKeysList();
if (uniqueKeyList != null) {
List<List<String>> ukList = new ArrayList<>();
			for (SqlNodeList uniqueKeys : uniqueKeyList) {
List<String> uk = new ArrayList<>();
for (int i = 0; i < uniqueKeys.size(); i++) {
uk.add(uniqueKeys.get(i).toString());
}
ukList.add(uk);
}
schema.setUniqueKeys(ukList);
}
		// set indexes
List<SqlCreateTable.IndexWrapper> indexKeyList = sqlCreateTable.getIndexKeysList();
if (indexKeyList != null) {
List<RichTableSchema.Index> indexes = new ArrayList<>();
for (SqlCreateTable.IndexWrapper idx : indexKeyList) {
List<String> keyList = new ArrayList<>();
for (int i = 0; i < idx.indexKeys.size(); i++) {
keyList.add(idx.indexKeys.get(i).toString());
}
indexes.add(new RichTableSchema.Index(idx.unique, keyList));
}
schema.setIndexes(indexes);
}
return schema;
}

	/**
	 * Executes the ANALYZE TABLE statements of a SQL context and stores the generated
	 * statistics in the catalog.
*
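	 * <p>Statement form handled here (a sketch; the exact grammar is defined by
	 * {@link SqlAnalyzeTable}):
	 * <pre>{@code
	 * ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS a, b;
	 * }</pre>
	 *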
* @param tableEnv the {@link TableEnvironment} of the sql job
* @param sqlNodeInfoList the parsed result of a sql context
*/
public static void analyzeTableStats(TableEnvironment tableEnv, List<SqlNodeInfo> sqlNodeInfoList) {
for (SqlNodeInfo sqlNodeInfo : sqlNodeInfoList) {
if (sqlNodeInfo.getSqlNode() instanceof SqlAnalyzeTable) {
SqlAnalyzeTable sqlAnalyzeTable = (SqlAnalyzeTable) sqlNodeInfo.getSqlNode();
String[] tablePath = sqlAnalyzeTable.getTableName().names.toArray(new String[] {});
String[] columnNames = getColumnsToAnalyze(sqlAnalyzeTable);
TableStats tableStats = AnalyzeStatistic.generateTableStats(tableEnv, tablePath, columnNames);
tableEnv.alterTableStats(tablePath, tableStats);
}
}
}

	private static String[] getColumnsToAnalyze(SqlAnalyzeTable analyzeTable) {
if (!analyzeTable.isWithColumns()) {
return new String[] {};
}
SqlNodeList columnList = analyzeTable.getColumnList();
int columnCount = columnList.size();
		// an empty column list means "analyze all columns"
if (columnCount == 0) {
return new String[] {"*"};
}
String[] columnNames = new String[columnCount];
for (int i = 0; i < columnCount; i++) {
SqlIdentifier column = (SqlIdentifier) columnList.get(i);
columnNames[i] = column.getSimple();
}
return columnNames;
}

	/**
	 * Maps a SQL column type to a Flink {@link InternalType}.
*
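	 * <p>For example, {@code VARCHAR} maps to {@code DataTypes.STRING} and
	 * {@code DECIMAL(10, 2)} maps to {@code DecimalType.of(10, 2)}.
	 *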
* @param type the sql column type
* @return the corresponding flink type
*/
public static InternalType getInternalType(SqlDataTypeSpec type) {
switch (SqlColumnType.getType(type.getTypeName().getSimple())) {
case BOOLEAN:
return DataTypes.BOOLEAN;
case TINYINT:
return DataTypes.BYTE;
case SMALLINT:
return DataTypes.SHORT;
case INT:
return DataTypes.INT;
case BIGINT:
return DataTypes.LONG;
case FLOAT:
return DataTypes.FLOAT;
case DECIMAL:
if (type.getPrecision() >= 0) {
return DecimalType.of(type.getPrecision(), type.getScale());
}
return DecimalType.USER_DEFAULT;
case DOUBLE:
return DataTypes.DOUBLE;
case DATE:
return DataTypes.DATE;
case TIME:
return DataTypes.TIME;
case TIMESTAMP:
return DataTypes.TIMESTAMP;
case VARCHAR:
return DataTypes.STRING;
case VARBINARY:
return DataTypes.BYTE_ARRAY;
case ANY:
return new GenericType<>(Object.class);
default:
LOG.warn("Unsupported sql column type: {}", type);
throw new IllegalArgumentException("Unsupported sql column type " + type + " !");
}
}
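
	/**
	 * Extracts the computed-column expressions of a CREATE TABLE statement as
	 * {@link RexNode}s, or returns {@code null} if the table declares no computed columns.
	 *
	 * <p>The expressions are obtained by registering a throw-away {@link MockTableSource}
	 * with the original columns and planning a projection over it, conceptually
	 * {@code SELECT a, b, a + b AS c FROM <mock>}; the resulting {@link LogicalProject}
	 * then carries one expression per declared column.
	 */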
public static List<RexNode> getComputedColumns(
TableEnvironment tEnv, SqlCreateTable sqlCreateTable) {
RichTableSchema richTableSchema = createBlinkTableSchema(sqlCreateTable);
if (!sqlCreateTable.containsComputedColumn()) {
return null;
}
TableSchema originalSchema = new TableSchema(
richTableSchema.getColumnNames(),
richTableSchema.getColumnTypes(),
richTableSchema.getNullables());
String name = tEnv.createUniqueTableName();
tEnv.registerTableSource(
name, new MockTableSource(name, originalSchema));
String viewSql = "select " + sqlCreateTable.getColumnSqlString() + " from " + name;
Table viewTable = tEnv.sqlQuery(viewSql);
		LogicalProject project = (LogicalProject) viewTable.getRelNode();
		return project.getProjects();
}
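
	/**
	 * Derives the {@link TableSchema} of a CREATE TABLE statement, including computed
	 * columns, the rowtime field, the primary key, and unique indexes.
	 *
	 * <p>If the DDL declares a watermark, the rowtime column is typed as
	 * {@code DataTypes.ROWTIME_INDICATOR} and marked non-nullable; all other columns
	 * keep their declared types and nullability.
	 */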
public static TableSchema getTableSchema(TableEnvironment tEnv, SqlCreateTable sqlCreateTable) {
RichTableSchema richTableSchema = createBlinkTableSchema(sqlCreateTable);
String rowtimeField = null;
if (sqlCreateTable.getWatermark() != null) {
rowtimeField = sqlCreateTable.getWatermark().getColumnName().toString();
}
TableSchema.Builder builder = TableSchema.builder();
if (sqlCreateTable.containsComputedColumn()) {
TableSchema originalSchema = new TableSchema(
richTableSchema.getColumnNames(),
richTableSchema.getColumnTypes(),
richTableSchema.getNullables());
String name = tEnv.createUniqueTableName();
tEnv.registerTableSource(
name, new MockTableSource(name, originalSchema));
String viewSql = "select " + sqlCreateTable.getColumnSqlString() + " from " + name;
Table viewTable = tEnv.sqlQuery(viewSql);
TableSchema schemaWithComputedColumn = viewTable.getSchema();
for (int i = 0; i < schemaWithComputedColumn.getColumns().length; i++) {
Column col = schemaWithComputedColumn.getColumn(i);
if (col.name().equals(rowtimeField)) {
builder.field(
rowtimeField,
DataTypes.ROWTIME_INDICATOR,
false);
} else {
builder.field(
col.name(),
col.internalType(),
col.isNullable());
}
}
} else {
for (int i = 0; i < richTableSchema.getColumnNames().length; i++) {
if (richTableSchema.getColumnNames()[i].equals(rowtimeField)) {
builder.field(rowtimeField, DataTypes.ROWTIME_INDICATOR, false);
} else {
builder.field(
richTableSchema.getColumnNames()[i],
richTableSchema.getColumnTypes()[i],
richTableSchema.getNullables()[i]);
}
}
}
		builder.primaryKey(richTableSchema.getPrimaryKeys().toArray(new String[0]));
		for (List<String> uniqueKey : richTableSchema.getUniqueKeys()) {
			builder.uniqueIndex(uniqueKey.toArray(new String[0]));
}
return builder.build();
}
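
	/**
	 * A placeholder table source that only exists so the planner can resolve the original
	 * columns while deriving schemas and computed-column expressions. It is never actually
	 * executed, which is why the stream-producing methods simply return {@code null}.
	 */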
private static class MockTableSource
implements BatchTableSource<BaseRow>, StreamTableSource<BaseRow> {
		private final String name;
		private final TableSchema schema;

		public MockTableSource(String name, TableSchema tableSchema) {
			this.name = name;
			this.schema = tableSchema;
		}

		@Override
		public DataStream<BaseRow> getBoundedStream(StreamExecutionEnvironment streamEnv) {
			// never called: the mock source exists only for plan-time schema resolution
			return null;
		}

		@Override
public DataType getReturnType() {
return DataTypes.createRowType(schema.getTypes(), schema.getColumnNames());
}

		@Override
public TableSchema getTableSchema() {
return schema;
}

		@Override
public String explainSource() {
return name;
}

		@Override
		public TableStats getTableStats() {
			// no statistics are available for the mock source
			return null;
		}

		@Override
		public DataStream<BaseRow> getDataStream(StreamExecutionEnvironment execEnv) {
			// never called: the mock source exists only for plan-time schema resolution
			return null;
		}
}
}