blob: cc2e5e58ab0c35cc657637982b1ddeee61553bb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder.plugin.output;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Base class of Log Feeder outputs. An output is responsible for shipping transformed inputs to a destination;
 * concrete destinations are implemented by extending this class.
 * @param <PROP_TYPE> Log Feeder configuration holder object
 * @param <INPUT_MARKER> Type of the input marker - can be anything which can store unique data about an input
 */
public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements OutputConfigMonitor {

  private static final Logger LOG = LogManager.getLogger(Output.class);

  private static final String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

  /** Shared serializer used by {@link #write(Map, InputMarker)}; dates are rendered with {@link #GSON_DATE_FORMAT}. */
  private static final Gson GSON = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();

  private LogSearchConfigLogFeeder logSearchConfig;
  private String destination = null;
  private boolean isClosed;

  /**
   * Metric holder registered by {@link #addMetricsContainers(List)} and reported by {@link #logStat()}.
   * NOTE: the initializer calls the overridable {@link #getWriteBytesMetricName()} while this base class is still
   * being constructed, i.e. before any subclass field initializer or constructor body has run. Implementations of
   * {@link #getWriteBytesMetricName()} therefore must not depend on subclass instance state.
   */
  protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);

  /**
   * Obtain the output type
   * @return Text which represents the output type in shipper configuration. (e.g.: "solr")
   */
  public abstract String getOutputType();

  /**
   * Copy input file, can be used instead of or together with processing an input
   * @param inputFile File to copy
   * @param inputMarker Marker that stores input details
   * @throws Exception Error during input copy
   */
  public abstract void copyFile(File inputFile, InputMarker inputMarker) throws Exception;

  /**
   * Call write operation - should ship inputs to a destination
   * @param jsonStr Input string to process (JSON string)
   * @param inputMarker Marker that stores input details
   * @throws Exception Error during output writing
   */
  public abstract void write(String jsonStr, INPUT_MARKER inputMarker) throws Exception;

  /**
   * Get pending output count
   * @return Pending outputs (used during closing the outputs - mainly at shutdown phase)
   */
  public abstract Long getPendingCount();

  /**
   * Obtain writes metric name - if there are any metric sinks in the application it can identify the specific metric for the output
   * @return metric name
   */
  public abstract String getWriteBytesMetricName();

  /**
   * Converts key/value map to JSON string and calls {@link #write(String, InputMarker)} on that input
   * @param jsonObj Key/value map which contains the fields to process
   * @param inputMarker Marker that stores input details
   * @throws Exception Error during output writing
   */
  public void write(Map<String, Object> jsonObj, INPUT_MARKER inputMarker) throws Exception {
    write(GSON.toJson(jsonObj), inputMarker);
  }

  /**
   * Name that identifies this output in thread names.
   * @return simple class name of the concrete output
   */
  protected String getNameForThread() {
    return this.getClass().getSimpleName();
  }

  /**
   * Store the Log Search configuration handle for this output.
   * @param logSearchConfig configuration handle to keep
   */
  public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
    this.logSearchConfig = logSearchConfig;
  }

  /**
   * @return the configuration handle set by {@link #setLogSearchConfig(LogSearchConfigLogFeeder)}, or null if none was set
   */
  public LogSearchConfigLogFeeder getLogSearchConfig() {
    return logSearchConfig;
  }

  /**
   * @return the destination set by {@link #setDestination(String)}, or null if none was set
   */
  public String getDestination() {
    return destination;
  }

  /**
   * Store the destination of this output.
   * @param destination destination text to keep
   */
  public void setDestination(String destination) {
    this.destination = destination;
  }

  /**
   * Get the list of fields that will be used for ID generation of log entries.
   * The base implementation returns a new, empty, mutable list on every call.
   * @return list of string
   */
  public List<String> getIdFields() {
    return new ArrayList<>();
  }

  /**
   * @return true once {@link #close()} or {@link #shouldCloseOutput()} has been called
   */
  public boolean isClosed() {
    return isClosed;
  }

  /**
   * Flag an output to be closed
   */
  protected void shouldCloseOutput() {
    isClosed = true;
  }

  /**
   * Registers the metrics of the parent class and then the write-bytes metric of this output.
   * @param metricsList list that collects the metric holders
   */
  @Override
  public void addMetricsContainers(List<MetricData> metricsList) {
    super.addMetricsContainers(metricsList);
    metricsList.add(writeBytesMetric);
  }

  /**
   * Logs the statistics of the parent class and then the write-bytes metric of this output.
   */
  @Override
  public synchronized void logStat() {
    super.logStat();
    logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
  }

  /**
   * Currently logs nothing.
   * @return always true
   */
  @Override
  public boolean logConfigs() {
    // TODO: log something about the configs
    return true;
  }

  /**
   * Trims, in place, every value of the map that is a {@link String}; other values are left untouched.
   * @param jsonObj Key/value map to process, may be null (then nothing happens)
   */
  protected void trimStrValue(Map<String, Object> jsonObj) {
    if (jsonObj == null) {
      return;
    }
    for (Map.Entry<String, Object> entry : jsonObj.entrySet()) {
      Object value = entry.getValue();
      if (value instanceof String) {
        // Write through the entry instead of calling jsonObj.put(): put() on an existing key is a structural
        // modification for an access-ordered LinkedHashMap and would break the running iteration.
        entry.setValue(((String) value).trim());
      }
    }
  }

  /**
   * Base close operation: logs the call and flags this output as closed.
   */
  public void close() {
    LOG.info("Calling base close() = {}", getShortDescription());
    isClosed = true;
  }
}