blob: 4ac150ac065ab303c35084000b6f0e4bb098cfad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Logger LOG =
LoggerFactory.getLogger(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
private final Path remoteNodeLogFileForApp;
private final Path remoteNodeTmpLogFileForApp;
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
private final FileContext lfs;
private final LogAggregationContext logAggregationContext;
private final Context context;
private final NodeId nodeId;
private final LogAggregationFileControllerContext logControllerContext;
// These variables are only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
private int logAggregationTimes = 0;
private int cleanupOldLogTimes = 0;
private boolean renameTemporaryLogFileFailed = false;
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
new HashMap<ContainerId, ContainerLogAggregator>();
private final ContainerLogAggregationPolicy logAggPolicy;
private final LogAggregationFileController logAggregationFileController;
/**
* The value recovered from state store to determine the age of application
* log files if log retention is enabled. Files older than retention policy
* will not be uploaded but scheduled for cleaning up. -1 if not recovered.
*/
private final long recoveredLogInitedTime;
/**
 * Creates an aggregator without a recovered log-init time and without an
 * explicit file controller.
 * <p>
 * Delegates to the full constructor, passing {@code -1} as
 * {@code recoveredLogInitedTime} and {@code null} as the
 * {@link LogAggregationFileController}.
 */
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
}
/**
 * Creates an aggregator with a recovered log-init time but without an
 * explicit file controller.
 * <p>
 * Delegates to the full constructor, passing {@code null} as the
 * {@link LogAggregationFileController}.
 *
 * @param recoveredLogInitedTime value forwarded unchanged to the full
 *        constructor; the field documentation uses -1 for "not recovered"
 */
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval,
recoveredLogInitedTime, null);
}
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime,
LogAggregationFileController logAggregationFileController) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
this.appId = appId;
this.applicationId = appId.toString();
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
this.lfs = lfs;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
this.logAggPolicy = getLogAggPolicy(conf);
this.recoveredLogInitedTime = recoveredLogInitedTime;
if (logAggregationFileController == null) {
// by default, use T-File Controller
this.logAggregationFileController = new LogAggregationTFileController();
this.logAggregationFileController.initialize(conf, "TFile");
this.logAggregationFileController.verifyAndCreateRemoteLogDir();
this.logAggregationFileController.createAppDir(
this.userUgi.getShortUserName(), appId, userUgi);
this.remoteNodeLogFileForApp = this.logAggregationFileController
.getRemoteNodeLogFileForApp(appId,
this.userUgi.getShortUserName(), nodeId);
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
} else {
this.logAggregationFileController = logAggregationFileController;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
}
boolean logAggregationInRolling =
rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
logControllerContext = new LogAggregationFileControllerContext(
this.remoteNodeLogFileForApp,
this.remoteNodeTmpLogFileForApp,
logAggregationInRolling,
rollingMonitorInterval,
this.appId, this.appAcls, this.nodeId, this.userUgi);
}
/**
 * Builds the container log aggregation policy and, when policy parameters
 * are available, hands them to the policy for parsing.
 *
 * @param conf configuration used to resolve the policy class and parameters
 * @return the instantiated policy
 */
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
  final ContainerLogAggregationPolicy aggregationPolicy =
      getLogAggPolicyInstance(conf);
  final String policyParameters = getLogAggPolicyParameters(conf);
  if (policyParameters == null) {
    return aggregationPolicy;
  }
  aggregationPolicy.parseParameters(policyParameters);
  return aggregationPolicy;
}
/**
 * Instantiates the log aggregation policy.
 * <p>
 * The class named in the LogAggregationContext wins when it can be loaded
 * and implements {@link ContainerLogAggregationPolicy}; otherwise the
 * cluster-wide default policy class from the configuration is used. An
 * invalid class name only produces a warning.
 *
 * @param conf configuration used for class loading and for the default
 * @return a new policy instance
 */
private ContainerLogAggregationPolicy getLogAggPolicyInstance(
    Configuration conf) {
  Class<? extends ContainerLogAggregationPolicy> resolvedClass = null;

  final String requestedName = (this.logAggregationContext == null)
      ? null
      : this.logAggregationContext.getLogAggregationPolicyClassName();

  if (requestedName != null) {
    try {
      final Class<?> candidate = conf.getClassByName(requestedName);
      if (!ContainerLogAggregationPolicy.class.isAssignableFrom(candidate)) {
        LOG.warn(this.appId + " specified invalid log aggregation policy " +
            requestedName);
      } else {
        resolvedClass =
            candidate.asSubclass(ContainerLogAggregationPolicy.class);
      }
    } catch (ClassNotFoundException cnfe) {
      // We don't fail the app if the policy class isn't valid.
      LOG.warn(this.appId + " specified invalid log aggregation policy " +
          requestedName);
    }
  }

  if (resolvedClass != null) {
    LOG.info(this.appId + " specifies ContainerLogAggregationPolicy of "
        + resolvedClass);
  } else {
    // Fall back to the cluster-wide default.
    resolvedClass = conf.getClass(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
        AllContainerLogAggregationPolicy.class,
        ContainerLogAggregationPolicy.class);
  }
  return ReflectionUtils.newInstance(resolvedClass, conf);
}
/**
 * Resolves the policy parameter string: the value from the
 * LogAggregationContext when present, otherwise the cluster-wide default
 * from the configuration.
 *
 * @param conf configuration holding the cluster-wide default parameters
 * @return the parameter string, or {@code null} when neither source has one
 */
private String getLogAggPolicyParameters(Configuration conf) {
  final String fromContext = (this.logAggregationContext == null)
      ? null
      : this.logAggregationContext.getLogAggregationPolicyParameters();
  if (fromContext != null) {
    return fromContext;
  }
  return conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
}
/**
 * Runs one log upload cycle for this application.
 * <p>
 * Does nothing when log aggregation has been disabled. Otherwise it collects
 * the containers to upload, writes their logs through the file controller,
 * schedules deletion of the files each container reports as handled, and
 * records a log aggregation report. A report is also recorded when there is
 * nothing to upload or when the writer cannot be created.
 *
 * @param appFinished whether the application has finished; forwarded to the
 *        per-container aggregation and to the report
 */
private void uploadLogsForContainers(boolean appFinished) {
  if (this.logAggregationDisabled) {
    return;
  }

  addCredentials();

  // Create a set of Containers whose logs will be uploaded in this cycle.
  // It includes:
  // a) all containers in pendingContainers: those containers are finished
  //    and satisfy the ContainerLogAggregationPolicy.
  // b) some set of running containers: For all the Running containers,
  //    we use exitCode of 0 to find those which satisfy the
  //    ContainerLogAggregationPolicy.
  Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
  this.pendingContainers.drainTo(pendingContainerInThisCycle);
  Set<ContainerId> finishedContainers =
      new HashSet<ContainerId>(pendingContainerInThisCycle);

  // Look the application up exactly once. Checking for null and then
  // fetching it again would throw a NullPointerException if the entry were
  // removed between the two calls.
  org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application
      app = this.context.getApplications().get(this.appId);
  if (app != null) {
    for (Container container : app.getContainers().values()) {
      ContainerType containerType =
          container.getContainerTokenIdentifier().getContainerType();
      if (shouldUploadLogs(new ContainerLogContext(
          container.getContainerId(), containerType, 0))) {
        pendingContainerInThisCycle.add(container.getContainerId());
      }
    }
  }

  if (pendingContainerInThisCycle.isEmpty()) {
    sendLogAggregationReport(true, "", appFinished);
    return;
  }

  logAggregationTimes++;
  String diagnosticMessage = "";
  boolean logAggregationSucceedInThisCycle = true;
  try {
    try {
      logAggregationFileController.initializeWriter(logControllerContext);
    } catch (IOException e1) {
      logAggregationSucceedInThisCycle = false;
      LOG.error("Cannot create writer for app " + this.applicationId
          + ". Skip log upload this time. ", e1);
      return;
    }

    boolean uploadedLogsInThisCycle = false;
    for (ContainerId container : pendingContainerInThisCycle) {
      // Reuse the aggregator from an earlier cycle so that it remembers
      // which files it has already uploaded.
      ContainerLogAggregator aggregator =
          containerLogAggregators.get(container);
      if (aggregator == null) {
        aggregator = new ContainerLogAggregator(container);
        containerLogAggregators.put(container, aggregator);
      }
      boolean containerFinished = finishedContainers.contains(container);
      Set<Path> uploadedFilePathsInThisCycle =
          aggregator.doContainerLogAggregation(logAggregationFileController,
              appFinished, containerFinished);
      if (!uploadedFilePathsInThisCycle.isEmpty()) {
        uploadedLogsInThisCycle = true;
        DeletionTask deletionTask = new FileDeletionTask(delService,
            this.userUgi.getShortUserName(), null,
            new ArrayList<Path>(uploadedFilePathsInThisCycle));
        delService.delete(deletionTask);
      }

      // This container is finished, and all its logs have been uploaded,
      // remove it from containerLogAggregators.
      if (containerFinished) {
        containerLogAggregators.remove(container);
      }
    }

    logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
    logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
    logControllerContext.increLogAggregationTimes();
    try {
      this.logAggregationFileController.postWrite(logControllerContext);
      diagnosticMessage = "Log uploaded successfully for Application: "
          + appId + " in NodeManager: "
          + LogAggregationUtils.getNodeString(nodeId) + " at "
          + Times.format(logControllerContext.getLogUploadTimeStamp())
          + "\n";
    } catch (Exception e) {
      // Keep the failure visible in the NodeManager log; the report below
      // only carries the exception message, not the stack trace.
      LOG.error("Failed to complete log upload for app "
          + this.applicationId, e);
      diagnosticMessage = e.getMessage();
      renameTemporaryLogFileFailed = true;
      logAggregationSucceedInThisCycle = false;
    }
  } finally {
    try {
      sendLogAggregationReport(logAggregationSucceedInThisCycle,
          diagnosticMessage, appFinished);
    } finally {
      // Always release the writer, even if reporting fails.
      logAggregationFileController.closeWriter();
    }
  }
}
/**
 * When security is enabled, adds the system credentials registered for this
 * application (if any) to the user's UGI.
 */
private void addCredentials() {
  if (!UserGroupInformation.isSecurityEnabled()) {
    return;
  }
  final Credentials appCredentials =
      context.getSystemCredentialsForApps().get(appId);
  if (appCredentials == null) {
    return;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding new framework-token for " + appId
        + " for log-aggregation: " + appCredentials.getAllTokens()
        + "; userUgi=" + userUgi);
  }
  // this will replace old token
  userUgi.addCredentials(appCredentials);
}
/**
 * Records the per-cycle log aggregation report and, once the application
 * has finished, one additional final report.
 *
 * @param logAggregationSucceedInThisCycle whether this cycle succeeded
 * @param diagnosticMessage message attached to the per-cycle report
 * @param appFinished whether the application has finished
 */
private void sendLogAggregationReport(
    boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
    boolean appFinished) {
  final LogAggregationStatus cycleStatus;
  if (logAggregationSucceedInThisCycle) {
    cycleStatus = LogAggregationStatus.RUNNING;
  } else {
    cycleStatus = LogAggregationStatus.RUNNING_WITH_FAILURE;
  }
  sendLogAggregationReportInternal(cycleStatus, diagnosticMessage);

  if (!appFinished) {
    return;
  }

  // If the app is finished, one extra final report with log aggregation
  // status SUCCEEDED/FAILED will be sent to RM to inform the RM
  // that the log aggregation in this NM is completed.
  final boolean failed =
      renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle;
  final LogAggregationStatus terminalStatus;
  if (failed) {
    terminalStatus = LogAggregationStatus.FAILED;
  } else {
    terminalStatus = LogAggregationStatus.SUCCEEDED;
  }
  sendLogAggregationReportInternal(terminalStatus, "");
}
/**
 * Builds a {@link LogAggregationReport} for this application and adds it to
 * the context's collection of log aggregation statuses.
 *
 * @param logAggregationStatus status to report
 * @param diagnosticMessage diagnostic text to attach
 */
private void sendLogAggregationReportInternal(
    LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
  final LogAggregationReport statusReport =
      Records.newRecord(LogAggregationReport.class);
  statusReport.setApplicationId(appId);
  statusReport.setDiagnosticMessage(diagnosticMessage);
  statusReport.setLogAggregationStatus(logAggregationStatus);

  this.context.getLogAggregationStatusForApps().add(statusReport);
}
/**
 * Entry point of the aggregator: runs {@link #doAppLogAggregation()}.
 * <p>
 * Any exception thrown by the aggregation is logged and followed by the
 * local log-directory clean-up. On exit, if aggregation has neither finished
 * nor been aborted, an {@code APPLICATION_LOG_HANDLING_FAILED} event is
 * dispatched. The finished flag is set in all cases before returning.
 */
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
doAppLogAggregation();
} catch (Exception e) {
// do post clean up of log directories on any exception
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
doAppLogAggregationPostCleanUp();
} finally {
// appAggregationFinished is only set inside doAppLogAggregation on its
// normal completion path, so reaching here with it unset (and not
// aborted) means the aggregation did not complete.
if (!this.appAggregationFinished.get() && !this.aborted.get()) {
LOG.warn("Log aggregation did not complete for application " + appId);
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
}
this.appAggregationFinished.set(true);
}
}
/**
 * Main aggregation loop.
 * <p>
 * While the application is neither finishing nor aborted, the thread waits
 * on this object's monitor. In rolling mode it waits for the rolling monitor
 * interval and then uploads logs with {@code appFinished == false}, unless
 * the application started finishing or was aborted while waiting. Otherwise
 * it waits {@code THREAD_SLEEP_TIME} milliseconds per iteration.
 * <p>
 * After the loop, an aborted aggregation returns immediately. Otherwise the
 * final upload is performed, local log directories are cleaned up, an
 * {@code APPLICATION_LOG_HANDLING_FINISHED} event is dispatched and the
 * finished flag is set.
 */
@SuppressWarnings("unchecked")
private void doAppLogAggregation() {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
// Signals doLogAggregationOutOfBand() that this thread has reached the
// waiting section; the flag is never reset in this method.
waiting.set(true);
if (logControllerContext.isLogAggregationInRolling()) {
// wait() takes milliseconds; the interval is multiplied by 1000, so it is
// presumably expressed in seconds -- TODO confirm against the caller.
wait(logControllerContext.getRollingMonitorInterval() * 1000);
// Re-check after waking up: finish/abort notify this monitor, and in
// that case the periodic upload is skipped.
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
uploadLogsForContainers(false);
} else {
wait(THREAD_SLEEP_TIME);
}
} catch (InterruptedException e) {
// An interrupt is treated like the application finishing: the loop ends
// and, unless aborted, the final upload below still runs.
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
}
}
}
if (this.aborted.get()) {
return;
}
// App is finished, upload the container logs.
uploadLogsForContainers(true);
doAppLogAggregationPostCleanUp();
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
}
/**
 * Schedules deletion of this application's local log directories.
 * <p>
 * For every root log dir reported for clean-up, the application's
 * sub-directory is added to the deletion list if its file status can be
 * obtained. Nothing is scheduled when no such directory is found.
 */
private void doAppLogAggregationPostCleanUp() {
  // Remove the local app-log-dirs
  List<Path> localAppLogDirs = new ArrayList<Path>();
  for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
    Path logPath = new Path(rootLogDir, applicationId);
    try {
      // check if log dir exists
      lfs.getFileStatus(logPath);
      localAppLogDirs.add(logPath);
    } catch (UnsupportedFileSystemException ue) {
      LOG.warn("Log dir " + rootLogDir + " is an unsupported file system", ue);
    } catch (IOException ignored) {
      // The status of the app log dir could not be read under this root
      // (typically because it does not exist), so there is nothing to
      // schedule for it. Deliberately best-effort.
    }
  }

  if (!localAppLogDirs.isEmpty()) {
    DeletionTask deletionTask = new FileDeletionTask(delService,
        this.userUgi.getShortUserName(), null, localAppLogDirs);
    this.delService.delete(deletionTask);
  }
}
/**
 * Derives the temporary remote log file path: same parent directory as the
 * remote node log file, with the temp-file suffix appended to its name.
 *
 * @return the temporary remote log file path
 */
private Path getRemoteNodeTmpLogFileForApp() {
  final Path parentDir = remoteNodeLogFileForApp.getParent();
  final String tmpFileName =
      remoteNodeLogFileForApp.getName() + LogAggregationUtils.TMP_FILE_SUFFIX;
  return new Path(parentDir, tmpFileName);
}
/**
 * Asks the configured log aggregation policy whether logs should be
 * aggregated for the given container context.
 *
 * @param logContext container context to evaluate
 * @return the policy's decision
 */
private boolean shouldUploadLogs(ContainerLogContext logContext) {
return logAggPolicy.shouldDoLogAggregation(logContext);
}
/**
 * Queues the container for log upload when the aggregation policy accepts
 * it; otherwise does nothing.
 *
 * @param logContext context of the container to consider
 */
@Override
public void startContainerLogAggregation(ContainerLogContext logContext) {
  if (!shouldUploadLogs(logContext)) {
    return;
  }
  LOG.info("Considering container " + logContext.getContainerId()
      + " for log-aggregation");
  this.pendingContainers.add(logContext.getContainerId());
}
/**
 * Marks the application as finishing and wakes any thread waiting on this
 * object's monitor, so the aggregation loop can proceed to its final upload.
 */
@Override
public synchronized void finishLogAggregation() {
LOG.info("Application just finished : " + this.applicationId);
this.appFinishing.set(true);
this.notifyAll();
}
/**
 * Marks the aggregation as aborted and wakes any thread waiting on this
 * object's monitor. The aggregation loop returns without a final upload
 * once it observes the aborted flag.
 */
@Override
public synchronized void abortLogAggregation() {
LOG.info("Aborting log aggregation for " + this.applicationId);
this.aborted.set(true);
this.notifyAll();
}
/**
 * Disables log upload: once set, each subsequent upload cycle returns
 * immediately without uploading or reporting.
 */
@Override
public void disableLogAggregation() {
// NOTE(review): the flag is a plain (non-volatile) field and this method is
// not synchronized -- confirm visibility to the aggregation thread.
this.logAggregationDisabled = true;
}
/**
 * Test hook: blocks until the aggregation thread has signalled that it is
 * waiting, then notifies all threads waiting on this object's monitor.
 */
@Private
@VisibleForTesting
// This is only used for testing.
// This will wake the log aggregation thread that is waiting for
// rollingMonitorInterval.
// To use this method, make sure the log aggregation thread is running
// and waiting for rollingMonitorInterval.
public synchronized void doLogAggregationOutOfBand() {
// Poll in 200 ms steps; wait() releases the monitor so the aggregation
// thread can enter its synchronized block and set the flag.
while(!waiting.get()) {
try {
wait(200);
} catch (InterruptedException e) {
// Do Nothing: the loop keeps polling until the flag is set.
}
}
LOG.info("Do OutOfBand log aggregation");
this.notifyAll();
}
/**
 * Uploads the logs of a single container and remembers, across cycles, the
 * metadata of files that were already uploaded.
 */
class ContainerLogAggregator {
  private final AggregatedLogFormat.LogRetentionContext retentionContext;
  private final ContainerId containerId;
  // Metadata of files uploaded so far that still exist locally.
  private Set<String> uploadedFileMeta = new HashSet<String>();

  public ContainerLogAggregator(ContainerId containerId) {
    this.containerId = containerId;
    this.retentionContext = getRetentionContext();
  }

  /**
   * Builds the retention context from the recovered log-init time and the
   * configured retention period converted from seconds to milliseconds.
   */
  private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
    final long retainSeconds =
        conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
            YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
    final long retainMillis = retainSeconds * 1000;
    return new AggregatedLogFormat.LogRetentionContext(
        recoveredLogInitedTime, retainMillis);
  }

  /**
   * Writes this container's logs through the given controller.
   *
   * @param logAggregationFileController controller used for the write
   * @param appFinished whether the application has finished
   * @param containerFinished whether this container has finished
   * @return the union of the paths uploaded in this call and the files
   *         reported as past retention; an empty set if the write failed
   */
  public Set<Path> doContainerLogAggregation(
      LogAggregationFileController logAggregationFileController,
      boolean appFinished, boolean containerFinished) {
    LOG.info("Uploading logs for container " + containerId
        + ". Current good log dirs are "
        + StringUtils.join(",", dirsHandler.getLogDirsForRead()));

    final LogKey key = new LogKey(containerId);
    final LogValue value =
        new LogValue(dirsHandler.getLogDirsForRead(), containerId,
            userUgi.getShortUserName(), logAggregationContext,
            this.uploadedFileMeta, retentionContext, appFinished,
            containerFinished);

    try {
      logAggregationFileController.write(key, value);
    } catch (Exception e) {
      LOG.error("Couldn't upload logs for " + containerId
          + ". Skipping this container.", e);
      return new HashSet<Path>();
    }

    this.uploadedFileMeta.addAll(value.getCurrentUpLoadedFileMeta());

    // if any of the previous uploaded logs have been deleted,
    // we need to remove them from alreadyUploadedLogs
    final Set<String> stillExisting = new HashSet<String>();
    for (String meta : this.uploadedFileMeta) {
      if (value.getAllExistingFilesMeta().contains(meta)) {
        stillExisting.add(meta);
      }
    }
    this.uploadedFileMeta = stillExisting;

    // need to return files uploaded or older-than-retention clean up.
    return Sets.union(value.getCurrentUpLoadedFilesPath(),
        value.getObseleteRetentionLogFiles());
  }
}
/**
 * Returns the user's UGI held by this aggregator.
 */
// only for test
@VisibleForTesting
public UserGroupInformation getUgi() {
return this.userUgi;
}
/**
 * Returns the counter that is incremented once per upload cycle that has at
 * least one container to process.
 */
@Private
@VisibleForTesting
public int getLogAggregationTimes() {
return this.logAggregationTimes;
}
/**
 * Returns the old-log clean-up counter.
 */
// NOTE(review): nothing in this class appears to modify cleanupOldLogTimes,
// so this looks like it always returns 0 -- confirm whether it is still needed.
@VisibleForTesting
int getCleanupOldLogTimes() {
return this.cleanupOldLogTimes;
}
/**
 * Returns the file controller used by this aggregator: either the one passed
 * to the constructor or the TFile controller created by default.
 */
@VisibleForTesting
public LogAggregationFileController getLogAggregationFileController() {
return this.logAggregationFileController;
}
/**
 * Returns the file controller context built in the constructor.
 */
@VisibleForTesting
public LogAggregationFileControllerContext
getLogAggregationFileControllerContext() {
return this.logControllerContext;
}
}