blob: f9650fbdc2d12601a9f89786a42707e501b72658 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManager,
EventHandler<ContainerManagerEvent> {
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
final Context context;
private final ContainersMonitor containersMonitor;
private Server server;
private final ResourceLocalizationService rsrcLocalizationSrvc;
private final ContainersLauncher containersLauncher;
private final AuxServices auxiliaryServices;
private final NodeManagerMetrics metrics;
private final NodeStatusUpdater nodeStatusUpdater;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
private final ApplicationACLsManager aclsManager;
private final DeletionService deletionService;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
super(ContainerManagerImpl.class.getName());
this.context = context;
this.dirsHandler = dirsHandler;
// ContainerManager level dispatcher.
dispatcher = new AsyncDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;
rsrcLocalizationSrvc =
createResourceLocalizationService(exec, deletionContext);
addService(rsrcLocalizationSrvc);
containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);
this.nodeStatusUpdater = nodeStatusUpdater;
this.aclsManager = aclsManager;
// Start configurable services
auxiliaryServices = new AuxServices();
auxiliaryServices.register(this);
addService(auxiliaryServices);
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
addService(dispatcher);
}
@Override
public void init(Configuration conf) {
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
super.init(conf);
}
private void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService, dirsHandler);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService,
dirsHandler);
}
}
public ContainersMonitor getContainersMonitor() {
return this.containersMonitor;
}
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext, dirsHandler);
}
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler);
}
@Override
public void start() {
// Enqueue user dirs in deletion context
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress initialAddress = conf.getSocketAddr(
YarnConfiguration.NM_ADDRESS,
YarnConfiguration.DEFAULT_NM_ADDRESS,
YarnConfiguration.DEFAULT_NM_PORT);
server =
rpc.getServer(ContainerManager.class, this, initialAddress, conf,
this.context.getContainerTokenSecretManager(),
conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new NMPolicyProvider());
}
server.start();
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
this.context.getNodeId().setHost(connectAddress.getHostName());
this.context.getNodeId().setPort(connectAddress.getPort());
LOG.info("ContainerManager started at " + connectAddress);
super.start();
}
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
@Override
public void stop() {
if (auxiliaryServices.getServiceState() == STARTED) {
auxiliaryServices.unregister(this);
}
if (server != null) {
server.stop();
}
super.stop();
}
// Get the remoteUGI corresponding to the api call.
private UserGroupInformation getRemoteUgi(String containerIDStr)
throws YarnRemoteException {
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
String msg = "Cannot obtain the user-name for containerId: "
+ containerIDStr + ". Got exception: "
+ StringUtils.stringifyException(e);
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
return remoteUgi;
}
// Obtain the needed ContainerTokenIdentifier from the remote-UGI. RPC layer
// currently sets only the required id, but iterate through anyways just to
// be sure.
private ContainerTokenIdentifier selectContainerTokenIdentifier(
UserGroupInformation remoteUgi) {
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
ContainerTokenIdentifier resultId = null;
for (TokenIdentifier id : tokenIdentifiers) {
if (id instanceof ContainerTokenIdentifier) {
resultId = (ContainerTokenIdentifier) id;
break;
}
}
return resultId;
}
/**
* Authorize the request.
*
* @param containerIDStr
* of the container
* @param launchContext
* passed if verifying the startContainer, null otherwise.
* @param remoteUgi
* ugi corresponding to the remote end making the api-call
* @throws YarnRemoteException
*/
private void authorizeRequest(String containerIDStr,
ContainerLaunchContext launchContext, UserGroupInformation remoteUgi)
throws YarnRemoteException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
boolean unauthorized = false;
StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. ");
if (!remoteUgi.getUserName().equals(containerIDStr)) {
unauthorized = true;
messageBuilder.append("\nExpected containerId: "
+ remoteUgi.getUserName() + " Found: " + containerIDStr);
} else if (launchContext != null) {
// Verify other things also for startContainer() request.
if (LOG.isDebugEnabled()) {
LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
+ remoteUgi.getTokenIdentifiers().size());
}
// Get the tokenId from the remote user ugi
ContainerTokenIdentifier tokenId =
selectContainerTokenIdentifier(remoteUgi);
if (tokenId == null) {
unauthorized = true;
messageBuilder
.append("\nContainerTokenIdentifier cannot be null! Null found for "
+ containerIDStr);
} else {
// Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through
if (!this.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(tokenId)) {
unauthorized = true;
messageBuilder.append("\n Attempt to relaunch the same " +
"container with id " + containerIDStr + ".");
}
// Ensure the token is not expired.
// Token expiry is not checked for stopContainer/getContainerStatus
if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
unauthorized = true;
messageBuilder.append("\nThis token is expired. current time is "
+ System.currentTimeMillis() + " found "
+ tokenId.getExpiryTimeStamp());
}
Resource resource = tokenId.getResource();
if (!resource.equals(launchContext.getResource())) {
unauthorized = true;
messageBuilder.append("\nExpected resource " + resource
+ " but found " + launchContext.getResource());
}
}
}
if (unauthorized) {
String msg = messageBuilder.toString();
LOG.error(msg);
throw RPCUtil.getRemoteException(msg);
}
}
/**
* Start a container on this NodeManager.
*/
@SuppressWarnings("unchecked")
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
ContainerId containerID = launchContext.getContainerId();
String containerIDStr = containerID.toString();
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
authorizeRequest(containerIDStr, launchContext, remoteUgi);
LOG.info("Start request for " + containerIDStr + " by user "
+ launchContext.getUser());
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getContainerTokens();
Credentials credentials = new Credentials();
if (tokens != null) {
DataInputByteBuffer buf = new DataInputByteBuffer();
tokens.rewind();
buf.reset(tokens);
try {
credentials.readTokenStorageStream(buf);
if (LOG.isDebugEnabled()) {
for (Token<? extends TokenIdentifier> tk : credentials
.getAllTokens()) {
LOG.debug(tk.getService() + " = " + tk.toString());
}
}
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
// //////////// End of parsing credentials
Container container = new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics);
ApplicationId applicationID =
containerID.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerID, container) != null) {
NMAuditLogger.logFailure(launchContext.getUser(),
AuditConstants.START_CONTAINER, "ContainerManagerImpl",
"Container already running on this node!",
applicationID, containerID);
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " already is running on this node!!");
}
// Create the application
Application application =
new ApplicationImpl(dispatcher, this.aclsManager,
launchContext.getUser(), applicationID, credentials, context);
if (null ==
context.getApplications().putIfAbsent(applicationID, application)) {
LOG.info("Creating a new application reference for app "
+ applicationID);
dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, container
.getLaunchContext().getApplicationACLs()));
}
// TODO: Validate the request
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
if (UserGroupInformation.isSecurityEnabled()) {
ContainerTokenIdentifier tokenId =
selectContainerTokenIdentifier(remoteUgi);
this.context.getContainerTokenSecretManager().startContainerSuccessful(
tokenId);
}
NMAuditLogger.logSuccess(launchContext.getUser(),
AuditConstants.START_CONTAINER, "ContainerManageImpl",
applicationID, containerID);
StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class);
response.addAllServiceResponse(auxiliaryServices.getMeta());
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(launchContext.getResource());
return response;
}
/**
* Stop the container running on this NodeManager.
*/
@Override
@SuppressWarnings("unchecked")
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
// TODO: Only the container's owner can kill containers today.
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
authorizeRequest(containerIDStr, null, remoteUgi);
StopContainerResponse response =
recordFactory.newRecordInstance(StopContainerResponse.class);
Container container = this.context.getContainers().get(containerID);
if (container == null) {
LOG.warn("Trying to stop unknown container " + containerID);
NMAuditLogger.logFailure("UnknownUser",
AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
"Trying to stop unknown container!",
containerID.getApplicationAttemptId().getApplicationId(),
containerID);
return response; // Return immediately.
}
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
"Container killed by the ApplicationMaster."));
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
containerID.getApplicationAttemptId().getApplicationId(),
containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
nodeStatusUpdater.sendOutofBandHeartBeat();
return response;
}
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
// TODO: Only the container's owner can get containers' status today.
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
authorizeRequest(containerIDStr, null, remoteUgi);
LOG.info("Getting container-status for " + containerIDStr);
Container container = this.context.getContainers().get(containerID);
if (container != null) {
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
GetContainerStatusResponse response = recordFactory
.newRecordInstance(GetContainerStatusResponse.class);
response.setStatus(containerStatus);
return response;
}
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " is not handled by this NodeManager");
}
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@Override
public void handle(ContainerEvent event) {
Map<ContainerId,Container> containers =
ContainerManagerImpl.this.context.getContainers();
Container c = containers.get(event.getContainerID());
if (c != null) {
c.handle(event);
} else {
LOG.warn("Event " + event + " sent to absent container " +
event.getContainerID());
}
}
}
class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
@Override
public void handle(ApplicationEvent event) {
Application app =
ContainerManagerImpl.this.context.getApplications().get(
event.getApplicationID());
if (app != null) {
app.handle(event);
} else {
LOG.warn("Event " + event + " sent to absent application "
+ event.getApplicationID());
}
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) {
switch (event.getType()) {
case FINISH_APPS:
CMgrCompletedAppsEvent appsFinishedEvent =
(CMgrCompletedAppsEvent) event;
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
"Application Killed by ResourceManager"));
}
break;
case FINISH_CONTAINERS:
CMgrCompletedContainersEvent containersFinishedEvent =
(CMgrCompletedContainersEvent) event;
for (ContainerId container : containersFinishedEvent
.getContainersToCleanup()) {
this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container,
"Container Killed by ResourceManager"));
}
break;
default:
LOG.warn("Invalid event " + event.getType() + ". Ignoring.");
}
}
@Override
public void stateChanged(Service service) {
// TODO Auto-generated method stub
}
}