| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| |
| import org.apache.avro.AvroRuntimeException; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.NodeHealthCheckerService; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.SecurityInfo; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.NodeHealthStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass; |
| import org.apache.hadoop.yarn.server.YarnServerConfig; |
| import org.apache.hadoop.yarn.server.api.ResourceTracker; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; |
| import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.records.NodeStatus; |
| import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| import org.apache.hadoop.yarn.util.Records; |
| |
| public class NodeStatusUpdaterImpl extends AbstractService implements |
| NodeStatusUpdater { |
| |
| private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); |
| |
| private final Object heartbeatMonitor = new Object(); |
| |
| private final Context context; |
| private final Dispatcher dispatcher; |
| |
| private long heartBeatInterval; |
| private ResourceTracker resourceTracker; |
| private String rmAddress; |
| private Resource totalResource; |
| private String containerManagerBindAddress; |
| private String nodeHttpAddress; |
| private String hostName; |
| private int containerManagerPort; |
| private int httpPort; |
| private NodeId nodeId; |
| private byte[] secretKeyBytes = new byte[0]; |
| private boolean isStopped; |
| private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| |
| private final NodeHealthCheckerService healthChecker; |
| private final NodeManagerMetrics metrics; |
| |
| public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, |
| NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { |
| super(NodeStatusUpdaterImpl.class.getName()); |
| this.healthChecker = healthChecker; |
| this.context = context; |
| this.dispatcher = dispatcher; |
| this.metrics = metrics; |
| } |
| |
| @Override |
| public synchronized void init(Configuration conf) { |
| this.rmAddress = |
| conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS, |
| YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS); |
| this.heartBeatInterval = |
| conf.getLong(NMConfig.HEARTBEAT_INTERVAL, |
| NMConfig.DEFAULT_HEARTBEAT_INTERVAL); |
| int memory = conf.getInt(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB); |
| this.totalResource = recordFactory.newRecordInstance(Resource.class); |
| this.totalResource.setMemory(memory * 1024); |
| metrics.addResource(totalResource); |
| super.init(conf); |
| } |
| |
| @Override |
| public void start() { |
| String cmBindAddressStr = |
| getConfig().get(NMConfig.NM_BIND_ADDRESS, |
| NMConfig.DEFAULT_NM_BIND_ADDRESS); |
| InetSocketAddress cmBindAddress = |
| NetUtils.createSocketAddr(cmBindAddressStr); |
| String httpBindAddressStr = |
| getConfig().get(NMConfig.NM_HTTP_BIND_ADDRESS, |
| NMConfig.DEFAULT_NM_HTTP_BIND_ADDRESS); |
| InetSocketAddress httpBindAddress = |
| NetUtils.createSocketAddr(httpBindAddressStr); |
| try { |
| this.hostName = InetAddress.getLocalHost().getHostAddress(); |
| this.containerManagerPort = cmBindAddress.getPort(); |
| this.httpPort = httpBindAddress.getPort(); |
| this.containerManagerBindAddress = |
| this.hostName + ":" + this.containerManagerPort; |
| this.nodeHttpAddress = this.hostName + ":" + this.httpPort; |
| LOG.info("Configured ContainerManager Address is " |
| + this.containerManagerBindAddress); |
| // Registration has to be in start so that ContainerManager can get the |
| // perNM tokens needed to authenticate ContainerTokens. |
| registerWithRM(); |
| super.start(); |
| startStatusUpdater(); |
| } catch (Exception e) { |
| throw new AvroRuntimeException(e); |
| } |
| } |
| |
| @Override |
| public synchronized void stop() { |
| // Interrupt the updater. |
| this.isStopped = true; |
| super.stop(); |
| } |
| |
| protected ResourceTracker getRMClient() { |
| YarnRPC rpc = YarnRPC.create(getConfig()); |
| InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress); |
| Configuration rmClientConf = new Configuration(getConfig()); |
| rmClientConf.setClass( |
| YarnConfiguration.YARN_SECURITY_INFO, |
| RMNMSecurityInfoClass.class, SecurityInfo.class); |
| return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress, |
| rmClientConf); |
| } |
| |
| private void registerWithRM() throws YarnRemoteException { |
| this.resourceTracker = getRMClient(); |
| LOG.info("Connected to ResourceManager at " + this.rmAddress); |
| |
| RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); |
| this.nodeId = Records.newRecord(NodeId.class); |
| this.nodeId.setHost(this.hostName); |
| this.nodeId.setPort(this.containerManagerPort); |
| request.setHttpPort(this.httpPort); |
| request.setResource(this.totalResource); |
| request.setNodeId(this.nodeId); |
| RegistrationResponse regResponse = |
| this.resourceTracker.registerNodeManager(request).getRegistrationResponse(); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| this.secretKeyBytes = regResponse.getSecretKey().array(); |
| } |
| |
| LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress |
| + " with total resource of " + this.totalResource); |
| } |
| |
| @Override |
| public String getContainerManagerBindAddress() { |
| return this.containerManagerBindAddress; |
| } |
| |
| @Override |
| public byte[] getRMNMSharedSecret() { |
| return this.secretKeyBytes.clone(); |
| } |
| |
| private NodeStatus getNodeStatus() { |
| |
| NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); |
| nodeStatus.setNodeId(this.nodeId); |
| |
| int numActiveContainers = 0; |
| for (Iterator<Entry<ContainerId, Container>> i = |
| this.context.getContainers().entrySet().iterator(); i.hasNext();) { |
| Entry<ContainerId, Container> e = i.next(); |
| ContainerId containerId = e.getKey(); |
| Container container = e.getValue(); |
| |
| List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus |
| .getContainers(container.getContainerID().getAppId()); |
| if (applicationContainers == null) { |
| applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>(); |
| nodeStatus.setContainers(container.getContainerID().getAppId(), |
| applicationContainers); |
| } |
| |
| // Clone the container to send it to the RM |
| org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer(); |
| c.setNodeId(this.nodeId); |
| c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime. |
| applicationContainers.add(c); |
| ++numActiveContainers; |
| LOG.info("Sending out status for container: " + c); |
| |
| if (c.getState() == ContainerState.COMPLETE) { |
| // Remove |
| i.remove(); |
| |
| LOG.info("Removed completed container " + containerId); |
| } |
| } |
| |
| LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers |
| + " containers"); |
| |
| NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); |
| if (this.healthChecker != null) { |
| this.healthChecker.setHealthStatus(nodeHealthStatus); |
| } |
| LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() |
| + ", " + nodeHealthStatus.getHealthReport()); |
| nodeStatus.setNodeHealthStatus(nodeHealthStatus); |
| |
| return nodeStatus; |
| } |
| |
| @Override |
| public void sendOutofBandHeartBeat() { |
| synchronized (this.heartbeatMonitor) { |
| this.heartbeatMonitor.notify(); |
| } |
| } |
| |
| protected void startStatusUpdater() { |
| |
| new Thread() { |
| @Override |
| public void run() { |
| int lastHeartBeatID = 0; |
| while (!isStopped) { |
| // Send heartbeat |
| try { |
| synchronized (heartbeatMonitor) { |
| heartbeatMonitor.wait(heartBeatInterval); |
| } |
| NodeStatus nodeStatus = getNodeStatus(); |
| nodeStatus.setResponseId(lastHeartBeatID); |
| |
| NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class); |
| request.setNodeStatus(nodeStatus); |
| HeartbeatResponse response = |
| resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); |
| lastHeartBeatID = response.getResponseId(); |
| List<ContainerId> containersToCleanup = response |
| .getContainersToCleanupList(); |
| if (containersToCleanup.size() != 0) { |
| dispatcher.getEventHandler().handle( |
| new CMgrCompletedContainersEvent(containersToCleanup)); |
| } |
| List<ApplicationId> appsToCleanup = |
| response.getApplicationsToCleanupList(); |
| if (appsToCleanup.size() != 0) { |
| dispatcher.getEventHandler().handle( |
| new CMgrCompletedAppsEvent(appsToCleanup)); |
| } |
| } catch (Throwable e) { |
| LOG.error("Caught exception in status-updater", e); |
| break; |
| } |
| } |
| } |
| }.start(); |
| } |
| } |