blob: a47c71f451238feddddd4412903beb767770f9d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder;
import java.io.BufferedInputStream;
import java.io.File;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputManager;
import org.apache.ambari.logfeeder.input.InputSimulate;
import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.AliasUtil;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.SSLUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.google.gson.reflect.TypeToken;
public class LogFeeder {
private static final Logger LOG = Logger.getLogger(LogFeeder.class);
private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
private OutputManager outputManager = new OutputManager();
private InputManager inputManager = new InputManager();
private MetricsManager metricsManager = new MetricsManager();
public static Map<String, Object> globalConfigs = new HashMap<>();
private List<Map<String, Object>> inputConfigList = new ArrayList<>();
private List<Map<String, Object>> filterConfigList = new ArrayList<>();
private List<Map<String, Object>> outputConfigList = new ArrayList<>();
private long lastCheckPointCleanedMS = 0;
private boolean isLogfeederCompleted = false;
private Thread statLoggerThread = null;
private LogFeeder() {}
public void run() {
try {
init();
monitor();
waitOnAllDaemonThreads();
} catch (Throwable t) {
LOG.fatal("Caught exception in main.", t);
System.exit(1);
}
}
private void init() throws Throwable {
Date startTime = new Date();
loadConfigFiles();
addSimulatedInputs();
mergeAllConfigs();
SSLUtil.ensureStorePasswords();
outputManager.init();
inputManager.init();
metricsManager.init();
LogConfigHandler.handleConfig();
LOG.debug("==============");
Date endTime = new Date();
LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
}
private void loadConfigFiles() throws Exception {
List<String> configFiles = getConfigFiles();
for (String configFileName : configFiles) {
LOG.info("Going to load config file:" + configFileName);
configFileName = configFileName.replace("\\ ", "%20");
File configFile = new File(configFileName);
if (configFile.exists() && configFile.isFile()) {
LOG.info("Config file exists in path." + configFile.getAbsolutePath());
loadConfigsUsingFile(configFile);
} else {
LOG.info("Trying to load config file from classloader: " + configFileName);
loadConfigsUsingClassLoader(configFileName);
LOG.info("Loaded config file from classloader: " + configFileName);
}
}
}
private List<String> getConfigFiles() {
List<String> configFiles = new ArrayList<>();
String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
if (logfeederConfigFilesProperty != null) {
configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
}
String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
if (StringUtils.isNotEmpty(inputConfigDir)) {
File configDirFile = new File(inputConfigDir);
List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
for (File inputConfigFile : inputConfigFiles) {
configFiles.add(inputConfigFile.getAbsolutePath());
}
}
if (CollectionUtils.isEmpty(configFiles)) {
String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
}
return configFiles;
}
private void loadConfigsUsingFile(File configFile) throws Exception {
try {
String configData = FileUtils.readFileToString(configFile);
loadConfigs(configData);
} catch (Exception t) {
LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
throw t;
}
}
private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
String configData = IOUtils.toString(fis);
loadConfigs(configData);
}
}
@SuppressWarnings("unchecked")
private void loadConfigs(String configData) throws Exception {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
// Get the globals
for (String key : configMap.keySet()) {
switch (key) {
case "global" :
globalConfigs.putAll((Map<String, Object>) configMap.get(key));
break;
case "input" :
List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
inputConfigList.addAll(inputConfig);
break;
case "filter" :
List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
filterConfigList.addAll(filterConfig);
break;
case "output" :
List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
outputConfigList.addAll(outputConfig);
break;
default :
LOG.warn("Unknown config key: " + key);
}
}
}
private void addSimulatedInputs() {
int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
if (simulatedInputNumber == 0)
return;
InputSimulate.loadTypeToFilePath(inputConfigList);
inputConfigList.clear();
for (int i = 0; i < simulatedInputNumber; i++) {
HashMap<String, Object> mapList = new HashMap<String, Object>();
mapList.put("source", "simulate");
mapList.put("rowtype", "service");
inputConfigList.add(mapList);
}
}
private void mergeAllConfigs() {
loadOutputs();
loadInputs();
loadFilters();
assignOutputsToInputs();
}
private void loadOutputs() {
for (Map<String, Object> map : outputConfigList) {
if (map == null) {
continue;
}
mergeBlocks(globalConfigs, map);
String value = (String) map.get("destination");
if (StringUtils.isEmpty(value)) {
LOG.error("Output block doesn't have destination element");
continue;
}
Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
if (output == null) {
LOG.error("Output object could not be found");
continue;
}
output.setDestination(value);
output.loadConfig(map);
// We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
if (output.getBooleanValue("is_enabled", true)) {
output.logConfigs(Level.INFO);
outputManager.add(output);
} else {
LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
}
}
}
private void loadInputs() {
for (Map<String, Object> map : inputConfigList) {
if (map == null) {
continue;
}
mergeBlocks(globalConfigs, map);
String value = (String) map.get("source");
if (StringUtils.isEmpty(value)) {
LOG.error("Input block doesn't have source element");
continue;
}
Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
if (input == null) {
LOG.error("Input object could not be found");
continue;
}
input.setType(value);
input.loadConfig(map);
if (input.isEnabled()) {
input.setOutputManager(outputManager);
input.setInputManager(inputManager);
inputManager.add(input);
input.logConfigs(Level.INFO);
} else {
LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
}
}
}
private void loadFilters() {
sortFilters();
List<Input> toRemoveInputList = new ArrayList<Input>();
for (Input input : inputManager.getInputList()) {
for (Map<String, Object> map : filterConfigList) {
if (map == null) {
continue;
}
mergeBlocks(globalConfigs, map);
String value = (String) map.get("filter");
if (StringUtils.isEmpty(value)) {
LOG.error("Filter block doesn't have filter element");
continue;
}
Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
if (filter == null) {
LOG.error("Filter object could not be found");
continue;
}
filter.loadConfig(map);
filter.setInput(input);
if (filter.isEnabled()) {
filter.setOutputManager(outputManager);
input.addFilter(filter);
filter.logConfigs(Level.INFO);
} else {
LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
}
}
if (input.getFirstFilter() == null) {
toRemoveInputList.add(input);
}
}
for (Input toRemoveInput : toRemoveInputList) {
LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
inputManager.removeInput(toRemoveInput);
}
}
private void sortFilters() {
Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
@Override
public int compare(Map<String, Object> o1, Map<String, Object> o2) {
Object o1Sort = o1.get("sort_order");
Object o2Sort = o2.get("sort_order");
if (o1Sort == null || o2Sort == null) {
return 0;
}
int o1Value = parseSort(o1, o1Sort);
int o2Value = parseSort(o2, o2Sort);
return o1Value - o2Value;
}
private int parseSort(Map<String, Object> map, Object o) {
if (!(o instanceof Number)) {
try {
return (new Double(Double.parseDouble(o.toString()))).intValue();
} catch (Throwable t) {
LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
+ ", map=" + map.toString());
return 0;
}
} else {
return ((Number) o).intValue();
}
}
});
}
private void assignOutputsToInputs() {
Set<Output> usedOutputSet = new HashSet<Output>();
for (Input input : inputManager.getInputList()) {
for (Output output : outputManager.getOutputs()) {
if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
usedOutputSet.add(output);
input.addOutput(output);
}
}
}
// In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
for (Output output : InputSimulate.getSimulateOutputs()) {
outputManager.add(output);
usedOutputSet.add(output);
}
outputManager.retainUsedOutputs(usedOutputSet);
}
@SuppressWarnings("unchecked")
private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
for (String key : fromMap.keySet()) {
Object objValue = fromMap.get(key);
if (objValue == null) {
continue;
}
if (objValue instanceof Map) {
Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
if (localFields == null) {
localFields = new HashMap<String, Object>();
toMap.put(key, localFields);
}
if (globalFields != null) {
for (String fieldKey : globalFields.keySet()) {
if (!localFields.containsKey(fieldKey)) {
localFields.put(fieldKey, globalFields.get(fieldKey));
}
}
}
}
}
// Let's add the rest of the top level fields if missing
for (String key : fromMap.keySet()) {
if (!toMap.containsKey(key)) {
toMap.put(key, fromMap.get(key));
}
}
}
private class JVMShutdownHook extends Thread {
public void run() {
try {
LOG.info("Processing is shutting down.");
inputManager.close();
outputManager.close();
inputManager.checkInAll();
logStats();
LOG.info("LogSearch is exiting.");
} catch (Throwable t) {
// Ignore
}
}
}
private void monitor() throws Exception {
inputManager.monitor();
JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
statLoggerThread = new Thread("statLogger") {
@Override
public void run() {
while (true) {
try {
Thread.sleep(30 * 1000);
} catch (Throwable t) {
// Ignore
}
try {
logStats();
} catch (Throwable t) {
LOG.error("LogStats: Caught exception while logging stats.", t);
}
if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
lastCheckPointCleanedMS = System.currentTimeMillis();
inputManager.cleanCheckPointFiles();
}
if (isLogfeederCompleted) {
break;
}
}
}
};
statLoggerThread.setDaemon(true);
statLoggerThread.start();
}
private void logStats() {
inputManager.logStats();
outputManager.logStats();
if (metricsManager.isMetricsEnabled()) {
List<MetricData> metricsList = new ArrayList<MetricData>();
inputManager.addMetricsContainers(metricsList);
outputManager.addMetricsContainers(metricsList);
metricsManager.useMetrics(metricsList);
}
}
private void waitOnAllDaemonThreads() {
if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) {
inputManager.waitOnAllInputs();
isLogfeederCompleted = true;
if (statLoggerThread != null) {
try {
statLoggerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
try {
LogFeederUtil.loadProperties("logfeeder.properties", args);
} catch (Throwable t) {
LOG.warn("Could not load logfeeder properites");
System.exit(1);
}
LogFeeder logFeeder = new LogFeeder();
logFeeder.run();
}
}