blob: 4938939ae2b816c835aeeba1ba69562727e52956 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
LogHandler {
private static final Logger LOG =
LoggerFactory.getLogger(LogAggregationService.class);
private static final long MIN_LOG_ROLLING_INTERVAL = 3600;
// This configuration is for debug and test purpose. By setting
// this configuration as true. We can break the lower bound of
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private long rollingMonitorInterval;
private final Context context;
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
private NodeId nodeId;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
@VisibleForTesting
ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
}
protected void serviceInit(Configuration conf) throws Exception {
int threadPoolSize = getAggregatorThreadPoolSize(conf);
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
rollingMonitorInterval = conf.getLong(
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
boolean logAggregationDebugMode =
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
if (rollingMonitorInterval > 0
&& rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) {
if (logAggregationDebugMode) {
LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
+ rollingMonitorInterval);
} else {
LOG.warn("rollingMonitorIntervall should be more than or equal to "
+ MIN_LOG_ROLLING_INTERVAL + " seconds. Using "
+ MIN_LOG_ROLLING_INTERVAL + " seconds instead.");
this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
}
} else if (rollingMonitorInterval <= 0) {
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
+ ". The log rolling monitoring interval is disabled. "
+ "The logs will be aggregated after this application is finished.");
} else {
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
+ ". The logs will be aggregated every " + rollingMonitorInterval
+ " seconds");
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
// NodeId is only available during start, the following cannot be moved
// anywhere else.
this.nodeId = this.context.getNodeId();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
stopAggregators();
super.serviceStop();
}
private void stopAggregators() {
threadPool.shutdown();
boolean supervised = getConfig().getBoolean(
YarnConfiguration.NM_RECOVERY_SUPERVISED,
YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
// if recovery on restart is supported then leave outstanding aggregations
// to the next restart
boolean shouldAbort = context.getNMStateStore().canRecover()
&& !context.getDecommissioned() && supervised;
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
if (shouldAbort) {
aggregator.abortLogAggregation();
} else {
aggregator.finishLogAggregation();
}
}
while (!threadPool.isTerminated()) { // wait for all threads to finish
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.info("Waiting for aggregation to complete for " + appId);
}
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // send interrupt to hurry them along
}
} catch (InterruptedException e) {
LOG.warn("Aggregation stop interrupted!");
break;
}
}
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.warn("Some logs may not have been aggregated for " + appId);
}
}
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext,
long recoveredLogInitedTime) {
ApplicationEvent eventResponse;
try {
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext, recoveredLogInitedTime);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
} catch (YarnRuntimeException e) {
LOG.warn("Application failed to init aggregation", e);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
}
this.dispatcher.getEventHandler().handle(eventResponse);
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext,
long recoveredLogInitedTime) {
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
if (credentials != null) {
userUgi.addCredentials(credentials);
}
LogAggregationFileController logAggregationFileController
= getLogAggregationFileController(getConfig());
logAggregationFileController.verifyAndCreateRemoteLogDir();
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
logAggregationFileController.getRemoteNodeLogFileForApp(appId,
user, nodeId), appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()), this.rollingMonitorInterval,
recoveredLogInitedTime, logAggregationFileController);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
// wait until check for existing aggregator to create dirs
YarnRuntimeException appDirException = null;
try {
// Create the app dir
logAggregationFileController.createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregator.disableLogAggregation();
if (!(e instanceof YarnRuntimeException)) {
appDirException = new YarnRuntimeException(e);
} else {
appDirException = (YarnRuntimeException)e;
}
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
throw appDirException;
}
// TODO Get the user configuration for the list of containers that need log
// aggregation.
// Schedule the aggregator.
Runnable aggregatorWrapper = new Runnable() {
public void run() {
try {
appLogAggregator.run();
} finally {
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
}
}
};
this.threadPool.execute(aggregatorWrapper);
}
protected void closeFileSystems(final UserGroupInformation userUgi) {
try {
FileSystem.closeAllForUGI(userUgi);
} catch (IOException e) {
LOG.warn("Failed to close filesystems: ", e);
}
}
// for testing only
@Private
int getNumAggregators() {
return this.appLogAggregators.size();
}
private void stopContainer(ContainerId containerId,
ContainerType containerType, int exitCode) {
// A container is complete. Put this containers' logs up for aggregation if
// this containers' logs are needed.
AppLogAggregator aggregator = this.appLogAggregators.get(
containerId.getApplicationAttemptId().getApplicationId());
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + containerId
+ ", did it fail to start?");
return;
}
aggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, containerType, exitCode));
}
@SuppressWarnings("unchecked")
private void stopApp(ApplicationId appId) {
// App is complete. Finish up any containers' pending log aggregation and
// close the application specific logFile.
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + appId
+ ", did it fail to start?");
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
return;
}
aggregator.finishLogAggregation();
}
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartEvent =
(LogHandlerAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getApplicationAcls(),
appStartEvent.getLogAggregationContext(),
appStartEvent.getRecoveredAppLogInitedTime());
break;
case CONTAINER_FINISHED:
LogHandlerContainerFinishedEvent containerFinishEvent =
(LogHandlerContainerFinishedEvent) event;
stopContainer(containerFinishEvent.getContainerId(),
containerFinishEvent.getContainerType(),
containerFinishEvent.getExitCode());
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
default:
; // Ignore
}
}
@VisibleForTesting
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
return this.appLogAggregators;
}
@VisibleForTesting
public NodeId getNodeId() {
return this.nodeId;
}
private int getAggregatorThreadPoolSize(Configuration conf) {
int threadPoolSize;
try {
threadPoolSize = conf.getInt(YarnConfiguration
.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
} catch (NumberFormatException ex) {
LOG.warn("Invalid thread pool size. Setting it to the default value " +
"in YarnConfiguration");
threadPoolSize = YarnConfiguration.
DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
}
if(threadPoolSize <= 0) {
LOG.warn("Invalid thread pool size. Setting it to the default value " +
"in YarnConfiguration");
threadPoolSize = YarnConfiguration.
DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
}
return threadPoolSize;
}
@VisibleForTesting
public LogAggregationFileController getLogAggregationFileController(
Configuration conf) {
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(conf);
LogAggregationFileController logAggregationFileController = factory
.getFileControllerForWrite();
return logAggregationFileController;
}
}