/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
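/**
 * {@link LogHandler} implementation that aggregates the logs of finished
 * containers and uploads them to the remote file system. One
 * {@link AppLogAggregator} is created per application and executed on a
 * cached thread pool; the remote directories are created on behalf of the
 * application user.
 */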
public class LogAggregationService extends AbstractService implements
LogHandler {
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
/*
 * The expected top-level remote log directory (TLD) is 1777, owner=<NMOwner>,
 * group=<NMGroup> (the group to which NMOwner belongs). App dirs will be
 * created as 770, owner=<AppOwner>, group=<NMGroup>, so that both the
 * application owner and <NMOwner> can access and modify the files.
 * <NMGroup> should obviously be a limited-access group.
 */
/**
* Permissions for the top level directory under which app directories will be
* created.
*/
private static final FsPermission TLDIR_PERMISSIONS = FsPermission
.createImmutable((short) 01777);
/**
* Permissions for the Application directory.
*/
private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
private final Context context;
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
}
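/**
 * Reads the remote root log directory and its suffix from the configuration
 * before delegating to the superclass init.
 */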
public synchronized void init(Configuration conf) {
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
super.init(conf);
}
@Override
public synchronized void start() {
// NodeId is only available during start, so the following cannot be moved
// anywhere else.
this.nodeId = this.context.getNodeId();
verifyAndCreateRemoteLogDir(getConfig());
super.start();
}
@Override
public synchronized void stop() {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
stopAggregators();
super.stop();
}
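/**
 * Shuts down the thread pool, asks every running aggregator to finish, and
 * waits for termination in 30-second intervals, interrupting the workers if
 * they do not complete in time. Any applications still registered afterwards
 * may have incompletely aggregated logs.
 */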
private void stopAggregators() {
threadPool.shutdown();
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
aggregator.finishLogAggregation();
}
while (!threadPool.isTerminated()) { // wait for all threads to finish
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.info("Waiting for aggregation to complete for " + appId);
}
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // send interrupt to hurry them along
}
} catch (InterruptedException e) {
LOG.warn("Aggregation stop interrupted!");
break;
}
}
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.warn("Some logs may not have been aggregated for " + appId);
}
}
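/**
 * Verifies that the remote root log directory exists with the expected
 * permissions (1777) and creates it if it is missing.
 */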
private void verifyAndCreateRemoteLogDir(Configuration conf) {
// Check the existence of the TLD
FileSystem remoteFS = null;
try {
remoteFS = FileSystem.get(conf);
} catch (IOException e) {
throw new YarnException("Unable to get Remote FileSystem instance", e);
}
boolean remoteExists = false;
try {
remoteExists = remoteFS.exists(this.remoteRootLogDir);
} catch (IOException e) {
throw new YarnException("Failed to check for existence of remoteLogDir ["
+ this.remoteRootLogDir + "]");
}
if (remoteExists) {
try {
FsPermission perms =
remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS)) {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users.");
}
} catch (IOException e) {
throw new YarnException(
"Failed while attempting to check permissions for dir ["
+ this.remoteRootLogDir + "]");
}
} else {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ "] does not exist. Attempting to create it.");
try {
Path qualified =
this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
} catch (IOException e) {
throw new YarnException("Failed to create remoteLogDir ["
+ this.remoteRootLogDir + "]", e);
}
}
}
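/**
 * Returns the remote path of this node's aggregated log file for the given
 * application and user.
 */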
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
return LogAggregationUtils.getRemoteNodeLogFileForApp(
this.remoteRootLogDir, appId, user, this.nodeId,
this.remoteRootLogDirSuffix);
}
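/**
 * Creates a directory and then sets its permissions explicitly, so that the
 * final permissions are not affected by the file system umask applied by
 * mkdirs.
 */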
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
fs.mkdirs(path, new FsPermission(fsPerm));
fs.setPermission(path, new FsPermission(fsPerm));
}
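/**
 * Creates the user, suffix, and application directories under the remote
 * root log directory as the application user, via the supplied
 * {@link UserGroupInformation}.
 */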
protected void createAppDir(final String user, final ApplicationId appId,
UserGroupInformation userUgi) {
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// TODO: Reuse FS for user?
FileSystem remoteFS = null;
Path userDir = null;
Path suffixDir = null;
Path appDir = null;
try {
remoteFS = FileSystem.get(getConfig());
} catch (IOException e) {
LOG.error("Failed to get remote FileSystem while processing app "
+ appId, e);
throw e;
}
try {
userDir =
LogAggregationUtils.getRemoteLogUserDir(
LogAggregationService.this.remoteRootLogDir, user);
userDir =
userDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create user dir [" + userDir
+ "] while processing app " + appId);
throw e;
}
try {
suffixDir =
LogAggregationUtils.getRemoteLogSuffixedDir(
LogAggregationService.this.remoteRootLogDir, user,
LogAggregationService.this.remoteRootLogDirSuffix);
suffixDir =
suffixDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create suffixed user dir [" + suffixDir
+ "] while processing app " + appId);
throw e;
}
try {
appDir =
LogAggregationUtils.getRemoteAppLogDir(
LogAggregationService.this.remoteRootLogDir, appId, user,
LogAggregationService.this.remoteRootLogDirSuffix);
appDir =
appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create application log dir [" + appDir
+ "] while processing app " + appId);
throw e;
}
return null;
}
});
} catch (Exception e) {
throw new YarnException(e);
}
}
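/**
 * Initializes log aggregation for an application and notifies it via the
 * dispatcher with either an APPLICATION_LOG_HANDLING_INITED or an
 * APPLICATION_LOG_HANDLING_FAILED event.
 */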
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
ApplicationEvent eventResponse;
try {
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
} catch (YarnException e) {
LOG.warn("Application failed to init aggregation: " + e.getMessage());
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
}
this.dispatcher.getEventHandler().handle(eventResponse);
}
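/**
 * Creates the {@link AppLogAggregator} for the application, creates its
 * remote directories as the application user, and schedules the aggregator
 * on the thread pool. Throws {@link YarnException} if an aggregator already
 * exists for the application or if directory creation fails.
 */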
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
if (credentials != null) {
userUgi.addCredentials(credentials);
}
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
// Only create the directories once the check for an existing aggregator has passed
try {
// Create the app dir
createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
if (!(e instanceof YarnException)) {
e = new YarnException(e);
}
throw (YarnException)e;
}
// TODO Get the user configuration for the list of containers that need log
// aggregation.
// Schedule the aggregator.
Runnable aggregatorWrapper = new Runnable() {
public void run() {
try {
appLogAggregator.run();
} finally {
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
}
}
};
this.threadPool.execute(aggregatorWrapper);
}
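/**
 * Closes all cached file systems opened on behalf of the given user.
 */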
protected void closeFileSystems(final UserGroupInformation userUgi) {
try {
FileSystem.closeAllForUGI(userUgi);
} catch (IOException e) {
LOG.warn("Failed to close filesystems: ", e);
}
}
// for testing only
@Private
int getNumAggregators() {
return this.appLogAggregators.size();
}
private void stopContainer(ContainerId containerId, int exitCode) {
// A container is complete. Queue this container's logs for aggregation if
// they are needed.
AppLogAggregator aggregator = this.appLogAggregators.get(
containerId.getApplicationAttemptId().getApplicationId());
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + containerId
+ ", did it fail to start?");
return;
}
aggregator.startContainerLogAggregation(containerId, exitCode == 0);
}
private void stopApp(ApplicationId appId) {
// App is complete. Finish up any pending container log aggregation and
// close the application-specific log file.
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + appId
+ ", did it fail to start?");
return;
}
aggregator.finishLogAggregation();
}
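/**
 * Routes {@link LogHandlerEvent}s to application-start, container-finish,
 * and application-finish handling; all other event types are ignored.
 */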
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartEvent =
(LogHandlerAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls());
break;
case CONTAINER_FINISHED:
LogHandlerContainerFinishedEvent containerFinishEvent =
(LogHandlerContainerFinishedEvent) event;
stopContainer(containerFinishEvent.getContainerId(),
containerFinishEvent.getExitCode());
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
default:
; // Ignore
}
}
}