blob: 42228882721188a1c6f243986eb931a3482ffae9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
public class ResourceTrackerService extends AbstractService implements
ResourceTracker {
private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final RMContext rmContext;
private final NodesListManager nodesListManager;
private final NMLivelinessMonitor nmLivelinessMonitor;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private long nextHeartBeatInterval;
private Server server;
private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion;
private static final NodeHeartbeatResponse resync = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private int minAllocMb;
private int minAllocVcores;
static {
resync.setNodeAction(NodeAction.RESYNC);
shutDown.setNodeAction(NodeAction.SHUTDOWN);
}
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager) {
super(ResourceTrackerService.class.getName());
this.rmContext = rmContext;
this.nodesListManager = nodesListManager;
this.nmLivelinessMonitor = nmLivelinessMonitor;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf);
nextHeartBeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
if (nextHeartBeatInterval <= 0) {
throw new YarnRuntimeException("Invalid Configuration. "
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
+ " should be larger than 0.");
}
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
minAllocVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
minimumNodeManagerVersion = conf.get(
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
// ResourceTrackerServer authenticates NodeManager via Kerberos if
// security is enabled, so no secretManager.
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
InputStream inputStream =
this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
if (inputStream != null) {
conf.addResource(inputStream);
}
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
}
@Override
protected void serviceStop() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
/**
* Helper method to handle received ContainerStatus. If this corresponds to
* the completion of a master-container of a managed AM,
* we call the handler for RMAppAttemptContainerFinishedEvent.
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
rmContext.getRMApps().get(appAttemptId.getApplicationId());
if (rmApp == null) {
LOG.error("Received finished container : "
+ containerStatus.getContainerId()
+ "for unknown application " + appAttemptId.getApplicationId()
+ " Skipping.");
return;
}
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring container completion status for unmanaged AM"
+ rmApp.getApplicationId());
}
return;
}
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
&& containerStatus.getContainerState() == ContainerState.COMPLETE) {
ContainerStatus status =
ContainerStatus.newInstance(containerStatus.getContainerId(),
containerStatus.getContainerState(), containerStatus.getDiagnostics(),
containerStatus.getContainerExitStatus());
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
new RMAppAttemptContainerFinishedEvent(appAttemptId, status,
nodeId);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
NodeId nodeId = request.getNodeId();
String host = nodeId.getHost();
int cmPort = nodeId.getPort();
int httpPort = request.getHttpPort();
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
if (!minimumNodeManagerVersion.equals("NONE")) {
if (minimumNodeManagerVersion.equals("EqualToRM")) {
minimumNodeManagerVersion = YarnVersionInfo.getVersion();
}
if ((nodeManagerVersion == null) ||
(VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
String message =
"Disallowed NodeManager Version " + nodeManagerVersion
+ ", is less than the minimum version "
+ minimumNodeManagerVersion + " sending SHUTDOWN signal to "
+ "NodeManager.";
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
}
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
String message =
"NodeManager from " + host
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
+ " signal to the NodeManager.";
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
response.setContainerTokenMasterKey(containerTokenSecretManager
.getCurrentKey());
response.setNMTokenMasterKey(nmTokenSecretManager
.getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion);
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeReconnectEvent(nodeId, rmNode,
request.getRunningApplications()));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
// Handle received container status, this should be processed after new
// RMNode inserted
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
if (!request.getNMContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
+ request.getNMContainerStatuses());
for (NMContainerStatus status : request.getNMContainerStatuses()) {
handleNMContainerStatus(status, nodeId);
}
}
}
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
+ httpPort + ") " + "registered with capability: " + capability
+ ", assigned nodeId " + nodeId;
LOG.info(message);
response.setNodeAction(NodeAction.NORMAL);
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
response.setRMVersion(YarnVersionInfo.getVersion());
return response;
}
@SuppressWarnings("unchecked")
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
NodeStatus remoteNodeStatus = request.getNodeStatus();
/**
* Here is the node heartbeat sequence...
* 1. Check if it's a registered node
* 2. Check if it's a valid (i.e. not excluded) node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode
*/
NodeId nodeId = remoteNodeStatus.getNodeId();
// 1. Check if it's a registered node
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
/* node does not exist */
String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
return resync;
}
// Send ping
this.nmLivelinessMonitor.receivedPing(nodeId);
// 2. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress();
LOG.info(message);
shutDown.setDiagnosticsMessage(message);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
return shutDown;
}
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress());
return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
.getResponseId()) {
String message =
"Too far behind rm response id:"
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return resync;
}
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
return nodeHeartBeatResponse;
}
private void populateKeys(NodeHeartbeatRequest request,
NodeHeartbeatResponse nodeHeartBeatResponse) {
// Check if node's masterKey needs to be updated and if the currentKey has
// roller over, send it across
// ContainerTokenMasterKey
MasterKey nextMasterKeyForNode =
this.containerTokenSecretManager.getNextKey();
if (nextMasterKeyForNode != null
&& (request.getLastKnownContainerTokenMasterKey().getKeyId()
!= nextMasterKeyForNode.getKeyId())) {
nodeHeartBeatResponse.setContainerTokenMasterKey(nextMasterKeyForNode);
}
// NMTokenMasterKey
nextMasterKeyForNode = this.nmTokenSecretManager.getNextKey();
if (nextMasterKeyForNode != null
&& (request.getLastKnownNMTokenMasterKey().getKeyId()
!= nextMasterKeyForNode.getKeyId())) {
nodeHeartBeatResponse.setNMTokenMasterKey(nextMasterKeyForNode);
}
}
/**
* resolving the network topology.
* @param hostName the hostname of this node.
* @return the resolved {@link Node} for this nodemanager.
*/
public static Node resolve(String hostName) {
return RackResolver.resolve(hostName);
}
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
@VisibleForTesting
public Server getServer() {
return this.server;
}
}