blob: f0d68e089b5d6a5a84f8fdca500ae133511f469f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.seatunnel.config.sql;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.common.utils.ParserException;
import org.apache.seatunnel.config.sql.model.BaseConfig;
import org.apache.seatunnel.config.sql.model.Option;
import org.apache.seatunnel.config.sql.model.SeaTunnelConfig;
import org.apache.seatunnel.config.sql.model.SinkConfig;
import org.apache.seatunnel.config.sql.model.SourceConfig;
import org.apache.seatunnel.config.sql.model.TransformConfig;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_DELIMITER;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_DOUBLE_SINGLE_QUOTES;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_KV_DELIMITER;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_RESULT_TABLE_NAME_KEY;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_SINGLE_QUOTES;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_SOURCE_TABLE_NAME_KEY;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_TABLE_CONNECTOR_KEY;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_TABLE_TYPE_KEY;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_TABLE_TYPE_SINK;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_TABLE_TYPE_SOURCE;
import static org.apache.seatunnel.config.sql.utils.Constant.OPTION_TABLE_TYPE_TRANSFORM;
import static org.apache.seatunnel.config.sql.utils.Constant.SQL_ANNOTATION_PREFIX;
import static org.apache.seatunnel.config.sql.utils.Constant.SQL_ANNOTATION_PREFIX2;
import static org.apache.seatunnel.config.sql.utils.Constant.SQL_ANNOTATION_SUFFIX;
import static org.apache.seatunnel.config.sql.utils.Constant.SQL_CONFIG_ANNOTATION_PREFIX;
import static org.apache.seatunnel.config.sql.utils.Constant.SQL_DELIMITER;
import static org.apache.seatunnel.config.sql.utils.Constant.TEMP_TABLE_SUFFIX;
@Slf4j
public class SqlConfigBuilder {
public static Config of(@NonNull Path sqlFilePath) {
try {
List<String> lines = Files.readAllLines(sqlFilePath);
Map<String, BaseConfig> sqlTables = new LinkedHashMap<>();
SeaTunnelConfig seaTunnelConfig = new SeaTunnelConfig();
List<String> sqlLines = parseAnnoConfigAndSqlLine(lines, seaTunnelConfig);
// Split SQL
List<String> sqlList = split4SqlList(sqlLines);
for (Iterator<String> it = sqlList.iterator(); it.hasNext(); ) {
String sql = it.next();
Statement statement = CCJSqlParserUtil.parse(sql);
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
if (createTable.getTableOptionsStrings() == null) {
continue;
}
parseCreateTableSql(createTable, sqlTables, seaTunnelConfig);
it.remove();
}
}
AtomicInteger tempTableIndex = new AtomicInteger(1);
for (String sql : sqlList) {
Statement statement = CCJSqlParserUtil.parse(sql);
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
TransformConfig transformConfig = parseCreateAsSql(createTable, sqlTables);
seaTunnelConfig.getTransformConfigs().add(transformConfig);
} else if (statement instanceof Insert) {
parseInsertSql((Insert) statement, sqlTables, seaTunnelConfig, tempTableIndex);
} else {
throw new ParserException(
String.format("Unsupported SQL syntax: %s", statement));
}
}
// filter out the sink config without 'source_table_name' option
seaTunnelConfig.setSinkConfigs(
seaTunnelConfig.getSinkConfigs().stream()
.filter(
sinkConfig -> {
boolean containSourceTable = false;
for (Option option : sinkConfig.getOptions()) {
if (option.getKey()
.equals(OPTION_SOURCE_TABLE_NAME_KEY)) {
containSourceTable = true;
break;
}
}
return containSourceTable;
})
.collect(Collectors.toList()));
if (seaTunnelConfig.getSourceConfigs().isEmpty()) {
throw new ParserException("The SQL config must contain at least one source table");
}
if (seaTunnelConfig.getSinkConfigs().isEmpty()) {
throw new ParserException(
"The SQL config must contain `INSERT INTO ... SELECT ...` syntax");
}
// render to hocon config
String configContent = ConfigTemplate.generate(seaTunnelConfig);
log.debug("Generated config: \n{}", configContent);
return ConfigFactory.parseString(configContent);
} catch (ParserException e) {
throw e;
} catch (Exception e) {
throw new ParserException(e);
}
}
private static List<String> parseAnnoConfigAndSqlLine(
List<String> lines, SeaTunnelConfig seaTunnelConfig) {
List<String> sqlLines = new ArrayList<>();
List<String> annotationConfigs = new ArrayList<>();
boolean annoConfig = false;
boolean anno = false;
StringJoiner annotationConfig = new StringJoiner("\n");
for (String line : lines) {
if (line.trim().startsWith(SQL_ANNOTATION_PREFIX2)) {
continue;
}
if (line.trim().equals(SQL_CONFIG_ANNOTATION_PREFIX)) {
annoConfig = true;
continue;
}
if (line.trim().startsWith(SQL_ANNOTATION_PREFIX)) {
anno = true;
continue;
}
if (anno) {
if (line.trim().equals(SQL_ANNOTATION_SUFFIX)) {
anno = false;
}
} else if (annoConfig) {
if (line.trim().equals(SQL_ANNOTATION_SUFFIX)) {
annoConfig = false;
annotationConfigs.add(annotationConfig.toString());
annotationConfig = new StringJoiner("\n");
} else {
annotationConfig.add(line);
}
} else {
if (StringUtils.isNotEmpty(line.trim())) {
sqlLines.add(line);
}
}
}
seaTunnelConfig.getEnvConfigs().addAll(annotationConfigs);
return sqlLines;
}
private static List<String> split4SqlList(List<String> sqlLines) {
List<String> sqlList = new ArrayList<>();
StringJoiner sqlSj = new StringJoiner(" ");
for (String line : sqlLines) {
line = line.trim();
int commentIdx = line.indexOf(" " + SQL_ANNOTATION_PREFIX2);
if (commentIdx > -1) {
line = line.substring(0, commentIdx);
}
if (line.endsWith(SQL_DELIMITER)) {
line = line.substring(0, line.length() - 1);
sqlSj.add(line);
sqlList.add(sqlSj.toString());
sqlSj = new StringJoiner(" ");
} else {
sqlSj.add(line);
}
}
return sqlList;
}
private static void parseCreateTableSql(
CreateTable createTable,
Map<String, BaseConfig> sqlTables,
SeaTunnelConfig seaTunnelConfig) {
Map<String, String> optionsMap = parseOptions(createTable);
String tableName = createTable.getTable().getName();
if (sqlTables.containsKey(tableName)) {
throw new ParserException(String.format("Table name duplicate: %s", tableName));
}
String type = optionsMap.get(OPTION_TABLE_TYPE_KEY);
if (OPTION_TABLE_TYPE_SOURCE.equalsIgnoreCase(type)) {
SourceConfig sourceConfig = parseSourceSql(createTable, optionsMap);
sqlTables.put(tableName, sourceConfig);
seaTunnelConfig.getSourceConfigs().add(sourceConfig);
} else if (OPTION_TABLE_TYPE_SINK.equalsIgnoreCase(type)) {
SinkConfig sinkConfig = parseSinkSql(optionsMap);
sqlTables.put(tableName, sinkConfig);
seaTunnelConfig.getSinkConfigs().add(sinkConfig);
}
}
private static Map<String, String> parseOptions(CreateTable createTable) {
String options = createTable.getTableOptionsStrings().get(1);
options = options.substring(0, options.length() - 1).substring(1);
String[] optionItems = options.split(OPTION_DELIMITER);
Map<String, String> optionsMap = new LinkedHashMap<>();
for (String optionItem : optionItems) {
int idx = optionItem.indexOf(OPTION_KV_DELIMITER);
if (idx < 0) {
continue;
}
String key = clean(optionItem.substring(0, idx).trim());
String value = clean(optionItem.substring(idx + 1).trim());
optionsMap.put(key, value);
}
return optionsMap;
}
private static SourceConfig parseSourceSql(
CreateTable createTable, Map<String, String> options) {
String connector = options.get(OPTION_TABLE_CONNECTOR_KEY);
if (StringUtils.isEmpty(connector)) {
throw new ParserException("The connector of option is none");
}
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setConnector(connector);
String resultTableName = createTable.getTable().getName();
sourceConfig.setResultTableName(resultTableName);
convertOptions(options, sourceConfig.getOptions());
sourceConfig
.getOptions()
.add(Option.of(OPTION_RESULT_TABLE_NAME_KEY, "\"" + resultTableName + "\""));
return sourceConfig;
}
private static SinkConfig parseSinkSql(Map<String, String> options) {
String connector = options.get(OPTION_TABLE_CONNECTOR_KEY);
if (StringUtils.isEmpty(connector)) {
throw new ParserException("The connector of option is none");
}
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setConnector(connector);
// original sink table without source_table_name
options.remove(OPTION_SOURCE_TABLE_NAME_KEY);
convertOptions(options, sinkConfig.getOptions());
return sinkConfig;
}
private static void convertOptions(Map<String, String> options, Collection<Option> optionList) {
options.forEach(
(k, v) -> {
if (OPTION_TABLE_CONNECTOR_KEY.equalsIgnoreCase(k)
|| OPTION_TABLE_TYPE_KEY.equalsIgnoreCase(k)
|| OPTION_RESULT_TABLE_NAME_KEY.equalsIgnoreCase(k)) {
return;
}
String trimVal = v.trim();
// if not sub-config
if (!(trimVal.startsWith("{") && trimVal.endsWith("}"))
&& !(trimVal.startsWith("[") && trimVal.endsWith("]"))) {
v = "\"" + v + "\"";
}
Option option = Option.of(k, v);
optionList.add(option);
});
}
private static TransformConfig parseCreateAsSql(
CreateTable createTable, Map<String, BaseConfig> sqlTables) {
Select select = createTable.getSelect();
if (select != null) {
TransformConfig transformConfig = new TransformConfig();
PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
Table table = (Table) plainSelect.getFromItem();
String sourceTableName = table.getName();
if (!sqlTables.containsKey(sourceTableName)) {
throw new ParserException(
String.format("The source table[%s] is not found", sourceTableName));
}
String resultTableName = createTable.getTable().getName();
if (sqlTables.containsKey(resultTableName)) {
throw new ParserException(
String.format("Table name duplicate: %s", resultTableName));
}
sqlTables.put(resultTableName, transformConfig);
String query = select.toString();
transformConfig.setSourceTableName(sourceTableName);
transformConfig.setResultTableName(resultTableName);
transformConfig.setQuery(query);
return transformConfig;
} else {
throw new ParserException(String.format("Unsupported syntax: %s", createTable));
}
}
private static void parseInsertSql(
Insert insertSql,
Map<String, BaseConfig> sqlTables,
SeaTunnelConfig seaTunnelConfig,
AtomicInteger tempTableIndex) {
if (insertSql.getColumns() != null && !insertSql.getColumns().isEmpty()) {
throw new ParserException("Insert sql must not have columns");
}
TransformConfig transformConfig = new TransformConfig();
Select select = insertSql.getSelect();
if (select == null
|| select.getSelectBody() == null
|| !(select.getSelectBody() instanceof PlainSelect)) {
throw new ParserException("Insert sql must have select statement");
}
String targetTableName = insertSql.getTable().getName();
if (select.getSelectBody() instanceof PlainSelect) {
PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
String sourceTableName;
String resultTableName;
if (plainSelect.getFromItem() == null) {
List<SelectItem> selectItems = plainSelect.getSelectItems();
if (selectItems.size() != 1) {
throw new ParserException(
"Source table must be specified in SQL: " + insertSql);
}
SelectExpressionItem selectItem = (SelectExpressionItem) selectItems.get(0);
Column column = (Column) selectItem.getExpression();
sourceTableName = column.getColumnName();
resultTableName = sourceTableName;
} else {
if (!(plainSelect.getFromItem() instanceof Table)) {
throw new ParserException("Unsupported syntax: " + insertSql);
}
Table table = (Table) plainSelect.getFromItem();
sourceTableName = table.getName();
resultTableName =
sourceTableName + TEMP_TABLE_SUFFIX + tempTableIndex.getAndIncrement();
String query = select.toString();
transformConfig.setSourceTableName(sourceTableName);
transformConfig.setResultTableName(resultTableName);
transformConfig.setQuery(query);
seaTunnelConfig.getTransformConfigs().add(transformConfig);
}
if (!sqlTables.containsKey(sourceTableName)
|| (!OPTION_TABLE_TYPE_SOURCE.equalsIgnoreCase(
sqlTables.get(sourceTableName).getType())
&& !OPTION_TABLE_TYPE_TRANSFORM.equalsIgnoreCase(
sqlTables.get(sourceTableName).getType()))) {
throw new ParserException(
String.format("The source table[%s] is not found", sourceTableName));
}
if (!sqlTables.containsKey(targetTableName)
|| !OPTION_TABLE_TYPE_SINK.equalsIgnoreCase(
sqlTables.get(targetTableName).getType())) {
throw new ParserException(
String.format("The sink table[%s] is not found", sourceTableName));
}
SinkConfig sinkConfig = (SinkConfig) sqlTables.get(targetTableName);
SinkConfig sinkConfigNew = new SinkConfig();
sinkConfigNew.setConnector(sinkConfig.getConnector());
sinkConfigNew.setSourceTableName(resultTableName);
sinkConfigNew.getOptions().addAll(sinkConfig.getOptions());
sinkConfigNew
.getOptions()
.add(Option.of(OPTION_SOURCE_TABLE_NAME_KEY, "\"" + resultTableName + "\""));
seaTunnelConfig.getSinkConfigs().add(sinkConfigNew);
} else {
throw new ParserException("Unsupported syntax: " + insertSql);
}
}
private static String clean(String val) {
if (val.startsWith(OPTION_SINGLE_QUOTES)) {
val = val.substring(1);
}
if (val.endsWith(OPTION_SINGLE_QUOTES)) {
val = val.substring(0, val.length() - 1);
}
val = val.replace(OPTION_DOUBLE_SINGLE_QUOTES, OPTION_SINGLE_QUOTES);
return val;
}
}