blob: c5eadfd57283b2b5e89a8f3b3bbb3b0997953f86 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class LogAggregationService extends AbstractService implements
EventHandler<LogAggregatorEvent> {
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
private final DeletionService deletionService;
private String[] localRootLogDirs;
Path remoteRootLogDir;
private String nodeFile;
static final String LOG_COMPRESSION_TYPE = NMConfig.NM_PREFIX
+ "logaggregation.log_compression_type";
static final String DEFAULT_COMPRESSION_TYPE = "none";
private static final String LOG_RENTENTION_POLICY_CONFIG_KEY =
NMConfig.NM_PREFIX + "logaggregation.retain-policy";
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ExecutorService threadPool;
public LogAggregationService(DeletionService deletionService) {
super(LogAggregationService.class.getName());
this.deletionService = deletionService;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool();
}
public synchronized void init(Configuration conf) {
this.localRootLogDirs =
conf.getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
this.remoteRootLogDir =
new Path(conf.get(NMConfig.REMOTE_USER_LOG_DIR,
NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
super.init(conf);
}
@Override
public synchronized void start() {
String address =
getConfig().get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS);
InetSocketAddress cmBindAddress = NetUtils.createSocketAddr(address);
try {
this.nodeFile =
InetAddress.getLocalHost().getHostAddress() + "_"
+ cmBindAddress.getPort();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
super.start();
}
Path getRemoteNodeLogFileForApp(ApplicationId appId) {
return getRemoteNodeLogFileForApp(this.remoteRootLogDir, appId,
this.nodeFile);
}
static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
ApplicationId appId, String nodeFile) {
return new Path(getRemoteAppLogDir(remoteRootLogDir, appId),
nodeFile);
}
static Path getRemoteAppLogDir(Path remoteRootLogDir,
ApplicationId appId) {
return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
}
@Override
public synchronized void stop() {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
appLogAggregator.join();
}
super.stop();
}
private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy) {
// Get user's FileSystem credentials
UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
if (credentials != null) {
for (Token<? extends TokenIdentifier> token : credentials
.getAllTokens()) {
userUgi.addToken(token);
}
}
// New application
AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
userUgi, this.localRootLogDirs,
getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
// Create the app dir
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// TODO: Reuse FS for user?
FileSystem remoteFS = FileSystem.get(getConfig());
remoteFS.mkdirs(getRemoteAppLogDir(
LogAggregationService.this.remoteRootLogDir, appId)
.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
return null;
}
});
} catch (Exception e) {
throw new YarnException(e);
}
// Get the user configuration for the list of containers that need log
// aggregation.
// Schedule the aggregator.
this.threadPool.execute(appLogAggregator);
}
private void stopContainer(ContainerId containerId, String exitCode) {
// A container is complete. Put this containers' logs up for aggregation if
// this containers' logs are needed.
if (!this.appLogAggregators.containsKey(containerId.getAppId())) {
throw new YarnException("Application is not initialized yet for "
+ containerId);
}
this.appLogAggregators.get(containerId.getAppId())
.startContainerLogAggregation(containerId, exitCode.equals("0"));
}
private void stopApp(ApplicationId appId) {
// App is complete. Finish up any containers' pending log aggregation and
// close the application specific logFile.
if (!this.appLogAggregators.containsKey(appId)) {
throw new YarnException("Application is not initialized yet for "
+ appId);
}
this.appLogAggregators.get(appId).finishLogAggregation();
}
@Override
public void handle(LogAggregatorEvent event) {
// switch (event.getType()) {
// case APPLICATION_STARTED:
// LogAggregatorAppStartedEvent appStartEvent =
// (LogAggregatorAppStartedEvent) event;
// initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
// appStartEvent.getCredentials(),
// appStartEvent.getLogRetentionPolicy());
// break;
// case CONTAINER_FINISHED:
// LogAggregatorContainerFinishedEvent containerFinishEvent =
// (LogAggregatorContainerFinishedEvent) event;
// stopContainer(containerFinishEvent.getContainerId(),
// containerFinishEvent.getExitCode());
// break;
// case APPLICATION_FINISHED:
// LogAggregatorAppFinishedEvent appFinishedEvent =
// (LogAggregatorAppFinishedEvent) event;
// stopApp(appFinishedEvent.getApplicationId());
// break;
// default:
// ; // Ignore
// }
}
}