blob: 841e16997da62e91ddd5021b59e8789f46f532bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.service.AbstractService;
/**
 * {@link NodeStatusUpdater} implementation that registers this NodeManager
 * with the ResourceManager when the service starts and afterwards reports
 * node status (container statuses, node health, keep-alive applications) to
 * it from a dedicated background thread.
 *
 * <p>Threading: {@link #stop()} and {@link #sendOutofBandHeartBeat()} may be
 * called from other threads while the updater thread is running. The stop
 * flag is {@code volatile} and the updater thread is woken through
 * {@code heartbeatMonitor} in both cases.
 */
public class NodeStatusUpdaterImpl extends AbstractService implements
    NodeStatusUpdater {

  private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);

  /** Monitor the updater thread waits on between two heartbeats. */
  private final Object heartbeatMonitor = new Object();

  private final Context context;
  private final Dispatcher dispatcher;

  /** Assigned in {@link #start()}; not yet set while {@link #init} runs. */
  private NodeId nodeId;
  private long heartBeatInterval;
  private ResourceTracker resourceTracker;
  private InetSocketAddress rmAddress;
  private Resource totalResource;
  private int httpPort;

  /**
   * Set by {@link #stop()} and polled by the updater thread. Volatile so the
   * write made by the stopping thread is visible to the updater thread.
   */
  private volatile boolean isStopped;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  private boolean tokenKeepAliveEnabled;
  private long tokenRemovalDelayMs;

  /** Keeps track of when the next keep alive request should be sent for an app. */
  private final Map<ApplicationId, Long> appTokenKeepAliveMap =
      new HashMap<ApplicationId, Long>();
  private final Random keepAliveDelayRandom = new Random();

  private final NodeHealthCheckerService healthChecker;
  private final NodeManagerMetrics metrics;

  /**
   * @param context NodeManager context supplying the node id, containers,
   *          applications, node health status and token secret manager
   * @param dispatcher dispatcher used to raise NodeManager and
   *          container-manager events from heartbeat responses
   * @param healthChecker source of the health report sent with each heartbeat
   * @param metrics metrics object the node's total resource is added to
   */
  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
    super(NodeStatusUpdaterImpl.class.getName());
    this.healthChecker = healthChecker;
    this.context = context;
    this.dispatcher = dispatcher;
    this.metrics = metrics;
  }

  /**
   * Reads the ResourceManager address, heartbeat interval, node resources and
   * token keep-alive settings from the configuration.
   *
   * @param conf configuration to read the settings from
   */
  @Override
  public synchronized void init(Configuration conf) {
    this.rmAddress = conf.getSocketAddr(
        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
    this.heartBeatInterval =
        conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);

    int memoryMb =
        conf.getInt(
            YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
    float vMemToPMem =
        conf.getFloat(
            YarnConfiguration.NM_VMEM_PMEM_RATIO,
            YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    // Only reported in the log line below; the registered resource carries
    // the physical memory figure.
    int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);

    int cpuCores =
        conf.getInt(
            YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
    float vCoresToPCores =
        conf.getFloat(
            YarnConfiguration.NM_VCORES_PCORES_RATIO,
            YarnConfiguration.DEFAULT_NM_VCORES_PCORES_RATIO);
    int virtualCores = (int) Math.ceil(cpuCores * vCoresToPCores);

    this.totalResource = recordFactory.newRecordInstance(Resource.class);
    this.totalResource.setMemory(memoryMb);
    this.totalResource.setVirtualCores(virtualCores);
    metrics.addResource(totalResource);

    this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
    this.tokenRemovalDelayMs =
        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

    // NOTE(review): nodeId is only assigned in start(), so unless init() is
    // invoked after start() this line prints "null" for the node id.
    LOG.info("Initialized nodemanager for " + nodeId + ":" +
        " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
        " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);

    super.init(conf);
  }

  /**
   * Resolves the node id and web-app port, registers with the
   * ResourceManager and launches the status-updater thread.
   *
   * @throws AvroRuntimeException wrapping any exception raised while
   *           registering or starting
   */
  @Override
  public void start() {
    // NodeManager is the last service to start, so NodeId is available.
    this.nodeId = this.context.getNodeId();

    InetSocketAddress httpBindAddress = getConfig().getSocketAddr(
        YarnConfiguration.NM_WEBAPP_ADDRESS,
        YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
        YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
    try {
      this.httpPort = httpBindAddress.getPort();
      // Registration has to be in start so that ContainerManager can get the
      // perNM tokens needed to authenticate ContainerTokens.
      registerWithRM();
      super.start();
      startStatusUpdater();
    } catch (Exception e) {
      throw new AvroRuntimeException(e);
    }
  }

  /**
   * Flags the updater thread to stop and wakes it so that it exits promptly
   * instead of sleeping out the remainder of the heartbeat interval.
   */
  @Override
  public synchronized void stop() {
    this.isStopped = true;
    // The flag is set before the notification and the updater re-checks it
    // while holding the monitor, so the wake-up cannot be missed.
    synchronized (this.heartbeatMonitor) {
      this.heartbeatMonitor.notifyAll();
    }
    super.stop();
  }

  /** @return whether Hadoop security is enabled for this process */
  private boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }

  /**
   * @param conf configuration to consult
   * @return {@code true} when log aggregation is enabled in {@code conf} and
   *         security is enabled
   */
  @Private
  protected boolean isTokenKeepAliveEnabled(Configuration conf) {
    return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
        && isSecurityEnabled();
  }

  /**
   * Creates the RPC proxy used to talk to the ResourceManager's resource
   * tracker at the address read during {@link #init}.
   *
   * @return a {@link ResourceTracker} proxy
   */
  protected ResourceTracker getRMClient() {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
        conf);
  }

  /**
   * Sends the registration request (HTTP port, total resource, node id) to
   * the ResourceManager and, when security is enabled, installs the master
   * key returned with the registration response.
   *
   * @throws YarnRemoteException declared for failures of the registration call
   * @throws YarnException if the ResourceManager answers with
   *           {@link NodeAction#SHUTDOWN}
   */
  private void registerWithRM() throws YarnRemoteException {
    this.resourceTracker = getRMClient();
    LOG.info("Connecting to ResourceManager at " + this.rmAddress);

    RegisterNodeManagerRequest request =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    request.setHttpPort(this.httpPort);
    request.setResource(this.totalResource);
    request.setNodeId(this.nodeId);
    RegistrationResponse regResponse =
        this.resourceTracker.registerNodeManager(request).getRegistrationResponse();

    // if the Resourcemanager instructs NM to shutdown.
    if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
      throw new YarnException(
          "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
    }

    if (isSecurityEnabled()) {
      MasterKey masterKey = regResponse.getMasterKey();
      // do this now so that its set before we start heartbeating to RM
      LOG.info("Security enabled - updating secret keys now");
      // It is expected that status updater is started by this point and
      // RM gives the shared secret in registration during
      // StatusUpdater#start().
      if (masterKey != null) {
        this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
      }
    }

    LOG.info("Registered with ResourceManager as " + this.nodeId
        + " with total resource of " + this.totalResource);
  }

  /**
   * Builds the list of applications whose keep-alive time has passed and
   * schedules the next keep-alive for each of them. Tracked applications that
   * are no longer present in the context are dropped from the tracking map.
   *
   * @return the applications to list as keep-alive in the next heartbeat;
   *         empty when token keep-alive is disabled
   */
  private List<ApplicationId> createKeepAliveApplicationList() {
    if (!tokenKeepAliveEnabled) {
      return Collections.emptyList();
    }

    List<ApplicationId> appList = new ArrayList<ApplicationId>();
    for (Iterator<Entry<ApplicationId, Long>> i =
        this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
      Entry<ApplicationId, Long> e = i.next();
      ApplicationId appId = e.getKey();
      Long nextKeepAlive = e.getValue();
      if (!this.context.getApplications().containsKey(appId)) {
        // Remove if the application has finished.
        i.remove();
      } else if (System.currentTimeMillis() > nextKeepAlive) {
        // KeepAlive list for the next hearbeat.
        appList.add(appId);
        // Re-putting a key that is already present only replaces its value,
        // which is not a structural modification of the HashMap, so the
        // iterator stays valid.
        trackAppForKeepAlive(appId);
      }
    }
    return appList;
  }

  /**
   * Assembles the status sent with a heartbeat: a cloned status for every
   * container in the context, the current node health and the keep-alive
   * application list. Containers whose cloned status is
   * {@link ContainerState#COMPLETE} are removed from the context after their
   * status has been added to the report.
   *
   * @return the populated node status
   */
  private NodeStatus getNodeStatus() {
    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
    nodeStatus.setNodeId(this.nodeId);

    // Counts every container reported, including ones removed as complete.
    int numActiveContainers = 0;
    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
    for (Iterator<Entry<ContainerId, Container>> i =
        this.context.getContainers().entrySet().iterator(); i.hasNext();) {
      Entry<ContainerId, Container> e = i.next();
      ContainerId containerId = e.getKey();
      Container container = e.getValue();

      // Clone the container to send it to the RM
      ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
      containersStatuses.add(containerStatus);
      ++numActiveContainers;
      LOG.info("Sending out status for container: " + containerStatus);

      if (containerStatus.getState() == ContainerState.COMPLETE) {
        // Remove
        i.remove();
        LOG.info("Removed completed container " + containerId);
      }
    }
    nodeStatus.setContainersStatuses(containersStatuses);

    // Guarded so the message is not built on every heartbeat when debug
    // logging is off.
    if (LOG.isDebugEnabled()) {
      LOG.debug(this.nodeId + " sending out status for "
          + numActiveContainers + " containers");
    }

    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
    nodeHealthStatus.setLastHealthReportTime(
        healthChecker.getLastHealthReportTime());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
          + ", " + nodeHealthStatus.getHealthReport());
    }
    nodeStatus.setNodeHealthStatus(nodeHealthStatus);

    List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
    nodeStatus.setKeepAliveApplications(keepAliveAppIds);

    return nodeStatus;
  }

  /**
   * Starts keep-alive tracking for each given application. Does nothing when
   * token keep-alive is disabled or the list is {@code null} or empty.
   *
   * @param appIds applications to track; may be {@code null}
   */
  private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
      for (ApplicationId appId : appIds) {
        trackAppForKeepAlive(appId);
      }
    }
  }

  /**
   * Records the time at which the next keep-alive for {@code appId} is due,
   * replacing any previously recorded time.
   *
   * @param appId application to schedule
   */
  private void trackAppForKeepAlive(ApplicationId appId) {
    // Next keepAlive request for app between 0.7 & 0.9 of when the token will
    // likely expire.
    long nextTime = System.currentTimeMillis()
        + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
            * keepAliveDelayRandom.nextInt(100)) / 100);
    appTokenKeepAliveMap.put(appId, nextTime);
  }

  /**
   * Wakes the updater thread so that it sends a heartbeat without waiting for
   * the rest of the heartbeat interval.
   */
  @Override
  public void sendOutofBandHeartBeat() {
    synchronized (this.heartbeatMonitor) {
      this.heartbeatMonitor.notify();
    }
  }

  /**
   * Launches the "Node Status Updater" thread. Each iteration waits for the
   * heartbeat interval (or an earlier wake-up), sends the node status to the
   * ResourceManager and acts on the response: master-key roll-over, SHUTDOWN
   * and REBOOT actions, and container/application clean-up. The thread ends
   * when the service is stopped or a SHUTDOWN/REBOOT action is received.
   */
  protected void startStatusUpdater() {
    new Thread("Node Status Updater") {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            synchronized (heartbeatMonitor) {
              // Checked under the monitor so a stop() issued just before we
              // get here is not slept through.
              if (!isStopped) {
                heartbeatMonitor.wait(heartBeatInterval);
              }
            }
            if (isStopped) {
              // Stopped while waiting: do not heartbeat for a stopped service.
              break;
            }

            NodeStatus nodeStatus = getNodeStatus();
            nodeStatus.setResponseId(lastHeartBeatID);

            NodeHeartbeatRequest request = recordFactory
                .newRecordInstance(NodeHeartbeatRequest.class);
            request.setNodeStatus(nodeStatus);
            if (isSecurityEnabled()) {
              request.setLastKnownMasterKey(
                  context.getContainerTokenSecretManager().getCurrentKey());
            }
            HeartbeatResponse response =
                resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();

            // See if the master-key has rolled over
            if (isSecurityEnabled()) {
              MasterKey updatedMasterKey = response.getMasterKey();
              if (updatedMasterKey != null) {
                // Will be non-null only on roll-over on RM side
                context.getContainerTokenSecretManager().setMasterKey(
                    updatedMasterKey);
              }
            }

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
                  .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
                      " hence shutting down.");
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
              break;
            }
            if (response.getNodeAction() == NodeAction.REBOOT) {
              LOG.info("Node is out of sync with ResourceManager,"
                  + " hence rebooting.");
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.REBOOT));
              break;
            }

            lastHeartBeatID = response.getResponseId();

            List<ContainerId> containersToCleanup = response
                .getContainersToCleanupList();
            if (containersToCleanup.size() != 0) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup,
                      CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
            }

            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanupList();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (appsToCleanup.size() != 0) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup));
            }
          } catch (Throwable e) {
            // TODO Better error handling. Thread can die with the rest of the
            // NM still running.
            LOG.error("Caught exception in status-updater", e);
          }
        }
      }
    }.start();
  }
}