blob: 5b67aa52545cb18dbafb0c862c80eb6e60263bf3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.configuration;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.common.utils.JSONUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
public class IndexingConfigurations extends Configurations {
public static final String BATCH_SIZE_CONF = "batchSize";
public static final String BATCH_TIMEOUT_CONF = "batchTimeout";
public static final String ENABLED_CONF = "enabled";
public static final String INDEX_CONF = "index";
public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter";
public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
if(ret == null) {
return emptyMapOnNonExistent?new HashMap<>():null;
}
else {
return ret;
}
}
public Map<String, Object> getSensorIndexingConfig(String sensorType) {
return getSensorIndexingConfig(sensorType, true);
}
public List<String> getTypes() {
List<String> ret = new ArrayList<>();
for(String keyedSensor : getConfigurations().keySet()) {
if(!keyedSensor.isEmpty() && keyedSensor.startsWith(ConfigurationType.INDEXING.getTypeName())) {
ret.add(keyedSensor.substring(ConfigurationType.INDEXING.getTypeName().length() + 1));
}
}
return ret;
}
public void delete(String sensorType) {
getConfigurations().remove(getKey(sensorType));
}
public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
String key = getKey(sensorType);
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(key);
if(ret == null) {
return new HashMap();
}
else {
Map<String, Object> writerConfig = (Map<String, Object>)ret.get(writerName);
return writerConfig != null?writerConfig:new HashMap<>();
}
}
public void updateSensorIndexingConfig(String sensorType, byte[] data) throws IOException {
updateSensorIndexingConfig(sensorType, new ByteArrayInputStream(data));
}
public void updateSensorIndexingConfig(String sensorType, InputStream io) throws IOException {
Map<String, Object> sensorIndexingConfig = JSONUtils.INSTANCE.load(io, JSONUtils.MAP_SUPPLIER);
updateSensorIndexingConfig(sensorType, sensorIndexingConfig);
}
public void updateSensorIndexingConfig(String sensorType, Map<String, Object> sensorIndexingConfig) {
getConfigurations().put(getKey(sensorType), sensorIndexingConfig);
}
public static String getKey(String sensorType) {
return ConfigurationType.INDEXING.getTypeName() + "." + sensorType;
}
public boolean isDefault(String sensorName, String writerName) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorName));
if(ret == null) {
return true;
}
else {
Map<String, Object> writerConfig = (Map<String, Object>)ret.get(writerName);
return writerConfig != null?false:true;
}
}
public int getBatchSize(String sensorName, String writerName ) {
return getBatchSize(getSensorIndexingConfig(sensorName, writerName));
}
public int getBatchTimeout(String sensorName, String writerName ) {
return getBatchTimeout(getSensorIndexingConfig(sensorName, writerName));
}
/**
* Returns all configured values of batchTimeout, for all configured sensors,
* but only for the specific writer identified by {@param writerName}. So, if it is
* an hdfs writer, it will return the batchTimeouts for hdfs writers for all the sensors.
* The goal is to return to a {@link org.apache.metron.common.bolt.ConfiguredBolt}
* the set of all and only batchTimeouts relevant to that ConfiguredBolt.
*
* @param writerName
* @return list of integer batchTimeouts, one per configured sensor
*/
public List<Integer> getAllConfiguredTimeouts(String writerName) {
// The configuration infrastructure was not designed to enumerate sensors, so we synthesize.
// Since getKey is in this same class, we know we can pass it a null string to get the key prefix
// for all sensor types within this capability. We then enumerate all keys in configurations.keySet
// and select those that match the key prefix, as being sensor keys. The suffix substring of
// each such key is used as a sensor name to query the batchTimeout settings, if any.
String keyPrefixString = getKey("");
int prefixStringLength = keyPrefixString.length();
List<Integer> configuredBatchTimeouts = new ArrayList<>();
for (String sensorKeyString : getConfigurations().keySet()) {
if (sensorKeyString.startsWith(keyPrefixString)) {
String configuredSensorName = sensorKeyString.substring(prefixStringLength);
configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName));
}
}
return configuredBatchTimeouts;
}
public String getIndex(String sensorName, String writerName) {
return getIndex(getSensorIndexingConfig(sensorName, writerName), sensorName);
}
public boolean isEnabled(String sensorName, String writerName) {
return isEnabled(getSensorIndexingConfig(sensorName, writerName));
}
public String getOutputPathFunction(String sensorName, String writerName) {
return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName);
}
public String getFieldNameConverter(String sensorName, String writerName) {
return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName);
}
public static boolean isEnabled(Map<String, Object> conf) {
return getAs( ENABLED_CONF
,conf
, true
, Boolean.class
);
}
public static int getBatchSize(Map<String, Object> conf) {
return getAs( BATCH_SIZE_CONF
,conf
, 1
, Integer.class
);
}
public static int getBatchTimeout(Map<String, Object> conf) {
return getAs( BATCH_TIMEOUT_CONF
,conf
, 0
, Integer.class
);
}
public static String getIndex(Map<String, Object> conf, String sensorName) {
return getAs( INDEX_CONF
,conf
, sensorName
, String.class
);
}
public static String getOutputPathFunction(Map<String, Object> conf, String sensorName) {
return getAs(OUTPUT_PATH_FUNCTION_CONF
,conf
, ""
, String.class
);
}
public static String getFieldNameConverter(Map<String, Object> conf, String sensorName) {
return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class);
}
public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(ENABLED_CONF, enabled);
return ret;
}
public static Map<String, Object> setBatchSize(Map<String, Object> conf, int batchSize) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(BATCH_SIZE_CONF, batchSize);
return ret;
}
public static Map<String, Object> setBatchTimeout(Map<String, Object> conf, int batchTimeout) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(BATCH_TIMEOUT_CONF, batchTimeout);
return ret;
}
public static Map<String, Object> setIndex(Map<String, Object> conf, String index) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(INDEX_CONF, index);
return ret;
}
public static Map<String, Object> setFieldNameConverter(Map<String, Object> conf, String index) {
Map<String, Object> ret = conf == null ? new HashMap<>(): conf;
ret.put(FIELD_NAME_CONVERTER_CONF, index);
return ret;
}
public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
}
}