| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.apache.hadoop.net.ServerSocketUtil; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; |
| import org.apache.hadoop.util.Sets; |
| import org.apache.hadoop.util.XMLUtils; |
| import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; |
| import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; |
| import org.apache.hadoop.yarn.server.api.ResourceTracker; |
| import org.apache.hadoop.yarn.server.api.ServerRMProxy; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; |
| |
| import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.HashSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Supplier; |
| import javax.xml.parsers.DocumentBuilderFactory; |
| import javax.xml.transform.Transformer; |
| import javax.xml.transform.TransformerFactory; |
| import javax.xml.transform.dom.DOMSource; |
| import javax.xml.transform.stream.StreamResult; |
| import javax.xml.transform.OutputKeys; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.yarn.LocalConfigurationProvider; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.NodeAttribute; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeType; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.DrainDispatcher; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventDispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.event.InlineDispatcher; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.nodelabels.AttributeValue; |
| import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; |
| import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; |
| import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; |
| import org.apache.hadoop.yarn.server.api.records.AppCollectorData; |
| import org.apache.hadoop.yarn.server.api.records.NodeAction; |
| import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; |
| import org.apache.hadoop.yarn.server.api.records.NodeStatus; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; |
| import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.apache.hadoop.yarn.util.YarnVersionInfo; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| |
| public class TestResourceTrackerService extends NodeLabelTestBase { |
| |
| private final static File TEMP_DIR = new File(System.getProperty( |
| "test.build.data", "/tmp"), "decommision"); |
| private final File hostFile = |
| new File(TEMP_DIR + File.separator + "hostFile.txt"); |
| private final File excludeHostFile = new File(TEMP_DIR + File.separator + |
| "excludeHostFile.txt"); |
| private final File excludeHostXmlFile = |
| new File(TEMP_DIR + File.separator + "excludeHostFile.xml"); |
| |
| private MockRM rm; |
| |
| /** |
| * Test RM read NM next heartBeat Interval correctly from Configuration file, |
| * and NM get next heartBeat Interval from RM correctly |
| */ |
| @Test (timeout = 50000) |
| public void testGetNextHeartBeatInterval() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "4000"); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(4000, nodeHeartbeat.getNextHeartBeatInterval()); |
| |
| NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| Assert.assertEquals(4000, nodeHeartbeat2.getNextHeartBeatInterval()); |
| |
| } |
| |
| /** |
| * Decommissioning using a pre-configured include hosts file |
| */ |
| @Test |
| public void testDecommissionWithIncludeHosts() throws Exception { |
| |
| writeToHostsFile("localhost", "host1", "host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| |
| ClusterMetrics metrics = ClusterMetrics.getMetrics(); |
| assert(metrics != null); |
| int metricCount = metrics.getNumDecommisionedNMs(); |
| |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| |
| // To test that IPs also work |
| String ip = NetUtils.normalizeHostName("localhost"); |
| writeToHostsFile("host1", ip); |
| |
| rm.getNodesListManager().refreshNodes(conf); |
| |
| checkShutdownNMCount(rm, ++metricCount); |
| |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| Assert |
| .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs()); |
| |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN |
| .equals(nodeHeartbeat.getNodeAction())); |
| |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() |
| .getNumShutdownNMs()); |
| rm.stop(); |
| } |
| |
| /** |
| * Decommissioning using a pre-configured exclude hosts file |
| */ |
| @Test |
| public void testDecommissionWithExcludeHosts() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| writeToHostsFile(""); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| |
| |
| int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| rm.drainEvents(); |
| |
| // To test that IPs also work |
| String ip = NetUtils.normalizeHostName("localhost"); |
| writeToHostsFile("host2", ip); |
| |
| rm.getNodesListManager().refreshNodes(conf); |
| |
| checkDecommissionedNMCount(rm, metricCount + 2); |
| |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue("The decommisioned metrics are not updated", |
| NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); |
| |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue("The decommisioned metrics are not updated", |
| NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); |
| rm.drainEvents(); |
| |
| writeToHostsFile(""); |
| rm.getNodesListManager().refreshNodes(conf); |
| |
| nm3 = rm.registerNode("localhost:4433", 1024); |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| // decommissined node is 1 since 1 node is rejoined after updating exclude |
| // file |
| checkDecommissionedNMCount(rm, metricCount + 1); |
| } |
| |
| /** |
| * Graceful decommission node with no running application. |
| */ |
| @Test |
| public void testGracefulDecommissionNoApp() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| writeToHostsFile(""); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("host3:4433", 5120); |
| |
| int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); |
| NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); |
| NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); |
| |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); |
| |
| rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); |
| rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); |
| |
| // Graceful decommission both host2 and host3. |
| writeToHostsFile("host2", "host3"); |
| rm.getNodesListManager().refreshNodes(conf, true); |
| |
| rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); |
| rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); |
| |
| nodeHeartbeat1 = nm1.nodeHeartbeat(true); |
| nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| nodeHeartbeat3 = nm3.nodeHeartbeat(true); |
| |
| checkDecommissionedNMCount(rm, metricCount + 2); |
| rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED); |
| rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED); |
| |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); |
| Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); |
| Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); |
| } |
| |
| @Test |
| public void testGracefulDecommissionDefaultTimeoutResolution() |
| throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile |
| .getAbsolutePath()); |
| |
| writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null)); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| int nodeMemory = 1024; |
| MockNM nm1 = rm.registerNode("host1:1234", nodeMemory); |
| MockNM nm2 = rm.registerNode("host2:5678", nodeMemory); |
| MockNM nm3 = rm.registerNode("host3:9101", nodeMemory); |
| |
| NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); |
| NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); |
| |
| Assert.assertTrue( |
| NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); |
| Assert.assertTrue( |
| NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); |
| Assert.assertTrue( |
| NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); |
| |
| rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); |
| rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); |
| rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); |
| |
| // Graceful decommission both host1 and host2, with |
| // non default timeout for host1 |
| final Integer nm1DecommissionTimeout = 20; |
| writeToHostsXmlFile( |
| excludeHostXmlFile, |
| Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout), |
| Pair.of(nm2.getNodeId().getHost(), null)); |
| rm.getNodesListManager().refreshNodes(conf, true); |
| rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); |
| rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); |
| Assert.assertEquals( |
| nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId())); |
| Integer defaultDecTimeout = |
| conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, |
| YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); |
| Assert.assertEquals( |
| defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId())); |
| |
| // Graceful decommission host3 with a new default timeout |
| final Integer newDefaultDecTimeout = defaultDecTimeout + 10; |
| writeToHostsXmlFile( |
| excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null)); |
| conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, |
| newDefaultDecTimeout); |
| rm.getNodesListManager().refreshNodes(conf, true); |
| rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); |
| Assert.assertEquals( |
| newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId())); |
| } |
| |
| /** |
| * Graceful decommission node with running application. |
| */ |
| @Test |
| public void testGracefulDecommissionWithApp() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| writeToHostsFile(""); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 10240); |
| MockNM nm2 = rm.registerNode("host2:5678", 20480); |
| MockNM nm3 = rm.registerNode("host3:4433", 10240); |
| NodeId id1 = nm1.getNodeId(); |
| NodeId id3 = nm3.getNodeId(); |
| rm.waitForState(id1, NodeState.RUNNING); |
| rm.waitForState(id3, NodeState.RUNNING); |
| |
| // Create an app and launch two containers on host1. |
| RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm); |
| MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); |
| ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); |
| nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); |
| nm3.nodeHeartbeat(true); |
| |
| // Graceful decommission host1 and host3 |
| writeToHostsFile("host1", "host3"); |
| rm.getNodesListManager().refreshNodes(conf, true); |
| rm.waitForState(id1, NodeState.DECOMMISSIONING); |
| rm.waitForState(id3, NodeState.DECOMMISSIONING); |
| |
| // host1 should be DECOMMISSIONING due to running containers. |
| // host3 should become DECOMMISSIONED. |
| nm1.nodeHeartbeat(true); |
| nm3.nodeHeartbeat(true); |
| rm.waitForState(id1, NodeState.DECOMMISSIONING); |
| rm.waitForState(id3, NodeState.DECOMMISSIONED); |
| nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); |
| |
| // Complete containers on host1. |
| // Since the app is still RUNNING, expect NodeAction.NORMAL. |
| NodeHeartbeatResponse nodeHeartbeat1 = |
| nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); |
| Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction()); |
| |
| // Finish the app and verified DECOMMISSIONED. |
| MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); |
| rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); |
| nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE); |
| Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction()); |
| rm.waitForState(id1, NodeState.DECOMMISSIONED); |
| } |
| |
| /** |
| * Test graceful decommission of node when an AM container is scheduled on a |
| * node just before it is gracefully decommissioned. |
| */ |
| @Test (timeout = 60000) |
| public void testGracefulDecommissionAfterAMContainerAlloc() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| writeToHostsFile(""); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 10240); |
| MockNM nm2 = rm.registerNode("host2:5678", 20480); |
| MockNM nm3 = rm.registerNode("host3:4433", 10240); |
| |
| NodeId id1 = nm1.getNodeId(); |
| NodeId id2 = nm2.getNodeId(); |
| NodeId id3 = nm3.getNodeId(); |
| |
| rm.waitForState(id1, NodeState.RUNNING); |
| rm.waitForState(id2, NodeState.RUNNING); |
| rm.waitForState(id3, NodeState.RUNNING); |
| |
| // Create an app and schedule AM on host1. |
| RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm); |
| MockAM am = MockRM.launchAM(app, rm, nm1); |
| |
| // Before sending heartbeat we gracefully decommission the node on which AM |
| // is scheduled to simulate race condition. |
| writeToHostsFile("host1", "host3"); |
| rm.getNodesListManager().refreshNodes(conf, true); |
| rm.waitForState(id1, NodeState.DECOMMISSIONING); |
| rm.waitForState(id3, NodeState.DECOMMISSIONING); |
| |
| // Heartbeat after the node is in DECOMMISSIONING state. This will be the |
| // first heartbeat containing information about the AM container since the |
| // application was submitted. |
| ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); |
| nm1.nodeHeartbeat(aaid, 1, ContainerState.RUNNING); |
| nm3.nodeHeartbeat(true); |
| |
| // host1 should stay in DECOMMISSIONING as it has container running on it. |
| rm.waitForState(id1, NodeState.DECOMMISSIONING); |
| rm.waitForState(id3, NodeState.DECOMMISSIONED); |
| |
| // Go through the normal application flow and wait for it to finish. |
| am.registerAppAttempt(); |
| rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); |
| MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); |
| nm1.nodeHeartbeat(aaid, 1, ContainerState.COMPLETE); |
| rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); |
| rm.waitForState(id1, NodeState.DECOMMISSIONED); |
| } |
| |
| |
| /** |
| * Decommissioning using a post-configured include hosts file |
| */ |
| @Test |
| public void testAddNewIncludePathToConfiguration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| ClusterMetrics metrics = ClusterMetrics.getMetrics(); |
| assert(metrics != null); |
| int initialMetricCount = metrics.getNumShutdownNMs(); |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals( |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertEquals( |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| writeToHostsFile("host1"); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| rm.getNodesListManager().refreshNodes(conf); |
| checkShutdownNMCount(rm, ++initialMetricCount); |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals( |
| "Node should not have been shutdown.", |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| NodeState nodeState = |
| rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState(); |
| Assert.assertEquals("Node should have been shutdown but is in state" + |
| nodeState, NodeState.SHUTDOWN, nodeState); |
| } |
| |
| /** |
| * Decommissioning using a post-configured exclude hosts file |
| */ |
| @Test |
| public void testAddNewExcludePathToConfiguration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| ClusterMetrics metrics = ClusterMetrics.getMetrics(); |
| assert(metrics != null); |
| int initialMetricCount = metrics.getNumDecommisionedNMs(); |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals( |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertEquals( |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| writeToHostsFile("host2"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| rm.getNodesListManager().refreshNodes(conf); |
| checkDecommissionedNMCount(rm, ++initialMetricCount); |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals( |
| "Node should not have been decommissioned.", |
| NodeAction.NORMAL, |
| nodeHeartbeat.getNodeAction()); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertEquals( |
| "Node should have been decommissioned but is in state" |
| + nodeHeartbeat.getNodeAction(), |
| NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); |
| } |
| |
| @Test |
| public void testNodeRegistrationSuccess() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = Records.newRecord( |
| RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| // trying to register a invalid node. |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); |
| } |
| |
| @Test |
| public void testNodeRegistrationWithLabels() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| registerReq.setNodeLabels(toSet(NodeLabel.newInstance("A"))); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| Assert.assertEquals("Action should be normal on valid Node Labels", |
| NodeAction.NORMAL, response.getNodeAction()); |
| assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), |
| NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels())); |
| Assert.assertTrue("Valid Node Labels were not accepted by RM", |
| response.getAreNodeLabelsAcceptedByRM()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testNodeRegistrationWithInvalidLabels() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("X", "Y", "Z")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| registerReq.setNodeLabels(toNodeLabelSet("A", "B", "C")); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| Assert.assertEquals( |
| "On Invalid Node Labels action is expected to be normal", |
| NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId)); |
| Assert.assertNotNull(response.getDiagnosticsMessage()); |
| Assert.assertFalse("Node Labels should not accepted by RM If Invalid", |
| response.getAreNodeLabelsAcceptedByRM()); |
| |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("X", "Y", "Z")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| req.setNodeLabels(toNodeLabelSet("#Y")); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(req); |
| |
| Assert.assertEquals( |
| "On Invalid Node Labels action is expected to be normal", |
| NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId)); |
| Assert.assertNotNull(response.getDiagnosticsMessage()); |
| Assert.assertFalse("Node Labels should not accepted by RM If Invalid", |
| response.getAreNodeLabelsAcceptedByRM()); |
| |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeRegistrationWithCentralLabelConfig() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| req.setNodeLabels(toNodeLabelSet("A")); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(req); |
| // registered to RM with central label config |
| Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId)); |
| Assert |
| .assertFalse( |
| "Node Labels should not accepted by RM If its configured with " + |
| "Central configuration", |
| response.getAreNodeLabelsAcceptedByRM()); |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeRegistrationWithAttributes() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, |
| FileSystemNodeAttributeStore.class, NodeAttributeStore.class); |
| File tempDir = File.createTempFile("nattr", ".tmp"); |
| tempDir.delete(); |
| tempDir.mkdirs(); |
| tempDir.deleteOnExit(); |
| conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, |
| tempDir.getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| NodeAttribute nodeAttribute1 = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", |
| NodeAttributeType.STRING, "V1"); |
| NodeAttribute nodeAttribute2 = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", |
| NodeAttributeType.STRING, "V2"); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| registerReq.setNodeAttributes(toSet(nodeAttribute1, nodeAttribute2)); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| Assert.assertEquals("Action should be normal on valid Node Attributes", |
| NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals( |
| rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).keySet(), |
| registerReq.getNodeAttributes())); |
| Assert.assertTrue("Valid Node Attributes were not accepted by RM", |
| response.getAreNodeAttributesAcceptedByRM()); |
| |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeRegistrationWithInvalidAttributes() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, |
| FileSystemNodeAttributeStore.class, NodeAttributeStore.class); |
| conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, |
| TEMP_DIR.getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| NodeAttribute validNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", |
| NodeAttributeType.STRING, "V1"); |
| NodeAttribute invalidPrefixNodeAttribute = NodeAttribute |
| .newInstance("_P", "Attr1", |
| NodeAttributeType.STRING, "V2"); |
| NodeAttribute invalidNameNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N", |
| NodeAttributeType.STRING, "V2"); |
| NodeAttribute invalidValueNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", |
| NodeAttributeType.STRING, "..."); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| |
| // check invalid prefix |
| req.setNodeAttributes( |
| toSet(validNodeAttribute, invalidPrefixNodeAttribute)); |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertRegisterResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .endsWith("attributes in HB must have prefix nm.yarn.io")); |
| |
| // check invalid name |
| req.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute)); |
| response = resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertRegisterResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .startsWith("attribute name should only contains")); |
| |
| // check invalid value |
| req.setNodeAttributes(toSet(validNodeAttribute, invalidValueNodeAttribute)); |
| response = resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertRegisterResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .startsWith("attribute value should only contains")); |
| |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| private void assertRegisterResponseForInvalidAttributes( |
| RegisterNodeManagerResponse response) { |
| Assert.assertEquals( |
| "On Invalid Node Labels action is expected to be normal", |
| NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertNotNull(response.getDiagnosticsMessage()); |
| Assert.assertFalse("Node Labels should not accepted by RM If Invalid", |
| response.getAreNodeLabelsAcceptedByRM()); |
| } |
| |
| private NodeStatus getNodeStatusObject(NodeId nodeId) { |
| NodeStatus status = Records.newRecord(NodeStatus.class); |
| status.setNodeId(nodeId); |
| status.setResponseId(0); |
| status.setContainersStatuses(Collections.emptyList()); |
| status.setKeepAliveApplications(Collections.emptyList()); |
| return status; |
| } |
| |
| @Test |
| public void testNodeHeartBeatWithLabels() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| // adding valid labels |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| |
| // Registering of labels and other required info to RM |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| registerReq.setNodeLabels(toNodeLabelSet("A")); // Node register label |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| // modification of labels during heartbeat |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Node heartbeat label update |
| NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| NodeHeartbeatResponse nodeHeartbeatResponse = |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| Assert.assertEquals("InValid Node Labels were not accepted by RM", |
| NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); |
| assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), |
| NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels())); |
| Assert.assertTrue("Valid Node Labels were not accepted by RM", |
| nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); |
| |
| // After modification of labels next heartbeat sends null informing no update |
| Set<String> oldLabels = nodeLabelsMgr.getNodeLabels().get(nodeId); |
| int responseId = nodeStatusObject.getResponseId(); |
| heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| heartbeatReq.setNodeLabels(null); // Node heartbeat label update |
| nodeStatusObject = getNodeStatusObject(nodeId); |
| nodeStatusObject.setResponseId(responseId+1); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| Assert.assertEquals("InValid Node Labels were not accepted by RM", |
| NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); |
| assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), |
| oldLabels); |
| Assert.assertFalse("Node Labels should not accepted by RM", |
| nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testNodeHeartbeatWithNodeAttributes() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, |
| FileSystemNodeAttributeStore.class, NodeAttributeStore.class); |
| conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, |
| TEMP_DIR.getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| // Register to RM |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| Set<NodeAttribute> nodeAttributes = new HashSet<>(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host2")); |
| |
| // Set node attributes in HB. |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); |
| int responseId = nodeStatusObject.getResponseId(); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Ensure RM gets correct node attributes update. |
| NodeAttributesManager attributeManager = |
| rm.getRMContext().getNodeAttributesManager(); |
| Map<NodeAttribute, AttributeValue> attrs = attributeManager |
| .getAttributesForNode(nodeId.getHost()); |
| Assert.assertEquals(1, attrs.size()); |
| NodeAttribute na = attrs.keySet().iterator().next(); |
| Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host2", na.getAttributeValue()); |
| Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); |
| |
| |
| // Send another HB to RM with updated node atrribute |
| nodeAttributes.clear(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host3")); |
| nodeStatusObject = getNodeStatusObject(nodeId); |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM gets the updated attribute |
| attrs = attributeManager.getAttributesForNode(nodeId.getHost()); |
| Assert.assertEquals(1, attrs.size()); |
| na = attrs.keySet().iterator().next(); |
| Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host3", na.getAttributeValue()); |
| Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); |
| } |
| |
| @Test |
| public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, |
| FileSystemNodeAttributeStore.class, NodeAttributeStore.class); |
| conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, |
| TEMP_DIR.getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| // Register to RM |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| NodeAttribute validNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host2"); |
| NodeAttribute invalidPrefixNodeAttribute = NodeAttribute |
| .newInstance("_P", "Attr1", |
| NodeAttributeType.STRING, "V2"); |
| NodeAttribute invalidNameNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N", |
| NodeAttributeType.STRING, "V2"); |
| NodeAttribute invalidValueNodeAttribute = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", |
| NodeAttributeType.STRING, "..."); |
| |
| // Set node attributes in HB. |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); |
| int responseId = nodeStatusObject.getResponseId(); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| heartbeatReq.setNodeAttributes(toSet(validNodeAttribute)); |
| |
| // Send first HB to RM with invalid prefix node attributes |
| heartbeatReq.setNodeAttributes( |
| toSet(validNodeAttribute, invalidPrefixNodeAttribute)); |
| NodeHeartbeatResponse response = |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertNodeHeartbeatResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .endsWith("attributes in HB must have prefix nm.yarn.io")); |
| |
| // Send another HB to RM with invalid name node attributes |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq |
| .setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute)); |
| response = resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertNodeHeartbeatResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .startsWith("attribute name should only contains")); |
| |
| // Send another HB to RM with invalid value node attributes |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeAttributes( |
| toSet(validNodeAttribute, invalidValueNodeAttribute)); |
| response = resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() |
| .getAttributesForNode(nodeId.getHost()).size()); |
| assertNodeHeartbeatResponseForInvalidAttributes(response); |
| Assert.assertTrue(response.getDiagnosticsMessage() |
| .startsWith("attribute value should only contains")); |
| |
| // Send another HB to RM with updated node attribute |
| NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host3"); |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute)); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM gets the updated attribute |
| NodeAttributesManager attributeManager = |
| rm.getRMContext().getNodeAttributesManager(); |
| Map<NodeAttribute, AttributeValue> attrs = |
| attributeManager.getAttributesForNode(nodeId.getHost()); |
| Assert.assertEquals(1, attrs.size()); |
| NodeAttribute na = attrs.keySet().iterator().next(); |
| Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host3", na.getAttributeValue()); |
| Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); |
| } |
| |
| private void assertNodeHeartbeatResponseForInvalidAttributes( |
| NodeHeartbeatResponse response) { |
| Assert.assertEquals( |
| "On Invalid Node Labels action is expected to be normal", |
| NodeAction.NORMAL, response.getNodeAction()); |
| Assert.assertNotNull(response.getDiagnosticsMessage()); |
| Assert.assertFalse("Node Labels should not accepted by RM If Invalid", |
| response.getAreNodeLabelsAcceptedByRM()); |
| } |
| |
| @Test |
| public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded() |
| throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, |
| NullNodeAttributeStore.class, NodeAttributeStore.class); |
| conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, |
| TEMP_DIR.getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| // spy node attributes manager |
| NodeAttributesManager tmpAttributeManager = |
| rm.getRMContext().getNodeAttributesManager(); |
| NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager); |
| rm.getRMContext().setNodeAttributesManager(spyAttributeManager); |
| AtomicInteger count = new AtomicInteger(0); |
| Mockito.doAnswer(new Answer<Object>() { |
| public Object answer(InvocationOnMock invocation) throws Exception { |
| count.incrementAndGet(); |
| tmpAttributeManager |
| .replaceNodeAttributes((String) invocation.getArguments()[0], |
| (Map<String, Set<NodeAttribute>>) invocation.getArguments()[1]); |
| return null; |
| } |
| }).when(spyAttributeManager) |
| .replaceNodeAttributes(Mockito.any(String.class), |
| Mockito.any(Map.class)); |
| |
| // Register to RM |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| Set<NodeAttribute> nodeAttributes = new HashSet<>(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host2")); |
| |
| // Set node attributes in HB. |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); |
| int responseId = nodeStatusObject.getResponseId(); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Ensure RM gets correct node attributes update. |
| Map<NodeAttribute, AttributeValue> attrs = spyAttributeManager |
| .getAttributesForNode(nodeId.getHost()); |
| spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost())); |
| Assert.assertEquals(1, attrs.size()); |
| NodeAttribute na = attrs.keySet().iterator().next(); |
| Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host2", na.getAttributeValue()); |
| Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); |
| Assert.assertEquals(1, count.get()); |
| |
| // Send HBs to RM with the same node attributes |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM updated node attributes once |
| Assert.assertEquals(1, count.get()); |
| |
| // Send another HB to RM with updated node attributes |
| nodeAttributes.clear(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host3")); |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM gets the updated attribute |
| attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost()); |
| Assert.assertEquals(1, attrs.size()); |
| na = attrs.keySet().iterator().next(); |
| Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host3", na.getAttributeValue()); |
| Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); |
| |
| // Make sure RM updated node attributes twice |
| Assert.assertEquals(2, count.get()); |
| |
| // Add centralized attributes |
| Map<String, Set<NodeAttribute>> nodeAttributeMapping = ImmutableMap |
| .of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_CENTRALIZED, "centAttr", |
| NodeAttributeType.STRING, "x"))); |
| spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, |
| nodeAttributeMapping); |
| |
| // Make sure RM updated node attributes three times |
| Assert.assertEquals(3, count.get()); |
| |
| // Send another HB to RM with non-updated node attributes |
| nodeAttributes.clear(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host3")); |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM still updated node attributes three times |
| Assert.assertEquals(3, count.get()); |
| |
| // Send another HB to RM with updated node attributes |
| nodeAttributes.clear(); |
| nodeAttributes.add(NodeAttribute.newInstance( |
| NodeAttribute.PREFIX_DISTRIBUTED, "host", |
| NodeAttributeType.STRING, "host4")); |
| nodeStatusObject.setResponseId(++responseId); |
| heartbeatReq.setNodeStatus(nodeStatusObject); |
| heartbeatReq.setNodeAttributes(nodeAttributes); |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // Make sure RM gets the updated attribute |
| attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost()); |
| Assert.assertEquals(2, attrs.size()); |
| attrs.keySet().stream().forEach(e -> { |
| Assert.assertEquals(NodeAttributeType.STRING, e.getAttributeType()); |
| if (e.getAttributeKey().getAttributePrefix() == NodeAttribute.PREFIX_DISTRIBUTED) { |
| Assert.assertEquals("host", e.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("host4", e.getAttributeValue()); |
| } else if (e.getAttributeKey().getAttributePrefix() == NodeAttribute.PREFIX_CENTRALIZED) { |
| Assert.assertEquals("centAttr", e.getAttributeKey().getAttributeName()); |
| Assert.assertEquals("x", e.getAttributeValue()); |
| } |
| }); |
| |
| // Make sure RM updated node attributes four times |
| Assert.assertEquals(4, count.get()); |
| |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeHeartBeatWithInvalidLabels() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| |
| try { |
| nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C")); |
| } catch (IOException e) { |
| Assert.fail("Caught Exception while initializing"); |
| e.printStackTrace(); |
| } |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest registerReq = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| registerReq.setResource(capability); |
| registerReq.setNodeId(nodeId); |
| registerReq.setHttpPort(1234); |
| registerReq.setNMVersion(YarnVersionInfo.getVersion()); |
| registerReq.setNodeLabels(toNodeLabelSet("A")); |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(registerReq); |
| |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| heartbeatReq.setNodeLabels(toNodeLabelSet("B", "#C")); // Invalid heart beat labels |
| heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| NodeHeartbeatResponse nodeHeartbeatResponse = |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // response should be NORMAL when RM heartbeat labels are rejected |
| Assert.assertEquals("Response should be NORMAL when RM heartbeat labels" |
| + " are rejected", NodeAction.NORMAL, |
| nodeHeartbeatResponse.getNodeAction()); |
| Assert.assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); |
| Assert.assertNotNull(nodeHeartbeatResponse.getDiagnosticsMessage()); |
| rm.stop(); |
| } |
| |
| @Test |
| public void testNodeHeartbeatWithCentralLabelConfig() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, |
| YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE); |
| |
| final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager(); |
| |
| rm = new MockRM(conf) { |
| @Override |
| protected RMNodeLabelsManager createNodeLabelManager() { |
| return nodeLabelsMgr; |
| } |
| }; |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = |
| Records.newRecord(RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| req.setNodeLabels(toNodeLabelSet("A", "B", "C")); |
| RegisterNodeManagerResponse registerResponse = |
| resourceTrackerService.registerNodeManager(req); |
| |
| NodeHeartbeatRequest heartbeatReq = |
| Records.newRecord(NodeHeartbeatRequest.class); |
| heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Valid heart beat labels |
| heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); |
| heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse |
| .getNMTokenMasterKey()); |
| heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse |
| .getContainerTokenMasterKey()); |
| NodeHeartbeatResponse nodeHeartbeatResponse = |
| resourceTrackerService.nodeHeartbeat(heartbeatReq); |
| |
| // response should be ok but the RMacceptNodeLabelsUpdate should be false |
| Assert.assertEquals(NodeAction.NORMAL, |
| nodeHeartbeatResponse.getNodeAction()); |
| // no change in the labels, |
| Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId)); |
| // heartbeat labels rejected |
| Assert.assertFalse("Invalid Node Labels should not accepted by RM", |
| nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); |
| if (rm != null) { |
| rm.stop(); |
| } |
| } |
| |
| @Test |
| public void testNodeRegistrationVersionLessThanRM() throws Exception { |
| writeToHostsFile("host2"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| conf.set(YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,"EqualToRM" ); |
| rm = new MockRM(conf); |
| rm.start(); |
| String nmVersion = "1.9.9"; |
| |
| ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = Records.newRecord( |
| RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(nmVersion); |
| // trying to register a invalid node. |
| RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction()); |
| Assert.assertTrue("Diagnostic message did not contain: 'Disallowed NodeManager " + |
| "Version "+ nmVersion + ", is less than the minimum version'", |
| response.getDiagnosticsMessage().contains("Disallowed NodeManager Version " + |
| nmVersion + ", is less than the minimum version ")); |
| |
| } |
| |
| @Test |
| public void testNodeRegistrationFailure() throws Exception { |
| writeToHostsFile("host1"); |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = Records.newRecord( |
| RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| // trying to register a invalid node. |
| RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction()); |
| Assert |
| .assertEquals( |
| "Disallowed NodeManager from host2, Sending SHUTDOWN signal to the NodeManager.", |
| response.getDiagnosticsMessage()); |
| } |
| |
| @Test |
| public void testSetRMIdentifierInRegistration() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); |
| RegisterNodeManagerResponse response = nm.registerNode(); |
| |
| // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse |
| Assert.assertEquals(ResourceManager.getClusterTimeStamp(), |
| response.getRMIdentifier()); |
| } |
| |
| @Test |
| public void testNodeRegistrationWithMinimumAllocations() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "2048"); |
| conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4"); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService |
| = rm.getResourceTrackerService(); |
| RegisterNodeManagerRequest req = Records.newRecord( |
| RegisterNodeManagerRequest.class); |
| NodeId nodeId = BuilderUtils.newNodeId("host", 1234); |
| req.setNodeId(nodeId); |
| |
| Resource capability = Resources.createResource(1024); |
| req.setResource(capability); |
| RegisterNodeManagerResponse response1 = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction()); |
| |
| capability.setMemorySize(2048); |
| capability.setVirtualCores(1); |
| req.setResource(capability); |
| RegisterNodeManagerResponse response2 = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction()); |
| |
| capability.setMemorySize(1024); |
| capability.setVirtualCores(4); |
| req.setResource(capability); |
| RegisterNodeManagerResponse response3 = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.SHUTDOWN,response3.getNodeAction()); |
| |
| capability.setMemorySize(2048); |
| capability.setVirtualCores(4); |
| req.setResource(capability); |
| RegisterNodeManagerResponse response4 = |
| resourceTrackerService.registerNodeManager(req); |
| Assert.assertEquals(NodeAction.NORMAL,response4.getNodeAction()); |
| } |
| |
| @Test |
| public void testReboot() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:1234", 2048); |
| |
| int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs(); |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| |
| nodeHeartbeat = nm2.nodeHeartbeat( |
| new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100); |
| Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction())); |
| Assert.assertEquals("Too far behind rm response id:0 nm response id:-100", |
| nodeHeartbeat.getDiagnosticsMessage()); |
| checkRebootedNMCount(rm, ++initialMetricCount); |
| } |
| |
| @Test |
| public void testNodeHeartbeatForAppCollectorsMap() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); |
| // set version to 2 |
| conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); |
| // enable aux-service based timeline collectors |
| conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector"); |
| conf.set(YarnConfiguration.NM_AUX_SERVICES + "." |
| + "timeline_collector" + ".class", |
| PerNodeTimelineCollectorsAuxService.class.getName()); |
| conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, |
| FileSystemTimelineWriterImpl.class, TimelineWriter.class); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:1234", 2048); |
| |
| NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); |
| NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| |
| RMNodeImpl node1 = |
| (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| |
| RMNodeImpl node2 = |
| (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); |
| |
| RMAppImpl app1 = (RMAppImpl) MockRMAppSubmitter.submitWithMemory(1024, rm); |
| String collectorAddr1 = "1.2.3.4:5"; |
| app1.setCollectorData(AppCollectorData.newInstance( |
| app1.getApplicationId(), collectorAddr1)); |
| |
| String collectorAddr2 = "5.4.3.2:1"; |
| RMAppImpl app2 = (RMAppImpl) MockRMAppSubmitter.submitWithMemory(1024, rm); |
| app2.setCollectorData(AppCollectorData.newInstance( |
| app2.getApplicationId(), collectorAddr2)); |
| |
| String collectorAddr3 = "5.4.3.2:2"; |
| app2.setCollectorData(AppCollectorData.newInstance( |
| app2.getApplicationId(), collectorAddr3, 0, 1)); |
| |
| String collectorAddr4 = "5.4.3.2:3"; |
| app2.setCollectorData(AppCollectorData.newInstance( |
| app2.getApplicationId(), collectorAddr4, 1, 0)); |
| |
| // Create a running container for app1 running on nm1 |
| ContainerId runningContainerId1 = BuilderUtils.newContainerId( |
| BuilderUtils.newApplicationAttemptId( |
| app1.getApplicationId(), 0), 0); |
| |
| ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1, |
| ContainerState.RUNNING, "", 0); |
| List<ContainerStatus> statusList = new ArrayList<ContainerStatus>(); |
| statusList.add(status1); |
| NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, |
| "", System.currentTimeMillis()); |
| NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, |
| statusList, null, nodeHealth, null, null, null); |
| node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); |
| |
| Assert.assertEquals(1, node1.getRunningApps().size()); |
| Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); |
| |
| // Create a running container for app2 running on nm2 |
| ContainerId runningContainerId2 = BuilderUtils.newContainerId( |
| BuilderUtils.newApplicationAttemptId( |
| app2.getApplicationId(), 0), 0); |
| |
| ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2, |
| ContainerState.RUNNING, "", 0); |
| statusList = new ArrayList<ContainerStatus>(); |
| statusList.add(status2); |
| nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, |
| statusList, null, nodeHealth, null, null, null); |
| node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus)); |
| Assert.assertEquals(1, node2.getRunningApps().size()); |
| Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); |
| |
| nodeHeartbeat1 = nm1.nodeHeartbeat(true); |
| Map<ApplicationId, AppCollectorData> map1 |
| = nodeHeartbeat1.getAppCollectors(); |
| Assert.assertEquals(1, map1.size()); |
| Assert.assertEquals(collectorAddr1, |
| map1.get(app1.getApplicationId()).getCollectorAddr()); |
| |
| nodeHeartbeat2 = nm2.nodeHeartbeat(true); |
| Map<ApplicationId, AppCollectorData> map2 |
| = nodeHeartbeat2.getAppCollectors(); |
| Assert.assertEquals(1, map2.size()); |
| Assert.assertEquals(collectorAddr4, |
| map2.get(app2.getApplicationId()).getCollectorAddr()); |
| } |
| |
| private void checkRebootedNMCount(MockRM rm2, int count) |
| throws InterruptedException { |
| |
| int waitCount = 0; |
| while (ClusterMetrics.getMetrics().getNumRebootedNMs() != count |
| && waitCount++ < 20) { |
| synchronized (this) { |
| wait(100); |
| } |
| } |
| Assert.assertEquals("The rebooted metrics are not updated", count, |
| ClusterMetrics.getMetrics().getNumRebootedNMs()); |
| } |
| |
| @Test |
| public void testUnhealthyNodeStatus() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs()); |
| // node healthy |
| nm1.nodeHeartbeat(true); |
| |
| // node unhealthy |
| nm1.nodeHeartbeat(false); |
| checkUnhealthyNMCount(rm, nm1, true, 1); |
| |
| // node healthy again |
| nm1.nodeHeartbeat(true); |
| checkUnhealthyNMCount(rm, nm1, false, 0); |
| } |
| |
| private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health, |
| int count) throws Exception { |
| |
| int waitCount = 0; |
| while((rm.getRMContext().getRMNodes().get(nm1.getNodeId()) |
| .getState() != NodeState.UNHEALTHY) == health |
| && waitCount++ < 20) { |
| synchronized (this) { |
| wait(100); |
| } |
| } |
| Assert.assertFalse((rm.getRMContext().getRMNodes().get(nm1.getNodeId()) |
| .getState() != NodeState.UNHEALTHY) == health); |
| Assert.assertEquals("Unhealthy metrics not incremented", count, |
| ClusterMetrics.getMetrics().getUnhealthyNMs()); |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| @Test |
| public void testHandleContainerStatusInvalidCompletions() throws Exception { |
| rm = new MockRM(new YarnConfiguration()); |
| rm.start(); |
| |
| EventHandler handler = |
| spy(rm.getRMContext().getDispatcher().getEventHandler()); |
| |
| // Case 1: Unmanaged AM |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) |
| .withUnmanagedAM(true) |
| .build(); |
| RMApp app = MockRMAppSubmitter.submit(rm, data); |
| |
| // Case 1.1: AppAttemptId is null |
| NMContainerStatus report = |
| NMContainerStatus.newInstance( |
| ContainerId.newContainerId( |
| ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0, |
| ContainerState.COMPLETE, Resource.newInstance(1024, 1), |
| "Dummy Completed", 0, Priority.newInstance(10), 1234); |
| rm.getResourceTrackerService().handleNMContainerStatus(report, null); |
| verify(handler, never()).handle((Event) any()); |
| |
| // Case 1.2: Master container is null |
| RMAppAttemptImpl currentAttempt = |
| (RMAppAttemptImpl) app.getCurrentAppAttempt(); |
| currentAttempt.setMasterContainer(null); |
| report = NMContainerStatus.newInstance( |
| ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0, |
| ContainerState.COMPLETE, Resource.newInstance(1024, 1), |
| "Dummy Completed", 0, Priority.newInstance(10), 1234); |
| rm.getResourceTrackerService().handleNMContainerStatus(report, null); |
| verify(handler, never()).handle((Event)any()); |
| |
| // Case 2: Managed AM |
| app = MockRMAppSubmitter.submitWithMemory(1024, rm); |
| |
| // Case 2.1: AppAttemptId is null |
| report = NMContainerStatus.newInstance( |
| ContainerId.newContainerId( |
| ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0, |
| ContainerState.COMPLETE, Resource.newInstance(1024, 1), |
| "Dummy Completed", 0, Priority.newInstance(10), 1234); |
| try { |
| rm.getResourceTrackerService().handleNMContainerStatus(report, null); |
| } catch (Exception e) { |
| // expected - ignore |
| } |
| verify(handler, never()).handle((Event)any()); |
| |
| // Case 2.2: Master container is null |
| currentAttempt = |
| (RMAppAttemptImpl) app.getCurrentAppAttempt(); |
| currentAttempt.setMasterContainer(null); |
| report = NMContainerStatus.newInstance( |
| ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0, |
| ContainerState.COMPLETE, Resource.newInstance(1024, 1), |
| "Dummy Completed", 0, Priority.newInstance(10), 1234); |
| try { |
| rm.getResourceTrackerService().handleNMContainerStatus(report, null); |
| } catch (Exception e) { |
| // expected - ignore |
| } |
| verify(handler, never()).handle((Event)any()); |
| } |
| |
| @Test |
| public void testReconnectNode() throws Exception { |
| rm = new MockRM() { |
| @Override |
| protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { |
| return new EventDispatcher<SchedulerEvent>(this.scheduler, |
| this.scheduler.getClass().getName()) { |
| @Override |
| public void handle(SchedulerEvent event) { |
| scheduler.handle(event); |
| } |
| }; |
| } |
| }; |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 5120); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(false); |
| rm.drainEvents(); |
| checkUnhealthyNMCount(rm, nm2, true, 1); |
| final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs(); |
| QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); |
| // TODO Metrics incorrect in case of the FifoScheduler |
| Assert.assertEquals(5120, metrics.getAvailableMB()); |
| |
| // reconnect of healthy node |
| nm1 = rm.registerNode("host1:1234", 5120); |
| NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); |
| rm.drainEvents(); |
| Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); |
| checkUnhealthyNMCount(rm, nm2, true, 1); |
| |
| // reconnect of unhealthy node |
| nm2 = rm.registerNode("host2:5678", 5120); |
| response = nm2.nodeHeartbeat(false); |
| Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); |
| rm.drainEvents(); |
| Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); |
| checkUnhealthyNMCount(rm, nm2, true, 1); |
| |
| // unhealthy node changed back to healthy |
| nm2 = rm.registerNode("host2:5678", 5120); |
| response = nm2.nodeHeartbeat(true); |
| response = nm2.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertEquals(5120 + 5120, metrics.getAvailableMB()); |
| |
| // reconnect of node with changed capability |
| nm1 = rm.registerNode("host2:5678", 10240); |
| response = nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); |
| Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); |
| |
| // reconnect of node with changed capability and running applications |
| List<ApplicationId> runningApps = new ArrayList<ApplicationId>(); |
| runningApps.add(ApplicationId.newInstance(1, 0)); |
| nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps); |
| response = nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); |
| Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); |
| |
| // reconnect healthy node changing http port |
| nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); |
| nm1.setHttpPort(3); |
| nm1.registerNode(); |
| response = nm1.nodeHeartbeat(true); |
| response = nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| Assert.assertEquals(3, rmNode.getHttpPort()); |
| Assert.assertEquals(5120, rmNode.getTotalCapability().getMemorySize()); |
| Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); |
| |
| } |
| |
| @Test |
| public void testNMUnregistration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = rm |
| .getResourceTrackerService(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| |
| int shutdownNMsCount = ClusterMetrics.getMetrics() |
| .getNumShutdownNMs(); |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| |
| UnRegisterNodeManagerRequest request = Records |
| .newRecord(UnRegisterNodeManagerRequest.class); |
| request.setNodeId(nm1.getNodeId()); |
| resourceTrackerService.unRegisterNodeManager(request); |
| checkShutdownNMCount(rm, ++shutdownNMsCount); |
| |
| // The RM should remove the node after unregistration, hence send a reboot |
| // command. |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction())); |
| } |
| |
| @Test |
| public void testUnhealthyNMUnregistration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| ResourceTrackerService resourceTrackerService = rm |
| .getResourceTrackerService(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs()); |
| // node healthy |
| nm1.nodeHeartbeat(true); |
| int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); |
| |
| // node unhealthy |
| nm1.nodeHeartbeat(false); |
| checkUnhealthyNMCount(rm, nm1, true, 1); |
| UnRegisterNodeManagerRequest request = Records |
| .newRecord(UnRegisterNodeManagerRequest.class); |
| request.setNodeId(nm1.getNodeId()); |
| resourceTrackerService.unRegisterNodeManager(request); |
| checkShutdownNMCount(rm, ++shutdownNMsCount); |
| } |
| |
| @Test |
| public void testInvalidNMUnregistration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| ResourceTrackerService resourceTrackerService = rm |
| .getResourceTrackerService(); |
| int decommisionedNMsCount = ClusterMetrics.getMetrics() |
| .getNumDecommisionedNMs(); |
| |
| // Node not found for unregister |
| UnRegisterNodeManagerRequest request = Records |
| .newRecord(UnRegisterNodeManagerRequest.class); |
| request.setNodeId(BuilderUtils.newNodeId("host", 1234)); |
| resourceTrackerService.unRegisterNodeManager(request); |
| checkShutdownNMCount(rm, 0); |
| checkDecommissionedNMCount(rm, 0); |
| |
| // 1. Register the Node Manager |
| // 2. Exclude the same Node Manager host |
| // 3. Give NM heartbeat to RM |
| // 4. Unregister the Node Manager |
| MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService); |
| RegisterNodeManagerResponse response = nm1.registerNode(); |
| Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); |
| int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); |
| writeToHostsFile("host2"); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| rm.getNodesListManager().refreshNodes(conf); |
| NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction()); |
| checkDecommissionedNMCount(rm, decommisionedNMsCount); |
| request.setNodeId(nm1.getNodeId()); |
| resourceTrackerService.unRegisterNodeManager(request); |
| checkShutdownNMCount(rm, ++shutdownNMsCount); |
| checkDecommissionedNMCount(rm, decommisionedNMsCount); |
| |
| // 1. Register the Node Manager |
| // 2. Exclude the same Node Manager host |
| // 3. Unregister the Node Manager |
| MockNM nm2 = new MockNM("host2:1234", 5120, resourceTrackerService); |
| RegisterNodeManagerResponse response2 = nm2.registerNode(); |
| Assert.assertEquals(NodeAction.NORMAL, response2.getNodeAction()); |
| writeToHostsFile("host1"); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| rm.getNodesListManager().refreshNodes(conf); |
| request.setNodeId(nm2.getNodeId()); |
| resourceTrackerService.unRegisterNodeManager(request); |
| checkShutdownNMCount(rm, ++shutdownNMsCount); |
| checkDecommissionedNMCount(rm, decommisionedNMsCount); |
| rm.stop(); |
| } |
| |
| @Test(timeout = 30000) |
| public void testInitDecommMetric() throws Exception { |
| testInitDecommMetricHelper(true); |
| testInitDecommMetricHelper(false); |
| } |
| |
| public void testInitDecommMetricHelper(boolean hasIncludeList) |
| throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| |
| writeToHostsFile(excludeHostFile, "host1"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| |
| if (hasIncludeList) { |
| writeToHostsFile(hostFile, "host1", "host2"); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| } |
| rm.getNodesListManager().refreshNodes(conf); |
| rm.drainEvents(); |
| rm.stop(); |
| |
| MockRM rm1 = new MockRM(conf); |
| rm1.start(); |
| nm1 = rm1.registerNode("host1:1234", 5120); |
| nm2 = rm1.registerNode("host2:5678", 10240); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| rm1.drainEvents(); |
| Assert.assertEquals("Number of Decommissioned nodes should be 1", |
| 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); |
| Assert.assertEquals("The inactiveRMNodes should contain an entry for the" + |
| "decommissioned node", |
| 1, rm1.getRMContext().getInactiveRMNodes().size()); |
| writeToHostsFile(excludeHostFile, ""); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| rm1.getNodesListManager().refreshNodes(conf); |
| nm1 = rm1.registerNode("host1:1234", 5120); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| rm1.drainEvents(); |
| Assert.assertEquals("The decommissioned nodes metric should have " + |
| "decremented to 0", |
| 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); |
| Assert.assertEquals("The active nodes metric should be 2", |
| 2, ClusterMetrics.getMetrics().getNumActiveNMs()); |
| Assert.assertEquals("The inactive RMNodes entry should have been removed", |
| 0, rm1.getRMContext().getInactiveRMNodes().size()); |
| rm1.drainEvents(); |
| rm1.stop(); |
| } |
| |
| @Test(timeout = 30000) |
| public void testInitDecommMetricNoRegistration() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| //host3 will not register or heartbeat |
| writeToHostsFile(excludeHostFile, "host3", "host2"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| writeToHostsFile(hostFile, "host1", "host2"); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| rm.getNodesListManager().refreshNodes(conf); |
| rm.drainEvents(); |
| Assert.assertEquals("The decommissioned nodes metric should be 1 ", |
| 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); |
| rm.stop(); |
| |
| MockRM rm1 = new MockRM(conf); |
| rm1.start(); |
| rm1.getNodesListManager().refreshNodes(conf); |
| rm1.drainEvents(); |
| Assert.assertEquals("The decommissioned nodes metric should be 2 ", |
| 2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); |
| rm1.stop(); |
| } |
| |
| @Test |
| public void testIncorrectRecommission() throws Exception { |
| //Check decommissioned node not get recommissioned with graceful refresh |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| writeToHostsFile(excludeHostFile, "host3", "host2"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| writeToHostsFile(hostFile, "host1", "host2"); |
| writeToHostsFile(excludeHostFile, "host1"); |
| rm.getNodesListManager().refreshNodesGracefully(conf, null); |
| rm.drainEvents(); |
| nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue("Node " + nm1.getNodeId().getHost() + |
| " should be Decommissioned", rm.getRMContext() |
| .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState |
| .DECOMMISSIONED); |
| writeToHostsFile(excludeHostFile, ""); |
| rm.getNodesListManager().refreshNodesGracefully(conf, null); |
| rm.drainEvents(); |
| Assert.assertTrue("Node " + nm1.getNodeId().getHost() + |
| " should be Decommissioned", rm.getRMContext() |
| .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState |
| .DECOMMISSIONED); |
| rm.stop(); |
| } |
| |
| /** |
| * Remove a node from all lists and check if its forgotten. |
| */ |
| @Test |
| public void testNodeRemovalNormally() throws Exception { |
| testNodeRemovalUtil(false); |
| testNodeRemovalUtilLost(false); |
| testNodeRemovalUtilRebooted(false); |
| testNodeRemovalUtilUnhealthy(false); |
| } |
| |
| @Test |
| public void testNodeRemovalGracefully() throws Exception { |
| testNodeRemovalUtil(true); |
| testNodeRemovalUtilLost(true); |
| testNodeRemovalUtilRebooted(true); |
| testNodeRemovalUtilUnhealthy(true); |
| } |
| |
| public void refreshNodesOption(boolean doGraceful, Configuration conf) |
| throws Exception { |
| if (doGraceful) { |
| rm.getNodesListManager().refreshNodesGracefully(conf, null); |
| } else { |
| rm.getNodesListManager().refreshNodes(conf); |
| } |
| } |
| |
| public void testNodeRemovalUtil(boolean doGraceful) throws Exception { |
| Configuration conf = new Configuration(); |
| int timeoutValue = 500; |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, ""); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, ""); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| timeoutValue); |
| CountDownLatch latch = new CountDownLatch(1); |
| rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| RMContext rmContext = rm.getRMContext(); |
| refreshNodesOption(doGraceful, conf); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| ClusterMetrics metrics = ClusterMetrics.getMetrics(); |
| assert (metrics != null); |
| |
| //check all 3 nodes joined in as NORMAL |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| rm.drainEvents(); |
| Assert.assertEquals("All 3 nodes should be active", |
| metrics.getNumActiveNMs(), 3); |
| |
| //Remove nm2 from include list, should now be shutdown with timer test |
| String ip = NetUtils.normalizeHostName("localhost"); |
| writeToHostsFile("host1", ip); |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile |
| .getAbsolutePath()); |
| refreshNodesOption(doGraceful, conf); |
| if (doGraceful) { |
| rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); |
| } |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue("Node should not be in active node list", |
| !rmContext.getRMNodes().containsKey(nm2.getNodeId())); |
| |
| RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| Assert.assertEquals("Node should be in inactive node list", |
| rmNode.getState(), |
| doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); |
| Assert.assertEquals("Active nodes should be 2", |
| metrics.getNumActiveNMs(), 2); |
| Assert.assertEquals("Shutdown nodes should be expected", |
| metrics.getNumShutdownNMs(), doGraceful? 0 : 1); |
| |
| int nodeRemovalTimeout = |
| conf.getInt( |
| YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| YarnConfiguration. |
| DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); |
| int nodeRemovalInterval = |
| rmContext.getNodesListManager().getNodeRemovalCheckInterval(); |
| long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout + 100; |
| latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); |
| |
| rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| Assert.assertEquals("Node should have been forgotten!", |
| rmNode, null); |
| Assert.assertEquals("Shutdown nodes should be 0 now", |
| metrics.getNumShutdownNMs(), 0); |
| |
| //Check node removal and re-addition before timer expires |
| writeToHostsFile("host1", ip, "host2"); |
| refreshNodesOption(doGraceful, conf); |
| nm2 = rm.registerNode("host2:5678", 10240); |
| rm.drainEvents(); |
| writeToHostsFile("host1", ip); |
| refreshNodesOption(doGraceful, conf); |
| rm.waitForState(nm2.getNodeId(), |
| doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN); |
| nm2.nodeHeartbeat(true); |
| rm.drainEvents(); |
| rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| Assert.assertEquals("Node should be shutdown", |
| rmNode.getState(), |
| doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN); |
| Assert.assertEquals("Active nodes should be 2", |
| metrics.getNumActiveNMs(), 2); |
| Assert.assertEquals("Shutdown nodes should be expected", |
| metrics.getNumShutdownNMs(), doGraceful? 0 : 1); |
| |
| //add back the node before timer expires |
| latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS); |
| writeToHostsFile("host1", ip, "host2"); |
| refreshNodesOption(doGraceful, conf); |
| nm2 = rm.registerNode("host2:5678", 10240); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| rm.drainEvents(); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| Assert.assertEquals("Shutdown nodes should be 0 now", |
| metrics.getNumShutdownNMs(), 0); |
| Assert.assertEquals("All 3 nodes should be active", |
| metrics.getNumActiveNMs(), 3); |
| |
| //Decommission this node, check timer doesn't remove it |
| writeToHostsFile("host1", "host2", ip); |
| writeToHostsFile(excludeHostFile, "host2"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile |
| .getAbsolutePath()); |
| refreshNodesOption(doGraceful, conf); |
| rm.drainEvents(); |
| rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", |
| (rmNode.getState() == NodeState.DECOMMISSIONED) || |
| (rmNode.getState() == NodeState.DECOMMISSIONING)); |
| if (rmNode.getState() == NodeState.DECOMMISSIONED) { |
| Assert.assertEquals("Decommissioned/ing nodes should be 1 now", |
| metrics.getNumDecommisionedNMs(), 1); |
| } |
| latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); |
| |
| rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", |
| (rmNode.getState() == NodeState.DECOMMISSIONED) || |
| (rmNode.getState() == NodeState.DECOMMISSIONING)); |
| if (rmNode.getState() == NodeState.DECOMMISSIONED) { |
| Assert.assertEquals("Decommissioned/ing nodes should be 1 now", |
| metrics.getNumDecommisionedNMs(), 1); |
| } |
| |
| //Test decommed/ing node that transitions to untracked,timer should remove |
| testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful); |
| rm.stop(); |
| } |
| |
| // A helper method used by testNodeRemovalUtil to avoid exceeding |
| // max allowed length. |
| private void testNodeRemovalUtilDecomToUntracked( |
| RMContext rmContext, Configuration conf, |
| MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful |
| ) throws Exception { |
| ClusterMetrics metrics = ClusterMetrics.getMetrics(); |
| String ip = NetUtils.normalizeHostName("localhost"); |
| writeToHostsFile("host1", ip, "host2"); |
| writeToHostsFile(excludeHostFile, "host2"); |
| refreshNodesOption(doGraceful, conf); |
| nm1.nodeHeartbeat(true); |
| //nm2.nodeHeartbeat(true); |
| nm3.nodeHeartbeat(true); |
| Supplier<RMNode> nodeSupplier = doGraceful |
| ? () -> rmContext.getRMNodes().get(nm2.getNodeId()) |
| : () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId()); |
| pollingAssert(() -> nodeSupplier.get() != null, |
| "Timer for this node was not canceled!"); |
| final List<NodeState> expectedStates = Arrays.asList( |
| NodeState.DECOMMISSIONED, |
| NodeState.DECOMMISSIONING |
| ); |
| pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()), |
| "Node should be in one of these states: " + expectedStates); |
| |
| |
| writeToHostsFile("host1", ip); |
| writeToHostsFile(excludeHostFile, ""); |
| refreshNodesOption(doGraceful, conf); |
| nm2.nodeHeartbeat(true); |
| pollingAssert(() -> nodeSupplier.get() == null, |
| "Node should have been forgotten!"); |
| pollingAssert(metrics::getNumDecommisionedNMs, 0, |
| "metrics#getNumDecommisionedNMs should be 0 now"); |
| pollingAssert(metrics::getNumShutdownNMs, 0, |
| "metrics#getNumShutdownNMs should be 0 now"); |
| pollingAssert(metrics::getNumActiveNMs, 2, |
| "metrics#getNumActiveNMs should be 2 now"); |
| } |
| |
| private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { |
| Configuration conf = new Configuration(); |
| conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000); |
| int timeoutValue = 500; |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| writeToHostsFile(hostFile, "host1", "localhost", "host2"); |
| writeToHostsFile(excludeHostFile, ""); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| timeoutValue); |
| |
| rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| RMContext rmContext = rm.getRMContext(); |
| refreshNodesOption(doGraceful, conf); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); |
| ClusterMetrics metrics = clusterMetrics; |
| assert (metrics != null); |
| rm.drainEvents(); |
| //check all 3 nodes joined in as NORMAL |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| rm.drainEvents(); |
| Assert.assertEquals("All 3 nodes should be active", |
| metrics.getNumActiveNMs(), 3); |
| int waitCount = 0; |
| while(waitCount++ < 20){ |
| synchronized (this) { |
| wait(200); |
| } |
| nm3.nodeHeartbeat(true); |
| nm1.nodeHeartbeat(true); |
| } |
| Assert.assertNotEquals("host2 should be a lost NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("host2 should be a lost NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), |
| NodeState.LOST); |
| Assert.assertEquals("There should be 1 Lost NM!", |
| clusterMetrics.getNumLostNMs(), 1); |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| int nodeRemovalTimeout = |
| conf.getInt( |
| YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| YarnConfiguration. |
| DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); |
| int nodeRemovalInterval = |
| rmContext.getNodesListManager().getNodeRemovalCheckInterval(); |
| long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; |
| writeToHostsFile(hostFile, "host1", "localhost"); |
| writeToHostsFile(excludeHostFile, ""); |
| refreshNodesOption(doGraceful, conf); |
| nm1.nodeHeartbeat(true); |
| nm3.nodeHeartbeat(true); |
| rm.drainEvents(); |
| waitCount = 0; |
| while(rmContext.getInactiveRMNodes().get( |
| nm2.getNodeId()) != null && waitCount++ < 2){ |
| synchronized (this) { |
| wait(maxThreadSleeptime); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| } |
| } |
| Assert.assertEquals("host2 should have been forgotten!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("There should be no Lost NMs!", |
| clusterMetrics.getNumLostNMs(), 0); |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| rm.stop(); |
| } |
| |
| private void testNodeRemovalUtilRebooted(boolean doGraceful) |
| throws Exception { |
| Configuration conf = new Configuration(); |
| int timeoutValue = 500; |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| writeToHostsFile(hostFile, "host1", "localhost", "host2"); |
| writeToHostsFile(excludeHostFile, ""); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| timeoutValue); |
| |
| rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| RMContext rmContext = rm.getRMContext(); |
| refreshNodesOption(doGraceful, conf); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); |
| ClusterMetrics metrics = clusterMetrics; |
| assert (metrics != null); |
| NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat( |
| new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100); |
| rm.drainEvents(); |
| rm.drainEvents(); |
| |
| Assert.assertNotEquals("host2 should be a rebooted NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("host2 should be a rebooted NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), |
| NodeState.REBOOTED); |
| Assert.assertEquals("There should be 1 Rebooted NM!", |
| clusterMetrics.getNumRebootedNMs(), 1); |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| |
| int nodeRemovalTimeout = |
| conf.getInt( |
| YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| YarnConfiguration. |
| DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); |
| int nodeRemovalInterval = |
| rmContext.getNodesListManager().getNodeRemovalCheckInterval(); |
| long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; |
| writeToHostsFile(hostFile, "host1", "localhost"); |
| writeToHostsFile(excludeHostFile, ""); |
| refreshNodesOption(doGraceful, conf); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(true); |
| nm3.nodeHeartbeat(true); |
| rm.drainEvents(); |
| int waitCount = 0; |
| while(rmContext.getInactiveRMNodes().get( |
| nm2.getNodeId()) != null && waitCount++ < 2){ |
| synchronized (this) { |
| wait(maxThreadSleeptime); |
| } |
| } |
| Assert.assertEquals("host2 should have been forgotten!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("There should be no Rebooted NMs!", |
| clusterMetrics.getNumRebootedNMs(), 0); |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| rm.stop(); |
| } |
| |
| private void testNodeRemovalUtilUnhealthy(boolean doGraceful) |
| throws Exception { |
| Configuration conf = new Configuration(); |
| int timeoutValue = 500; |
| conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, |
| hostFile.getAbsolutePath()); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| writeToHostsFile(hostFile, "host1", "localhost", "host2"); |
| writeToHostsFile(excludeHostFile, ""); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| timeoutValue); |
| |
| rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| RMContext rmContext = rm.getRMContext(); |
| refreshNodesOption(doGraceful, conf); |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| MockNM nm2 = rm.registerNode("host2:5678", 10240); |
| MockNM nm3 = rm.registerNode("localhost:4433", 1024); |
| ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); |
| ClusterMetrics metrics = clusterMetrics; |
| assert (metrics != null); |
| rm.drainEvents(); |
| //check all 3 nodes joined in as NORMAL |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm2.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| nodeHeartbeat = nm3.nodeHeartbeat(true); |
| Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); |
| rm.drainEvents(); |
| Assert.assertEquals("All 3 nodes should be active", |
| metrics.getNumActiveNMs(), 3); |
| // node healthy |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(false); |
| nm3.nodeHeartbeat(true); |
| checkUnhealthyNMCount(rm, nm2, true, 1); |
| writeToHostsFile(hostFile, "host1", "localhost"); |
| writeToHostsFile(excludeHostFile, ""); |
| refreshNodesOption(doGraceful, conf); |
| nm1.nodeHeartbeat(true); |
| nm2.nodeHeartbeat(false); |
| nm3.nodeHeartbeat(true); |
| rm.drainEvents(); |
| if (!doGraceful) { |
| Assert.assertNotEquals("host2 should be a shutdown NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("host2 should be a shutdown NM!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), |
| NodeState.SHUTDOWN); |
| } |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| if (!doGraceful) { |
| Assert.assertEquals("There should be 1 Shutdown NM!", |
| clusterMetrics.getNumShutdownNMs(), 1); |
| } |
| Assert.assertEquals("There should be 0 Unhealthy NM!", |
| clusterMetrics.getUnhealthyNMs(), 0); |
| int nodeRemovalTimeout = |
| conf.getInt( |
| YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| YarnConfiguration. |
| DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); |
| int nodeRemovalInterval = |
| rmContext.getNodesListManager().getNodeRemovalCheckInterval(); |
| long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; |
| int waitCount = 0; |
| while(rmContext.getInactiveRMNodes().get( |
| nm2.getNodeId()) != null && waitCount++ < 2){ |
| synchronized (this) { |
| wait(maxThreadSleeptime); |
| } |
| } |
| Assert.assertEquals("host2 should have been forgotten!", |
| rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); |
| Assert.assertEquals("There should be no Shutdown NMs!", |
| clusterMetrics.getNumRebootedNMs(), 0); |
| Assert.assertEquals("There should be 2 Active NM!", |
| clusterMetrics.getNumActiveNMs(), 2); |
| rm.stop(); |
| } |
| |
| private void ensureFileExists(File file) throws IOException { |
| if (!file.exists()) { |
| TEMP_DIR.mkdirs(); |
| file.createNewFile(); |
| } |
| } |
| |
| private void writeToHostsFile(String... hosts) throws IOException { |
| writeToHostsFile(hostFile, hosts); |
| } |
| |
| private void writeToHostsFile(File file, String... hosts) |
| throws IOException { |
| ensureFileExists(file); |
| FileOutputStream fStream = null; |
| try { |
| fStream = new FileOutputStream(file); |
| for (int i = 0; i < hosts.length; i++) { |
| fStream.write(hosts[i].getBytes()); |
| fStream.write("\n".getBytes()); |
| } |
| } finally { |
| if (fStream != null) { |
| IOUtils.closeStream(fStream); |
| fStream = null; |
| } |
| } |
| } |
| |
| private void writeToHostsXmlFile( |
| File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception { |
| ensureFileExists(file); |
| DocumentBuilderFactory dbFactory = XMLUtils.newSecureDocumentBuilderFactory(); |
| Document doc = dbFactory.newDocumentBuilder().newDocument(); |
| Element hosts = doc.createElement("hosts"); |
| doc.appendChild(hosts); |
| for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) { |
| Element host = doc.createElement("host"); |
| hosts.appendChild(host); |
| Element name = doc.createElement("name"); |
| host.appendChild(name); |
| name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft())); |
| if (hostsAndTimeout.getRight() != null) { |
| Element timeout = doc.createElement("timeout"); |
| host.appendChild(timeout); |
| timeout.appendChild( |
| doc.createTextNode(hostsAndTimeout.getRight().toString()) |
| ); |
| } |
| } |
| TransformerFactory transformerFactory = XMLUtils.newSecureTransformerFactory(); |
| Transformer transformer = transformerFactory.newTransformer(); |
| transformer.setOutputProperty(OutputKeys.INDENT, "yes"); |
| transformer.transform(new DOMSource(doc), new StreamResult(file)); |
| } |
| |
| private void checkDecommissionedNMCount(MockRM rm, int count) |
| throws InterruptedException { |
| int waitCount = 0; |
| while (ClusterMetrics.getMetrics().getNumDecommisionedNMs() != count |
| && waitCount++ < 20) { |
| synchronized (this) { |
| wait(100); |
| } |
| } |
| Assert.assertEquals(count, ClusterMetrics.getMetrics() |
| .getNumDecommisionedNMs()); |
| Assert.assertEquals("The decommisioned metrics are not updated", count, |
| ClusterMetrics.getMetrics().getNumDecommisionedNMs()); |
| } |
| |
| private void checkShutdownNMCount(MockRM rm, int count) |
| throws InterruptedException { |
| int waitCount = 0; |
| while (ClusterMetrics.getMetrics().getNumShutdownNMs() != count |
| && waitCount++ < 20) { |
| synchronized (this) { |
| wait(100); |
| } |
| } |
| Assert.assertEquals("The shutdown metrics are not updated", count, |
| ClusterMetrics.getMetrics().getNumShutdownNMs()); |
| } |
| |
| @After |
| public void tearDown() { |
| if (hostFile != null && hostFile.exists()) { |
| hostFile.delete(); |
| } |
| |
| ClusterMetrics.destroy(); |
| if (rm != null) { |
| rm.stop(); |
| } |
| |
| MetricsSystem ms = DefaultMetricsSystem.instance(); |
| if (ms.getSource("ClusterMetrics") != null) { |
| DefaultMetricsSystem.shutdown(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test |
| public void testHandleOpportunisticContainerStatus() throws Exception{ |
| final DrainDispatcher dispatcher = new DrainDispatcher(); |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, |
| true); |
| rm = new MockRM(conf){ |
| @Override |
| protected Dispatcher createDispatcher() { |
| return dispatcher; |
| } |
| }; |
| |
| rm.start(); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) |
| .withUnmanagedAM(true) |
| .build(); |
| RMApp app = MockRMAppSubmitter.submit(rm, data); |
| ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() |
| .getAppAttemptId(); |
| |
| ResourceTrackerService resourceTrackerService = |
| rm.getResourceTrackerService(); |
| SchedulerApplicationAttempt applicationAttempt = null; |
| while (applicationAttempt == null) { |
| applicationAttempt = |
| ((AbstractYarnScheduler)rm.getRMContext().getScheduler()) |
| .getApplicationAttempt(appAttemptId); |
| Thread.sleep(100); |
| } |
| |
| Resource currentConsumption = applicationAttempt.getCurrentConsumption(); |
| Assert.assertEquals(Resource.newInstance(0, 0), currentConsumption); |
| Resource allocResources = |
| applicationAttempt.getQueue().getMetrics().getAllocatedResources(); |
| Assert.assertEquals(Resource.newInstance(0, 0), allocResources); |
| |
| RegisterNodeManagerRequest req = Records.newRecord( |
| RegisterNodeManagerRequest.class); |
| NodeId nodeId = NodeId.newInstance("host2", 1234); |
| Resource capability = Resources.createResource(1024); |
| |
| NodeStatus mockNodeStatus = createMockNodeStatus(); |
| |
| req.setResource(capability); |
| req.setNodeId(nodeId); |
| req.setHttpPort(1234); |
| req.setNMVersion(YarnVersionInfo.getVersion()); |
| req.setNodeStatus(mockNodeStatus); |
| ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); |
| ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); |
| ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); |
| NMContainerStatus queuedOpp = |
| NMContainerStatus.newInstance(c1, 1, ContainerState.RUNNING, |
| Resource.newInstance(1024, 1), "Dummy Queued OC", |
| ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "", |
| ExecutionType.OPPORTUNISTIC, -1); |
| NMContainerStatus runningOpp = |
| NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING, |
| Resource.newInstance(2048, 1), "Dummy Running OC", |
| ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", |
| ExecutionType.OPPORTUNISTIC, -1); |
| NMContainerStatus runningGuar = |
| NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING, |
| Resource.newInstance(2048, 1), "Dummy Running GC", |
| ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", |
| ExecutionType.GUARANTEED, -1); |
| req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar)); |
| // trying to register a invalid node. |
| RegisterNodeManagerResponse response = |
| resourceTrackerService.registerNodeManager(req); |
| dispatcher.await(); |
| Thread.sleep(2000); |
| dispatcher.await(); |
| Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); |
| |
| Collection<RMContainer> liveContainers = applicationAttempt |
| .getLiveContainers(); |
| Assert.assertEquals(3, liveContainers.size()); |
| Iterator<RMContainer> iter = liveContainers.iterator(); |
| while (iter.hasNext()) { |
| RMContainer rc = iter.next(); |
| Assert.assertEquals( |
| rc.getContainerId().equals(c3) ? |
| ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC, |
| rc.getExecutionType()); |
| } |
| |
| // Should only include GUARANTEED resources |
| currentConsumption = applicationAttempt.getCurrentConsumption(); |
| Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption); |
| allocResources = |
| applicationAttempt.getQueue().getMetrics().getAllocatedResources(); |
| Assert.assertEquals(Resource.newInstance(2048, 1), allocResources); |
| |
| SchedulerNode schedulerNode = |
| rm.getRMContext().getScheduler().getSchedulerNode(nodeId); |
| Assert.assertNotNull(schedulerNode); |
| Resource nodeResources = schedulerNode.getAllocatedResource(); |
| Assert.assertEquals(Resource.newInstance(2048, 1), nodeResources); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNodeHeartBeatResponseForUnknownContainerCleanUp() |
| throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.init(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| rm.drainEvents(); |
| |
| // send 1st heartbeat |
| nm1.nodeHeartbeat(true); |
| |
| // Create 2 unknown containers tracked by NM |
| ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1); |
| ApplicationAttemptId applicationAttemptId = BuilderUtils |
| .newApplicationAttemptId(applicationId, 1); |
| ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 2); |
| ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 3); |
| ArrayList<ContainerStatus> containerStats = |
| new ArrayList<ContainerStatus>(); |
| containerStats.add( |
| ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1)); |
| containerStats.add( |
| ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1)); |
| |
| Map<ApplicationId, List<ContainerStatus>> conts = |
| new HashMap<ApplicationId, List<ContainerStatus>>(); |
| conts.put(applicationAttemptId.getApplicationId(), containerStats); |
| |
| // add RMApp into context. |
| RMApp app1 = mock(RMApp.class); |
| when(app1.getApplicationId()).thenReturn(applicationId); |
| rm.getRMContext().getRMApps().put(applicationId, app1); |
| |
| // Send unknown container status in heartbeat |
| nm1.nodeHeartbeat(conts, true); |
| rm.drainEvents(); |
| |
| int containersToBeRemovedFromNM = 0; |
| while (true) { |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| rm.drainEvents(); |
| containersToBeRemovedFromNM += |
| nodeHeartbeat.getContainersToBeRemovedFromNM().size(); |
| // asserting for 2 since two unknown containers status has been sent |
| if (containersToBeRemovedFromNM == 2) { |
| break; |
| } |
| } |
| } |
| |
| @Test |
| public void testResponseIdOverflow() throws Exception { |
| Configuration conf = new Configuration(); |
| rm = new MockRM(conf); |
| rm.start(); |
| |
| MockNM nm1 = rm.registerNode("host1:1234", 5120); |
| |
| NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); |
| |
| // prepare the responseId that's about to overflow |
| RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); |
| |
| nm1.setResponseId(Integer.MAX_VALUE); |
| |
| // heartbeat twice and check responseId |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); |
| Assert.assertEquals(0, nodeHeartbeat.getResponseId()); |
| |
| nodeHeartbeat = nm1.nodeHeartbeat(true); |
| Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); |
| Assert.assertEquals(1, nodeHeartbeat.getResponseId()); |
| } |
| |
| @Test |
| public void testNMIpHostNameResolution() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, |
| "localhost:" + ServerSocketUtil.getPort(10000, 10)); |
| conf.setBoolean(YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY, |
| true); |
| MockRM mockRM = new MockRM(conf) { |
| @Override |
| protected ResourceTrackerService createResourceTrackerService() { |
| return new ResourceTrackerService(getRMContext(), nodesListManager, |
| this.nmLivelinessMonitor, |
| rmContext.getContainerTokenSecretManager(), |
| rmContext.getNMTokenSecretManager()) { |
| }; |
| } |
| }; |
| mockRM.start(); |
| ResourceTracker rmTracker = |
| ServerRMProxy.createRMProxy(mockRM.getConfig(), ResourceTracker.class); |
| RegisterNodeManagerResponse response = rmTracker.registerNodeManager( |
| RegisterNodeManagerRequest.newInstance( |
| NodeId.newInstance("host1" + System.currentTimeMillis(), 1234), |
| 1236, Resource.newInstance(10000, 10), "2", new ArrayList<>(), |
| new ArrayList<>())); |
| Assert |
| .assertEquals("Shutdown signal should be received", NodeAction.SHUTDOWN, |
| response.getNodeAction()); |
| Assert.assertTrue("Diagnostic Message", response.getDiagnosticsMessage() |
| .contains("hostname cannot be resolved ")); |
| // Test success |
| rmTracker = |
| ServerRMProxy.createRMProxy(mockRM.getConfig(), ResourceTracker.class); |
| response = rmTracker.registerNodeManager(RegisterNodeManagerRequest |
| .newInstance(NodeId.newInstance("localhost", 1234), 1236, |
| Resource.newInstance(10000, 10), "2", new ArrayList<>(), |
| new ArrayList<>())); |
| Assert.assertEquals("Successfull registration", NodeAction.NORMAL, |
| response.getNodeAction()); |
| mockRM.stop(); |
| } |
| |
| private void pollingAssert(Supplier<Boolean> supplier, String message) |
| throws InterruptedException, TimeoutException { |
| GenericTestUtils.waitFor(supplier, |
| 100, 10_000, message); |
| } |
| |
| private <T> void pollingAssert(Supplier<T> supplier, T expected, String message) |
| throws InterruptedException, TimeoutException { |
| GenericTestUtils.waitFor(() -> Objects.equals(supplier.get(), expected), |
| 100, 10_000, message); |
| } |
| |
| /** |
| * A no-op implementation of NodeAttributeStore for testing |
| */ |
| public static class NullNodeAttributeStore implements NodeAttributeStore { |
| |
| @Override |
| public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute) { |
| } |
| |
| @Override |
| public void addNodeAttributes(List<NodeToAttributes> nodeToAttribute) { |
| } |
| |
| @Override |
| public void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute) { |
| } |
| |
| @Override |
| public void init(Configuration configuration, NodeAttributesManager mgr) { |
| } |
| |
| @Override |
| public void recover() { |
| } |
| |
| @Override |
| public void close() { |
| } |
| } |
| |
| @Test(timeout = 5000) |
| public void testSystemCredentialsAfterTokenSequenceNoChange() |
| throws Exception { |
| |
| Configuration conf = new Configuration(); |
| |
| RMContext rmContext = mock(RMContextImpl.class); |
| |
| Dispatcher dispatcher = new InlineDispatcher(); |
| when(rmContext.getDispatcher()).thenReturn(dispatcher); |
| |
| NodeId nodeId = NodeId.newInstance("localhost", 1234); |
| ConcurrentMap<NodeId, RMNode> rmNodes = |
| new ConcurrentHashMap<NodeId, RMNode>(); |
| RMNode rmNode = MockNodes.newNodeInfo(1, Resource.newInstance(1024, 1), 1, |
| "localhost", 1234, rmContext); |
| rmNodes.put(nodeId, rmNode); |
| when(rmContext.getRMNodes()).thenReturn(rmNodes); |
| |
| ConcurrentMap<NodeId, RMNode> inactiveNodes = |
| new ConcurrentHashMap<NodeId, RMNode>(); |
| when(rmContext.getInactiveRMNodes()).thenReturn(inactiveNodes); |
| when(rmContext.getConfigurationProvider()) |
| .thenReturn(new LocalConfigurationProvider()); |
| |
| dispatcher.register(SchedulerEventType.class, |
| new InlineDispatcher.EmptyEventHandler()); |
| dispatcher.register(RMNodeEventType.class, |
| new NodeEventDispatcher(rmContext)); |
| |
| NMLivelinessMonitor nmLivelinessMonitor = |
| new NMLivelinessMonitor(dispatcher); |
| nmLivelinessMonitor.init(conf); |
| nmLivelinessMonitor.start(); |
| NodesListManager nodesListManager = new NodesListManager(rmContext); |
| nodesListManager.init(conf); |
| RMContainerTokenSecretManager containerTokenSecretManager = |
| new RMContainerTokenSecretManager(conf); |
| containerTokenSecretManager.start(); |
| NMTokenSecretManagerInRM nmTokenSecretManager = |
| new NMTokenSecretManagerInRM(conf); |
| nmTokenSecretManager.start(); |
| ResourceTrackerService resourceTrackerService = new ResourceTrackerService( |
| rmContext, nodesListManager, nmLivelinessMonitor, |
| containerTokenSecretManager, nmTokenSecretManager); |
| |
| resourceTrackerService.init(conf); |
| resourceTrackerService.start(); |
| |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| |
| RegisterNodeManagerRequest request = |
| recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); |
| request.setNodeId(nodeId); |
| request.setHttpPort(1234); |
| request.setResource(Resources.createResource(1024)); |
| resourceTrackerService.registerNodeManager(request); |
| |
| org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = |
| recordFactory.newRecordInstance( |
| org.apache.hadoop.yarn.server.api.records.NodeStatus.class); |
| nodeStatus.setNodeId(nodeId); |
| nodeStatus.setResponseId(0); |
| nodeStatus.setNodeHealthStatus( |
| recordFactory.newRecordInstance(NodeHealthStatus.class)); |
| nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true); |
| |
| NodeHeartbeatRequest request1 = |
| recordFactory.newRecordInstance(NodeHeartbeatRequest.class); |
| request1.setNodeStatus(nodeStatus); |
| |
| // Set NM's token sequence no as 1 |
| request1.setTokenSequenceNo(1); |
| |
| // Set RM's token sequence no as 1 |
| when(rmContext.getTokenSequenceNo()).thenReturn((long) 1); |
| |
| // Populate SystemCredentialsForApps |
| final ApplicationId appId = ApplicationId.newInstance(1234, 1); |
| Credentials app1Cred = new Credentials(); |
| |
| Token<DelegationTokenIdentifier> token = |
| new Token<DelegationTokenIdentifier>(); |
| token.setKind(new Text("kind1")); |
| app1Cred.addToken(new Text("token1"), token); |
| Token<DelegationTokenIdentifier> token2 = |
| new Token<DelegationTokenIdentifier>(); |
| token2.setKind(new Text("kind2")); |
| app1Cred.addToken(new Text("token2"), token2); |
| |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| app1Cred.writeTokenStorageToStream(dob); |
| ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| |
| SystemCredentialsForAppsProto systemCredentialsForAppsProto = |
| YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId, |
| byteBuffer); |
| |
| ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentialsForApps = |
| new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>(1); |
| |
| systemCredentialsForApps.put(appId, systemCredentialsForAppsProto); |
| |
| when(rmContext.getSystemCredentialsForApps()) |
| .thenReturn(systemCredentialsForApps); |
| |
| // first ping |
| NodeHeartbeatResponse response = |
| resourceTrackerService.nodeHeartbeat(request1); |
| |
| // Though SystemCredentialsForApps size is 1, it is not being sent as part |
| // of response as there is no difference between NM's and RM's token |
| // sequence no |
| assertEquals(1, rmContext.getTokenSequenceNo()); |
| assertEquals(1, rmContext.getSystemCredentialsForApps().size()); |
| assertEquals(1, response.getTokenSequenceNo()); |
| assertEquals(0, response.getSystemCredentialsForApps().size()); |
| |
| // Set RM's token sequence no as 2 |
| when(rmContext.getTokenSequenceNo()).thenReturn((long) 2); |
| |
| // Ensure new heartbeat has been sent to avoid duplicate issues |
| nodeStatus.setResponseId(1); |
| request1.setNodeStatus(nodeStatus); |
| |
| // second ping |
| NodeHeartbeatResponse response1 = |
| resourceTrackerService.nodeHeartbeat(request1); |
| |
| // Since NM's and RM's token sequence no is different, response should |
| // contain SystemCredentialsForApps |
| assertEquals(2, response1.getTokenSequenceNo()); |
| assertEquals(1, response1.getSystemCredentialsForApps().size()); |
| |
| resourceTrackerService.close(); |
| } |
| |
| /** |
| * Decommissioning without pre-configured include hosts file. |
| */ |
| @Test |
| public void testDecommissionWithoutIncludeFile() throws Exception { |
| // clear exclude hosts |
| writeToHostsFile(excludeHostFile, ""); |
| // init conf: |
| // (1) set untracked removal timeout to 500ms |
| // (2) set exclude path (no include path) |
| // (3) enable node untracked without pre-configured include path |
| Configuration conf = new Configuration(); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, |
| 500); |
| conf.setBoolean( |
| YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeHostFile.getAbsolutePath()); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 10240); |
| MockNM nm2 = rm.registerNode("host2:1234", 10240); |
| MockNM nm3 = rm.registerNode("host3:1234", 10240); |
| MockNM nm4 = rm.registerNode("host4:1234", 10240); |
| assertEquals(4, rm.getRMContext().getRMNodes().size()); |
| assertEquals(0, rm.getRMContext().getInactiveRMNodes().size()); |
| |
| // decommission nm1 via adding nm1 into exclude hosts |
| RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| writeToHostsFile(excludeHostFile, "host1"); |
| rm.getNodesListManager().refreshNodes(conf); |
| rm.drainEvents(); |
| assertEquals(rmNode1.getState(), NodeState.DECOMMISSIONED); |
| assertEquals(3, rm.getRMContext().getRMNodes().size()); |
| assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); |
| assertEquals(Sets.newHashSet(nm1.getNodeId()), |
| rm.getRMContext().getInactiveRMNodes().keySet()); |
| |
| // remove nm1 from exclude hosts, so that it will be marked as untracked |
| // and removed from inactive nodes after the timeout |
| writeToHostsFile(excludeHostFile, ""); |
| rm.getNodesListManager().refreshNodes(conf); |
| // confirmed that nm1 should be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor( |
| () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000); |
| |
| // lost nm2 |
| RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); |
| rm.getRMContext().getDispatcher().getEventHandler() |
| .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE)); |
| rm.drainEvents(); |
| assertEquals(rmNode2.getState(), NodeState.LOST); |
| assertEquals(2, rm.getRMContext().getRMNodes().size()); |
| assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); |
| assertEquals(Sets.newHashSet(nm2.getNodeId()), |
| rm.getRMContext().getInactiveRMNodes().keySet()); |
| // confirmed that nm2 should be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor( |
| () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000); |
| |
| // shutdown nm3 |
| RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); |
| rm.getRMContext().getDispatcher().getEventHandler() |
| .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN)); |
| rm.drainEvents(); |
| assertEquals(rmNode3.getState(), NodeState.SHUTDOWN); |
| assertEquals(1, rm.getRMContext().getRMNodes().size()); |
| assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); |
| assertEquals(Sets.newHashSet(nm3.getNodeId()), |
| rm.getRMContext().getInactiveRMNodes().keySet()); |
| // confirmed that nm3 should be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor( |
| () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000); |
| |
| // nm4 is still active node at last |
| assertEquals(Sets.newHashSet(nm4.getNodeId()), |
| rm.getRMContext().getRMNodes().keySet()); |
| |
| rm.close(); |
| } |
| |
| /** |
| * Decommissioning with selective states for untracked nodes. |
| */ |
| @Test |
| public void testDecommissionWithSelectiveStates() throws Exception { |
| // clear exclude hosts |
| writeToHostsFile(excludeHostFile, ""); |
| // init conf: |
| // (1) set untracked removal timeout to 500ms |
| // (2) set exclude path (no include path) |
| // (3) enable node untracked without pre-configured include path |
| Configuration conf = new Configuration(); |
| conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500); |
| conf.setBoolean(YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true); |
| conf.setStrings(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE, |
| "DECOMMISSIONED", "SHUTDOWN"); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); |
| |
| rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm1 = rm.registerNode("host1:1234", 10240); |
| MockNM nm2 = rm.registerNode("host2:1234", 10240); |
| MockNM nm3 = rm.registerNode("host3:1234", 10240); |
| MockNM nm4 = rm.registerNode("host4:1234", 10240); |
| assertEquals(4, rm.getRMContext().getRMNodes().size()); |
| assertEquals(0, rm.getRMContext().getInactiveRMNodes().size()); |
| |
| // decommission nm1 via adding nm1 into exclude hosts |
| RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| writeToHostsFile(excludeHostFile, "host1"); |
| rm.getNodesListManager().refreshNodes(conf); |
| rm.drainEvents(); |
| assertEquals(rmNode1.getState(), NodeState.DECOMMISSIONED); |
| assertEquals(3, rm.getRMContext().getRMNodes().size()); |
| assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); |
| assertEquals(new HashSet(Arrays.asList(nm1.getNodeId())), |
| rm.getRMContext().getInactiveRMNodes().keySet()); |
| |
| // remove nm1 from exclude hosts, so that it will be marked as untracked |
| // and removed from inactive nodes after the timeout |
| writeToHostsFile(excludeHostFile, ""); |
| rm.getNodesListManager().refreshNodes(conf); |
| // confirmed that nm1 should be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 0, |
| 100, 1000); |
| |
| // lost nm2 |
| RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); |
| rm.getRMContext().getDispatcher().getEventHandler() |
| .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE)); |
| rm.drainEvents(); |
| assertEquals(rmNode2.getState(), NodeState.LOST); |
| assertEquals(2, rm.getRMContext().getRMNodes().size()); |
| assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); |
| // confirmed that nm2 should not be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1, |
| 100, 1000); |
| |
| // shutdown nm3 |
| RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); |
| rm.getRMContext().getDispatcher().getEventHandler() |
| .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN)); |
| rm.drainEvents(); |
| assertEquals(rmNode3.getState(), NodeState.SHUTDOWN); |
| assertEquals(1, rm.getRMContext().getRMNodes().size()); |
| assertEquals(2, rm.getRMContext().getInactiveRMNodes().size()); |
| // confirmed that nm3 should be removed from inactive nodes in 1 second |
| GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1, |
| 100, 1000); |
| |
| // nm4 is still active node at last |
| assertEquals(new HashSet(Arrays.asList(nm4.getNodeId())), |
| rm.getRMContext().getRMNodes().keySet()); |
| |
| // nm2 is still inactive node at last, not removed |
| assertEquals(new HashSet(Arrays.asList(nm2.getNodeId())), |
| rm.getRMContext().getInactiveRMNodes().keySet()); |
| |
| rm.close(); |
| } |
| } |