blob: 997312fd0f9f91b9772adb850cbfb21a72e97d39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.runner;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.interfaces.UdfResolver;
import org.apache.samza.sql.testutil.JsonUtil;
import org.apache.samza.sql.testutil.ReflectionUtils;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
import org.apache.samza.sql.testutil.SqlFileParser;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class representing the Samza SQL application config
*/
public class SamzaSqlApplicationConfig {
private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplicationConfig.class);
public static final String CFG_SQL_STMT = "samza.sql.stmt";
public static final String CFG_SQL_STMTS_JSON = "samza.sql.stmts.json";
public static final String CFG_SQL_FILE = "samza.sql.sqlFile";
public static final String CFG_UDF_CONFIG_DOMAIN = "samza.sql.udf";
public static final String CFG_FACTORY = "factory";
public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = "samza.sql.relSchemaProvider.%s.";
public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = "samza.sql.relConverter.%s.";
public static final String CFG_IO_RESOLVER = "samza.sql.ioResolver";
public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = "samza.sql.ioResolver.%s.";
public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins.
private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
private SqlIOResolver ioResolver;
private UdfResolver udfResolver;
private final Collection<UdfMetadata> udfMetadata;
private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
private final List<String> sql;
private final List<QueryInfo> queryInfo;
private final long windowDurationMs;
public SamzaSqlApplicationConfig(Config staticConfig) {
sql = fetchSqlFromConfig(staticConfig);
queryInfo = fetchQueryInfo(sql);
ioResolver = createIOResolver(staticConfig);
udfResolver = createUdfResolver(staticConfig);
udfMetadata = udfResolver.getUdfs();
inputSystemStreamConfigBySource = queryInfo.stream()
.map(QueryInfo::getSources)
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo));
Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
outputSystemStreamConfigsBySource = queryInfo.stream()
.map(QueryInfo::getSink)
.distinct()
.collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo));
systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
relSchemaProvidersBySource = systemStreamConfigs.stream()
.collect(Collectors.toMap(SqlIOConfig::getSource,
x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
(o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
samzaRelConvertersBySource = systemStreamConfigs.stream()
.collect(Collectors.toMap(SqlIOConfig::getSource,
x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
relSchemaProvidersBySource.get(x.getSource()), c))));
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
}
private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) {
String pluginDomain = String.format(pluginDomainFormat, plugin);
Config pluginConfig = staticConfig.subset(pluginDomain);
String factoryName = pluginConfig.getOrDefault(CFG_FACTORY, "");
Validate.notEmpty(factoryName, String.format("Factory is not set for %s", plugin));
Object factory = ReflectionUtils.createInstance(factoryName);
Validate.notNull(factory, String.format("Factory creation failed for %s", plugin));
LOG.info("Instantiating {} using factory {} with props {}", pluginName, factoryName, pluginConfig);
return factoryInvoker.apply(factory, pluginConfig);
}
public static List<QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
}
public static List<String> fetchSqlFromConfig(Map<String, String> config) {
List<String> sql;
if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
String sqlValue = config.get(CFG_SQL_STMT);
sql = Collections.singletonList(sqlValue);
} else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
} else if (config.containsKey(CFG_SQL_FILE)) {
String sqlFile = config.get(CFG_SQL_FILE);
sql = SqlFileParser.parseSqlFile(sqlFile);
} else {
String msg = "Config doesn't contain the SQL that needs to be executed.";
LOG.error(msg);
throw new SamzaException(msg);
}
return sql;
}
private static List<String> deserializeSqlStmts(String value) {
Validate.notEmpty(value, "json Value is not set or empty");
return JsonUtil.fromJson(value, new TypeReference<List<String>>() {
});
}
public static String serializeSqlStmts(List<String> sqlStmts) {
Validate.notEmpty(sqlStmts, "json Value is not set or empty");
return JsonUtil.toJson(sqlStmts);
}
public static SqlIOResolver createIOResolver(Config config) {
String sourceResolveValue = config.get(CFG_IO_RESOLVER);
Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty");
return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
(o, c) -> ((SqlIOResolverFactory) o).create(c, config));
}
private UdfResolver createUdfResolver(Map<String, String> config) {
String udfResolveValue = config.get(CFG_UDF_RESOLVER);
Validate.notEmpty(udfResolveValue, "udfResolver config is not set or empty");
HashMap<String, String> domainConfig =
getDomainProperties(config, String.format(CFG_FMT_UDF_RESOLVER_DOMAIN, udfResolveValue), false);
Properties props = new Properties();
props.putAll(domainConfig);
HashMap<String, String> udfConfig = getDomainProperties(config, CFG_UDF_CONFIG_DOMAIN, false);
return new ConfigBasedUdfResolver(props, new MapConfig(udfConfig));
}
private static HashMap<String, String> getDomainProperties(Map<String, String> props, String prefix,
boolean preserveFullKey) {
String fullPrefix;
if (StringUtils.isBlank(prefix)) {
fullPrefix = ""; // this will effectively retrieve all properties
} else {
fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
}
HashMap<String, String> ret = new HashMap<>();
props.keySet().forEach(keyStr -> {
if (keyStr.startsWith(fullPrefix) && !keyStr.equals(fullPrefix)) {
if (preserveFullKey) {
ret.put(keyStr, props.get(keyStr));
} else {
ret.put(keyStr.substring(fullPrefix.length()), props.get(keyStr));
}
}
});
return ret;
}
public List<String> getSql() {
return sql;
}
public List<QueryInfo> getQueryInfo() {
return queryInfo;
}
public Collection<UdfMetadata> getUdfMetadata() {
return udfMetadata;
}
public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() {
return inputSystemStreamConfigBySource;
}
public Map<String, SqlIOConfig> getOutputSystemStreamConfigsBySource() {
return outputSystemStreamConfigsBySource;
}
public Map<String, SamzaRelConverter> getSamzaRelConverters() {
return samzaRelConvertersBySource;
}
public Map<String, RelSchemaProvider> getRelSchemaProviders() {
return relSchemaProvidersBySource;
}
public SqlIOResolver getIoResolver() {
return ioResolver;
}
public long getWindowDurationMs() {
return windowDurationMs;
}
}