/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
import org.apache.hadoop.io.file.tfile.Compression;
import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream;
import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Indexed Log Aggregation File Format implementation.
*
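* Layout of an aggregated log file written by this controller: the file
* starts with a 32-byte UUID header (the SHA-256 digest of the
* application id). Each aggregation cycle appends the compressed
* container log data, followed by a serialized IndexedLogsMeta, its
* length as a 4-byte int, and the UUID again; readers locate the most
* recent meta by walking back from the end of the file. A "-checksum"
* side file records the log file name and the end offset of the last
* fully committed cycle, and is used to recover from a crash in the
* middle of a cycle.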
*/
@Private
@Unstable
public class LogAggregationIndexedFileController
extends LogAggregationFileController {
private static final Logger LOG = LoggerFactory.getLogger(
LogAggregationIndexedFileController.class);
private static final String FS_OUTPUT_BUF_SIZE_ATTR =
"indexedFile.fs.output.buffer.size";
private static final String FS_INPUT_BUF_SIZE_ATTR =
"indexedFile.fs.input.buffer.size";
private static final String FS_NUM_RETRIES_ATTR =
"indexedFile.fs.op.num-retries";
private static final String FS_RETRY_INTERVAL_MS_ATTR =
"indexedFile.fs.retry-interval-ms";
private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
"indexedFile.log.roll-over.max-file-size-gb";
@VisibleForTesting
public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
private int fsNumRetries = 3;
private long fsRetryInterval = 1000L;
private static final int VERSION = 1;
private IndexedLogsMeta indexedLogsMeta = null;
private IndexedPerAggregationLogMeta logsMetaInThisCycle;
private long logAggregationTimeInThisCycle;
private FSDataOutputStream fsDataOStream;
private Algorithm compressAlgo;
private CachedIndexedLogsMeta cachedIndexedLogsMeta = null;
private boolean logAggregationSuccessfullyInThisCyCle = false;
private long currentOffSet = 0;
private Path remoteLogCheckSumFile;
private FileContext fc;
private UserGroupInformation ugi;
private byte[] uuid = null;
private static final int UUID_LENGTH = 32;
private long logRollOverMaxFileSize;
private Clock sysClock;
public LogAggregationIndexedFileController() {}
@Override
public void initInternal(Configuration conf) {
// Currently, we need the underlying file system to support the append
// operation. This check will be removed once
// LogAggregationIndexedFileController supports non-append mode.
boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true);
if (!append) {
throw new YarnRuntimeException("The configuration:"
+ LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only"
+ " use LogAggregationIndexedFileController when the FileSystem "
+ "support append operations.");
}
String remoteDirStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
this.fileControllerName);
String remoteDir = conf.get(remoteDirStr);
if (remoteDir == null || remoteDir.isEmpty()) {
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
}
this.remoteRootLogDir = new Path(remoteDir);
String suffix = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
this.fileControllerName);
this.remoteRootLogDirSuffix = conf.get(suffix);
if (this.remoteRootLogDirSuffix == null
|| this.remoteRootLogDirSuffix.isEmpty()) {
this.remoteRootLogDirSuffix = conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ "-ifile";
}
String compressName = conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
this.compressAlgo = Compression.getCompressionAlgorithmByName(
compressName);
this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3);
this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L);
this.logRollOverMaxFileSize = getRollOverLogMaxSize(conf);
this.sysClock = getSystemClock();
}
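/**
* Sets up the output stream for one aggregation cycle. In non-rolling
* mode a fresh file is created and the 32-byte UUID header is written;
* in rolling mode initializeWriterInRolling recovers or rolls the
* current file. A single "\n" is then written so we can detect whether
* getPos() reports absolute file offsets or offsets relative to this
* append session; currentOffSet records the correction to apply when
* computing the start index of each log.
*/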
@Override
public void initializeWriter(
final LogAggregationFileControllerContext context)
throws IOException {
final UserGroupInformation userUgi = context.getUserUgi();
final Map<ApplicationAccessType, String> appAcls = context.getAppAcls();
final String nodeId = context.getNodeId().toString();
final ApplicationId appId = context.getAppId();
final Path remoteLogFile = context.getRemoteNodeLogFileForApp();
this.ugi = userUgi;
logAggregationSuccessfullyInThisCyCle = false;
logsMetaInThisCycle = new IndexedPerAggregationLogMeta();
logAggregationTimeInThisCycle = this.sysClock.getTime();
logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle);
logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName());
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
fc = FileContext.getFileContext(
remoteRootLogDir.toUri(), conf);
fc.setUMask(APP_LOG_FILE_UMASK);
if (indexedLogsMeta == null) {
indexedLogsMeta = new IndexedLogsMeta();
indexedLogsMeta.setVersion(VERSION);
indexedLogsMeta.setUser(userUgi.getShortUserName());
indexedLogsMeta.setAcls(appAcls);
indexedLogsMeta.setNodeId(nodeId);
String compressName = conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
indexedLogsMeta.setCompressName(compressName);
}
Path aggregatedLogFile = null;
if (context.isLogAggregationInRolling()) {
aggregatedLogFile = initializeWriterInRolling(
remoteLogFile, appId, nodeId);
} else {
aggregatedLogFile = remoteLogFile;
fsDataOStream = fc.create(remoteLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
if (uuid == null) {
uuid = createUUID(appId);
}
fsDataOStream.write(uuid);
fsDataOStream.flush();
}
long aggregatedLogFileLength = fc.getFileStatus(
aggregatedLogFile).getLen();
// append a simple character("\n") to move the writer cursor, so
// we could get the correct position when we call
// fsOutputStream.getStartPos()
final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
fsDataOStream.write(dummyBytes);
fsDataOStream.flush();
if (fsDataOStream.getPos() >= (aggregatedLogFileLength
+ dummyBytes.length)) {
currentOffSet = 0;
} else {
currentOffSet = aggregatedLogFileLength;
}
return null;
}
});
} catch (Exception e) {
throw new IOException(e);
}
}
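/**
* Prepares the aggregated log file for rolling log aggregation. If no
* UUID is cached yet, it is recovered from the existing log files (and
* files with a mismatching UUID header are deleted). If a checksum file
* exists, the previous cycle did not complete: the log file is
* truncated back to the recorded end offset and the last committed meta
* is reloaded. If the current file exceeds the configured roll-over
* size, a new file suffixed with the current timestamp is started;
* otherwise the current file is reopened for append. Finally, the
* checksum file is (re)written so that a crash during this cycle can be
* recovered.
*/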
private Path initializeWriterInRolling(final Path remoteLogFile,
final ApplicationId appId, final String nodeId) throws Exception {
Path aggregatedLogFile = null;
// Check the UUID. If we cannot find the UUID, load it from the
// previously aggregated log files, and at the same time delete any
// aggregated log file which has an invalid UUID.
if (uuid == null) {
uuid = loadUUIDFromLogFile(fc, remoteLogFile.getParent(),
appId, nodeId);
}
Path currentRemoteLogFile = getCurrentRemoteLogFile(
fc, remoteLogFile.getParent(), nodeId);
// check checksum file
boolean overwriteCheckSum = true;
remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
(remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
if (fc.util().exists(remoteLogCheckSumFile)) {
// if the checksum file exists, we should reset cached
// indexedLogsMeta.
indexedLogsMeta.getLogMetas().clear();
if (currentRemoteLogFile != null) {
FSDataInputStream checksumFileInputStream = null;
try {
checksumFileInputStream = fc.open(remoteLogCheckSumFile);
int nameLength = checksumFileInputStream.readInt();
byte[] b = new byte[nameLength];
int actualLength = checksumFileInputStream.read(b);
if (actualLength == nameLength) {
String recoveredLogFile = new String(
b, Charset.forName("UTF-8"));
if (recoveredLogFile.equals(
currentRemoteLogFile.getName())) {
overwriteCheckSum = false;
long endIndex = checksumFileInputStream.readLong();
IndexedLogsMeta recoveredLogsMeta = null;
try {
truncateFileWithRetries(fc, currentRemoteLogFile,
endIndex);
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile);
} catch (Exception ex) {
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile, endIndex);
}
if (recoveredLogsMeta != null) {
indexedLogsMeta = recoveredLogsMeta;
}
}
}
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
}
}
}
// check whether we need to roll over the old logs
if (currentRemoteLogFile == null || isRollover(
fc, currentRemoteLogFile)) {
indexedLogsMeta.getLogMetas().clear();
overwriteCheckSum = true;
aggregatedLogFile = new Path(remoteLogFile.getParent(),
remoteLogFile.getName() + "_" + sysClock.getTime());
fsDataOStream = fc.create(aggregatedLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
// write the uuid
fsDataOStream.write(uuid);
fsDataOStream.flush();
} else {
aggregatedLogFile = currentRemoteLogFile;
fsDataOStream = fc.create(currentRemoteLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
new Options.CreateOpts[] {});
}
// recreate the checksum file if needed before aggregating the logs
if (overwriteCheckSum) {
final long currentAggregatedLogFileLength = fc
.getFileStatus(aggregatedLogFile).getLen();
FSDataOutputStream checksumFileOutputStream = null;
try {
checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
String fileName = aggregatedLogFile.getName();
checksumFileOutputStream.writeInt(fileName.length());
checksumFileOutputStream.write(fileName.getBytes(
Charset.forName("UTF-8")));
checksumFileOutputStream.writeLong(
currentAggregatedLogFileLength);
checksumFileOutputStream.flush();
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
}
}
return aggregatedLogFile;
}
@Override
public void closeWriter() {
IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
}
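/**
* Streams every pending log file of this container through a
* compression stream onto the shared output stream, and records an
* IndexedFileLogMeta (start index, compressed size, original size and
* modification time) for each file in the meta of the current cycle.
* Files that grow while being uploaded are truncated to the length
* observed at the start of the upload.
*/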
@Override
public void write(LogKey logKey, LogValue logValue) throws IOException {
String containerId = logKey.toString();
Set<File> pendingUploadFiles = logValue
.getPendingLogFilesToUploadForThisContainer();
List<IndexedFileLogMeta> metas = new ArrayList<>();
for (File logFile : pendingUploadFiles) {
FileInputStream in = null;
try {
in = SecureIOUtils.openForRead(logFile, logValue.getUser(), null);
} catch (IOException e) {
logErrorMessage(logFile, e);
IOUtils.cleanupWithLogger(LOG, in);
continue;
}
final long fileLength = logFile.length();
IndexedFileOutputStreamState outputStreamState = null;
try {
outputStreamState = new IndexedFileOutputStreamState(
this.compressAlgo, this.fsDataOStream, conf, this.currentOffSet);
byte[] buf = new byte[65535];
int len = 0;
long bytesLeft = fileLength;
while ((len = in.read(buf)) != -1) {
// If the buffer contents fit within fileLength, write them all
if (len < bytesLeft) {
outputStreamState.getOutputStream().write(buf, 0, len);
bytesLeft -= len;
} else {
// otherwise write only the bytes within fileLength, then exit early
outputStreamState.getOutputStream().write(buf, 0,
(int)bytesLeft);
break;
}
}
long newLength = logFile.length();
if (fileLength < newLength) {
LOG.warn("Aggregated logs truncated by approximately "
+ (newLength - fileLength) + " bytes.");
}
logAggregationSuccessfullyInThisCyCle = true;
} catch (IOException e) {
String message = logErrorMessage(logFile, e);
if (outputStreamState != null &&
outputStreamState.getOutputStream() != null) {
outputStreamState.getOutputStream().write(
message.getBytes(Charset.forName("UTF-8")));
}
} finally {
IOUtils.cleanupWithLogger(LOG, in);
}
IndexedFileLogMeta meta = new IndexedFileLogMeta();
meta.setContainerId(containerId);
meta.setFileName(logFile.getName());
if (outputStreamState != null) {
outputStreamState.finish();
meta.setFileCompressedSize(outputStreamState.getCompressedSize());
meta.setStartIndex(outputStreamState.getStartPos());
meta.setFileSize(fileLength);
}
meta.setLastModificatedTime(logFile.lastModified());
metas.add(meta);
}
logsMetaInThisCycle.addContainerLogMeta(containerId, metas);
}
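/**
* Commits the current cycle by appending the accumulated meta to the
* log file: the serialized IndexedLogsMeta, its length as a 4-byte int,
* and the 32-byte UUID. In rolling mode, a successful cycle also
* deletes the checksum file, signalling that the file ends with a
* complete, readable trailer.
*/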
@Override
public void postWrite(LogAggregationFileControllerContext record)
throws Exception {
// always aggregate the previous logsMeta, and append them together
// at the end of the file
indexedLogsMeta.addLogMeta(logsMetaInThisCycle);
byte[] b = SerializationUtils.serialize(indexedLogsMeta);
this.fsDataOStream.write(b);
int length = b.length;
this.fsDataOStream.writeInt(length);
this.fsDataOStream.write(uuid);
if (logAggregationSuccessfullyInThisCyCle &&
record.isLogAggregationInRolling()) {
deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile);
}
}
private void deleteFileWithRetries(final FileContext fileContext,
final UserGroupInformation userUgi,
final Path deletePath) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
deleteFileWithPrivilege(fileContext, userUgi, deletePath);
return null;
}
}.runWithRetries();
}
private void deleteFileWithRetries(final FileContext fileContext,
final Path deletePath) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
if (fileContext.util().exists(deletePath)) {
fileContext.delete(deletePath, false);
}
return null;
}
}.runWithRetries();
}
private void truncateFileWithRetries(final FileContext fileContext,
final Path truncatePath, final long newLength) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
fileContext.truncate(truncatePath, newLength);
return null;
}
}.runWithRetries();
}
private Object deleteFileWithPrivilege(final FileContext fileContext,
final UserGroupInformation userUgi, final Path fileToDelete)
throws Exception {
return userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
if (fileContext.util().exists(fileToDelete)) {
fileContext.delete(fileToDelete, false);
}
return null;
}
});
}
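/**
* Reads the requested container logs back from the aggregated log
* files. For each matching node file, the latest meta is loaded
* (bounded by the checksum offset when a cycle is still in flight),
* candidate logs are filtered by container id and log type, and each
* log is decompressed from its recorded byte range and copied to the
* output stream. Returns true if at least one log was found.
*/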
@Override
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
OutputStream os) throws IOException {
boolean findLogs = false;
boolean createPrintStream = (os == null);
ApplicationId appId = logRequest.getAppId();
String nodeId = logRequest.getNodeId();
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId);
List<String> logTypes = new ArrayList<>();
if (logRequest.getLogTypes() != null && !logRequest
.getLogTypes().isEmpty()) {
logTypes.addAll(logRequest.getLogTypes());
}
String containerIdStr = logRequest.getContainerId();
boolean getAllContainers = (containerIdStr == null
|| containerIdStr.isEmpty());
long size = logRequest.getBytes();
List<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
byte[] buf = new byte[65535];
for (FileStatus thisNodeFile : fileToRead) {
String nodeName = thisNodeFile.getPath().getName();
Long checkSumIndex = checkSumFiles.get(nodeName);
long endIndex = -1;
if (checkSumIndex != null) {
endIndex = checkSumIndex.longValue();
}
IndexedLogsMeta indexedLogsMeta = null;
try {
indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(),
endIndex);
} catch (Exception ex) {
// Skip this file: it may be corrupted or incomplete.
LOG.warn("Cannot load the log meta from the log file: "
+ thisNodeFile.getPath());
continue;
}
if (indexedLogsMeta == null) {
continue;
}
String compressAlgoName = indexedLogsMeta.getCompressName();
List<IndexedFileLogMeta> candidates = new ArrayList<>();
for (IndexedPerAggregationLogMeta logMeta
: indexedLogsMeta.getLogMetas()) {
for (Entry<String, List<IndexedFileLogMeta>> meta
: logMeta.getLogMetas().entrySet()) {
for (IndexedFileLogMeta log : meta.getValue()) {
if (!getAllContainers && !log.getContainerId()
.equals(containerIdStr)) {
continue;
}
if (logTypes != null && !logTypes.isEmpty() &&
!logTypes.contains(log.getFileName())) {
continue;
}
candidates.add(log);
}
}
}
if (candidates.isEmpty()) {
continue;
}
Algorithm compressAlgorithm = Compression.getCompressionAlgorithmByName(
compressAlgoName);
Decompressor decompressor = compressAlgorithm.getDecompressor();
FileContext fileContext = FileContext.getFileContext(
thisNodeFile.getPath().toUri(), conf);
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
String currentContainer = "";
for (IndexedFileLogMeta candidate : candidates) {
if (!candidate.getContainerId().equals(currentContainer)) {
if (createPrintStream) {
closePrintStream(os);
os = LogToolUtils.createPrintStream(
logRequest.getOutputLocalDir(),
thisNodeFile.getPath().getName(),
candidate.getContainerId());
currentContainer = candidate.getContainerId();
}
}
InputStream in = null;
try {
in = compressAlgorithm.createDecompressionStream(
new BoundedRangeFileInputStream(fsin,
candidate.getStartIndex(),
candidate.getFileCompressedSize()),
decompressor, getFSInputBufferSize(conf));
LogToolUtils.outputContainerLog(candidate.getContainerId(),
nodeName, candidate.getFileName(), candidate.getFileSize(), size,
Times.format(candidate.getLastModificatedTime()),
in, os, buf, ContainerLogAggregationType.AGGREGATED);
byte[] b = aggregatedLogSuffix(candidate.getFileName())
.getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} catch (IOException e) {
System.err.println(e.getMessage());
compressAlgorithm.returnDecompressor(decompressor);
continue;
} finally {
os.flush();
IOUtils.cleanupWithLogger(LOG, in);
}
}
}
return findLogs;
}
// TODO: fix me if the remote file system does not support append operation.
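/**
* Collects the log meta for the requested application: for each
* matching node file the latest IndexedLogsMeta is loaded (bounded by
* the checksum offset if present), converted into ContainerLogMeta
* entries filtered by container id, and the result is sorted by
* container id.
*/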
@Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
List<IndexedLogsMeta> listOfLogsMeta = new ArrayList<>();
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
String containerIdStr = logRequest.getContainerId();
String nodeId = logRequest.getNodeId();
ApplicationId appId = logRequest.getAppId();
String appOwner = logRequest.getAppOwner();
boolean getAllContainers = (containerIdStr == null ||
containerIdStr.isEmpty());
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId);
List<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
for (FileStatus thisNodeFile : fileToRead) {
try {
Long checkSumIndex = checkSumFiles.get(
thisNodeFile.getPath().getName());
long endIndex = -1;
if (checkSumIndex != null) {
endIndex = checkSumIndex.longValue();
}
IndexedLogsMeta current = loadIndexedLogsMeta(
thisNodeFile.getPath(), endIndex);
if (current != null) {
listOfLogsMeta.add(current);
}
} catch (IOException ex) {
// Skip this file: it may be corrupted or incomplete.
LOG.warn("Cannot get the log meta from the log file: "
+ thisNodeFile.getPath());
}
}
for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) {
String curNodeId = indexedLogMeta.getNodeId();
for (IndexedPerAggregationLogMeta logMeta :
indexedLogMeta.getLogMetas()) {
if (getAllContainers) {
for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
.getLogMetas().entrySet()) {
ContainerLogMeta meta = new ContainerLogMeta(
log.getKey().toString(), curNodeId);
for (IndexedFileLogMeta aMeta : log.getValue()) {
meta.addLogMeta(aMeta.getFileName(), Long.toString(
aMeta.getFileSize()),
Times.format(aMeta.getLastModificatedTime()));
}
containersLogMeta.add(meta);
}
} else if (logMeta.getContainerLogMeta(containerIdStr) != null) {
ContainerLogMeta meta = new ContainerLogMeta(containerIdStr,
curNodeId);
for (IndexedFileLogMeta log :
logMeta.getContainerLogMeta(containerIdStr)) {
meta.addLogMeta(log.getFileName(), Long.toString(
log.getFileSize()),
Times.format(log.getLastModificatedTime()));
}
containersLogMeta.add(meta);
}
}
}
Collections.sort(containersLogMeta, new Comparator<ContainerLogMeta>() {
@Override
public int compare(ContainerLogMeta o1, ContainerLogMeta o2) {
return o1.getContainerId().compareTo(o2.getContainerId());
}
});
return containersLogMeta;
}
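/**
* Parses all "-checksum" files in the given list. Each checksum file
* stores an int name length, the UTF-8 bytes of the aggregated log
* file name it guards, and a long end offset of the last committed
* cycle. Returns a map from log file name to that end offset.
*/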
@Private
public Map<String, Long> parseCheckSumFiles(
List<FileStatus> fileList) throws IOException {
Map<String, Long> checkSumFiles = new HashMap<>();
Set<FileStatus> status = new HashSet<FileStatus>(fileList);
Iterable<FileStatus> mask =
Iterables.filter(status, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus next) {
return next.getPath().getName().endsWith(
CHECK_SUM_FILE_SUFFIX);
}
});
status = Sets.newHashSet(mask);
FileContext fc = null;
for (FileStatus file : status) {
FSDataInputStream checksumFileInputStream = null;
try {
if (fc == null) {
fc = FileContext.getFileContext(file.getPath().toUri(), conf);
}
String nodeName = null;
long index = 0L;
checksumFileInputStream = fc.open(file.getPath());
int nameLength = checksumFileInputStream.readInt();
byte[] b = new byte[nameLength];
int actualLength = checksumFileInputStream.read(b);
if (actualLength == nameLength) {
nodeName = new String(b, Charset.forName("UTF-8"));
index = checksumFileInputStream.readLong();
} else {
continue;
}
if (nodeName != null && !nodeName.isEmpty()) {
checkSumFiles.put(nodeName, Long.valueOf(index));
}
} catch (IOException ex) {
continue;
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
}
}
return checkSumFiles;
}
@Private
public List<FileStatus> getNodeLogFileToRead(
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
throws IOException {
List<FileStatus> listOfFiles = new ArrayList<>();
List<FileStatus> files = new ArrayList<>(nodeFiles);
for (FileStatus file : files) {
String nodeName = file.getPath().getName();
if ((nodeId == null || nodeId.isEmpty()
|| nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX) &&
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
continue;
}
listOfFiles.add(file);
}
}
return listOfFiles;
}
@Private
public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
String fileName) {
for (Entry<String, FileStatus> file : fileMap.entrySet()) {
if (file.getKey().startsWith(fileName) && file.getKey()
.endsWith(CHECK_SUM_FILE_SUFFIX)) {
return file.getValue();
}
}
return null;
}
@Override
public void renderAggregatedLogsBlock(Block html, ViewContext context) {
IndexedFileAggregatedLogsBlock block = new IndexedFileAggregatedLogsBlock(
context, this.conf, this);
block.render(html);
}
@Override
public String getApplicationOwner(Path aggregatedLogPath)
throws IOException {
if (this.cachedIndexedLogsMeta == null
|| !this.cachedIndexedLogsMeta.getRemoteLogPath()
.equals(aggregatedLogPath)) {
this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
}
return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser();
}
@Override
public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException {
if (this.cachedIndexedLogsMeta == null
|| !this.cachedIndexedLogsMeta.getRemoteLogPath()
.equals(aggregatedLogPath)) {
this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta(
loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath);
}
return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls();
}
@Override
public Path getRemoteAppLogDir(ApplicationId appId, String user)
throws IOException {
return LogAggregationUtils.getRemoteAppLogDir(conf, appId, user,
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
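/**
* Loads the IndexedLogsMeta trailer from an aggregated log file. The
* trailer layout is: serialized meta, its length as a 4-byte int, and
* the 32-byte UUID. The reader seeks to (end - 4 - UUID_LENGTH) to read
* the meta length, then seeks back by that length to deserialize the
* meta. A negative end means "use the file length"; an end of 0 means
* nothing has been committed yet, so null is returned.
*/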
@Private
public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteLogPath.toUri(), conf);
FSDataInputStream fsDataIStream = null;
try {
fsDataIStream = fileContext.open(remoteLogPath);
if (end == 0) {
return null;
}
long fileLength = end < 0 ? fileContext.getFileStatus(
remoteLogPath).getLen() : end;
fsDataIStream.seek(fileLength - Integer.SIZE / Byte.SIZE - UUID_LENGTH);
int offset = fsDataIStream.readInt();
byte[] array = new byte[offset];
fsDataIStream.seek(
fileLength - offset - Integer.SIZE / Byte.SIZE - UUID_LENGTH);
int actual = fsDataIStream.read(array);
if (actual != offset) {
throw new IOException("Error on loading log meta from "
+ remoteLogPath);
}
return (IndexedLogsMeta)SerializationUtils
.deserialize(array);
} finally {
IOUtils.cleanupWithLogger(LOG, fsDataIStream);
}
}
private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath)
throws IOException {
return loadIndexedLogsMeta(remoteLogPath, -1);
}
/**
* This IndexedLogsMeta includes all the meta information
* for the aggregated log file.
*/
@Private
@VisibleForTesting
public static class IndexedLogsMeta implements Serializable {
private static final long serialVersionUID = 5439875373L;
private int version;
private String user;
private String compressName;
private Map<ApplicationAccessType, String> acls;
private String nodeId;
private List<IndexedPerAggregationLogMeta> logMetas = new ArrayList<>();
public int getVersion() {
return this.version;
}
public void setVersion(int version) {
this.version = version;
}
public String getUser() {
return this.user;
}
public void setUser(String user) {
this.user = user;
}
public Map<ApplicationAccessType, String> getAcls() {
return this.acls;
}
public void setAcls(Map<ApplicationAccessType, String> acls) {
this.acls = acls;
}
public String getCompressName() {
return compressName;
}
public void setCompressName(String compressName) {
this.compressName = compressName;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public void addLogMeta(IndexedPerAggregationLogMeta logMeta) {
logMetas.add(logMeta);
}
public List<IndexedPerAggregationLogMeta> getLogMetas() {
return logMetas;
}
}
/**
* This IndexedPerAggregationLogMeta includes the meta information
* for all the files aggregated in one log aggregation cycle.
*/
public static class IndexedPerAggregationLogMeta implements Serializable {
private static final long serialVersionUID = 3929298383L;
private String remoteNodeLogFileName;
private Map<String, List<IndexedFileLogMeta>> logMetas = new HashMap<>();
private long uploadTimeStamp;
public String getRemoteNodeFile() {
return remoteNodeLogFileName;
}
public void setRemoteNodeFile(String remoteNodeLogFileName) {
this.remoteNodeLogFileName = remoteNodeLogFileName;
}
public void addContainerLogMeta(String containerId,
List<IndexedFileLogMeta> logMeta) {
logMetas.put(containerId, logMeta);
}
public List<IndexedFileLogMeta> getContainerLogMeta(String containerId) {
return logMetas.get(containerId);
}
public Map<String, List<IndexedFileLogMeta>> getLogMetas() {
return logMetas;
}
public long getUploadTimeStamp() {
return uploadTimeStamp;
}
public void setUploadTimeStamp(long uploadTimeStamp) {
this.uploadTimeStamp = uploadTimeStamp;
}
}
/**
* This IndexedFileLogMeta includes the meta information
* for a single file aggregated in one log aggregation cycle.
*/
@Private
@VisibleForTesting
public static class IndexedFileLogMeta implements Serializable {
private static final long serialVersionUID = 1L;
private String containerId;
private String fileName;
private long fileSize;
private long fileCompressedSize;
private long lastModificatedTime;
private long startIndex;
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public long getFileSize() {
return fileSize;
}
public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}
public long getFileCompressedSize() {
return fileCompressedSize;
}
public void setFileCompressedSize(long fileCompressedSize) {
this.fileCompressedSize = fileCompressedSize;
}
public long getLastModificatedTime() {
return lastModificatedTime;
}
public void setLastModificatedTime(long lastModificatedTime) {
this.lastModificatedTime = lastModificatedTime;
}
public long getStartIndex() {
return startIndex;
}
public void setStartIndex(long startIndex) {
this.startIndex = startIndex;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
}
private static String logErrorMessage(File logFile, Exception e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + ". " + e.getMessage();
LOG.error(message, e);
return message;
}
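/**
* Per-log-file compression state: wraps the shared FSDataOutputStream
* in a buffered compression stream. getStartPos() adds the offset
* computed in initializeWriter so that recorded start indices are
* absolute file offsets even when getPos() is relative to the current
* append session.
*/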
private static class IndexedFileOutputStreamState {
private final Algorithm compressAlgo;
private Compressor compressor;
private final FSDataOutputStream fsOut;
private long posStart;
private final SimpleBufferedOutputStream fsBufferedOutput;
private OutputStream out;
private long offset;
IndexedFileOutputStreamState(Algorithm compressionName,
FSDataOutputStream fsOut, Configuration conf, long offset)
throws IOException {
this.compressAlgo = compressionName;
this.fsOut = fsOut;
this.offset = offset;
this.posStart = fsOut.getPos();
BytesWritable fsOutputBuffer = new BytesWritable();
fsOutputBuffer.setCapacity(LogAggregationIndexedFileController
.getFSOutputBufferSize(conf));
this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut,
fsOutputBuffer.getBytes());
this.compressor = compressAlgo.getCompressor();
try {
this.out = compressAlgo.createCompressionStream(
fsBufferedOutput, compressor, 0);
} catch (IOException e) {
compressAlgo.returnCompressor(compressor);
throw e;
}
}
OutputStream getOutputStream() {
return out;
}
long getCurrentPos() throws IOException {
return fsOut.getPos() + fsBufferedOutput.size();
}
long getStartPos() {
return posStart + offset;
}
long getCompressedSize() throws IOException {
long ret = getCurrentPos() - posStart;
return ret;
}
void finish() throws IOException {
try {
if (out != null) {
out.flush();
out = null;
}
} finally {
compressAlgo.returnCompressor(compressor);
compressor = null;
}
}
}
private static class CachedIndexedLogsMeta {
private final Path remoteLogPath;
private final IndexedLogsMeta indexedLogsMeta;
CachedIndexedLogsMeta(IndexedLogsMeta indexedLogsMeta,
Path remoteLogPath) {
this.indexedLogsMeta = indexedLogsMeta;
this.remoteLogPath = remoteLogPath;
}
public Path getRemoteLogPath() {
return this.remoteLogPath;
}
public IndexedLogsMeta getCachedIndexedLogsMeta() {
return this.indexedLogsMeta;
}
}
@Private
public static int getFSOutputBufferSize(Configuration conf) {
return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
}
@Private
public static int getFSInputBufferSize(Configuration conf) {
return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
}
@Private
@VisibleForTesting
public long getRollOverLogMaxSize(Configuration conf) {
return 1024L * 1024 * 1024 * conf.getInt(
LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
}
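/**
* A retry wrapper for file system operations: runWithRetries() re-runs
* run() on IOException up to fsNumRetries times, sleeping
* fsRetryInterval milliseconds between attempts.
*/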
private abstract class FSAction<T> {
abstract T run() throws Exception;
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return run();
} catch (IOException e) {
LOG.info("Exception while executing an FS operation.", e);
if (++retry > fsNumRetries) {
LOG.info("Maxed out FS retries. Giving up!");
throw e;
}
LOG.info("Retrying operation on FS. Retry no. " + retry);
Thread.sleep(fsRetryInterval);
}
}
}
}
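/**
* Returns the most recently modified aggregated log file for the given
* node, skipping temporary and checksum files, or null if none exists.
*/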
private Path getCurrentRemoteLogFile(final FileContext fc,
final Path parent, final String nodeId) throws IOException {
RemoteIterator<FileStatus> files = fc.listStatus(parent);
long maxTime = 0L;
Path returnPath = null;
while (files.hasNext()) {
FileStatus candidate = files.next();
String fileName = candidate.getPath().getName();
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX) &&
!fileName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
if (candidate.getModificationTime() > maxTime) {
maxTime = candidate.getModificationTime();
returnPath = candidate.getPath();
}
}
}
return returnPath;
}
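/**
* Computes the UUID expected for this application and validates the
* existing log files of this node against it: files whose header does
* not match are deleted as invalid. Since createUUID is deterministic,
* the returned UUID always equals the computed one.
*/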
private byte[] loadUUIDFromLogFile(final FileContext fc,
final Path parent, final ApplicationId appId, final String nodeId)
throws Exception {
byte[] id = null;
RemoteIterator<FileStatus> files = fc.listStatus(parent);
FSDataInputStream fsDataInputStream = null;
byte[] uuid = createUUID(appId);
while (files.hasNext()) {
try {
Path checkPath = files.next().getPath();
if (checkPath.getName().contains(LogAggregationUtils
.getNodeString(nodeId)) && !checkPath.getName()
.endsWith(CHECK_SUM_FILE_SUFFIX)) {
fsDataInputStream = fc.open(checkPath);
byte[] b = new byte[uuid.length];
int actual = fsDataInputStream.read(b);
if (actual != uuid.length || !Arrays.equals(b, uuid)) {
deleteFileWithRetries(fc, checkPath);
} else if (id == null){
id = uuid;
}
}
} finally {
IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
}
}
return id == null ? uuid : id;
}
@Private
@VisibleForTesting
public boolean isRollover(final FileContext fc,
final Path candidate) throws IOException {
FileStatus fs = fc.getFileStatus(candidate);
return fs.getLen() >= this.logRollOverMaxFileSize;
}
@Private
@VisibleForTesting
public Clock getSystemClock() {
return SystemClock.getInstance();
}
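/**
* Despite the name, this is not an RFC 4122 UUID: it is the 32-byte
* SHA-256 digest of the application id string, matching UUID_LENGTH.
*/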
private byte[] createUUID(ApplicationId appId) throws IOException {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(appId.toString().getBytes(
Charset.forName("UTF-8")));
} catch (NoSuchAlgorithmException ex) {
throw new IOException(ex);
}
}
}