| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager; |
| |
| import static org.mockito.Mockito.mock; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.server.api.ResourceTracker; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; |
| import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.records.NodeAction; |
| import org.apache.hadoop.yarn.server.api.records.NodeStatus; |
| import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; |
| import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.service.Service; |
| import org.apache.hadoop.yarn.service.Service.STATE; |
| import org.apache.hadoop.yarn.util.BuilderUtils; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestNodeStatusUpdater { |
| |
// temp fix until metrics system can auto-detect itself running in unit test:
static {
  DefaultMetricsSystem.setMiniClusterMode(true);
}

static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);

/** Scratch directory for NM local, log and remote-log dirs. */
static final Path basedir =
    new Path("target", TestNodeStatusUpdater.class.getName());
private static final RecordFactory recordFactory = RecordFactoryProvider
    .getRecordFactory(null);

/**
 * Heartbeat counter. It is incremented inside the stub trackers'
 * nodeHeartbeat() and polled from the test methods' sleep loops, so it is
 * volatile to make each update visible to the polling code. Note that
 * volatile does not make the increment atomic; this relies on heartbeats
 * being delivered one at a time -- TODO confirm against
 * NodeStatusUpdaterImpl.
 */
volatile int heartBeatID = 0;

/** Failure captured by the thread that starts the NM in testNMRegistration. */
volatile Throwable nmStartError = null;

/** Node ids recorded by MyResourceTracker#registerNodeManager. */
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = createNMConfig();
private NodeManager nm;

/**
 * Set from createNewNodeManager() of the NM built by getNodeManager() and
 * polled by testNodeReboot; volatile for the same visibility reason as
 * heartBeatID.
 */
protected volatile NodeManager rebootedNodeManager;
| |
| @After |
| public void tearDown() { |
| this.registeredNodes.clear(); |
| heartBeatID = 0; |
| if (nm != null && nm.getServiceState() == STATE.STARTED) { |
| nm.stop(); |
| } |
| DefaultMetricsSystem.shutdown(); |
| } |
| |
| private class MyResourceTracker implements ResourceTracker { |
| |
| private final Context context; |
| |
| public MyResourceTracker(Context context) { |
| this.context = context; |
| } |
| |
| @Override |
| public RegisterNodeManagerResponse registerNodeManager( |
| RegisterNodeManagerRequest request) throws YarnRemoteException { |
| NodeId nodeId = request.getNodeId(); |
| Resource resource = request.getResource(); |
| LOG.info("Registering " + nodeId.toString()); |
| // NOTE: this really should be checking against the config value |
| InetSocketAddress expected = NetUtils.getConnectAddress( |
| conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); |
| Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString()); |
| Assert.assertEquals(5 * 1024, resource.getMemory()); |
| registeredNodes.add(nodeId); |
| RegistrationResponse regResponse = recordFactory |
| .newRecordInstance(RegistrationResponse.class); |
| |
| RegisterNodeManagerResponse response = recordFactory |
| .newRecordInstance(RegisterNodeManagerResponse.class); |
| response.setRegistrationResponse(regResponse); |
| return response; |
| } |
| |
| ApplicationId applicationID = recordFactory |
| .newRecordInstance(ApplicationId.class); |
| ApplicationAttemptId appAttemptID = recordFactory |
| .newRecordInstance(ApplicationAttemptId.class); |
| ContainerId firstContainerID = recordFactory |
| .newRecordInstance(ContainerId.class); |
| ContainerId secondContainerID = recordFactory |
| .newRecordInstance(ContainerId.class); |
| |
| private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap( |
| List<ContainerStatus> containers) { |
| Map<ApplicationId, List<ContainerStatus>> map = |
| new HashMap<ApplicationId, List<ContainerStatus>>(); |
| for (ContainerStatus cs : containers) { |
| ApplicationId applicationId = |
| cs.getContainerId().getApplicationAttemptId().getApplicationId(); |
| List<ContainerStatus> appContainers = map.get(applicationId); |
| if (appContainers == null) { |
| appContainers = new ArrayList<ContainerStatus>(); |
| map.put(applicationId, appContainers); |
| } |
| appContainers.add(cs); |
| } |
| return map; |
| } |
| |
| @Override |
| public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) |
| throws YarnRemoteException { |
| NodeStatus nodeStatus = request.getNodeStatus(); |
| LOG.info("Got heartbeat number " + heartBeatID); |
| nodeStatus.setResponseId(heartBeatID++); |
| Map<ApplicationId, List<ContainerStatus>> appToContainers = |
| getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); |
| if (heartBeatID == 1) { |
| Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); |
| |
| // Give a container to the NM. |
| applicationID.setId(heartBeatID); |
| appAttemptID.setApplicationId(applicationID); |
| firstContainerID.setApplicationAttemptId(appAttemptID); |
| firstContainerID.setId(heartBeatID); |
| ContainerLaunchContext launchContext = recordFactory |
| .newRecordInstance(ContainerLaunchContext.class); |
| launchContext.setContainerId(firstContainerID); |
| launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); |
| launchContext.getResource().setMemory(2); |
| Container container = new ContainerImpl(conf , null, launchContext, null, null); |
| this.context.getContainers().put(firstContainerID, container); |
| } else if (heartBeatID == 2) { |
| // Checks on the RM end |
| Assert.assertEquals("Number of applications should only be one!", 1, |
| nodeStatus.getContainersStatuses().size()); |
| Assert.assertEquals("Number of container for the app should be one!", |
| 1, appToContainers.get(applicationID).size()); |
| |
| // Checks on the NM end |
| ConcurrentMap<ContainerId, Container> activeContainers = |
| this.context.getContainers(); |
| Assert.assertEquals(1, activeContainers.size()); |
| |
| // Give another container to the NM. |
| applicationID.setId(heartBeatID); |
| appAttemptID.setApplicationId(applicationID); |
| secondContainerID.setApplicationAttemptId(appAttemptID); |
| secondContainerID.setId(heartBeatID); |
| ContainerLaunchContext launchContext = recordFactory |
| .newRecordInstance(ContainerLaunchContext.class); |
| launchContext.setContainerId(secondContainerID); |
| launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); |
| launchContext.getResource().setMemory(3); |
| Container container = new ContainerImpl(conf, null, launchContext, null, null); |
| this.context.getContainers().put(secondContainerID, container); |
| } else if (heartBeatID == 3) { |
| // Checks on the RM end |
| Assert.assertEquals("Number of applications should only be one!", 1, |
| appToContainers.size()); |
| Assert.assertEquals("Number of container for the app should be two!", |
| 2, appToContainers.get(applicationID).size()); |
| |
| // Checks on the NM end |
| ConcurrentMap<ContainerId, Container> activeContainers = |
| this.context.getContainers(); |
| Assert.assertEquals(2, activeContainers.size()); |
| } |
| HeartbeatResponse response = recordFactory |
| .newRecordInstance(HeartbeatResponse.class); |
| response.setResponseId(heartBeatID); |
| |
| NodeHeartbeatResponse nhResponse = recordFactory |
| .newRecordInstance(NodeHeartbeatResponse.class); |
| nhResponse.setHeartbeatResponse(response); |
| return nhResponse; |
| } |
| } |
| |
| private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { |
| public ResourceTracker resourceTracker = new MyResourceTracker(this.context); |
| private Context context; |
| |
| public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, |
| NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { |
| super(context, dispatcher, healthChecker, metrics); |
| this.context = context; |
| } |
| |
| @Override |
| protected ResourceTracker getRMClient() { |
| return resourceTracker; |
| } |
| } |
| |
| private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { |
| public ResourceTracker resourceTracker; |
| private Context context; |
| |
| public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, |
| NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { |
| super(context, dispatcher, healthChecker, metrics); |
| this.context = context; |
| this.resourceTracker = new MyResourceTracker3(this.context); |
| } |
| |
| @Override |
| protected ResourceTracker getRMClient() { |
| return resourceTracker; |
| } |
| |
| @Override |
| protected boolean isTokenKeepAliveEnabled(Configuration conf) { |
| return true; |
| } |
| } |
| |
| private class MyNodeManager extends NodeManager { |
| |
| private MyNodeStatusUpdater3 nodeStatusUpdater; |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| this.nodeStatusUpdater = |
| new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); |
| return this.nodeStatusUpdater; |
| } |
| |
| protected MyNodeStatusUpdater3 getNodeStatusUpdater() { |
| return this.nodeStatusUpdater; |
| } |
| } |
| |
| // |
| private class MyResourceTracker2 implements ResourceTracker { |
| public NodeAction heartBeatNodeAction = NodeAction.NORMAL; |
| public NodeAction registerNodeAction = NodeAction.NORMAL; |
| |
| @Override |
| public RegisterNodeManagerResponse registerNodeManager( |
| RegisterNodeManagerRequest request) throws YarnRemoteException { |
| |
| RegisterNodeManagerResponse response = recordFactory |
| .newRecordInstance(RegisterNodeManagerResponse.class); |
| RegistrationResponse regResponse = recordFactory |
| .newRecordInstance(RegistrationResponse.class); |
| regResponse.setNodeAction(registerNodeAction ); |
| response.setRegistrationResponse(regResponse); |
| return response; |
| } |
| @Override |
| public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) |
| throws YarnRemoteException { |
| NodeStatus nodeStatus = request.getNodeStatus(); |
| nodeStatus.setResponseId(heartBeatID++); |
| HeartbeatResponse response = recordFactory |
| .newRecordInstance(HeartbeatResponse.class); |
| response.setResponseId(heartBeatID); |
| response.setNodeAction(heartBeatNodeAction); |
| |
| NodeHeartbeatResponse nhResponse = recordFactory |
| .newRecordInstance(NodeHeartbeatResponse.class); |
| nhResponse.setHeartbeatResponse(response); |
| return nhResponse; |
| } |
| } |
| |
| private class MyResourceTracker3 implements ResourceTracker { |
| public NodeAction heartBeatNodeAction = NodeAction.NORMAL; |
| public NodeAction registerNodeAction = NodeAction.NORMAL; |
| private Map<ApplicationId, List<Long>> keepAliveRequests = |
| new HashMap<ApplicationId, List<Long>>(); |
| private ApplicationId appId = BuilderUtils.newApplicationId(1, 1); |
| private final Context context; |
| |
| MyResourceTracker3(Context context) { |
| this.context = context; |
| } |
| |
| @Override |
| public RegisterNodeManagerResponse registerNodeManager( |
| RegisterNodeManagerRequest request) throws YarnRemoteException { |
| |
| RegisterNodeManagerResponse response = |
| recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); |
| RegistrationResponse regResponse = |
| recordFactory.newRecordInstance(RegistrationResponse.class); |
| regResponse.setNodeAction(registerNodeAction); |
| response.setRegistrationResponse(regResponse); |
| return response; |
| } |
| |
| @Override |
| public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) |
| throws YarnRemoteException { |
| LOG.info("Got heartBeatId: [" + heartBeatID +"]"); |
| NodeStatus nodeStatus = request.getNodeStatus(); |
| nodeStatus.setResponseId(heartBeatID++); |
| HeartbeatResponse response = |
| recordFactory.newRecordInstance(HeartbeatResponse.class); |
| response.setResponseId(heartBeatID); |
| response.setNodeAction(heartBeatNodeAction); |
| |
| if (nodeStatus.getKeepAliveApplications() != null |
| && nodeStatus.getKeepAliveApplications().size() > 0) { |
| for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) { |
| List<Long> list = keepAliveRequests.get(appId); |
| if (list == null) { |
| list = new LinkedList<Long>(); |
| keepAliveRequests.put(appId, list); |
| } |
| list.add(System.currentTimeMillis()); |
| } |
| } |
| if (heartBeatID == 2) { |
| LOG.info("Sending FINISH_APP for application: [" + appId + "]"); |
| this.context.getApplications().put(appId, mock(Application.class)); |
| response.addAllApplicationsToCleanup(Collections.singletonList(appId)); |
| } |
| NodeHeartbeatResponse nhResponse = |
| recordFactory.newRecordInstance(NodeHeartbeatResponse.class); |
| nhResponse.setHeartbeatResponse(response); |
| return nhResponse; |
| } |
| } |
| |
| @Before |
| public void clearError() { |
| nmStartError = null; |
| } |
| |
| @After |
| public void deleteBaseDir() throws IOException { |
| FileContext lfs = FileContext.getLocalFSFileContext(); |
| lfs.delete(basedir, true); |
| } |
| |
| @Test |
| public void testNMRegistration() throws InterruptedException { |
| nm = new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| return new MyNodeStatusUpdater(context, dispatcher, healthChecker, |
| metrics); |
| } |
| }; |
| |
| YarnConfiguration conf = createNMConfig(); |
| nm.init(conf); |
| |
| // verify that the last service is the nodeStatusUpdater (ie registration |
| // with RM) |
| Object[] services = nm.getServices().toArray(); |
| Object lastService = services[services.length-1]; |
| Assert.assertTrue("last service is NOT the node status updater", |
| lastService instanceof NodeStatusUpdater); |
| |
| new Thread() { |
| public void run() { |
| try { |
| nm.start(); |
| } catch (Throwable e) { |
| TestNodeStatusUpdater.this.nmStartError = e; |
| throw new YarnException(e); |
| } |
| } |
| }.start(); |
| |
| System.out.println(" ----- thread already started.." |
| + nm.getServiceState()); |
| |
| int waitCount = 0; |
| while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to start.."); |
| if (nmStartError != null) { |
| LOG.error("Error during startup. ", nmStartError); |
| Assert.fail(nmStartError.getCause().getMessage()); |
| } |
| Thread.sleep(1000); |
| } |
| if (nm.getServiceState() != STATE.STARTED) { |
| // NM could have failed. |
| Assert.fail("NodeManager failed to start"); |
| } |
| |
| waitCount = 0; |
| while (heartBeatID <= 3 && waitCount++ != 20) { |
| Thread.sleep(500); |
| } |
| Assert.assertFalse(heartBeatID <= 3); |
| Assert.assertEquals("Number of registered NMs is wrong!!", 1, |
| this.registeredNodes.size()); |
| |
| nm.stop(); |
| } |
| |
| @Test |
| public void testStopReentrant() throws Exception { |
| final AtomicInteger numCleanups = new AtomicInteger(0); |
| nm = new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( |
| context, dispatcher, healthChecker, metrics); |
| MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); |
| myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; |
| myNodeStatusUpdater.resourceTracker = myResourceTracker2; |
| return myNodeStatusUpdater; |
| } |
| |
| @Override |
| protected void cleanupContainers() { |
| super.cleanupContainers(); |
| numCleanups.incrementAndGet(); |
| } |
| }; |
| |
| YarnConfiguration conf = createNMConfig(); |
| nm.init(conf); |
| nm.start(); |
| |
| int waitCount = 0; |
| while (heartBeatID < 1 && waitCount++ != 20) { |
| Thread.sleep(500); |
| } |
| Assert.assertFalse(heartBeatID < 1); |
| |
| // Meanwhile call stop directly as the shutdown hook would |
| nm.stop(); |
| |
| // NM takes a while to reach the STOPPED state. |
| waitCount = 0; |
| while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to stop.."); |
| Thread.sleep(1000); |
| } |
| |
| Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); |
| Assert.assertEquals(numCleanups.get(), 1); |
| } |
| |
| @Test |
| public void testNodeDecommision() throws Exception { |
| nm = getNodeManager(NodeAction.SHUTDOWN); |
| YarnConfiguration conf = createNMConfig(); |
| nm.init(conf); |
| Assert.assertEquals(STATE.INITED, nm.getServiceState()); |
| nm.start(); |
| |
| int waitCount = 0; |
| while (heartBeatID < 1 && waitCount++ != 20) { |
| Thread.sleep(500); |
| } |
| Assert.assertFalse(heartBeatID < 1); |
| |
| // NM takes a while to reach the STOPPED state. |
| waitCount = 0; |
| while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to stop.."); |
| Thread.sleep(1000); |
| } |
| |
| Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); |
| } |
| |
| @Test |
| public void testNodeReboot() throws Exception { |
| nm = getNodeManager(NodeAction.REBOOT); |
| YarnConfiguration conf = createNMConfig(); |
| nm.init(conf); |
| Assert.assertEquals(STATE.INITED, nm.getServiceState()); |
| nm.start(); |
| |
| int waitCount = 0; |
| while (heartBeatID < 1 && waitCount++ != 20) { |
| Thread.sleep(500); |
| } |
| Assert.assertFalse(heartBeatID < 1); |
| |
| // NM takes a while to reach the STOPPED state. |
| waitCount = 0; |
| while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to stop.."); |
| Thread.sleep(1000); |
| } |
| Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); |
| |
| waitCount = 0; |
| while (null == rebootedNodeManager && waitCount++ != 20) { |
| LOG.info("Waiting for NM to reinitialize.."); |
| Thread.sleep(1000); |
| } |
| |
| waitCount = 0; |
| while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to start.."); |
| Thread.sleep(1000); |
| } |
| Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState()); |
| |
| rebootedNodeManager.stop(); |
| waitCount = 0; |
| while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) { |
| LOG.info("Waiting for NM to stop.."); |
| Thread.sleep(1000); |
| } |
| Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState()); |
| } |
| |
| @Test |
| public void testNMShutdownForRegistrationFailure() { |
| |
| nm = new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( |
| context, dispatcher, healthChecker, metrics); |
| MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); |
| myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; |
| nodeStatusUpdater.resourceTracker = myResourceTracker2; |
| return nodeStatusUpdater; |
| } |
| }; |
| verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: " |
| + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed"); |
| } |
| |
| /** |
| * Verifies that if for some reason NM fails to start ContainerManager RPC |
| * server, RM is oblivious to NM's presence. The behaviour is like this |
| * because otherwise, NM will report to RM even if all its servers are not |
| * started properly, RM will think that the NM is alive and will retire the NM |
| * only after NM_EXPIRY interval. See MAPREDUCE-2749. |
| */ |
| @Test |
| public void testNoRegistrationWhenNMServicesFail() { |
| |
| nm = new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| return new MyNodeStatusUpdater(context, dispatcher, healthChecker, |
| metrics); |
| } |
| |
| @Override |
| protected ContainerManagerImpl createContainerManager(Context context, |
| ContainerExecutor exec, DeletionService del, |
| NodeStatusUpdater nodeStatusUpdater, |
| ApplicationACLsManager aclsManager, |
| LocalDirsHandlerService diskhandler) { |
| return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, |
| metrics, aclsManager, diskhandler) { |
| @Override |
| public void start() { |
| // Simulating failure of starting RPC server |
| throw new YarnException("Starting of RPC Server failed"); |
| } |
| }; |
| } |
| }; |
| |
| verifyNodeStartFailure("Starting of RPC Server failed"); |
| } |
| |
| @Test |
| public void testApplicationKeepAlive() throws Exception { |
| MyNodeManager nm = new MyNodeManager(); |
| try { |
| YarnConfiguration conf = createNMConfig(); |
| conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); |
| conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, |
| 4000l); |
| nm.init(conf); |
| nm.start(); |
| // HB 2 -> app cancelled by RM. |
| while (heartBeatID < 12) { |
| Thread.sleep(1000l); |
| } |
| MyResourceTracker3 rt = |
| (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); |
| rt.context.getApplications().remove(rt.appId); |
| Assert.assertEquals(1, rt.keepAliveRequests.size()); |
| int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); |
| LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]"); |
| Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3); |
| while (heartBeatID < 20) { |
| Thread.sleep(1000l); |
| } |
| int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size(); |
| Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2); |
| } finally { |
| if (nm.getServiceState() == STATE.STARTED) |
| nm.stop(); |
| } |
| } |
| |
| private void verifyNodeStartFailure(String errMessage) { |
| YarnConfiguration conf = createNMConfig(); |
| nm.init(conf); |
| try { |
| nm.start(); |
| Assert.fail("NM should have failed to start. Didn't get exception!!"); |
| } catch (Exception e) { |
| Assert.assertEquals(errMessage, e.getCause() |
| .getMessage()); |
| } |
| |
| // the state change to stopped occurs only if the startup is success, else |
| // state change doesn't occur |
| Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm |
| .getServiceState()); |
| |
| Assert.assertEquals("Number of registered nodes is wrong!", 0, |
| this.registeredNodes.size()); |
| } |
| |
| private YarnConfiguration createNMConfig() { |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB |
| conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); |
| conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); |
| conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri() |
| .getPath()); |
| conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, |
| "remotelogs").toUri().getPath()); |
| conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0") |
| .toUri().getPath()); |
| return conf; |
| } |
| |
| private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { |
| return new NodeManager() { |
| @Override |
| protected NodeStatusUpdater createNodeStatusUpdater(Context context, |
| Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { |
| MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( |
| context, dispatcher, healthChecker, metrics); |
| MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); |
| myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; |
| myNodeStatusUpdater.resourceTracker = myResourceTracker2; |
| return myNodeStatusUpdater; |
| } |
| |
| @Override |
| NodeManager createNewNodeManager() { |
| rebootedNodeManager = getNodeManager(NodeAction.NORMAL); |
| return rebootedNodeManager; |
| } |
| }; |
| } |
| } |