/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.ambari.logfeeder.output;

import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* An {@link Output} that records logs to HDFS.
*
* The events are spooled on the local file system and uploaded in batches asynchronously.
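*
* A sketch of the configuration keys this output reads in {@link #init(LogFeederProps)}.
* The host, port and path values below are illustrative only, and the surrounding
* output-config wiring is assumed rather than shown:
* <pre>
*   "destination":      "hdfs",
*   "hdfs_host":        "namenode.example.com",
*   "hdfs_port":        "8020",
*   "hdfs_out_dir":     "/user/logfeeder/service-logs",
*   "rollover_sec":     "300",
*   "file_name_prefix": "service-logs-"
* </pre>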
*/
public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
private static final Logger logger = LogManager.getLogger(OutputHDFSFile.class);
private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L; // 5 minutes by default
private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<>();
private final Object readyMonitor = new Object();
private Thread hdfsCopyThread = null;
private String filenamePrefix = "service-logs-";
private long rolloverThresholdTimeMillis;
private String hdfsOutDir = null;
private String hdfsHost = null;
private String hdfsPort = null;
private FileSystem fileSystem = null;
private LogSpooler logSpooler;
private LogFeederProps logFeederProps;
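/**
* Reads the HDFS output configuration, resolves placeholders in hdfs_out_dir,
* creates a local spooler under the LogFeeder temp directory and starts the
* background HDFS copy thread. Missing required properties (hdfs_out_dir,
* hdfs_host, hdfs_port) are logged and cause initialization to return early.
*/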
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
this.logFeederProps = logFeederProps;
hdfsOutDir = getStringValue("hdfs_out_dir");
hdfsHost = getStringValue("hdfs_host");
hdfsPort = getStringValue("hdfs_port");
long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
if (StringUtils.isEmpty(hdfsOutDir)) {
logger.error("HDFS config property <hdfs_out_dir> is not set in config file.");
return;
}
if (StringUtils.isEmpty(hdfsHost)) {
logger.error("HDFS config property <hdfs_host> is not set in config file.");
return;
}
if (StringUtils.isEmpty(hdfsPort)) {
logger.error("HDFS config property <hdfs_port> is not set in config file.");
return;
}
HashMap<String, String> contextParam = buildContextParam();
hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
logger.info("hdfs Output dir=" + hdfsOutDir);
String localFileDir = logFeederProps.getTmpDir() + "hdfs/service/";
logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
this.startHDFSCopyThread();
}
@Override
public void close() {
logger.info("Closing file." + getShortDescription());
logSpooler.rollover();
this.stopHDFSCopyThread();
shouldCloseOutput();
}
@Override
public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
if (block != null) {
logSpooler.add(block);
statMetric.value++;
}
}
@Override
public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
String block = LogFeederUtil.getGson().toJson(jsonObj);
write(block, inputMarker);
}
@Override
public String getShortDescription() {
return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
}
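/**
* Starts a daemon thread that drains localReadyFiles, copying each rolled-over
* file to HDFS and deleting the local copy on success. When the queue is empty,
* the thread parks on readyMonitor until addFileInReadyList(File) signals that
* a new rollover file is available.
*/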
private void startHDFSCopyThread() {
hdfsCopyThread = new Thread("hdfsCopyThread") {
@Override
public void run() {
try {
while (true) {
Iterator<File> localFileIterator = localReadyFiles.iterator();
while (localFileIterator.hasNext()) {
File localFile = localFileIterator.next();
fileSystem = LogFeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
if (fileSystem != null && localFile.exists()) {
String destFilePath = hdfsOutDir + "/" + localFile.getName();
String localPath = localFile.getAbsolutePath();
boolean overWrite = true;
boolean delSrc = true;
boolean isCopied = LogFeederHDFSUtil.copyFromLocal(localPath, destFilePath, fileSystem, overWrite, delSrc);
if (isCopied) {
logger.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
} else {
// TODO: Add retry logic; this can be handled in the next release
logger.error("HDFS file copy failed. hdfsPath=" + destFilePath + ", localPath=" + localPath);
}
}
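// The entry is removed whether or not the copy succeeded; failed copies
// are currently not retried (see the TODO above).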
localFileIterator.remove();
}
try {
// Wait until a new file arrives in the ready list
synchronized (readyMonitor) {
if (localReadyFiles.isEmpty()) {
readyMonitor.wait();
}
}
} catch (InterruptedException e) {
logger.error(e.getLocalizedMessage(), e);
}
}
} catch (Exception e) {
logger.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
}
}
};
hdfsCopyThread.setDaemon(true);
hdfsCopyThread.start();
}
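/**
* Blocks until the ready queue has been drained to HDFS, then interrupts the
* copy thread (which otherwise waits forever on readyMonitor) and closes the
* shared FileSystem handle.
*/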
private void stopHDFSCopyThread() {
if (hdfsCopyThread != null) {
logger.info("waiting till copy all local files to hdfs.......");
while (!localReadyFiles.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error(e.getLocalizedMessage(), e);
}
logger.debug("still waiting to copy all local files to hdfs.......");
}
logger.info("calling interrupt method for hdfsCopyThread to stop it.");
try {
hdfsCopyThread.interrupt();
} catch (SecurityException exception) {
logger.error(" Current thread : '" + Thread.currentThread().getName() +
"' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
}
LogFeederHDFSUtil.closeFileSystem(fileSystem);
}
}
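/**
* Builds the context used by PlaceholderUtil to resolve variables in
* hdfs_out_dir; currently only the local hostname is exposed, under the
* "host" key (the placeholder syntax itself is defined by PlaceholderUtil).
*/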
private HashMap<String, String> buildContextParam() {
HashMap<String, String> contextParam = new HashMap<String, String>();
contextParam.put("host", LogFeederUtil.hostName);
return contextParam;
}
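/**
* Queues a rolled-over file for upload and wakes the HDFS copy thread if it
* is waiting on readyMonitor.
*/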
private void addFileInReadyList(File localFile) {
localReadyFiles.add(localFile);
try {
synchronized (readyMonitor) {
readyMonitor.notifyAll();
}
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
}
}
@Override
public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
}
/**
* Add the rollover file to a daemon thread for uploading to HDFS
* @param rolloverFile the file to be uploaded to HDFS
*/
@Override
public void handleRollover(File rolloverFile) {
addFileInReadyList(rolloverFile);
}
/**
* Determines whether it is time to roll over the current spool file.
*
* The file is rolled over if the time since its creation exceeds the timeout
* specified in the rollover_sec configuration.
* @param currentSpoolerContext {@link LogSpoolerContext} that holds the state of the active spool file
* @return true if the time since creation is greater than the value specified in rollover_sec,
* false otherwise.
*/
@Override
public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
if (shouldRollover) {
logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
" has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
}
return shouldRollover;
}
@Override
public String getOutputType() {
throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
}
@Override
public Long getPendingCount() {
return 0L;
}
@Override
public String getWriteBytesMetricName() {
return "output.hdfs.write_bytes";
}
@Override
public String getStatMetricName() {
return "output.hdfs.write_logs";
}
}