blob: 5cdb57458d8bf818b55206f9a557d49d39662468 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
/**
 * Keeps this NodeManager registered with the ResourceManager and reports node
 * status to it.
 *
 * <p>{@link #serviceStart()} registers with the RM and then starts a background
 * "Node Status Updater" thread. That thread repeatedly sends a heartbeat
 * carrying the node's health status, container statuses and the list of
 * applications whose keep-alive is due. It then acts on the RM's response:
 * master-key roll-over, SHUTDOWN, RESYNC, and container/application cleanup.
 */
public class NodeStatusUpdaterImpl extends AbstractService implements
    NodeStatusUpdater {

  /**
   * Configuration key for how long (in ms) a stopped container is remembered
   * in the recently-stopped cache.
   */
  public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
      YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";

  private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);

  /**
   * Monitor the heartbeat thread waits on between heartbeats;
   * {@link #sendOutofBandHeartBeat()} notifies it to heartbeat early.
   */
  private final Object heartbeatMonitor = new Object();

  private final Context context;
  private final Dispatcher dispatcher;

  /** Assigned in serviceStart(); null before the service is started. */
  private NodeId nodeId;
  /** Interval (ms) requested by the RM in the last heartbeat response. */
  private long nextHeartBeatInterval;
  private ResourceTracker resourceTracker;
  private Resource totalResource;
  private int httpPort;
  private String nodeManagerVersionId;
  private String minimumResourceManagerVersion;
  /** Set to true to make the heartbeat loop exit at its next iteration check. */
  private volatile boolean isStopped;

  private boolean tokenKeepAliveEnabled;
  private long tokenRemovalDelayMs;
  /** Keeps track of when the next keep alive request should be sent for an app*/
  private Map<ApplicationId, Long> appTokenKeepAliveMap =
      new HashMap<ApplicationId, Long>();
  private Random keepAliveDelayRandom = new Random();

  // It will be used to track recently stopped containers on node manager, this
  // is to avoid the misleading no-such-container exception messages on NM, when
  // the AM finishes it informs the RM to stop the may-be-already-completed
  // containers. Values are expiry timestamps (ms); access is guarded by
  // synchronizing on the map itself.
  private final Map<ContainerId, Long> recentlyStoppedContainers;
  // Duration for which to track recently stopped container.
  private long durationToTrackStoppedContainers;

  // This is used to track the current completed containers when nodeheartBeat
  // is called. These completed containers will be removed from NM context after
  // nodeHeartBeat succeeds and the response from the nodeHeartBeat is
  // processed. Access is guarded by synchronizing on the set itself.
  private final Set<ContainerId> previousCompletedContainers;

  private final NodeHealthCheckerService healthChecker;
  private final NodeManagerMetrics metrics;

  private Runnable statusUpdaterRunnable;
  private Thread statusUpdater;
  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;

  /**
   * Creates the updater. No RM communication happens until
   * {@link #serviceStart()}.
   *
   * @param context NodeManager context (containers, apps, secret managers)
   * @param dispatcher dispatcher used to emit NodeManager and cleanup events
   * @param healthChecker source of the node health report sent to the RM
   * @param metrics metrics to which the node's total resource is added
   */
  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
    super(NodeStatusUpdaterImpl.class.getName());
    this.healthChecker = healthChecker;
    this.context = context;
    this.dispatcher = dispatcher;
    this.metrics = metrics;
    // Insertion-ordered so the oldest-expiring entries are iterated first.
    this.recentlyStoppedContainers =
        new LinkedHashMap<ContainerId, Long>();
    this.previousCompletedContainers = new HashSet<ContainerId>();
  }

  /**
   * Reads the node's resource capacity, token keep-alive settings, the
   * minimum acceptable RM version and the stopped-container tracking window
   * from {@code conf}.
   *
   * @throws YarnException if the stopped-container tracking duration is negative
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    int memoryMb =
        conf.getInt(
            YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
    float vMemToPMem =
        conf.getFloat(
            YarnConfiguration.NM_VMEM_PMEM_RATIO,
            YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);

    int virtualCores =
        conf.getInt(
            YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

    this.totalResource = Resource.newInstance(memoryMb, virtualCores);
    metrics.addResource(totalResource);
    this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
    this.tokenRemovalDelayMs =
        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

    this.minimumResourceManagerVersion = conf.get(
        YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
        YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);

    // Default duration to track stopped containers on nodemanager is 10Min.
    // This should not be assigned very large value as it will remember all the
    // containers stopped during that time.
    durationToTrackStoppedContainers =
        conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
            600000);
    if (durationToTrackStoppedContainers < 0) {
      String message = "Invalid configuration for "
          + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
          + "value is 10Min(600000).";
      LOG.error(message);
      throw new YarnException(message);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
          + durationToTrackStoppedContainers);
    }
    super.serviceInit(conf);
    // nodeId is only assigned in serviceStart(), so it is deliberately not
    // logged here (it would always print "null").
    LOG.info("Initialized nodemanager with :" +
        " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
        " virtual-cores=" + virtualCores);
  }

  /**
   * Captures the node id / http port from the context, registers with the RM
   * and starts the heartbeat thread.
   *
   * @throws YarnRuntimeException wrapping any failure during registration/start
   */
  @Override
  protected void serviceStart() throws Exception {

    // NodeManager is the last service to start, so NodeId is available.
    this.nodeId = this.context.getNodeId();
    this.httpPort = this.context.getHttpPort();
    this.nodeManagerVersionId = YarnVersionInfo.getVersion();
    try {
      // Registration has to be in start so that ContainerManager can get the
      // perNM tokens needed to authenticate ContainerTokens.
      this.resourceTracker = getRMClient();
      registerWithRM();
      super.serviceStart();
      startStatusUpdater();
    } catch (Exception e) {
      String errorMessage = "Unexpected error starting NodeStatusUpdater";
      LOG.error(errorMessage, e);
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * Flags the heartbeat loop to stop and closes the RM proxy. The heartbeat
   * thread is not joined here; it exits at its next loop-condition check.
   */
  @Override
  protected void serviceStop() throws Exception {
    // Interrupt the updater.
    this.isStopped = true;
    stopRMProxy();
    super.serviceStop();
  }

  /**
   * Stops the current heartbeat thread, waits for it to exit, re-registers
   * with the RM and starts a fresh heartbeat thread using the same runnable.
   *
   * @throws YarnRuntimeException wrapping any failure during the restart
   */
  protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
    // Interrupt the updater.
    this.isStopped = true;

    try {
      statusUpdater.join();
      registerWithRM();
      statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
      this.isStopped = false;
      statusUpdater.start();
      LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
    } catch (Exception e) {
      String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
      LOG.error(errorMessage, e);
      throw new YarnRuntimeException(e);
    }
  }

  /** Closes the RPC proxy to the RM, if one was created. */
  @VisibleForTesting
  protected void stopRMProxy() {
    if(this.resourceTracker != null) {
      RPC.stopProxy(this.resourceTracker);
    }
  }

  /**
   * Token keep-alive is enabled only when log aggregation is enabled in
   * {@code conf} and Hadoop security is enabled.
   */
  @Private
  protected boolean isTokenKeepAliveEnabled(Configuration conf) {
    return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
        && UserGroupInformation.isSecurityEnabled();
  }

  /** Creates the RPC proxy used to talk to the RM's ResourceTracker. */
  @VisibleForTesting
  protected ResourceTracker getRMClient() throws IOException {
    Configuration conf = getConfig();
    return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
  }

  /**
   * Registers this node (id, http port, total resource, version and current
   * container statuses) with the RM.
   *
   * <p>On success it stores the RM identifier, installs any master keys
   * returned by the RM and unblocks new container requests in the
   * ContainerManager.
   *
   * @throws YarnRuntimeException if the RM answers SHUTDOWN, returns no
   *     version while a minimum version is configured, or its version is
   *     below the configured minimum
   */
  @VisibleForTesting
  protected void registerWithRM()
      throws YarnException, IOException {
    List<NMContainerStatus> containerReports = getNMContainerStatuses();
    RegisterNodeManagerRequest request =
        RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
          nodeManagerVersionId, containerReports);
    if (containerReports != null) {
      LOG.info("Registering with RM using containers :" + containerReports);
    }
    RegisterNodeManagerResponse regNMResponse =
        resourceTracker.registerNodeManager(request);
    this.rmIdentifier = regNMResponse.getRMIdentifier();
    // if the Resourcemanager instructs NM to shutdown.
    if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
      String message =
          "Message from ResourceManager: "
              + regNMResponse.getDiagnosticsMessage();
      throw new YarnRuntimeException(
        "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
            + message);
    }

    // if ResourceManager version is too old then shutdown
    if (!minimumResourceManagerVersion.equals("NONE")){
      // "EqualToNM" means the RM must be at least this NM's own version.
      if (minimumResourceManagerVersion.equals("EqualToNM")){
        minimumResourceManagerVersion = nodeManagerVersionId;
      }
      String rmVersion = regNMResponse.getRMVersion();
      if (rmVersion == null) {
        String message = "The Resource Manager's did not return a version. "
            + "Valid version cannot be checked.";
        throw new YarnRuntimeException("Shutting down the Node Manager. "
            + message);
      }
      if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) {
        String message = "The Resource Manager's version ("
            + rmVersion +") is less than the minimum "
            + "allowed version " + minimumResourceManagerVersion;
        throw new YarnRuntimeException("Shutting down the Node Manager on RM "
            + "version error, " + message);
      }
    }
    MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
    // do this now so that its set before we start heartbeating to RM
    // It is expected that status updater is started by this point and
    // RM gives the shared secret in registration during
    // StatusUpdater#start().
    if (masterKey != null) {
      this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
    }

    masterKey = regNMResponse.getNMTokenMasterKey();
    if (masterKey != null) {
      this.context.getNMTokenSecretManager().setMasterKey(masterKey);
    }

    LOG.info("Registered with ResourceManager as " + this.nodeId
        + " with total resource of " + this.totalResource);
    LOG.info("Notifying ContainerManager to unblock new container-requests");
    ((ContainerManagerImpl) this.context.getContainerManager())
      .setBlockNewContainerRequests(false);
  }

  /**
   * Returns the applications whose keep-alive time has passed and reschedules
   * them; drops tracked applications that are no longer in the context.
   * Returns an empty list when token keep-alive is disabled.
   */
  private List<ApplicationId> createKeepAliveApplicationList() {
    if (!tokenKeepAliveEnabled) {
      return Collections.emptyList();
    }

    List<ApplicationId> appList = new ArrayList<ApplicationId>();
    for (Iterator<Entry<ApplicationId, Long>> i =
        this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
      Entry<ApplicationId, Long> e = i.next();
      ApplicationId appId = e.getKey();
      Long nextKeepAlive = e.getValue();
      if (!this.context.getApplications().containsKey(appId)) {
        // Remove if the application has finished.
        i.remove();
      } else if (System.currentTimeMillis() > nextKeepAlive) {
        // KeepAlive list for the next hearbeat.
        appList.add(appId);
        trackAppForKeepAlive(appId);
      }
    }
    return appList;
  }

  /**
   * Builds the NodeStatus for a heartbeat: refreshes the health status from
   * the health checker and attaches container statuses and the keep-alive
   * application list.
   */
  private NodeStatus getNodeStatus(int responseId) {

    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
    nodeHealthStatus.setLastHealthReportTime(healthChecker
      .getLastHealthReportTime());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
          + ", " + nodeHealthStatus.getHealthReport());
    }
    List<ContainerStatus> containersStatuses = getContainerStatuses();
    NodeStatus nodeStatus =
        NodeStatus.newInstance(nodeId, responseId, containersStatuses,
          createKeepAliveApplicationList(), nodeHealthStatus);

    return nodeStatus;
  }

  // Iterate through the NMContext and clone and get all the containers'
  // statuses. If it's a completed container, add into the
  // recentlyStoppedContainers and previousCompletedContainers collections.
  @VisibleForTesting
  protected List<ContainerStatus> getContainerStatuses() {
    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
    for (Container container : this.context.getContainers().values()) {
      org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
          container.cloneAndGetContainerStatus();
      containerStatuses.add(containerStatus);
      if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
        // Adding to finished containers cache. Cache will keep it around at
        // least for #durationToTrackStoppedContainers duration. In the
        // subsequent call to stop container it will get removed from cache.
        updateStoppedContainersInCache(container.getContainerId());
        addCompletedContainer(container);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Sending out " + containerStatuses.size()
          + " container statuses: " + containerStatuses);
    }
    return containerStatuses;
  }

  // These NMContainerStatus are sent on NM registration and used by YARN only.
  // Completed containers are also recorded in the stopped/completed caches.
  private List<NMContainerStatus> getNMContainerStatuses() {
    List<NMContainerStatus> containerStatuses =
        new ArrayList<NMContainerStatus>();
    for (Container container : this.context.getContainers().values()) {
      NMContainerStatus status =
          container.getNMContainerStatus();
      containerStatuses.add(status);
      if (status.getContainerState().equals(ContainerState.COMPLETE)) {
        // Adding to finished containers cache. Cache will keep it around at
        // least for #durationToTrackStoppedContainers duration. In the
        // subsequent call to stop container it will get removed from cache.
        updateStoppedContainersInCache(container.getContainerId());
        addCompletedContainer(container);
      }
    }
    LOG.info("Sending out " + containerStatuses.size()
      + " NM container statuses: " + containerStatuses);
    return containerStatuses;
  }

  /** Remembers a completed container until the next successful heartbeat. */
  private void addCompletedContainer(Container container) {
    synchronized (previousCompletedContainers) {
      previousCompletedContainers.add(container.getContainerId());
    }
  }

  /**
   * Removes containers reported as completed from the NM context and clears
   * the pending set.
   */
  private void removeCompletedContainersFromContext() {
    synchronized (previousCompletedContainers) {
      if (!previousCompletedContainers.isEmpty()) {
        for (ContainerId containerId : previousCompletedContainers) {
          this.context.getContainers().remove(containerId);
        }
        LOG.info("Removed completed containers from NM context: "
            + previousCompletedContainers);
        previousCompletedContainers.clear();
      }
    }
  }

  /** Starts keep-alive tracking for each of {@code appIds}, if enabled. */
  private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
      for (ApplicationId appId : appIds) {
        trackAppForKeepAlive(appId);
      }
    }
  }

  /** Schedules the next keep-alive time for {@code appId}. */
  private void trackAppForKeepAlive(ApplicationId appId) {
    // Next keepAlive request for app between 0.7 & 0.9 of when the token will
    // likely expire.
    long nextTime = System.currentTimeMillis()
    + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
        * keepAliveDelayRandom.nextInt(100))/100);
    appTokenKeepAliveMap.put(appId, nextTime);
  }

  /** Wakes the heartbeat thread so it sends the next heartbeat immediately. */
  @Override
  public void sendOutofBandHeartBeat() {
    synchronized (this.heartbeatMonitor) {
      this.heartbeatMonitor.notify();
    }
  }

  /** Returns whether {@code containerId} is in the recently-stopped cache. */
  public boolean isContainerRecentlyStopped(ContainerId containerId) {
    synchronized (recentlyStoppedContainers) {
      return recentlyStoppedContainers.containsKey(containerId);
    }
  }

  /**
   * Records (or refreshes) {@code containerId} in the recently-stopped cache
   * with an expiry of now + the configured tracking duration. Expired entries
   * are evicted first.
   */
  @Private
  @VisibleForTesting
  public void updateStoppedContainersInCache(ContainerId containerId) {
    synchronized (recentlyStoppedContainers) {
      removeVeryOldStoppedContainersFromCache();
      // Remove first so a refreshed entry moves to the tail of the
      // insertion-ordered map. LinkedHashMap#put on an existing key keeps the
      // key's original position, which would break the "earliest expiry
      // first" ordering removeVeryOldStoppedContainersFromCache() relies on
      // and leave expired entries stuck behind a refreshed one.
      recentlyStoppedContainers.remove(containerId);
      recentlyStoppedContainers.put(containerId,
          System.currentTimeMillis() + durationToTrackStoppedContainers);
    }
  }

  /** Drops every entry from the recently-stopped cache. */
  @Override
  public void clearFinishedContainersFromCache() {
    synchronized (recentlyStoppedContainers) {
      recentlyStoppedContainers.clear();
    }
  }

  /**
   * Evicts expired entries from the head of the recently-stopped cache,
   * stopping at the first unexpired entry. This assumes iteration order
   * matches expiry order.
   */
  @Private
  @VisibleForTesting
  public void removeVeryOldStoppedContainersFromCache() {
    synchronized (recentlyStoppedContainers) {
      long currentTime = System.currentTimeMillis();
      Iterator<ContainerId> i =
          recentlyStoppedContainers.keySet().iterator();
      while (i.hasNext()) {
        if (recentlyStoppedContainers.get(i.next()) < currentTime) {
          i.remove();
        } else {
          break;
        }
      }
    }
  }

  /** Returns the RM identifier from the last registration (or the invalid id). */
  @Override
  public long getRMIdentifier() {
    return this.rmIdentifier;
  }

  /**
   * Creates and starts the "Node Status Updater" thread.
   *
   * <p>Each iteration of its loop does the following:
   * <ul>
   *   <li>Sends a heartbeat.</li>
   *   <li>Exits after dispatching SHUTDOWN or RESYNC when the RM asks for it.</li>
   *   <li>Otherwise removes reported completed containers from the context and
   *       dispatches container/app cleanup events.</li>
   *   <li>Finally waits for the RM-provided interval or until notified.</li>
   * </ul>
   */
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            NodeHeartbeatResponse response = null;
            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);

            NodeHeartbeatRequest request =
                NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                    .getCurrentKey());
            response = resourceTracker.nodeHeartbeat(request);
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            updateMasterKeys(response);

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
                .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
                    + " hence shutting down.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              context.setDecommissioned(true);
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
              break;
            }
            if (response.getNodeAction() == NodeAction.RESYNC) {
              LOG.warn("Node is out of sync with ResourceManager,"
                  + " hence resyncing.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              // Invalidate the RMIdentifier while resync
              NodeStatusUpdaterImpl.this.rmIdentifier =
                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
              break;
            }

            // Explicitly put this method after checking the resync response. We
            // don't want to remove the completed containers before resync
            // because these completed containers will be reported back to RM
            // when NM re-registers with RM.
            removeCompletedContainersFromContext();

            lastHeartBeatID = response.getResponseId();
            List<ContainerId> containersToCleanup = response
                .getContainersToCleanup();
            if (!containersToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup,
                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
            }
            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanup();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (!appsToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup,
                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
            }
          } catch (ConnectException e) {
            //catch and throw the exception if tried MAX wait time to connect RM
            dispatcher.getEventHandler().handle(
                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
            throw new YarnRuntimeException(e);
          } catch (Throwable e) {
            // TODO Better error handling. Thread can die with the rest of the
            // NM still running.
            LOG.error("Caught exception in status-updater", e);
          } finally {
            synchronized (heartbeatMonitor) {
              // Fall back to the default interval if the RM gave none.
              nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
                  YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
                    nextHeartBeatInterval;
              try {
                heartbeatMonitor.wait(nextHeartBeatInterval);
              } catch (InterruptedException e) {
                // Do Nothing
              }
            }
          }
        }
      }

      /** Installs any master keys the RM rolled over in this response. */
      private void updateMasterKeys(NodeHeartbeatResponse response) {
        // See if the master-key has rolled over
        MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
        if (updatedMasterKey != null) {
          // Will be non-null only on roll-over on RM side
          context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
        }

        updatedMasterKey = response.getNMTokenMasterKey();
        if (updatedMasterKey != null) {
          context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
        }
      }
    };
    statusUpdater =
        new Thread(statusUpdaterRunnable, "Node Status Updater");
    statusUpdater.start();
  }
}