blob: 9f54d8ae1f5812be254b6abd1be3cb5752bd493c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder.input;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public abstract class Input extends ConfigBlock implements Runnable {
private static final Logger LOG = Logger.getLogger(Input.class);
private static final boolean DEFAULT_TAIL = true;
private static final boolean DEFAULT_USE_EVENT_MD5 = false;
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
private static final boolean DEFAULT_CACHE_ENABLED = false;
private static final boolean DEFAULT_CACHE_DEDUP_LAST = false;
private static final int DEFAULT_CACHE_SIZE = 100;
private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
private static final String CACHE_ENABLED = "cache_enabled";
private static final String CACHE_KEY_FIELD = "cache_key_field";
private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
private static final String CACHE_SIZE = "cache_size";
private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
protected InputManager inputManager;
protected OutputManager outputManager;
private List<Output> outputList = new ArrayList<Output>();
private Thread thread;
private String type;
protected String filePath;
private Filter firstFilter;
private boolean isClosed;
protected boolean tail;
private boolean useEventMD5;
private boolean genEventMD5;
private LRUCache cache;
private String cacheKeyField;
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
protected String getReadBytesMetricName() {
return null;
}
@Override
public void loadConfig(Map<String, Object> map) {
super.loadConfig(map);
String typeValue = getStringValue("type");
if (typeValue != null) {
// Explicitly add type and value to field list
contextFields.put("type", typeValue);
@SuppressWarnings("unchecked")
Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
if (addFields == null) {
addFields = new HashMap<String, Object>();
map.put("add_fields", addFields);
}
addFields.put("type", typeValue);
}
}
public void setType(String type) {
this.type = type;
}
public void setInputManager(InputManager inputManager) {
this.inputManager = inputManager;
}
public void setOutputManager(OutputManager outputManager) {
this.outputManager = outputManager;
}
public void addFilter(Filter filter) {
if (firstFilter == null) {
firstFilter = filter;
} else {
Filter f = firstFilter;
while (f.getNextFilter() != null) {
f = f.getNextFilter();
}
f.setNextFilter(filter);
}
}
public void addOutput(Output output) {
outputList.add(output);
}
@Override
public void init() throws Exception {
super.init();
initCache();
tail = getBooleanValue("tail", DEFAULT_TAIL);
useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
if (firstFilter != null) {
firstFilter.init();
}
}
boolean monitor() {
if (isReady()) {
LOG.info("Starting thread. " + getShortDescription());
thread = new Thread(this, getNameForThread());
thread.start();
return true;
} else {
return false;
}
}
public abstract boolean isReady();
@Override
public void run() {
try {
LOG.info("Started to monitor. " + getShortDescription());
start();
} catch (Exception e) {
LOG.error("Error writing to output.", e);
}
LOG.info("Exiting thread. " + getShortDescription());
}
/**
* This method will be called from the thread spawned for the output. This
* method should only exit after all data are read from the source or the
* process is exiting
*/
abstract void start() throws Exception;
protected void outputLine(String line, InputMarker marker) {
statMetric.value++;
readBytesMetric.value += (line.length());
if (firstFilter != null) {
try {
firstFilter.apply(line, marker);
} catch (LogfeederException e) {
LOG.error(e.getLocalizedMessage(), e);
}
} else {
// TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter
// outputManager.write(line, this);
}
}
protected void flush() {
if (firstFilter != null) {
firstFilter.flush();
}
}
@Override
public void setDrain(boolean drain) {
LOG.info("Request to drain. " + getShortDescription());
super.setDrain(drain);
try {
thread.interrupt();
} catch (Throwable t) {
// ignore
}
}
public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
if (firstFilter != null) {
firstFilter.addMetricsContainers(metricsList);
}
metricsList.add(readBytesMetric);
}
@Override
public void logStat() {
super.logStat();
logStatForMetric(readBytesMetric, "Stat: Bytes Read");
if (firstFilter != null) {
firstFilter.logStat();
}
}
public abstract void checkIn(InputMarker inputMarker);
public abstract void lastCheckIn();
public void close() {
LOG.info("Close called. " + getShortDescription());
try {
if (firstFilter != null) {
firstFilter.close();
} else {
outputManager.close();
}
} catch (Throwable t) {
// Ignore
}
isClosed = true;
}
private void initCache() {
boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
: LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
if (cacheEnabled) {
String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
? getStringValue(CACHE_KEY_FIELD)
: LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
setCacheKeyField(getStringValue(cacheKeyField));
boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
: LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
int cacheSize = getConfigValue(CACHE_SIZE) != null
? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
: LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
: Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
}
}
public boolean isTail() {
return tail;
}
public boolean isUseEventMD5() {
return useEventMD5;
}
public boolean isGenEventMD5() {
return genEventMD5;
}
public Filter getFirstFilter() {
return firstFilter;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
public void setClosed(boolean isClosed) {
this.isClosed = isClosed;
}
public boolean isClosed() {
return isClosed;
}
public List<Output> getOutputList() {
return outputList;
}
public Thread getThread(){
return thread;
}
public LRUCache getCache() {
return cache;
}
public void setCache(LRUCache cache) {
this.cache = cache;
}
public String getCacheKeyField() {
return cacheKeyField;
}
public void setCacheKeyField(String cacheKeyField) {
this.cacheKeyField = cacheKeyField;
}
@Override
public String getNameForThread() {
if (filePath != null) {
try {
return (type + "=" + (new File(filePath)).getName());
} catch (Throwable ex) {
LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
}
}
return super.getNameForThread() + ":" + type;
}
@Override
public String toString() {
return getShortDescription();
}
}