blob: e6a92f1b0668535f402a12ecc425b622bde618f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.streams.config;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueType;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* ComponentConfigurator supplies serializable configuration beans derived from a specified typesafe path or object.
*
* <p></p>
* Typically a component will select a 'default' typesafe path to be used if no other path or object is provided.
*
* <p></p>
* For example, streams-persist-elasticsearch will use 'elasticsearch' by default, but an implementation
* such as github.com/apache/streams-examples/local/elasticsearch-reindex
* can resolve a reader from elasticsearch.source
* and a writer from elasticsearch.destination.
*
*/
public class ComponentConfigurator<T extends Serializable> {
private Class<T> configClass;
public ComponentConfigurator(Class<T> configClass) {
this.configClass = configClass;
}
private static final Logger LOGGER = LoggerFactory.getLogger(ComponentConfigurator.class);
private static final ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, false);
/**
* resolve a serializable configuration pojo.
*
* the entire object, or fragments of it, will be collected and merged from:
* - the simple class name of the configured class
* - the fully qualified class name of the configured class
* - any of the ancestor classes of the configured class
* - the configured class's package
* - any of the parent packages of the configured class's package
*
* @return result
*/
public T detectConfiguration() {
Config rootConfig = StreamsConfigurator.getConfig();
return detectConfiguration(rootConfig, "");
}
/**
* resolve a serializable configuration pojo compiled from a specified path on a provided configuration tree.
*
* @return result
*/
public T detectConfiguration(Config typesafe, String path) {
T pojoConfig = null;
Config rootConfig = typesafe;
Config cascadeConfig = null;
String[] canonicalNameParts = StringUtils.split(configClass.getCanonicalName(), '.');
/*
* Resolve from any of the parent packages of the configured class's package
*/
for( int partIndex = 1; partIndex < canonicalNameParts.length; partIndex++) {
String[] partialPathParts = ArrayUtils.subarray(canonicalNameParts, 0, partIndex);
String partialPath = StringUtils.join(partialPathParts, '.');
LOGGER.debug("partialPath: ", partialPath);
if( rootConfig.hasPath(partialPath) ) {
Config partialPathConfig = rootConfig.getConfig(partialPath);
partialPathConfig = valuesOnly(partialPathConfig);
if (!partialPathConfig.isEmpty() && !partialPathConfig.root().isEmpty()) {
if (cascadeConfig == null) {
cascadeConfig = partialPathConfig;
} else {
cascadeConfig = partialPathConfig.withFallback(cascadeConfig);
}
}
}
}
/*
* Resolve from any of the superclasses of the configured class
*/
List<Class> superclasses = getSuperClasses(configClass);
for( Class superclass : superclasses) {
String superclassCanonicalName = superclass.getCanonicalName();
if( rootConfig.hasPath(superclassCanonicalName)) {
Config superclassConfig = rootConfig.getConfig(superclassCanonicalName);
if (cascadeConfig == null) {
cascadeConfig = superclassConfig;
} else {
cascadeConfig = superclassConfig.withFallback(cascadeConfig);
}
}
}
/*
* Resolve from any of the configured class simple name
*/
if( rootConfig.hasPath(configClass.getSimpleName()) ) {
Config simpleNameConfig = rootConfig.getConfig(configClass.getSimpleName());
if( cascadeConfig == null ) {
cascadeConfig = simpleNameConfig;
} else {
cascadeConfig = simpleNameConfig.withFallback(cascadeConfig);
}
}
/*
* Resolve from any of the configured class canonical name
*/
if( rootConfig.hasPath(configClass.getCanonicalName()) ) {
Config canonicalNameConfig = rootConfig.getConfig(configClass.getCanonicalName());
if( cascadeConfig == null ) {
cascadeConfig = canonicalNameConfig;
} else {
cascadeConfig = canonicalNameConfig.withFallback(cascadeConfig);
}
}
if( StringUtils.isNotBlank(path) && !StringUtils.startsWith(path, "$") && rootConfig.hasPath(path) ) {
Config pathConfig = rootConfig.getConfig(path);
if( cascadeConfig == null ) {
cascadeConfig = pathConfig;
} else {
cascadeConfig = pathConfig.withFallback(cascadeConfig);
}
}
else if( path == null || StringUtils.isBlank(path) ) {
if( cascadeConfig == null ) {
cascadeConfig = rootConfig;
} else {
cascadeConfig = valuesOnly(rootConfig).withFallback(cascadeConfig);
}
}
if( cascadeConfig != null && cascadeConfig.root() != null) {
try {
pojoConfig = mapper.readValue(cascadeConfig.root().render(ConfigRenderOptions.concise()), configClass);
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.warn("Could not parse:", cascadeConfig);
}
}
return pojoConfig;
}
/**
* resolve a serializable configuration pojo from a portion of the JVM config object.
*
* @param path path
* @return result
*/
public T detectConfiguration(String path) {
Config streamsConfig = StreamsConfigurator.getConfig();
return detectConfiguration(streamsConfig, path);
}
/**
* resolve a serializable configuration pojo from a portion of the JVM config object.
*
* @param config config
* @return result
*/
public T detectConfiguration(Config config) {
return detectConfiguration(config, null);
}
/**
* inspect and return class hierarchy in order from furthest to nearest ancestor.
*
* @param clazz class to analyze
* @return result
*/
public static List<Class> getSuperClasses(Class clazz) {
List<Class> classList = new ArrayList<Class>();
Class superclass = clazz.getSuperclass();
while (superclass != null && !superclass.isInterface() && superclass != java.lang.Object.class) {
classList.add(0, superclass);
superclass = superclass.getSuperclass();
}
return classList;
}
/**
* returns a version of a config with only values - no nested objects.
*
* @param config config
* @return result
*/
private Config valuesOnly(Config config) {
for (String key : config.root().keySet()) {
LOGGER.debug("key: ", key);
try {
ConfigValue value = config.getValue(key);
LOGGER.debug("child type: ", value.valueType());
if (value.valueType() == ConfigValueType.OBJECT) {
config = config.withoutPath(key);
}
} catch(ConfigException configExceptions) {
LOGGER.debug("error processing");
config = config.withoutPath("\""+key+"\"");
}
}
return config;
}
}