| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.NodeHealthCheckerService; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.api.ResourceTracker; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; |
| import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.records.NodeStatus; |
| import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; |
| import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; |
| import org.apache.hadoop.yarn.service.Service.STATE; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestNodeStatusUpdater { |
| |
| static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class); |
| static final Path basedir = |
| new Path("target", TestNodeStatusUpdater.class.getName()); |
| private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| |
| int heartBeatID = 0; |
| volatile Error nmStartError = null; |
| |
| private class MyResourceTracker implements ResourceTracker { |
| |
| private Context context; |
| |
| public MyResourceTracker(Context context) { |
| this.context = context; |
| } |
| |
| @Override |
| public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException { |
| NodeId nodeId = request.getNodeId(); |
| Resource resource = request.getResource(); |
| LOG.info("Registering " + nodeId.toString()); |
| try { |
| Assert.assertEquals(InetAddress.getLocalHost().getHostAddress() |
| + ":12345", nodeId.toString()); |
| } catch (UnknownHostException e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertEquals(5 * 1024, resource.getMemory()); |
| RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class); |
| |
| RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); |
| response.setRegistrationResponse(regResponse); |
| return response; |
| } |
| |
| ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class); |
| ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class); |
| ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class); |
| |
| @Override |
| public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { |
| NodeStatus nodeStatus = request.getNodeStatus(); |
| LOG.info("Got heartbeat number " + heartBeatID); |
| nodeStatus.setResponseId(heartBeatID++); |
| if (heartBeatID == 1) { |
| Assert.assertEquals(0, nodeStatus.getAllContainers().size()); |
| |
| // Give a container to the NM. |
| applicationID.setId(heartBeatID); |
| firstContainerID.setAppId(applicationID); |
| firstContainerID.setId(heartBeatID); |
| ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| launchContext.setContainerId(firstContainerID); |
| launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); |
| launchContext.getResource().setMemory(2); |
| Container container = new ContainerImpl(null, launchContext, null, null); |
| this.context.getContainers().put(firstContainerID, container); |
| } else if (heartBeatID == 2) { |
| // Checks on the RM end |
| Assert.assertEquals("Number of applications should only be one!", 1, |
| nodeStatus.getAllContainers().size()); |
| Assert.assertEquals("Number of container for the app should be one!", |
| 1, nodeStatus.getContainers(applicationID).size()); |
| Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0) |
| .getResource().getMemory()); |
| |
| // Checks on the NM end |
| ConcurrentMap<ContainerId, Container> activeContainers = |
| this.context.getContainers(); |
| Assert.assertEquals(1, activeContainers.size()); |
| |
| // Give another container to the NM. |
| applicationID.setId(heartBeatID); |
| secondContainerID.setAppId(applicationID); |
| secondContainerID.setId(heartBeatID); |
| ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| launchContext.setContainerId(secondContainerID); |
| launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); |
| launchContext.getResource().setMemory(3); |
| Container container = new ContainerImpl(null, launchContext, null, null); |
| this.context.getContainers().put(secondContainerID, container); |
| } else if (heartBeatID == 3) { |
| // Checks on the RM end |
| Assert.assertEquals("Number of applications should only be one!", 1, |
| nodeStatus.getAllContainers().size()); |
| Assert.assertEquals("Number of container for the app should be two!", |
| 2, nodeStatus.getContainers(applicationID).size()); |
| Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0) |
| .getResource().getMemory()); |
| Assert.assertEquals(3, nodeStatus.getContainers(applicationID).get(1) |
| .getResource().getMemory()); |
| |
| // Checks on the NM end |
| ConcurrentMap<ContainerId, Container> activeContainers = |
| this.context.getContainers(); |
| Assert.assertEquals(2, activeContainers.size()); |
| } |
| HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class); |
| response.setResponseId(heartBeatID); |
| |
| NodeHeartbeatResponse nhResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class); |
| nhResponse.setHeartbeatResponse(response); |
| return nhResponse; |
| } |
| } |
| |
| private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { |
| private Context context; |
| |
| public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, |
| NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { |
| super(context, dispatcher, healthChecker, metrics); |
| this.context = context; |
| } |
| |
| @Override |
| protected ResourceTracker getRMClient() { |
| return new MyResourceTracker(this.context); |
| } |
| } |
| |
| @Before |
| public void clearError() { |
| nmStartError = null; |
| } |
| |
| @After |
| public void deleteBaseDir() throws IOException { |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| lfs.delete(basedir, true); |
| } |
| |
| @Test |
| public void testNMRegistration() throws InterruptedException { |
| final NodeManager nm = new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| return new MyNodeStatusUpdater(context, dispatcher, healthChecker, |
| metrics); |
| } |
| }; |
| |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setInt(NMConfig.NM_VMEM_GB, 5); // 5GB |
| conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345"); |
| conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346"); |
| conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath()); |
| conf.set(NMConfig.REMOTE_USER_LOG_DIR, new Path(basedir, "remotelogs") |
| .toUri().getPath()); |
| conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath()); |
| nm.init(conf); |
| new Thread() { |
| public void run() { |
| try { |
| nm.start(); |
| } catch (Error e) { |
| TestNodeStatusUpdater.this.nmStartError = e; |
| } |
| } |
| }.start(); |
| |
| System.out.println(" ----- thread already started.." |
| + nm.getServiceState()); |
| |
| int waitCount = 0; |
| while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to start.."); |
| Thread.sleep(1000); |
| } |
| if (nmStartError != null) { |
| throw nmStartError; |
| } |
| if (nm.getServiceState() != STATE.STARTED) { |
| // NM could have failed. |
| Assert.fail("NodeManager failed to start"); |
| } |
| |
| while (heartBeatID <= 3) { |
| Thread.sleep(500); |
| } |
| |
| nm.stop(); |
| } |
| } |