blob: acc5b4232186bd1116a1528909a1643e84cae12c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.dsl;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelRoot;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.DslConverter;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.sql.util.SqlFileParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link DslConverter} that plans Samza SQL statements into Calcite {@link RelRoot}s.
 *
 * <p>The SQL statements are currently read from the job {@link Config} (see
 * {@link #fetchSqlFromConfig(Map)}), not from the {@code dsl} argument of {@link #convertDsl(String)}.
 */
public class SamzaSqlDslConverter implements DslConverter {
  private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlDslConverter.class);

  private final Config config;

  /**
   * @param config Samza job config; it is both the source of the SQL statements and the base config used to
   *               build the per-statement {@link SamzaSqlApplicationConfig}
   */
  SamzaSqlDslConverter(Config config) {
    this.config = config;
  }

  /**
   * Plans every SQL statement found in the config, producing one {@link RelRoot} per statement in the order
   * the statements were returned by {@link #fetchSqlFromConfig(Map)}.
   *
   * @param dsl currently ignored; the statements are fetched from the config passed to the constructor
   * @return the planned relational roots, one per SQL statement
   * @throws SamzaException if the config does not contain any SQL to execute
   */
  @Override
  public Collection<RelRoot> convertDsl(String dsl) {
    // TODO: Introduce an API to parse a dsl string and return one or more sql statements
    List<String> sqlStmts = fetchSqlFromConfig(config);
    List<RelRoot> relRoots = new LinkedList<>();
    for (String sql : sqlStmts) {
      // Parse the statement once and reuse the result both for building the config and for planning.
      SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
      QueryPlanner planner = getQueryPlanner(buildSqlConfig(Collections.singletonList(qinfo), config));
      // we always pass only select query to the planner for samza sql. The reason is that samza sql supports
      // schema evolution where source and destination could up to an extent have independent schema evolution while
      // calcite expects strict conformance of the destination schema with that of the fields in the select query.
      relRoots.add(planner.plan(qinfo.getSelectQuery()));
    }
    return relRoots;
  }

  /**
   * Get {@link SamzaSqlApplicationConfig} given sql statements and samza config.
   * Each statement is parsed, and the sources and sinks of all statements are collected into the config.
   *
   * @param sqlStmts List of sql statements
   * @param config Samza config
   * @return {@link SamzaSqlApplicationConfig}
   */
  public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts, Config config) {
    return buildSqlConfig(fetchQueryInfo(sqlStmts), config);
  }

  /**
   * Builds a {@link SamzaSqlApplicationConfig} from already-parsed queries, so callers holding a
   * {@link SamzaSqlQueryParser.QueryInfo} do not have to re-parse the SQL text.
   *
   * @param queryInfo parsed queries; the sources of all queries are flattened into one list, and
   *                  the sinks are collected in the same order as the queries
   * @param config Samza config
   * @return {@link SamzaSqlApplicationConfig}
   */
  private static SamzaSqlApplicationConfig buildSqlConfig(List<SamzaSqlQueryParser.QueryInfo> queryInfo,
      Config config) {
    return new SamzaSqlApplicationConfig(config,
        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
            .collect(Collectors.toList()),
        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink)
            .collect(Collectors.toList()));
  }

  /**
   * Get {@link QueryPlanner} given {@link SamzaSqlApplicationConfig}
   *
   * @param sqlConfig {@link SamzaSqlApplicationConfig}
   * @return {@link QueryPlanner} built from the config's schema providers, input stream configs, UDF metadata
   *         and query-plan-optimizer flag
   */
  public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
    return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
        sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
  }

  /**
   * Get list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} given list of sql statements.
   *
   * @param sqlStmts list of sql statements
   * @return list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo}, one per statement, in order
   */
  public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
    return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
  }

  /**
   * Get list of sql statements based on the property set in the config.
   *
   * <p>Lookup order (first non-blank value wins):
   * {@link SamzaSqlApplicationConfig#CFG_SQL_STMT} (a single statement),
   * {@link SamzaSqlApplicationConfig#CFG_SQL_STMTS_JSON} (a serialized list of statements),
   * {@link SamzaSqlApplicationConfig#CFG_SQL_FILE} (a file parsed by {@link SqlFileParser}).
   *
   * @param config config
   * @return list of Sql statements
   * @throws SamzaException if none of the above properties is set to a non-blank value
   */
  public static List<String> fetchSqlFromConfig(Map<String, String> config) {
    // StringUtils.isNotBlank(null) is false, so a missing key and a blank value are treated alike.
    String sqlValue = config.get(SamzaSqlApplicationConfig.CFG_SQL_STMT);
    if (StringUtils.isNotBlank(sqlValue)) {
      return Collections.singletonList(sqlValue);
    }

    String sqlStmtsJson = config.get(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON);
    if (StringUtils.isNotBlank(sqlStmtsJson)) {
      return SamzaSqlApplicationConfig.deserializeSqlStmts(sqlStmtsJson);
    }

    // Guard the file path like the other keys so a blank value reports "no SQL" instead of
    // being handed to the file parser.
    String sqlFile = config.get(SamzaSqlApplicationConfig.CFG_SQL_FILE);
    if (StringUtils.isNotBlank(sqlFile)) {
      return SqlFileParser.parseSqlFile(sqlFile);
    }

    String msg = "Config doesn't contain the SQL that needs to be executed.";
    LOG.error(msg);
    throw new SamzaException(msg);
  }
}