blob: a2f6b08e67609f6cd0674b7a25b6277e719e0894 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder.output;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputS3FileDescriptorImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* Write log file into s3 bucket.
*
* This class supports two modes of upload:
* <ul>
* <li>A one time upload of files matching a pattern</li>
* <li>A batch mode, asynchronous, periodic upload of files</li>
* </ul>
*/
public class OutputS3File extends OutputFile implements RolloverCondition, RolloverHandler {
  private static final Logger logger = LogManager.getLogger(OutputS3File.class);

  public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";

  // Spools log lines to a local file before asynchronous upload (batch mode only).
  private LogSpooler logSpooler;
  private S3OutputConfiguration s3OutputConfiguration;
  // Uploader used by the batch/asynchronous mode; the one-time copyFile() path creates its own.
  private S3Uploader s3Uploader;
  private LogFeederProps logFeederProps;

  // Shared across all OutputS3File instances so the global config is uploaded at most once per process.
  private static boolean uploadedGlobalConfig = false;

  @Override
  public void init(LogFeederProps logFeederProps) throws Exception {
    this.logFeederProps = logFeederProps;
    s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
  }

  /**
   * Copy a local log file and its corresponding config to the S3 bucket one time.
   *
   * @param inputFile   the file to be copied
   * @param inputMarker contains information about the configuration to be uploaded
   */
  @Override
  public void copyFile(File inputFile, InputMarker inputMarker) {
    String type = inputMarker.getInput().getInputDescriptor().getType();
    // Dedicated one-shot uploader; second ctor arg false presumably disables the async/batch
    // behavior used elsewhere — TODO confirm against the S3Uploader constructor.
    S3Uploader oneTimeUploader = new S3Uploader(s3OutputConfiguration, false, type);
    String resolvedPath = oneTimeUploader.uploadFile(inputFile, type);
    uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
  }

  /**
   * Uploads the per-component input config (with its path rewritten to the S3 location of the
   * copied file) and, once per process, the global config.
   */
  private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
      String resolvedPath) {
    // NOTE(review): the collected filter copies are never attached to the uploaded config below —
    // verify whether they should be set on inputConfig before writing to S3.
    ArrayList<FilterDescriptor> filters = new ArrayList<>();
    addFilters(filters, inputMarker.getInput().getFirstFilter());
    InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.getInput().getInputDescriptor();
    // Gson round trip deep-copies the descriptor so mutations below don't affect the live input.
    InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
        InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
    String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
        LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
    // Point the descriptor at the uploaded copy, as the source is now an s3_file.
    inputS3FileDescriptor.setPath(s3CompletePath);

    ArrayList<InputDescriptorImpl> inputConfigList = new ArrayList<>();
    inputConfigList.add(inputS3FileDescriptor);
    // Strip settings that are supplied by the global config instead.
    removeS3GlobalConfig(inputS3FileDescriptor);
    // Write the per-component input config into an S3 file.
    InputConfigImpl inputConfig = new InputConfigImpl();
    inputConfig.setInput(inputConfigList);
    writeConfigToS3(inputConfig, getComponentConfigFileName(type), s3OutputConfiguration);
    // Write the global config (no-op after the first successful call in this process).
    writeGlobalConfig(s3OutputConfiguration);
  }

  /** Recursively deep-copies the filter chain (via Gson round trips) into {@code filters}. */
  private void addFilters(ArrayList<FilterDescriptor> filters, Filter filter) {
    if (filter != null) {
      FilterDescriptor original = filter.getFilterDescriptor();
      // Copy through Gson so later mutation of the list contents can't leak into the live filter.
      FilterDescriptor copy = InputConfigGson.gson.fromJson(
          InputConfigGson.gson.toJson(original), original.getClass());
      filters.add(copy);
      addFilters(filters, filter.getNextFilter());
    }
  }

  /** Serializes {@code config} to JSON and writes it under the resolved S3 config key. */
  private void writeConfigToS3(Object config, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
    String configJson = InputConfigGson.gson.toJson(config);
    String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
        s3OutputConfiguration.getCluster());
    S3Util.writeDataIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
        s3OutputConfiguration.getS3Endpoint(), s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
  }

  /** Builds the S3 key suffix for a component's input config file. */
  private String getComponentConfigFileName(String componentName) {
    return "input.config-" + componentName + ".json";
  }

  /** Clears the settings that the uploaded global config provides, so they aren't duplicated per input. */
  private void removeS3GlobalConfig(InputS3FileDescriptorImpl inputS3FileDescriptor) {
    inputS3FileDescriptor.setSource(null);
    inputS3FileDescriptor.setCopyFile(null);
    inputS3FileDescriptor.setProcessFile(null);
    inputS3FileDescriptor.setTail(null);
    inputS3FileDescriptor.getAddFields().remove("ip");
    inputS3FileDescriptor.getAddFields().remove("host");
    inputS3FileDescriptor.getAddFields().remove("bundle_id");
  }

  /**
   * Writes the global config into an S3 file. Runs at most once per process; synchronized so
   * concurrent outputs don't race on {@link #uploadedGlobalConfig}.
   */
  private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
    if (!uploadedGlobalConfig) {
      Map<String, Object> globalConfig = new HashMap<>();
      // Rewrite the source settings so the downstream reader treats the data as an s3_file.
      globalConfig.put("source", "s3_file");
      globalConfig.put("copy_file", false);
      globalConfig.put("process_file", true);
      globalConfig.put("tail", false);
      // The map is freshly built above, so add_fields starts empty — no cast needed.
      Map<String, Object> addFields = new HashMap<>();
      addFields.put("ip", LogFeederUtil.ipAddress);
      addFields.put("host", LogFeederUtil.hostName);
      // Default bundle_id to the cluster name when it is absent.
      String bundleId = (String) addFields.get("bundle_id");
      if (bundleId == null || bundleId.isEmpty()) {
        String cluster = (String) addFields.get("cluster");
        if (cluster != null && !cluster.isEmpty()) {
          // BUG FIX: the original wrote the (null/empty) bundle_id back; the intent — per the
          // comment "add bundle id same as cluster" — is to copy the cluster value.
          addFields.put("bundle_id", cluster);
        }
      }
      globalConfig.put("add_fields", addFields);
      Map<String, Object> config = new HashMap<>();
      config.put("global", globalConfig);
      writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
      uploadedGlobalConfig = true;
    }
  }

  /**
   * Write a log line to a local file, to be uploaded to the S3 bucket asynchronously.
   *
   * This method uses a {@link LogSpooler} to spool the log lines to a local file.
   * @param block The log event to upload
   * @param inputMarker Contains information about the log file feeding the lines.
   */
  @Override
  public void write(String block, InputFileMarker inputMarker) {
    createLogSpoolerIfRequired(inputMarker);
    logSpooler.add(block);
  }

  /** JSON-serializes the event and delegates to {@link #write(String, InputFileMarker)}. */
  @Override
  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
    String block = LogFeederUtil.getGson().toJson(jsonObj);
    write(block, inputMarker);
  }

  /** Lazily creates the spooler and uploader on the first write from a local file input. */
  private void createLogSpoolerIfRequired(InputFileMarker inputMarker) {
    if (logSpooler == null) {
      // NOTE(review): with this receiver order, isAssignableFrom only accepts inputs whose class
      // is exactly InputFile (a superclass would pass the check but fail the cast); an instanceof
      // test may have been intended — confirm before changing, as it would also admit subclasses.
      if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
        InputFile input = (InputFile) inputMarker.getInput();
        logSpooler = createSpooler(input.getFilePath());
        s3Uploader = createUploader(input.getInputDescriptor().getType());
      } else {
        logger.error("Cannot write from non local file...");
      }
    }
  }

  /** Creates and starts the background uploader for the given log type. */
  @VisibleForTesting
  protected S3Uploader createUploader(String logType) {
    S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType);
    uploader.startUploaderThread();
    return uploader;
  }

  /** Creates the spooler that batches log lines under the LogFeeder tmp directory. */
  @VisibleForTesting
  protected LogSpooler createSpooler(String filePath) {
    String spoolDirectory = logFeederProps.getTmpDir() + "/s3/service";
    logger.info("Creating spooler with spoolDirectory={}, filePath={}", spoolDirectory, filePath);
    return new LogSpooler(spoolDirectory, new File(filePath).getName() + "-", this, this,
        s3OutputConfiguration.getRolloverTimeThresholdSecs());
  }

  /**
   * Check whether the locally spooled file should be rolled over, based on file size.
   *
   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
   *                              for rollover.
   * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
   *         false otherwise
   */
  @Override
  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
    File spoolFile = currentSpoolerContext.getActiveSpoolFile();
    long currentSize = spoolFile.length();
    boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
    if (result) {
      logger.info("Rolling over {}, current size {}, threshold size {}", spoolFile, currentSize,
          s3OutputConfiguration.getRolloverSizeThresholdBytes());
    }
    return result;
  }

  /**
   * Stops dependent objects that consume resources.
   */
  @Override
  public void close() {
    if (s3Uploader != null) {
      s3Uploader.stopUploaderThread();
    }
    if (logSpooler != null) {
      logSpooler.close();
    }
  }

  /**
   * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
   *
   * @param rolloverFile The file that has been rolled over.
   */
  @Override
  public void handleRollover(File rolloverFile) {
    // NOTE(review): s3Uploader is created lazily in createLogSpoolerIfRequired; a rollover callback
    // before any successful write would NPE here — confirm the spooler lifecycle guarantees ordering.
    s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
  }

  @Override
  public String getShortDescription() {
    return "output:destination=s3,bucket=" + s3OutputConfiguration.getS3BucketName();
  }
}